这一部分是CustomScan的三种实现。
在pg shard中,我们可以看到从分片查询的数据,会汇总到临时表,然后提供到上层查询,在citus里面,这里进行了优化,针对adaptive(OLTP)和TaskTracker(OLAP),insert select分别做了不同的处理,并非仅仅依赖临时表。
首先说AdaptiveExecutorCustomScanMethods
这个对象终指向AdaptiveExecutorCustomExecMethods,其中CitusExecScan就代表了Adaptive的扫描办法
AdaptiveExecutor 实际的执行器
调用ExecuteSubPlans执行下属执行计划
CreateDistributedExecution创建执行器
StartDistributedExecution 启动执行器
创建2PC相关的锁,以及外键表的检查加锁
对于本地查询 执行RunLocalExecution,终调用ExecuteLocalTaskPlan执行
CreateQueryDesc 创建查询
ExecutorStart 开始查询
ExecutorRun 执行查询
ExecutorFinish结束查询
ExecutorEnd 关闭查询
FreeQueryDesc 释放资源
对于需要顺序查询的任务,执行SequentialRunDistributedExecution
对task逐个循环调用RunDistributedExecution
对于可以并行额任务,执行RunDistributedExecution
这里通过异步IO监听处理WaitEventSetWait
每个task的event会绑定到WaitEventSetWait execution->waitEventSet
每次循环中,会再次循环检查所有wait状态来处理遇到的错误:postmaster故障,中断等
ConnectionStateMachine连接状态机负责连接的处理过程
TransactionStateMachine事务状态机负责事务的处理
REMOTE_TRANS_STARTED 执行StartPlacementExecutionOnSession发送SQL
REMOTE_TRANS_SENT_COMMAND 执行ReceiveResults接收结果集
对于非只读执行,这里也负责记录影响行总数executorState->es_processed += execution->rowsProcessed
FinishDistributedExecution结束并清理数据结构
如果有sort需求,并且是returing关键字指定的返回,这里调用SortTupleStore排序
CitusExecScan会直接调用ReturnTupleFromTuplestore获取到存储的结果集(可以类比临时表)
与之对应的,是TaskTrackerExecScan,以及CoordinatorInsertSelectExecScan。
TaskTrackerExecScan则不一样,对于未完成(!finishedRemoteScan)的扫描,会从分布式执行计划获取到job与对应的查询语句,并且,这里会检查确认没有复合子查询和CTE查询(tasktracker模式下不支持这两种查询)
随后对相关表加锁LockPartitionsInRelationList,创建对应文件夹PrepareMasterJobDirectory,终执行MultiTaskTrackerExecute并缓存(LoadTuplesIntoTupleStore)入执行结果集合scanState。
MultiTaskTrackerExecute
首先通过hash表分配连接TrackerHash
taskTrackerHash 执行查询
transmitTrackerHash 获取结果文件
随后while true循环,知道全部成功,或者遇到失败
ManageTaskExecution 调度任务执行到TaskExecStatus
ManageTransmitExecution获取执行结果到TransmitExecStatus
失败的情况直接跳出
TrackerCleanupResources 清理掉无用资源
处理期间产生的报错信息
CoordinatorInsertSelectExecScan
LockPartitionRelations 锁表
这里insert into select,进行hash关系重排,保证每个shard查询出来的语句直接对应到目标shard,避免额外的消耗
函数ExecuteSelectIntoColocatedIntermediateResults生成对应结果
BuildColumnNameListFromTargetList 获取列对应关系
PartitionColumnIndexFromColumnList 获取分区列对应关系
CreateCitusCopyDestReceiver 设置目标表的接收者
ExecuteQueryIntoDestReceiver 实际执行