绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Trino(Presto) Coordinator 处理请求
2022-05-12 15:23:02

1. Trino 的客户端与 Coordinator 交互过程

Trino 客户端和 Trino 的 Coordinator 通过 restful 接口进行交互,采用的是分批获取数据的模式。首先客户端向 Trino 发送一个 POST 请求,获取到一个 QueryId,然后接下来需要发送一次或多次的 GET 请求,去获取该 QueryId 对应的结果,每次 GET 请求的地址 nextUri 都可以从 Coordinator 返回的结果中获取到,当nextUri 为空时,说明此时的结果已经取完,请求结束。

2. Coordinator 处理客户端的 POST 请求

客户端发送的 POST 请求是

v1/statement 

通过检索 v1/statement ,Trino 的 Server 接收客户端请求的类是 QueuedStatementResource,以下是该类在 Trino 工程中的全路径

core/trino-main/src/main/java/io/trino/dispatcher/QueuedStatementResource 

其中接收请求的方法是 postStatement,Trino 中认为任务的过程分为两阶段,个阶段是 QueuedStatementResource,第二个阶段是 ExecutingStatementResource

    @ResourceSecurity(AUTHENTICATED_USER)
    @POST
    @Produces(APPLICATION_JSON)
    public Response postStatement(
            String statement,
            @Context HttpServletRequest servletRequest,
            @Context HttpHeaders httpHeaders,
            @Context UriInfo uriInfo)
    {
        if (isNullOrEmpty(statement)) {
            throw badRequest(BAD_REQUEST, "SQL statement is empty");
        }
        // Coordinator 的地址
        String remoteAddress = servletRequest.getRemoteAddr();
        Optional<Identity> identity = Optional.ofNullable((Identity) servletRequest.getAttribute(AUTHENTICATED_IDENTITY));
        MultivaluedMap<String, String> headers = httpHeaders.getRequestHeaders();

        SessionContext sessionContext = new HttpRequestSessionContext(headers, alternateHeaderName, remoteAddress, identity, groupProvider);
        // 通过 new Query,可以生成 queryId
        Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory);
        // queries 是一个 HashMap 存储 queryId 和 query 的映射
        queries.put(query.getQueryId(), query);

        // let authentication filter know that identity lifecycle has been handed off
        servletRequest.setAttribute(AUTHENTICATED_IDENTITY, null);
      // query.getQueryResults 返回次查询的结果
        return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo), compressionEnabled);
    }

代码中的后一行,返回的是客户端请求 Coordinator 次时的结果,主要是通过 getQueryResults 方法将结果封装到 QueryResult 中。

public QueryResults getQueryResults(long token, UriInfo uriInfo)
        {
            long lastToken = this.lastToken.get();
            // token should be the last token or the next token
            if (token != lastToken && token != lastToken + 1) {
                throw new WebApplicationException(Response.Status.GONE);
            }
            // advance (or stay at) the token
            this.lastToken.compareAndSet(lastToken, token);

            synchronized (this) {
                // if query submission has not finished, return simple empty result
                // 次返回的时候,由于客户端提交的请求还在 Queued,所以 query 还没有提交运行
                // 因此次返回的时候就是下面的 createQueryResult 
                if (querySubmissionFuture == null || !querySubmissionFuture.isDone()) {
                    return createQueryResults(
                            token + 1,
                            uriInfo,
                            DispatchInfo.queued(NO_DURATION, NO_DURATION));
                }
            }
            // 下面的代码,当客户端的 query 请求提交成功后,会执行下面的代码
            Optional<DispatchInfo> dispatchInfo = dispatchManager.getDispatchInfo(queryId);
            if (dispatchInfo.isEmpty()) {
                // query should always be found, but it may have just been determined to be abandoned
                throw new WebApplicationException(Response
                        .status(NOT_FOUND)
                        .build());
            }

            return createQueryResults(token + 1, uriInfo, dispatchInfo.get());
        }

提交成功了,querySubmissionFuture 应该不为空了,但是关于Java 中 Future 的用法,我还不太了解,需要去看看怎么使用。

在其实提交成功与否,后执行的是同一个方法,只不过传入的参数不一样,不一样的地方在于是否有 elapsedTime 和 queuedTime,因为还没有提交成功,所以这两个值还不确定。等 query 提交成功就知道 elapsedTime 和 queuedTime 了。

