go 进阶 go-zero相关: 五. 服务发现原理
【代码】go 进阶 go-zero相关: 五. 服务发现原理。
·
目录
三. discov 服务发现底层原理
- 服务提供方注册完成后,服务消费方如何获取服务并发起调用?存在多个服务节点时,又是如何从中选择一个节点的?在使用go-zero时,服务消费方需要编写如下代码,将服务提供方的rpc客户端设置到ServiceContext中
// 1. Add the RPC services this consumer needs to call to its ServiceContext.

// ServiceContext is the consumer service's dependency container.
type ServiceContext struct {
	Config config.Config
	// UserRpc is the client for the user RPC service that this service calls.
	UserRpc userclient.User
}
// 2. When initializing the ServiceContext, also create the RPC client.

// NewServiceContext builds the ServiceContext from the loaded configuration.
// The user RPC client is created with zrpc.MustNewClient, which aborts the
// process instead of returning an error when the client cannot be built.
func NewServiceContext(c config.Config) *ServiceContext {
	userRpc := userclient.NewUser(zrpc.MustNewClient(c.UserRpc))
	return &ServiceContext{
		Config:  c,
		UserRpc: userRpc,
	}
}
- 服务发现的相关逻辑重点就在zrpc.MustNewClient()中,查看该函数源码,在该函数内会调用NewClient()
// MustNewClient is like NewClient but never returns an error: if the client
// cannot be created, the error is logged with log.Fatal, which exits the process.
func MustNewClient(c RpcClientConf, options ...ClientOption) Client {
	cli, err := NewClient(c, options...)
	if err == nil {
		return cli
	}
	log.Fatal(err) // exits the process; the return below is never reached
	return cli
}
// NewClient creates an RPC client from the given configuration.
//
// Options derived from c are applied first: per-RPC credentials when App/Token
// are set, a non-blocking dial when NonBlock is set, and a call timeout (in
// milliseconds) when Timeout > 0. The caller-supplied options come after them.
// The dial target is produced by c.BuildTarget(), for example
// "discov://127.0.0.1:2379/user.rpc"; internal.NewClient then sets up load
// balancing and interceptors and dials it.
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
	var opts []ClientOption
	if c.HasCredential() {
		creds := &auth.Credential{
			App:   c.App,
			Token: c.Token,
		}
		opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(creds)))
	}
	if c.NonBlock {
		opts = append(opts, WithNonBlock())
	}
	if c.Timeout > 0 {
		timeout := time.Duration(c.Timeout) * time.Millisecond
		opts = append(opts, WithTimeout(timeout))
	}
	opts = append(opts, options...)

	target, err := c.BuildTarget()
	if err != nil {
		return nil, err
	}

	inner, err := internal.NewClient(target, opts...)
	if err != nil {
		return nil, err
	}
	return &RpcClient{client: inner}, nil
}
- 总结NewClient()重点关注两个步骤:
- 执行BuildTarget()生成target协议
- 执行internal下的NewClient()函数,服务发现等逻辑
1. BuildTarget() 生成target协议
- 查看生成target协议源码: 在微服务中需要有注册中心, 服务提供方启动时会将该服务的服务名服务地址保存到注册中心, 服务消费方启动时连接注册中心,通过目标服务名称在注册中心获取指定服务的调用地址,在go-zero中默认以etcd作为注册中心,这个target协议就是etcd的地址加上目标服务的key,例如"discov://127.0.0.1:2379/user.rpc"
RpcClientConf struct {
Etcd discov.EtcdConf `json:",optional,inherit"`
Endpoints []string `json:",optional"`
Target string `json:",optional"`
App string `json:",optional"`
Token string `json:",optional"`
NonBlock bool `json:",optional"`
Timeout int64 `json:",default=2000"`
}
// BuildTarget returns the gRPC dial target for this client configuration.
//
// Precedence:
//  1. non-empty Endpoints produce a direct target;
//  2. otherwise a non-empty Target is returned verbatim;
//  3. otherwise the Etcd settings are validated, any account and TLS settings
//     are registered for the etcd hosts, and a discov target is built from
//     the hosts and key, e.g. "discov://127.0.0.1:2379/user.rpc".
func (cc RpcClientConf) BuildTarget() (string, error) {
	switch {
	case len(cc.Endpoints) > 0:
		return resolver.BuildDirectTarget(cc.Endpoints), nil
	case len(cc.Target) > 0:
		return cc.Target, nil
	}

	ec := cc.Etcd
	if err := ec.Validate(); err != nil {
		return "", err
	}
	if ec.HasAccount() {
		discov.RegisterAccount(ec.Hosts, ec.User, ec.Pass)
	}
	if ec.HasTLS() {
		err := discov.RegisterTLS(ec.Hosts, ec.CertFile, ec.CertKeyFile,
			ec.CACertFile, ec.InsecureSkipVerify)
		if err != nil {
			return "", err
		}
	}
	return resolver.BuildDiscovTarget(ec.Hosts, ec.Key), nil
}
2. internal下的NewClient()
// NewClient returns a Client dialed to target.
//
// A default service config that selects the p2c load-balancing policy is
// placed ahead of the caller's options. The actual connection work is done
// by (*client).dial.
func NewClient(target string, opts ...ClientOption) (Client, error) {
	serviceConfig := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, p2c.Name)
	allOpts := append([]ClientOption{
		WithDialOption(grpc.WithDefaultServiceConfig(serviceConfig)),
	}, opts...)

	cli := new(client)
	if err := cli.dial(target, allOpts...); err != nil {
		return nil, err
	}
	return cli, nil
}
// dial builds the gRPC dial options (buildDialOptions also registers the
// client interceptors, e.g. the breaker interceptor) and dials server within
// dialTimeout. On success the connection is stored in c.conn.
//
// On failure the error names the target. When the dial timed out, the hint
// uses the part of server after the last separator as the service name, but
// only if that separator is neither the first nor the last character.
func (c *client) dial(server string, opts ...ClientOption) error {
	dialOpts := c.buildDialOptions(opts...)
	ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, server, dialOpts...)
	if err == nil {
		c.conn = conn
		return nil
	}

	service := server
	if errors.Is(err, context.DeadlineExceeded) {
		if pos := strings.LastIndexByte(server, separator); pos > 0 && pos < len(server)-1 {
			service = server[pos+1:]
		}
	}
	return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is already started",
		server, err.Error(), service)
}
- 查看DialContext()源码,实际该方法内部执行了很多比较重要的逻辑,如下图

- 当前我们先关注一下:
- resolverBuilder, err := cc.parseTargetAndFindResolver() //解析target协议
- rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) //包装Resolver
// DialContext creates a client connection to target (excerpt of grpc-go).
//
// In order, it:
//   - builds a ClientConn and applies the global extra dial options, then opts;
//   - chains the unary and stream client interceptors;
//   - registers the channel with channelz;
//   - validates the transport-credential configuration;
//   - parses the default service config JSON, if any (go-zero's internal
//     NewClient supplies one selecting the p2c load-balancing policy);
//   - finds the resolver builder for the target's scheme
//     (parseTargetAndFindResolver);
//   - creates the balancer wrapper;
//   - builds the resolver (newCCResolverWrapper).
//
// With the block dial option it then waits until the channel is Ready, ctx is
// done, or (with FailOnNonTempDialError) a non-temporary error is seen.
// On any error return, the deferred cleanup closes cc.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
	cc.ctx, cc.cancel = context.WithCancel(context.Background())
	// Apply the global extra dial options first, then the caller's options.
	for _, opt := range extraDialOptions {
		opt.apply(&cc.dopts)
	}
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}
	// Interceptor handling: combine the configured interceptors into one
	// chained unary and one chained stream interceptor.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)
	// Tear down the partially constructed connection on any error return.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()
	// Register the channel with channelz (nested under the parent, if any).
	pid := cc.dopts.channelzParentID
	cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, pid, target)
	ted := &channelz.TraceEventDesc{
		Desc:     "Channel created",
		Severity: channelz.CtInfo,
	}
	if cc.dopts.channelzParentID != nil {
		ted.Parent = &channelz.TraceEventDesc{
			Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()),
			Severity: channelz.CtInfo,
		}
	}
	channelz.AddTraceEvent(logger, cc.channelzID, 1, ted)
	cc.csMgr.channelzID = cc.channelzID
	// Exactly one of TransportCredentials / CredsBundle must be configured,
	// and a bundle must actually provide transport credentials.
	if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
		return nil, errNoTransportSecurity
	}
	if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
		return nil, errTransportCredsAndBundle
	}
	if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
		return nil, errNoTransportCredsInBundle
	}
	transportCreds := cc.dopts.copts.TransportCredentials
	if transportCreds == nil {
		transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
	}
	// Per-RPC credentials that require transport security are rejected
	// over an insecure transport.
	if transportCreds.Info().SecurityProtocol == "insecure" {
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}
	// Parse the default service config JSON, if one was supplied.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams
	// Append grpc-go's own user agent to any custom one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}
	// Bound the rest of the dial by the dial-option timeout, if set.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is done when DialContext returns, return no connection and
	// report the context error. When returnLastError is set and a different
	// error occurred, combine both into one message.
	defer func() {
		select {
		case <-ctx.Done():
			switch {
			case ctx.Err() == err:
				conn = nil
			case err == nil || !cc.dopts.returnLastError:
				conn, err = nil, ctx.Err()
			default:
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
			}
		default:
		}
	}()
	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
				scSet = true
			}
		default:
		}
	}
	// Default the backoff strategy to backoff.DefaultExponential.
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}
	// Parse the target and find the resolver builder for its scheme; for a
	// go-zero "discov://..." target the scheme is "discov".
	resolverBuilder, err := cc.parseTargetAndFindResolver()
	if err != nil {
		return nil, err
	}
	// Compute the channel authority from the parsed endpoint, target and options.
	cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint, cc.target, cc.dopts)
	if err != nil {
		return nil, err
	}
	channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)
	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	// Keep consuming service config updates from scChan in the background.
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}
	// The balancer gets its own clone of the transport credentials.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		Authority:        cc.authority,
		CustomUserAgent:  cc.dopts.copts.UserAgent,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	})
	// Build the resolver. newCCResolverWrapper calls the builder's Build
	// method; the resolver then reports addresses through the wrapper, which
	// forwards them to the balancer wrapper created above.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			cc.Connect()
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up at once on an error that reports itself as non-temporary.
				if err = cc.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
					return nil, err
				}
				return nil, ctx.Err()
			}
		}
	}
	return cc, nil
}
ClientConn
- 第一步会先封装一个ClientConn结构体变量
// ClientConn represents a client-side virtual connection to a target
// (excerpt of grpc-go). DialContext creates it; the resolver wrapper and the
// balancer wrapper are attached to it.
type ClientConn struct {
	ctx    context.Context    // Connection-scoped context; DialContext derives it from context.Background().
	cancel context.CancelFunc // Cancels ctx.

	target       string               // Raw dial target passed to DialContext, e.g. "discov://127.0.0.1:2379/user.rpc".
	parsedTarget resolver.Target      // target as parsed by parseTarget (Scheme, Authority, Endpoint, URL).
	authority    string               // Channel authority computed by determineAuthority (not an auth credential).
	dopts        dialOptions          // Effective dial options: defaults plus every applied DialOption, including interceptors.
	channelzID   *channelz.Identifier // channelz ID registered for this channel (monitoring/debugging).

	balancerWrapper    *ccBalancerWrapper           // Wraps the LB policy; receives resolver updates via updateClientConnState.
	csMgr              *connectivityStateManager    // Tracks the channel's connectivity state.
	blockingpicker     *pickerWrapper               // Holds the picker used to choose a sub-connection per RPC (blocking behavior not shown here).
	safeConfigSelector iresolver.SafeConfigSelector // Config selector for RPCs; updated when a service config is applied.
	czData             *channelzData                // channelz call statistics.
	retryThrottler     atomic.Value                 // Holds a *retryThrottler (DialContext stores a nil one).
	firstResolveEvent  *grpcsync.Event              // Fired by updateResolverState once resolver state has been delivered.

	mu              sync.RWMutex               // Guards fields such as resolverWrapper, sc, conns and balancerWrapper.
	resolverWrapper *ccResolverWrapper         // Wraps the resolver built from the scheme's Builder.
	sc              *ServiceConfig             // Currently applied service config (LB policy, method configs).
	conns           map[*addrConn]struct{}     // Set of sub-connections keyed by *addrConn; nil once the ClientConn is closed.
	mkp             keepalive.ClientParameters // Keepalive parameters copied from the dial options.

	lceMu               sync.Mutex // Presumably guards lastConnectionError.
	lastConnectionError error      // Most recent connection error (returned by connectionError).
}
- ClientConn结构体主要是用来表示一个客户端和服务端之间的连接,它封装了连接的创建,管理,状态,配置,拦截器等功能
parseTargetAndFindResolver()
- parseTargetAndFindResolver()中重点关注两步
- 执行parseTarget()解析target协议字符串,封装为Target结构体对象
- 执行getResolver()在一个map中检测获取对应当前scheme的resolver,也就是当前scheme对应的Builder实现
// parseTargetAndFindResolver parses cc.target and returns the resolver
// builder registered for its scheme, storing the parsed target in
// cc.parsedTarget.
//
// If the target cannot be parsed or its scheme has no builder, the target is
// retried with the default scheme prepended ("<default>:///<target>"). An
// error is returned only if that fallback also fails.
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
	channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)
	var rb resolver.Builder
	// Parse the target string into a resolver.Target.
	parsedTarget, err := parseTarget(cc.target)
	if err != nil {
		channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
	} else {
		channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
		// For a go-zero target such as "discov://127.0.0.1:2379/user.rpc",
		// parsedTarget.Scheme is "discov".
		rb = cc.getResolver(parsedTarget.Scheme)
		if rb != nil {
			cc.parsedTarget = parsedTarget
			return rb, nil
		}
	}
	// We are here because the user's dial target did not contain a scheme or
	// specified an unregistered scheme. We should fallback to the default
	// scheme, except when a custom dialer is specified in which case, we should
	// always use passthrough scheme.
	defScheme := resolver.GetDefaultScheme()
	channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)
	canonicalTarget := defScheme + ":///" + cc.target
	parsedTarget, err = parseTarget(canonicalTarget)
	if err != nil {
		channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)
		return nil, err
	}
	channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
	rb = cc.getResolver(parsedTarget.Scheme)
	if rb == nil {
		return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)
	}
	cc.parsedTarget = parsedTarget
	return rb, nil
}
parseTarget(cc.target)将target协议字符串封装为 Target 结构体
// parseTarget parses a dial target with url.Parse and converts it to a
// resolver.Target. For "discov://127.0.0.1:2379/user.rpc" the result has
// Scheme "discov", Authority "127.0.0.1:2379" and Endpoint "user.rpc".
// The endpoint comes from the URL path, or from the opaque part when the path
// is empty, with one leading "/" removed.
func parseTarget(target string) (resolver.Target, error) {
	u, err := url.Parse(target)
	if err != nil {
		return resolver.Target{}, err
	}

	// For targets of the form "[scheme]://[authority]/endpoint, the endpoint
	// value returned from url.Parse() contains a leading "/". Although this is
	// in accordance with RFC 3986, we do not want to break existing resolver
	// implementations which expect the endpoint without the leading "/". So, we
	// end up stripping the leading "/" here. But this will result in an
	// incorrect parsing for something like "unix:///path/to/socket". Since we
	// own the "unix" resolver, we can workaround in the unix resolver by using
	// the `URL` field instead of the `Endpoint` field.
	raw := u.Opaque
	if u.Path != "" {
		raw = u.Path
	}

	return resolver.Target{
		Scheme:    u.Scheme,
		Authority: u.Host,
		Endpoint:  strings.TrimPrefix(raw, "/"),
		URL:       *u,
	}, nil
}
// Target is a parsed dial target (excerpt of grpc-go's resolver package).
// For go-zero's "discov://127.0.0.1:2379/user.rpc", parseTarget produces
// Scheme "discov", Authority "127.0.0.1:2379" (the etcd hosts) and
// Endpoint "user.rpc" (the service key).
type Target struct {
	// Scheme selects the resolver builder (see getResolver).
	//
	// Deprecated: use URL.Scheme instead.
	Scheme string
	// Authority is the host part of the target URL.
	//
	// Deprecated: use URL.Host instead.
	Authority string
	// Endpoint is the URL path (or opaque part) without its leading "/".
	//
	// Deprecated: use URL.Path or URL.Opaque instead. The latter is set when
	// the former is empty.
	Endpoint string
	// URL contains the parsed dial target with an optional default scheme added
	// to it if the original dial target contained no scheme or contained an
	// unregistered scheme. Any query params specified in the original dial
	// target can be accessed from here.
	URL url.URL
}
getResolver(scheme string)获取该scheme对应的Builder
// getResolver returns the resolver.Builder registered for scheme.
// Builders supplied through dial options (cc.dopts.resolvers) take precedence
// over the global registry consulted by resolver.Get. A nil result means no
// builder is known for the scheme.
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
	for i := range cc.dopts.resolvers {
		if rb := cc.dopts.resolvers[i]; rb.Scheme() == scheme {
			return rb
		}
	}
	return resolver.Get(scheme)
}
var (
	// m is a map from scheme to resolver builder.
	// Register writes to it and Get reads from it.
	m = make(map[string]Builder)
	// defaultScheme is the default scheme to use.
	// Presumably this is what resolver.GetDefaultScheme returns, which
	// parseTargetAndFindResolver falls back to.
	defaultScheme = "passthrough"
)
// Register stores b in the global registry under the scheme it reports,
// replacing any builder previously registered for that scheme.
// It takes no lock, so it is presumably meant to be called only during
// initialization (e.g. from an init function); confirm before calling it
// concurrently.
func Register(b Builder) {
	scheme := b.Scheme()
	m[scheme] = b
}
// Get looks up the Builder registered for scheme in the global registry m.
// It returns nil when no builder has been registered for that scheme.
func Get(scheme string) Builder {
	b, found := m[scheme]
	if !found {
		return nil
	}
	return b
}
- 总结getResolver(): 通过接收到的scheme在m中获取对应的Builder。Builder是什么、何时被存入m,放到后面的第3步中详细讲解
- 这里先知道m是一个map, 针对这个map提供了Register()存储函数,Get()获取函数
newCCResolverWrapper()
- 该函数可以理解为获取resolverWrapper, resolverWrapper实现了resolver.ClientConn,通过resolver.ClientConn实现服务地址的更新
- ccResolverWrapper的流程如下图。在这里resolver会和balancer进行关联,balancer的处理方式和resolver类似,也是通过wrapper进行了一次封装

- 当执行ccBalancerWrapper后,接着会根据获取到的地址创建http2的链接

- 到此ClientConn创建过程基本结束,我们再一起梳理一下整个过程:首先获取resolver,其中ccResolverWrapper实现了resolver.ClientConn接口,通过Resolver的UpdateState方法触发获取Balancer;其中ccBalancerWrapper实现了balancer.ClientConn接口,通过Balancer的UpdateClientConnState方法触发创建连接(SubConn),最后创建HTTP2 Client
- 在上方的parseTargetAndFindResolver()中已经获取到当前scheme对应的Builder对象,该Builder对象上绑定了一个Build方法,在当前newCCResolverWrapper()函数中会执行这个Build方法
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
ccr := &ccResolverWrapper{
cc: cc,
done: grpcsync.NewEvent(),
}
var credsClone credentials.TransportCredentials
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
rbo := resolver.BuildOptions{
DisableServiceConfig: cc.dopts.disableServiceConfig,
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
}
var err error
// We need to hold the lock here while we assign to the ccr.resolver field
// to guard against a data race caused by the following code path,
// rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
// accessing ccr.resolver which is being assigned here.
ccr.resolverMu.Lock()
defer ccr.resolverMu.Unlock()
//执行Build()方法
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
if err != nil {
return nil, err
}
return ccr, nil
}
3. Builder接口及Resolver解析器的注册
- 问题: 在上方parseTargetAndFindResolver()方法中会通过一个map获取当前scheme对应的Builder结构体对象,Builder是什么, 什么时候存储到map中的
- Builder是gRPC中提供的一个接口,使用不同的组件作为注册中心时,要针对该组件实现Builder接口,该接口中有两个方法
- Scheme(): 该方法会返回一个字符串, 注册的Resolver解析器会被保存在一个全局的变量m中,m是一个map,这个map的key即为 Scheme() 方法返回的字符串
- Build(): 该方法接收三个参数(target, cc, opts),返回一个Resolver;newCCResolverWrapper()中调用的就是这个方法
// Builder creates a resolver for one URL scheme (excerpt of grpc-go).
// Register stores implementations under the string Scheme returns;
// newCCResolverWrapper calls Build when DialContext sets up the ClientConn.
type Builder interface {
	// Build creates a Resolver for the parsed target. cc is the ClientConn
	// side (a ccResolverWrapper) to which the resolver pushes state, e.g. via
	// cc.UpdateState.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme this builder handles (e.g. "discov", "etcd");
	// it is the key used in the global registry map.
	Scheme() string
}
- go-zero的服务发现是在客户端实现的。zrpc的internal包中有一个init()函数,Go在包初始化(被导入)时会自动执行它,通过该函数默认注册了四个自定义的Resolver解析器
//go-zero/zrpc/internal/client.go

// init registers go-zero's resolver builders with gRPC when this package is
// initialized, so that dial targets using those schemes can be resolved.
func init() {
	resolver.Register()
}
// Register registers go-zero's custom resolver builders (direct, discov,
// etcd, k8s) with gRPC by delegating to internal.RegisterResolver.
func Register() {
	// See RegisterResolver below.
	internal.RegisterResolver()
}
//go-zero/zrpc/resolver/internal/resolver.go

// Resolver schemes supported by go-zero, plus endpoint-parsing constants.
const (
	// DirectScheme stands for direct scheme.
	DirectScheme = "direct"
	// DiscovScheme stands for discov scheme.
	DiscovScheme = "discov"
	// EtcdScheme stands for etcd scheme.
	EtcdScheme = "etcd"
	// KubernetesScheme stands for k8s scheme.
	KubernetesScheme = "k8s"
	// EndpointSepChar is the separator char in endpoints.
	EndpointSepChar = ','
	// subsetSize is the size discovBuilder.Build passes to subset when
	// converting subscribed values to addresses (presumably an upper bound on
	// the addresses handed to gRPC; confirm against subset).
	subsetSize = 32
)
var (
	// EndpointSep is the separator string in endpoints.
	EndpointSep = fmt.Sprintf("%c", EndpointSepChar)

	// Builder instances registered by RegisterResolver, one per scheme.
	directResolverBuilder directBuilder
	discovResolverBuilder discovBuilder
	etcdResolverBuilder   etcdBuilder
	k8sResolverBuilder    kubeBuilder
)
// RegisterResolver adds go-zero's resolver builders to gRPC's global resolver
// registry (the map m filled by resolver.Register), one per scheme:
// "direct", "discov", "etcd" and "k8s".
func RegisterResolver() {
	builders := []resolver.Builder{
		&directResolverBuilder,
		&discovResolverBuilder,
		&etcdResolverBuilder,
		&k8sResolverBuilder,
	}
	for _, b := range builders {
		resolver.Register(b)
	}
}
// nopResolver is the resolver.Resolver returned by discovBuilder.Build.
// Address updates are pushed by the listener registered on the subscriber,
// so its methods have nothing to do.
type nopResolver struct {
	cc resolver.ClientConn
}

// Close does nothing.
// NOTE(review): the Subscriber created in discovBuilder.Build is not closed here.
func (r *nopResolver) Close() {
}

// ResolveNow does nothing: updates arrive through the etcd subscription
// rather than on demand.
func (r *nopResolver) ResolveNow(options resolver.ResolveNowOptions) {
}
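- 通过 goctl 自动生成的rpc代码默认使用etcd作为服务注册与发现组件,针对etcd实现Builder接口,提供了etcdBuilder。注意:使用Etcd配置时,BuildTarget()生成的target是"discov://..."(scheme为discov),实际匹配到的是discovBuilder;只有手动把Target配置成"etcd://..."时才会用到etcdBuilder。由于etcdBuilder内嵌了discovBuilder,两者的Build()实现是同一个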
//go-zero/zrpc/resolver/internal/etcdbuilder.go

// etcdBuilder handles "etcd://" targets. It embeds discovBuilder, so its
// Build method is discovBuilder's; only Scheme differs.
type etcdBuilder struct {
	// Embedded: Build is promoted from discovBuilder.
	discovBuilder
}

// Scheme returns EtcdScheme ("etcd"), the key under which this builder is
// registered.
func (b *etcdBuilder) Scheme() string {
	return EtcdScheme
}
- 总结: gRPC提供了一个Builder接口,针对不同的注册中心需要实现这个接口。go-zero在包初始化时会执行init()函数,其中调用RegisterResolver(),默认将4个Builder接口的实现调用Register()保存到名为m的map中。以etcd为例,对应Builder接口的实现是etcdBuilder(使用Etcd配置生成的"discov://"target实际匹配的是discovBuilder,两者Build()逻辑相同)
4. 通过etcdBuilder了解服务发现的具体逻辑
- 上方我们知道了默认情况下go-zero使用etcd作为注册中心,并提供了etcdBuilder结构体。注意该结构体内嵌(embed)了discovBuilder,只重写了Scheme()方法,Build()方法直接复用discovBuilder的实现。下面看一下discovBuilder的Build()方法
// Build implements resolver.Builder for the discov scheme (and, through
// embedding, for etcd).
//
// The target's authority holds the etcd hosts separated by EndpointSepChar,
// and its endpoints name the key to subscribe to. A Subscriber is created for
// that key. A callback that pushes the subscriber's current values to gRPC as
// addresses is registered as a listener (it re-runs on etcd PUT/DELETE
// events), then run once so the ClientConn receives the initial address list.
// The returned nopResolver does nothing itself; updates flow via the listener.
func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
	resolver.Resolver, error) {
	isSep := func(r rune) bool { return r == EndpointSepChar }
	hosts := strings.FieldsFunc(targets.GetAuthority(target), isSep)

	sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))
	if err != nil {
		return nil, err
	}

	pushAddrs := func() {
		var addrs []resolver.Address
		for _, v := range subset(sub.Values(), subsetSize) {
			addrs = append(addrs, resolver.Address{Addr: v})
		}
		// cc is the ccResolverWrapper; UpdateState forwards to the balancer.
		state := resolver.State{Addresses: addrs}
		if err := cc.UpdateState(state); err != nil {
			logx.Error(err)
		}
	}

	sub.AddListener(pushAddrs)
	pushAddrs()

	return &nopResolver{cc: cc}, nil
}
NewSubscriber() 获取订阅对象Subscriber
- Subscriber 表示一个订阅者,用于监听etcd中的服务发现事件,并更新自己的服务列表,该结构体上重点绑定了如下方法:
- AddListener: 添加一个UpdateListener到listeners切片中
- Values: 返回当前订阅的服务列表
- Watch: 开始监控etcd中的服务变化,并更新自己的服务列表。
- Close: 用于关闭订阅者,取消监控etcd中的服务变化,并清空自己的服务列表
Subscriber struct {
endpoints []string
//用于标记是否使用独占模式,即是否只订阅自己所在的集群的服务
exclusive bool
//用于存储当前订阅的服务列表
items *container
}
// NewSubscriber creates a Subscriber for key on the etcd cluster at endpoints.
//
// Options are applied before the value container is created, because
// newContainer reads sub.exclusive. The container is then registered with the
// shared Registry (Registry.Monitor), which loads the current values and
// watches key for changes.
func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
	sub := &Subscriber{endpoints: endpoints}
	for _, apply := range opts {
		apply(sub)
	}
	sub.items = newContainer(sub.exclusive)

	reg := internal.GetRegistry()
	if err := reg.Monitor(endpoints, key, sub.items); err != nil {
		return nil, err
	}
	return sub, nil
}
- Registry 管理了etcd客户端连接的集群,内部有一个map类型的clusters属性,etcd的endpoints为该map的key,值是cluster
- cluster是一个结构体封装了与etcd客户端连接的创建,监听,更新等功能是直接与etcd进行交互的核心
var (
	// registry is the package-wide Registry returned by GetRegistry;
	// its clusters map starts empty.
	registry = Registry{
		clusters: make(map[string]*cluster),
	}
	// connManager is a syncx.ResourceManager. Its use is not shown in this
	// excerpt (presumably it shares etcd client connections; confirm).
	connManager = syncx.NewResourceManager()
)
// Registry tracks the etcd clusters in use, one cluster per set of endpoints.
type Registry struct {
	// clusters maps a cluster key (presumably getClusterKey(endpoints); confirm
	// in getCluster) to its cluster.
	clusters map[string]*cluster
	// lock presumably guards clusters.
	lock sync.Mutex
}
func GetRegistry() *Registry {
return ®istry
}
// Monitor attaches listener l to key on the etcd cluster identified by endpoints.
//
// Parameters:
//   - endpoints: the etcd address list;
//   - key: the etcd key to watch;
//   - l: the UpdateListener to notify.
//
// getCluster returns the cluster for these endpoints and reports whether it
// already existed. If it did, the values already cached for key are replayed
// into l with OnAdd, so the new listener starts with the current state.
// In both cases the cluster's monitor method is then called to register l and
// start watching key.
func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) error {
	c, exists := r.getCluster(endpoints)
	if exists {
		for _, kv := range c.getCurrent(key) {
			l.OnAdd(kv)
		}
	}
	return c.monitor(key, l)
}
cluster
- 在上方的Monitor()中,无论endpoints对应的cluster是否已经存在,最终都会执行cluster下的monitor()方法(已存在时,会先把缓存的值通过OnAdd回放给新的监听器)。cluster才是服务发现的核心,在cluster上绑定了大量服务发现、监听更新等方法
// cluster wraps one etcd cluster (one set of endpoints) and the keys watched
// on it. It is the part that talks to etcd directly.
type cluster struct {
	// endpoints is the etcd address list.
	endpoints []string
	// key identifies this cluster; newCluster derives it from endpoints via getClusterKey.
	key string
	// values caches the etcd contents for each watched key. The inner map is
	// presumably etcd key -> value; confirm in handleChanges.
	values map[string]map[string]string
	// listeners holds the UpdateListeners registered for each watched key
	// (monitor appends to it). UpdateListener is an interface; Monitor calls its
	// OnAdd method, and per the article it also has OnDelete.
	listeners map[string][]UpdateListener
	// watchGroup runs the watch goroutines started by monitor.
	watchGroup *threading.RoutineGroup
	// done is a channel presumably used to signal shutdown of the cluster; confirm.
	done chan lang.PlaceholderType
	// lock guards concurrent access to values and listeners.
	lock sync.Mutex
}
// newCluster creates an empty cluster for the given etcd endpoints. Its key
// comes from getClusterKey(endpoints), and its value/listener maps, watch
// routine group and done channel are all freshly allocated.
func newCluster(endpoints []string) *cluster {
	c := &cluster{
		endpoints:  endpoints,
		key:        getClusterKey(endpoints),
		watchGroup: threading.NewRoutineGroup(),
		done:       make(chan lang.PlaceholderType),
	}
	c.values = make(map[string]map[string]string)
	c.listeners = make(map[string][]UpdateListener)
	return c
}
// monitor registers l as a listener for key and starts watching key.
//
// It first appends l under the cluster lock and obtains the etcd client.
// It then loads the current values of key; load returns the revision from
// which watching starts. Finally it starts a goroutine in watchGroup that
// watches key from that revision.
func (c *cluster) monitor(key string, l UpdateListener) error {
	func() {
		c.lock.Lock()
		defer c.lock.Unlock()
		c.listeners[key] = append(c.listeners[key], l)
	}()

	cli, err := c.getClient()
	if err != nil {
		return err
	}

	rev := c.load(cli, key)
	c.watchGroup.Run(func() { c.watch(cli, key, rev) })
	return nil
}
// Remaining cluster methods; their bodies are omitted in this excerpt.

// context returns a context for calls made with cli.
func (c *cluster) context(cli EtcdClient) context.Context
// getClient returns the etcd client for this cluster; monitor calls it before load/watch.
func (c *cluster) getClient() (EtcdClient, error)
// getCurrent returns the cached key/values for key; Registry.Monitor replays them to new listeners.
func (c *cluster) getCurrent(key string) []KV
// handleChanges (per the article): merges kvs read from etcd into c.values and
// removes cached entries whose keys no longer exist in etcd.
func (c *cluster) handleChanges(key string, kvs []KV)
// handleWatchEvents handles etcd watch events for key; per the article this
// ultimately triggers the update callback registered in discovBuilder.Build.
func (c *cluster) handleWatchEvents(key string, events []*clientv3.Event)
// load performs the initial read of key from etcd. Per the article, for key
// user.rpc this is roughly:
//   ./etcdctl --endpoints=127.0.0.1:12379 get user.rpc --prefix
// The returned int64 is passed by monitor to watch as the starting revision.
// Per the article, the results are cached locally so the service list keeps
// working if etcd becomes unreachable (not verifiable from this excerpt).
func (c *cluster) load(cli EtcdClient, key string) int64
// newClient creates a new etcd client for this cluster.
func (c *cluster) newClient() (EtcdClient, error)
// reload: body not shown.
func (c *cluster) reload(cli EtcdClient)
// watch watches key starting at revision rev; per the article it calls watchStream.
func (c *cluster) watch(cli EtcdClient, key string, rev int64)
// watchStream opens an etcd watch on key. Per the article it corresponds to:
//   ./etcdctl --endpoints=127.0.0.1:12379 watch user.rpc --prefix
// The meaning of the bool result is not shown here.
func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool
// watchConnState: body not shown.
func (c *cluster) watchConnState(cli EtcdClient)
discovBuilder.Build()方法中封装的update()更新函数
- 在discovBuilder.Build()方法中封装了一个update()更新函数。根据前面的介绍,cluster是etcd注册中心的核心,上面绑定了大量与etcd交互的方法,包括监听watch()、读取load()等。cluster的monitor()方法会基于watch()进行监听,当有事件触发时,会调用cluster上的handleWatchEvents()事件处理方法,最终调用Build方法中封装的这个update()函数进行服务状态的更新
// update (repeated from discovBuilder.Build) converts the subscriber's current
// values, filtered through subset(..., subsetSize), into resolver addresses
// and pushes them to gRPC via cc.UpdateState. Errors are only logged.
update := func() {
	var addrs []resolver.Address
	for _, val := range subset(sub.Values(), subsetSize) {
		addrs = append(addrs, resolver.Address{
			Addr: val,
		})
	}
	// Key step: cc is the ccResolverWrapper (newCCResolverWrapper passes it to
	// Build), whose UpdateState forwards the address list to the ClientConn
	// and from there to the balancer.
	if err := cc.UpdateState(resolver.State{
		Addresses: addrs,
	}); err != nil {
		logx.Error(err)
	}
}
- 这个update函数内部调用的是ccResolverWrapper上的UpdateState()方法
// UpdateState is called by the resolver (for go-zero, the update callback in
// discovBuilder.Build) with a new address list. Under incomingMu it ignores
// updates after the wrapper is done, records a channelz trace event, stores
// the state, and hands it to the ClientConn via updateResolverState.
// Only balancer.ErrBadResolverState is propagated to the caller; any other
// error is dropped.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
	ccr.incomingMu.Lock()
	defer ccr.incomingMu.Unlock()
	if ccr.done.HasFired() {
		return nil
	}
	ccr.addChannelzTraceEvent(s)
	ccr.curState = s
	// Forward the new state to the ClientConn (and from there to the balancer).
	if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
		return balancer.ErrBadResolverState
	}
	return nil
}
// updateResolverState applies a new resolver state to the ClientConn.
// ccResolverWrapper.UpdateState calls it with err == nil; a non-nil err means
// resolution failed.
//
// While holding cc.mu it:
//   - returns early if the ClientConn is already closed (cc.conns == nil);
//   - on err, applies the default service config, reports the error to the
//     balancer and returns balancer.ErrBadResolverState;
//   - otherwise chooses the service config: the default one when service
//     configs are disabled or the resolver supplied none, else the resolver's.
//     If the resolver's config is invalid and no valid config was ever
//     received, a failing LB policy is applied instead.
//
// It then releases cc.mu and passes the addresses plus the LB config to the
// balancer wrapper. ErrBadResolverState for a bad config takes precedence over
// the balancer's error. firstResolveEvent is fired on every return.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}
	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)
		cc.balancerWrapper.resolverError(err)
		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}
	var ret error
	if cc.dopts.disableServiceConfig {
		channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
	} else if s.ServiceConfig == nil {
		// go-zero's discov resolver sends only Addresses, so this branch applies
		// the default service config (the p2c policy set in internal NewClient).
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
					channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
		} else {
			ret = balancer.ErrBadResolverState
			if cc.sc == nil {
				// Apply the failing LB only if we haven't received valid service config
				// from the name resolver in the past.
				cc.applyFailingLB(s.ServiceConfig)
				cc.mu.Unlock()
				return ret
			}
		}
	}
	// Pick up the LB config of the current service config, if any.
	var balCfg serviceconfig.LoadBalancingConfig
	if cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}
	// Read the wrapper under the lock, but call it after unlocking.
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
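- 最终会调用到baseBalancer的UpdateClientConnState()方法:为新出现的地址创建并连接SubConn,移除已不在地址列表中的SubConn,然后重新生成Picker并更新到ClientConn上。(作者跟源码时UpdateClientConnState()没有直接跳到baseBalancer,原因是较新版本的gRPC在ccBalancerWrapper里还包了一层gracefulswitch.Balancer,由它再转调实际的负载均衡器。go-zero注册的p2c负载均衡器是通过base.NewBalancerBuilder构建的,p2c只提供Picker,因此最终处理地址更新的仍然是baseBalancer)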
// UpdateClientConnState handles a new resolver state in the base balancer
// (excerpt of grpc-go's balancer/base).
//
// It:
//   - creates and connects a SubConn for every address not seen before;
//   - removes SubConns whose address is no longer in the list;
//   - if the list is empty, records a resolver error and returns
//     balancer.ErrBadResolverState;
//   - otherwise regenerates the picker and publishes the balancer state and
//     picker to the ClientConn.
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	// TODO: handle s.ResolverState.ServiceConfig?
	if logger.V(2) {
		logger.Info("base.baseBalancer: got new ClientConn state: ", s)
	}
	// Successful resolution; clear resolver error and ensure we return nil.
	b.resolverErr = nil
	// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
	addrsSet := resolver.NewAddressMap()
	for _, a := range s.ResolverState.Addresses {
		addrsSet.Set(a, nil)
		if _, ok := b.subConns.Get(a); !ok {
			// a is a new address (not existing in b.subConns).
			sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
			if err != nil {
				logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
				continue
			}
			b.subConns.Set(a, sc)
			b.scStates[sc] = connectivity.Idle
			b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
			sc.Connect()
		}
	}
	for _, a := range b.subConns.Keys() {
		sci, _ := b.subConns.Get(a)
		sc := sci.(balancer.SubConn)
		// a was removed by resolver.
		if _, ok := addrsSet.Get(a); !ok {
			b.cc.RemoveSubConn(sc)
			b.subConns.Delete(a)
			// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
			// The entry will be deleted in UpdateSubConnState.
		}
	}
	// If resolver state contains no addresses, return an error so ClientConn
	// will trigger re-resolve. Also records this as an resolver error, so when
	// the overall state turns transient failure, the error message will have
	// the zero address information.
	if len(s.ResolverState.Addresses) == 0 {
		b.ResolverError(errors.New("produced zero addresses"))
		return balancer.ErrBadResolverState
	}
	// Rebuild the picker from the current SubConns (for go-zero, the p2c
	// picker) and publish it together with the aggregated state.
	b.regeneratePicker()
	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
	return nil
}
四. 总结
- 了解服务发现,首先要了解go-zero支持的几种模式
- 在服务提供方运行时,go-zero提供了direct直连, discov服务发现,k8s等模式
- 在服务消费方启动时,go-zero提供了direct直连, discov服务发现,etcd, k8s等模式
- 继续查看源码,在go-zero/zrpc/internal/client.go文件中执行了一个init()初始化函数,该函数中调用了Register()—>RegisterResolver()函数,该函数中会获取go-zero框架中支持的所有服务发现模式,根据不同的模式,创建对应的Builder存储到一个名为m的map中:
- Builder是一个接口,内部有两个方法:Scheme()方法返回当前的服务发现模式,Build()方法根据当前服务发现模式创建Resolver对象,并负责持续更新服务端地址(例如discovBuilder会订阅etcd,通过watch实时更新服务端地址)
- go-zero当前实现Builder接口的有directBuilder、discovBuilder、etcdBuilder、kubeBuilder。注意服务发现模式对应的discovBuilder与etcd模式对应的etcdBuilder:etcdBuilder内嵌了discovBuilder,只重写了Scheme()方法,因此两者的行为基本相同
- 在编写服务消费方时,也要在yaml中配置当前服务名称、服务端口号等,还需要配置连接的注册中心地址,比如Etcd.Hosts、目标服务key等。调用MustLoad()函数将配置文件读取到一个Config结构体变量上,然后调用zrpc下的MustNewClient()函数创建一个客户端,该函数中会调用NewClient(),在NewClient()函数中重点关注:
- 调用BuildTarget()生成target协议也就是注册中心的地址,etcd的地址加上目标服务的key,例如"discov://注册中心地址/user.rpc",后续会解析这个地址请求注册中心,获取指定key的服务调用地址列表
- 执行internal下的NewClient()函数,内部添加了一系列拦截器,针对负载均衡,熔断降级等进行了初始化
- 查看internal下的NewClient()函数,内部添加了一系列拦截器,针对负载均衡、熔断降级等进行了初始化(先通过默认服务配置指定p2c负载均衡策略)。重点关注该函数内部会封装一个client结构体变量,并调用该结构体变量上的dial()方法。查看dial()方法源码,内部:
- 调用client上的buildDialOptions()注册了6个拦截器,分别是clientinterceptors.UnaryTracingInterceptor、clientinterceptors.DurationInterceptor、clientinterceptors.PrometheusInterceptor、clientinterceptors.BreakerInterceptor、clientinterceptors.TimeoutInterceptor、clientinterceptors.StreamTracingInterceptor,其中BreakerInterceptor是熔断器
- 调用grpc下的DialContext()函数
- 在grpc下的DialContext()函数中,重点关注:
- 先封装了一个grpc下的ClientConn结构体变量,该结构体表示一个客户端和服务端之间的连接,封装了连接的创建,管理,状态,配置,拦截器等功能
- 执行chainUnaryClientInterceptors() 与 chainStreamClientInterceptors()函数处理拦截器,将拦截器串联起来
- 调用parseTargetAndFindResolver()会再次解析target协议,将target解析成一个Target结构体变量。该结构体中有一个Scheme属性,存储的就是当前rpc的服务发现模式,通过这个Scheme在m中获取到对应的Builder。使用Etcd配置时,target形如"discov://127.0.0.1:2379/user.rpc",获取到的是discovBuilder(只有target写成"etcd://..."时才是etcdBuilder,两者的Build()逻辑相同)
- 执行newCCBalancerWrapper()创建负载均衡器包装器ccBalancerWrapper
- 执行newCCResolverWrapper(),在该函数中通过前面拿到的Builder(使用Etcd配置时即discovBuilder)执行它的Build()方法得到Resolver,并封装为ccResolverWrapper。ccResolverWrapper实现了resolver.ClientConn,通过resolver.ClientConn实现服务地址的更新
- 查看etcdBuilder,该结构体内嵌了discovBuilder,只重写了Scheme()方法,因此直接看discovBuilder的Build()方法
- 通过targets获取到注册中心etcd的hosts地址
- 执行NewSubscriber()函数获取订阅对象Subscriber
- 封装一个更新服务状态的update函数,
- 执行Subscriber下的AddListener()方法将这个更新服务状态的函数添加到Subscriber订阅对象中,并执行这个update函数
- 最后返回Resolver
- 在执行上方的NewSubscriber()获取订阅对象Subscriber时,内部会调用GetRegistry()获取一个Registry结构体变量,Registry 管理了etcd客户端连接的集群,内部有一个map类型的clusters属性,key是etcd的连接地址endpoints, 值是一个cluster类型变量
- cluster封装了与etcd客户端连接的创建,监听,更新等功能是直接与etcd进行交互的核心
- 在NewSubscriber()函数内部:
- 先封装一个订阅对象Subscriber
- 调用GetRegistry()函数获取到Registry
- 执行Registry上的Monitor()方法,该方法中最终会调用到cluster上的monitor()方法,
- cluster上的monitor()方法中:
- 调用getClient()获取连接etcd注册中心的客户端EtcdClient
- 调用cluster上的load()方法获取服务提供方地址列表。该方法中会调用cluster上的handleChanges()方法,将获取到的服务地址列表存储到cluster的values属性中,并检测values中的key在etcd中是否存活,不存活则剔除
- 启动cluster上的watchGroup 协程组,内部执行cluster上的watch()方法该方法内会执行cluster上的watchStream()方法,开启一个监听,监听key在etcd的变化
- 当有事件触发的时候,会调用cluster上的handleWatchEvents()事件处理方法,最终会调用 Build 方法中定义的update进行服务状态的更新
- 查看封装的update()更新函数,内部会调用ccResolverWrapper上的UpdateState()方法,通过该方法,最终会调用到baseBalancer的UpdateClientConnState()方法,该方法中根据目标服务key,在注册中心获取目标服务的所有节点信息,封装SubConn,存储到负载均衡器上
- go-zero服务消费方整合etcd作为注册中心时,通过watch机制监听etcd中key的变化,来及时更新服务提供方地址。watch有两种实现方式:
    - 长轮询:etcd v2的HTTP API采用这种方式,客户端发出请求后,由服务端在key发生变化时才返回,客户端再重新发起请求;
    - 流式:go-zero使用的etcd v3客户端(clientv3)基于gRPC流式连接实现watch,客户端只发送一次watch请求,然后保持连接不断开,持续接收etcd服务器推送的key变化消息,
- 默认使用流式模式,当etcd客户端连接收到key变化消息时,基于watch机制,通知所有监听该服务节点地址信息变化的Watcher对象,并把变化的键值对作为参数传入,回调函数会根据键值对的类型(添加或删除),调用container对象的OnAdd或OnDelete方法,并通知container对象的监听器
- 根据前文服务注册部分的介绍:服务提供方向etcd注册时会申请一个租约(lease),并将该租约的TTL设为10秒,之后持续续约。当服务提供方由于宕机、网络故障等原因无法续约时,租约会在10秒后过期,etcd会根据租约ID删除对应的注册信息
- go-zero使用etcd作为注册中心时与服务提供方,服务消费方之间的心跳时间分别是多少:
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)