在过去的一年多时间里,我一直在队伍里推广 Apache Flink。很多朋友大概都知道,一年多以前,我负责的项目还在推 Spark,但如今我却极力推荐大家尝试使用 Flink。公司内外许多朋友开始问我:为什么一年里我的态度转变如此之大,特别是当我司加大对 Spark 的支持力度的时候。所以我想,也许我可以把自己的一些经验做一些总结。
我们部门用 Flink 的时间不长,应用领域限制在实时计算,所以我的经验也只能围绕一些我们实际用到的工程特性展开。至于其它一些特性比如机器学习支持之类,我也不甚了了。考虑到网上也有很多各种评测和讨论,诸君不妨在 Google 上搜索相关的文章。
新上手的惊喜:小功能的大作用
刚上手 Flink 时,我惊喜地发现 Flink 添加了两个很小但实用的功能,都恰到好处地解决了我们之前在用 Spark 时的一些工程上的痛点。这在一开始时给了我巨大的好感。
个好用的功能是项目模版。Flink 提供了 flink-quickstart-java 和 flink-quickstart-scala 插件,允许使用 Maven 的开发者创建统一的项目模版。熟悉 Maven 的朋友可能都了解,Maven 的 pom.xml 可以借助各种插件完成极其灵活的功能,但需要程序员对插件有足够多的了解才能驾驭。比如uber-jar 打包时去除不必要的依赖,在运行环境相对复杂的大型项目中十分必要,但很容易就因为配置不当混入额外的依赖包。在使用 Spark 的时候我们经常图省事把需要的 Spark 运行时包组成 uber-jar,产生的 jar 包动辄上百 MB,集成测试时上传更新非常不便。而 Flink 的模版则直接集成了一个良好的 uber-jar 配置,大部分 Flink 运行时包都被有效地排除了出去,终产生的 jar 包经常只有数百 KB 到几个 MB,上传下载都节约了不少时间,大大提高了集成测试的效率。
另一个我迅速喜欢上的功能是 RichFunction 接口。Flink 的开发始于 Java 7 时期,因此其大部分操作符都是从某个接口继承下来并扩展,直到 Java 8 开始才引入 lambda 作为操作符。作为习惯了 Spark 的开发者,我一开始也很自然地嫌弃麻烦的接口而尽量用 lambda,但很快地我就发现 Flink 的好处:它的 RichFunction 接口允许我实现一个 open() 函数,这恰恰是我用 lambda 时一直想要的功能。代码示例如下:
public class MyFilter extennds RichFilterFunctttttion<String> {
public void open(Configuration parameters) {
// Call loadLibrary()
}
public boolean filter(String value) {
// Call native functions via JNI
}
}
因为 open() 函数由 Flink 运行时保证在实际操作前被调用,我可以用它来完成一些重要的一次性工作,比如初始化 JNI 调用前必须的 loadLibrary() 操作。在使用 Spark 时,因为没有 open() 函数,我必须利用 Scala 的 lazy val 之类的语言机制来完成这类动作。然而 lazy val 本身一些限制导致它在一些场景下并不容易使用(比如用 main() 函数传入的参数来初始化 lazy val),加上 Spark 代码在 worker 节点上的执行方式需要经过反序列化,导致全局变量的初始化很容易出错,导致我不得不退化到引入静态类,用更多的代码解决问题。有了 open() 函数后,虽然看上去繁琐,但我可以完全控制整个初始化动作,而且去掉了很多繁琐的全局初始化操作,代码也更加简洁。
后续使用中的思考:流式处理中的 Fail fast
在使用 Flink 一段时间后,我们注意到 Flink 一个很有意思的行为:如果一个 task manager 节点因为机器损坏而掉线,整个程序就会直接终止并从 checkpoint 恢复,而不会保持程序运行并从集群中重新申请一台空白机器进行恢复。这个行为是 Flink 官方文档确认的:
In case of a program failure (due to machine-, network-, or software failure), Flink stops the distributed streaming dataflow. The system then restarts the operators and resets them to the latest successful checkpoint. The input streams are reset to the point of the state snapshot. Any records that are processed as part of the restarted parallel dataflow are guaranteed to not have been part of the previously checkpointed state.
事实上,这个实现方式在公司内部讨论时曾引起过相当大的争议。很多同事以此作为 Flink 设计有重大缺陷的范例大加批评。一开始我对此也颇为迷惑,但经过一段实践之后,我的看法有所改变;事实上,如今我更倾向于认为,也许 Flink 的处置才是合适的解法。
我可以理解这个行为大受批评的原因:它违反许多人对分布式计算的直觉。理想情况下,一个好的分布式程序似乎应该能够在单个节点掉线之后用一个新的节点代替,并继续计算。但在流式处理的上下文下,这个动作其实不容易做到。出于性能优化和实现难度的考虑,许多带状态的操作都需要将状态保存在固定的机器上(比如 Spark 的 mapWithState()),然后将整体的状态以 checkpoint 的形式定期保存。虽然理论上我们可以将状态单独复制若干份分别存储在多台机器上以备恢复,但这个做法和 checkpoint 的功能重复,而且更容易出错——如果整个集群里所有机器都在持续向前处理,而只有一台机器状态是从灾难恢复的,那就意味着它的状态和整体相比会滞后一段时间;这种情况下,要么所有机器停下来等它追上来,要么就保持不一致继续跑下去。前者无法避免延迟,效果和整个程序重启并无大的区别;而后者则在很多场景下则完全不可接受。如果非要如此一条路走到黑,也许更好的方法是将状态整个剥离出来作为一个有可靠性保证的独立服务,但那是将问题转移到了一个新的服务里。几经考虑之后我终相信,这个看似不合理的设计是有其内在的合理性的。
选择接受 Flink 的方案之后,那么对应的修补也就有了方向。既然 Flink 的错误恢复依赖快速的 checkpoint,那么我们就在加快程序崩溃检查和提高 HDFS 性能两方面下功夫。经过一段时间的调整,我和同事们成功优化了我们的 HDFS 系统,保证单个 app 每分钟稳定存入 300GB 的 checkpoint,这让我们 Flink 程序的断流恢复时间缩短到三分钟左右,基本解决了灾难恢复时的延迟问题。
对一些迷思的回应:Flink vs. Spark
网上早已有许多关于 Spark vs. Flink 的文章,我自己也拜读过不少。据实而言,我对一些观点并不十分认同;然而有些讨论流传颇远,也影响了很多朋友对两者的观点。我尝试地总结了一些,一来是梳理自己的看法,二来也是给问我问题的朋友们一个回应。
个常见的争论是:Flink 比 Spark 好,是因为 Flink 是 per-event 的,而 Spark 是基于 mini-batch 的。事实上,在我的观察中,这个特性差异在大部分项目的技术选型中并不起决定性作用。因为除了一部分诸如入侵检测之类的特定业务类型要求严格的单个请求立即处理之外,大部分实时处理业务都允许有一定的延迟。在我的观察中,Spark 的 mini-batch 在分钟级别已经足够稳定,足以支撑产品级别的业务。
有了个争论,也就不难注意到另一个新近的说法:2018 年二月发布的 Spark 2.3 已经支持 continuous processing,所以 Flink 的竞争优势已经消失。其实这个论调不值得一驳,因为 Spark 官方文档已经明确指出,这个特性如今只适用于简单操作。回想一下 Spark 从 1.4 引入 DataFrame 到 2.2 真正宣布稳定可用(GA),我们不难预见,Spark 社区很可能还需要几年的时间来持续完善 continuous processing。
As of Spark 2.3, only the following type of queries are supported in the continuous processing mode.
> Operations: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (select
,map
,flatMap
,mapPartitions
, etc.) and selections (where
,filter
, etc.).
第三个争论:Flink 宣称的 event time 和 watermark 机制 Spark 已经在 2.2 版本中也提供了,所以 Spark 和 Flink 一样好用。事实上,且不说 Spark 引入 event time 和 watermark 比 Flink 晚得多(event time 在 2.0 开始加入,watermark 则到 2.2 才得到完整支持),即便是现在的 2.3 版本,Spark 也只支持相对简单的 fixed watermark, Flink 支持的 watermark 动作则可以通过继承接口扩展出更加定制化的逻辑,所以 Flink 仍然有相当的优势。
不算结语的结语:我们仍在前进
我们对 Flink 的使用才刚刚开始。去年我们用在我司后台的一个功能上,成功地实现了处理延迟从小时级到秒级的跨越;而近一段时间的项目中,我的团队正在尝试着结合 Flink 流式处理和统计学习的一些方法,对公司的一些数据源进行挖掘,以期找到一些有趣的特性。限于公司商业机密,我无法在专栏里讨论项目细节。也许在不久的将来我们有机会将其产品化,那么大家也就都能见到了。