绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Flink(三)-Time-&-Watermark
2020-07-05 23:41:37

1. Time

Flink中的时间(Time)主要分为三种:

  • Event Time:每条数据真实的产生时间,这就要求每条进入Flink应用的数据都要自己带有时间戳,标明数据产生时间;
  • Ingestion Time:是介于Event time和 Processing Time之间的时间。在数据通过Source Function 进入Flink应用之后,他就会获取Source Operator的本地时间作为时间戳;
  • Processing Time:即数据被处理的时间。当我们的Flink程序是使用这个时间进行处理的时候,所有基于时间的操作都会使用当前机器的系统时钟来做为时间戳。



在应用中指定时间类型

在Flink中默认情况下使用的是Processing Time,如果我们使用了Event time或者Ingestion time那么就需要在创建StreamExecutionEnvironment之后调用setStreamTimeCharacteristic来设置基准时间。这个设置指定了数据的时间分配,以及窗口操作所使用的时间类型。

下面的这段代码就指定数据的时间类型为Processing Time,窗口大小为1小时。

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))

stream
 .keyBy( _.getUser )
 .timeWindow(Time.hours(1))
 .reduce( (a, b) => a.add(b) )
 .addSink(...)

2. Event Time和WaterMark

我们知道Flink支持无界数据流的处理,同时也支持窗口操作。Flink的并行度设置在横向上对数据流进行了分割,而窗口就是在纵向上对数据流进行了分割。Flink中窗口的作用是将无限的数据流划分成一个个有限的数据集。所以基于窗口的操作都是针对这些有限的数据集进行的。

在使用Event time时,我们需要思考一个问题,对于一个无限的数据流,窗口大小的情况下,如何确定窗口内的数据都已经全部都到了?例如,现在的窗口大小是1小时。对于有序的数据流而言,我们只需要判断数据的时间即可。08:01的数据一定是在08:02之前进入应用,当09:00的数据到达时,Flink就知道可以操作08:00~09:00的数据了。

但是在我们实际的应用环境中,大部分的数据流都是无序的,而且影响因素可能有很多。在这种情况下,8:58的数据可能是在9点之后才到的,这种情况下,我们的窗口操作又该在何时执行呢?


上面的问题总结一下就是:1. Flink如何确定窗口内的数据全部都到齐了? 2. Flink如何对待数据流中迟到的数据?

为了解决上面的问题,需要用到Flink中的Watermark(时间水印)机制。Watermark能够衡量数据进度,确保数据在乱序情况下也能被正常处理,得出连续的结果。Watermark作为数据流中一部分随数据流入下游,当一个watermark(t)到达下游时就表示后面的数据时间都是迟于t。

在Flink中用户可以配置大延迟的时间间隔,Flink会用新的数据时间减去这个间隔来更新watermark。当watermark时间大于窗口结束时间,且窗口中有数据时,就会立刻触发窗口计算。例如,我们以30分钟做为大延迟间隔,窗口大小为1个小时,那么窗口时间就应该为(00:00-01:00),(02:00-03:00)...(23:00-00:00)。假设现在有一条03:31的数据进入应用,它减去半个小时就是03:01大于(02:00-03:00)的结束时间,那么就认为没有数据时间迟于03:00了,此时如果窗口内有数据就会立马触发窗口计算。这个计算需要通过延迟间隔和新的数据计算,判断是否已经超过了窗口允许的延迟时间。设置半个小时就意味着每个窗口的数据可以迟到半个小时。如果真的有数据超过了这个延迟时间,那我们就需要指定这类迟到数据的处理策略了。

2.1 顺序数据流中的watermark



在数据有序的情况下,10:00的数据到达时,我们就知道09:00~10:00的窗口可以操作了,因为不会有比10点还早的数据了,所有09:00~10:00窗口内的时间都已经到了。但是因为我们甚至了30分钟的watermark,10点减去半个小时为09:30小于窗口的结束时间,所以它会等,一直等到10:31数据来了之后,10:31减半个小时大于10:00。原本早就可以执行的计算现在多等了半个小时,所以在数据流有序的情况,并不能很好的发挥watermark的作用,反而会增加应用的延迟。

2.2 乱序数据流中的watermark

在实际环境中使用event time,我们也会遇到因为网络阻塞或者其他原因导致的无序数据流。在这种情况下watermark便可以保证窗口内的数据按照指定的窗口大小和延迟时间进行计算。值得注意的是,Flink的延迟时间是相对于event time而言的,不是根据系统时间来匹配的。就是说,如果我们设置的窗口大小为1个小时,延迟时间是10分钟。对于(09:00~10:00)的窗口而言,它不一定是会在系统时间超过10:10后计算,因为此刻不一定有时间戳大于10:10的数据到来。只有当watermark大于窗口结束时间时才会进行窗口操作,watermark一般都是根据event time计算的。