然后接下来看 createQueryResults 方法

private QueryResults createQueryResults(long token, UriInfo uriInfo, DispatchInfo dispatchInfo)
        {
            // 首先获取 nextUri,如果此时的 query 已经 dispatched,就会重定向到新的 Uri
            URI nextUri = getNextUri(token, uriInfo, dispatchInfo);
            // 如果中间存在失败的信息,就会返回错误的信息
            Optional<QueryError> queryError = dispatchInfo.getFailureInfo()
                    .map(this::toQueryError);
            // 创建 QueryResult,这个类就是 Coordinator 返回给客户端的封装类
            return QueuedStatementResource.createQueryResults(
                    queryId,
                    nextUri,
                    queryError,
                    uriInfo,
                    queryInfoUrl,
                    dispatchInfo.getElapsedTime(),
                    dispatchInfo.getQueuedTime());
        }

里面的 getNextUri 方法,用于获取下一次 GET 请求的 Uri。如果此时的 query 已经 dispathcd,那么就重定向到 v1/statement/executing ,若还没有 dispatched,则 nextUri 是 v1/statement/queued

private URI getNextUri(long token, UriInfo uriInfo, DispatchInfo dispatchInfo)
        {
            // if failed, query is complete
            if (dispatchInfo.getFailureInfo().isPresent()) {
                return null;
            }
            // if dispatched, redirect to new uri
            // dispatched 以后,getCoordinatorLocation 可以获取到值
            // 就重定向到 v1/statement/executing
            // 否则就还是 v1/statement/queued
            return dispatchInfo.getCoordinatorLocation()
                    .map(coordinatorLocation -> getRedirectUri(coordinatorLocation, uriInfo))
                    .orElseGet(() -> getQueuedUri(queryId, slug, token, uriInfo));
        }

slug 和 token 都是拼接在 Uri 里面的,slug 不重定向的话,不会改变,而 token 每次都 +1,可以用来按顺序获取数据。queued/{queryId}/{slug}/{token}

执行完上面的 createQueryResults 方法,意味着 Coordinator 处理客户端的次(也是一次) POST 请求已经结束了,会将 QueryResult 这个类返回给客户端,由客户端解析 QueryResult,然后客户端再次发送 GET 请求到 Coordinator,次返回的 Uri 是 queued/{queryId}/{slug}/{token},根据这个 Uri 客户端可以发送 GET 请求。

3. Coordinator 处理客户端的 GET 请求

根据上面我们知道 Coordinator 给客户端返回了一个 POST 请求的结果 QueryResult,此时客户端会解析 QueryResult,提取出 nextUri,并且向 Coordinator 发起 GET 请求。

响应 GET 请求的方法是 QueuedStatementResource#getStatus

    @ResourceSecurity(PUBLIC)
    @GET
    @Path("queued/{queryId}/{slug}/{token}")
    @Produces(APPLICATION_JSON)
    public void getStatus(
            @PathParam("queryId") QueryId queryId,
            @PathParam("slug") String slug,
            @PathParam("token") long token,
            @QueryParam("maxWait") Duration maxWait,
            @Context UriInfo uriInfo,
            @Suspended AsyncResponse asyncResponse)
    {
        // 首先根据 queryId, slug, token 从映射中获取到 query 对象
        Query query = getQuery(queryId, slug, token);

        // wait for query to be dispatched, up to the wait timeout
        // 通过调用 query.waitForDispatched() 创建 query
        ListenableFuture<?> futureStateChange = addTimeout(
                query.waitForDispatched(),
                () -> null,
                WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait),
                timeoutExecutor);

        // when state changes, fetch the next result
        ListenableFuture<QueryResults> queryResultsFuture = Futures.transform(
                futureStateChange,
                ignored -> query.getQueryResults(token, uriInfo),
                responseExecutor);

        // transform to Response
        ListenableFuture<Response> response = Futures.transform(
                queryResultsFuture,
                queryResults -> createQueryResultsResponse(queryResults, compressionEnabled),
                directExecutor());
        bindAsyncResponse(asyncResponse, response, responseExecutor);
    } 

query.waitForDispathced 方法可以创建 query

