Manually Simulating a Trino Client: A Detailed Look at the Messages Exchanged Between Client and Server
2022-05-12 15:27:37

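The goal of this article is to find out exactly what Trino's Coordinator sends back after it receives a request from a client. As mentioned in a previous article, results are wrapped in QueryResults, so here I print the contents of every QueryResults to watch how the client and the Coordinator interact during a single query. Along the way, this also shows what a client fundamentally is.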

Yes, this is the same diagram as in the previous article.

I wrote a small piece of code locally that simulates a client sending requests to the Trino Coordinator. It is mostly adapted from StatementClientV1#buildQueryRequest and StatementClientV1#advance() in Trino-client.

At its core, it simply uses OkHttp3 to send POST and GET requests to the Coordinator.
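For comparison, the POST half of this exchange can also be sketched with the JDK's built-in java.net.http client (Java 11+) instead of OkHttp3. This is a minimal sketch, not the author's code: `TrinoPostSketch`, `normalizeQuery` and `buildPost` are hypothetical names, and automatically stripping trailing semicolons is my own assumption about how to avoid the semicolon error the author describes. The request carries the SQL text as its body and the user in the `X-Trino-User` header, just like the OkHttp version.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

public class TrinoPostSketch {
    // Trims whitespace and any trailing semicolons, since the author reports
    // that "show catalogs;" fails while "show catalogs" works
    static String normalizeQuery(String sql) {
        String s = sql.strip();
        while (s.endsWith(";")) {
            s = s.substring(0, s.length() - 1).strip();
        }
        return s;
    }

    // Builds (but does not send) the initial POST /v1/statement request:
    // the SQL text is the body and the user goes in the X-Trino-User header
    static HttpRequest buildPost(URI coordinator, String user, String sql, Duration timeout) {
        return HttpRequest.newBuilder(coordinator.resolve("/v1/statement"))
                .timeout(timeout) // roughly plays the role of OkHttp's long readTimeout
                .header("X-Trino-User", user)
                .header("Content-Type", "text/plain; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(normalizeQuery(sql)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest post = buildPost(URI.create("http://localhost:8080"), "trino",
                "show catalogs;", Duration.ofDays(1));
        System.out.println(post.method() + " " + post.uri()); // POST http://localhost:8080/v1/statement
    }
}
```

Passing this request to `HttpClient.send(...)` with `HttpResponse.BodyHandlers.ofString()` should return the same kind of JSON body the author prints below; the sketch stops at building the request so it runs without a Coordinator.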

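Key points of the code:

  1. If you want to debug the Coordinator, the client's own timeout gets in the way: by the time you have stepped through the server and the client needs to send its next request, it has long since timed out and failed. Setting a long readTimeout prevents this.
  2. The query is a plain string and must not end with a semicolon.
  3. Because results are fetched in batches, several responses may each carry part of the data, so collect all rows in a List and print them at the end.
  4. When a response contains data, getData returns a non-null value, and those rows can be added to res.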
import io.airlift.json.JsonCodec;
import io.trino.client.JsonResponse;
import io.trino.client.QueryResults;
import okhttp3.*;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static io.airlift.json.JsonCodec.jsonCodec;

public class TestTrinoTest {
    private static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);

    public static void main(String[] args) throws URISyntaxException {

        OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
        // Create the client with very long readTimeout and callTimeout so that it
        // does not time out and fail while the Coordinator is paused in a debugger
        OkHttpClient client = clientBuilder
                .callTimeout(1, TimeUnit.DAYS)
                .readTimeout(1, TimeUnit.DAYS)
                .build();
        // URI the client sends the POST request to
        URI postUri = new URI("http://localhost:8080/v1/statement");
        HttpUrl url = HttpUrl.get(postUri);
        // The statement to run. It must NOT end with a semicolon, or the query fails:
        // use "show catalogs", not "show catalogs;"
        String query = "show catalogs";
        Request.Builder builder = new Request.Builder()
                .url(url)
                .post(RequestBody.create(MediaType.parse("text/plain; charset=utf-8"), query))
                .addHeader("X-Trino-User", "trino")
                .addHeader("User-Agent", "StatementClientV1/356");
        // Build the POST request
        Request post = builder.build();
        // Execute it and decode the response body into QueryResults
        JsonResponse<QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, client, post);
        System.out.println(response.getResponseBody());
        if (!response.hasValue()) {
            throw new IllegalStateException("Unexpected response: " + response);
        }
        // That was the POST; from here on the client only sends GET requests.
        // First, take nextUri from the POST response
        URI nextUri = response.getValue().getNextUri();
        System.out.println(nextUri);
        // Collects the rows returned across all batches
        List<List<Object>> res = new ArrayList<>();
        // Keep sending GET requests as long as nextUri is not null
        while (nextUri != null) {
            Request get = new Request.Builder()
                    .get()
                    .url(HttpUrl.get(nextUri))
                    .addHeader("X-Trino-User", "trino")
                    .addHeader("User-Agent", "StatementClientV1/356")
                    .build();
            response = JsonResponse.execute(QUERY_RESULTS_CODEC, client, get);
            System.out.println(response.getResponseBody());
            if (!response.hasValue()) {
                throw new IllegalStateException("Unexpected response: " + response);
            }
            QueryResults results = response.getValue();
            nextUri = results.getNextUri();
            // getData() is non-null only when this batch carries rows; keep them all
            if (results.getData() != null) {
                for (List<Object> row : results.getData()) {
                    res.add(row);
                }
            }
            System.out.println("nextUri is: " + nextUri);
        }
        System.out.println("==========");
        // Finally, print every collected row
        for (List<Object> row : res) {
            System.out.println(row);
        }
    }
}


Because the code uses OkHttp3 and Trino-client, add the corresponding dependencies to your Maven pom.xml:

        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.14.9</version>
        </dependency>
        <dependency>
            <groupId>io.trino</groupId>
            <artifactId>trino-client</artifactId>
            <version>356</version>
        </dependency>

During the interaction between the client and the Trino Coordinator, I printed every QueryResults the Coordinator returned, so you can follow the whole process.

This is the first result returned by the Coordinator, in response to the client's POST request. The first request merely creates the query, so every metric is 0.


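Below is the response the Coordinator returns to the first GET request. The query is still queued, but this queued response already differs somewhat from the previous one.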

Because this response contains a nextUri, the client sends another GET request.
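The rule the client follows here (keep issuing GET requests while the response carries a nextUri, and keep whatever data each batch contains) can be modeled without any HTTP at all. This is a minimal sketch under stated assumptions: `Page` is a hypothetical stand-in holding only the two QueryResults fields the loop needs, the "server" is an in-memory map instead of a real Coordinator, and the short URIs and the split of the catalog names into two batches are made up for illustration.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class NextUriLoopSketch {
    // Hypothetical stand-in for the two QueryResults fields the loop relies on
    static final class Page {
        final URI nextUri;             // null once the query has finished
        final List<List<Object>> data; // null when this batch carries no rows

        Page(URI nextUri, List<List<Object>> data) {
            this.nextUri = nextUri;
            this.data = data;
        }
    }

    // Helper to build one result row
    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    // Follows nextUri until it is absent, appending every batch of rows
    static List<List<Object>> drain(Page first, Function<URI, Page> get) {
        List<List<Object>> rows = new ArrayList<>();
        Page page = first;
        while (true) {
            if (page.data != null) {
                rows.addAll(page.data);
            }
            if (page.nextUri == null) {
                return rows;
            }
            page = get.apply(page.nextUri); // stands in for one GET request
        }
    }

    public static void main(String[] args) {
        URI queued = URI.create("http://localhost:8080/v1/statement/queued/q/1");
        URI exec1 = URI.create("http://localhost:8080/v1/statement/executing/q/1");
        URI exec2 = URI.create("http://localhost:8080/v1/statement/executing/q/2");
        // Fake server: catalog names split across two batches for illustration
        Map<URI, Page> server = new HashMap<>();
        server.put(queued, new Page(exec1, null));
        server.put(exec1, new Page(exec2, List.of(row("example"), row("jmx"), row("memory"))));
        server.put(exec2, new Page(null, List.of(row("raptor"), row("system"), row("tpcds"), row("tpch"))));
        Page postResponse = new Page(queued, null); // the POST reply only has nextUri
        // prints [[example], [jmx], [memory], [raptor], [system], [tpcds], [tpch]]
        System.out.println(drain(postResponse, server::get));
    }
}
```

Unlike the author's code, `drain` also keeps rows from the first (POST) response; in the trace shown here that response carries none, so the collected result is the same.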

Finally something changes: the class handling the GET request switches from QueuedStatementResource to ExecutingStatementResource, and nextUri has been redirected to the executing endpoint.
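Since the hand-off is visible in the path of nextUri, a client could tell which resource will serve its next GET by looking at that path. This is a small illustrative sketch: the `/v1/statement/executing/` prefix matches the URIs in the responses shown in this article, while the `/v1/statement/queued/` prefix for the queued phase is my assumption, and `resourceFor` is a hypothetical helper rather than part of Trino-client.

```java
import java.net.URI;

public class StatementResourceSketch {
    // Maps a nextUri to the server-side resource that is expected to handle it.
    // Assumption: queued URIs start with /v1/statement/queued/ and executing
    // URIs with /v1/statement/executing/ (the latter as seen in the JSON here)
    static String resourceFor(URI nextUri) {
        if (nextUri == null) {
            return "none (query finished)";
        }
        String path = nextUri.getPath();
        if (path.startsWith("/v1/statement/queued/")) {
            return "QueuedStatementResource";
        }
        if (path.startsWith("/v1/statement/executing/")) {
            return "ExecutingStatementResource";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        URI next = URI.create("http://localhost:8080/v1/statement/executing/"
                + "20210604_025513_00000_j6yaa/y2806aa3c927a872a4c77b2bd02ba93c716f45e55/1");
        System.out.println(resourceFor(next)); // ExecutingStatementResource
    }
}
```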

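The next response is long and the screenshot couldn't capture all of it, so here is the raw JSON; paste it into json.cn to view it formatted.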


{"id":"20210604_025513_00000_j6yaa","infoUri":"http://localhost:8080/ui/query.html?20210604_025513_00000_j6yaa","partialCancelUri":"http://localhost:8080/v1/statement/executing/partialCancel/20210604_025513_00000_j6yaa/1/y2806aa3c927a872a4c77b2bd02ba93c716f45e55/1","nextUri":"http://localhost:8080/v1/statement/executing/20210604_025513_00000_j6yaa/y2806aa3c927a872a4c77b2bd02ba93c716f45e55/1","columns":[{"name":"Catalog","type":"varchar(7)","typeSignature":{"rawType":"varchar","arguments":[{"kind":"LONG","value":7}]}}],"stats":{"state":"RUNNING","queued":false,"scheduled":true,"nodes":1,"totalSplits":19,"queuedSplits":18,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":17,"wallTimeMillis":42,"queuedTimeMillis":31,"elapsedTimeMillis":4289,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"peakMemoryBytes":0,"spilledBytes":0,"rootStage":{"stageId":"0","state":"RUNNING","done":false,"nodes":1,"totalSplits":1,"queuedSplits":1,"runningSplits":0,"completedSplits":0,"cpuTimeMillis":0,"wallTimeMillis":0,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"subStages":[{"stageId":"1","state":"FLUSHING","done":false,"nodes":1,"totalSplits":17,"queuedSplits":17,"runningSplits":0,"completedSplits":0,"cpuTimeMillis":0,"wallTimeMillis":0,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"subStages":[{"stageId":"2","state":"FINISHED","done":true,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":17,"wallTimeMillis":42,"processedRows":7,"processedBytes":0,"physicalInputBytes":0,"subStages":[]}]}]},"progressPercentage":5.2631578947368425},"warnings":[]}
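The stats object nests stages: rootStage holds stage 0, whose subStages hold stage 1, which in turn holds stage 2. In this RUNNING response, the top-level totalSplits (19) and completedSplits (1) equal the sums over the three stages (1 + 17 + 1 and 0 + 0 + 1), and the same holds for the FINISHED response further down (19 and 19). I infer that relationship only from these two responses, not from Trino's source. The sketch below walks such a tree recursively; `Stage` is a hypothetical class carrying just the fields used, not a Trino-client type.

```java
import java.util.List;
import java.util.function.ToIntFunction;

public class StageTreeSketch {
    // Hypothetical minimal model of the rootStage / subStages objects
    static final class Stage {
        final String stageId;
        final String state;
        final int totalSplits;
        final int completedSplits;
        final List<Stage> subStages;

        Stage(String stageId, String state, int totalSplits, int completedSplits, List<Stage> subStages) {
            this.stageId = stageId;
            this.state = state;
            this.totalSplits = totalSplits;
            this.completedSplits = completedSplits;
            this.subStages = subStages;
        }
    }

    // Adds up one counter over the stage and all of its descendants
    static int sum(Stage stage, ToIntFunction<Stage> counter) {
        int total = counter.applyAsInt(stage);
        for (Stage child : stage.subStages) {
            total += sum(child, counter);
        }
        return total;
    }

    // Prints the tree depth-first, one indented line per stage
    static void print(Stage stage, int depth) {
        System.out.println("  ".repeat(depth) + "stage " + stage.stageId + " " + stage.state
                + " " + stage.completedSplits + "/" + stage.totalSplits);
        for (Stage child : stage.subStages) {
            print(child, depth + 1);
        }
    }

    public static void main(String[] args) {
        // Values taken from the RUNNING response above
        Stage stage2 = new Stage("2", "FINISHED", 1, 1, List.of());
        Stage stage1 = new Stage("1", "FLUSHING", 17, 0, List.of(stage2));
        Stage root = new Stage("0", "RUNNING", 1, 0, List.of(stage1));
        print(root, 0);
        System.out.println("totalSplits = " + sum(root, s -> s.totalSplits));         // 19
        System.out.println("completedSplits = " + sum(root, s -> s.completedSplits)); // 1
    }
}
```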


One of the fields, progressPercentage, indicates how much of the query has been processed, as a percentage; here it is 5.263.
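The number 5.263 fits a simple ratio: with 1 of 19 splits completed, 1 / 19 × 100 ≈ 5.263, and in the FINISHED response 19 / 19 × 100 = 100.0. The sketch below encodes that reading. It is an assumption drawn from these two responses, not Trino's actual implementation; the cap at 100 and returning 0 when totalSplits is 0 are my own hypothetical choices.

```java
import java.util.Locale;

public class ProgressSketch {
    // Assumption: progress = completedSplits * 100 / totalSplits, capped at 100.
    // This fits both responses shown here but is not taken from Trino's source.
    static double progressPercentage(int completedSplits, int totalSplits) {
        if (totalSplits == 0) {
            return 0.0; // hypothetical choice: nothing scheduled yet means 0%
        }
        return Math.min(100.0, completedSplits * 100.0 / totalSplits);
    }

    public static void main(String[] args) {
        // RUNNING response: 1 of 19 splits completed
        System.out.println(String.format(Locale.ROOT, "%.3f", progressPercentage(1, 19)));  // 5.263
        // FINISHED response: 19 of 19 splits completed
        System.out.println(String.format(Locale.ROOT, "%.1f", progressPercentage(19, 19))); // 100.0
    }
}
```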

That means there is at least one more request to come. Here it is, again quite long, and this time the response carries the result we want:

{"id":"20210604_025513_00000_j6yaa","infoUri":"http://localhost:8080/ui/query.html?20210604_025513_00000_j6yaa","columns":[{"name":"Catalog","type":"varchar(7)","typeSignature":{"rawType":"varchar","arguments":[{"kind":"LONG","value":7}]}}],"data":[["example"],["jmx"],["memory"],["raptor"],["system"],["tpcds"],["tpch"]],"stats":{"state":"FINISHED","queued":false,"scheduled":true,"nodes":1,"totalSplits":19,"queuedSplits":0,"runningSplits":0,"completedSplits":19,"cpuTimeMillis":80,"wallTimeMillis":748,"queuedTimeMillis":31,"elapsedTimeMillis":4426,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"peakMemoryBytes":0,"spilledBytes":0,"rootStage":{"stageId":"0","state":"FINISHED","done":true,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":2,"wallTimeMillis":90,"processedRows":7,"processedBytes":96,"physicalInputBytes":0,"subStages":[{"stageId":"1","state":"FINISHED","done":true,"nodes":1,"totalSplits":17,"queuedSplits":0,"runningSplits":0,"completedSplits":17,"cpuTimeMillis":61,"wallTimeMillis":616,"processedRows":7,"processedBytes":96,"physicalInputBytes":0,"subStages":[{"stageId":"2","state":"FINISHED","done":true,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":17,"wallTimeMillis":42,"processedRows":7,"processedBytes":0,"physicalInputBytes":0,"subStages":[]}]}]},"progressPercentage":100.0},"warnings":[]}

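At this point, the interaction between the client and the Trino Coordinator is complete.

Finally, here is the output of my local run; it matches the data in the response above.

In fact, the core of a client is nothing more than sending POST and GET requests to the Coordinator. All the code in Trino-cli and Trino-client only plays a supporting role around those HTTP requests.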

Original article: https://zhuanlan.zhihu.com/p/377833187