如上,假设窗口大小为1小时,延迟时间设为10分钟。明显,数据09:38已经迟到,但它依然会被正确计算,只有当有数据时间大于10:10的数据到达之后(即对应的watermark大于10:10-10min) 09:00~10:00的窗口才会执行计算。

2.3 并行数据流中的watermark

对应并行度大于1的source task,它每个独立的subtask都会生成各自的watermark。这些watermark会随着流数据一起分发到下游算子,并覆盖掉之前的watermark。当有多个watermark同时到达下游算子的时候,flink会选择较小的watermark进行更新。当一个task的watermark大于窗口结束时间时,就会立马触发窗口操作。



2.4 在代码中生成timeStamp和watermark

2.4.1 自定义Source中的timeStamp和watermark

下面通过一个简单的demo来看一下如何生成数据的event time和watermark:

object WordCount {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setParallelism(1)
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

 val source: DataStream[String] = env.addSource(new MySource)
 val transformatted = source.map(element => {
 val word = element.split("_")()
 (word, 1)
 }
 )
 .keyBy()
 .timeWindow(Time.seconds(5))
 .sum(1)

 transformatted.print()
 env.execute("Word Count")

 }
}

class MySource extends SourceFunction[String] {

 val sdf = new SimpleDateFormat("HH:mm:ss")
 var isRunning = true

 override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
 while (isRunning) {
 Thread.sleep(1000)

 val letter = (65 + new Random().nextInt(25)).toChar
 val timeMills = System.currentTimeMillis() - 3000L
 val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString

 println(element)
 //生成带event time的数据
 ctx.collectWithTimestamp(element, timeMills)
 //生成指定时间间隔为1s的watermark
 ctx.emitWatermark(new Watermark(timeMills - 1000))
 }
 }

 override def cancel(): Unit = {
 isRunning = false
 }
}

这个demo类似与wordcount,统计5秒内字母出现的次数,延迟时间为1s中。我们实现了自定义的SourceFunction,每隔1s生成一个数据,并通过emiterWatermarkcollectWithTimestamp来生成数据的watermark和时间戳,这里的watermark减去了1秒钟,表示大延迟时间为1秒钟。需要注意的是,emitWatermark是需要结合Event time来使用的。

  • 当我们指定了程序中的基准时间为Event time时,则我们需要生成数据的Event time和watermark来指定数据的延迟时间;
  • 当我们指定程序中的基准时间为Ingestion time时,watermark会被自动生成的ingestion time watermarks代替;
  • 当我们指定程序中的基准时间为Processing time时,watermark会被忽略掉,因为此刻对于每个task而言,数据都是有序的(按到达的先后指定时间)。

我们先看看程序的部分输出:

M_09:56:40
I_09:56:41
K_09:56:42
K_09:56:43
F_09:56:44
O_09:56:45
I_09:56:46
(M,1)
(I,1)
(F,1)
(K,2)
O_09:56:47
U_09:56:48
C_09:56:49
N_09:56:50
V_09:56:51
(I,1)
(O,2)
(C,1)
(U,1)
T_09:56:52
M_09:56:53
G_09:56:54
U_09:56:55
X_09:56:56
(V,1)
(N,1)
(M,1)
(G,1)
(T,1)
X_09:56:57
O_09:56:58
T_09:56:59
U_09:57:00
N_09:57:01
(X,2)
(U,1)
(T,1)
(O,1)
E_09:57:02

我们来分析一波,个窗口算出的结果为:

(M,1)
(I,1)
(F,1)
(K,2)

它只计算了下面的几个元素:

M_09:56:40
I_09:56:41
K_09:56:42
K_09:56:43
F_09:56:44

正好是5秒5条数据,但是实际上此刻的数据时间已经到达了09:56:46,即此刻Source产生的全部数据如下:

M_09:56:40
I_09:56:41
K_09:56:42
K_09:56:43
F_09:56:44
O_09:56:45
I_09:56:46

比窗口时间多了2秒。这是因为我们设置的延迟时间为1s,ctx.emitWatermark(new Watermark(timeMills - 1000))。那么对于(09:56:40~09:56:44)的窗口而言,watermark需要大于09:56:44才会触发这个窗口的计算,09:56:44+1s=09:56:45,09:56:44+2s=09:56:46>09:56:44触发窗口计算。而45和46的数据会被放入下个窗口计算,我们可以推算出,下个窗口是在09:56:51之后才触发计算的,实际上也的确如此。

2.4.2 Timestamp Assigner

如果我们使用的是Flink自带的外部数据源,那我们就不可以通过SourceFunction来生成数据的timestamp和watermark。这种情况下我们就要借助Flink自带的Timestamp Assigner来管理数据流中的timestamp和watermark了。Time Assigner一般是在source function之后使用的,也可以在我们的后续的算子之后添加,只要保证它是在个时间相关操作之前被使用就都行。如果我们同时使用了Source Function和Timestamp Assigner,那么后面Timestamp Assigner生成的timestamp和watermark就会覆盖之前生成的。

Flink中Timestamp Assigner生成watermark的两种类型:

  • Periodic Watermark: 根据设定的时间间隔周期性地生成watermark,通过AssignerWithPeriodicWatermarks接口定义;
  • Punctuated Watermark:根据特定的数据生成watermark,通过AssignerWithPunctuatedWatermarks接口定义。

2.4.2.1 Periodic Watermark

Periodic Watermark Assigner 也有两种不同的实现:升序模式指定间隔

升序模式下,会将数据中的指定字段提取出来做为timestamp,而且不需要显式的指定watermark,默认会使用当前的timestamp作为watermark。就跟在Source Function中将watermark也设为timestamp一样。因为没有指定延时间隔,也就是意味着不允许有迟到数据,所以这种方式比较适合有序的数据流。

下面看看如何通过Ascending Timestamp Assigner指定timestamp和watermark:

object PeriodicAssignerWordCount {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setParallelism(1)
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

 val source: DataStream[(String, Long)] = env.addSource(new MySource)
 //指定timestamp,会使用这个时间作为watermark
 val withTimestampAndWatermark: DataStream[(String, Long)] = source.assignAscendingTimestamps(_._2)
 val transformatted = withTimestampAndWatermark.map(element => (element._1.split("_")(), 1))
 .keyBy()
 .timeWindow(Time.seconds(5))
 .sum(1)

 transformatted.print()
 env.execute("Ascending Timestamp Assigner Word Count")
 }
}

class MySource extends SourceFunction[(String, Long)] {

 val sdf = new SimpleDateFormat("HH:mm:ss")
 var isRunning = true

 override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = {
 while (isRunning) {
 Thread.sleep(1000)
 val letter = (65 + new Random().nextInt(25)).toChar
 val timeMills = System.currentTimeMillis()
 val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
 println(element)
 ctx.collect((element, timeMills))
 }
 }

 override def cancel(): Unit = {
 isRunning = false
 }
}

下面是代码的部分输出:

R_14:26:54
L_14:26:55
(R,1)
U_14:26:56
V_14:26:57
O_14:26:58
O_14:26:59
V_14:27:00
(L,1)
(U,1)
(O,2)
(V,1)
R_14:27:01
C_14:27:02
D_14:27:03
D_14:27:04
V_14:27:05
(V,1)
(R,1)
(D,2)
(C,1)

我们看第二个计算窗口:

(L,1)
(U,1)
(O,2)
(V,1)

执行这个窗口操作前它接受到的数据为:

L_14:26:55
U_14:26:56
V_14:26:57
O_14:26:58
O_14:26:59
V_14:27:00

后一个数据时间,同时也是watermark超过了这个窗口的结束时间14:26:59,所以触发了计算。

以上跟source function的输出相似,只是少了1s延迟,这里不做赘述。

而指定时间间隔则需要实现抽象类BoundedOutOfOrdernessTimestampExtractor来定义Assigner。这个类的构造器接受一个时间作为指定的时间间隔,而抽象方法extractTimestamp则是需要用户自己定义timestamp抽取逻辑。以下实现了开始的功能,计算5秒内的wordcount,同时指定延迟时间为1s,watermark根据当前timestamp 减去固定延迟时间生成:

object SpecifiedTimeIntervalAssigner {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
​
 env.setParallelism(1)
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
​
 val source: DataStream[(String, Long)] = env.addSource(new MySource)
 //定义timestamp抽取逻辑,同时指定延迟时间为1s
 val withTimestampAndWatermark: DataStream[(String, Long)] = source.assignTimestampsAndWatermarks(
 new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(1)) {
 override def extractTimestamp(element: (String, Long)): Long = element._2
 })
 val transformatted = withTimestampAndWatermark.map(element => (element._1.split("_")(0), 1))
 .keyBy(0)
 .timeWindow(Time.seconds(5))
 .sum(1)
​
 transformatted.print()
 env.execute("Ascending Timestamp Assigner Word Count")
​
 }
}
​
class MySource extends SourceFunction[(String, Long)] {
​
 val sdf = new SimpleDateFormat("HH:mm:ss")
 var isRunning = true
​
 override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = {
 while (isRunning) {
 Thread.sleep(1000)
 val letter = (65 + new Random().nextInt(25)).toChar
 val timeMills = System.currentTimeMillis()
 val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
 println(element)
 ctx.collect((element, timeMills))
 }
 }
​
 override def cancel(): Unit = {
 isRunning = false
 }
}

