三. discov 服务发现底层原理

  1. 服务提供方注册完成后, 服务消费方如何获取服务发起调用的, 多服务节点如何选择指定服务的,在使用go-zero时, 服务消费方需要编写如下代码, 将服务提供方设置到ServiceContext中
// Step 1: ServiceContext holds the dependencies of the consumer service,
// including a client for every RPC service that it needs to call.
type ServiceContext struct {
    // Config is the configuration the context is built from.
    Config  config.Config
    // UserRpc is the client of the user RPC service called by this service.
    UserRpc userclient.User  
}

// Step 2: NewServiceContext builds the ServiceContext from the configuration
// and creates the RPC client for the user service while doing so.
func NewServiceContext(c config.Config) *ServiceContext {
	// zrpc.MustNewClient is where service discovery for the user service starts.
	userConn := zrpc.MustNewClient(c.UserRpc)

	svcCtx := &ServiceContext{Config: c}
	svcCtx.UserRpc = userclient.NewUser(userConn)
	return svcCtx
}
  1. 服务发现的相关逻辑重点就在zrpc.MustNewClient()中,查看该函数源码,在该函数内会调用NewClient()
// MustNewClient returns a Client built by NewClient and terminates the
// process through log.Fatal when NewClient reports an error.
func MustNewClient(c RpcClientConf, options ...ClientOption) Client {
	client, buildErr := NewClient(c, options...)
	if buildErr != nil {
		log.Fatal(buildErr)
	}

	return client
}

// NewClient builds a Client from the given configuration.
//
// Options derived from c (per-RPC credentials, non-blocking dial, timeout in
// milliseconds) are collected first and the caller-supplied options are
// appended after them. The dial target comes from c.BuildTarget and is a
// plain string, for example "discov://127.0.0.1:2379/user.rpc". The
// underlying client is created by internal.NewClient.
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
	var opts []ClientOption
	if c.HasCredential() {
		cred := &auth.Credential{
			App:   c.App,
			Token: c.Token,
		}
		opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(cred)))
	}
	if c.NonBlock {
		opts = append(opts, WithNonBlock())
	}
	if c.Timeout > 0 {
		timeout := time.Duration(c.Timeout) * time.Millisecond
		opts = append(opts, WithTimeout(timeout))
	}
	opts = append(opts, options...)

	// Build the target string that tells gRPC which resolver to use.
	target, err := c.BuildTarget()
	if err != nil {
		return nil, err
	}

	// Delegate to the NewClient of the internal package.
	inner, err := internal.NewClient(target, opts...)
	if err != nil {
		return nil, err
	}

	return &RpcClient{client: inner}, nil
}
  1. 总结NewClient()重点关注两个步骤:
  1. 执行BuildTarget()生成target协议
  2. 执行internal下的NewClient()函数,服务发现等逻辑

1. BuildTarget() 生成target协议

  1. 查看生成target协议源码: 在微服务中需要有注册中心, 服务提供方启动时会将该服务的服务名服务地址保存到注册中心, 服务消费方启动时连接注册中心,通过目标服务名称在注册中心获取指定服务的调用地址,在go-zero中默认以etcd作为注册中心,这个target协议就是etcd的地址加上目标服务的key,例如"discov://127.0.0.1:2379/user.rpc"
// RpcClientConf is the configuration of an RPC client.
// BuildTarget looks at Endpoints first, then Target, and falls back to Etcd.
RpcClientConf struct {
	// Etcd is the etcd discovery configuration, used when neither Endpoints
	// nor Target is set.
	Etcd      discov.EtcdConf `json:",optional,inherit"`
	// Endpoints, when non-empty, is turned into a direct target.
	Endpoints []string        `json:",optional"`
	// Target, when non-empty, is used as the dial target as is.
	Target    string          `json:",optional"`
	// App and Token are sent as per-RPC credentials when HasCredential reports true.
	App       string          `json:",optional"`
	Token     string          `json:",optional"`
	// NonBlock adds the WithNonBlock client option when true.
	NonBlock  bool            `json:",optional"`
	// Timeout is interpreted as milliseconds; it is applied only when positive.
	Timeout   int64           `json:",default=2000"`
}

// BuildTarget returns the dial target for this configuration.
//
// Precedence: explicit Endpoints (direct target), then a literal Target, then
// etcd discovery. For etcd the configuration is validated, the account and
// TLS settings are registered with discov when present, and the discov
// target is built from the etcd hosts and key.
func (cc RpcClientConf) BuildTarget() (string, error) {
	switch {
	case len(cc.Endpoints) > 0:
		return resolver.BuildDirectTarget(cc.Endpoints), nil
	case len(cc.Target) > 0:
		return cc.Target, nil
	}

	validateErr := cc.Etcd.Validate()
	if validateErr != nil {
		return "", validateErr
	}

	if cc.Etcd.HasAccount() {
		discov.RegisterAccount(cc.Etcd.Hosts, cc.Etcd.User, cc.Etcd.Pass)
	}

	if cc.Etcd.HasTLS() {
		tlsErr := discov.RegisterTLS(cc.Etcd.Hosts, cc.Etcd.CertFile, cc.Etcd.CertKeyFile,
			cc.Etcd.CACertFile, cc.Etcd.InsecureSkipVerify)
		if tlsErr != nil {
			return "", tlsErr
		}
	}

	return resolver.BuildDiscovTarget(cc.Etcd.Hosts, cc.Etcd.Key), nil
}

2. internal下的NewClient()

