1. Time
Flink中的时间(Time)主要分为三种:
- Event Time:每条数据真实的产生时间,这就要求每条进入Flink应用的数据都要自己带有时间戳,标明数据产生时间;
- Ingestion Time:是介于Event time和 Processing Time之间的时间。在数据通过Source Function 进入Flink应用之后,他就会获取Source Operator的本地时间作为时间戳;
- Processing Time:即数据被处理的时间。当我们的Flink程序是使用这个时间进行处理的时候,所有基于时间的操作都会使用当前机器的系统时钟来做为时间戳。
在应用中指定时间类型
在Flink中默认情况下使用的是Processing Time,如果我们使用了Event time或者Ingestion time那么就需要在创建StreamExecutionEnvironment之后调用setStreamTimeCharacteristic来设置基准时间。这个设置指定了数据的时间分配,以及窗口操作所使用的时间类型。
下面的这段代码就指定数据的时间类型为Processing Time,窗口大小为1小时。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
stream
.keyBy( _.getUser )
.timeWindow(Time.hours(1))
.reduce( (a, b) => a.add(b) )
.addSink(...)
2. Event Time和WaterMark
我们知道Flink支持无界数据流的处理,同时也支持窗口操作。Flink的并行度设置在横向上对数据流进行了分割,而窗口就是在纵向上对数据流进行了分割。Flink中窗口的作用是将无限的数据流划分成一个个有限的数据集。所以基于窗口的操作都是针对这些有限的数据集进行的。
在使用Event time时,我们需要思考一个问题,对于一个无限的数据流,窗口大小的情况下,如何确定窗口内的数据都已经全部都到了?例如,现在的窗口大小是1小时。对于有序的数据流而言,我们只需要判断数据的时间即可。08:01的数据一定是在08:02之前进入应用,当09:00的数据到达时,Flink就知道可以操作08:00~09:00的数据了。
但是在我们实际的应用环境中,大部分的数据流都是无序的,而且影响因素可能有很多。在这种情况下,8:58的数据可能是在9点之后才到的,这种情况下,我们的窗口操作又该在何时执行呢?
上面的问题总结一下就是:1. Flink如何确定窗口内的数据全部都到齐了? 2. Flink如何对待数据流中迟到的数据?
为了解决上面的问题,需要用到Flink中的Watermark(时间水印)机制。Watermark能够衡量数据进度,确保数据在乱序情况下也能被正常处理,得出连续的结果。Watermark作为数据流中一部分随数据流入下游,当一个watermark(t)到达下游时就表示后面的数据时间都是迟于t。
在Flink中用户可以配置大延迟的时间间隔,Flink会用新的数据时间减去这个间隔来更新watermark。当watermark时间大于窗口结束时间,且窗口中有数据时,就会立刻触发窗口计算。例如,我们以30分钟做为大延迟间隔,窗口大小为1个小时,那么窗口时间就应该为(00:00-01:00),(02:00-03:00)...(23:00-00:00)。假设现在有一条03:31的数据进入应用,它减去半个小时就是03:01大于(02:00-03:00)的结束时间,那么就认为没有数据时间迟于03:00了,此时如果窗口内有数据就会立马触发窗口计算。这个计算需要通过延迟间隔和新的数据计算,判断是否已经超过了窗口允许的延迟时间。设置半个小时就意味着每个窗口的数据可以迟到半个小时。如果真的有数据超过了这个延迟时间,那我们就需要指定这类迟到数据的处理策略了。
2.1 顺序数据流中的watermark
在数据有序的情况下,10:00的数据到达时,我们就知道09:00~10:00的窗口可以操作了,因为不会有比10点还早的数据了,所有09:00~10:00窗口内的时间都已经到了。但是因为我们甚至了30分钟的watermark,10点减去半个小时为09:30小于窗口的结束时间,所以它会等,一直等到10:31数据来了之后,10:31减半个小时大于10:00。原本早就可以执行的计算现在多等了半个小时,所以在数据流有序的情况,并不能很好的发挥watermark的作用,反而会增加应用的延迟。
2.2 乱序数据流中的watermark
在实际环境中使用event time,我们也会遇到因为网络阻塞或者其他原因导致的无序数据流。在这种情况下watermark便可以保证窗口内的数据按照指定的窗口大小和延迟时间进行计算。值得注意的是,Flink的延迟时间是相对于event time而言的,不是根据系统时间来匹配的。就是说,如果我们设置的窗口大小为1个小时,延迟时间是10分钟。对于(09:00~10:00)的窗口而言,它不一定是会在系统时间超过10:10后计算,因为此刻不一定有时间戳大于10:10的数据到来。只有当watermark大于窗口结束时间时才会进行窗口操作,watermark一般都是根据event time计算的。
如上,假设窗口大小为1小时,延迟时间设为10分钟。明显,数据09:38已经迟到,但它依然会被正确计算,只有当有数据时间大于10:10的数据到达之后(即对应的watermark大于10:10-10min) 09:00~10:00的窗口才会执行计算。
2.3 并行数据流中的watermark
对应并行度大于1的source task,它每个独立的subtask都会生成各自的watermark。这些watermark会随着流数据一起分发到下游算子,并覆盖掉之前的watermark。当有多个watermark同时到达下游算子的时候,flink会选择较小的watermark进行更新。当一个task的watermark大于窗口结束时间时,就会立马触发窗口操作。
2.4 在代码中生成timeStamp和watermark
2.4.1 自定义Source中的timeStamp和watermark
下面通过一个简单的demo来看一下如何生成数据的event time和watermark:
object WordCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source: DataStream[String] = env.addSource(new MySource)
val transformatted = source.map(element => {
val word = element.split("_")()
(word, 1)
}
)
.keyBy()
.timeWindow(Time.seconds(5))
.sum(1)
transformatted.print()
env.execute("Word Count")
}
}
class MySource extends SourceFunction[String] {
val sdf = new SimpleDateFormat("HH:mm:ss")
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
while (isRunning) {
Thread.sleep(1000)
val letter = (65 + new Random().nextInt(25)).toChar
val timeMills = System.currentTimeMillis() - 3000L
val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
println(element)
//生成带event time的数据
ctx.collectWithTimestamp(element, timeMills)
//生成指定时间间隔为1s的watermark
ctx.emitWatermark(new Watermark(timeMills - 1000))
}
}
override def cancel(): Unit = {
isRunning = false
}
}
这个demo类似与wordcount,统计5秒内字母出现的次数,延迟时间为1s中。我们实现了自定义的SourceFunction,每隔1s生成一个数据,并通过emiterWatermark
和collectWithTimestamp
来生成数据的watermark和时间戳,这里的watermark减去了1秒钟,表示大延迟时间为1秒钟。需要注意的是,emitWatermark
是需要结合Event time来使用的。
- 当我们指定了程序中的基准时间为Event time时,则我们需要生成数据的Event time和watermark来指定数据的延迟时间;
- 当我们指定程序中的基准时间为Ingestion time时,watermark会被自动生成的ingestion time watermarks代替;
- 当我们指定程序中的基准时间为Processing time时,watermark会被忽略掉,因为此刻对于每个task而言,数据都是有序的(按到达的先后指定时间)。
我们先看看程序的部分输出:
M_09:56:40
I_09:56:41
K_09:56:42
K_09:56:43
F_09:56:44
O_09:56:45
I_09:56:46
(M,1)
(I,1)
(F,1)
(K,2)
O_09:56:47
U_09:56:48
C_09:56:49
N_09:56:50
V_09:56:51
(I,1)
(O,2)
(C,1)
(U,1)
T_09:56:52
M_09:56:53
G_09:56:54
U_09:56:55
X_09:56:56
(V,1)
(N,1)
(M,1)
(G,1)
(T,1)
X_09:56:57
O_09:56:58
T_09:56:59
U_09:57:00
N_09:57:01
(X,2)
(U,1)
(T,1)
(O,1)
E_09:57:02
我们来分析一波,个窗口算出的结果为:
(M,1)
(I,1)
(F,1)
(K,2)
它只计算了下面的几个元素:
M_09:56:40
I_09:56:41
K_09:56:42
K_09:56:43
F_09:56:44
正好是5秒5条数据,但是实际上此刻的数据时间已经到达了09:56:46,即此刻Source产生的全部数据如下:
M_09:56:40
I_09:56:41
K_09:56:42
K_09:56:43
F_09:56:44
O_09:56:45
I_09:56:46
比窗口时间多了2秒。这是因为我们设置的延迟时间为1s,ctx.emitWatermark(new Watermark(timeMills - 1000))
。那么对于(09:56:40~09:56:44)的窗口而言,watermark需要大于09:56:44才会触发这个窗口的计算,09:56:44+1s=09:56:45,09:56:44+2s=09:56:46>09:56:44触发窗口计算。而45和46的数据会被放入下个窗口计算,我们可以推算出,下个窗口是在09:56:51之后才触发计算的,实际上也的确如此。
2.4.2 Timestamp Assigner
如果我们使用的是Flink自带的外部数据源,那我们就不可以通过SourceFunction来生成数据的timestamp和watermark。这种情况下我们就要借助Flink自带的Timestamp Assigner来管理数据流中的timestamp和watermark了。Time Assigner一般是在source function之后使用的,也可以在我们的后续的算子之后添加,只要保证它是在个时间相关操作之前被使用就都行。如果我们同时使用了Source Function和Timestamp Assigner,那么后面Timestamp Assigner生成的timestamp和watermark就会覆盖之前生成的。
Flink中Timestamp Assigner生成watermark的两种类型:
- Periodic Watermark: 根据设定的时间间隔周期性地生成watermark,通过AssignerWithPeriodicWatermarks接口定义;
- Punctuated Watermark:根据特定的数据生成watermark,通过AssignerWithPunctuatedWatermarks接口定义。
2.4.2.1 Periodic Watermark
Periodic Watermark Assigner 也有两种不同的实现:升序模式和指定间隔。
升序模式下,会将数据中的指定字段提取出来做为timestamp,而且不需要显式的指定watermark,默认会使用当前的timestamp作为watermark。就跟在Source Function中将watermark也设为timestamp一样。因为没有指定延时间隔,也就是意味着不允许有迟到数据,所以这种方式比较适合有序的数据流。
下面看看如何通过Ascending Timestamp Assigner指定timestamp和watermark:
object PeriodicAssignerWordCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source: DataStream[(String, Long)] = env.addSource(new MySource)
//指定timestamp,会使用这个时间作为watermark
val withTimestampAndWatermark: DataStream[(String, Long)] = source.assignAscendingTimestamps(_._2)
val transformatted = withTimestampAndWatermark.map(element => (element._1.split("_")(), 1))
.keyBy()
.timeWindow(Time.seconds(5))
.sum(1)
transformatted.print()
env.execute("Ascending Timestamp Assigner Word Count")
}
}
class MySource extends SourceFunction[(String, Long)] {
val sdf = new SimpleDateFormat("HH:mm:ss")
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = {
while (isRunning) {
Thread.sleep(1000)
val letter = (65 + new Random().nextInt(25)).toChar
val timeMills = System.currentTimeMillis()
val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
println(element)
ctx.collect((element, timeMills))
}
}
override def cancel(): Unit = {
isRunning = false
}
}
下面是代码的部分输出:
R_14:26:54
L_14:26:55
(R,1)
U_14:26:56
V_14:26:57
O_14:26:58
O_14:26:59
V_14:27:00
(L,1)
(U,1)
(O,2)
(V,1)
R_14:27:01
C_14:27:02
D_14:27:03
D_14:27:04
V_14:27:05
(V,1)
(R,1)
(D,2)
(C,1)
我们看第二个计算窗口:
(L,1)
(U,1)
(O,2)
(V,1)
执行这个窗口操作前它接受到的数据为:
L_14:26:55
U_14:26:56
V_14:26:57
O_14:26:58
O_14:26:59
V_14:27:00
后一个数据时间,同时也是watermark超过了这个窗口的结束时间14:26:59,所以触发了计算。
以上跟source function的输出相似,只是少了1s延迟,这里不做赘述。
而指定时间间隔则需要实现抽象类BoundedOutOfOrdernessTimestampExtractor
来定义Assigner。这个类的构造器接受一个时间作为指定的时间间隔,而抽象方法extractTimestamp
则是需要用户自己定义timestamp抽取逻辑。以下实现了开始的功能,计算5秒内的wordcount,同时指定延迟时间为1s,watermark根据当前timestamp 减去固定延迟时间生成:
object SpecifiedTimeIntervalAssigner {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source: DataStream[(String, Long)] = env.addSource(new MySource)
//定义timestamp抽取逻辑,同时指定延迟时间为1s
val withTimestampAndWatermark: DataStream[(String, Long)] = source.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(1)) {
override def extractTimestamp(element: (String, Long)): Long = element._2
})
val transformatted = withTimestampAndWatermark.map(element => (element._1.split("_")(0), 1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
transformatted.print()
env.execute("Ascending Timestamp Assigner Word Count")
}
}
class MySource extends SourceFunction[(String, Long)] {
val sdf = new SimpleDateFormat("HH:mm:ss")
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = {
while (isRunning) {
Thread.sleep(1000)
val letter = (65 + new Random().nextInt(25)).toChar
val timeMills = System.currentTimeMillis()
val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
println(element)
ctx.collect((element, timeMills))
}
}
override def cancel(): Unit = {
isRunning = false
}
}
以下是部分输出,与个demo一样,不做赘述。
N_14:53:03
C_14:53:04
R_14:53:05
L_14:53:06
(N,1)
(C,1)
Q_14:53:07
E_14:53:08
N_14:53:09
M_14:53:10
X_14:53:11
(R,1)
(L,1)
(E,1)
(N,1)
(Q,1)
D_14:53:12
D_14:53:13
K_14:53:14
H_14:53:15
W_14:53:16
(X,1)
(M,1)
(K,1)
(D,2)
2.4.2.2 Punctuated Watermark
我们可以根据数据流中的特殊元素来指定watermark的生成。如果状态为1则生成watermark,反之则不生成。生成Punctuated Watermark逻辑需要通过实现AssignerWithPunctuatedWatermarks
接口,并在其中指定watermark的生成逻辑和timestamp的抽取逻辑。下面实现了只有当“A”出现时才生成watermark的逻辑:
object PunctuatedAssigner {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val source: DataStream[(String, Long)] = env.addSource(new MyPunctuatedSource)
//定义timestamp抽取逻辑,同时指定延迟时间为1s
val withTimestampAndWatermark: DataStream[(String, Long)] = source.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks[(String, Long)] {
override def checkAndGetNextWatermark(lastElement: (String, Long), extractedTimestamp: Long): Watermark = {
if (lastElement._1.split("_")(0).equals("A")) {
new Watermark(lastElement._2)
} else {
null
}
}
override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = element._2
}
)
val transformatted = withTimestampAndWatermark.map(element => (element._1.split("_")(0), 1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
transformatted.print()
env.execute("Ascending Timestamp Assigner Word Count")
}
}
class MyPunctuatedSource extends SourceFunction[(String, Long)] {
val sdf = new SimpleDateFormat("HH:mm:ss")
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = {
while (isRunning) {
Thread.sleep(1000)
val letter = (65 + new Random().nextInt(25)).toChar
val timeMills = System.currentTimeMillis()
val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
println(element)
ctx.collect((element, timeMills))
}
}
override def cancel(): Unit = {
isRunning = false
}
}
```
输出结果如下:
```text
K_15:15:54
G_15:15:55
V_15:15:56
S_15:15:57
Q_15:15:58
B_15:15:59
G_15:16:00
J_15:16:01
I_15:16:02
K_15:16:03
O_15:16:04
F_15:16:05
V_15:16:06
X_15:16:07
R_15:16:08
N_15:16:09
D_15:16:10
Y_15:16:11
F_15:16:12
D_15:16:13
D_15:16:14
T_15:16:15
G_15:16:16
G_15:16:17
X_15:16:18
G_15:16:19
Y_15:16:20
E_15:16:21
S_15:16:22
A_15:16:23
(K,1)
(G,1)
(S,1)
(Q,1)
(V,1)
(B,1)
(J,1)
(I,1)
(K,1)
(O,1)
(G,1)
(N,1)
(R,1)
(F,1)
(V,1)
(X,1)
(D,3)
(Y,1)
(F,1)
(G,3)
(T,1)
(X,1)
可以看到,当“A”出现时,才触发窗口计算。需要注意的时,触发窗口计算的条件不是“A”出现,是“A”的watermark大于窗口的结束时间。
3. 总结
在Flink中支持多中时间类型,处理起来灵活同时也是复杂的是Event time。当我们使用它作为基准时间的时候,我们就需要指定他的生成逻辑。而当乱序数据流出现的时候,如何区别出时间窗口何时结束,进而触发基于时间窗口的操作,这就要借助watermark的帮助。使用watermark我们可以自己定义允许数据迟到的时间间隔,根据指定数据制定生成逻辑等。值得注意的是,Flink中的时间都是相对而言的,当我们使用event time是,时间是根据数据的event time而言的,如果大于窗口结束时间的watermark不出现,那便一直不会触发窗口操作。当然,使用watermark我没也没法避免数据迟到,这个在于我们自己的取舍。确保数据的准确性,我们可能需要设置较大的延迟时间,这样数据的时效性就可不到保证;如果要确保降低延迟,数据的准确性也没法就没法保证。而对于这些终迟到的数据,Flink中也可以指定不同的处理策略,后面再做整理。
以下是本文的思维导图: