SpringCloud - 负载衡器Loadbalancer
SpringCloud - 负载衡器Loadbalancer
详解Spring Cloud loadbalancer服务调用机制
loadbalancer使用说明
本文所使用的loadbalancer依赖,可以看到版本为3.1.5,作为服务消费者我们通过loadbalancer作为负载均衡器替换到默认的ribbon,同时我们还引入caffeine触发Loadbalancer完成基于caffeine的服务缓存装配:
<!--使用loadbalancer负载均衡器替换ribbon-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>
从项目的架构图,可以看到当前项目给出两个服务提供者nacos-provider和一个服务消费者nacos-consumer,当服务消费者从nacos拉取到可用服务之后,会通过权重算法调用可用的服务示例:
服务消费者所用到的负载均衡算法的配置:
/**
* 将负载均衡算法设置为权重算法
*
* @param environment
* @param loadBalancerClientFactory
* @return
*/
@Bean
ReactorLoadBalancer<ServiceInstance> weightedLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new WeightedLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
WeightedLoadBalancer 这个权重算法的实现:
/**
* 基于权重的负载均衡算法
*/
public class WeightedLoadBalancer implements ReactorServiceInstanceLoadBalancer {
/**
* loadbalancer 提供的访问当前服务的名称
*/
final String serviceId;
/**
* 基于nacos缓存获取服务列表
*/
ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
public WeightedLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
}
@Autowired
private NacosDiscoveryProperties nacosDiscoveryProperties;
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
//如果serviceInstanceListSupplierProvider不存在则采用NoopServiceInstanceListSupplier返回空实例列表
ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
//基于服务获取策略supplier略获取可用实例,采用权重算法返回本地调用的服务实例
return supplier.get(request)
.next()
.map(serviceInstances -> new DefaultResponse(NacosBalancer.getHostByRandomWeight3(serviceInstances)));
}
}
loadbalancer自动装配初始化
基于上述的配置,loadbalancer进行自动装配的时候就会识别到Caffeine和CaffeineCacheManager的存在,于是触发caffeineLoadBalancerCacheManager的装配,该管理类内部有一个cacheMap,本次装配将会以cachingServiceInstanceListSupplierCache为key,caffeine cache为value作为键值对存入,后续所有的服务缓存实例信息都会存储在这个键值对中的caffeine cache中:

自动装配的源码,如下所示,可以看到因为Caffeine和CaffeineCacheManager的存在,我们触发了CaffeineBasedLoadBalancerCacheManager的装配:
@Configuration(proxyBeanMethods = false)
//Caffeine和CaffeineCacheManager都存在触发自动装配
@ConditionalOnClass({ Caffeine.class, CaffeineCacheManager.class })
protected static class CaffeineLoadBalancerCacheManagerConfiguration {
@Bean(autowireCandidate = false)
@ConditionalOnMissingBean
LoadBalancerCacheManager caffeineLoadBalancerCacheManager(LoadBalancerCacheProperties cacheProperties) {
//生成CaffeineBasedLoadBalancerCacheManager管理类
return new CaffeineBasedLoadBalancerCacheManager(cacheProperties);
}
}
通过查看CaffeineBasedLoadBalancerCacheManager内部初始化逻辑,最终可以看到setCacheNames这个方法,该方法就会执行就是我们上文中所说的以cachingServiceInstanceListSupplierCache为key,以caffeineCache这个缓存为value的键值对初始化工作:
public CaffeineCacheManager(String... cacheNames) {
//基于
setCacheNames(Arrays.asList(cacheNames));
}
public void setCacheNames(@Nullable Collection<String> cacheNames) {
if (cacheNames != null) {
for (String name : cacheNames) {
this.cacheMap.put(name, createCaffeineCache(name));
}
this.dynamic = false;
}
else {
this.dynamic = true;
}
}
初始化后的调试结果如下,我们可以很直观的看到这个记录服务实例的缓存键值对:
loadbalancer如何基于缓存完成服务调用
默认情况下loadbalancer缓存是没有任何信息的,假设我们nacos-consumer即服务消费者发起对nacos-provider的调用,loadbalancer是如何拿到服务实例的信息呢?
实际上,在loadbalancer初始化的时候,服务实例查询组件ServiceInstanceListSupplier内部聚合了nacos的服务查询组件DiscoveryClientServiceInstanceListSupplier,所以当我们通过feign发起调用时,loadbalancer的执行步骤为:
loadbalancer代理会先通过ServiceInstanceListSupplier到缓存中查看是否存在服务提供者nacos-provider的信息,如果不为空直接返回调用即可,如果不存在则执行步骤2。
尝试到基于nacos服务查询组件DiscoveryClientServiceInstanceListSupplier查看是否存在nacos-provider如果有直接返回。
ServiceInstanceListSupplier基于nacos服务组件的结果拿到实例信息,将其缓存起来,并基于负载均衡策略返回服务实例给服务消费者进行调用。