// NewClient returns a Client connected to target.
//
// A dial option selecting the p2c load-balancing policy is placed in front of
// the caller's options, then the connection is established through dial.
func NewClient(target string, opts ...ClientOption) (Client, error) {
	// Default service config that names the load-balancing policy.
	svcCfg := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, p2c.Name)

	allOpts := make([]ClientOption, 0, len(opts)+1)
	allOpts = append(allOpts, WithDialOption(grpc.WithDefaultServiceConfig(svcCfg)))
	allOpts = append(allOpts, opts...)

	// dial is the key step: it ends up in grpc.DialContext.
	cli := new(client)
	if err := cli.dial(target, allOpts...); err != nil {
		return nil, err
	}

	return cli, nil
}

// dial connects to server through grpc.DialContext, bounded by dialTimeout,
// and stores the resulting connection in c.conn.
//
// On failure the returned error names the service to check. When the dial
// timed out, the service name is the part of server after the last
// separator, provided the separator is neither the first nor the last byte;
// otherwise the whole server string is used.
func (c *client) dial(server string, opts ...ClientOption) error {
	// buildDialOptions assembles the dial options (the article notes that the
	// client interceptors, such as the breaker interceptor, are added there).
	options := c.buildDialOptions(opts...)
	timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
	defer cancel()

	conn, err := grpc.DialContext(timeCtx, server, options...)
	if err == nil {
		c.conn = conn
		return nil
	}

	service := server
	if errors.Is(err, context.DeadlineExceeded) {
		// pos+1 < len(server) means the separator is not the last char.
		if pos := strings.LastIndexByte(server, separator); pos > 0 && pos+1 < len(server) {
			service = server[pos+1:]
		}
	}

	return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is already started",
		server, err.Error(), service)
}
  1. 查看DialContext()源码,实际该方法内部执行了很多比较重要的逻辑,如下图
    在这里插入图片描述
  2. 当前我们先关注一下:
  1. resolverBuilder, err := cc.parseTargetAndFindResolver() //解析target协议
  2. rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) //包装Resolver
// DialContext creates a ClientConn for target and returns it.
//
// The steps visible in this function are, in order:
//  1. build the ClientConn value and apply the extra and caller-supplied dial options;
//  2. chain the unary and stream client interceptors;
//  3. register the channel with channelz and validate the transport credentials;
//  4. parse the default service config when one was supplied;
//  5. parse the target and look up the resolver builder for its scheme;
//  6. create the balancer wrapper, then build the resolver;
//  7. when the block option is set, wait until the state is Ready or ctx is done.
//
// When the function returns a non-nil error, the first deferred function
// closes the ClientConn.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Extra (globally registered) options are applied before the caller's
	// options, so the caller's options are applied last.
	for _, opt := range extraDialOptions {
		opt.apply(&cc.dopts)
	}

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}
	// Chain the configured interceptors.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Close the ClientConn on any error return. err is a named result, so
	// this sees the final error value.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	pid := cc.dopts.channelzParentID
	cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, pid, target)
	ted := &channelz.TraceEventDesc{
		Desc:     "Channel created",
		Severity: channelz.CtInfo,
	}
	if cc.dopts.channelzParentID != nil {
		ted.Parent = &channelz.TraceEventDesc{
			Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()),
			Severity: channelz.CtInfo,
		}
	}
	channelz.AddTraceEvent(logger, cc.channelzID, 1, ted)
	cc.csMgr.channelzID = cc.channelzID

	// Exactly one of TransportCredentials and CredsBundle must be set, and a
	// bundle must carry transport credentials.
	if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
		return nil, errNoTransportSecurity
	}
	if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
		return nil, errTransportCredsAndBundle
	}
	if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
		return nil, errNoTransportCredsInBundle
	}
	transportCreds := cc.dopts.copts.TransportCredentials
	if transportCreds == nil {
		transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
	}
	// Per-RPC credentials that require transport security are rejected on an
	// insecure transport.
	if transportCreds.Info().SecurityProtocol == "insecure" {
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is already done when the function returns, never hand out a
	// connection: report the context error, optionally combined with err.
	defer func() {
		select {
		case <-ctx.Done():
			switch {
			case ctx.Err() == err:
				conn = nil
			case err == nil || !cc.dopts.returnLastError:
				conn, err = nil, ctx.Err()
			default:
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
			}
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Parse the target according to its scheme and find the matching resolver builder.
	resolverBuilder, err := cc.parseTargetAndFindResolver()
	if err != nil {
		return nil, err
	}
	cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint, cc.target, cc.dopts)
	if err != nil {
		return nil, err
	}
	channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	// The balancer wrapper is created before the resolver is built.
	cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		Authority:        cc.authority,
		CustomUserAgent:  cc.dopts.copts.UserAgent,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	})

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			cc.Connect()
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				if err = cc.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
					return nil, err
				}
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

ClientConn

  1. 第一步会先封装一个ClientConn结构体变量
// ClientConn is the client-side connection object that DialContext creates
// and returns. Field notes marked NOTE(review) come from the accompanying
// article rather than from code visible here.
type ClientConn struct {
	ctx                 context.Context              // context of the connection; created with context.WithCancel in DialContext
	cancel              context.CancelFunc           // cancel function paired with ctx
	target              string                       // dial target string exactly as passed to DialContext
	parsedTarget        resolver.Target              // target after parsing (scheme, authority, endpoint, URL)
	authority           string                       // value returned by determineAuthority; also handed to the balancer build options
	dopts               dialOptions                  // dial options: defaults, then extra options, then caller options
	channelzID          *channelz.Identifier         // identifier returned by channelz.RegisterChannel
	balancerWrapper     *ccBalancerWrapper           // balancer wrapper created by newCCBalancerWrapper
	csMgr               *connectivityStateManager    // connectivity state manager
	blockingpicker      *pickerWrapper               // picker wrapper created by newPickerWrapper; NOTE(review): presumably selects the sub-connection for each RPC
	safeConfigSelector  iresolver.SafeConfigSelector // holds the current config selector, updated via UpdateConfigSelector
	czData              *channelzData                // channelz data of this channel
	retryThrottler      atomic.Value                 // stores a *retryThrottler (nil initially)
	firstResolveEvent   *grpcsync.Event              // fired by updateResolverState, i.e. once the resolver reported for the first time
	mu                  sync.RWMutex                 // locked when resolverWrapper is assigned and inside updateResolverState
	resolverWrapper     *ccResolverWrapper           // resolver wrapper built by newCCResolverWrapper
	sc                  *ServiceConfig               // current service config, if any
	conns               map[*addrConn]struct{}       // set of addrConn pointers; checked against nil to detect a closed ClientConn
	mkp                 keepalive.ClientParameters   // keepalive parameters copied from the dial options
	lceMu               sync.Mutex                   // NOTE(review): presumably guards lastConnectionError
	lastConnectionError error                        // NOTE(review): presumably the most recent connection error
}
  1. ClientConn结构体主要是用来表示一个客户端和服务端之间的连接,它封装了连接的创建,管理,状态,配置,拦截器等功能

parseTargetAndFindResolver()

  1. parseTargetAndFindResolver()中重点关注两步
  1. 执行parseTarget()解析target协议字符串,封装为Target结构体对象
  2. 执行getResolver()在一个map中检测获取对应当前scheme的resolver,也就是当前scheme对应的Builder实现
// parseTargetAndFindResolver parses cc.target and returns the resolver
// builder registered for its scheme, storing the parsed target in
// cc.parsedTarget.
//
// If the target cannot be parsed, or no builder is found for its scheme, the
// target is retried with the default scheme prepended. An error is returned
// only when that second attempt fails to parse or has no builder.
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
	channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)

	var rb resolver.Builder
	// Parse the target string into a resolver.Target.
	parsedTarget, err := parseTarget(cc.target)
	if err != nil {
		channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
	} else {
		channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
		// For a go-zero discovery target such as "discov://host/key",
		// parsedTarget.Scheme is "discov".
		rb = cc.getResolver(parsedTarget.Scheme)
		if rb != nil {
			cc.parsedTarget = parsedTarget
			return rb, nil
		}
	}

	// We are here because the user's dial target did not contain a scheme or
	// specified an unregistered scheme. We should fallback to the default
	// scheme, except when a custom dialer is specified in which case, we should
	// always use passthrough scheme.
	defScheme := resolver.GetDefaultScheme()
	channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)
	canonicalTarget := defScheme + ":///" + cc.target

	parsedTarget, err = parseTarget(canonicalTarget)
	if err != nil {
		channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)
		return nil, err
	}
	channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
	rb = cc.getResolver(parsedTarget.Scheme)
	if rb == nil {
		return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)
	}
	cc.parsedTarget = parsedTarget
	return rb, nil
}
parseTarget(cc.target)将target协议字符串封装为 Target 结构体
// parseTarget turns a dial target string into a resolver.Target.
//
// The endpoint is the URL path, or the opaque part when the path is empty,
// with one leading "/" removed.
func parseTarget(target string) (resolver.Target, error) {
	parsed, err := url.Parse(target)
	if err != nil {
		return resolver.Target{}, err
	}

	// For targets of the form "[scheme]://[authority]/endpoint, the endpoint
	// value returned from url.Parse() contains a leading "/". Although this is
	// in accordance with RFC 3986, we do not want to break existing resolver
	// implementations which expect the endpoint without the leading "/". So, we
	// end up stripping the leading "/" here. But this will result in an
	// incorrect parsing for something like "unix:///path/to/socket". Since we
	// own the "unix" resolver, we can workaround in the unix resolver by using
	// the `URL` field instead of the `Endpoint` field.
	rawEndpoint := parsed.Opaque
	if parsed.Path != "" {
		rawEndpoint = parsed.Path
	}

	result := resolver.Target{
		Scheme:    parsed.Scheme,
		Authority: parsed.Host,
		Endpoint:  strings.TrimPrefix(rawEndpoint, "/"),
		URL:       *parsed,
	}
	return result, nil
}

// Target is the parsed form of a dial target string: its scheme, authority
// and endpoint, plus the full parsed URL.
type Target struct {
	// Deprecated: use URL.Scheme instead.
	Scheme string
	// Deprecated: use URL.Host instead.
	Authority string
	// Deprecated: use URL.Path or URL.Opaque instead. The latter is set when
	// the former is empty.
	Endpoint string
	// URL contains the parsed dial target with an optional default scheme added
	// to it if the original dial target contained no scheme or contained an
	// unregistered scheme. Any query params specified in the original dial
	// target can be accessed from here.
	URL url.URL
}
getResolver(scheme string)获取该scheme对应的Builder
// getResolver returns the builder for scheme. Builders supplied through the
// dial options are searched first, in order; if none matches, the globally
// registered builder (possibly nil) is returned.
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
	custom := cc.dopts.resolvers
	for i := range custom {
		if custom[i].Scheme() == scheme {
			return custom[i]
		}
	}

	return resolver.Get(scheme)
}
var (
	// m is a map from scheme to resolver builder. It is written by Register
	// and read by Get.
	m = make(map[string]Builder)
	// defaultScheme is the default scheme to use.
	defaultScheme = "passthrough"
)

// Register stores b in the package-level map m under the scheme it reports,
// replacing any builder stored earlier for the same scheme.
func Register(b Builder) {
	scheme := b.Scheme()
	m[scheme] = b
}

// Get looks up the Builder stored in m for scheme and returns nil when no
// builder has been stored under that scheme.
func Get(scheme string) Builder {
	b, found := m[scheme]
	if !found {
		return nil
	}
	return b
}
  1. 总结getResolver(): 通过接收到的scheme在m中获取对应的Builder,先放一放具体放在第三步骤中讲解
  2. 这里先知道m是一个map, 针对这个map提供了Register()存储函数,Get()获取函数

newCCResolverWrapper()

  1. 该函数可以理解为获取resolverWrapper, resolverWrapper实现了resolver.ClientConn,通过resolver.ClientConn实现服务地址的更新
  2. ccResolverWrapper的流程如下图,在这里resolver会和balancer会进行关联,balancer的处理方式和resolver类似也是通过wrapper进行了一次封装
    在这里插入图片描述
  3. 当执行ccBalancerWrapper后,接着会根据获取到的地址创建http2的链接
    在这里插入图片描述
  4. 到此ClientConn创建过程基本结束,我们再一起梳理一下整个过程:首先获取resolver,其中ccResolverWrapper实现了resolver.ClientConn接口,通过Resolver的UpdateState方法触发获取Balancer;其中ccBalancerWrapper实现了balancer.ClientConn接口,通过Balancer的UpdateClientConnState方法触发创建连接(SubConn),最后创建HTTP2 Client
  5. 在上方的parseTargetAndFindResolver()中已经获取到当前scheme对应的Builder对象,该Builder对象上绑定了一个Build方法,在当前newCCResolverWrapper()函数中会执行这个Build方法
// newCCResolverWrapper creates a ccResolverWrapper for cc and uses rb to
// build the actual resolver for cc.parsedTarget.
//
// The wrapper itself is passed to rb.Build as the resolver's ClientConn
// argument. An error from rb.Build is returned unchanged together with a nil
// wrapper.
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
	ccr := &ccResolverWrapper{
		cc:   cc,
		done: grpcsync.NewEvent(),
	}

	// The transport credentials are cloned, so the resolver gets its own copy.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	rbo := resolver.BuildOptions{
		DisableServiceConfig: cc.dopts.disableServiceConfig,
		DialCreds:            credsClone,
		CredsBundle:          cc.dopts.copts.CredsBundle,
		Dialer:               cc.dopts.copts.Dialer,
	}

	var err error
	// We need to hold the lock here while we assign to the ccr.resolver field
	// to guard against a data race caused by the following code path,
	// rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
	// accessing ccr.resolver which is being assigned here.
	ccr.resolverMu.Lock()
	defer ccr.resolverMu.Unlock()
	// Run the builder's Build method; for go-zero's "discov" scheme this is
	// discovBuilder.Build.
	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
	if err != nil {
		return nil, err
	}
	return ccr, nil
}

3. Builder接口及Resolver解析器的注册

  1. 问题: 在上方parseTargetAndFindResolver()方法中会通过一个map获取当前scheme对应的Builder结构体对象,Builder是什么, 什么时候存储到map中的
  2. Builder是gRPC中提供的一个接口,使用不同的组件作为注册中心时,要针对该组件实现Builder接口,该接口中有两个方法
  1. Scheme(): 该方法会返回一个字符串, 注册的Resolver解析器会被保存在一个全局的变量m中,m是一个map,这个map的key即为 Scheme() 方法返回的字符串
  2. Build(): 该方法接收target、cc、opts三个参数,返回一个Resolver和一个error,在newCCResolverWrapper()中会调用该方法来获取Resolver
// Builder creates resolvers for one scheme.
type Builder interface {
    // Build creates a Resolver for target. cc is the ClientConn the resolver
    // reports to, and opts carries additional build options.
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    // Scheme returns the scheme this builder handles. Register uses it as
    // the key in the map of builders.
    Scheme() string
}
  1. go-zero的服务发现是在客户端实现的,在创建zRPC客户端的时候会执行一个init()方法,通过该方法默认注册了四个自定义的Resolver解析器
// Source file: go-zero/zrpc/internal/client.go

// init calls resolver.Register when the package is initialized, so the
// go-zero resolver builders are registered before any client is created.
func init() {
    resolver.Register()
}

// Register registers the go-zero resolver builders by delegating to
// internal.RegisterResolver.
func Register() {
	// Delegates to RegisterResolver in the internal package.
	internal.RegisterResolver()
}
// Source file: go-zero/zrpc/resolver/internal/resolver.go

const (
	// DirectScheme stands for direct scheme.
	DirectScheme = "direct"
	// DiscovScheme stands for discov scheme.
	DiscovScheme = "discov"
	// EtcdScheme stands for etcd scheme.
	EtcdScheme = "etcd"
	// KubernetesScheme stands for k8s scheme.
	KubernetesScheme = "k8s"
	// EndpointSepChar is the separator char in endpoints.
	EndpointSepChar = ','

	// subsetSize is the size passed to subset when the resolved address
	// list is built.
	subsetSize = 32
)

var (
	// EndpointSep is the separator string in endpoints.
	EndpointSep = fmt.Sprintf("%c", EndpointSepChar)

	// One package-level builder per supported scheme; RegisterResolver
	// registers a pointer to each of them.
	directResolverBuilder directBuilder
	discovResolverBuilder discovBuilder
	etcdResolverBuilder   etcdBuilder
	k8sResolverBuilder    kubeBuilder
)

// RegisterResolver registers the direct, discov, etcd and k8s resolver
// builders, in that order, through resolver.Register, which stores each
// builder in the global scheme-to-builder map.
func RegisterResolver() {
	builders := []resolver.Builder{
		&directResolverBuilder,
		&discovResolverBuilder,
		// etcdResolverBuilder is the builder for the "etcd" scheme.
		&etcdResolverBuilder,
		&k8sResolverBuilder,
	}
	for _, b := range builders {
		resolver.Register(b)
	}
}

