版本
作者:韩数
Github:https://github.com/hanshuaikang
完成日期:2019-06-16日
jdk:1.8
springboot版本:2.1.3.RELEASE
SpringCloud版本:Greenwich.SR1
声明:
身为一个刚入门的计算机菜佬,阅读源码自然离不开参考书籍和视频的引导,本篇文章的分析过程中"严重"借鉴了 翟永超 前辈的《SpringCloud微服务实战》这本书籍,在这里也向准备学习微服务的小伙伴们强烈推荐这本书,大家可以把这篇文章理解为《SpringCloud微服务实战》Ribbon部分的精简版和电子版,因为个人水平的原因,很多问题不敢妄下定论,以免误人子弟,所有书上很多内容都是精简过后直接放上去的,由于SpringCloud已经迭代到了Greenwich.SR1版本,Ribbon也和书上有了略微的差别,本篇文章的源码采用的是Ribbon新版本,同时,因为时间原因,有很多额外的子类实现并没有完全顾上,例如PredicateBasedRule类的ZoneAvoidanceRule和AvailabilityFilteringRule 感兴趣的读者可以买《SpringCloud微服务实战》这本书细看,同时强烈推荐小马哥的微服务直播课系列《小马哥微服务实战》。
致谢
翟永超:博客地址:
http://blog.didispace.com/aboutme/
小马哥: Java 微服务实践 - Spring Boot / Spring Cloud购买链接:
https://segmentfault.com/ls/1650000011387052
电子版及相关代码下载(欢迎Star)
Github:https://github.com/hanshuaikang/Spring-Note
微信公众号:码上marson
负载均衡器
AbstractLoadBalancer 类
import java.util.List;
public abstract class AbstractLoadBalancer implements ILoadBalancer {
//一个关于服务实例的分组枚举类,定义了三种不同的级别
public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}
/**
* 选择一个服务实例,key为null,忽略key的条件判断
*/
public Server chooseServer() {
return chooseServer(null);
}
/**
* 根据不同的分组类型来选择返回不同的服务实例的列表
*/
public abstract List<Server> getServerList(ServerGroup serverGroup);
/**
* 获取与负载均衡器相关的统计信息
*/
public abstract LoadBalancerStats getLoadBalancerStats();
}
AbstractLoadBalancer是 ILoadBalancer的一个抽象实现,同时也维护了一个关于服务实例的分组枚举类,ServerGroup 同时呢,定义了三种类型,用来针对不同的情况。
- ALL :所有服务实例
- STATUS_UP :正常服务的实例
- STATUS_NOT_UP :停止服务的实例
BaseLoadBalancer类
作用:负载均衡的基础负载均衡器,定义了很多负载均衡器的基本内容
接下来看BaseLoadBalancer针对负载均衡都做了哪些工作呢?
- 维护了两个服务实例列表,其中一个用于存放所有的实例,一个用于存放正常服务的实例
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
- 定义了服务检查的IPing对象,默认为null
protected IPing ping = null; - 定义了实施服务检查的执行策略对象,采用默认策略实现。
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY
源码部分:
根据注释的意思我们大概知道,如果Server列表过大时,采用默认线性遍历的方式可能会影响系统的性能,
这个时候就需要 实现 IPingStrategy 并重写 pingServers 采用更为灵活的方式。
- 定义了服务选择器IRule对象,这里默认采用RoundRobinRule实现
RoundRobinRule代码部分:
这里可以看出Ribbon默认的服务选择策略是线性选择策略。
举个例子:次请求分发到了 9090 端口 第二次则会分发到 9091 然后 9092这样来
- 启动Ping服务,定时检查当前Server是否健康,默认10秒
protected int pingIntervalSeconds = 10; - 实现了ILoadBalancer的一系列操作
DynamicServerListLoadBalancer类
作用:对基础的负载均衡器BaseLoadBalancer做了扩展,使其拥有服务实例清单在运行期的动态更新的能力。同时也具备了对服务实例清单的过滤功能。
在DynamicServerListLoadBalancer类的成员定义中,我们发现新增了一个成员
ServerList<T> serverListImpl 对象,源码如下:
public interface ServerList<T extends Server> {
//获取初始化时的服务列表
public List<T> getInitialListOfServers();
/**
*获取更新时的服务列表
*/
public List<T> getUpdatedListOfServers();
}
通过查看ServerList的继承关系图,我们发现ServerList接口的实现不止一个,那 具体是使用了哪一个实现呢?
可以从如下思路入手,既然DynamicServerListLoadBalancer类实现了服务实例清单的动态更新,那Ribbon势必要和Eureka整合,所以我们从Eureka对Ribbon的支持下手。
EurekaRibbonClientConfiguration类:
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config,
Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
可以看到默认采用的DiscoveryEnabledNIWSServerList 实现。
DomainExtractingServerList类:
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
private ServerList<DiscoveryEnabledServer> list;
private final RibbonProperties ribbon;
private boolean approximateZoneFromHostname;
public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
IClientConfig clientConfig, boolean approximateZoneFromHostname) {
this.list = list;
this.ribbon = RibbonProperties.from(clientConfig);
this.approximateZoneFromHostname = approximateZoneFromHostname;
}
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(
this.list.getInitialListOfServers());
return servers;
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(
this.list.getUpdatedListOfServers());
return servers;
}
private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
List<DiscoveryEnabledServer> result = new ArrayList<>();
boolean isSecure = this.ribbon.isSecure(true);
boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
for (DiscoveryEnabledServer server : servers) {
result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
this.approximateZoneFromHostname));
}
return result;
}
}
...略
}
可以看到DomainExtractingServerList的具体实现是委托于其内部list来实现的,内部list通过DomainExtractingServerList构造器传入的DiscoveryEnabledNIWSServerList获得。
DiscoveryEnabledNIWSServerList 类:
源码部分:(部分代码略)
public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer> {
public List<DiscoveryEnabledServer> getInitialListOfServers() {
return this.obtainServersViaDiscovery();
}
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
return this.obtainServersViaDiscovery();
}
rivate List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList();
if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {
EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();
if (this.vipAddresses != null) {
String[] var3 = this.vipAddresses.split(",");
int var4 = var3.length;
for(int var5 = ; var5 < var4; ++var5) {
String vipAddress = var3[var5];
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);
Iterator var8 = listOfInstanceInfo.iterator();
while(var8.hasNext()) {
InstanceInfo ii = (InstanceInfo)var8.next();
if (ii.getStatus().equals(InstanceStatus.UP)) {
if (this.shouldUseOverridePort) {
if (logger.isDebugEnabled()) {
logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);
}
InstanceInfo copy = new InstanceInfo(ii);
if (this.isSecure) {
ii = (new Builder(copy)).setSecurePort(this.overridePort).build();
} else {
ii = (new Builder(copy)).setPort(this.overridePort).build();
}
}
DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size() > && this.prioritizeVipAddressBasedServers) {
break;
}
}
}
return serverList;
} else {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList();
}
}
}
- 步,通过eureka获取服务实例listOfInstanceInfo列表
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion) - 第二步,遍历listOfInstanceInfo列表,如果该服务实例状态为UP,则转化成DiscoveryEnabledServer对象,然后添加到serverList里面。
- 返回serverList服务实例列表。
通过查看上面的代码大概知道了Ribbon是如何从Eureka注册中心获取新的服务列表的,那Ribbon又是如何将获取到的服务列表更新到本地的呢,这一切的关键是在DynamicServerListLoadBalancer类上,因为我们知道DynamicServerListLoadBalancer类具体实现了动态更新服务列表的功能。
通过查看源码:
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
//更新的具体实现
@Override
public void doUpdate() {
updateListOfServers();
}
};
public interface ServerListUpdater {
/**
* an interface for the updateAction that actually executes a server list update
*/
public interface UpdateAction {
void doUpdate();
}
/**
* start the serverList updater with the given update action
* This call should be idempotent.
* 启动服务更新器
*
* @param updateAction
*/
void start(UpdateAction updateAction);
/**
* stop the serverList updater. This call should be idempotent
*停止服务更新器
*/
void stop();
/**
* @return the last update timestamp as a {@link java.util.Date} string
*获取近一次更新的时间
*/
String getLastUpdate();
/**
* @return the number of ms that has elapsed since last update
* 获取上一次更新到现在的时间间隔,单位为Ms毫秒
*/
long getDurationSinceLastUpdateMs();
/**
* @return the number of update cycles missed, if valid
*/
int getNumberMissedCycles();
/**
* @return the number of threads used, if vaid
* 获取核心线程数
*/
int getCoreThreads();
}
通过查看ServerListUpdater 接口实现关系图,我们大概发现Ribbon内置了两个实现。
- PollingServerListUpdater :默认采用的更新策略,采用定时任务的方式动态更新服务列表
// msecs; 延迟一秒开始执行
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000;
// msecs;以30秒为周期重复执行
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; - EurekaNotificationServerListUpdater :基于Eureka事件机制来驱动服务列表更新的实现。
那么,我们Ribbon默认具体采用了哪一种更新策略呢,通过查看DynamicServerListLoadBalancer类的代码,我们发现Ribbon采用的默认服务更新器是PollingServerListUpdater
@Deprecated
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter) {
this(
clientConfig,
rule,
ping,
serverList,
filter,
new PollingServerListUpdater()
);
}
既然了解了默认更新策略,那么我们再次回到我们的主角DynamicServerListLoadBalancer类上。
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
通过代码我们发现实际履行更新职责的方法是 updateListOfServers() ,不废话,上代码:
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
通过查看代码,我们发现流程大致如下:
- 通过 ServerList的getUpdatedListOfServers() 方法获取到新的服务实例列表
- 如果之前定义了过滤器,则按照某种规则实施过滤,后返回
- updateAllServerList(servers); 完成后的更新操作。
public interface ServerListFilter<T extends Server> {
public List<T> getFilteredListOfServers(List<T> servers);
}
通过查看继承实现关系图,发现ServerListFilter的直接实现类为:AbstractServerListFilter
其中ZoneAffinityServerListFilter 继承了 AbstractServerListFilter ,然后得ZoneAffinityServerListFilter 真传的子类又有好多,这里着重介绍AbstractServerListFilter 和 ZoneAffinityServerListFilter 实现
- AbstractServerListFilter :抽象过滤器,依赖LoadBalancerStats对象实现过滤。LoadBalancerStats存储了负载均衡器的一些属性和统计信息。
- ZoneAffinityServerListFilter:此服务器列表筛选器处理基于区域关联性筛选服务器。它会过滤掉一些服务实例和消费者不在一个Zone(区域)的实例。
ZoneAwareLoadBalancer类
功能:ZoneAwareLoadBalancer负载均衡器是对DynamicServerListLoadBalancer类的扩展和补充,该负载混合器实现了Zone(区域)的概念,避免了因为跨区域而导致的区域性故障,从而实现了服务的高可用。
那么ZoneAwareLoadBalancer具体做了哪些工作来实现这些功能的呢?
:重写了DynamicServerListLoadBalancer的setServerListForZones方法:
原版:
protected void setServerListForZones(
Map<String, List<Server>> zoneServersMap) {
LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
}
ZoneAwareLoadBalancer类版:
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
if (balancers == null) {
//balancers 用来存储每个String对应的Zone
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
//设置对应zone下面的实例清单
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
getLoadBalancer(zone).setServersList(entry.getValue());
}
//检查是否有不再拥有服务器的区域
//并将列表设置为空,以便与区域相关的度量不为空
//包含过时的数据
// 防止因为Zone的信息过时而干扰具体实例的选择算法。
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}
那ZoneAwareLoadBalancer类是具体如何来选择具体的服务实例呢,
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
//为所有Zone都创建一个快照
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", .2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", .99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
从源码中可以看出来, getLoadBalancerStats().getAvailableZones().size() <= 1 只有在当前的Zone区域的数量大于1的时候才会采用区域选择策略,否则的话,则'return super.chooseServer(key)' 什么也不做,采用父类的实现。
在选择具体的服务实例中,ZoneAwareLoadBalancer主要做了以下几件事:
- 为所有Zone区域分别创建一个快照,存储在zoneSnapshot 里面
- 通过Zone快照中的信息,按照某种策略例如Zone的服务实例数量,故障率等等来筛选掉不符合条件的Zone区域。
- 如果发现没有符合剔除要求的区域,同时实例大平均负载小于阈值(默认百分之20),就直接返回所有可以的Zone区域,否则,随机剔除一个坏的Zone。
- 获得的可用的Zone列表不为空,并且数量小于之前快照中的总数量,则根据IRule规则随机选一个Zone区域
- 确定了终的Zone之后,终调用 BaseLoadBalancer的chooseServer来选择一个合适的服务实例。
写在后:
发现知乎好像并不支持在列表中内嵌代码块,所以只能把图片放上去了。