前言
Flink用来消费消息队列中的数据,在消费之后一定会需要用某种方式存起来,这里我简述一下在数据持久化中可能会遇到的坑和解决方案。 这篇文章中的代码,都经过本公司业务系统上调试过,是我们在使用Flink开发入库服务的时候踩过的一个个小坑,将它们总结起来,希望减少各位踩坑的数量。
本文会以消费Kafka为例,展示持久化到MySQL,MongoDB和HBase等数据库的思路。语言如果没有特殊的标记,一般都是Scala。 本文假定你已经搞定了Flink集群的搭建、程序的提交,同时你的数据库应该也能够支持这样规模的数据写入。
由于公司代码涉及业务,暂时不能开源,但是文中的源代码已经足够使用了。 为了避免文章过于臃肿,代码放在了
Flink持久化踩坑笔记代码Github Gist 需要翻墙,而知乎的文章长度限制也不允许我放入太多代码,请借个梯子看吧。
读的时候可能会有点儿繁琐,不过请耐心读完,一定会有收获。(大牛就绕道吧)
业务定义
假设我们现在有一个DataStream[T]
,这个Stream以一个稳定的速度吐着消息,这个时候你需要做的很简单,就是把数据:
- 如果持久化到HBase:转换为Put,然后创建连接、写入
- 如果持久化到SQL:创建SQL语句,创建连接、写入
- 持久化到MongoDB:创建BSON对象,创建连接、写入
坑1:过于频繁的连接创建
任你是神仙一般的系统,也受不了频繁的打开和关闭连接这种操蛋的操作,可偏偏有人这么写:每次收到数据的时候打开连接,写完关闭,每当看到这种博客我都无言以对。 正常情况下,应该是创建一个单例,如果写入要并发的话应该创建一个连接池。
下面以写入HBase为例,展示一下大致的思路是怎么样的。以下是需要注意的几点:
- 这里继承了
RichOutputFormat<T>
,但其实使用RichSinkFunction<T>
也是一样的,但是记得覆盖其Open方法来初始化连接。Flink官方的测试代码用的也是RichOutputFormat<T>
来做。 -
PutCollection
是我公司根据业务封装的Put类,方便聚合、序列化和调试,你可以理解为Map<String,List<Put>>
,其中String是TableName。 - 之所以使用的都是Rich,因为我们把配置放在了Consul上,而Consul的地址在启动的时候指定,并且放在
GlobalJobParameters
里面,Rich中可以通过getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
获取。
代码参见:
HBaseRichOutputFormat.java坑1.1:数据库配置何去何从
正常来说,我们都会把配置放在文件里面,但是这样的话修改配置就要重新打包,麻烦。 我们使用了Consul(其实用ZK、Redis之类的都是一样的)作为配置中心,主要是因为Consul有UI,手动操作比较方便。
/**
* 查询Consul中的配置信息,并且按照properties的文件格式解码
*
* @param key
* @return
*/
private def queryConsulProperties(key: String): Properties = {
val prop = new Properties()
prop.load(new StringReader(queryConsulString(key)))
prop
}
坑2:尽可能使用ProcessingTime减少资源消耗
诚然,Flink相当牛逼的一点就是可以处理乱序数据,有EventTime这个大杀器,但是我建议能不用就不用,使用ProcessingTime将更加的高效。
你可以在处理流数据的时候使用EventTime,但是在后的写入阶段,建议使用ProcessingTime,这样写入也会更加及时。
坑3:使用TimeWindow+CountTrigger避免峰值出现异常
写入的时候往往是批量写入,而不是单条写入,这个时候如果使用CountWindow不能保证时效性,使用TimeWindow容易造成瞬时峰值数据量过大搞崩某些程序,我们想要一个TimeCountWindow,当超时或者队列过长任意条件满足的时候触发写入。然而我没在Flink里面找到这个东西,可以使用TimeWindow+Trigger完成这个功能。
首先,对于某个流:
someStream.timeWindow(
Time.milliseconds(1000)//这里设置你要的时间
).trigger(
new TimeCountTrigger(
1000,//这里设置你要的数量
timeCharacteristic = TimeCharacteristic.ProcessingTime //用什么时间触发
)
)
其中,TimeCountTrigger
类的源码如下所示:
这里有个小坑需要注意,Trigger的clear方法有的时候不会被自动调用,需要手动调用。
坑4:动态插入的生成和写入
你,怎么做插入? 是手撸Insert语句?是强行Parse出一个Put,还是继续用原生API的InsertMany? 如果能用一个Sink解决写入问题,你能在写入之前对语句做一些合并,这个是坠吼的。
MongoDB插入的合并
简单了,只需要用Map<String,List<Document>>
去描述插入的数据集并且做合并即可。
SQL的合并
SQL的合并比较简单,只需将同一张表的插入聚合到一起就可以了,但是如果要将不同的表的插入合并到一起,那就需要一个通用的数据格式。 这个数据结构可以被描述为: Map<String,List<Map<String,Object>>>
个String代表表名,第二个代表字段名,第三个代表数值。
如果你的数据插入是有主键的,那么List<Map<String,Object>>
可以被: Map<KVPair,List<Map<String,Object>>>
来描述。KVPair自己定义。 这样的话可以合并主键相同的数据。
需要注意的是,由于每一条插入记录里面,使用Map描述以后,有些字段为空,取不到的时候需要处理返回NULL。
Object 的处理方式如下所示:
val valueToString : Any => String = {
case numberValue: Number => numberValue.toString
case intValue: Int => intValue.toString
case doubleValue: Double => "%f".format(doubleValue)
case longValue: Long => longValue.toString
case dateValue: Date => "'%s'".format(DATE_FORMAT.format(dateValue))
case strValue: String => "'%s'".format(escapeSql(strValue))
case anyType: Any => "'%s'".format(escapeSql(GSON.toJson(anyType)))
}
// DATE_FORMAT is SimpleDateFormat
HBase Put的合并
对于HBase来说,应该对两类东西做合并,个是针对同一个Table的Put操作可以合并,第二个是针对同一个Row的操作可以合并。 并且,对于同一行来说,同一个列族下,相同的Qualifier的数据应该做覆盖,在流计算期间就可以减少写入的数据量,可以带来更加高效的写入体验。
为此,我们首先写一个SealedPut
类,用来替代原生的Put类,这个类在必要的时候可以直接生成Put。
随后我们再编写一个PutCollection,用于管理不同表的Put。
PutCollection.java在加窗以后,我们可以在Process里面获取一大堆PutCollection,而我们需要做的,就是将它合成一个:
someIntKeyedWindowStreamOfPutCollection.process[PutCollection](
new ProcessWindowFunction[(PutCollection, Int), PutCollection, Int, TimeWindow] {
override def process(key: Int, context: Context, elements: Iterable[(PutCollection, Int)], out: Collector[PutCollection]): Unit = {
if (elements.size > 1) {
out.collect(
PutCollection.merge(
elements.map(_._1).toSeq: _*
)
)
} else {
elements.map(_._1).foreach(out.collect)
}
LOGGER.debug("Merging {} elements (k={}).", elements.size, key)
}
}
)