// nopResolver is a resolver whose Close and ResolveNow methods do nothing.
// It only keeps the ClientConn it was created with.
type nopResolver struct {
	cc resolver.ClientConn
}

// Close is a no-op.
func (r *nopResolver) Close() {
}

// ResolveNow is a no-op; the options are ignored.
func (r *nopResolver) ResolveNow(options resolver.ResolveNowOptions) {
}
  1. 通过 goctl 自动生成的rpc代码默认使用的是etcd作为服务注册与发现组件的, 针对etcd实现Builder接口,提供了etcdBuilder
// Source file: go-zero/zrpc/resolver/internal/etcdbuilder.go

// etcdBuilder is the resolver builder for the "etcd" scheme.
type etcdBuilder struct {
	// discovBuilder is embedded, so etcdBuilder reuses its Build method.
	discovBuilder
}

// Scheme returns EtcdScheme, i.e. "etcd".
func (b *etcdBuilder) Scheme() string {
	return EtcdScheme
}
  1. 总结: 在gRPC中提供了一个Builder接口,针对不同的注册中心,需要实现这个Builder接口。在启动go-zero服务时,会执行一个init()方法,在该方法中会调用RegisterResolver(),默认情况下会将4个Builder接口的实现,通过Register()保存到一个名为m的map中。以etcd为例,对应的Builder接口实现是etcdBuilder

4. 通过etcdBuilder了解服务发现的具体逻辑

  1. 上方我们知道了默认情况下go-zero使用etcd作为注册中心,提供了etcdBuilder结构体。注意该结构体继承(内嵌)自discovBuilder,自身只重写了Scheme()方法,Build()方法沿用discovBuilder的实现,下面我们看一下discovBuilder的Build()方法
// Build creates the resolver for a discovery target.
//
// The target authority is split on EndpointSepChar to obtain the etcd hosts,
// and a subscriber is created for the target endpoints. A refresh function
// converts the subscriber's values into resolver addresses and reports them
// through cc.UpdateState. It is registered as a subscriber listener and also
// run once before returning. The returned resolver is a nopResolver.
func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
	resolver.Resolver, error) {
	// 1. Addresses of the etcd registry.
	isSep := func(r rune) bool {
		return r == EndpointSepChar
	}
	etcdHosts := strings.FieldsFunc(targets.GetAuthority(target), isSep)

	// 2. Subscriber for the service key.
	sub, err := discov.NewSubscriber(etcdHosts, targets.GetEndpoints(target))
	if err != nil {
		return nil, err
	}

	// 3. Function that pushes the current service addresses to gRPC.
	refresh := func() {
		var addrs []resolver.Address
		for _, addr := range subset(sub.Values(), subsetSize) {
			addrs = append(addrs, resolver.Address{Addr: addr})
		}

		state := resolver.State{Addresses: addrs}
		if err := cc.UpdateState(state); err != nil {
			logx.Error(err)
		}
	}

	// 4. Run refresh whenever the subscriber notifies its listeners, and once
	// right away to publish the initial list.
	sub.AddListener(refresh)
	refresh()

	return &nopResolver{cc: cc}, nil
}

NewSubscriber() 获取订阅对象Subscriber

  1. Subscriber 表示一个订阅者,用于监听etcd中的服务发现事件,并更新自己的服务列表,该结构体上重点绑定了如下方法:
  1. AddListener: 添加一个UpdateListener到listeners切片中
  2. Values: 返回当前订阅的服务列表
  3. Watch: 开始监控etcd中的服务变化,并更新自己的服务列表。
  4. Close: 用于关闭订阅者,取消监控etcd中的服务变化,并清空自己的服务列表
// Subscriber subscribes to a key on an etcd cluster and keeps the values in items.
Subscriber struct {
	// endpoints are the etcd addresses passed to NewSubscriber.
	endpoints []string
	// exclusive can be set by a SubOption and is passed to newContainer.
	// NOTE(review): the article describes this as an "exclusive mode" flag —
	// confirm the exact semantics against the discov package.
	exclusive bool
	// items is the container created by newContainer; it is the listener
	// handed to Registry.Monitor.
	items     *container
}

// NewSubscriber creates a Subscriber for key on the etcd cluster at endpoints.
//
// The options are applied before the container is created, so they can
// influence the exclusive flag. The container is then registered with the
// global Registry through Monitor; a Monitor error is returned as is.
func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
	sub := new(Subscriber)
	sub.endpoints = endpoints
	for i := range opts {
		opts[i](sub)
	}
	sub.items = newContainer(sub.exclusive)

	// Fetch the Registry, then start monitoring key with the container as listener.
	err := internal.GetRegistry().Monitor(endpoints, key, sub.items)
	if err != nil {
		return nil, err
	}

	return sub, nil
}
  1. Registry 管理了etcd客户端连接的集群,内部有一个map类型的clusters属性,etcd的endpoints为该map的key,值是cluster
  2. cluster是一个结构体封装了与etcd客户端连接的创建,监听,更新等功能是直接与etcd进行交互的核心
var (
	// registry is the package-level Registry returned by GetRegistry.
	registry = Registry{
		clusters: make(map[string]*cluster),
	}
	// connManager is a resource manager created by syncx.NewResourceManager.
	// NOTE(review): presumably it manages the shared etcd client connections
	// — its use is not shown here.
	connManager = syncx.NewResourceManager()
)

// Registry keeps track of the etcd clusters that are being monitored.
type Registry struct {
	// clusters maps a string key to its cluster.
	// NOTE(review): per the article the key is derived from the etcd endpoints.
	clusters map[string]*cluster
	// lock is a mutex stored next to clusters.
	// NOTE(review): presumably guards clusters — its use is not shown here.
	lock     sync.Mutex
}

// GetRegistry returns a pointer to the package-level registry.
func GetRegistry() *Registry {
	return &registry
}

