详解Spring Cloud loadbalancer服务调用机制

loadbalancer使用说明

本文所使用的loadbalancer依赖,可以看到版本为3.1.5,作为服务消费者我们通过loadbalancer作为负载均衡器替换到默认的ribbon,同时我们还引入caffeine触发Loadbalancer完成基于caffeine的服务缓存装配:

<!--使用loadbalancer负载均衡器替换ribbon-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-loadbalancer</artifactId>
            <version>3.1.5</version>
        </dependency>

<dependency>
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
            <version>2.9.3</version>
        </dependency>

从项目的架构图,可以看到当前项目给出两个服务提供者nacos-provider和一个服务消费者nacos-consumer,当服务消费者从nacos拉取到可用服务之后,会通过权重算法调用可用的服务示例:
在这里插入图片描述

服务消费者所用到的负载均衡算法的配置:

/**
     * 将负载均衡算法设置为权重算法
     *
     * @param environment
     * @param loadBalancerClientFactory
     * @return
     */
    @Bean
    ReactorLoadBalancer<ServiceInstance> weightedLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new WeightedLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);

    }

WeightedLoadBalancer 这个权重算法的实现:

/**
 * 基于权重的负载均衡算法
 */
public class WeightedLoadBalancer implements ReactorServiceInstanceLoadBalancer {


    /**
     * loadbalancer 提供的访问当前服务的名称
     */
    final String serviceId;

    /**
     * 基于nacos缓存获取服务列表
     */
    ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;


    public WeightedLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
      this.serviceId = serviceId;
      this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    }

    @Autowired
    private NacosDiscoveryProperties nacosDiscoveryProperties;

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        //如果serviceInstanceListSupplierProvider不存在则采用NoopServiceInstanceListSupplier返回空实例列表
        ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
        //基于服务获取策略supplier略获取可用实例,采用权重算法返回本地调用的服务实例
        return supplier.get(request)
                .next()
                .map(serviceInstances -> new DefaultResponse(NacosBalancer.getHostByRandomWeight3(serviceInstances)));
    }
}

loadbalancer自动装配初始化

基于上述的配置,loadbalancer进行自动装配的时候就会识别到Caffeine和CaffeineCacheManager的存在,于是触发caffeineLoadBalancerCacheManager的装配,该管理类内部有一个cacheMap,本次装配将会以cachingServiceInstanceListSupplierCache为key,caffeine cache为value作为键值对存入,后续所有的服务缓存实例信息都会存储在这个键值对中的caffeine cache中:

在这里插入图片描述

自动装配的源码,如下所示,可以看到因为Caffeine和CaffeineCacheManager的存在,我们触发了CaffeineBasedLoadBalancerCacheManager的装配:

@Configuration(proxyBeanMethods = false)
 //Caffeine和CaffeineCacheManager都存在触发自动装配
 @ConditionalOnClass({ Caffeine.class, CaffeineCacheManager.class })
 protected static class CaffeineLoadBalancerCacheManagerConfiguration {

  @Bean(autowireCandidate = false)
  @ConditionalOnMissingBean
  LoadBalancerCacheManager caffeineLoadBalancerCacheManager(LoadBalancerCacheProperties cacheProperties) {
  //生成CaffeineBasedLoadBalancerCacheManager管理类
   return new CaffeineBasedLoadBalancerCacheManager(cacheProperties);
  }

 }

通过查看CaffeineBasedLoadBalancerCacheManager内部初始化逻辑,最终可以看到setCacheNames这个方法,该方法就会执行就是我们上文中所说的以cachingServiceInstanceListSupplierCache为key,以caffeineCache这个缓存为value的键值对初始化工作:

public CaffeineCacheManager(String... cacheNames) {
  //基于
  setCacheNames(Arrays.asList(cacheNames));
 }

public void setCacheNames(@Nullable Collection<String> cacheNames) {
  if (cacheNames != null) {
   for (String name : cacheNames) {
    this.cacheMap.put(name, createCaffeineCache(name));
   }
   this.dynamic = false;
  }
  else {
   this.dynamic = true;
  }
 }

初始化后的调试结果如下,我们可以很直观的看到这个记录服务实例的缓存键值对:
在这里插入图片描述

loadbalancer如何基于缓存完成服务调用

默认情况下loadbalancer缓存是没有任何信息的,假设我们nacos-consumer即服务消费者发起对nacos-provider的调用,loadbalancer是如何拿到服务实例的信息呢?

实际上,在loadbalancer初始化的时候,服务实例查询组件ServiceInstanceListSupplier内部聚合了nacos的服务查询组件DiscoveryClientServiceInstanceListSupplier,所以当我们通过feign发起调用时,loadbalancer的执行步骤为:

loadbalancer代理会先通过ServiceInstanceListSupplier到缓存中查看是否存在服务提供者nacos-provider的信息,如果不为空直接返回调用即可,如果不存在则执行步骤2。
尝试到基于nacos服务查询组件DiscoveryClientServiceInstanceListSupplier查看是否存在nacos-provider如果有直接返回。
ServiceInstanceListSupplier基于nacos服务组件的结果拿到实例信息,将其缓存起来,并基于负载均衡策略返回服务实例给服务消费者进行调用。

在这里插入图片描述

基于上图我们给出loadbalancer自动装配的服务查询组件源码,可以看到ServiceInstanceListSupplier通过withBlockingDiscoveryClient方法聚合了nacos服务查询组件:

@Bean
  @ConditionalOnBean(DiscoveryClient.class)
  @ConditionalOnMissingBean
  @Conditional(DefaultConfigurationCondition.class)
  public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
    ConfigurableApplicationContext context) {
    //ServiceInstanceListSupplier通过withBlockingDiscoveryClient方法聚合了nacos服务查询组件
   return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context);
  }

