绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Flink 数据持久化踩过的坑
2020-07-05 23:42:21

前言

Flink用来消费消息队列中的数据,在消费之后一定会需要用某种方式存起来,这里我简述一下在数据持久化中可能会遇到的坑和解决方案。 这篇文章中的代码,都经过本公司业务系统上调试过,是我们在使用Flink开发入库服务的时候踩过的一个个小坑,将它们总结起来,希望减少各位踩坑的数量。

本文会以消费Kafka为例,展示持久化到MySQL,MongoDB和HBase等数据库的思路。语言如果没有特殊的标记,一般都是Scala。 本文假定你已经搞定了Flink集群的搭建、程序的提交,同时你的数据库应该也能够支持这样规模的数据写入。

由于公司代码涉及业务,暂时不能开源,但是文中的源代码已经足够使用了。 为了避免文章过于臃肿,代码放在了

Flink持久化踩坑笔记代码gist.github.com

Github Gist 需要翻墙,而知乎的文章长度限制也不允许我放入太多代码,请借个梯子看吧。

读的时候可能会有点儿繁琐,不过请耐心读完,一定会有收获。(大牛就绕道吧)

业务定义

假设我们现在有一个DataStream[T],这个Stream以一个稳定的速度吐着消息,这个时候你需要做的很简单,就是把数据:

  • 如果持久化到HBase:转换为Put,然后创建连接、写入
  • 如果持久化到SQL:创建SQL语句,创建连接、写入
  • 持久化到MongoDB:创建BSON对象,创建连接、写入

坑1:过于频繁的连接创建

任你是神仙一般的系统,也受不了频繁的打开和关闭连接这种操蛋的操作,可偏偏有人这么写:每次收到数据的时候打开连接,写完关闭,每当看到这种博客我都无言以对。 正常情况下,应该是创建一个单例,如果写入要并发的话应该创建一个连接池。

下面以写入HBase为例,展示一下大致的思路是怎么样的。以下是需要注意的几点:

  1. 这里继承了RichOutputFormat<T>,但其实使用RichSinkFunction<T>也是一样的,但是记得覆盖其Open方法来初始化连接。Flink官方的测试代码用的也是RichOutputFormat<T>来做。
  2. PutCollection是我公司根据业务封装的Put类,方便聚合、序列化和调试,你可以理解为Map<String,List<Put>>,其中String是TableName。
  3. 之所以使用的都是Rich,因为我们把配置放在了Consul上,而Consul的地址在启动的时候指定,并且放在GlobalJobParameters里面,Rich中可以通过getRuntimeContext().getExecutionConfig().getGlobalJobParameters()获取。

代码参见:

HBaseRichOutputFormat.javagist.github.com

坑1.1:数据库配置何去何从

正常来说,我们都会把配置放在文件里面,但是这样的话修改配置就要重新打包,麻烦。 我们使用了Consul(其实用ZK、Redis之类的都是一样的)作为配置中心,主要是因为Consul有UI,手动操作比较方便。

/**
  * 查询Consul中的配置信息,并且按照properties的文件格式解码
  *
  * @param key
  * @return
  */
private def queryConsulProperties(key: String): Properties = {
    val prop = new Properties()
    prop.load(new StringReader(queryConsulString(key)))
    prop
}

坑2:尽可能使用ProcessingTime减少资源消耗

诚然,Flink相当牛逼的一点就是可以处理乱序数据,有EventTime这个大杀器,但是我建议能不用就不用,使用ProcessingTime将更加的高效。

你可以在处理流数据的时候使用EventTime,但是在后的写入阶段,建议使用ProcessingTime,这样写入也会更加及时。

坑3:使用TimeWindow+CountTrigger避免峰值出现异常

写入的时候往往是批量写入,而不是单条写入,这个时候如果使用CountWindow不能保证时效性,使用TimeWindow容易造成瞬时峰值数据量过大搞崩某些程序,我们想要一个TimeCountWindow,当超时或者队列过长任意条件满足的时候触发写入。然而我没在Flink里面找到这个东西,可以使用TimeWindow+Trigger完成这个功能。

首先,对于某个流:

someStream.timeWindow(
    Time.milliseconds(1000)//这里设置你要的时间
).trigger(
    new TimeCountTrigger(
        1000,//这里设置你要的数量
        timeCharacteristic = TimeCharacteristic.ProcessingTime //用什么时间触发
    )
)

其中,TimeCountTrigger类的源码如下所示:

TimeCountTrigger.scalagist.github.com

这里有个小坑需要注意,Trigger的clear方法有的时候不会被自动调用,需要手动调用。

坑4:动态插入的生成和写入

你,怎么做插入? 是手撸Insert语句?是强行Parse出一个Put,还是继续用原生API的InsertMany? 如果能用一个Sink解决写入问题,你能在写入之前对语句做一些合并,这个是坠吼的。

MongoDB插入的合并

简单了,只需要用Map<String,List<Document>>去描述插入的数据集并且做合并即可。

SQL的合并

SQL的合并比较简单,只需将同一张表的插入聚合到一起就可以了,但是如果要将不同的表的插入合并到一起,那就需要一个通用的数据格式。 这个数据结构可以被描述为: Map<String,List<Map<String,Object>>> 个String代表表名,第二个代表字段名,第三个代表数值。

如果你的数据插入是有主键的,那么List<Map<String,Object>>可以被: Map<KVPair,List<Map<String,Object>>>来描述。KVPair自己定义。 这样的话可以合并主键相同的数据。

需要注意的是,由于每一条插入记录里面,使用Map描述以后,有些字段为空,取不到的时候需要处理返回NULL。

Object 的处理方式如下所示:

val valueToString : Any => String = {
   case numberValue: Number => numberValue.toString
   case intValue: Int => intValue.toString
   case doubleValue: Double => "%f".format(doubleValue)
   case longValue: Long => longValue.toString
   case dateValue: Date => "'%s'".format(DATE_FORMAT.format(dateValue))
   case strValue: String => "'%s'".format(escapeSql(strValue))
   case anyType: Any => "'%s'".format(escapeSql(GSON.toJson(anyType)))
}

// DATE_FORMAT is SimpleDateFormat

HBase Put的合并

对于HBase来说,应该对两类东西做合并,个是针对同一个Table的Put操作可以合并,第二个是针对同一个Row的操作可以合并。 并且,对于同一行来说,同一个列族下,相同的Qualifier的数据应该做覆盖,在流计算期间就可以减少写入的数据量,可以带来更加高效的写入体验。

为此,我们首先写一个SealedPut类,用来替代原生的Put类,这个类在必要的时候可以直接生成Put。

SealedPut.javagist.github.com


随后我们再编写一个PutCollection,用于管理不同表的Put。

PutCollection.javagist.github.com

在加窗以后,我们可以在Process里面获取一大堆PutCollection,而我们需要做的,就是将它合成一个:

someIntKeyedWindowStreamOfPutCollection.process[PutCollection](
    new ProcessWindowFunction[(PutCollection, Int), PutCollection, Int, TimeWindow] {
        override def process(key: Int, context: Context, elements: Iterable[(PutCollection, Int)], out: Collector[PutCollection]): Unit = {
            if (elements.size > 1) {
                out.collect(
                    PutCollection.merge(
                        elements.map(_._1).toSeq: _*
                    )
                )
            } else {
                elements.map(_._1).foreach(out.collect)
            }
            LOGGER.debug("Merging {} elements (k={}).", elements.size, key)
        }
    }
)

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Flink专区
创建时间:2020-06-19 13:29:19
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • ?
    专家
戳我,来吐槽~