// Monitor subscribes l to changes of key on the etcd cluster identified by
// endpoints.
//
// Parameters:
//   - endpoints: the etcd address list.
//   - key: the etcd key to watch.
//   - l: the listener to notify.
//
// When a cluster for endpoints already exists, the key/value pairs it
// currently holds for key are first replayed to l through OnAdd. Whether or
// not the cluster existed, the call then delegates to cluster.monitor.
func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) error {
	c, found := r.getCluster(endpoints)
	if found {
		// if exists, the existing values should be updated to the listener.
		for _, kv := range c.getCurrent(key) {
			l.OnAdd(kv)
		}
	}

	return c.monitor(key, l)
}

cluster

  1. 在上方的Monitor()中,无论endpoints对应的cluster是否已存在,最终都会执行cluster下的monitor()方法(已存在时会先把当前已有的值通过OnAdd通知给监听器)。cluster才是服务发现的核心,在cluster上绑定了大量的服务发现,监听更新等方法
// cluster holds the state kept for one etcd cluster: the values loaded for
// each watched key and the listeners interested in them.
type cluster struct {
	// endpoints is the etcd address list.
	endpoints  []string
	// key identifies the cluster; newCluster derives it from endpoints via getClusterKey.
	key        string
	// values stores, per watched key, a map of string to string.
	// NOTE(review): per the article these are the key/value pairs found in
	// etcd, i.e. the currently known service list.
	values     map[string]map[string]string
	// listeners stores the registered listeners per watched key.
	// UpdateListener is an interface; per the article it defines OnAdd and
	// OnDelete for handling added and deleted keys.
	listeners  map[string][]UpdateListener
	// watchGroup runs the goroutines that watch etcd for changes.
	watchGroup *threading.RoutineGroup
	// done is a channel of placeholder values.
	// NOTE(review): per the article it signals that the cluster is closing.
	done       chan lang.PlaceholderType
	// lock is held while listeners is modified in monitor.
	// NOTE(review): presumably guards values as well.
	lock       sync.Mutex
}

// newCluster creates a cluster for the given etcd endpoints with empty value
// and listener maps, a fresh routine group and an open done channel.
func newCluster(endpoints []string) *cluster {
	c := new(cluster)
	c.endpoints = endpoints
	c.key = getClusterKey(endpoints)
	c.values = make(map[string]map[string]string)
	c.listeners = make(map[string][]UpdateListener)
	c.watchGroup = threading.NewRoutineGroup()
	c.done = make(chan lang.PlaceholderType)
	return c
}

// monitor registers l as a listener for key and starts watching key on etcd.
//
// The listener is appended under the lock before the etcd client is
// obtained, so it stays registered even when getClient fails and the error
// is returned. Once a client is available, load is called for key and its
// result is passed on to watch, which runs in the watchGroup.
func (c *cluster) monitor(key string, l UpdateListener) error {
	c.lock.Lock()
	c.listeners[key] = append(c.listeners[key], l)
	c.lock.Unlock()

	cli, err := c.getClient()
	if err != nil {
		return err
	}

	// rev is the value returned by load; watch receives it as its rev argument.
	rev := c.load(cli, key)
	c.watchGroup.Run(func() {
		c.watch(cli, key, rev)
	})

	return nil
}

// NOTE(review): the methods below are listed by signature only; their bodies
// are not reproduced here. The descriptions are taken from the accompanying
// article and should be confirmed against the go-zero source.
func (c *cluster) context(cli EtcdClient) context.Context

func (c *cluster) getClient() (EtcdClient, error)

func (c *cluster) getCurrent(key string) []KV 

// handleChanges processes changes: per the article it adds the values
// fetched from etcd to c.values, checks whether the keys already in c.values
// are still alive in etcd, and removes the ones that are not.
func (c *cluster) handleChanges(key string, kvs []KV) 

// handleWatchEvents processes watch events; per the article it eventually
// calls the update function defined in Build to refresh the service state.
func (c *cluster) handleWatchEvents(key string, events []*clientv3.Event)

// load is called first (see monitor) to read the current values for key from
// etcd. With the key user.rpc the article gives the equivalent etcd command as
// ./etcdctl --endpoints=127.0.0.1:12379 get user.rpc --prefix
// Per the article the fetched address list is kept in a local map that is
// updated on later events, so that the cached list can still be used when
// the connection to etcd fails.
func (c *cluster) load(cli EtcdClient, key string) int64

func (c *cluster) newClient() (EtcdClient, error)

func (c *cluster) reload(cli EtcdClient)

// watch starts watching key on etcd for changes; per the article it calls
// watchStream below.
func (c *cluster) watch(cli EtcdClient, key string, rev int64) 
// watchStream watches key on etcd through etcd's Watch mechanism; the
// article gives the equivalent etcd command as
// ./etcdctl --endpoints=127.0.0.1:12379 watch user.rpc --prefix
func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool

func (c *cluster) watchConnState(cli EtcdClient)

discovBuilder.Build()方法中封装的update()更新函数

  1. 在discovBuilder.Build()方法中封装了一个update()更新函数。根据前面我们了解到cluster是与etcd注册中心交互的核心,上面绑定了大量与etcd进行交互的方法,包括监听watch(),读取load()等等。cluster上的monitor()方法会基于watch()开启监听,当有事件触发的时候,会调用cluster上的handleWatchEvents()事件处理方法,最终会调用 Build 方法封装的这个update()函数进行服务状态的更新
	// update converts the subscriber's current values into resolver
	// addresses and reports them to gRPC.
	update := func() {
		var addrs []resolver.Address
		// NOTE(review): subset presumably limits the list to subsetSize
		// entries — confirm against its definition.
		for _, val := range subset(sub.Values(), subsetSize) {
			addrs = append(addrs, resolver.Address{
				Addr: val,
			})
		}
		// Key step: hand the address list to the ClientConn. A failure is
		// only logged.
		if err := cc.UpdateState(resolver.State{
			Addresses: addrs,
		}); err != nil {
			logx.Error(err)
		}
	}
  1. 这个update 函数内部调用了ccResolverWrapper上的UpdateState()方法