以下是部分输出,与个demo一样,不做赘述。

N_14:53:03
C_14:53:04
R_14:53:05
L_14:53:06
(N,1)
(C,1)
Q_14:53:07
E_14:53:08
N_14:53:09
M_14:53:10
X_14:53:11
(R,1)
(L,1)
(E,1)
(N,1)
(Q,1)
D_14:53:12
D_14:53:13
K_14:53:14
H_14:53:15
W_14:53:16
(X,1)
(M,1)
(K,1)
(D,2)

2.4.2.2 Punctuated Watermark

我们可以根据数据流中的特殊元素来指定watermark的生成。如果状态为1则生成watermark,反之则不生成。生成Punctuated Watermark逻辑需要通过实现AssignerWithPunctuatedWatermarks接口,并在其中指定watermark的生成逻辑和timestamp的抽取逻辑。下面实现了只有当“A”出现时才生成watermark的逻辑:

object PunctuatedAssigner {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
​
 env.setParallelism(1)
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
​
 val source: DataStream[(String, Long)] = env.addSource(new MyPunctuatedSource)
 //定义timestamp抽取逻辑,同时指定延迟时间为1s
 val withTimestampAndWatermark: DataStream[(String, Long)] = source.assignTimestampsAndWatermarks(
 new AssignerWithPunctuatedWatermarks[(String, Long)] {
 override def checkAndGetNextWatermark(lastElement: (String, Long), extractedTimestamp: Long): Watermark = {
 if (lastElement._1.split("_")(0).equals("A")) {
 new Watermark(lastElement._2)
 } else {
 null
 }
 }
​
​
 override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = element._2
 }
 )
 val transformatted = withTimestampAndWatermark.map(element => (element._1.split("_")(0), 1))
 .keyBy(0)
 .timeWindow(Time.seconds(5))
 .sum(1)
​
 transformatted.print()
 env.execute("Ascending Timestamp Assigner Word Count")
​
 }
}
​
​
class MyPunctuatedSource extends SourceFunction[(String, Long)] {
​
 val sdf = new SimpleDateFormat("HH:mm:ss")
 var isRunning = true
​
 override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = {
 while (isRunning) {
 Thread.sleep(1000)
 val letter = (65 + new Random().nextInt(25)).toChar
 val timeMills = System.currentTimeMillis()
 val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
 println(element)
 ctx.collect((element, timeMills))
 }
 }
​
 override def cancel(): Unit = {
 isRunning = false
 }
}
​```

输出结果如下:

```text
K_15:15:54
G_15:15:55
V_15:15:56
S_15:15:57
Q_15:15:58
B_15:15:59
G_15:16:00
J_15:16:01
I_15:16:02
K_15:16:03
O_15:16:04
F_15:16:05
V_15:16:06
X_15:16:07
R_15:16:08
N_15:16:09
D_15:16:10
Y_15:16:11
F_15:16:12
D_15:16:13
D_15:16:14
T_15:16:15
G_15:16:16
G_15:16:17
X_15:16:18
G_15:16:19
Y_15:16:20
E_15:16:21
S_15:16:22
A_15:16:23
(K,1)
(G,1)
(S,1)
(Q,1)
(V,1)
(B,1)
(J,1)
(I,1)
(K,1)
(O,1)
(G,1)
(N,1)
(R,1)
(F,1)
(V,1)
(X,1)
(D,3)
(Y,1)
(F,1)
(G,3)
(T,1)
(X,1)

可以看到,当“A”出现时,才触发窗口计算。需要注意的时,触发窗口计算的条件不是“A”出现,是“A”的watermark大于窗口的结束时间。

3. 总结

在Flink中支持多中时间类型,处理起来灵活同时也是复杂的是Event time。当我们使用它作为基准时间的时候,我们就需要指定他的生成逻辑。而当乱序数据流出现的时候,如何区别出时间窗口何时结束,进而触发基于时间窗口的操作,这就要借助watermark的帮助。使用watermark我们可以自己定义允许数据迟到的时间间隔,根据指定数据制定生成逻辑等。值得注意的是,Flink中的时间都是相对而言的,当我们使用event time是,时间是根据数据的event time而言的,如果大于窗口结束时间的watermark不出现,那便一直不会触发窗口操作。当然,使用watermark我没也没法避免数据迟到,这个在于我们自己的取舍。确保数据的准确性,我们可能需要设置较大的延迟时间,这样数据的时效性就可不到保证;如果要确保降低延迟,数据的准确性也没法就没法保证。而对于这些终迟到的数据,Flink中也可以指定不同的处理策略,后面再做整理。

以下是本文的思维导图:

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Flink专区
创建时间:2020-06-19 13:29:19
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • ?
    专家
戳我,来吐槽~