基于上图我们给出loadbalancer自动装配的服务查询组件源码,可以看到ServiceInstanceListSupplier通过withBlockingDiscoveryClient方法聚合了nacos服务查询组件:
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
@Conditional(DefaultConfigurationCondition.class)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
//ServiceInstanceListSupplier通过withBlockingDiscoveryClient方法聚合了nacos服务查询组件
return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context);
}
当我们发起服务调用时,feign代理会走到FeignBlockingLoadBalancerClient上,其内部会执行如下步骤:
通过loadBalancerClient的choose尝试上文的多级缓存查询服务示例的步骤,并完成负载均衡选取服务实例返回。
基于上述实例生成请求地址和参数。
发起请求并响应结果给服务消费者。
@Override
public Response execute(Request request, Request.Options options) throws IOException {
//......
//尝试从缓存中拿服务,然后执行负载均衡调用
ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
//......
//基于上述实例生成请求地址和参数
String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
Request newRequest = buildRequest(request, reconstructedUrl);
LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);
//发起请求
return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
supportedLifecycleProcessors, loadBalancerProperties.isUseRawStatusCodeInResponseData());
}
步入loadBalancerClient.choose的调用路径,我们就可以看到上文所说的多级缓存查询步骤:
到lb缓存即cacheManager通过cachingServiceInstanceListSupplierCache查询是否存在服务提供者nacos-provider的实例。
如果没有则到nacos中的查询。
基于查询结果写入lb的cache中:
public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
this.serviceInstances = CacheFlux.lookup(key -> {
// 到lb缓存管理拿缓存
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
//......
//查看lb缓存有没有
List<ServiceInstance> list = cache.get(key, List.class);
//如果没有返回空
if (list == null || list.isEmpty()) {
return Mono.empty();
}
return Flux.just(list).materialize().collectList();
},
//若lb缓存没有则触发onCacheMissResume回调,就会通过delegate.get()触发nacos服务组件查询
delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize()
.doOnNext(instances -> {//doOnNext得到nacos缓存后写入lb缓存中
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
//.......
}
else {
//写入lb缓存
cache.put(key, instances);
}
}).then());
}
loadbalancer缓存更新策略
默认情况下,lb缓存每35s完成一次更新,这也就意味着缓存在lb缓存中的服务实例信息只有在存入后的35s内是有效的,为了避免定时轮询更新服务实例的开销,lb的缓存采用了一种惰性更新的思想。
假设我们此时此刻缓存将nacos-provider的实例信息缓存到lb装配的CaffeineCache中,服务消费者在35s之后发起调用,此时CaffeineCache就会基于缓存服务实例的起始时间判断缓存是否过期,如果发现过期则直接返回null,让loadbalancer到nacos缓存中获取nacos-provider实例信息并覆盖掉当前过期的缓存:
对应cache缓存默认过期时间的默认值,即位于LoadBalancerCacheProperties 中对应ttl 的赋值:
@ConfigurationProperties("spring.cloud.loadbalancer.cache")
public class LoadBalancerCacheProperties {
//.......
private Duration ttl = Duration.ofSeconds(35);
//......
}
为方便说明,我们再次贴出lb缓存查询的源码,如下所示, cache.get(key, List.class)这一段就是从Loadbalancer的缓存中获取服务实例的信息,如果过期也会返回null,然后到nacos缓存中获取信息并更新过期缓存:
public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
this.serviceInstances = CacheFlux.lookup(key -> {
// 到lb缓存管理拿缓存
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
//......
//查看lb缓存有没有查询服务实例信息,如果不存在或者过期则返回null
List<ServiceInstance> list = cache.get(key, List.class);
//如果没有返回空
if (list == null || list.isEmpty()) {
return Mono.empty();
}
return Flux.just(list).materialize().collectList();
},
//若lb缓存没有则触发onCacheMissResume回调,就会通过delegate.get()触发nacos服务组件查询
delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize()
.doOnNext(instances -> {//doOnNext得到nacos缓存后写入lb缓存中
Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (cache == null) {
//.......
}
else {
//写入lb缓存,并更新写入时间
cache.put(key, instances);
}
}).then());
}
步入Loadbalancer缓存CaffeineCache的get方法,最终就会来到BoundedLocalCache的getIfPresent,这段就是查询缓存并判断过期的核心实现,对应步骤为:
基于要调用的服务实例的字符串(以本文示例来说就是nacos-provider)作为key进行查询,并得到一个node。
通过node的writeTime比对当前时间now判断是否过期。
如果过期返回null,让loadbalancer到nacos缓存中获取最新的值并覆盖掉当前缓存。
如果没过期则直接返回。
对应的这段说明的源码,可结合表述和源码注释理解上述步骤:
public @Nullable V getIfPresent(Object key, boolean recordStats) {
Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
//......
//获取服务实例信息
V value = node.getValue();
//......
//判断是否过期,如果过期则返回null
if (hasExpired(node, now) || (collectValues() && (value == null))) {
//......
return null;
}
//......
//没过期直接返回缓存中的服务实例信息
return value;
}
关于loadbalancer的一些注意事项
经过上述的说明相信大家对于loadbalancer的底层工作机制有所了解,所以需要做灰度发布或者服务平滑下线的场景,建议将loadbalancer缓存直接禁用,一律采用nacos缓存,因为nacos客户端的中的服务实例缓存是实时刷新的,只要服务端感知到服务下线就会以RPC的方式通知nacos客户端更新缓存。
禁用缓存的配置:
spring.cloud.loadbalancer.cache.enabled=false
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)