到目前为止,我们一直在从pipeline开发者的角度研究流处理,第二章介绍了watermarks,回答了计算什么时间范围内的数据以及什么时候将处理结果物化等基本问题。在本章中,我们将从流处理系统的底层机制的角度来看同样的问题。研究这些机制将帮助我们理解和应用有关watermarks的概念。我们将讨论如何在数据进入点创建水印,如何通过数据处理管道传播,以及如何影响输出结果时间戳。我们还演示了如何在处理无界数据时保留必要的保证,以回答event time数据在何处处理以及何时物化的问题。
定义
对于任何连续接收数据并输出结果的pipeline。我们希望解决的一般问题是,什么时候关闭event-time窗口是安全的,这意味着该窗口不再需要任何数据。
解决事件时间窗口问题的一种简单方法是基于当前处理时间作为事件时间窗口。正如我们在第1章中看到的,我们很快就遇到了问题—数据处理和传输不是即时的,因此处理和事件时间几乎从不相等。在pipeline中出现的任何小问题或尖刺都可能导致将消息分配到错误的窗口。终,这个策略失败了,因为我们没有可靠的方法来保证这样的窗口。
另一种直观但终不正确的方法是考虑pipeline处理消息的速率。尽管这是一个有趣的度量标准,但是速率可能随输入的变化、预期结果的可变性、可用于处理的资源等等而任意变化。更重要的是,速率并不能帮助解决完整性的基本问题。具体来说,速率不会告诉我们何时处理了特定时间间隔内的所有消息。在实际的系统中,会出现消息无法在系统中处理的情况。这可能是暂时错误(如崩溃、网络故障、机器停机)的结果,也可能是持久化错误(如应用程序级故障,需要更改应用程序逻辑或其他手动干预来解决)的结果。当然,如果发生了大量的故障,那么处理速率可能是检测这些故障的一个很好的方式。然而,速率不能告诉我们一条消息没有通过我们的管道被处理。然而,即使是一条这样的消息,也会影响输出结果的正确性。
我们需要一种更有力的进步衡量标准。为了实现这个目标,我们对我们的流数据做了一个基本的假设:每个消息都有一个关联的逻辑事件时间戳。这种假设在连续到达无界数据的情况下是合理的,因为这意味着连续生成输入数据。在大多数情况下,我们可以将原始事件发生的时间作为其逻辑事件时间戳。然后我们可以检查这些时间戳在任意pipeline中的分布。这样的pipeline可以分布在多个agent上并行处理,消费读取的消息,但不能保证在各个分片之间的顺序。因此,该管道中活动的待处理消息(in-flight)的事件时间戳集将形成一个分布,如图3-1所示。
消息由pipeline接收、处理并终标记为完成(completed)。每个消息都是“in-flight”(即已经收到但尚未完成),或者是“completed”(即不再需要对该消息进行处理)。如果我们按照事件时间检查消息的分布,它将类似于图3-1。随着时间的推移,更多的信息将被添加到右边的“in-flight”分布中,更多来自分布“in-flight”部分的消息将被完成并转移到“completed”分布中。
在这个分布上有一个关键点,它位于“in-flight”分布的左边,对应于pipeline中未处理消息的oldest event timestamp。我们使用这个值来定义水印:
水印(Watermark)是一个表示早的尚未完成的工作单调递增的时间戳。
这个定义提出了两个基本属性:
完整性Completeness
如果水印已经提前超过了某个时间戳T,它的单调性保证我们在T点或T点之前不会对准时(非延迟的数据)事件进行更多的处理.因此,我们可以正确地在T点或T点之前发出任何聚合。换句话说,水印让我们知道什么时候关闭窗口是正确的。
可见性Visibility
如果一条消息由于任何原因被卡在我们的pipeline中,水印就不能前进。我们需要找到问题的根源,通过检查阻止水印前进的消息。
数据源 Watermark 创建
这些水印如何产生?要为数据源建立水印,必须为从该数据源进入pipeline的每个消息分配一个逻辑事件时间戳。正如第2章告诉我们的,所有水印的创建分为两大类:perfect or heuristic(完美或启发式)。为了了解关于完美和启发式水印之间的区别,让我们看看图3-2。在第二章中给出了窗口求和的例子。
如图所示,完美水印的特点是保证了水印对所有数据的考虑,而启发式水印会出现一些落后数据元素。
在水印被创建为完美或启发式之后,watermark在pipeline的下游一直存在。至于应该创建完美式或启发式,这取决于很大程度的本质是消费的数据源。要了解原因,让我们看看每种类型的水印创建的几个例子。
创建完美式Watermark
完美水印创建为传入消息分配时间戳,在这种情况下,生成的watermark是一个严格的保证,从这个Source中不会看到任何数据的事件时间小于watermark。使用完美水印创建的pipeline从不需要处理延迟数据。也就是说,在水印之后到达的数据已经超过了新到达消息的事件时间。然而,完美的watermark创建需要对输入有全面的了解,因此对于许多现实的分布式输入源是不切实际的。这里有几个可以创建完美水印的用例:
入口时间戳(Ingress timestamping)
将进入系统的时间指定为数据的event time的数据源可以创建一个完美的水印。在这种情况下,源水印只是简单地跟踪pipeline所观察到的当前处理时间。在2016年之前,几乎所有支持窗口的流式系统都使用了这种方法。
由于事件时间是由单一的、单调递增的数据源(实际处理时间)分配的,因此系统对于数据流中的下一个时间戳具有很好的了解。因此,事件时间进度和窗口语义变得非常容易推理。当然,缺点是水印与数据本身的事件时间没有相关性。这些事件时间被丢弃,而水印只是跟踪相对于其到达系统的数据的进度。
按时间排序的静态日志集(Static sets of time-ordered logs)
一个静态的按时间排序的输入源,在上面创建一个完美的水印是相对简单的(例如,带有一组静态partition的Apache Kafka topic,其中源的每个partition都包含单调递增的事件时间)。为此,源只需跨已知的和静态的源分区集跟踪未处理数据的小事件时间(每个partition中近一次读取记录的小事件时间)
与前面提到的入口时间戳类似,由于静态分区集合中的事件时间是单调增加的,所以系统完全知道下一个时间戳是哪个。这实际上是一种有界无序处理.已知的一组分区的无序程度受到这些分区之间观察到的小事件时间的限制。
从逻辑上讲,保证分区内时间戳单调递增的方法是将分区内的时间戳指定为写入数据的时间戳;例如,通过web前端将事件直接记录到Kafka中。虽然这仍然是一个有局限的用例,但它肯定比到达数据处理系统时的入口时间戳更有用。
创建启发式Watermark
启发式水印的创建,创建的水印仅仅是一个估计值,即事件时间小于watermark的数据将不会再被看到。使用启发式水印创建的pipeline可能需要处理一些落后的数据。落后数据是指当水印已经过了该数据的事件时间之后到达的任何数据。落后数据只能用启发式水印创建。如果启发式是合理的,落后的数据量可能是非常小的,水印仍然是高效的估计。如果要支持需要正确性的用例(例如,计费之类的事情),系统仍然需要为用户提供一种方法来处理延迟的数据。
对于许多实际的分布式输入源,构造一个完美的水印在计算或操作上都是不现实的,但利用输入数据源的结构特征,仍有可能构建出高精度的启发式水印。以下是两个例子,其中启发式水印(不同的特性)是可行的:
按时间排序的动态日志集(Dynamic sets of time-ordered logs)
考虑一组动态的结构化日志文件(每个文件包含的记录相对于同一文件中的其他记录具有单调递增的事件时间,但文件之间没有固定的事件时间关系),在运行时不知道完整的日志文件集(用Kafka的说法就是partition)。这种输入通常出现在由许多独立团队构建和管理的全球范围的服务中。在这样的用例中,在输入上创建一个完美的水印是很难处理的,但是创建一个的启发式水印是很有可能的。
通过跟踪现有日志文件集中未处理数据的小事件时间,数据增长率,并利用网络拓扑和可用带宽等外部信息,您可以创建一个非常准确的水印,即使缺乏所有输入的完美认知。这种类型的输入源是在谷歌中发现的常见的无界数据集类型之一。因此,我们在为这种场景创建和分析水印质量方面拥有丰富的经验,并看到它们在许多用例中得到了良好的效果。
Google Cloud Pub/Sub
云发布/订阅是一个有趣的用例,发布/订阅目前不保证按顺序交付;即使单个发布者按顺序发布两条消息,有可能(通常是很小的可能性)他们可能是无序的。这是由于底层架构的动态性(它允许透明地扩展到非常高的吞吐量级别,而无需用户干预).因此,Cloud Pub/Sub没有办法保证完美的水印。作为本章后面的一个案例研究,详细讨论了这种启发式方法的实现
考虑一个用户玩手机游戏的例子,他们的分数被发送到我们的pipeline进行处理:你通常可以认为,对于任何使用移动设备作为输入的源,通常不可能提供完美的水印。由于设备长时间离线等原因,对于这样的数据源,没有办法提供任何合理的完整性评估。然而,您可以想象构建一个可以准确跟踪当前在线设备的输入完整性的水印,类似于刚刚描述的Google Pub/Sub水印。无论如何,从提供低延迟结果的角度来看,活跃在线的用户可能是相关的用户子集。这通常并不像你初想象的那么严重。
一般来说,使用启发式水印创建时,对数据源的了解越多,启发式越好,看到的落后数据项越少。没有一个统一的解决方案,因为源的类型、事件的分布和使用模式会有很大的不同。但是在任何一种情况下(完美或启发式),在输入源处生成水印后,系统都可以通过管道完美地传播水印。这意味着水印将在下游一直存在,启发式水印将保持严格的启发式,因为他们是建立时决定的。这就是水印方法的好处:您可以将跟踪管道完整性的复杂性完全降低到在Source中创建水印的问题。
Watermark传播
到目前为止,我们只考虑了单个operator或stage上下文中输入的水印。然而,大多数实际的pipeline都包含多个stage。理解watermark如何跨越独立的stage传播对于理解它们如何影响整个pipeline及其结果延迟的观察非常重要。
PIPELINE STAGES
每次pipeline将数据按某个新维度分组时,通常都产生不同的stage。例如,如果您有一个使用原始数据的管道,计算每个用户的聚合,然后使用这些每个用户的聚合来计算每个团队的聚合,那么您可能会得到一个分为三个阶段的pipeline:
- 一个使用未分组的原始数据
- 一个是按用户分组数据,并按用户计算聚合
- 一个是按团队对数据进行分组并计算每个团队的聚合
我们将在第6章了解更多关于分组对pipeline的影响。
如前一节所述,水印是在输入源处创建的。然后,随着数据的流入,它们流经系统。您可以跟踪不同粒度级别的水印。对于包含多个不同stage的管道,每个阶段都可能跟踪自己的水印,其值是之前所有输入和stage的function。因此,管道后面的stage将有更早以前的水印(因为他们看到的总体输入较少)
我们可以在管道中任何单个operator或stage的边界处定义水印。这不仅有助于理解pipeline中的每个stage的进度,而且有助于独立地、尽可能快地调度每个stage的结果。我们对各stage边界的水印给出如下定义:
- 输入watermark:它捕获该stage上游数据处理的进度(该stage的输入有多完整)。对于源,输入水印是为输入数据创建水印的特定于源的函数。对于非源stage,输入水印被定义为其所有上游源和stage的所有分片/分区/实例的输出水印的小值。
- 输出watermark:它捕获该stage本身的进度,本质上定义为该stage输入水印的小值和stage内所有非延迟数据的event time。“active”所包含的内容在某种程度上取决于给定stage实际执行的操作和流处理系统的实现。它通常包括为聚合而缓冲但尚未物化到下游的数据、传递到下游阶段的pending输出数据等等。
定义特定stage的输入和输出水印的一个优势是,我们可以使用它们来计算stage带来的事件时间延迟。从一个stage的输出水印的值减去它的输入水印的值,就得到了这个阶段引入的事件时间延迟。这种延迟是指每个stage的输出与实际时间之间的延迟程度。例如,执行10秒窗口聚合的stage将有10秒或更长时间的延迟,这意味着阶段的输出至少要比输入延迟那么久。输入和输出水印的定义提供了整个pipeline中水印的递归关系。pipeline中的每个后续阶段都根据需要延迟水印,这是基于该stage的事件时间延迟
每个stage的处理也不是单一的。我们可以将一个stage内的处理过程分割成具有几个概念组件的流,每一个都有输出水印。如前所述,这些组件的特性取决于stage执行的操作和系统的实现。从概念上讲,每个这样的组件充当一个缓冲区,active消息可以驻留在其中,直到某个操作完成。例如,当数据到达时,它被缓冲以进行处理。处理可能会将数据写入state,以便以后进行延迟聚合。当触发延迟聚合时,可能会将结果写入输出缓冲区,等待下游阶段的消费,如图3-3所示。
我们可以跟踪每个这样的缓冲区与自己的watermark,跨越各级缓冲区的小水印构成该级的输出水印。因此,输出水印可以是下列小值:
- 每个源的水印(Per-source watermark)-每个发送stage。
- 每个外部输入的水印(Per-external input watermark)-管道外部的源
- 每个状态组件水印(Per-state component watermark)-可以写入的每种状态类型
- 每个输出缓冲水印(Per-output buffer watermark)-每个接收stage
使水印在这个粒度级别上可用,能够更好的描述系统内部状态。水印跟踪消息在系统中不同缓冲区之间的流转状态,以便更容易地诊断阻塞。
理解 Watermark 传播
为了更好地理解输入和输出水印之间的关系以及它们如何影响水印的传播,我们来看一个示例。让我们考虑游戏分数,而不是计算团队分数的总和。尝试分析用户的参与度。我们首先计算每个用户的会话长度,假设用户参与游戏的时间代表用户参与度。在计算会话长度之后,我们还将计算固定时间段内的平均会话长度。
为了让我们的示例更加有趣,我们假设我们使用两个数据集,一个是移动分数,另一个是pc端分数。我们执行相同的分数计算逻辑,通过并行在这两个独立的数据集上计算。一种是计算用户在移动设备上玩游戏的分数,另一种是计算用户pc端上玩游戏的分数,这可能是因为不同平台采用的数据收集策略不同。重要的是,这两个阶段执行的是相同的操作,但是数据不同,因此输出水印也有很大的不同。
首先,让我们看一下示例3-1,看看这个pipeline部分的代码是什么样的。
//Example 3-1. Calculating session lengths
PCollection<Double> mobileSessions = IO.read(new MobileInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.triggering(AtWatermark())
.discardingFiredPanes())
.apply(CalculateWindowLength());
PCollection<Double> consoleSessions = IO.read(new ConsoleInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.triggering(AtWatermark())
.discardingFiredPanes())
.apply(CalculateWindowLength());
这里,我们独立地读取每个输入,而以前我们是通过team来分组集合的,在本例中,我们是通过user来分组的。之后,对于每个pipeline的个stage,采用Seesion Window,然后调用一个名为CalculateWindowLength的自定义PTransform。这个PTransform只是按key(用户)分组,然后通过将当前窗口的大小作为该窗口的值来计算每个用户的会话长度。在这种情况下,我们可以使用默认的触发器(AtWatermark)和累积模式(discardingFiredPanes)设置。两个特定用户的每个管道的输出可能如图3-4所示。
因为我们需要跨多个stage跟踪数据,我们用红色跟踪与移动设备分数相关的所有内容,用蓝色跟踪与PC端分数相关的所有内容,而图3-5中平均会话长度的水印和输出是黄色的。
我们已经回答了四个问题:什么、何处、何时以及如何计算各个会话的长度。接下来,我们将第二次回答这些问题,将这些会话长度转换为固定时间窗口内的全局会话长度平均值。这要求我们首先将两个数据源合并为一个,然后将窗口重新设置为固定时间窗口;我们已经在计算的会话长度值中获得了会话的重要本质。我们现在要计算的是在一天中相同时间段内的全局平均值。示例3-2给出了它的代码。
//Example 3-1. Calculating session lengths
PCollection<Double> mobileSessions = IO.read(new MobileInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.triggering(AtWatermark())
.discardingFiredPanes())
.apply(CalculateWindowLength());
PCollection<Double> consoleSessions = IO.read(new ConsoleInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.triggering(AtWatermark())
.discardingFiredPanes())
.apply(CalculateWindowLength());
PCollection<Float> averageSessionLengths = PCollectionList .of(mobileSessions).and(consoleSessions)
.apply(Flatten.pCollections()) .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(AtWatermark())
.apply(Mean.globally());
如果我们看到这条管道在工作,它将类似于图3-5。与前面一样,这两个输入管道正在计算移动和控制台玩家的单个会话长度。然后,这些会话长度进入管道的第二阶段,在固定窗口中计算全局会话长度平均值。
这里有两点很重要:
- 每个移动会话和pc端会话阶段的输出水印至少与相应输入水印一样早,实际上还要更早一些。这是因为在实际的系统中,计算答案需要时间,而且我们不允许输出水印提前,直到给定输入的处理完成。
- 求平均会话长度阶段(Average Session Lengths)的输入水印是直接上游两个阶段的输出水印的小值。
结果表明,下游输入水印是上游输出水印的小值。注意,这与本章前面对这两种类型的水印的定义相匹配。还要注意下游的水印是怎样形成的,即上游stage将比后面的stage在时间上走得更远。
这里值得注意的一点是,我们在例3-1中再次提出问题,从而从本质上改变pipeline的结果。以前我们只是简单地计算每个用户会话的长度,而现在我们计算的是两分钟的全局会话长度平均值。是对玩游戏的用户的整体行为的更深入的观察,并让您对简单数据转换和实际的数据计算之间的区别有了一个细微的了解。
现在我们已经了解了这个pipeline如何运行的基础知识,我们可以更仔细地研究这四个问题相关的更微妙的问题之一:输出时间戳。
Watermark传播和输出时间戳
在图3-5中,忽略了输出时间戳的一些细节。但是如果您仔细查看图中的第二个stage,您会发现阶段的每个输出都被分配了一个时间戳,该时间戳与窗口的末端相匹配。虽然这是输出时间戳的一个非常自然的选择,但它不是有效的选择。正如您在本章前面所知道的,决不允许水印回退。有了这个限制,您就可以推断出一个给定窗口的有效时间戳的范围是从窗口中早的非延迟记录的时间戳开始的(因为只有nonlate记录才能保证带有水印),并且一直延伸到正无穷。有很多选择。然而,在实践中,在大多数情况下只有少数几个选择是有意义的:
窗口结束时间(End of the window)
如果希望输出时间戳代表窗口边界,那么使用窗口的结束时间是安全的选择。稍后我们会看到它也是所有水印中平稳的。
窗口中个非迟到数据的时间戳(Timestamp of first nonlate element)
如果希望保持水印尽可能保守,那么使用个nonlate元素的时间戳是一个不错的选择。然而,权衡的结果是,水印的进展可能会受到更多的阻碍,我们很快就会看到这一点。
窗口中特定元素的时间戳(Timestamp of a specific element)
对于某些用例,其他任意元素的时间戳(从系统的角度来看)是正确的选择。想象一个用例,在这个用例中,您将查询流join到该查询结果的单击流。执行join之后,一些系统会发现查询的时间戳更有用;其他用户可能更喜欢单击的时间戳。从水印正确性的角度来看,任意这样的时间戳都是有效的,只要它对应的元素没有延迟。
了解一些输出时间戳的备选选项之后,让我们看看输出时间戳的选择对整个pipeline有什么影响,在示例3-3和图3-6中,我们将切换到为窗口使用尽可能早的时间戳:个nonlate元素的时间戳作为窗口的时间戳。
//Example 3-3. Average session lengths pipeline, that output timestamps for session windows set at earliest element
PCollection<Double> mobileSessions = IO.read(new MobileInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.triggering(AtWatermark())
.withTimestampCombiner(EARLIEST)
.discardingFiredPanes())
.apply(CalculateWindowLength());
PCollection<Double> consoleSessions = IO.read(new ConsoleInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.triggering(AtWatermark())
.withTimestampCombiner(EARLIEST)
.discardingFiredPanes())
.apply(CalculateWindowLength());
PCollection<Float> averageSessionLengths = PCollectionList .of(mobileSessions).and(consoleSessions) .apply(Flatten.pCollections()) .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(AtWatermark())
.apply(Mean.globally());
为了体现输出时间戳选择的效果,请查看个stage中的虚线,这些虚线显示了每个阶段的输出水印所在位置。与图3-7和图3-8相比,我们选择的时间戳会延迟输出水印,在图3-7和图3-8中,输出时间戳被选择为窗口的末端。从图中可以看出,第二个stage的输入水印随后也被延迟了。
这个版本与图3-7的不同之处有两点值得注意:
Watermark delay
与图3-5相比,图3-6中的水印处理速度要慢得多。这是因为个stage的输出水印是每个窗口中个元素的时间戳,直到该窗口的输入完成为止。只有在一个给定的窗口被物化之后,输出水印(以及下游的输入水印)才被允许前进。
语义差别
由于会话时间戳现在被分配为与会话中早的非延迟元素相匹配,因此当我们在下一阶段计算会话长度平均值时,各个会话通常会在不同的固定窗口中结束。到目前为止,我们看到的这两种选择本质上没有什么对错之分;他们只是不同。但是重要的是要理解它们是不同的,并且要对它们不同的方式有一个直观的认识,这样面临选择时,您就可以为特定的用例做出正确的选择。
窗口重叠的复杂案例
关于输出时间戳的另一个微妙但重要的问题是如何处理滑动窗口。将输出时间戳设置为早的元素这种简单的方法很容易导致下游的延迟,因为水印被阻塞了。要了解原因,请考虑一个包含两个stage的pipeline示例,每个stage使用相同类型的滑动窗口。假设每个元素在三个连续的窗口中结束。随着输入水印的推进,这种情况下滑动窗口所需的语义如下:
- 个窗口在个stage完成,并向下游发出。
- 然后,个窗口在第二个stage完成,也可以在下游发出。
- 一段时间后,第二个窗口在个stage结束……等等。
但是,如果输出时间戳被选择为窗格中个nonlate元素的时间戳,实际发生的情况如下:
- 个窗口在个stage完成,并向下游发出。
- 第二个stage的个窗口无法完成,因为它的输入水印是被第二个和第三个窗口上游的输出水印阻塞的.这些水印被保留下来是正确的,因为早的元素时间戳被用作这些窗口的输出时间戳。
- 第二个窗口在个stage完成并向下游发出。
- 第二个阶段的个和第二个窗口仍然无法完成,被上游的第三窗口阻塞。
- 第三个窗口在个stage完成并向下游发出。
- 第二个stage的个、第二个和第三个窗口现在都可以完成了,后一次性完成了所有三个窗口。
虽然这种窗口化的结果是正确的,但是这会导致结果不必要的延迟。因此,Beam为重叠窗口提供了特殊的逻辑,以确保窗口N+1的输出时间戳总是大于窗口N的结束时间。
百分比Watermarks
到目前为止,我们所关注的水印是由一个stage中活动消息的小事件时间来度量的。跟踪小值系统知道所有较早的时间戳在什么时候已经被占用。另一方面,我们可以考虑活动消息的事件时间戳的整个分布,并利用它来创建更细粒度的触发条件。
我们可以不考虑分布的小值,而是取分布的任意百分比,并保证使用较早的时间戳处理所有事件的这个百分比
这个方案的优点是什么?如果业务逻辑“大部分”是正确的,那么百分位水印提供了一种机制,通过这种机制,水印可以比通过丢弃分布在水印长尾的异常值来跟踪小事件时间的方法更快、更平稳地前进。图3-9显示了事件时间的紧凑分布,其中第90百分位水印接近第100百分位。图3-10演示了一个离群值更落后的情况,因此第90百分位水印明显领先于第100百分位。通过丢弃水印中的离群数据,百分位数水印仍然可以跟踪大部分的分布而不受离群数据的影响。
图3-11显示了用于为两分钟固定窗口绘制窗口边界的百分位数水印的示例,我们可以根据百分比水印跟踪的到达数据的时间戳的百分比来绘制早期边界。
图3-11显示了第33个百分位、第66个百分位和第100个百分位(完整)水印,跟踪了数据分布中的各个时间戳百分位.正如预期的那样,这些允许在跟踪完整的第100个百分位水印之前绘制边界。请注意,第33和第66百分位的水印都允许更早地触发windows,但同时也会将更多的数据标记为延迟。例如,对于个窗口[12:00,12:02],基于第33百分位水印关闭的窗口将只包含四个事件,并在12:06处理时间落地结果。如果我们使用第66百分位水印,相同的事件时间窗口将包括7个事件,并在12:07处理时间落地。使用第100百分位水印包括所有10个事件和延迟实现的结果,直到12:08处理时间。因此,百分比水印提供了一种方法来调整落地结果的延迟和结果的精度之间的平衡。
处理时间 Watermarks
到目前为止,我们一直在研究与流经我们系统的数据相关的水印。我们已经看到,查看watermark可以帮助我们识别早的数据和实时之间的整体延迟。
但是,这不足以区分数据问题还是系统延迟。换句话说,只检查我们到目前为止所定义的事件时间水印,我们无法区分一个系统是否在一小时前快速而无延迟地处理数据,一个试图处理实时数据的系统在处理数据时被延迟了一个小时。
为了进行区分,我们需要更多的概念:处理时间水印。我们已经看到流系统中有两个时间域:处理时间和事件时间。到目前为止,我们已经在事件-时间域中定义了watermark,作为流经系统的数据的时间戳的函数。这是一个事件时间水印。现在,我们将对处理时间域应用相同的模型来定义处理时间水印。所有这些操作都是针对管道当前或上游阶段的先前操作而执行的。因此,正如数据元素在系统中“流动”一样,处理这些元素所涉及的一系列操作也在系统中“流动”
我们定义处理时间水印的方法与定义事件时间水印的方法完全相同,除了使用早尚未完成的算子的事件时间-时间戳之外,我们使用尚未完成的早操作的处理时间时间戳。处理时间水印的延迟的一个例子可能是从一个stage到另一个stage的被阻塞的消息传递,对读取状态或外部数据的I/O调用被阻塞,或者在处理过程中出现异常,导致处理无法完成。
因此,处理时间水印提供了处理延迟与数据延迟分离的概念。要理解这种区别的价值,请参考图3-12中的图,其中我们体现了事件-时间水印延迟。我们看到数据延迟是单调递增的,但是没有足够的信息来区分系统阻塞和数据阻塞的情况。只有查看处理时间水印,如图3-13所示,我们才能区分这些情况。
在种情况下(图3-12),当我们检查处理时间水印延迟时,我们看到它也在增加。这告诉我们,我们的系统中的一个操作被阻塞了,这种情况也导致了数据延迟。可能发生这种情况的一些实际例子是,网络问题阻止了pipeline各stage之间的消息传递,或者发生了故障并正在重试。一般来说,一个不断增长的处理时间watermark表明了一个问题,这个问题阻碍了系统功能所需的操作的完成,而且常常需要用户或管理员的介入来解决。
在第二种情况下,如图3-14所示,处理时间水印延迟很小。这告诉我们没有卡住的操作。事件-时间水印的延迟仍然在增加,这表明我们有一些等待耗尽的缓冲状态。这是可能的,例如,如果我们在等待窗口边界发出聚合的同时缓冲某个状态,并且与pipeline的正常操作相对应,如图3-15所示。
因此,处理时间水印是区分系统延迟和数据延迟的有效工具。除了可见性之外,我们还可以在系统实现级别上使用处理时间水印,用于临时状态的垃圾收集等任务。(Reuven在第5章中详细讨论了这个例子)。
案例
既然我们已经为watermark应该如何表现奠定了基础,那么现在就应该看看一些实际的系统,以了解水印的不同机制是如何实现的。我们希望这些能够说明在真实系统中水印的延迟和正确性以及可扩展性和可用性之间可能存在的权衡。
Case Study: Watermarks in Google Cloud Dataflow
在流处理系统中实现水印有许多可能的方法。在这里,我们将简要介绍Google Cloud Dataflow中的实现,这是一个用于执行Apache Beam管道的完全托管服务。数据流包括定义数据处理工作流的sdk,以及在谷歌云平台资源上运行这些工作流的云平台托管服务。
Dataflow每个stage,都将输入的数据按照key的范围分片,每个物理worker只负责一个key分区。当pipeline中GroupByKey操作时,数据必须按照key被分发到相应的worker上进行计算,其逻辑示意图如下:
对worker的key分区的物理分配可能如图3-17所示。
在水印的传播部分,我们讨论了每个步骤的多个子组件的水印维护。Dataflow跟踪每个组件的每个分区的水印。然后,watermark聚合涉及计算所有分区内每个水印的小值,以确保满足以下保证:
- 所有分区都有watermark。如果某个分区不存在水印,则我们无法推进该水印,因为未报告的分区必须视为未知。
- 确保水印是单调递增的。由于数据可能延迟。如果更新水印会导致水印回退,我们就不能更新水印。
Google Cloud Dataflow通过一个集中的聚合器agent执行聚合。为了提高效率,我们可以把代理分片。从正确性的角度来看,水印聚合器是水印的“真实来源”。
确保分布式水印聚合的正确性带来了一定的挑战。重要的是不要过早推进水印,因为过早地推进水印会把准时的数据变成延迟的数据。每个worker都会被分配一段key空间,这个worker需要负责这段key的数据的state的更新/清理工作。确保只有一个worker可以修改key的持久状态。为了保证水印的正确性,我们必须确保来自worker进程的每个水印更新只有在worker进程仍然保持其持久状态租约(worker还负责这段key的state管理)的情况下才被允许进入聚合。因此,水印更新必须考虑到租约的有效性。
Case Study: Watermarks in Apache Flink
Apache Flink是一个开源的流处理框架,用于分布式的、高性能的、高可用的、的数据流应用程序。使用Flink runner可以运行Beam程序。在此过程中,Beam依赖于流处理框架(如Flink中的水印)的实现。与Google Cloud Dataflow通过一个集中式的水印聚合器agent实现水印聚合不同,Flink在内部执行水印跟踪和聚合。
为了理解这是如何工作的,让我们看一下Flink pipeline,如图3-18所示。
在这个pipeline中,数据在两个数据源上生成。这些源都生成水印“检查点”,这些“检查点”与数据流同步地发送。这意味着,当来自源A的时间戳“53”的水印检查点发出时,它保证不会从源A发出非延迟的数据消息,而时间戳在“53”之后。下游的“keyBy”操作符消费输入数据和水印检查点。当消费新的水印检查点时。下游操作的水印视图将得到推进,并且可以为下游操作发出新的水印检查点。
这种将水印检查点与数据流一起发送到下游的选择与Cloud Dataflow方法不同,Cloud Dataflow 方法依赖于中心聚合,会导致一些权衡。
以下是in-band方式的一些优点:
减少水印传播延迟,以及非常低的延迟水印
因为不需要让水印数据遍历等待中心聚合,使用in-band方法可以更容易地实现非常低的延迟。
水印聚合没有单点故障
中心水印聚合agent中的不可用将导致整个pipeline中的水印延迟。使用in-band方法,部分管道的不可用性不会导致整个管道的水印延迟。
可扩展性更强
尽管Cloud Dataflow在实践中可以很好地扩展,但是与in-band水印的隐式可扩展性相比,使用集中式水印聚合服务实现可扩展性更复杂。
以下是out-of-band 水印聚合的一些优点:
真实来源
对于调试、监控和其他应用场景,有一个服务可以提供所有水印的值,而不是在流中隐含水印,系统的每个组件都有自己的局部视图,这是有利的。
创建源头水印
一些源水印需要全局信息。例如,源可能暂时空闲,数据速率低,或者需要有关源或其他系统组件的out-of- band 信息来生成水印。这在中央服务中更容易实现。有关示例,请参见下面有关Google Cloud Pub/Sub源水印的案例研究。
Case Study: Source Watermarks for Google Cloud Pub/Sub
谷歌云发布/订阅是一个完全托管的实时消息服务,允许您在独立应用程序之间发送和接收消息。在这里,我们讨论如何为通过云发布/订阅发送到管道的数据创建合理的启发式水印。
首先,我们需要描述一下Pub/Sub是如何工作的。消息发布在发布/订阅topics上。一个topic可以有任意数量的发布/订阅。对于给定topic的所有订阅,都传递相同的消息。传递的方法是让客户端从订阅中拉去消息,并通过提供的id确认特定消息的接收。客户端不能选择哪些消息被拉取,尽管Pub/Sub首先尝试提供早的消息,但并没有严格的保证。
为了建立启发式,我们对发送数据到Pub/Sub的源做了一些假设。具体来说,我们假设原始数据的时间戳“表现良好”;换句话说,在将源数据发送到Pub/Sub之前,我们希望源数据上有一定数量的无序时间戳。使用时间戳发送的任何超出允许的无序范围的数据都将被视为延迟数据。在我们当前的实现中,这个边界至少是10秒,这意味着在发送到Pub/Sub之前重新排序时间戳多10秒不会创建延迟的数据,我们把这个值称为估计值。另一种方法是,当管道与输入完全同步时,水印将比实际时间晚10秒,以允许对源进行重新排序。如果管道被积压,那么所有的积压(不只是10秒的范围)都将用于估计水印。
Pub/Sub中我们面对的挑战是什么?因为发布/订阅不保证排序,我们必须有一些额外的元数据来充分了解backlog。幸运的是,Pub/Sub根据“旧的未确认发布时间戳”提供了对backlog的度量。这与消息的事件时间戳不同,因为发布/订阅与通过它发送的应用程序级元数据无关;相反,这是消息被Pub/Sub接收的时间戳。
此度量与事件时间水印不同。它实际上是用于发布/订阅消息传递的处理时间水印.发布/订阅发布时间戳不等于事件时间戳,在发送历史(过去)数据的情况下,它可能是任意远的。这些时间戳上的排序也可能不同,因为如前所述,我们允许有限数量的重新排序。
但是,我们可以使用它作为backlog的度量,以了解backlog中出现的事件时间戳的足够信息,这样我们就可以创建一个合理的水印,如下所示。
我们创建了包含输入消息的topic的两个订阅:管道将实际用于读取待处理的数据的基本订阅,以及仅用于元数据的跟踪订阅,用于执行水印估计。
在图3-19中查看我们的基本订阅,我们可以看到消息到达的顺序可能不一致。我们使用发布/订阅发布时间戳“pt”和事件时间戳“et”来标记每条消息。注意,这两个时间域可以是不相关的。
基本订阅上的一些消息未得到确认,形成积压。这可能是由于它们尚未交付,或者它们可能已经交付但尚未处理。因此,仅通过查看基本订阅就不能说我们的水印应该是什么。
如图3-20所示,跟踪订阅用于有效地检查基本订阅的backlog,并获取backlog中事件时间戳的小值。通过在跟踪订阅上保证很少或没有积压,我们可以在基本订阅的旧的未确认消息之前检查消息。
我们通过确保从订阅中拉取的计算成本很低,从而保持对跟踪订阅的跟踪。相反地,如果我们在跟踪订阅上落后太多,我们将停止推进水印。为此,我们确保至少符合下列其中一项条件:
- 跟踪订阅比基本订阅早得多。充分超前意味着跟踪订阅至少领先于估计值。这确保了在估计范围内的任何有界的重新排序都被考虑在内。
- 跟踪订阅非常接近实时。换句话说,在跟踪订阅上没有backlog。
在保存了关于消息的发布和事件时间戳的元数据之后,我们将尽快确认跟踪订阅上的消息。我们以稀疏直方图格式存储此元数据,以小化所使用的空间量和持久写入的大小。
在输入处10秒的重新排序限制内的所有时间戳都将由水印负责,而不会显示为延迟数据。然而,它可能产生一个过于保守的水印。因为我们在跟踪订阅中考虑了所有在基本订阅的旧的未确认消息之前的消息,所以我们可以在已经确认的消息的水印估计中包含事件时间戳。
此外,还有一些启发式方法可以确保进度。这种方法适用于数据密集、频繁到达的情况。在数据稀少或不频繁的情况下,可能没有足够的新消息来构建合理的估计。如果我们在超过两分钟的时间内没有看到关于订阅的数据(并且没有积压),我们将水印提升到接近实时的水平。这确保了水印和pipeline继续取得进展,即使没有更多的消息。
上述所有这些都确保了只要源数据-事件时间戳重新排序在估计范围内,就不会有额外的延迟数据。
总结
至此,我们已经探索了如何使用消息的事件时间来给出流处理系统中进度的定义。我们看到了进度的概念如何随后帮助我们回答事件时间处理在何处发生以及处理时间结果何时物化的问题。具体地说,我们研究了如何在源上创建水印,数据进入pipeline的点,然后在整个管道中传播。我们还研究了在水印上更改输出窗口时间戳的影响。后,我们探讨了在大规模构建水印时的一些实际系统注意事项。
既然我们已经对水印如何在幕后工作有了一个坚实的基础,我们可以在第4章中使用窗口和触发来回答更复杂的问题时,深入了解它们能为我们做些什么。