// UpdateState records s as the current resolver state and forwards it to the
// ClientConn through updateResolverState.
//
// Nothing happens once the wrapper's done event has fired. Only
// balancer.ErrBadResolverState is propagated to the caller; any other result
// of updateResolverState is reported as nil.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
	ccr.incomingMu.Lock()
	defer ccr.incomingMu.Unlock()

	if ccr.done.HasFired() {
		return nil
	}

	ccr.addChannelzTraceEvent(s)
	ccr.curState = s

	updateErr := ccr.cc.updateResolverState(ccr.curState, nil)
	if updateErr != balancer.ErrBadResolverState {
		return nil
	}
	return balancer.ErrBadResolverState
}

// updateResolverState applies a new resolver state s, or a resolver error
// err, to the ClientConn.
//
// With a non-nil err the default service config may be applied, the balancer
// is told about the error and balancer.ErrBadResolverState is returned.
// Otherwise the service config carried by s (or the default one) is applied
// and the state is forwarded to the balancer wrapper. The method returns nil
// without doing anything when the ClientConn is already closed.
// firstResolveEvent is fired on every return path.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		cc.balancerWrapper.resolverError(err)

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig {
		channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
	} else if s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
					channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
		} else {
			ret = balancer.ErrBadResolverState
			if cc.sc == nil {
				// Apply the failing LB only if we haven't received valid service config
				// from the name resolver in the past.
				cc.applyFailingLB(s.ServiceConfig)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	var balCfg serviceconfig.LoadBalancingConfig
	if cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}
	// Take the balancer wrapper while holding the lock, but call into it
	// only after the lock has been released.
	bw := cc.balancerWrapper
	cc.mu.Unlock()

	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
  1. 最终会调用到baseBalancer的UpdateClientConnState()方法,获取目标服务的所有节点信息,封装SubConn,存储到负载均衡器上(这里我不太确定,因为我跟源码的时候UpdateClientConnState()方法跳的不是baseBalancer下的)
// UpdateClientConnState reconciles the balancer's SubConns with the address
// list in s.ResolverState.
//
// A SubConn is created and connected for every address that is not known
// yet, and the SubConn of every address that is no longer listed is removed.
// With an empty address list a resolver error is recorded and
// balancer.ErrBadResolverState is returned. Otherwise the picker is
// regenerated and the new balancer state is reported to the ClientConn.
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	// TODO: handle s.ResolverState.ServiceConfig?
	if logger.V(2) {
		logger.Info("base.baseBalancer: got new ClientConn state: ", s)
	}
	// Successful resolution; clear resolver error and ensure we return nil.
	b.resolverErr = nil
	// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
	addrsSet := resolver.NewAddressMap()
	for _, a := range s.ResolverState.Addresses {
		addrsSet.Set(a, nil)
		if _, ok := b.subConns.Get(a); !ok {
			// a is a new address (not existing in b.subConns).
			sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
			if err != nil {
				// A failed SubConn creation is logged and the address skipped.
				logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
				continue
			}
			b.subConns.Set(a, sc)
			b.scStates[sc] = connectivity.Idle
			b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
			sc.Connect()
		}
	}
	for _, a := range b.subConns.Keys() {
		sci, _ := b.subConns.Get(a)
		sc := sci.(balancer.SubConn)
		// a was removed by resolver.
		if _, ok := addrsSet.Get(a); !ok {
			b.cc.RemoveSubConn(sc)
			b.subConns.Delete(a)
			// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
			// The entry will be deleted in UpdateSubConnState.
		}
	}
	// If resolver state contains no addresses, return an error so ClientConn
	// will trigger re-resolve. Also records this as an resolver error, so when
	// the overall state turns transient failure, the error message will have
	// the zero address information.
	if len(s.ResolverState.Addresses) == 0 {
		b.ResolverError(errors.New("produced zero addresses"))
		return balancer.ErrBadResolverState
	}

	b.regeneratePicker()
	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
	return nil
}

