Presto适合PB级海量数据复杂分析,交互式SQL查询,⽀持跨数据源查询。本文就将详细解析Client提交查询请求到PrestoServer端响应的细节,以及还原Presto资源组校验的实现过程。
▌Presto调度模块源码解析-Client提交请求
当客户端以控制台Console、脚本--execute等方式提交SQL作业时,Presto的Client会将作业相关的参数和脚本封装成一个Restful请求,提交给PrestoServer端。然后再进行后续的诸如解析执行计划、拆分Stage、调度task到Worker端执行等操作。下面就介绍一下Client提交作业到服务端部分的源码。
Main函数
客户端提交作业的代码是从Presto的main函数开始的:
在Console的run方法中,如果入参中含有--execute,会直接将值取出作为待执行的SQL语句。否则认为是通过--file指定了SQL文件,此时会通过文件IO读取该文件中的SQL脚本。这种情况对应着通过脚本提交作业的情况,而如果--execute和--file都没有指定,则认为是通过控制台Console的方式提交SQL。
通过脚本提交作业
通过脚本的方式会直接执行Console类的executeCommand方法。按照“;”切分出SQL语句,并依次调用Console类的process方法来提交作业。
通过控制台提交作业
这种情况相对麻烦一些,他会执行Console.runConsole方法处理客户提交的请求。Presto为这种方式设置了一个AtomicBoolean existing变量来判断Client是否存在,如果不存在则不再提交后续的SQL(对应在控制台中输入了多条sql语句,并用;间隔,当前边的语句正在执行时退出Console,此时后续的sql就不会被提交了)。
在runConsole方法中可以看到,他会有一个while循环不断的循环处理LineReader对象读取到的命令,LineReader继承自jline.console.ConsoleReader,是一个专门处理控制台输入的Java类库(官方网址是https://jline.github.io/)。这个类每读取一行输入就会将值传递给一个名为buffer的StringBuilder对象,然后根据“;”和“\\G”来识别一个完整的SQL,并将SQL交给process方法进行调度。后会将剩下不完整的语句赋值给重新初始化的buffer对象作为下一条SQL的开头。
构建请求并发送
下面我们看一下process方法,这个方法中重要的部分是他会在try with resources中调用QueryRunner的startQuery方法,如下:
这个语法表示小括号中创建的对象如果实现了closable接口,则无论是否出现异常,都会在try catch结束后调用其close方法。
如下是startQuery方法:
然后我们一路点进去,经过QueryRunner的startInternalQuery、StatementClientFactory的newStatementClient方法之后,我们来到了StatementClientV1的构造函数:
在buildQueryRequest方法中,会构建一个目标Rest地址为/v1/statement的请求。
随后在JsonResponse.execute中会发起这个请求。然后我们搜索一下Rest地址/v1/statement,发现他的目标服务类为StatementResouce。
以上就是Client提交查询请求,到PrestoServer端响应的过程。
▌Presto调度模块源码解析-服务端响应-资源组选择(2)
当用户提交一个SQL作业时,Presto客户端会封装一个Request通过Restful接口将请求发送到服务端,下面就详细讲解一下服务端的处理过程。
Client端发送请求的地址是/v1/statement,对应到StatementResource的createQuery方法。在该方法中会调用Query的static方法create,在create方法中new了一个Query对象,然后会调用SqlQueryManager的createQuery方法。
在createQuery方法中首先会创建QueryId,生成规则是:
然后presto会判断集群是否有可用节点,其中isIncludeCoordinator变量对应config.properties配置文件中的node-scheduler.include-coordinator配置项,表示是否允许调度task到coordinator节点进行计算。
如果集群可用节点小于小值1(参数query-manager.initialization-required-workers),则给出熟悉的报错信息“Cluster is still initializing……”。
除此之外presto还对sql长度做了限制,要求不能超过query.max-length(默认1_000_000_000,表示10亿)。
然后presto会根据提交作业的客户端信息选择资源组。
configurationManager的类型是AtomicReference<ResourceGroupConfigurationManager>。selectGroup方法实现如下:
然后我们点进match方法,来到了ResourceGroupConfigurationManager接口中,我们看到这个方法的实现类有如下三个:
那么问题来了,当我们调用match方法时,执行的是这三个实现类中的哪一个呢?
我们首先看一下configurationManager初始化时的值,如下图所示初始化时其类型为LegacyResourceGroupConfigurationManager:
然后我们搜一下configurationManager的引用,发现在InternalResourceGroupManager类的setConfigurationManager方法中修改了他的值。如下图:
该方法在同一个类的loadConfigurationManager方法中被调用。loadConfigurationManager方法会判断常量RESOURCE_GROUPS_CONFIGURATION对应的etc/resource-groups.properties文件是否存在,如果存在会读取文件中配置的resource-groups.configuration-manager参数为Key值,到configurationManagerFactories中取出对应的ResourceGroupConfigurationManagerFactory对象,然后调用其create方法构造一个ResourceGroupConfigurationManager对象,终赋值给configurationManager。方法的实现如下图:
而loadConfigurationManager方法又在PrestoServer类的初始化方法中被调用。
PS:ResourceGroupManager的实现类型是在CoordinatorModule这个类中被注入的:
也就是说,当PrestoServer通过其main方法调用run方法进行初始化时, 会读取etc/resource-groups.properties文件中的配置项resource-groups.configuration-manager,再以它为Key值读取configurationManagerFactories中对应的ResourceGroupConfigurationManagerFactory,然后调用读取出来的工厂类的create方法构建ResourceGroupConfigurationManager对象,后赋值给InternalResourceGroupManager类的configurationManager。
另一个问题出现了,configurationManagerFactories这个Map<String,ResourceGroupConfigurationManagerFactory>类型的全局变量是在什么时候赋值的,里边都有哪些值呢?
我们还是搜索一下它的引用,发现在InternalResourceGroupManager的addConfigurationManagerFactory方法中对其进行了putIfAbsent操作(不存在则put)。
搜索引用发现,在PluginManager的installPlugin方法中调用了这个方法:
然后我们看一下plugin.getResourceGroupConfigurationManagerFactories方法的定义,发现他有两个实现类:
ResourceGroupManagerPlugin的实现如下:
H2ResourceGroupManagerPlugin的实现如下:
我们在addConfigurationManagerFactory方法中可以看到,添加到configurationManagerFactories这个Map中时,是以factory的name作为Key值,factory为Value的:
所以我们看一下这三个实现类对应的name值,也就是resource-groups.configuration-manager参数的可选值:
db:DbResourceGroupConfigurationManagerFactory
h2:H2ResourceGroupConfigurationManagerFactory
file:FileResourceGroupConfigurationManagerFactory
然后,我们回过头来看一下PluginManager的installPlugin方法,该方法在同类的loadPlugin方法中被调用:
loadPlugin方法又在该类中再次被调用:
再往上是loadPlugins方法:
再次向上查找,原来loadPlugins方法是在PrestoServer的run方法中,先与loadConfigurationManager方法被调用的:
也就是说,Presto默认是按照LegacyResourceGroupConfigurationManager进行资源组管理的。
在PrestoServer调用run方法进行初始化时,首先会执行PluginManager的loadPlugins方法,向InternalResourceGroupManager中一个存放ResourceGroupManagerFactory类型元素的Map添加可用的资源组管理工厂类。
然后会调用InternalResourceGroupManager的loadConfigurationManager方法,判断是否配置了参数resource-groups.configuration-manager,如果配置了则会按照配置的manager类型从这个Map中根据ResourceGroupFactory的name取出相应的factory。
后会根据取出的factory对象create一个ResourceGroupConfigurationManager,并将其赋值给configurationManager。
在Presto的官方文档中我们看到,presto只描述了一种name为file的ResourceGroupManagerFactory,对应FileResourceGroupConfigurationManagerFactory。看来这是官方比较推荐的类型。
接下来我们看一下FileResourceGroupConfigurationManager类的match方法,如下图:
入参SelectionCriteria是从session中取得的用户信息,如下图:
从match方法可以看到他会从selectors中找到跟session中用户信息相匹配的ResourceGroupSelector,如果得到的Optional对象不存在value,则给出熟悉的异常信息Query did not match any selection rule,如果存在value则作业继续向下执行。
selectors对象是从resource-groups.config-file配置项指定的文件中解析得到的ResourceGroup配置信息。其初始化的代码是在FileResourceGroupConfigurationManager的构造函数中。
其中,config.getConfigFile方法对应配置项resource-groups.config-file:
在buildSelectors方法中可以看到selectors中添加的对象类型是StaticSelector,这样在match方法的lambda表达式s -> s.match中,s对象就是StaticSelector类型的了。
在StaticSelector的match方法中我们看到,它会根据json文件中读取到的信息与客户端信息依次做校验,如校验不通过则返回一个没有值的Optional对象,以便selectGroup方法抛出异常。如果全部校验通过,终会封装一个SelectionContext类型的Optional对象返回。
以上就是Presto资源组校验的代码,后续将继续整理服务端响应作业提交请求的代码。
来源 https://zhuanlan.zhihu.com/p/46503705