private ListenableFuture<?> waitForDispatched()
        {
            // if query submission has not finished, wait for it to finish
            synchronized (this) {
            // 如果 query 没提交成功,此时需要创建
            // 这里应该就是创建任务了,在此之前没有创建过查询
                if (querySubmissionFuture == null) {
                    querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, query);
                }
                if (!querySubmissionFuture.isDone()) {
                    return querySubmissionFuture;
                }
            }

            // otherwise, wait for the query to finish
            return dispatchManager.waitForDispatched(queryId);
        }

关键的方法就是 dispatchManager.createQuery

public ListenableFuture<?> createQuery(QueryId queryId, Slug slug, SessionContext sessionContext, String query)
    {
        requireNonNull(queryId, "queryId is null");
        requireNonNull(sessionContext, "sessionContext is null");
        requireNonNull(query, "query is null");
        checkArgument(!query.isEmpty(), "query must not be empty string");
        checkArgument(queryTracker.tryGetQuery(queryId).isEmpty(), "query %s already exists", queryId);

        DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
        dispatchExecutor.execute(() -> {
            try {
            // 这个方法是创建 Query 的细节,非常重要
                createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
            }
            finally {
                queryCreationFuture.set(null);
            }
        });
        return queryCreationFuture;
    }

createQueryInternal 方法

private <C> void createQueryInternal(QueryId queryId, Slug slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
    {
        Session session = null;
        PreparedQuery preparedQuery = null;
        try {
            if (query.length() > maxQueryLength) {
                int queryLength = query.length();
                query = query.substring(, maxQueryLength);
                throw new TrinoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
            }

            // decode session
            session = sessionSupplier.createSession(queryId, sessionContext);

            // check query execute permissions,检查是否有权限执行
            accessControl.checkCanExecuteQuery(sessionContext.getIdentity());

            // prepare query
            // 对 sql 进行词法解析和语法解析,Trino 使用的是 Antlr4 进行 SQL 解析
            preparedQuery = queryPreparer.prepareQuery(session, query);

            // select resource group 选择资源组
            Optional<String> queryType = getQueryType(preparedQuery.getStatement().getClass()).map(Enum::name);
            SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                    sessionContext.getIdentity().getPrincipal().isPresent(),
                    sessionContext.getIdentity().getUser(),
                    sessionContext.getIdentity().getGroups(),
                    Optional.ofNullable(sessionContext.getSource()),
                    sessionContext.getClientTags(),
                    sessionContext.getResourceEstimates(),
                    queryType));

            // apply system default session properties (does not override user set properties)
            session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, queryType, selectionContext.getResourceGroupId());

            // mark existing transaction as active
            transactionManager.activateTransaction(session, isTransactionControlStatement(preparedQuery.getStatement()), accessControl);
            // 创建 dispatchQuery 
            DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
                    session,
                    query,
                    preparedQuery,
                    slug,
                    selectionContext.getResourceGroupId());

            boolean queryAdded = queryCreated(dispatchQuery);
            if (queryAdded && !dispatchQuery.isDone()) {
                try {
                    // 将 dispatchQuery 提交给 resourceGroupManager
                    resourceGroupManager.submit(dispatchQuery, selectionContext, dispatchExecutor);
                }
                catch (Throwable e) {
                    // dispatch query has already been registered, so just fail it directly
                    dispatchQuery.fail(e);
                }
            }
        }
        catch (Throwable throwable) {
            // creation must never fail, so register a failed query in this case
            if (session == null) {
                session = Session.builder(new SessionPropertyManager())
                        .setQueryId(queryId)
                        .setIdentity(sessionContext.getIdentity())
                        .setSource(sessionContext.getSource())
                        .build();
            }
            Optional<String> preparedSql = Optional.ofNullable(preparedQuery).flatMap(PreparedQuery::getPrepareSql);
            DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, preparedSql, Optional.empty(), throwable);
            queryCreated(failedDispatchQuery);
        }
    }


dispatchQueryFactory.createDispatchQuery 这个方法里面涉及到了 SQL 的逻辑计划的生成

来自:https://zhuanlan.zhihu.com/p/377066960

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Trino
创建时间:2022-04-12 14:37:38
Trino
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 飘絮絮絮丶
    专家
戳我,来吐槽~