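The Trino client and the Trino Coordinator interact through a RESTful interface, and results are fetched in batches. The client first sends a POST request to Trino and receives a QueryId. It then sends one or more GET requests to fetch the results for that QueryId. The address of each GET request, nextUri, is taken from the response the Coordinator returns; when nextUri is empty, all results have been fetched and the request sequence ends.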
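The polling protocol described above can be sketched as a loop that follows nextUri until it is null. This is a minimal sketch, not the Trino client's code: `ResultPage` and `ClientPollingSketch` are hypothetical names, only two response fields are modeled, and a map from URI to page stands in for real HTTP GET calls so the example runs without a server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the response body the Coordinator returns (only two fields modeled).
record ResultPage(String nextUri, List<String> rows) {}

class ClientPollingSketch {
    // Starts from the POST response and follows nextUri until the Coordinator returns null.
    // `server` maps a URI to the page it would return; it replaces real HTTP GET calls.
    static List<String> drain(ResultPage first, Map<String, ResultPage> server) {
        List<String> rows = new ArrayList<>(first.rows());
        String next = first.nextUri();
        while (next != null) {                  // null nextUri means all results were fetched
            ResultPage page = server.get(next); // stands in for an HTTP GET on nextUri
            if (page == null) {
                throw new IllegalStateException("unknown URI: " + next);
            }
            rows.addAll(page.rows());
            next = page.nextUri();
        }
        return rows;
    }
}
```

In a real client each map lookup would be an HTTP GET against the Coordinator; the point here is only the shape of the loop.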
2. How the Coordinator handles the client's POST request
The client sends its POST request to
v1/statement
Searching the code for v1/statement shows that the class in Trino's server that receives the client request is QueuedStatementResource. Its full path in the Trino project is:
core/trino-main/src/main/java/io/trino/dispatcher/QueuedStatementResource
The method that receives the request is postStatement. Trino divides the life of a query into two stages: the first is handled by QueuedStatementResource, the second by ExecutingStatementResource.
@ResourceSecurity(AUTHENTICATED_USER)
@POST
@Produces(APPLICATION_JSON)
public Response postStatement(
        String statement,
        @Context HttpServletRequest servletRequest,
        @Context HttpHeaders httpHeaders,
        @Context UriInfo uriInfo)
{
    if (isNullOrEmpty(statement)) {
        throw badRequest(BAD_REQUEST, "SQL statement is empty");
    }
    // Address of the client that sent the request
    String remoteAddress = servletRequest.getRemoteAddr();
    Optional<Identity> identity = Optional.ofNullable((Identity) servletRequest.getAttribute(AUTHENTICATED_IDENTITY));
    MultivaluedMap<String, String> headers = httpHeaders.getRequestHeaders();
    SessionContext sessionContext = new HttpRequestSessionContext(headers, alternateHeaderName, remoteAddress, identity, groupProvider);
    // Constructing a new Query generates its queryId
    Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory);
    // queries is a map that stores queryId -> Query
    queries.put(query.getQueryId(), query);
    // let authentication filter know that identity lifecycle has been handed off
    servletRequest.setAttribute(AUTHENTICATED_IDENTITY, null);
    // query.getQueryResults returns the results for this first request
    return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo), compressionEnabled);
}
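The last line of the code returns the result of the client's first request to the Coordinator; it mainly relies on the getQueryResults method to wrap the result in a QueryResults object.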
public QueryResults getQueryResults(long token, UriInfo uriInfo)
{
    long lastToken = this.lastToken.get();
    // token should be the last token or the next token
    if (token != lastToken && token != lastToken + 1) {
        throw new WebApplicationException(Response.Status.GONE);
    }
    // advance (or stay at) the token
    this.lastToken.compareAndSet(lastToken, token);
    synchronized (this) {
        // if query submission has not finished, return simple empty result
        // On the first response the client's request is still queued and the query has not been
        // submitted yet, so the first response is produced by the createQueryResults call below
        if (querySubmissionFuture == null || !querySubmissionFuture.isDone()) {
            return createQueryResults(
                    token + 1,
                    uriInfo,
                    DispatchInfo.queued(NO_DURATION, NO_DURATION));
        }
    }
    // The code below runs once the client's query has been submitted successfully
    Optional<DispatchInfo> dispatchInfo = dispatchManager.getDispatchInfo(queryId);
    if (dispatchInfo.isEmpty()) {
        // query should always be found, but it may have just been determined to be abandoned
        throw new WebApplicationException(Response
                .status(NOT_FOUND)
                .build());
    }
    return createQueryResults(token + 1, uriInfo, dispatchInfo.get());
}
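The token check at the top of getQueryResults can be isolated into a small runnable sketch. Assumptions: `TokenGate` is a hypothetical name, it uses a plain `AtomicLong` starting at 0, and returning `false` stands in for Trino throwing `WebApplicationException(Response.Status.GONE)` (HTTP 410). It shows why a client may safely retry the same token but cannot go back to an older one or skip ahead.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of the token check in getQueryResults (not Trino's class).
class TokenGate {
    private final AtomicLong lastToken = new AtomicLong(); // starts at 0

    // Returns true if the request may proceed; false where Trino would answer 410 GONE.
    boolean accept(long token) {
        long last = lastToken.get();
        // Only a retry of the last token or the immediately next token is allowed.
        if (token != last && token != last + 1) {
            return false;
        }
        // Advance (or stay at) the token; if another thread already moved it, the CAS simply fails.
        lastToken.compareAndSet(last, token);
        return true;
    }

    long lastToken() {
        return lastToken.get();
    }
}
```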
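Once the query has been submitted, querySubmissionFuture is no longer null and isDone() returns true. (I am not yet very familiar with how Future is used in Java and need to read up on it.)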
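In fact, whether or not submission has finished, the same method, createQueryResults, is called at the end; only the arguments differ, namely whether elapsedTime and queuedTime are known. Before submission finishes these two values are not yet determined, so NO_DURATION is passed; once the query has been submitted, elapsedTime and queuedTime are known.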
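Since the branch in getQueryResults hinges on `querySubmissionFuture.isDone()`, here is a minimal illustration of the Future behaviour involved. It uses the JDK's `CompletableFuture` as a stand-in for the Guava `ListenableFuture` that Trino actually uses, and `FutureBasicsSketch` is a hypothetical name. `isDone()` never blocks, and a continuation registered with `thenRun` only runs once the future is completed, much as Trino chains follow-up work with `Futures.transform`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class FutureBasicsSketch {
    // Returns {done before complete, callback ran before complete, done after, callback ran after}.
    static boolean[] demo() {
        CompletableFuture<Void> submission = new CompletableFuture<>();
        AtomicBoolean callbackRan = new AtomicBoolean(false);

        // Register a continuation; it runs only after the future completes.
        submission.thenRun(() -> callbackRan.set(true));

        boolean doneBefore = submission.isDone(); // non-blocking status check
        boolean ranBefore = callbackRan.get();

        // Completing the future, roughly what queryCreationFuture.set(null) does in Trino;
        // the non-async continuation runs on this thread as part of completion.
        submission.complete(null);

        return new boolean[] {doneBefore, ranBefore, submission.isDone(), callbackRan.get()};
    }
}
```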
Next, let's look at the createQueryResults method:
private QueryResults createQueryResults(long token, UriInfo uriInfo, DispatchInfo dispatchInfo)
{
    // First get nextUri; if the query has already been dispatched, it redirects to a new URI
    URI nextUri = getNextUri(token, uriInfo, dispatchInfo);
    // If a failure occurred along the way, return the error information
    Optional<QueryError> queryError = dispatchInfo.getFailureInfo()
            .map(this::toQueryError);
    // Build QueryResults, the wrapper class the Coordinator returns to the client
    return QueuedStatementResource.createQueryResults(
            queryId,
            nextUri,
            queryError,
            uriInfo,
            queryInfoUrl,
            dispatchInfo.getElapsedTime(),
            dispatchInfo.getQueuedTime());
}
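The getNextUri method called inside it returns the URI for the next GET request. If the query has already been dispatched, the client is redirected to v1/statement/executing; if it has not been dispatched yet, nextUri points to v1/statement/queued.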
private URI getNextUri(long token, UriInfo uriInfo, DispatchInfo dispatchInfo)
{
    // if failed, query is complete
    if (dispatchInfo.getFailureInfo().isPresent()) {
        return null;
    }
    // if dispatched, redirect to new uri
    // Once dispatched, getCoordinatorLocation returns a value,
    // so the client is redirected to v1/statement/executing;
    // otherwise the next URI stays under v1/statement/queued
    return dispatchInfo.getCoordinatorLocation()
            .map(coordinatorLocation -> getRedirectUri(coordinatorLocation, uriInfo))
            .orElseGet(() -> getQueuedUri(queryId, slug, token, uriInfo));
}
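Both slug and token are embedded in the URI, in the form queued/{queryId}/{slug}/{token}. The slug does not change unless the client is redirected, while the token increases by 1 each time, which lets the data be fetched in order.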
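The choice of nextUri can be condensed into the following sketch. It is a simplification, not Trino's `getNextUri`: `NextUriSketch` is a hypothetical name, the `failed` flag replaces `dispatchInfo.getFailureInfo().isPresent()`, and `executingUri` stands for whatever redirect URI `getRedirectUri` would build once the query is dispatched (its slug and token are not modeled). The queued URI is built as a relative path for brevity, whereas Trino builds it from the request's `UriInfo`.

```java
import java.net.URI;
import java.util.Optional;

// Simplified model of getNextUri; not Trino's code.
class NextUriSketch {
    static URI nextUri(boolean failed, Optional<URI> executingUri, String queryId, String slug, long token) {
        if (failed) {
            return null; // failure: the query is complete, the client stops polling
        }
        // Dispatched: follow the redirect; otherwise keep polling the queued resource.
        return executingUri.orElseGet(
                () -> URI.create("/v1/statement/queued/" + queryId + "/" + slug + "/" + token));
    }
}
```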
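Once the createQueryResults method above has run, the Coordinator has finished handling the client's first (and only) POST request. It returns the QueryResults object to the client, which parses it and then sends a GET request to the Coordinator. The URI returned the first time is queued/{queryId}/{slug}/{token}, and the client sends its GET request to that URI.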
3. How the Coordinator handles the client's GET requests
From the above, we know the Coordinator answers the client's POST request with a QueryResults object. The client parses QueryResults, extracts nextUri, and sends a GET request to the Coordinator.
The method that responds to this GET request is QueuedStatementResource#getStatus:
@ResourceSecurity(PUBLIC)
@GET
@Path("queued/{queryId}/{slug}/{token}")
@Produces(APPLICATION_JSON)
public void getStatus(
        @PathParam("queryId") QueryId queryId,
        @PathParam("slug") String slug,
        @PathParam("token") long token,
        @QueryParam("maxWait") Duration maxWait,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    // First look up the Query object in the map using queryId, slug and token
    Query query = getQuery(queryId, slug, token);
    // wait for query to be dispatched, up to the wait timeout
    // Calling query.waitForDispatched() triggers creation of the query
    ListenableFuture<?> futureStateChange = addTimeout(
            query.waitForDispatched(),
            () -> null,
            WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait),
            timeoutExecutor);
    // when state changes, fetch the next result
    ListenableFuture<QueryResults> queryResultsFuture = Futures.transform(
            futureStateChange,
            ignored -> query.getQueryResults(token, uriInfo),
            responseExecutor);
    // transform to Response
    ListenableFuture<Response> response = Futures.transform(
            queryResultsFuture,
            queryResults -> createQueryResultsResponse(queryResults, compressionEnabled),
            directExecutor());
    bindAsyncResponse(asyncResponse, response, responseExecutor);
}
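getStatus is a long poll: it waits for the query to be dispatched, but no longer than `WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait)`, and then builds a result either way. Below is a rough JDK-only equivalent, assuming `CompletableFuture.completeOnTimeout` in place of Trino's `addTimeout` and Guava's `Futures.transform`, and `java.time.Duration` in place of airlift's `Duration`. `LongPollSketch` and its one-second cap are hypothetical choices, and treating a missing `maxWait` as "use the cap" is an assumption about the null handling.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch of the getStatus pipeline with JDK types; not Trino's code.
class LongPollSketch {
    static final Duration MAX_WAIT_TIME = Duration.ofSeconds(1); // cap chosen for the sketch

    // A missing maxWait falls back to the cap; otherwise the smaller of the two wins.
    static Duration effectiveWait(Duration maxWait) {
        if (maxWait == null || maxWait.compareTo(MAX_WAIT_TIME) > 0) {
            return MAX_WAIT_TIME;
        }
        return maxWait;
    }

    // Wait for dispatch (or the timeout, whichever comes first), then fetch the results either way.
    static <T> CompletableFuture<T> poll(CompletableFuture<Void> dispatched, Duration maxWait, Supplier<T> fetchResults) {
        return dispatched.copy() // copy so the timeout never completes the shared dispatch future
                .completeOnTimeout(null, effectiveWait(maxWait).toMillis(), TimeUnit.MILLISECONDS)
                .thenApply(ignored -> fetchResults.get());
    }
}
```

Copying before applying the timeout matters: `completeOnTimeout` completes the future it is called on, and one client's timeout should not mark the shared dispatch future as finished.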
The query.waitForDispatched method is what creates the query:
private ListenableFuture<?> waitForDispatched()
{
    // if query submission has not finished, wait for it to finish
    synchronized (this) {
        // If the query has not been submitted yet, it must be created now;
        // this appears to be where the query is created, no query exists before this point
        if (querySubmissionFuture == null) {
            querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, query);
        }
        if (!querySubmissionFuture.isDone()) {
            return querySubmissionFuture;
        }
    }
    // otherwise, wait for the query to finish
    return dispatchManager.waitForDispatched(queryId);
}
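The essential idea in waitForDispatched is that the first GET lazily triggers query creation, and the lock ensures creation happens at most once even if several GETs arrive concurrently. A stripped-down sketch, assuming a `Supplier` in place of `dispatchManager.createQuery` and omitting the later `dispatchManager.waitForDispatched` branch; `LazySubmission` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Sketch of the "create on first poll" pattern in waitForDispatched (not Trino's code).
class LazySubmission {
    private final Supplier<CompletableFuture<Void>> createQuery; // stands in for dispatchManager.createQuery
    private CompletableFuture<Void> querySubmissionFuture;       // guarded by this

    LazySubmission(Supplier<CompletableFuture<Void>> createQuery) {
        this.createQuery = createQuery;
    }

    CompletableFuture<Void> waitForSubmission() {
        synchronized (this) {
            // The lock guarantees createQuery runs at most once, even under concurrent polls.
            if (querySubmissionFuture == null) {
                querySubmissionFuture = createQuery.get();
            }
            return querySubmissionFuture;
        }
    }
}
```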
The key method is dispatchManager.createQuery:
public ListenableFuture<?> createQuery(QueryId queryId, Slug slug, SessionContext sessionContext, String query)
{
    requireNonNull(queryId, "queryId is null");
    requireNonNull(sessionContext, "sessionContext is null");
    requireNonNull(query, "query is null");
    checkArgument(!query.isEmpty(), "query must not be empty string");
    checkArgument(queryTracker.tryGetQuery(queryId).isEmpty(), "query %s already exists", queryId);
    DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
    dispatchExecutor.execute(() -> {
        try {
            // This method contains the details of query creation and is very important
            createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
        }
        finally {
            queryCreationFuture.set(null);
        }
    });
    return queryCreationFuture;
}
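createQuery returns immediately and does the real work on `dispatchExecutor`; completing the future in `finally` means waiting pollers wake up whether creation succeeded or not. A sketch of that shape with JDK types, where `CompletableFuture.complete(null)` replaces `queryCreationFuture.set(null)` and `AsyncCreateSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Sketch of createQuery's shape (not Trino's code): work runs on an executor and the
// returned future is completed in finally, so pollers are released even if creation throws.
class AsyncCreateSketch {
    static CompletableFuture<Void> createQuery(ExecutorService dispatchExecutor, Runnable createQueryInternal) {
        CompletableFuture<Void> queryCreationFuture = new CompletableFuture<>();
        dispatchExecutor.execute(() -> {
            try {
                createQueryInternal.run();
            }
            finally {
                // Signals "the creation attempt has finished", not "creation succeeded".
                queryCreationFuture.complete(null);
            }
        });
        return queryCreationFuture;
    }
}
```

Whether the query actually failed is recorded elsewhere: createQueryInternal registers a failed query in that case.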
The createQueryInternal method:
private <C> void createQueryInternal(QueryId queryId, Slug slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
{
    Session session = null;
    PreparedQuery preparedQuery = null;
    try {
        if (query.length() > maxQueryLength) {
            int queryLength = query.length();
            query = query.substring(0, maxQueryLength);
            throw new TrinoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
        }
        // decode session
        session = sessionSupplier.createSession(queryId, sessionContext);
        // check query execute permissions
        accessControl.checkCanExecuteQuery(sessionContext.getIdentity());
        // prepare query
        // Lexical and syntactic analysis of the SQL; Trino uses ANTLR 4 for SQL parsing
        preparedQuery = queryPreparer.prepareQuery(session, query);
        // select resource group
        Optional<String> queryType = getQueryType(preparedQuery.getStatement().getClass()).map(Enum::name);
        SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                sessionContext.getIdentity().getPrincipal().isPresent(),
                sessionContext.getIdentity().getUser(),
                sessionContext.getIdentity().getGroups(),
                Optional.ofNullable(sessionContext.getSource()),
                sessionContext.getClientTags(),
                sessionContext.getResourceEstimates(),
                queryType));
        // apply system default session properties (does not override user set properties)
        session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, queryType, selectionContext.getResourceGroupId());
        // mark existing transaction as active
        transactionManager.activateTransaction(session, isTransactionControlStatement(preparedQuery.getStatement()), accessControl);
        // Create the dispatchQuery
        DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
                session,
                query,
                preparedQuery,
                slug,
                selectionContext.getResourceGroupId());
        boolean queryAdded = queryCreated(dispatchQuery);
        if (queryAdded && !dispatchQuery.isDone()) {
            try {
                // Submit the dispatchQuery to the resourceGroupManager
                resourceGroupManager.submit(dispatchQuery, selectionContext, dispatchExecutor);
            }
            catch (Throwable e) {
                // dispatch query has already been registered, so just fail it directly
                dispatchQuery.fail(e);
            }
        }
    }
    catch (Throwable throwable) {
        // creation must never fail, so register a failed query in this case
        if (session == null) {
            session = Session.builder(new SessionPropertyManager())
                    .setQueryId(queryId)
                    .setIdentity(sessionContext.getIdentity())
                    .setSource(sessionContext.getSource())
                    .build();
        }
        Optional<String> preparedSql = Optional.ofNullable(preparedQuery).flatMap(PreparedQuery::getPrepareSql);
        DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, preparedSql, Optional.empty(), throwable);
        queryCreated(failedDispatchQuery);
    }
}
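createQueryInternal follows a "creation must never fail" rule: every exception is caught and turned into a registered failed query, so the client's next poll still finds the query and can report the error. A simplified sketch with hypothetical names (`NeverFailCreation`, `RegisteredQuery`): a list replaces `queryCreated`, only the length check is modeled, and `RuntimeException` is caught instead of `Throwable`.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "creation must never fail" pattern (not Trino's code).
class NeverFailCreation {
    record RegisteredQuery(String queryId, String text, String state, String error) {}

    private final int maxQueryLength;
    final List<RegisteredQuery> registry = new ArrayList<>(); // stands in for queryCreated(...)

    NeverFailCreation(int maxQueryLength) {
        this.maxQueryLength = maxQueryLength;
    }

    void create(String queryId, String query) {
        try {
            if (query.length() > maxQueryLength) {
                int queryLength = query.length();
                query = query.substring(0, maxQueryLength); // keep only a prefix for the failed record
                throw new IllegalArgumentException(String.format(
                        "Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
            }
            // ... parsing, access control and resource-group selection would happen here ...
            registry.add(new RegisteredQuery(queryId, query, "QUEUED", null));
        }
        catch (RuntimeException e) {
            // Any failure still registers the query, in a FAILED state with the error message.
            registry.add(new RegisteredQuery(queryId, query, "FAILED", e.getMessage()));
        }
    }
}
```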
The dispatchQueryFactory.createDispatchQuery method involves generating the logical plan for the SQL.
Source: https://zhuanlan.zhihu.com/p/377066960