Flink SQL 在美团
SQL 作业无法细粒度修改 StateTTL、并发等配置导致资源浪费。
SQL 修改逻辑无法从原先状态恢复。
SQL 作业出现数据正确性问题难以排查。
SQL 作业细粒度配置
个问题是在哪个阶段采集 TTL 信息。因为 Flink 的 TTL 信息与状态绑定,只有在创建具体的状态描述符时才能知晓,而 Transformation 层无法得知作业的状态情况,因此我们终决定在 ExecNode 到 Transformation 的转换过程中采集 TTL 信息。
第二个问题是怎么标识 TTL。我们给 ExecNode 增加了如 Transformation ID 一样的标识,并引入了一个工作栈,存储正在被翻译的 ExecNode。每次 ExecNode 在调用 translateToPlanInternal 方法前,我们获取自增的 ExecNode ID,并将其插入到工作栈中。当 ExecNode 的翻译结束后,从栈顶移除,再建立 Transformation 到 ExecNode,再到 TTL 的映射关系。
支持单独修改算子并发并从状态恢复。
支持单独修改算子的 slotSharingGroup。
支持修改 ChainStrategy 并从状态恢复。
SQL 作业变更支持从状态恢复
Graph Migration:作业的变更仅发生在 Pipleline 的拓扑结构层面,即节点与边的属性发生变化。这类场景我们可以通过分享的个工作可编辑执行计划来解决。 Operator Migration:作业变更仅发生在算子状态层级,DAG 不发生变化。这类场景包括新增了一些聚合指标、关联新增属性等等。 SavepointMigration:作业在 DAG 层面和算子状态同时发生变更。对应的场景是用离线数据为新任务初始化状态。
SQL 层使用 AST 做业务逻辑兼容性校验。
基于可编辑执行计划做拓扑逻辑兼容性校验。
状态 Schema 兼容性校验。
COMPATIBLE_AS_IS,指作业可以直接从老状态恢复,对应的含义是新老作业是没有发生任何变化。
COMPATIBLE_AFTER_RENAME,指作业通过调整 Operator ID 后可从老状态恢复。它对应的业务场景是修改算子并发或者调整作业的 chain 逻辑等。
COMPATIBLE_AFTER_MIGRATION,指作业不可直接从老状态恢复,必须通过状态迁移制作新状态后,可从新状态恢复。对应的场景是新增聚合、去重以及 Join 等算子指标或者字段,也是我们本次分享重点所解决的场景。
INCOMPATIBLE,指作业的新老状态完全不兼容,且无法通过迁移制作任何新状态。对应的场景是其他 SQL 逻辑改动,如交换指标顺序,增减算子以及一些我们可能仍未支持状态迁移的场景。这个也是我们后续对状态迁移需要完善的工作方向之一。
SQL 正确性问题排查能力建设
Flink SQL 作业与已有的自研系统结果对比。
通过实时作业与离线的作业结果对比。
Flink SQL 作业主备链路双跑结果对比。
排查门槛很高,对于业务同学来讲不了解 Flink SQL 底层原理,对于平台同学来讲不了解用户业务。出现正确性的问题后,无从下手。
排查定位周期长,由于没有可借助的工具,所以需要花费几天甚至更长时间定位问题。
严重影响线上业务数据正常产出,用户不得不将 SQL 作业重新迁回到原来的作业上,这大大阻碍了 Flink 作业 SQL 化的进程。
Trace 系统的一次 rpc 调用具备全局关联性,而对于 Flink SQL 来说只能做到同一个 Task 间局部关联性。
Trace 系统中数据上报需要业务在关键方法中手动埋点,但是对于 Flink SQL 来说手动埋点代价极高,我们期望与 Flink 引擎解耦,方便以后 Flink 版本升级维护。
Trace 系统中数据量大,允许部分数据丢失,而 Flink SQL 排查工具是不允许数据丢失的,希望能支持打印部分算子的输入输出。
Trace 系统中的数据有全局关联性可以做到自动归因,在 Flink 中数据没有全局关联性,只能手动分析不能做到自动归因。
平台入口,用户在平台开启 Flink SQL Debug 功能调试作业,选择要输出数据的算子 ID 后,然后提交作业。
TM 启动时,数据监听程序监听 Flink SQL 关键方法,解析算子的输入输出数据,图中的小齿轮代表着解析数据的 javaagent 程序。
将解析后的数据同步发送到 Kafka 中。
通过工具将 Kafka 中的数据同步到 OLAP 引擎中。
后通过查询分析 OLAP 引擎 中的数据 ,排查定位问题。
对于同步算子之前数据传输,当数据经过 Subtask 所有的 Operator 处理后才能处理下一条数据。当数据进入 Subtask 的个 Operator 时 input_order +1,后面的 Operator 使用个 Operator 的 input_order,这些 Operator 的 input_order 要么一样或者要么部分为空。 对于开启了 mini-batch 功能后的算子,算子会攒批后处理,这一批数据也有关联性。 对于 LookupJoin,在同步情况下跟种情况是一样的,异步情况下算子间有相同的字段,可以通过该字段来关联算子关系。
未来展望
未来展望主要分为以下三部分:
Flink SQL 细粒度配置 在细粒度资源管理上,目前细粒度资源管理只支持 API 设置,所以也需要在 SQL 场景通过 Flink SQL 灵活配置的功能支持细粒度的资源管理。 Flink SQL 灵活配置结合 Flink autopilot 机制搭配使用,使得 SQL 作业能自动调整到比较理想状态。 Flink SQL State 希望 Flink SQL State 具备可查询的能力。 探索 SQL 改变后支持以懒迁移方式从状态恢复。 Flink SQL 排查工具 希望根据积累的经验,对 Flink SQL 支持上线前风险提示。 解决发现的已知乱序及性能问题。