Elasticsearch Source Code Analysis: the Master Election Strategy
2020-06-03 13:41:03

When an es node has just started, or its pings to the master node have timed out, the node is triggered to (re)join the cluster. This article describes that join process.

In es, joining the cluster is implemented in org.elasticsearch.discovery.zen.ZenDiscovery#innerJoinCluster.

innerJoinCluster

private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    nodeJoinController.startElectionContext();
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        masterNode = findMaster();
    }

    if (!joinThreadControl.joinThreadActive(currentThread)) {
        logger.trace("thread is no longer in currentJoinThread. Stopping.");
        return;
    }

    if (transportService.getLocalNode().equals(masterNode)) {
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
            new NodeJoinController.ElectionCallback() {
                @Override
                public void onElectedAsMaster(ClusterState state) {
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDone(currentThread);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    logger.trace("failed while waiting for nodes to join, rejoining", t);
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                }
            }
        );
    } else {
        // process any incoming joins (they will fail because we are not the master)
        nodeJoinController.stopElectionContext(masterNode + " elected");

        // send join request
        final boolean success = joinElectedMaster(masterNode);

        synchronized (stateMutex) {
            if (success) {
                DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                if (currentMasterNode == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                } else if (currentMasterNode.equals(masterNode) == false) {
                    // update cluster state
                    joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                }

                joinThreadControl.markThreadAsDone(currentThread);
            } else {
                // failed to join. Try again...
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        }
    }
}

The main steps are as follows:

1. nodeJoinController creates a new ElectionContext instance (the role of the election context is described later).

2. The core of this method is findMaster, which looks up the master node of the current cluster and elects one if none exists. See the findMaster section below.

The remaining two steps are quite involved and require coordination across the network, so only the basic flow is described here:

A node that has not been elected master sends a join request to the master node and blocks until it is answered.

If the current node is the master, it blocks waiting for join requests from the other nodes until the required number has arrived. On the master side, a join request received outside of an election is handled immediately: the master publishes the cluster state to the joining node and then answers the join request. While the master is still waiting to be elected, it holds the incoming join requests until the quorum of joining nodes is reached, then publishes the cluster state to all joining nodes and answers all of their join requests.
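
To make the master-side behaviour concrete, below is a minimal sketch of the "wait for a quorum of joins" idea just described. It is not the actual NodeJoinController: all names are made up, and the real code additionally handles timeouts, cluster state publication and failure callbacks.

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: buffer incoming join requests and only answer them once
// requiredJoins of them have arrived (the joining nodes stay blocked until then).
class JoinQuorumSketch {
    private final int requiredJoins;
    private final List<String> pendingJoins = new ArrayList<>(); // ids of nodes waiting for an ack

    JoinQuorumSketch(int requiredJoins) {
        this.requiredJoins = requiredJoins;
    }

    /** Called for every incoming join request; returns true once the quorum has been reached. */
    synchronized boolean onJoinRequest(String joiningNodeId) {
        pendingJoins.add(joiningNodeId);
        if (pendingJoins.size() >= requiredJoins) {
            publishClusterStateAndAckJoins();
            return true;
        }
        return false; // not enough joins yet, keep the request pending
    }

    private void publishClusterStateAndAckJoins() {
        // In the flow described above, the new cluster state (containing the joined nodes)
        // is published first, and only then are the buffered join requests answered.
        System.out.println("publishing cluster state to " + pendingJoins + " and acking their joins");
        pendingJoins.clear();
    }

    public static void main(String[] args) {
        JoinQuorumSketch sketch = new JoinQuorumSketch(2); // e.g. minimumMasterNodes - 1
        sketch.onJoinRequest("node-A");
        sketch.onJoinRequest("node-B"); // quorum reached here
    }
}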

findMaster

Purpose: discover the live nodes in the current cluster and, if none of them reports a master node, elect one.

private DiscoveryNode findMaster() {
    logger.trace("starting to ping");
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();

    final DiscoveryNode localNode = transportService.getLocalNode();
    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

    // filter responses
    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

    List<DiscoveryNode> activeMasters = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
        // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
        if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
            activeMasters.add(pingResponse.master());
        }
    }

    // nodes discovered during pinging
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        if (pingResponse.node().isMasterNode()) {
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
        }
    }

    if (activeMasters.isEmpty()) {
        if (electMaster.hasEnoughCandidates(masterCandidates)) {
            final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
            logger.trace("candidate {} won election", winner);
            return winner.getNode();
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough master to elect from
            logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                masterCandidates, electMaster.minimumMasterNodes());
            return null;
        }
    } else {
        assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
        // lets tie break between discovered nodes
        return electMaster.tieBreakActiveMasters(activeMasters);
    }
}

1. The current node collects the ZenPing.PingResponse list from the other nodes in the cluster. The main fields of ZenPing.PingResponse are:

DiscoveryNode node: the responding node

DiscoveryNode master: the master node as seen by the responding node

long clusterStateVersion: the responding node's cluster state version

2. The master nodes reported by the other nodes are extracted from pingResponses into the activeMasters list.

3. The list of master candidates, masterCandidates, is built. (A node is a master candidate when the node.master setting is true, which is the default.)

4. If activeMasters is empty (the case when a cluster is first forming) and the number of masterCandidates is at least the quorum (hasEnoughCandidates), a master is elected via electMaster.electMaster. The quorum is configured by discovery.zen.minimum_master_nodes; to prevent split brain it should be a majority of the master-eligible nodes, i.e. (master-eligible nodes / 2) + 1, and since it is not enforced by default it has to be set explicitly once a cluster has several master-eligible nodes.
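
For reference, the quorum check in ElectMasterService.hasEnoughCandidates roughly amounts to the following (paraphrased, not a verbatim copy of the source): an empty candidate set is never enough, an unset/negative minimum_master_nodes disables the check, and otherwise at least minimumMasterNodes candidates are required.

public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
    if (candidates.isEmpty()) {
        return false;   // nothing to elect from
    }
    if (minimumMasterNodes < 1) {
        return true;    // quorum not configured, any non-empty candidate set is enough
    }
    return candidates.size() >= minimumMasterNodes;
}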

The election strategy itself is very simple:

public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
    List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
    sortedCandidates.sort(MasterCandidate::compare);
    return sortedCandidates.get(0);
}

The first (best-ranked) candidate of the sorted list is returned. The comparator is implemented as follows:

public static int compare(MasterCandidate c1, MasterCandidate c2) {
    // c1 and c2 are deliberately swapped: "better" must sort lower, so the candidate
    // with the higher cluster state version has to come first.
    int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
    if (ret == 0) {
        ret = compareNodes(c1.getNode(), c2.getNode());
    }
    return ret;
}

The candidate with the highest cluster state version wins (the version is incremented on every cluster state change); when the versions are equal, the node IDs are compared (the node ID is a randomly generated UUID).
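
The node comparison used when the versions tie is roughly the following (paraphrased from ElectMasterService, not a verbatim copy): a master-eligible node always ranks before a non-master-eligible one, and otherwise the node with the lexicographically smaller node ID ranks first.

private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
    if (o1.isMasterNode() && o2.isMasterNode() == false) {
        return -1;      // master-eligible nodes win over non-eligible ones
    }
    if (o1.isMasterNode() == false && o2.isMasterNode()) {
        return 1;
    }
    return o1.getId().compareTo(o2.getId());    // otherwise smaller node id wins
}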

5. If a master node is already alive (activeMasters is not empty), electMaster.tieBreakActiveMasters likewise picks the first node of activeMasters under the same ordering.
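
tieBreakActiveMasters essentially applies the same ordering to the already-active masters, along these lines (paraphrased, reusing the compareNodes shown above):

public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
    // pick the "smallest" active master under the compareNodes ordering
    return activeMasters.stream().min(ElectMasterService::compareNodes).get();
}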

pingAndWait

pingAndWait is implemented in org.elasticsearch.discovery.zen.UnicastZenPing#ping.

protected void ping(final Consumer<PingCollection> resultsConsumer,
                    final TimeValue scheduleDuration,
                    final TimeValue requestDuration) {
    final List<DiscoveryNode> seedNodes;
    try {
        seedNodes = resolveHostsLists(
            unicastZenPingExecutorService,
            logger,
            configuredHosts,
            limitPortCounts,
            transportService,
            UNICAST_NODE_PREFIX,
            resolveTimeout);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    seedNodes.addAll(hostsProvider.buildDynamicNodes());
    final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
    // add all possible master nodes that were active in the last known cluster configuration
    for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
        seedNodes.add(masterNode.value);
    }

    final ConnectionProfile connectionProfile =
        ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
    final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedNodes, resultsConsumer,
        nodes.getLocalNode(), connectionProfile);
    activePingingRounds.put(pingingRound.id(), pingingRound);
    final AbstractRunnable pingSender = new AbstractRunnable() {
        @Override
        public void onFailure(Exception e) {
            if (e instanceof AlreadyClosedException == false) {
                logger.warn("unexpected error while pinging", e);
            }
        }

        @Override
        protected void doRun() throws Exception {
            sendPings(requestDuration, pingingRound);
        }
    };
    threadPool.generic().execute(pingSender);
    threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender);
    threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender);
    threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() {
        @Override
        protected void doRun() throws Exception {
            finishPingingRound(pingingRound);
        }

        @Override
        public void onFailure(Exception e) {
            logger.warn("unexpected error while finishing pinging round", e);
        }
    });
}

1. Node IP/port discovery. Without going into the details, the basic process is:

Candidate node addresses are derived from the discovery.zen.ping.unicast.hosts setting (default ["127.0.0.1", "[::1]"]) and the transport port range transport.profiles.default.port (default 9300-9400): for a configured IP address, the first 5 ports of the range are probed; for a host name, DNS resolution yields all matching IPs, each with a single port.
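
The port expansion can be pictured with a small hypothetical sketch (made-up names; the real logic lives in resolveHostsLists and also handles DNS resolution, IPv6 literals and timeouts): a bare host entry is expanded over the first limitPortCount ports of the transport range, while an entry with an explicit port is used as-is.

import java.util.ArrayList;
import java.util.List;

class SeedAddressSketch {
    // Expand one configured host entry into concrete "host:port" addresses to ping.
    // (IPv6 literals such as "[::1]" are ignored here for simplicity.)
    static List<String> expand(String host, int basePort, int limitPortCount) {
        List<String> addresses = new ArrayList<>();
        if (host.contains(":")) {
            addresses.add(host); // an explicit port was configured, use it directly
        } else {
            for (int i = 0; i < limitPortCount; i++) {
                addresses.add(host + ":" + (basePort + i)); // probe the first ports of the range
            }
        }
        return addresses;
    }

    public static void main(String[] args) {
        // [127.0.0.1:9300, 127.0.0.1:9301, 127.0.0.1:9302, 127.0.0.1:9303, 127.0.0.1:9304]
        System.out.println(expand("127.0.0.1", 9300, 5));
    }
}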

2. A PingingRound instance is constructed. Its main fields are:

List<DiscoveryNode> seedNodes: the nodes to be discovered

PingCollection pingCollection: the collection of ping responses

Consumer<PingCollection> pingListener: the callback invoked when pinging finishes

3. pingSender is scheduled three times (immediately, then again after 1/3 and 2/3 of scheduleDuration), and once scheduleDuration (the timeout) has elapsed a final AbstractRunnable closes the round via finishPingingRound.

In pingSender, each discovered node is pinged; if a node responds and belongs to the same cluster, the response is recorded in the PingingRound's pingCollection.
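
As a rough illustration of that filtering (made-up names, not the actual UnicastZenPing response handler): only responses from nodes that report the same cluster name are collected.

import java.util.ArrayList;
import java.util.List;

class PingRoundSketch {
    private final String localClusterName;
    private final List<String> respondingNodes = new ArrayList<>(); // stands in for pingCollection

    PingRoundSketch(String localClusterName) {
        this.localClusterName = localClusterName;
    }

    // Record a ping response only when the responding node belongs to the same cluster.
    void onPingResponse(String nodeId, String remoteClusterName) {
        if (localClusterName.equals(remoteClusterName)) {
            respondingNodes.add(nodeId);
        }
    }
}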
