绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
万字长文浅析微服务Ribbon负载均衡源码(二):负载均衡器
2020-05-25 17:30:08

版本

作者:韩数
Github:github.com/hanshuaikang
完成日期:2019-06-16日
jdk:1.8
springboot版本:2.1.3.RELEASE
SpringCloud版本:Greenwich.SR1

声明:

身为一个刚入门的计算机菜佬,阅读源码自然离不开参考书籍和视频的引导,本篇文章的分析过程中"严重"借鉴了 翟永超 前辈的《SpringCloud微服务实战》这本书籍,在这里也向准备学习微服务的小伙伴们强烈推荐这本书,大家可以把这篇文章理解为《SpringCloud微服务实战》Ribbon部分的精简版和电子版,因为个人水平的原因,很多问题不敢妄下定论,以免误人子弟,所有书上很多内容都是精简过后直接放上去的,由于SpringCloud已经迭代到了Greenwich.SR1版本,Ribbon也和书上有了略微的差别,本篇文章的源码采用的是Ribbon新版本,同时,因为时间原因,有很多额外的子类实现并没有完全顾上,例如PredicateBasedRule类的ZoneAvoidanceRule和AvailabilityFilteringRule 感兴趣的读者可以买《SpringCloud微服务实战》这本书细看,同时强烈推荐小马哥的微服务直播课系列《小马哥微服务实战》。

致谢

翟永超:博客地址:

blog.didispace.com/abou

小马哥: Java 微服务实践 - Spring Boot / Spring Cloud购买链接:

segmentfault.com/ls/165

电子版及相关代码下载(欢迎Star)

Github:github.com/hanshuaikang

微信公众号:码上marson

负载均衡器

AbstractLoadBalancer 类

import java.util.List;

public abstract class AbstractLoadBalancer implements ILoadBalancer {
    
    //一个关于服务实例的分组枚举类,定义了三种不同的级别
    public enum ServerGroup{
        ALL,
        STATUS_UP,
        STATUS_NOT_UP        
    }
        
    /**
     * 选择一个服务实例,key为null,忽略key的条件判断
     */
    public Server chooseServer() {
        return chooseServer(null);
    }

    /**
     * 根据不同的分组类型来选择返回不同的服务实例的列表
     */
    public abstract List<Server> getServerList(ServerGroup serverGroup);
    
    /**
     * 获取与负载均衡器相关的统计信息
     */
    public abstract LoadBalancerStats getLoadBalancerStats();    
}

AbstractLoadBalancer是 ILoadBalancer的一个抽象实现,同时也维护了一个关于服务实例的分组枚举类,ServerGroup 同时呢,定义了三种类型,用来针对不同的情况。

  • ALL :所有服务实例
  • STATUS_UP :正常服务的实例
  • STATUS_NOT_UP :停止服务的实例


BaseLoadBalancer类

作用:负载均衡的基础负载均衡器,定义了很多负载均衡器的基本内容

接下来看BaseLoadBalancer针对负载均衡都做了哪些工作呢?

  • 维护了两个服务实例列表,其中一个用于存放所有的实例,一个用于存放正常服务的实例
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
        .synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
        .synchronizedList(new ArrayList<Server>());
  • 定义了服务检查的IPing对象,默认为null
    protected IPing ping = null;
  • 定义了实施服务检查的执行策略对象,采用默认策略实现。
    protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY
    源码部分:


根据注释的意思我们大概知道,如果Server列表过大时,采用默认线性遍历的方式可能会影响系统的性能,
这个时候就需要 实现 IPingStrategy 并重写 pingServers 采用更为灵活的方式。

  • 定义了服务选择器IRule对象,这里默认采用RoundRobinRule实现
    RoundRobinRule代码部分:


这里可以看出Ribbon默认的服务选择策略是线性选择策略。
举个例子:次请求分发到了 9090 端口 第二次则会分发到 9091 然后 9092这样来

  • 启动Ping服务,定时检查当前Server是否健康,默认10秒
    protected int pingIntervalSeconds = 10;
  • 实现了ILoadBalancer的一系列操作




DynamicServerListLoadBalancer类

作用:对基础的负载均衡器BaseLoadBalancer做了扩展,使其拥有服务实例清单在运行期的动态更新的能力。同时也具备了对服务实例清单的过滤功能。

在DynamicServerListLoadBalancer类的成员定义中,我们发现新增了一个成员

ServerList<T> serverListImpl 对象,源码如下:

public interface ServerList<T extends Server> {

    //获取初始化时的服务列表
    public List<T> getInitialListOfServers();
    
    /**
     *获取更新时的服务列表
     */
    public List<T> getUpdatedListOfServers();   

}

