gRPC Web 应用实战从入门到精通(四):构建支持自发现的 gRPC 微服务(基于 Consul)

在前面三篇文章中,我们依次掌握了:

  • 为什么选择 gRPC
  • gRPC 的通信机制
  • 如何构建与部署一个基础 gRPC Web 应用

从本篇文章开始,我们进入“微服务能力篇”。
在真实线上系统中,gRPC 服务至少需要满足三个特性:

  • 可注册:服务实例启动后自动汇报自己
  • 可发现:客户端能动态获取可用实例
  • 可负载均衡:多个节点间自动分配请求

本篇,我们将使用 Consul + gRPC 来完成一个 支持自动发现 + 负载均衡的微服务 Demo


一、为什么需要服务注册与发现?

在 “单机部署” 场景中,客户端直接使用:

127.0.0.1:50051

但在微服务中,一个服务可能有:

  • 多实例副本
  • 动态扩容与缩容
  • 随时上下线
  • IP & 端口随机变化(如 Docker/K8s)

所以客户端 无法写死地址

我们需要一个注册中心来解决:

需求 为什么
服务注册 实例启动 → 上报自己的 IP/Port
服务发现 客户端实时获取可用实例列表
健康检查 下线不健康的节点
自动负载均衡 多个实例轮询或权重调度

Consul 正是这样一个组件。


二、Consul 简介

Consul 是 HashiCorp 开发的一款开源服务网格工具,具有:

  • 服务注册 / 发现
  • 健康检查
  • KV 存储
  • 多数据中心
  • 内置 UI
  • 原生支持 gRPC 健康检查

优点:

  • 上手极快
  • 单机即可跑
  • 轻量级,无依赖
  • 完美适配 gRPC

三、使用 Docker 启动 Consul

下面是一条命令启动 UI + Server 的 Consul:

docker pull consul:1.15.4

docker run -d \
  --name=consul \
  -p 8500:8500 \
  -p 8600:8600/udp \
  consul:1.15.4 agent -dev -client 0.0.0.0

访问浏览器:

http://localhost:8500

即可看到 Consul UI,如下图:
Consul


四、构建 gRPC 服务并注册到 Consul

我们来实现一个最简单的 gRPC 服务:HelloService。

1. 生成 proto

假设已有以下 service:

syntax = "proto3";

package hello;
option go_package = "./;pb";