四. 总结

  1. 了解服务发现,首先要了解go-zero支持的几种模式
  1. 在服务提供方运行时,go-zero提供了direct直连, discov服务发现,k8s等模式
  2. 在服务消费方启动时,go-zero提供了direct直连, discov服务发现,etcd, k8s等模式
  1. 继续查看源码,在go-zero/zrpc/internal/client.go文件中执行了一个init()初始化函数,该函数中调用了Register()—>RegisterResolver()函数,该函数中会获取go-zero框架中支持的所有服务发现模式,根据不同的模式,创建对应的Builder存储到一个名为m的map中:
  1. Builder是一个接口,内部有两个方法,Scheme()方法返回当前的服务发现模式,Build()方法根据当前服务发现模式创建Resolver对象,在后台启动一个定时器和一个watcher,定期或者实时更新服务端地址
  2. go-zero当前实现Builder接口的有directBuilder,discovBuilder,etcdBuilder,kubeBuilder。注意服务发现模式对应的discovBuilder与etcd模式对应的etcdBuilder:etcdBuilder继承(内嵌)自discovBuilder,两者逻辑大致相同
  1. 在编写服务消费方时,也要在yaml中配置当前服务名称,服务端口号等,还需要配置连接的注册中心地址,比如Etcd.Hosts,目标服务key等,调用MustLoad()函数读取配置文件到一个Config结构体变量上,需要调用zrpc下的MustNewClient()函数创建一个客户端,该函数中会调用一个NewClient(),在NewClient()函数中重点关注:
  1. 调用BuildTarget()生成target协议也就是注册中心的地址,etcd的地址加上目标服务的key,例如"discov://注册中心地址/user.rpc",后续会解析这个地址请求注册中心,获取指定key的服务调用地址列表
  2. 执行internal下的NewClient()函数,内部添加了一系列拦截器,针对负载均衡,熔断降级等进行了初始化
  1. 查看internal下的NewClient()函数,内部添加了一系列拦截器,针对负载均衡,熔断降级等进行了初始化,重点关注该函数内部会封装一个client结构体变量,调用该结构体变量上的dial()方法,查看dial()方法源码,内部
  1. 调用client上的buildDialOptions()注册了6个拦截器,分别是clientinterceptors.UnaryTracingInterceptor,clientinterceptors.DurationInterceptor,
    clientinterceptors.PrometheusInterceptor,clientinterceptors.BreakerInterceptor, clientinterceptors.TimeoutInterceptor,clientinterceptors.StreamTracingInterceptor,其中BreakerInterceptor是熔断器
  2. 调用grpc下的DialContext()函数
  1. 在grpc下的DialContext()函数中,重点关注:
  1. 先封装了一个grpc下的ClientConn结构体变量,该结构体表示一个客户端和服务端之间的连接,封装了连接的创建,管理,状态,配置,拦截器等功能
  2. 执行chainUnaryClientInterceptors() 与 chainStreamClientInterceptors()函数处理拦截器,将拦截器串联起来
  3. 调用parseTargetAndFindResolver()会再次解析target协议,将target解析成一个Target 结构体变量,该结构体中存在一个Scheme属性,内部存储的就是当前rpc服务发现模式,通过这个Scheme在m中获取到对应的Builder,在使用etcd作为注册中心时获取到的就是etcdBuilder
  4. 执行newCCBalancerWrapper()创建负载均衡器包装器ccBalancerWrapper
  5. 执行newCCResolverWrapper(),在该函数中通过前面拿到的Builder,在使用etcd时也就是etcdBuilder,执行它的Build()方法,获取resolverWrapper, resolverWrapper实现了resolver.ClientConn,通过resolver.ClientConn实现服务地址的更新
  1. 查看etcdBuilder,该结构体继承自discovBuilder,只重写了Scheme()方法,直接看一下discovBuilder的Build()方法
  1. 通过targets获取到注册中心etcd的hosts地址
  2. 执行NewSubscriber()函数获取订阅对象Subscriber
  3. 封装一个更新服务状态的update函数,
  4. 执行Subscriber下的AddListener()方法将这个更新服务状态的函数添加到Subscriber订阅对象中,并执行这个update函数
  5. 最后返回Resolver
  1. 在执行上方的NewSubscriber()获取订阅对象Subscriber时,内部会调用GetRegistry()获取一个Registry结构体变量,Registry 管理了etcd客户端连接的集群,内部有一个map类型的clusters属性,key是etcd的连接地址endpoints, 值是一个cluster类型变量
  2. cluster封装了与etcd客户端连接的创建,监听,更新等功能是直接与etcd进行交互的核心
  3. 在NewSubscriber()函数内部:
  1. 先封装一个订阅对象Subscriber
  2. 调用GetRegistry()函数获取到Registry
  3. 执行Registry上的Monitor()方法,该方法中最终会调用到cluster上的monitor()方法,
  1. cluster上的monitor()方法中:
  1. 调用newClient()函数获取到连接etcd注册中心的客户端EtcdClient
  2. 调用cluster上的load()方法获取到服务提供方地址列表,该函数中会调用cluster上的handleChanges()方法,将获取到的服务地址列表存储到cluster的values属性中,并且检测values中的key在etcd中是否存活,不存活则剔除
  3. 启动cluster上的watchGroup 协程组,内部执行cluster上的watch()方法该方法内会执行cluster上的watchStream()方法,开启一个监听,监听key在etcd的变化
  4. 当有事件触发的时候,会调用cluster上的handleWatchEvents()事件处理方法,最终会调用 Build 方法中定义的update进行服务状态的更新
  5. 查看封装的update()更新函数,内部会调用ccResolverWrapper上的UpdateState()方法,通过该方法,最终会调用到baseBalancer的UpdateClientConnState()方法,该方法根据Resolver传入的地址列表(s.ResolverState.Addresses),为新增的地址创建SubConn并存储到负载均衡器上,同时移除已不在列表中的地址对应的SubConn
  1. go-zero服务消费方整合etcd作为注册中心时通过watch机制监听etcd中key的变化来及时更新服务提供方地址,有长轮询和流式两种模式,长轮询模式是指etcd客户端连接每隔一段时间向etcd服务器发送一个请求,询问key是否有变化。流式模式是指etcd客户端连接只发送一次请求,然后保持连接不断开,接收etcd服务器推送的key变化消息,
  2. 默认使用流式模式,当etcd客户端连接收到key变化消息时,基于watch机制,通知所有监听该服务节点地址信息变化的Watcher对象,并把变化的键值对作为参数传入,回调函数会根据键值对的类型(添加或删除),调用container对象的OnAdd或OnDelete方法,并通知container对象的监听器
  3. 根据服务注册etcd会在客户端与服务端第一次建立连接时,注册一个租约lease,并且绑定该租约的TTL为10秒。当服务端由于宕机、网络故障等原因无法维持长连接时,客户端将无法更新租约的TTL字段,因此租约会在10秒后过期,etcd会根据租约的ID删除对应的注册信息
  4. go-zero使用etcd作为注册中心时与服务提供方,服务消费方之间的心跳时间分别是多少:
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