通过查看ServerList的继承关系图,我们发现ServerList接口的实现不止一个,那 具体是使用了哪一个实现呢?

可以从如下思路入手,既然DynamicServerListLoadBalancer类实现了服务实例清单的动态更新,那Ribbon势必要和Eureka整合,所以我们从Eureka对Ribbon的支持下手。

EurekaRibbonClientConfiguration类:

@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config,
      Provider<EurekaClient> eurekaClientProvider) {
   if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
      return this.propertiesFactory.get(ServerList.class, config, serviceId);
   }
   DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
         config, eurekaClientProvider);
   DomainExtractingServerList serverList = new DomainExtractingServerList(
         discoveryServerList, config, this.approximateZoneFromHostname);
   return serverList;
}

可以看到默认采用的DiscoveryEnabledNIWSServerList 实现。

DomainExtractingServerList类:

public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    private ServerList<DiscoveryEnabledServer> list;

    private final RibbonProperties ribbon;

    private boolean approximateZoneFromHostname;

    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(
                this.list.getInitialListOfServers());
        return servers;
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(
                this.list.getUpdatedListOfServers());
        return servers;
    }

    private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
        List<DiscoveryEnabledServer> result = new ArrayList<>();
        boolean isSecure = this.ribbon.isSecure(true);
        boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
        for (DiscoveryEnabledServer server : servers) {
            result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
                    this.approximateZoneFromHostname));
        }
        return result;
    }

}

...

}

可以看到DomainExtractingServerList的具体实现是委托于其内部list来实现的,内部list通过DomainExtractingServerList构造器传入的DiscoveryEnabledNIWSServerList获得。

DiscoveryEnabledNIWSServerList 类:

源码部分:(部分代码略)

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer> {
    
    
     public List<DiscoveryEnabledServer> getInitialListOfServers() {
        return this.obtainServersViaDiscovery();
    }

    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        return this.obtainServersViaDiscovery();
    }

    
    rivate List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList();
        if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {
            EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();
            if (this.vipAddresses != null) {
                String[] var3 = this.vipAddresses.split(",");
                int var4 = var3.length;

                for(int var5 = ; var5 < var4; ++var5) {
                    String vipAddress = var3[var5];
                    List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);
                    Iterator var8 = listOfInstanceInfo.iterator();

                    while(var8.hasNext()) {
                        InstanceInfo ii = (InstanceInfo)var8.next();
                        if (ii.getStatus().equals(InstanceStatus.UP)) {
                            if (this.shouldUseOverridePort) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);
                                }

                                InstanceInfo copy = new InstanceInfo(ii);
                                if (this.isSecure) {
                                    ii = (new Builder(copy)).setSecurePort(this.overridePort).build();
                                } else {
                                    ii = (new Builder(copy)).setPort(this.overridePort).build();
                                }
                            }

                            DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr);
                            serverList.add(des);
                        }
                    }

                    if (serverList.size() >  && this.prioritizeVipAddressBasedServers) {
                        break;
                    }
                }
            }

            return serverList;
        } else {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList();
        }
    }
    
    
}


  • 步,通过eureka获取服务实例listOfInstanceInfo列表
    List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion)
  • 第二步,遍历listOfInstanceInfo列表,如果该服务实例状态为UP,则转化成DiscoveryEnabledServer对象,然后添加到serverList里面。
  • 返回serverList服务实例列表。


通过查看上面的代码大概知道了Ribbon是如何从Eureka注册中心获取新的服务列表的,那Ribbon又是如何将获取到的服务列表更新到本地的呢,这一切的关键是在DynamicServerListLoadBalancer类上,因为我们知道DynamicServerListLoadBalancer类具体实现了动态更新服务列表的功能。

通过查看源码:

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        //更新的具体实现
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };


public interface ServerListUpdater {

    /**
     * an interface for the updateAction that actually executes a server list update
     */
    public interface UpdateAction {
        void doUpdate();
    }


    /**
     * start the serverList updater with the given update action
     * This call should be idempotent.
     * 启动服务更新器
     *
     * @param updateAction
     */
    void start(UpdateAction updateAction);

    /**
     * stop the serverList updater. This call should be idempotent
     *停止服务更新器
     */
    void stop();

    /**
     * @return the last update timestamp as a {@link java.util.Date} string
     *获取近一次更新的时间
     */
    String getLastUpdate();

    /**
     * @return the number of ms that has elapsed since last update
     * 获取上一次更新到现在的时间间隔,单位为Ms毫秒
     */
    long getDurationSinceLastUpdateMs();

    /**
     * @return the number of update cycles missed, if valid
     */
    int getNumberMissedCycles();

    /**
     * @return the number of threads used, if vaid
     * 获取核心线程数
     */
    int getCoreThreads();
}

