gRPC Web 应用实战从入门到精通(四):构建支持自发现的 gRPC 微服务(基于 Consul)
摘要: 本文介绍了如何构建支持自发现的gRPC微服务,使用Consul实现服务注册、发现与负载均衡。通过Docker快速部署Consul,并演示如何将gRPC服务注册到Consul,同时实现健康检查。客户端通过动态获取服务实例列表,结合gRPC负载均衡策略,实现多实例请求分配。代码示例展示了服务端注册逻辑和健康检查配置,为微服务架构提供了可扩展的解决方案。
gRPC Web 应用实战从入门到精通(四):构建支持自发现的 gRPC 微服务(基于 Consul)
在前面三篇文章中,我们依次掌握了:
- 为什么选择 gRPC
- gRPC 的通信机制
- 如何构建与部署一个基础 gRPC Web 应用
从本篇文章开始,我们进入“微服务能力篇”。
在真实线上系统中,gRPC 服务至少需要满足三个特性:
- 可注册:服务实例启动后自动汇报自己
- 可发现:客户端能动态获取可用实例
- 可负载均衡:多个节点间自动分配请求
本篇,我们将使用 Consul + gRPC 来完成一个 支持自动发现 + 负载均衡的微服务 Demo。
一、为什么需要服务注册与发现?
在 “单机部署” 场景中,客户端直接使用:
127.0.0.1:50051
但在微服务中,一个服务可能有:
- 多实例副本
- 动态扩容与缩容
- 随时上下线
- IP & 端口随机变化(如 Docker/K8s)
所以客户端 无法写死地址。
我们需要一个注册中心来解决:
| 需求 | 为什么 |
|---|---|
| 服务注册 | 实例启动 → 上报自己的 IP/Port |
| 服务发现 | 客户端实时获取可用实例列表 |
| 健康检查 | 下线不健康的节点 |
| 自动负载均衡 | 多个实例轮询或权重调度 |
Consul 正是这样一个组件。
二、Consul 简介
Consul 是 HashiCorp 开发的一款开源服务网格工具,具有:
- 服务注册 / 发现
- 健康检查
- KV 存储
- 多数据中心
- 内置 UI
- 原生支持 gRPC 健康检查
优点:
- 上手极快
- 单机即可跑
- 轻量级,无依赖
- 完美适配 gRPC
三、使用 Docker 启动 Consul
下面是一条命令启动 UI + Server 的 Consul:
docker pull consul:1.15.4
docker run -d \
--name=consul \
-p 8500:8500 \
-p 8600:8600/udp \
consul:1.15.4 agent -dev -client 0.0.0.0
访问浏览器:
http://localhost:8500
即可看到 Consul UI,如下图:
四、构建 gRPC 服务并注册到 Consul
我们来实现一个最简单的 gRPC 服务:HelloService。
1. 生成 proto
假设已有以下 service:
syntax = "proto3";
package hello;
option go_package = "./;pb";
service HelloService {
rpc SayHello(HelloRequest) returns (HelloReply);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
执行
2. gRPC Server + Consul 注册
下面的代码会自动注册服务,并暴露 gRPC 健康检查:
package main
import (
"context"
"fmt"
"log"
"net"
"os"
"time"
pb "consul-learn/proto"
"github.com/hashicorp/consul/api"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
// HelloServer 实现 gRPC 服务
// 继承 UnimplementedHelloServiceServer 以确保向前兼容性
type HelloServer struct {
pb.UnimplementedHelloServiceServer
instanceID string // 服务实例唯一标识
}
// SayHello 实现 SayHello RPC 方法
// 这是核心的业务接口,用于响应客户端的问候请求
func (s *HelloServer) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{
Message: fmt.Sprintf("Hello %s from %s", req.Name, s.instanceID),
}, nil
}
// registerToConsul 将服务注册到 Consul 服务发现中心
// instanceID: 服务实例ID
// host: 服务监听的主机地址
// port: 服务监听的端口
func registerToConsul(instanceID, host string, port int) error {
// 创建 Consul 客户端配置,使用默认配置
cfg := api.DefaultConfig()
// 指定 Consul 服务器地址
cfg.Address = "127.0.0.1:8500"
// 创建 Consul 客户端
client, err := api.NewClient(cfg)
if err != nil {
return err
}
// 构造服务ID,确保唯一性
serviceID := fmt.Sprintf("%s-%d", instanceID, port)
// 配置服务注册信息
registration := &api.AgentServiceRegistration{
ID: serviceID, // 服务唯一标识
Name: "hello-service", // 服务名称
Address: host, // 服务地址
Port: port, // 服务端口
Check: &api.AgentServiceCheck{ // 健康检查配置
// 使用 gRPC 健康检查,检查服务的 gRPC 接口是否可用
// TODO: 由于是容器启动,这里要使用eth0网卡和容器交互的地址
GRPC: fmt.Sprintf("%s:%d", host, port),
GRPCUseTLS: false, // 不使用 TLS
Interval: "5s", // 每5秒检查一次
Timeout: "2s", // 检查超时时间2秒
DeregisterCriticalServiceAfter: "30s", // 服务不健康30秒后自动注销
},
}
// 执行服务注册
return client.Agent().ServiceRegister(registration)
}
func main() {
// 支持多个实例监听不同端口
// 默认端口为 50051,可通过环境变量 PORT 覆盖
port := 50051
if p := os.Getenv("PORT"); p != "" {
fmt.Sscanf(p, "%d", &port)
}
// 获取服务实例ID
// 优先使用环境变量 INSTANCE_ID,否则生成唯一ID
instanceID := os.Getenv("INSTANCE_ID")
if instanceID == "" {
instanceID = fmt.Sprintf("hello-%d", time.Now().UnixNano())
}
// 监听所有网络接口
host := "0.0.0.0"
// 创建 TCP 监听器
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
log.Fatalf("listen failed: %v", err)
}
// 创建 gRPC 服务器
grpcServer := grpc.NewServer()
// 注册业务服务到 gRPC 服务器
pb.RegisterHelloServiceServer(grpcServer, &HelloServer{instanceID: instanceID})
// 注册 gRPC 健康检查服务
// 这是 gRPC 标准的健康检查机制,用于服务发现和负载均衡
healthServer := health.NewServer()
healthpb.RegisterHealthServer(grpcServer, healthServer)
// 设置服务状态为 SERVING,表示服务可用
healthServer.SetServingStatus("hello-service", healthpb.HealthCheckResponse_SERVING)
// 注册服务到 Consul 服务发现中心
// 使用 host.docker.internal 作为注册地址,确保容器外可以访问
if err := registerToConsul(instanceID, "host.docker.internal", port); err != nil {
log.Fatalf("Consul register failed: %v", err)
}
// 启动服务并输出启动信息
log.Printf("Server [%s] started at %s:%d", instanceID, host, port)
// 开始处理 gRPC 请求(阻塞调用)
grpcServer.Serve(lis)
}
Consul UI 中会出现:
hello-service

