基于etcd租约与Watch机制构建跨语言服务发现与动态配置系统


在一个包含 Go 和 .NET 技术栈的异构微服务环境中,服务间通信与配置管理的首要挑战是其动态性。服务的实例地址会因部署、扩缩容或故障而频繁变更,同时,功能开关、超时阈值等配置也需要能够在运行时动态调整而无需重启服务。依赖静态配置文件或硬编码地址的传统方式在这种环境下会迅速成为维护的噩梦。

问题定义很清晰:需要一个可靠的、跨语言的机制,用于服务的自动注册与发现,以及配置的集中式动态推送。

方案权衡与技术选型

常见的解决方案包括 Consul、Zookeeper 或云厂商提供的原生服务注册中心。这些都是成熟且功能丰富的系统。然而,在我们的场景中,技术选型更倾向于轻量级、高性能且与云原生生态(特别是Kubernetes)紧密结合的组件。

  • 方案A: Consul/Zookeeper

    • 优势: 功能全面,提供了服务发现、健康检查、KV存储等完整解决方案,社区成熟。
    • 劣势: 相对重量级,引入了新的技术栈和运维复杂性。对于仅需核心服务发现与配置功能的场景,可能有些过度设计。
  • 方案B: Kubernetes原生服务发现

    • 优势: 与Kubernetes深度集成,通过Service资源和DNS即可实现服务发现,是K8s环境下的标准实践。
    • 劣势: 该方案强绑定于Kubernetes平台。如果系统需要同时支持容器化和非容器化部署,或者在开发环境中不希望启动完整的K8s集群,这种方案便会受限。此外,它主要解决服务发现,对于应用层级的动态配置管理支持较弱。
  • 最终选择: etcd

    • 理由: etcd 是一个强一致、高可用的键值存储,是Kubernetes的基石。选择它有几个关键原因:
      1. 核心原语强大: etcd 提供了 Lease (租约) 和 Watch 两个核心机制。Lease 天然适用于实现服务实例的临时性注册与健康检查,当服务实例宕机,租约到期后其对应的键会自动删除,完美解决了服务下线的难题。Watch 机制则允许客户端实时监听键值的变化,是实现服务发现和动态配置推送的理想基础。
      2. 轻量与高性能: 相比Consul,etcd更为纯粹和轻量。其基于Raft协议的实现保证了数据一致性,并且在读写性能上表现优异。
      3. 跨语言支持: 官方和社区都为 Go、C# (.NET)、Java、Python 等主流语言提供了成熟的客户端库。

我们的架构决策是,利用 etcd 构建一个自定义的服务注册与配置中心。服务提供方(例如,一个Go-Fiber编写的用户服务)在启动时,通过Lease机制将自己的地址注册到 etcd;服务消费方(例如,一个ASP.NET Core编写的产品服务)通过Watch机制实时感知用户服务实例列表的变化,并实现客户端负载均衡。同理,共享配置也存储在 etcd 中,所有服务Watch相应的配置键,实现热更新。

架构设计与核心流程

整体交互流程可以用下图来描述:

sequenceDiagram
    participant GoUserService as Go 用户服务 (Fiber)
    participant Etcd
    participant DotNetProductService as .NET 产品服务 (ASP.NET Core)

    GoUserService->>+Etcd: 1. 创建租约 (Lease Grant)
    Etcd-->>-GoUserService: 返回 LeaseID

    GoUserService->>+Etcd: 2. 注册服务 (Put key with LeaseID)
/services/user/instance1 = "10.0.1.10:8080" GoUserService->>Etcd: 3. 持续为租约续期 (KeepAlive) DotNetProductService->>+Etcd: 4. 监听服务目录 (Watch /services/user/) Etcd-->>-DotNetProductService: 推送当前实例列表 Note right of DotNetProductService: 本地缓存实例列表
["10.0.1.10:8080"] loop 业务调用 DotNetProductService->>DotNetProductService: 5. 从本地缓存选择一个实例 DotNetProductService->>GoUserService: 6. 发起API请求 end GoUserService-->>Etcd: (服务崩溃,KeepAlive中断) Note over Etcd: 租约到期 Etcd->>Etcd: 自动删除 /services/user/instance1 Etcd-->>DotNetProductService: 7. 推送删除事件 Note right of DotNetProductService: 从本地缓存移除失效实例

Go 服务提供方实现 (Fiber + etcd)

我们将创建一个用户服务,它使用Go-Fiber框架提供API,并在启动时将自己注册到etcd。

项目结构:

go-user-service/
├── go.mod
├── go.sum
├── main.go
└── registry/
    └── etcd_registry.go

