Yes, this is the same diagram as in the previous article.
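I wrote a small piece of code locally that simulates a client sending requests to the Trino Coordinator. It is mostly adapted from StatementClientV1#buildQueryRequest and StatementClientV1#advance() in Trino-client.
In essence, it uses OkHttp3 to send POST and GET requests to the Coordinator.
Key points of the code: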
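- When debugging the Coordinator, the client's own timeout usually expires while the Coordinator is paused at a breakpoint, so by the time the client should send its next request it has already failed. Setting readTimeout (and callTimeout) to a large value prevents this.
- The query is a plain string and must not end with a semicolon.
- Data is fetched in batches, so the results may be spread over several responses. Collect them all in a List and print them at the end.
- If a response carries data, getData() returns a non-null value, and that data can be added to res.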
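The first two points can be sketched without any third-party library. This is only an illustration, not the code used in this article: it swaps OkHttp3 for the JDK's java.net.http API, and the class StatementPostSketch and both of its methods are hypothetical names. It builds the POST request but does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical helper, not part of Trino-client: it only illustrates the key points above.
class StatementPostSketch {

    // Drop one trailing semicolon (and surrounding whitespace), because the
    // statement text must be sent without it.
    static String stripTrailingSemicolon(String sql) {
        String trimmed = sql.trim();
        if (trimmed.endsWith(";")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1).trim();
        }
        return trimmed;
    }

    // Build (but do not send) the POST request with a very generous timeout, so the
    // client does not give up while the Coordinator is stopped at a breakpoint.
    static HttpRequest buildPost(URI statementUri, String sql) {
        return HttpRequest.newBuilder(statementUri)
                .timeout(Duration.ofDays(1))
                .header("X-Trino-User", "trino")
                .header("Content-Type", "text/plain; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(stripTrailingSemicolon(sql)))
                .build();
    }

    public static void main(String[] args) {
        URI uri = URI.create("http://localhost:8080/v1/statement");
        HttpRequest post = buildPost(uri, "show catalogs;");
        System.out.println(post.method() + " " + post.uri());
        System.out.println(stripTrailingSemicolon("show catalogs;"));
    }
}
```

Stripping the semicolon in one place means callers cannot forget it, and the one-day timeout mirrors the value used for readTimeout and callTimeout in the OkHttp3 version.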
import io.airlift.json.JsonCodec;
import io.trino.client.JsonResponse;
import io.trino.client.QueryResults;
import okhttp3.*;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static io.airlift.json.JsonCodec.jsonCodec;

public class TestTrinoTest {
    private static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);

    public static void main(String[] args) throws URISyntaxException {
        OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
        // Create the client with a very long readTimeout and callTimeout.
        // Without them the client times out while the Coordinator is being debugged,
        // and the following requests can never be sent.
        OkHttpClient client = clientBuilder
                .callTimeout(1, TimeUnit.DAYS)
                .readTimeout(1, TimeUnit.DAYS)
                .build();
        // URI the client sends the POST request to
        URI postUri = new URI("http://localhost:8080/v1/statement");
        HttpUrl url = HttpUrl.get(postUri);
        // The statement to run. It must not end with a semicolon, otherwise an error is reported:
        // "show catalogs" rather than "show catalogs;"
        String query = "show catalogs";
        Request.Builder builder = new Request.Builder()
                .url(url)
                .post(RequestBody.create(MediaType.parse("text/plain; charset=utf-8"), query))
                .addHeader("X-Trino-User", "trino")
                .addHeader("User-Agent", "StatementClientV1/356");
        // Build the POST request
        Request post = builder.build();
        // Holds the QueryResults returned by the Coordinator
        JsonResponse<QueryResults> response;
        // Execute the request and keep the result in response
        response = JsonResponse.execute(QUERY_RESULTS_CODEC, client, post);
        System.out.println(response.getResponseBody());

        // Everything above is the POST phase.
        // From here on the client sends GET requests.
        // First, read nextUri from the POST response
        URI nextUri = response.getValue().getNextUri();
        System.out.println(nextUri);
        // Collects the rows returned by the query
        List<Object> res = new ArrayList<>();
        // Keep sending GET requests for as long as nextUri is not null
        while (nextUri != null) {
            Request get = new Request.Builder()
                    .get()
                    .url(HttpUrl.get(nextUri))
                    .addHeader("X-Trino-User", "trino")
                    .addHeader("User-Agent", "StatementClientV1/356")
                    .build();
            response = JsonResponse.execute(QUERY_RESULTS_CODEC, client, get);
            nextUri = response.getValue().getNextUri();
            System.out.println(response.getResponseBody());
            // If this response carries data, getData() is not null; add all of it to res
            if (response.getValue().getData() != null) {
                res.addAll((Collection<?>) response.getValue().getData());
            }
            System.out.println("nextUri is: " + nextUri);
        }
        System.out.println("==========");
        // Finally, iterate over the collected rows and print them
        for (int i = 0; i < res.size(); i++) {
            System.out.println(res.get(i));
        }
    }
}
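Stripped of the HTTP details, the loop above is a simple "follow nextUri until it is null" pattern. The sketch below isolates that pattern so it can be run without a Coordinator. Page is a hypothetical stand-in for the two QueryResults fields the loop reads, the fetch function replaces the real GET request, and the URIs in demo() are made up. Unlike the code above, it also checks the first response for data.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins only; none of these names come from Trino-client.
class PagingLoopSketch {

    static final class Page {
        final URI nextUri;             // null once there is nothing left to fetch
        final List<List<Object>> data; // null when this response carries no rows

        Page(URI nextUri, List<List<Object>> data) {
            this.nextUri = nextUri;
            this.data = data;
        }
    }

    // Follow nextUri until it is null, collecting every batch of rows on the way.
    static List<List<Object>> collectRows(Page first, Function<URI, Page> fetch) {
        List<List<Object>> rows = new ArrayList<>();
        Page page = first;
        while (true) {
            if (page.data != null) {
                rows.addAll(page.data);
            }
            if (page.nextUri == null) {
                return rows;
            }
            page = fetch.apply(page.nextUri);
        }
    }

    // Replays a canned sequence of responses instead of talking to a server.
    static List<List<Object>> demo() {
        URI queued = URI.create("http://localhost:8080/fake/queued");
        URI executing = URI.create("http://localhost:8080/fake/executing");
        Page afterPost = new Page(queued, null);
        Function<URI, Page> fakeFetch = uri -> uri.equals(queued)
                ? new Page(executing, null)
                : new Page(null, List.of(List.<Object>of("jmx"), List.<Object>of("tpch")));
        return collectRows(afterPost, fakeFetch);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Passing the fetch step in as a function keeps the loop itself free of any HTTP library, which makes it easy to test with canned responses.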
Because the code uses OkHttp3 and the Trino-client package, the following dependencies need to be added in Maven:
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>3.14.9</version>
</dependency>
<dependency>
    <groupId>io.trino</groupId>
    <artifactId>trino-client</artifactId>
    <version>356</version>
</dependency>
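While the client was talking to the Trino Coordinator, I printed every QueryResults the Coordinator returned, so the whole process can be followed step by step.
This is the first result returned by the Coordinator, in response to the client's POST request. The first request merely creates the query, so every metric is 0.
Below is what the Coordinator returns for the first GET request. The state is still queued, but it differs slightly from the earlier queued response.
Because this response contains a nextUri, the client sends another GET request.
Finally something changes: the class that handles the GET request switches from QueuedStatementResource to ExecutingStatementResource, and nextUri has been redirected accordingly.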
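One way to see this switch from the client side is to look at the path of nextUri. The sketch below is hypothetical: NextUriPhaseSketch and phaseOf are made-up names, the /v1/statement/executing/ prefix is the one that appears in the responses printed in this article, and the /v1/statement/queued/ prefix is an assumption about how QueuedStatementResource is mapped.

```java
import java.net.URI;

// Hypothetical helper for reading logs; not part of Trino-client.
class NextUriPhaseSketch {

    // Classify a nextUri by its path prefix.
    static String phaseOf(URI nextUri) {
        String path = nextUri.getPath();
        if (path == null) {
            return "unknown";
        }
        if (path.startsWith("/v1/statement/queued/")) {      // assumed prefix
            return "queued";
        }
        if (path.startsWith("/v1/statement/executing/")) {   // prefix seen in this article
            return "executing";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        URI executing = URI.create("http://localhost:8080/v1/statement/executing/"
                + "20210604_025513_00000_j6yaa/y2806aa3c927a872a4c77b2bd02ba93c716f45e55/1");
        System.out.println(phaseOf(executing));
    }
}
```

A real client does not need this check, since it simply follows whatever nextUri it receives. The helper is only useful for spotting the hand-over while debugging.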
The next response is rather long and does not fit in a screenshot, so paste it into a JSON formatter such as json.cn to read it:
{"id":"20210604_025513_00000_j6yaa","infoUri":"http://localhost:8080/ui/query.html?20210604_025513_00000_j6yaa","partialCancelUri":"http://localhost:8080/v1/statement/executing/partialCancel/20210604_025513_00000_j6yaa/1/y2806aa3c927a872a4c77b2bd02ba93c716f45e55/1","nextUri":"http://localhost:8080/v1/statement/executing/20210604_025513_00000_j6yaa/y2806aa3c927a872a4c77b2bd02ba93c716f45e55/1","columns":[{"name":"Catalog","type":"varchar(7)","typeSignature":{"rawType":"varchar","arguments":[{"kind":"LONG","value":7}]}}],"stats":{"state":"RUNNING","queued":false,"scheduled":true,"nodes":1,"totalSplits":19,"queuedSplits":18,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":17,"wallTimeMillis":42,"queuedTimeMillis":31,"elapsedTimeMillis":4289,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"peakMemoryBytes":0,"spilledBytes":0,"rootStage":{"stageId":"0","state":"RUNNING","done":false,"nodes":1,"totalSplits":1,"queuedSplits":1,"runningSplits":0,"completedSplits":0,"cpuTimeMillis":0,"wallTimeMillis":0,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"subStages":[{"stageId":"1","state":"FLUSHING","done":false,"nodes":1,"totalSplits":17,"queuedSplits":17,"runningSplits":0,"completedSplits":0,"cpuTimeMillis":0,"wallTimeMillis":0,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"subStages":[{"stageId":"2","state":"FINISHED","done":true,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":17,"wallTimeMillis":42,"processedRows":7,"processedBytes":0,"physicalInputBytes":0,"subStages":[]}]}]},"progressPercentage":5.2631578947368425},"warnings":[]}
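It also contains a progressPercentage metric, the percentage of work already completed, which is 5.263 at this point, so there is at least one more request to come.
And here it comes. It is just as long, but this time the response carries the result we want: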
{"id":"20210604_025513_00000_j6yaa","infoUri":"http://localhost:8080/ui/query.html?20210604_025513_00000_j6yaa","columns":[{"name":"Catalog","type":"varchar(7)","typeSignature":{"rawType":"varchar","arguments":[{"kind":"LONG","value":7}]}}],"data":[["example"],["jmx"],["memory"],["raptor"],["system"],["tpcds"],["tpch"]],"stats":{"state":"FINISHED","queued":false,"scheduled":true,"nodes":1,"totalSplits":19,"queuedSplits":0,"runningSplits":0,"completedSplits":19,"cpuTimeMillis":80,"wallTimeMillis":748,"queuedTimeMillis":31,"elapsedTimeMillis":4426,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"peakMemoryBytes":0,"spilledBytes":0,"rootStage":{"stageId":"0","state":"FINISHED","done":true,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":2,"wallTimeMillis":90,"processedRows":7,"processedBytes":96,"physicalInputBytes":0,"subStages":[{"stageId":"1","state":"FINISHED","done":true,"nodes":1,"totalSplits":17,"queuedSplits":0,"runningSplits":0,"completedSplits":17,"cpuTimeMillis":61,"wallTimeMillis":616,"processedRows":7,"processedBytes":96,"physicalInputBytes":0,"subStages":[{"stageId":"2","state":"FINISHED","done":true,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":17,"wallTimeMillis":42,"processedRows":7,"processedBytes":0,"physicalInputBytes":0,"subStages":[]}]}]},"progressPercentage":100.0},"warnings":[]}
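The progressPercentage values in the two long responses line up with the split counts in their stats: 1 of 19 splits completed gives roughly 5.263, and 19 of 19 gives 100. The sketch below just reproduces that arithmetic. It is an observation drawn from these two responses, not a claim about how the Coordinator computes the value internally, and ProgressSketch is a hypothetical name.

```java
// Hypothetical helper that reproduces the arithmetic seen in the two responses.
class ProgressSketch {

    // Percentage of completed splits, capped at 100; 0 when there are no splits yet.
    static double progressPercentage(int completedSplits, int totalSplits) {
        if (totalSplits == 0) {
            return 0.0;
        }
        return Math.min(100.0, completedSplits * 100.0 / totalSplits);
    }

    public static void main(String[] args) {
        // Split counts taken from the RUNNING and FINISHED responses.
        System.out.println(progressPercentage(1, 19));
        System.out.println(progressPercentage(19, 19));
    }
}
```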
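At this point, the exchange between the client and the Trino Coordinator is complete.
Finally, here is the output from my local run. It matches the data shown in the figure.
At its core, the client simply sends POST and GET requests to the Coordinator. All the code in Trino-cli and Trino-client merely plays a supporting role around those HTTP requests.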
Original link: https://zhuanlan.zhihu.com/p/377833187