ES 5.6 Bulk Source Code Analysis
2022-01-30 15:13:55

Bulk Registration

In the start() method of the bootstrap class Bootstrap, node.start() is invoked. During Node initialization a series of modules and plugins are loaded, among them the ActionModule:

ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
    settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
    threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService);
modules.add(actionModule);

The ActionModule registers the actions we commonly use, including the BulkAction analyzed in this post:

  actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class);
  actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,TransportShardMultiGetAction.class);
  actions.register(BulkAction.INSTANCE, TransportBulkAction.class,TransportShardBulkAction.class);

It also initializes the RestHandlers:

 registerHandler.accept(new RestMultiTermVectorsAction(settings, restController));
 registerHandler.accept(new RestBulkAction(settings, restController));
 registerHandler.accept(new RestUpdateAction(settings, restController));

RestBulkAction declares the HTTP methods and paths it handles:

  controller.registerHandler(POST, "/_bulk", this);
  controller.registerHandler(PUT, "/_bulk", this);
  controller.registerHandler(POST, "/{index}/_bulk", this);
  controller.registerHandler(PUT, "/{index}/_bulk", this);
  controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
  controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);
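
These routes accept the standard newline-delimited bulk body (an action line followed, where applicable, by a source line). As a rough illustration of what RestBulkAction receives, here is a minimal sketch using the 5.x low-level REST client; the host, index, type and field values are made up for illustration:

import java.util.Collections;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class BulkRestExample {
    public static void main(String[] args) throws Exception {
        // each action line is followed by its source line; the body must end with a newline
        String body =
            "{ \"index\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"1\" } }\n" +
            "{ \"field1\" : \"value1\" }\n" +
            "{ \"delete\" : { \"_index\" : \"test\", \"_type\" : \"doc\", \"_id\" : \"2\" } }\n";

        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Response response = client.performRequest(
                "POST", "/_bulk", Collections.emptyMap(),
                new NStringEntity(body, ContentType.APPLICATION_JSON));
            System.out.println(response.getStatusLine());
        }
    }
}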

Receiving the Request

In its prepareRequest() method, RestBulkAction converts the plain RestRequest into a BulkRequest and sends it through the NodeClient:

 channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));
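
For comparison, the same bulk entry point can be driven from the Java API with the same request/listener call shape. A minimal sketch, assuming a node reachable on localhost:9300 and made-up index/type/field names (it uses the 5.x TransportClient rather than the internal NodeClient):

import java.net.InetAddress;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class BulkClientExample {
    public static void main(String[] args) throws Exception {
        // assumed: a node reachable on localhost:9300; index "test" and type "doc" are made up
        try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {

            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.add(new IndexRequest("test", "doc", "1").source("field1", "value1"));
            bulkRequest.add(new DeleteRequest("test", "doc", "2"));

            // same call shape as the lambda above: client.bulk(request, listener)
            client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse response) {
                    System.out.println("bulk took " + response.getTookInMillis() + " ms");
                }

                @Override
                public void onFailure(Exception e) {
                    e.printStackTrace();
                }
            });
            Thread.sleep(1000); // crude wait so the async listener can fire before the client closes
        }
    }
}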

Inside NodeClient.bulk(), the call ends up in NodeClient's doExecute() method:

doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener)

The action passed in is BulkAction.INSTANCE, the request is the BulkRequest built in the previous step, and the listener is the RestStatusToXContentListener created above.

In doExecute(), the plain action is first resolved to its transport action, and the resolved transportAction then executes the request:

transportAction(action).execute(request, listener);

BulkAction resolves to TransportBulkAction, whose execute() method calls its own doExecute(). doExecute() first sorts the indices referenced by the request into those that exist and those that do not, as the three source comments below describe (a simplified sketch follows them):

1) Step 1: collect all the indices in the request
2) Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create that we'll use when we try to run the requests.
3) Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
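
To make the three comments concrete, here is a small self-contained sketch of the same classification idea in plain Java; the class, method and key names are illustrative and this is not the actual TransportBulkAction code:

import java.util.*;

class BulkIndexClassifier {
    /** Step 1: collect the indices; Step 2: split them into existing / creatable / not creatable. */
    static Map<String, Set<String>> classify(Collection<String> requestedIndices,
                                             Set<String> existingIndices,
                                             boolean autoCreateAllowed) {
        Map<String, Set<String>> result = new LinkedHashMap<>();
        result.put("existing", new LinkedHashSet<>());
        result.put("toCreate", new LinkedHashSet<>());
        result.put("cannotCreate", new LinkedHashSet<>());
        for (String index : requestedIndices) {
            if (existingIndices.contains(index)) {
                result.get("existing").add(index);
            } else if (autoCreateAllowed) {
                result.get("toCreate").add(index);   // Step 3 would create these, then start the bulk
            } else {
                result.get("cannotCreate").add(index);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(classify(
            Arrays.asList("logs-2022", "users", "logs-2022"),
            new HashSet<>(Collections.singletonList("users")),
            true));
    }
}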

executeBulk() is then invoked; it creates a BulkOperation and runs it:

void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
        final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
    new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
}

BulkOperation iterates over all the requests in the bulk twice. The first pass sets routing, mapping, and so on for each request, and automatically generates an ID if ID generation is allowed. The second pass groups the requests by shard ID. The official ES documentation recommends using bulk for batch processing because bulk applies some low-level optimizations, and this is one of them: requests for the same shard are bundled together and sent directly to the node holding that shard, which avoids forwarding individual requests between nodes.

for (int i = 0; i < bulkRequest.requests.size(); i++) {
    // ...
    switch (docWriteRequest.opType()) {
        case CREATE:
        case INDEX:
            IndexRequest indexRequest = (IndexRequest) docWriteRequest;
            MappingMetaData mappingMd = null;
            final IndexMetaData indexMetaData = metaData.index(concreteIndex);
            if (indexMetaData != null) {
                mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
            }
            indexRequest.resolveRouting(metaData);
            indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
            // ...
    }
    // ...
}

// ...
for (int i = 0; i < bulkRequest.requests.size(); i++) {
    DocWriteRequest request = bulkRequest.requests.get(i);
    if (request == null) {
        continue;
    }
    String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
    ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
    List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
    shardRequests.add(new BulkItemRequest(i, request));
}
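
The grouping can be illustrated with a self-contained example. ES actually derives the shard from a Murmur3 hash of the routing value (the document ID by default) modulo the number of primary shards; the hash below is a simplified stand-in, not the real OperationRouting code:

import java.util.*;

class ShardGroupingSketch {
    static Map<Integer, List<String>> groupByShard(List<String> docIds, int numberOfPrimaryShards) {
        Map<Integer, List<String>> requestsByShard = new HashMap<>();
        for (String id : docIds) {
            // stand-in for: Murmur3Hash(routing) % numberOfPrimaryShards
            int shardId = Math.floorMod(id.hashCode(), numberOfPrimaryShards);
            requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()).add(id);
        }
        return requestsByShard;
    }

    public static void main(String[] args) {
        // all requests that land on the same shard are later sent to that shard's node in one go
        System.out.println(groupByShard(Arrays.asList("doc-1", "doc-2", "doc-3", "doc-4"), 5));
    }
}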

Each per-shard batch of requests is then handled by shardBulkAction:

shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() { /* ... */ });

Processing Flow for Each Shard

What follows is a fairly involved class hierarchy:

TransportShardBulkAction > TransportWriteAction > TransportReplicationAction > TransportAction

The shardBulkAction.execute() call from the previous step runs TransportAction's execute() method. The source I am reading is version 5.6; compared with 5.0, ES has added a TransportWriteAction class, and TransportReplicationAction no longer invokes its run() method directly but implements the functionality through transportService's RPC interface. The concrete flow is as follows:

1) TransportAction.execute() calls TransportReplicationAction's doExecute() method.

2) TransportReplicationAction's doExecute() runs ReroutePhase's run() method. Using the request's shard ID, run() resolves the primary shard and the ID of the node that holds it. If the current node hosts the primary shard, performLocalAction() is executed; otherwise performRemoteAction() is executed.

3) performLocalAction() and performRemoteAction() both end up calling performAction(), where we can see transportService sending the request:

transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() { /* ... */ });

4) When transportService receives the request it is handled by PrimaryOperationTransportHandler, which was registered in TransportReplicationAction:

transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
        new PrimaryOperationTransportHandler());

5) PrimaryOperationTransportHandler handles primary operations; when it receives a message it hands it off to AsyncPrimaryAction:

@Override
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
    new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
}

6) AsyncPrimaryAction first tries to acquire the shard operation lock. If the lock is acquired it calls its own onResponse() method; otherwise the operation is added to a list of delayed operations (wrapped to run on a thread pool executor if one was specified):

synchronized (this) {
    releasable = tryAcquire();
    if (releasable == null) {
        // blockOperations is executing, this operation will be retried by blockOperations once it finishes
        if (delayedOperations == null) {
            delayedOperations = new ArrayList<>();
        }
        final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
        if (executorOnDelay != null) {
            delayedOperations.add(
                new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
                    new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
        } else {
            delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
        }
        return;
    }
}

7) In onResponse(), if the primary shard has been relocated, the correct primary shard and node ID are resolved and the request is re-sent; otherwise it is processed directly through the primaryShardReference:

 @Override
    public void onResponse(PrimaryShardReference primaryShardReference) {
        try {
            if (primaryShardReference.isRelocated()) {
                primaryShardReference.close(); // release shard operation lock as soon as possible
                setPhase(replicationTask, "primary_delegation");
                // delegate primary phase to relocation target
                // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
                // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
                final ShardRouting primary = primaryShardReference.routingEntry();
                assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
                DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
                transportService.sendRequest(relocatingNode, transportPrimaryAction,
                    new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId()),
                    transportOptions,
                    new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
                        TransportReplicationAction.this::newResponseInstance) {

                        @Override
                        public void handleResponse(Response response) {
                            setPhase(replicationTask, "finished");
                            super.handleResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            setPhase(replicationTask, "finished");
                            super.handleException(exp);
                        }
                    });
            } else {
                setPhase(replicationTask, "primary");
                final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
                final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData);
                final ActionListener<Response> listener = createResponseListener(primaryShardReference);
                createReplicatedOperation(request,
                        ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
                        primaryShardReference, executeOnReplicas)
                        .execute();
            }
        } catch (Exception e) {
            Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
            onFailure(e);
        }
    }

8) From the name, createReplicatedOperation sounds as if it only handles the replica side, but looking into it, it executes on the primary first and on the replicas afterwards:

primaryResult = primary.perform(request);
// ...
performOnReplicas(replicaRequest, shards);
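
A self-contained toy showing just the ordering enforced here, with plain maps standing in for shards (the real ReplicationOperation additionally tracks failures and pending shard counts):

import java.util.*;

class ReplicationOrderSketch {
    public static void main(String[] args) {
        Map<String, String> primary = new HashMap<>();
        List<Map<String, String>> replicas = Arrays.asList(new HashMap<>(), new HashMap<>());

        // 1. perform the operation on the primary shard first
        primary.put("doc-1", "{\"field\":\"value\"}");

        // 2. only after the primary write, fan the same operation out to every replica
        for (Map<String, String> replica : replicas) {
            replica.put("doc-1", primary.get("doc-1"));
        }
        System.out.println("replica 0 holds: " + replicas.get(0));
    }
}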

Primary Shard Processing

The primary shard work is driven by PrimaryShardReference.perform(), which in turn calls shardOperationOnPrimary() to carry out the primary-shard processing.

shardOperationOnPrimary() is implemented by TransportShardBulkAction, and its concrete steps are as follows (a toy sketch of steps 4 and 5 follows the list):

1) Fetch the index metadata held on the node

2) Obtain the version number

3) Update the mapping

4) Call the low-level Engine code, e.g. primary.delete(delete), primary.index(operation), and so on

5) Write the operation to the translog
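
As referenced above, here is a toy, self-contained sketch of steps 4) and 5): apply the write through an in-memory stand-in for the engine, then append the operation to a stand-in translog so it could be replayed after a crash. This is a gross simplification; the real code lives in the Engine and Translog classes:

import java.util.*;

class EngineSketch {
    private final Map<String, String> luceneIndex = new HashMap<>(); // stand-in for the Lucene index
    private final List<String> translog = new ArrayList<>();         // stand-in for the transaction log

    void index(String id, String source) {
        luceneIndex.put(id, source);                 // step 4: engine-level write (primary.index(operation))
        translog.add("index " + id + " " + source);  // step 5: record the operation in the translog
    }

    public static void main(String[] args) {
        EngineSketch engine = new EngineSketch();
        engine.index("1", "{\"field\":\"value\"}");
        System.out.println(engine.translog);
    }
}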

Replica shard processing is similar to the primary shard, so it is not covered in more detail here.



Author: YG_9013
Link: https://www.jianshu.com/p/e5c90a348931
Source: 简书 (Jianshu)
Copyright belongs to the author. For commercial reuse, please contact the author for authorization; for non-commercial reuse, please credit the source.