etcd 服务注册模块 registry/etcd_registry.go:

这段代码是整个服务注册的核心。它封装了与etcd的所有交互,包括创建租约、注册服务和后台续约。

package registry

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// ServiceRegistry 定义了服务注册器的接口
type ServiceRegistry struct {
	client  *clientv3.Client
	leaseID clientv3.LeaseID
	// 用于优雅关闭
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	key           string
	value         string
}

// NewServiceRegistry 创建一个新的服务注册实例
func NewServiceRegistry(endpoints []string, serviceName, serviceAddr string, leaseTTL int64) (*ServiceRegistry, error) {
	// 创建 etcd 客户端
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Printf("无法连接到 etcd: %v", err)
		return nil, err
	}

	// 构造服务在 etcd 中的 key
	// 格式: /services/{serviceName}/{serviceAddr}
	// 这种设计允许同一个服务有多个实例
	serviceKey := fmt.Sprintf("/services/%s/%s", serviceName, serviceAddr)

	return &ServiceRegistry{
		client: cli,
		key:    serviceKey,
		value:  serviceAddr,
	}, nil
}

// RegisterService 注册服务到 etcd
func (r *ServiceRegistry) RegisterService(ctx context.Context, leaseTTL int64) error {
	// 1. 创建一个租约
	resp, err := r.client.Grant(ctx, leaseTTL)
	if err != nil {
		return fmt.Errorf("创建租约失败: %w", err)
	}
	r.leaseID = resp.ID

	// 2. 将服务信息与租约绑定,写入 etcd
	_, err = r.client.Put(ctx, r.key, r.value, clientv3.WithLease(r.leaseID))
	if err != nil {
		// 如果写入失败,最好撤销租约
		r.revokeLease(context.Background())
		return fmt.Errorf("注册服务失败: %w", err)
	}

	// 3. 启动一个 goroutine 自动续约
	keepAliveChan, err := r.client.KeepAlive(ctx, r.leaseID)
	if err != nil {
		r.revokeLease(context.Background())
		return fmt.Errorf("启动续约失败: %w", err)
	}

	r.keepAliveChan = keepAliveChan
	log.Printf("服务 '%s' 已注册,地址: %s, LeaseID: %d", r.key, r.value, r.leaseID)

	// 监听续约通道的响应
	go r.listenForKeepAlive()

	return nil
}

// listenForKeepAlive 在后台处理续约响应
func (r *ServiceRegistry) listenForKeepAlive() {
	for range r.keepAliveChan {
		// 可以选择在这里记录日志,但在生产环境中这会很吵
		// log.Printf("Lease %d续约成功", r.leaseID)
	}
	log.Println("续约已停止")
}

// UnregisterService 从 etcd 中注销服务
func (r *ServiceRegistry) UnregisterService() error {
	log.Printf("正在注销服务 '%s'", r.key)
	// 撤销租约,etcd 会自动删除与此租约关联的所有键
	return r.revokeLease(context.Background())
}

func (r *ServiceRegistry) revokeLease(ctx context.Context) error {
	if r.leaseID != 0 {
		_, err := r.client.Revoke(ctx, r.leaseID)
		if err != nil {
			log.Printf("撤销 Lease %d 失败: %v", r.leaseID, err)
			return err
		}
	}
	if r.client != nil {
		return r.client.Close()
	}
	return nil
}

主程序 main.go:

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"go-user-service/registry"

	"github.com/gofiber/fiber/v2"
)

const (
	ServiceName = "user-service"
	ServiceAddr = "127.0.0.1:8080" // 在真实环境中应从配置或环境变量获取
	LeaseTTL    = 10                 // 租约时间,单位秒
)

var EtcdEndpoints = []string{"http://localhost:2379"}

func main() {
	// 初始化 Fiber App
	app := fiber.New()

	// 模拟一个用户信息的ORM模型
	type User struct {
		ID   int    `json:"id"`
		Name string `json:"name"`
	}

	app.Get("/api/users/:id", func(c *fiber.Ctx) error {
		id, _ := c.ParamsInt("id")
		// 实际项目中会从数据库通过 ORM 查询
		log.Printf("接收到请求,查询用户ID: %d", id)
		return c.JSON(User{ID: id, Name: "Test User"})
	})

	// 创建服务注册实例
	reg, err := registry.NewServiceRegistry(EtcdEndpoints, ServiceName, ServiceAddr, LeaseTTL)
	if err != nil {
		log.Fatalf("创建服务注册器失败: %v", err)
	}

	// 启动一个 goroutine 运行 HTTP 服务
	go func() {
		if err := app.Listen(ServiceAddr); err != nil {
			log.Fatalf("启动 Fiber 服务失败: %v", err)
		}
	}()

	// 在后台注册服务
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := reg.RegisterService(ctx, LeaseTTL); err != nil {
		log.Fatalf("服务注册失败: %v", err)
	}

	// 优雅关机处理
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("服务准备关闭...")

	// 注销服务
	if err := reg.UnregisterService(); err != nil {
		log.Printf("服务注销失败: %v", err)
	}

	// 关闭 Fiber 服务
	if err := app.Shutdown(); err != nil {
		log.Printf("Fiber 服务关闭失败: %v", err)
	}
	log.Println("服务已成功关闭")
}