当我们发起服务调用时,feign代理会走到FeignBlockingLoadBalancerClient上,其内部会执行如下步骤:

通过loadBalancerClient的choose尝试上文的多级缓存查询服务示例的步骤,并完成负载均衡选取服务实例返回。
基于上述实例生成请求地址和参数。
发起请求并响应结果给服务消费者。

@Override
 public Response execute(Request request, Request.Options options) throws IOException {
   //......
  //尝试从缓存中拿服务,然后执行负载均衡调用
  ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
   //......
   //基于上述实例生成请求地址和参数
  String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
  Request newRequest = buildRequest(request, reconstructedUrl);
  LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);
  //发起请求
  return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
    supportedLifecycleProcessors, loadBalancerProperties.isUseRawStatusCodeInResponseData());
 }

步入loadBalancerClient.choose的调用路径,我们就可以看到上文所说的多级缓存查询步骤:

到lb缓存即cacheManager通过cachingServiceInstanceListSupplierCache查询是否存在服务提供者nacos-provider的实例。
如果没有则到nacos中的查询。
基于查询结果写入lb的cache中:

public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
 
  this.serviceInstances = CacheFlux.lookup(key -> {
   // 到lb缓存管理拿缓存
   Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
   //......
   //查看lb缓存有没有
   List<ServiceInstance> list = cache.get(key, List.class);
   //如果没有返回空
   if (list == null || list.isEmpty()) {
    return Mono.empty();
   }
   return Flux.just(list).materialize().collectList();
  }, 
  //若lb缓存没有则触发onCacheMissResume回调,就会通过delegate.get()触发nacos服务组件查询
  delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
    .andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize()
    .doOnNext(instances -> {//doOnNext得到nacos缓存后写入lb缓存中
     Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
     if (cache == null) {
      //.......
     }
     else {
       //写入lb缓存
      cache.put(key, instances);
     }
    }).then());
 }

loadbalancer缓存更新策略

默认情况下,lb缓存每35s完成一次更新,这也就意味着缓存在lb缓存中的服务实例信息只有在存入后的35s内是有效的,为了避免定时轮询更新服务实例的开销,lb的缓存采用了一种惰性更新的思想。

假设我们此时此刻缓存将nacos-provider的实例信息缓存到lb装配的CaffeineCache中,服务消费者在35s之后发起调用,此时CaffeineCache就会基于缓存服务实例的起始时间判断缓存是否过期,如果发现过期则直接返回null,让loadbalancer到nacos缓存中获取nacos-provider实例信息并覆盖掉当前过期的缓存:
在这里插入图片描述

对应cache缓存默认过期时间的默认值,即位于LoadBalancerCacheProperties 中对应ttl 的赋值:

@ConfigurationProperties("spring.cloud.loadbalancer.cache")
public class LoadBalancerCacheProperties {

 //.......

 private Duration ttl = Duration.ofSeconds(35);

 //......
}

为方便说明,我们再次贴出lb缓存查询的源码,如下所示, cache.get(key, List.class)这一段就是从Loadbalancer的缓存中获取服务实例的信息,如果过期也会返回null,然后到nacos缓存中获取信息并更新过期缓存:

public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
 
  this.serviceInstances = CacheFlux.lookup(key -> {
   // 到lb缓存管理拿缓存
   Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
   //......
   //查看lb缓存有没有查询服务实例信息,如果不存在或者过期则返回null
   List<ServiceInstance> list = cache.get(key, List.class);
   //如果没有返回空
   if (list == null || list.isEmpty()) {
    return Mono.empty();
   }
   return Flux.just(list).materialize().collectList();
  }, 
  //若lb缓存没有则触发onCacheMissResume回调,就会通过delegate.get()触发nacos服务组件查询
  delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
    .andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize()
    .doOnNext(instances -> {//doOnNext得到nacos缓存后写入lb缓存中
     Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
     if (cache == null) {
      //.......
     }
     else {
       //写入lb缓存,并更新写入时间
      cache.put(key, instances);
     }
    }).then());
 }

步入Loadbalancer缓存CaffeineCache的get方法,最终就会来到BoundedLocalCache的getIfPresent,这段就是查询缓存并判断过期的核心实现,对应步骤为:

基于要调用的服务实例的字符串(以本文示例来说就是nacos-provider)作为key进行查询,并得到一个node。
通过node的writeTime比对当前时间now判断是否过期。
如果过期返回null,让loadbalancer到nacos缓存中获取最新的值并覆盖掉当前缓存。
如果没过期则直接返回。
对应的这段说明的源码,可结合表述和源码注释理解上述步骤:

public @Nullable V getIfPresent(Object key, boolean recordStats) {
    Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
    //......
 //获取服务实例信息
    V value = node.getValue();
    
    //......
    //判断是否过期,如果过期则返回null
    if (hasExpired(node, now) || (collectValues() && (value == null))) {
      //......
      return null;
    }

  //......
  //没过期直接返回缓存中的服务实例信息
    return value;
  }

关于loadbalancer的一些注意事项

经过上述的说明相信大家对于loadbalancer的底层工作机制有所了解,所以需要做灰度发布或者服务平滑下线的场景,建议将loadbalancer缓存直接禁用,一律采用nacos缓存,因为nacos客户端的中的服务实例缓存是实时刷新的,只要服务端感知到服务下线就会以RPC的方式通知nacos客户端更新缓存。

禁用缓存的配置:

spring.cloud.loadbalancer.cache.enabled=false
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