五、实现 Client 端:动态服务发现 + 负载均衡
Consul 本身不提供 gRPC resolver,所以我们需要自定义一个。
你可以把它理解为:
gRPC 客户端每隔 N 秒从 Consul 拉取一次可用节点列表,然后更新自己的负载均衡池。
完整 resolver,负载均衡的相关细节可以参考该文章【写给go开发者的gRPC教程-服务发现与负载均衡】,这篇文章详细解释了resolver接口实现负载均衡的方式:
/*
------------------------
Consul Resolver - 自定义服务发现解析器
------------------------
*/
// consulResolverBuilder 实现 gRPC resolver.Builder 接口
// 用于构建 Consul 服务发现解析器
type consulResolverBuilder struct{}
// Scheme 返回解析器支持的协议方案
// 这里返回 "consul",支持 "consul://service-name" 格式的 URI
func (*consulResolverBuilder) Scheme() string { return "consul" }
// Build 构建解析器实例
// target: 解析目标,包含服务名称等信息
// cc: gRPC 客户端连接,用于更新服务地址
// opts: 构建选项
func (*consulResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// 从目标中提取服务名称
serviceName := target.Endpoint()
// 创建 Consul 解析器实例
r := &consulResolver{
serviceName: serviceName,
cc: cc,
stop: make(chan struct{}),
}
// 启动服务发现监听协程
go r.watch()
return r, nil
}
// consulResolver 实现 gRPC resolver.Resolver 接口
// 负责从 Consul 获取服务实例地址并更新给 gRPC 客户端
type consulResolver struct {
serviceName string // 要发现的服务名称
cc resolver.ClientConn // gRPC 客户端连接
stop chan struct{} // 停止信号通道
}
// ResolveNow 立即解析服务地址(可选实现)
func (r *consulResolver) ResolveNow(resolver.ResolveNowOptions) {}
// Close 关闭解析器,释放资源
func (r *consulResolver) Close() { close(r.stop) }
// watch 监听 Consul 中的服务变化,实时更新服务地址
func (r *consulResolver) watch() {
// 创建 Consul 客户端
config := api.DefaultConfig()
client, _ := api.NewClient(config)
// 创建定时器,每3秒检查一次服务状态
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 从 Consul 获取健康的服务实例
// 参数说明:服务名、标签、只返回健康实例、查询选项
svcs, _, err := client.Health().Service(r.serviceName, "", true, nil)
if err != nil {
continue // 忽略错误,继续下一次检查
}
// 构建服务地址列表
addrs := make([]resolver.Address, 0)
for _, svc := range svcs {
// 格式化服务地址为 "host:port"
addr := fmt.Sprintf("%s:%d", svc.Service.Address, svc.Service.Port)
addrs = append(addrs, resolver.Address{Addr: addr})
}
// 更新 gRPC 客户端的服务地址状态
r.cc.UpdateState(resolver.State{Addresses: addrs})
case <-r.stop:
// 收到停止信号,退出监听
return
}
}
}
六、启动多个服务实例(验证负载均衡)
为了方便,我们实现一个yaml 读取程序,并根据yaml中的内容启动多个server。
start_servers.go
package main
import (
"context"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"gopkg.in/yaml.v3"
)
type ServiceConfig struct {
InstanceId string `yaml:"instanceId"`
Port int `yaml:"port"`
}
type Config struct {
Services []ServiceConfig `yaml:"services"`
}
func main() {
// 读取配置文件
configFile := "config.yaml"
if len(os.Args) > 1 {
configFile = os.Args[1]
}
configData, err := os.ReadFile(configFile)
if err != nil {
log.Fatalf("读取配置文件失败: %v", err)
}
var config Config
err = yaml.Unmarshal(configData, &config)
if err != nil {
log.Fatalf("解析配置文件失败: %v", err)
}
if len(config.Services) == 0 {
log.Fatal("配置文件中没有定义服务")
}
fmt.Printf("准备启动 %d 个服务实例...\n", len(config.Services))
var wg sync.WaitGroup
wd, _ := os.Getwd()
serverPath := filepath.Join(wd, "server")
serverExePath := filepath.Join(serverPath, "server.exe")
// 存储所有子进程
var processes []*exec.Cmd
var mu sync.Mutex
// 2. 启动所有服务
for _, svc := range config.Services {
wg.Add(1)
go func(svc ServiceConfig) {
defer wg.Done()
env := os.Environ()
env = append(env, fmt.Sprintf("INSTANCE_ID=%s", svc.InstanceId))
env = append(env, fmt.Sprintf("PORT=%d", svc.Port))
cmd := exec.Command(serverExePath)
cmd.Env = env
cmd.Dir = serverPath
if err := cmd.Start(); err != nil {
log.Printf("启动服务 %s 失败: %v", svc.InstanceId, err)
return
}
mu.Lock()
processes = append(processes, cmd)
mu.Unlock()
fmt.Printf("服务 %s 启动成功 PID=%d\n", svc.InstanceId, cmd.Process.Pid)
}(svc)
time.Sleep(1 * time.Second)
}
wg.Wait()
fmt.Println("所有服务启动成功")
fmt.Println("按 Ctrl + C 退出...")
// 3. 启动优雅退出监听
gracefulExit(processes)
}
func gracefulExit(processes []*exec.Cmd) {
// 创建一个信号监听通道
sigChan := make(chan os.Signal, 1)
// 监听 Ctrl+C、kill、关闭窗口
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigChan
fmt.Println()
fmt.Println("收到退出信号:", sig)
fmt.Println("准备关闭所有服务...")
// 创建一个超时上下文(例如最多等 5 秒)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var wg sync.WaitGroup
for _, cmd := range processes {
if cmd.Process == nil {
continue
}
wg.Add(1)
go func(cmd *exec.Cmd) {
defer wg.Done()
fmt.Printf("发送退出信号给 PID=%d...\n", cmd.Process.Pid)
// Windows + Linux 通用的终止进程方式
err := cmd.Process.Signal(syscall.SIGINT)
if err != nil {
fmt.Printf("发送 SIGINT 失败,尝试 Kill: %v\n", err)
_ = cmd.Process.Kill()
return
}
// 等待退出(支持超时)
exitCh := make(chan error)
go func() {
exitCh <- cmd.Wait()
}()
select {
case <-ctx.Done():
fmt.Printf("PID %d 超时未退出,强制 Kill\n", cmd.Process.Pid)
_ = cmd.Process.Kill()
case err := <-exitCh:
fmt.Printf("PID %d 已退出(err=%v)\n", cmd.Process.Pid, err)
}
}(cmd)
}
wg.Wait()
fmt.Println("所有服务已关闭,程序退出")
}
加载的config.yaml文件
services:
- instanceId: "hello-server-1"
port: 50051
- instanceId: "hello-server-2"
port: 50052
- instanceId: "hello-server-3"
port: 50053
Consul UI 里你会看到:
| ID | Address | Port |
|---|---|---|
| hello-server-1 | host.docker.internal | 50051 |
| hello-server-2 | host.docker.internal | 50052 |
| hello-server-3 | host.docker.internal | 50053 |
客户端输出将变成:
Hello Jack from hello-server-1
Hello Jack from hello-server-2
Hello Jack from hello-server-3
Hello Jack from hello-server-1
Hello Jack from hello-server-2
...

这说明:
- 服务自动注册
- 客户端自动发现
- gRPC 轮询负载均衡(round robin)
七、总结
这一篇我们实现了一个完整微服务能力链路:
| 能力 | 使用组件 |
|---|---|
| 服务注册 | Consul Agent |
| 健康检查 | Consul Health Check |
| 服务发现 | 自定义 gRPC resolver |
| 自动负载均衡 | round_robin |
| 多实例调度 | gRPC balancer |
参考文章
【docker 安装/部署consul容器】
如何在 Go 语言中使用 Consul
consul的基本使用—golang
Golang 中文学习文档:Consul
写给go开发者的gRPC教程-服务发现与负载均衡
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)