.NET 服务消费方实现 (ASP.NET Core + etcd)

现在我们来构建产品服务,它需要调用用户服务。我们将实现一个 EtcdServiceDiscovery 类来实时发现用户服务实例,并结合 IHttpClientFactory 实现动态路由和简单的负载均衡。

项目结构:

DotNetProductService/
├── DotNetProductService.csproj
├── Program.cs
├── appsettings.json
└── Services/
    ├── IServiceDiscovery.cs
    └── EtcdServiceDiscovery.cs
└── Handlers/
    └── ServiceDiscoveryDelegatingHandler.cs

服务发现接口与实现 Services/:

// Services/IServiceDiscovery.cs
namespace DotNetProductService.Services;

public interface IServiceDiscovery : IDisposable
{
    Task WatchServiceAsync(string serviceName);
    IList<string> GetServiceEndpoints(string serviceName);
}

// Services/EtcdServiceDiscovery.cs
using dotnet_etcd;
using Etcdserverpb;
using Grpc.Core;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;

namespace DotNetProductService.Services;

public class EtcdServiceDiscovery : IServiceDiscovery
{
    private readonly EtcdClient _client;
    private readonly ILogger<EtcdServiceDiscovery> _logger;
    // 使用 ConcurrentDictionary 保证线程安全的服务实例缓存
    private readonly ConcurrentDictionary<string, ConcurrentBag<string>> _serviceEndpoints = new();
    private readonly CancellationTokenSource _cts = new();

    public EtcdServiceDiscovery(IOptions<EtcdConfig> config, ILogger<EtcdServiceDiscovery> logger)
    {
        var etcdUrl = config.Value.Url ?? "http://localhost:2379";
        _client = new EtcdClient(etcdUrl);
        _logger = logger;
    }

    public IList<string> GetServiceEndpoints(string serviceName)
    {
        return _serviceEndpoints.TryGetValue(serviceName, out var endpoints) ? endpoints.ToList() : new List<string>();
    }