service HelloService {
  rpc SayHello(HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

执行


2. gRPC Server + Consul 注册

下面的代码会自动注册服务,并暴露 gRPC 健康检查:

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"os"
	"time"

	pb "consul-learn/proto"

	"github.com/hashicorp/consul/api"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// HelloServer 实现 gRPC 服务
// 继承 UnimplementedHelloServiceServer 以确保向前兼容性
type HelloServer struct {
	pb.UnimplementedHelloServiceServer
	instanceID string // 服务实例唯一标识
}

// SayHello 实现 SayHello RPC 方法
// 这是核心的业务接口,用于响应客户端的问候请求
func (s *HelloServer) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
	return &pb.HelloReply{
		Message: fmt.Sprintf("Hello %s from %s", req.Name, s.instanceID),
	}, nil
}

// registerToConsul 将服务注册到 Consul 服务发现中心
// instanceID: 服务实例ID
// host: 服务监听的主机地址
// port: 服务监听的端口
func registerToConsul(instanceID, host string, port int) error {
	// 创建 Consul 客户端配置,使用默认配置
	cfg := api.DefaultConfig()
	// 指定 Consul 服务器地址
	cfg.Address = "127.0.0.1:8500"

	// 创建 Consul 客户端
	client, err := api.NewClient(cfg)
	if err != nil {
		return err
	}

	// 构造服务ID,确保唯一性
	serviceID := fmt.Sprintf("%s-%d", instanceID, port)

	// 配置服务注册信息
	registration := &api.AgentServiceRegistration{
		ID:      serviceID,                    // 服务唯一标识
		Name:    "hello-service",             // 服务名称
		Address: host,                         // 服务地址
		Port:    port,                         // 服务端口
		Check: &api.AgentServiceCheck{         // 健康检查配置
			// 使用 gRPC 健康检查,检查服务的 gRPC 接口是否可用
			// TODO: 由于是容器启动,这里要使用eth0网卡和容器交互的地址
			GRPC:                           fmt.Sprintf("%s:%d", host, port),
			GRPCUseTLS:                     false, // 不使用 TLS
			Interval:                       "5s",  // 每5秒检查一次
			Timeout:                        "2s",  // 检查超时时间2秒
			DeregisterCriticalServiceAfter: "30s", // 服务不健康30秒后自动注销
		},
	}

	// 执行服务注册
	return client.Agent().ServiceRegister(registration)
}

func main() {
	// 支持多个实例监听不同端口
	// 默认端口为 50051,可通过环境变量 PORT 覆盖
	port := 50051
	if p := os.Getenv("PORT"); p != "" {
		fmt.Sscanf(p, "%d", &port)
	}

	// 获取服务实例ID
	// 优先使用环境变量 INSTANCE_ID,否则生成唯一ID
	instanceID := os.Getenv("INSTANCE_ID")
	if instanceID == "" {
		instanceID = fmt.Sprintf("hello-%d", time.Now().UnixNano())
	}

	// 监听所有网络接口
	host := "0.0.0.0"
	// 创建 TCP 监听器
	lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
	if err != nil {
		log.Fatalf("listen failed: %v", err)
	}

	// 创建 gRPC 服务器
	grpcServer := grpc.NewServer()

	// 注册业务服务到 gRPC 服务器
	pb.RegisterHelloServiceServer(grpcServer, &HelloServer{instanceID: instanceID})

	// 注册 gRPC 健康检查服务
	// 这是 gRPC 标准的健康检查机制,用于服务发现和负载均衡
	healthServer := health.NewServer()
	healthpb.RegisterHealthServer(grpcServer, healthServer)
	// 设置服务状态为 SERVING,表示服务可用
	healthServer.SetServingStatus("hello-service", healthpb.HealthCheckResponse_SERVING)

	// 注册服务到 Consul 服务发现中心
	// 使用 host.docker.internal 作为注册地址,确保容器外可以访问
	if err := registerToConsul(instanceID, "host.docker.internal", port); err != nil {
		log.Fatalf("Consul register failed: %v", err)
	}

	// 启动服务并输出启动信息
	log.Printf("Server [%s] started at %s:%d", instanceID, host, port)
	// 开始处理 gRPC 请求(阻塞调用)
	grpcServer.Serve(lis)
}

Consul UI 中会出现:

hello-service

实例加入


五、实现 Client 端:动态服务发现 + 负载均衡

Consul 本身不提供 gRPC resolver,所以我们需要自定义一个。

你可以把它理解为:
gRPC 客户端每隔 N 秒从 Consul 拉取一次可用节点列表,然后更新自己的负载均衡池。

完整 resolver,负载均衡的相关细节可以参考该文章【写给go开发者的gRPC教程-服务发现与负载均衡】,这篇文章详细解释了resolver接口实现负载均衡的方式:

/*
------------------------
 Consul Resolver - 自定义服务发现解析器
------------------------
*/

// consulResolverBuilder 实现 gRPC resolver.Builder 接口
// 用于构建 Consul 服务发现解析器
type consulResolverBuilder struct{}

// Scheme 返回解析器支持的协议方案
// 这里返回 "consul",支持 "consul://service-name" 格式的 URI
func (*consulResolverBuilder) Scheme() string { return "consul" }

// Build 构建解析器实例
// target: 解析目标,包含服务名称等信息
// cc: gRPC 客户端连接,用于更新服务地址
// opts: 构建选项
func (*consulResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	// 从目标中提取服务名称
	serviceName := target.Endpoint()

	// 创建 Consul 解析器实例
	r := &consulResolver{
		serviceName: serviceName,
		cc:          cc,
		stop:        make(chan struct{}),
	}

	// 启动服务发现监听协程
	go r.watch()
	return r, nil
}

// consulResolver 实现 gRPC resolver.Resolver 接口
// 负责从 Consul 获取服务实例地址并更新给 gRPC 客户端
type consulResolver struct {
	serviceName string            // 要发现的服务名称
	cc          resolver.ClientConn // gRPC 客户端连接
	stop        chan struct{}      // 停止信号通道
}

// ResolveNow 立即解析服务地址(可选实现)
func (r *consulResolver) ResolveNow(resolver.ResolveNowOptions) {}

// Close 关闭解析器,释放资源
func (r *consulResolver) Close() { close(r.stop) }

// watch 监听 Consul 中的服务变化,实时更新服务地址
func (r *consulResolver) watch() {
	// 创建 Consul 客户端
	config := api.DefaultConfig()
	client, _ := api.NewClient(config)

	// 创建定时器,每3秒检查一次服务状态
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// 从 Consul 获取健康的服务实例
			// 参数说明:服务名、标签、只返回健康实例、查询选项
			svcs, _, err := client.Health().Service(r.serviceName, "", true, nil)
			if err != nil {
				continue // 忽略错误,继续下一次检查
			}

			// 构建服务地址列表
			addrs := make([]resolver.Address, 0)
			for _, svc := range svcs {
				// 格式化服务地址为 "host:port"
				addr := fmt.Sprintf("%s:%d", svc.Service.Address, svc.Service.Port)
				addrs = append(addrs, resolver.Address{Addr: addr})
			}

			// 更新 gRPC 客户端的服务地址状态
			r.cc.UpdateState(resolver.State{Addresses: addrs})

		case <-r.stop:
			// 收到停止信号,退出监听
			return
		}
	}
}

六、启动多个服务实例(验证负载均衡)

为了方便,我们实现一个yaml 读取程序,并根据yaml中的内容启动多个server。

start_servers.go

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"gopkg.in/yaml.v3"
)

