绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Kafka Consumer消费能力较低时的解决方案
2020-05-27 13:55:46

背景

随着业务的发展,项目组有大量的任务需要处理。

这些任务需要主要分为两种类型:

  • 通过接口调用, 后台执行任务
  • 通过调度系统定时执行

接口调用就需要执行任务不能阻塞, 不然系统的处理能力就会下降。任务调度系统需要在在一个小的检测粒度时间内,执行完所有任务。这两种情况都面临这样一个问题, 任务不能阻塞,不然会非常影响性能。所以需要引入消息中间件,将任务派发方和任务执行方分离出来。

在这种情况下, 我们选择了kafka作为了我们的消息中间件, 选择kafka主要基于以下几点:

  • 支持分布式, 避免单点问题
  • 技术方案成熟, 公司内部有上线项目
  • 性能优异, 能够持久化消息

遇到的问题

我们团队在kafka使用上面都没有经验, 其他同事说kafka consumer在消费超时后会掉线,导致重复消费,当时没有这个使用场景,不能理解这个概念。

次发现问题是在联调的时候,任务执行方发现consumer会打印出错误日志,重复消费,并且陷入循环。

当时很快定位到问题, consumer长时间没有发送心跳包, 导致触发rebalance操作, consumer被踢下线了。

对于这个问题,需要详细讲述一下kafka consumer相关的机制。

kafka为了保证partition分配的高效率, 使用了如下机制:

  1. 所有的consumer都要和coordinator连接
  2. coordinator选出一个consumer作为leader来分配partition
  3. leader分配完以后通知coordinator, 由coordinator来通知给其他consumer
  4. 如果一个consumer不能工作了, coordinator会触发rebalance机制,重新分配partition

coordinator判定一个consumer不能工作, 依靠的就是heartbeat机制。consumer的配置里面有一项是session_timeout,如果heartbeat不能在session_timeout时间内发出一次请求,coordinator就会触发一次rebalance操作,重新分配partition。

从上面这样看没什么问题,很多系统都是这么设计的,一个工作线程,一个心跳包线程。但是kafka consumer为了设计上的简单(或者是出于其他目的),他们只有一个线程,也就是说工作逻辑和心跳包逻辑是同步的。对于心跳包这种定时任务,他们使用了一种叫做delayed_task的方案。

delayed_task是Best-Effort的,为什么这么说呢,我们来看看delayed_task是在什么时候工作的:

  1. 取出一批数据
  2. 执行delayed_task
  3. 循环yield 这批数据
  4. 重复执行上述过程

前面我们也说过, consumer只有一个线程, 也就意味着,如果主逻辑消耗了大量时间,delayed_task中的任务就会延期执行。在这种情况下, delayed_task只能保证任务不会提前进行,不能保证任务准时执行。拿一个具体的场景来说, 如果主逻辑花费了60s, 那么delayed_task中的任务早也只能在60s之后执行,像heartbeat任务就直接超时了。


在提出解决方案之前, 我们需要考虑一下几个问题:

生产者速度大于消费者速度怎么处理

如果生产者速度大于消费者速度,消息就会积累。常规的解决方案是增加partition,增加消费者数量,但是在某一些场景下却不能这么实现。思考一下,如果生产者的速度不是恒定的,而是波动的,并且波峰和波谷差距比较大,大部分时间出于波谷,这样在波谷时其实资源是闲置的,并且会降低消费速度。另外对于消费的实时性比较高的场景,如果短时间内消息被积压,纵然后能够消费掉,但是已经过了有效期,这样的消费其实是的。

所以我们必须有能力知道两个数据,即当前队列剩余的消息的数量和当前消息产生的时间。

在消费速度不一致的情况下如何提交offset

kafka-consumer的offset的提交机制是定时向delayed_task里面加入一个AutoCommitTask。但是在消费者消费速度不均衡的情况下不能这么做,如果消费者消费速度比较快,定时提交offset的机制会使得一旦consumer宕机,会丢失一大批消费信息。
同时我们也不能单纯的以消费数量作为是否提交的阈值,在消费者比较消费速率比较慢的情况下,一旦consumer宕机,我们会耗费大量时间在无用的消费上面。
所以我们需要同时衡量数量和时间两个变量,作为我们是否提交的阈值

offset提交失败该怎么处理

consumer的offset提交是按照TopicPartition作为提交单元的。在consumer消费过程中,可能会发生reblance事件,如果当前consumer分配到的partition数量大于1个,可能这个partition会被分配给其他的consumer。在这个过程中,consumer已经消费了该条数据,那么在提交offset的时候,就会遇到CommitOffsetError,因为这个partition已经不属于自己了。
这种情况下该如何处理这些数据

解决方案

带着上面的一些问题,我们开始着手提出解决方案。

从上面的分析可以看出来, consumer掉线的主要问题就是delayed_task和主函数出于同一个工作线程中,那么直观的解决方法就是将这两个分离出来。

由于python GIL的限制,加上kafka consumer 是线程不安全的, 所以我们使用多进程来解决这个问题。

在consumer中,除了迭代器_message_generator之外,还提供了一个poll函数。这个函数和迭代器功能差不多,也能够获取消息,同时也会执行delayed_task。不同之处是, 这个函数会一次性返回一批数据,这样我们就有能力统计剩下的消息的数量。同时我们要求在producer发送消息的时候,一定要带上create_time这个字段,标注消息产生的时间。客户端现在同时能获取数量和时间两个参数,对于实时性要求比较高的场景,他就可以选择性的丢弃一批不满足要求的数据。

当消费者消费速度比较低的时候,我们需要停止获取数据,但是同时不能停下delayed_task。幸运的是,consumer提供了一个pause的函数,可以让我们停止对应的partition。一旦使用pause函数,poll函数将不会返回任何数据,单他依然会执行delayed_task。

由于我们使用poll函数一次性返回多个数据,加上在消费速度不均衡的情况下offset管理的问题。所以我们必须要手动管理offset, 保存我们上次提交offset的时间和未提交offset的数量,一旦其中某一个达到阈值,就真正的提交offset。

当我们提交offset失败的时候,我们需要清除对应的partition的所有数据,防止consumer做无用消费。


综合上面,我们就有能力构造出一个强健的consumer客户端,方便其他同学来使用。


核心代码

while True:
    topic_records = self.consumer.poll().values()
    if not topic_records:
        self.get_offset()
        time.sleep(self.config['idle_timeout'])
    self.consumer.pause(*self.consumer.assigment())
    paused = True
    for records in topic_records:
        remain = len(records)
        for record in records:
        while True:
            data = {"record":record, "remain": remain}
            try:
                self.task_queue.put(data, self.config['block_timeout'])
                remain -= 1
              break
            except Full:
                self.consumer.poll()
                self.get_offset()
    if self.task_queue.qsize < self.config['resumen_count'] and paused:
        partitions = self.consumer.paused()
        if partitions:
            self.consumer.resume(*partitions)

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Kafka
创建时间:2020-05-22 09:55:12
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • ?
    栈主

小栈成员

查看更多
  • wangdabin1216
  • 小雨滴
  • chenglinjava0501
  • 时间不说话
戳我,来吐槽~