    public async Task WatchServiceAsync(string serviceName)
    {
        var servicePrefix = $"/services/{serviceName}/";
        _serviceEndpoints.TryAdd(serviceName, new ConcurrentBag<string>());

        try
        {
            // 1. 启动时先获取一次全量服务列表
            var existingServices = await _client.GetRangeAsync(servicePrefix);
            foreach (var kv in existingServices.Kvs)
            {
                var endpoint = kv.Value.ToStringUtf8();
                _serviceEndpoints[serviceName].Add(endpoint);
                _logger.LogInformation("初始发现服务 [{service}]: {endpoint}", serviceName, endpoint);
            }

            // 2. 启动 Watch 协程,实时监听变化
            _logger.LogInformation("开始监听服务目录: {prefix}", servicePrefix);
            var watchRequest = new WatchRequest
            {
                CreateRequest = new WatchCreateRequest
                {
                    Key = Google.Protobuf.ByteString.CopyFromUtf8(servicePrefix),
                    RangeEnd = Google.Protobuf.ByteString.CopyFromUtf8(GetRangeEnd(servicePrefix))
                }
            };

            using var call = _client.Watch(watchRequest, cancellationToken: _cts.Token);
            await foreach (var response in call.ResponseStream.ReadAllAsync(_cts.Token))
            {
                foreach (var ev in response.Events)
                {
                    var endpoint = ev.Kv.Value.ToStringUtf8();
                    switch (ev.Type)
                    {
                        case Mvccpb.Event.Types.EventType.Put:
                            if (!_serviceEndpoints[serviceName].Contains(endpoint))
                            {
                                _serviceEndpoints[serviceName].Add(endpoint);
                                _logger.LogInformation("服务上线 [{service}]: {endpoint}", serviceName, endpoint);
                            }
                            break;
                        case Mvccpb.Event.Types.EventType.Delete:
                            // ConcurrentBag 不支持高效删除,因此我们创建一个新的列表
                            var updatedEndpoints = new ConcurrentBag<string>(_serviceEndpoints[serviceName].Where(e => e != endpoint));
                            _serviceEndpoints[serviceName] = updatedEndpoints;
                            _logger.LogWarning("服务下线 [{service}]: {endpoint}", serviceName, endpoint);
                            break;
                    }
                }
            }
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
        {
            _logger.LogInformation("Watch for {serviceName} was cancelled.", serviceName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "监听服务 {serviceName} 时发生错误", serviceName);
        }
    }

    // etcd 的 RangeEnd 是一个左闭右开区间,计算前缀的下一个key
    private static string GetRangeEnd(string prefix)
    {
        var prefixBytes = System.Text.Encoding.UTF8.GetBytes(prefix);
        for (var i = prefixBytes.Length - 1; i >= 0; i--)
        {
            if (prefixBytes[i] < 0xff)
            {
                prefixBytes[i]++;
                return System.Text.Encoding.UTF8.GetString(prefixBytes.AsSpan(0, i + 1));
            }
        }
        return "\0"; // Should not happen with typical prefixes
    }

    public void Dispose()
    {
        _cts.Cancel();
        _cts.Dispose();
        _client?.Dispose();
        GC.SuppressFinalize(this);
    }
}

// 用于配置绑定的辅助类
public class EtcdConfig
{
    public string Url { get; set; } = string.Empty;
}

结合 IHttpClientFactoryDelegatingHandler:

这个处理器是实现客户端负载均衡的关键。它会拦截 HttpClient 的请求,将逻辑服务名(如 http://user-service/)替换为从服务发现缓存中获取的真实实例地址。

// Handlers/ServiceDiscoveryDelegatingHandler.cs
using DotNetProductService.Services;
using System.Collections.Concurrent;

namespace DotNetProductService.Handlers;

public class ServiceDiscoveryDelegatingHandler : DelegatingHandler
{
    private readonly IServiceDiscovery _serviceDiscovery;
    private readonly ILogger<ServiceDiscoveryDelegatingHandler> _logger;
    // 用于实现简单的轮询负载均衡
    private static readonly ConcurrentDictionary<string, int> _counters = new();

    public ServiceDiscoveryDelegatingHandler(IServiceDiscovery serviceDiscovery, ILogger<ServiceDiscoveryDelegatingHandler> logger)
    {
        _serviceDiscovery = serviceDiscovery;
        _logger = logger;
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        var originalUri = request.RequestUri;
        if (originalUri == null || string.IsNullOrEmpty(originalUri.Host))
        {
            _logger.LogError("请求URI或Host为空,无法进行服务发现");
            return await base.SendAsync(request, cancellationToken);
        }

        var serviceName = originalUri.Host;
        var endpoints = _serviceDiscovery.GetServiceEndpoints(serviceName);

        if (endpoints == null || !endpoints.Any())
        {
            _logger.LogError("服务 [{service}] 没有可用的实例", serviceName);
            throw new InvalidOperationException($"No healthy instances available for service '{serviceName}'.");
        }

        // 简单的轮询负载均衡
        var counter = _counters.AddOrUpdate(serviceName, 0, (key, oldValue) => oldValue + 1);
        var endpoint = endpoints[counter % endpoints.Count];

        // 重写请求的 URI
        var newUri = new UriBuilder(originalUri)
        {
            Scheme = "http", // 假设服务都是 http
            Host = endpoint.Split(':')[0],
            Port = int.Parse(endpoint.Split(':')[1])
        }.Uri;

        request.RequestUri = newUri;
        _logger.LogInformation("服务发现: [{original}] -> [{resolved}]", originalUri, newUri);

        return await base.SendAsync(request, cancellationToken);
    }
}

Program.cs 中组装所有组件:

using DotNetProductService.Handlers;
using DotNetProductService.Services;
using Microsoft.AspNetCore.Mvc;

var builder = WebApplication.CreateBuilder(args);

// 1. 配置绑定
builder.Services.Configure<EtcdConfig>(builder.Configuration.GetSection("Etcd"));

// 2. 注册服务发现为单例
builder.Services.AddSingleton<IServiceDiscovery, EtcdServiceDiscovery>();
builder.Services.AddHostedService<ServiceWatcherHostedService>();

// 3. 注册自定义的 DelegatingHandler
builder.Services.AddTransient<ServiceDiscoveryDelegatingHandler>();

// 4. 配置 HttpClientFactory
builder.Services.AddHttpClient("user-service-client", client =>
{
    // 使用逻辑服务名作为 BaseAddress
    client.BaseAddress = new Uri("http://user-service/");
}).AddHttpMessageHandler<ServiceDiscoveryDelegatingHandler>();


builder.Services.AddControllers();
var app = builder.Build();

app.MapGet("/api/products/{id}", async (int id, [FromServices] IHttpClientFactory clientFactory) =>
{
    var httpClient = clientFactory.CreateClient("user-service-client");
    try
    {
        // 这里的调用会经过 ServiceDiscoveryDelegatingHandler
        var user = await httpClient.GetFromJsonAsync<object>($"api/users/{id * 10}");
        return Results.Ok(new { ProductId = id, ProductName = "Awesome Gadget", UserDetails = user });
    }
    catch (Exception ex)
    {
        return Results.Problem($"调用用户服务失败: {ex.Message}");
    }
});

app.Run();


// 后台服务,用于启动时开始监听
public class ServiceWatcherHostedService : IHostedService
{
    private readonly IServiceDiscovery _serviceDiscovery;
    private readonly ILogger<ServiceWatcherHostedService> _logger;

    public ServiceWatcherHostedService(IServiceDiscovery serviceDiscovery, ILogger<ServiceWatcherHostedService> logger)
    {
        _serviceDiscovery = serviceDiscovery;
        _logger = logger;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("启动服务监听...");
        // 在后台启动对 user-service 的监听
        Task.Run(() => _serviceDiscovery.WatchServiceAsync("user-service"), cancellationToken);
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("停止服务监听。");
        _serviceDiscovery.Dispose();
        return Task.CompletedTask;
    }
}

动态配置管理

服务发现的模式同样适用于动态配置。我们可以将配置项存储在etcd的特定路径下,例如 /config/product-service/timeout

在.NET服务中实现配置热更新:

ASP.NET Core的配置系统具有良好的扩展性。我们可以创建一个自定义的ConfigurationProvider,它Watch etcd中的配置键,并在变化时触发Reload()

// 这是一个简化的概念实现,生产级代码需要更完善的错误处理和重试逻辑

public class EtcdConfigurationProvider : ConfigurationProvider, IDisposable
{
    private readonly EtcdClient _client;
    private readonly string _configPrefix;
    private readonly CancellationTokenSource _cts = new();

    public EtcdConfigurationProvider(string url, string prefix)
    {
        _client = new EtcdClient(url);
        _configPrefix = prefix;
        Task.Run(() => WatchConfigChangesAsync());
    }

    public override void Load()
    {
        var response = _client.GetRange(_configPrefix);
        foreach (var kvp in response.Kvs)
        {
            var key = kvp.Key.ToStringUtf8().Substring(_configPrefix.Length).Replace('/', ':');
            Data[key] = kvp.Value.ToStringUtf8();
        }
    }
    
    private async Task WatchConfigChangesAsync()
    {
        // ... Watch 逻辑类似于服务发现 ...
        // 当收到 PUT 事件时:
        // var key = ev.Kv.Key.ToStringUtf8().Substring(_configPrefix.Length).Replace('/', ':');
        // Data[key] = ev.Kv.Value.ToStringUtf8();
        // OnReload(); // 触发配置更新
    }
    
    public void Dispose() { /* ... */ }
}

Program.cs 中,我们可以将这个Provider添加到配置构建器中。之后,通过 IOptionsMonitor<T> 注入的配置对象就能自动反映etcd中的最新值。

方案的局限性与未来展望

这个基于etcd的自定义方案,虽然轻量且高效,但也存在一些局限性。

首先,客户端负载均衡策略非常初级(简单轮询),在生产环境中,需要更复杂的策略,如加权轮询、最少连接数或基于延迟的路由。这些高级功能通常由成熟的服务网格(Service Mesh)如 Istio 或 Linkerd 提供。

其次,服务健康检查的机制相对单一。目前完全依赖于etcd的租约机制,即进程存活则服务可用。一个更健壮的系统会包含应用层面的健康检查(例如,一个/health端点),注册中心会主动探测这些端点来判断服务真实可用性。

最后,动态配置管理虽然可行,但相比于专用的配置中心(如Apollo、Nacos),缺少了版本管理、灰度发布、权限控制和审计等企业级功能。

尽管如此,该方案作为构建异构微服务体系的基石,清晰地展示了如何利用etcd的核心原语解决分布式系统中最根本的服务通信与配置问题。它为后续引入更复杂的治理能力(如服务网格)或迁移到更专业的配置中心打下了坚实的基础。未来的演进方向可能是在此基础上集成更智能的路由逻辑,或者将服务发现的职责逐步交由服务网格的数据平面来处理。


  目录