type ServiceConfig struct {
	InstanceId string `yaml:"instanceId"`
	Port       int    `yaml:"port"`
}

type Config struct {
	Services []ServiceConfig `yaml:"services"`
}

func main() {
	// 读取配置文件
	configFile := "config.yaml"
	if len(os.Args) > 1 {
		configFile = os.Args[1]
	}

	configData, err := os.ReadFile(configFile)
	if err != nil {
		log.Fatalf("读取配置文件失败: %v", err)
	}

	var config Config
	err = yaml.Unmarshal(configData, &config)
	if err != nil {
		log.Fatalf("解析配置文件失败: %v", err)
	}

	if len(config.Services) == 0 {
		log.Fatal("配置文件中没有定义服务")
	}

	fmt.Printf("准备启动 %d 个服务实例...\n", len(config.Services))

	var wg sync.WaitGroup
	wd, _ := os.Getwd()
	serverPath := filepath.Join(wd, "server")
	serverExePath := filepath.Join(serverPath, "server.exe")

	// 存储所有子进程
	var processes []*exec.Cmd
	var mu sync.Mutex

	// 2. 启动所有服务
	for _, svc := range config.Services {
		wg.Add(1)
		go func(svc ServiceConfig) {
			defer wg.Done()

			env := os.Environ()
			env = append(env, fmt.Sprintf("INSTANCE_ID=%s", svc.InstanceId))
			env = append(env, fmt.Sprintf("PORT=%d", svc.Port))

			cmd := exec.Command(serverExePath)
			cmd.Env = env
			cmd.Dir = serverPath

			if err := cmd.Start(); err != nil {
				log.Printf("启动服务 %s 失败: %v", svc.InstanceId, err)
				return
			}

			mu.Lock()
			processes = append(processes, cmd)
			mu.Unlock()

			fmt.Printf("服务 %s 启动成功 PID=%d\n", svc.InstanceId, cmd.Process.Pid)
		}(svc)

		time.Sleep(1 * time.Second)
	}

	wg.Wait()
	fmt.Println("所有服务启动成功")
	fmt.Println("按 Ctrl + C 退出...")

	// 3. 启动优雅退出监听
	gracefulExit(processes)
}

func gracefulExit(processes []*exec.Cmd) {
	// 创建一个信号监听通道
	sigChan := make(chan os.Signal, 1)

	// 监听 Ctrl+C、kill、关闭窗口
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	sig := <-sigChan
	fmt.Println()
	fmt.Println("收到退出信号:", sig)

	fmt.Println("准备关闭所有服务...")

	// 创建一个超时上下文(例如最多等 5 秒)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var wg sync.WaitGroup

	for _, cmd := range processes {
		if cmd.Process == nil {
			continue
		}

		wg.Add(1)
		go func(cmd *exec.Cmd) {
			defer wg.Done()

			fmt.Printf("发送退出信号给 PID=%d...\n", cmd.Process.Pid)

			// Windows + Linux 通用的终止进程方式
			err := cmd.Process.Signal(syscall.SIGINT)
			if err != nil {
				fmt.Printf("发送 SIGINT 失败,尝试 Kill: %v\n", err)
				_ = cmd.Process.Kill()
				return
			}

			// 等待退出(支持超时)
			exitCh := make(chan error)
			go func() {
				exitCh <- cmd.Wait()
			}()

			select {
			case <-ctx.Done():
				fmt.Printf("PID %d 超时未退出,强制 Kill\n", cmd.Process.Pid)
				_ = cmd.Process.Kill()
			case err := <-exitCh:
				fmt.Printf("PID %d 已退出(err=%v)\n", cmd.Process.Pid, err)
			}

		}(cmd)
	}

	wg.Wait()
	fmt.Println("所有服务已关闭,程序退出")
}

加载的config.yaml文件

services:
  - instanceId: "hello-server-1"
    port: 50051
  - instanceId: "hello-server-2"
    port: 50052
  - instanceId: "hello-server-3"
    port: 50053

Consul UI 里你会看到:

ID Address Port
hello-server-1 host.docker.internal 50051
hello-server-2 host.docker.internal 50052
hello-server-3 host.docker.internal 50053

客户端输出将变成:

Hello Jack from hello-server-1
Hello Jack from hello-server-2
Hello Jack from hello-server-3
Hello Jack from hello-server-1
Hello Jack from hello-server-2
...

client访问

这说明:

  1. 服务自动注册
  2. 客户端自动发现
  3. gRPC 轮询负载均衡(round robin)

七、总结

这一篇我们实现了一个完整微服务能力链路:

能力 使用组件
服务注册 Consul Agent
健康检查 Consul Health Check
服务发现 自定义 gRPC resolver
自动负载均衡 round_robin
多实例调度 gRPC balancer

参考文章

【docker 安装/部署consul容器】
如何在 Go 语言中使用 Consul
consul的基本使用—golang
Golang 中文学习文档:Consul
写给go开发者的gRPC教程-服务发现与负载均衡

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