通过查看ServerListUpdater 接口实现关系图,我们大概发现Ribbon内置了两个实现。

  • PollingServerListUpdater :默认采用的更新策略,采用定时任务的方式动态更新服务列表
    // msecs; 延迟一秒开始执行
    private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000;
    // msecs;以30秒为周期重复执行
    private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000;
  • EurekaNotificationServerListUpdater :基于Eureka事件机制来驱动服务列表更新的实现。

那么,我们Ribbon默认具体采用了哪一种更新策略呢,通过查看DynamicServerListLoadBalancer类的代码,我们发现Ribbon采用的默认服务更新器是PollingServerListUpdater

 @Deprecated
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, 
            ServerList<T> serverList, ServerListFilter<T> filter) {
        this(
                clientConfig,
                rule,
                ping,
                serverList,
                filter,
                new PollingServerListUpdater()
        );
    }

既然了解了默认更新策略,那么我们再次回到我们的主角DynamicServerListLoadBalancer类上。

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

通过代码我们发现实际履行更新职责的方法是 updateListOfServers() ,不废话,上代码:

  @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

通过查看代码,我们发现流程大致如下:

  • 通过 ServerList的getUpdatedListOfServers() 方法获取到新的服务实例列表
  • 如果之前定义了过滤器,则按照某种规则实施过滤,后返回
  • updateAllServerList(servers); 完成后的更新操作。


public interface ServerListFilter<T extends Server> {

    public List<T> getFilteredListOfServers(List<T> servers);

}

通过查看继承实现关系图,发现ServerListFilter的直接实现类为:AbstractServerListFilter

其中ZoneAffinityServerListFilter 继承了 AbstractServerListFilter ,然后得ZoneAffinityServerListFilter 真传的子类又有好多,这里着重介绍AbstractServerListFilterZoneAffinityServerListFilter 实现

  • AbstractServerListFilter :抽象过滤器,依赖LoadBalancerStats对象实现过滤。LoadBalancerStats存储了负载均衡器的一些属性和统计信息。
  • ZoneAffinityServerListFilter:此服务器列表筛选器处理基于区域关联性筛选服务器。它会过滤掉一些服务实例和消费者不在一个Zone(区域)的实例。


ZoneAwareLoadBalancer类

功能:ZoneAwareLoadBalancer负载均衡器是对DynamicServerListLoadBalancer类的扩展和补充,该负载混合器实现了Zone(区域)的概念,避免了因为跨区域而导致的区域性故障,从而实现了服务的高可用。

那么ZoneAwareLoadBalancer具体做了哪些工作来实现这些功能的呢?

:重写了DynamicServerListLoadBalancer的setServerListForZones方法:

原版:

 protected void setServerListForZones(
    Map<String, List<Server>> zoneServersMap) {
    LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
    getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
 }

ZoneAwareLoadBalancer类版:

    @Override
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);
        if (balancers == null) {
            //balancers  用来存储每个String对应的Zone
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
        }
        //设置对应zone下面的实例清单
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        //检查是否有不再拥有服务器的区域
        //并将列表设置为空,以便与区域相关的度量不为空
        //包含过时的数据
        // 防止因为Zone的信息过时而干扰具体实例的选择算法。
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }  


那ZoneAwareLoadBalancer类是具体如何来选择具体的服务实例呢,

    @Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            //为所有Zone都创建一个快照
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", .2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", .99999d);
            }
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }


从源码中可以看出来, getLoadBalancerStats().getAvailableZones().size() <= 1 只有在当前的Zone区域的数量大于1的时候才会采用区域选择策略,否则的话,则'return super.chooseServer(key)' 什么也不做,采用父类的实现。

在选择具体的服务实例中,ZoneAwareLoadBalancer主要做了以下几件事:

  • 为所有Zone区域分别创建一个快照,存储在zoneSnapshot 里面
  • 通过Zone快照中的信息,按照某种策略例如Zone的服务实例数量,故障率等等来筛选掉不符合条件的Zone区域。
  • 如果发现没有符合剔除要求的区域,同时实例大平均负载小于阈值(默认百分之20),就直接返回所有可以的Zone区域,否则,随机剔除一个坏的Zone。
  • 获得的可用的Zone列表不为空,并且数量小于之前快照中的总数量,则根据IRule规则随机选一个Zone区域
  • 确定了终的Zone之后,终调用 BaseLoadBalancer的chooseServer来选择一个合适的服务实例。

写在后:

发现知乎好像并不支持在列表中内嵌代码块,所以只能把图片放上去了。

分享好友

分享这个小栈给你的朋友们,一起进步吧。

唠唠叨叨负载均衡
创建时间:2020-05-25 14:12:28
唠唠叨叨负载均衡
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 小雨滴
    专家
戳我,来吐槽~