绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
我的 Kafka 旅程 - Consumer
2022-09-29 18:16:18

kafka采用Consumer消费者Pull主动拉取数据的方式,当Broker无数据时,消费者空转。Kafka并不删除已消费的消息,各自独立的消费者可消费同一个Broker分区数据。

消费流程

1、消费者发起网络消费请求

# 每批次小抓取设置(推荐1字节)
fetch.min.bytes
# 每批次大抓取大小设置(推荐500ms)
fetch.max.bytes
# 未达到大小的超时设置(推荐50M)
fetch.max.wait.ms

2、拉取数据到内存消费队列中

# 单次拉取大消息条数设置(推荐500条)
max.poll.records

2.1、反序列化处理(对应了Producer端的序列化动作)

2.2、拦截器处理(如:汇总统计记录)

3、数据的后续处理

保存等的消费端动作。

 

offset

当一个消费者挂掉或重启后,是否还记得消费到的位置了?offset解决了此问题。
对于每一个topic,都会维持一个分区日志,分区中的每一个记录都会分配一个Id来表示顺序,称之为offset,offset用来的标识分区中每条记录,并将每次的消费位置提交到topic中。消费者恢复启动后接着按序消费数据。

自动提交

# 开启自动提交
enable.auto.commit = true
# 每次提交间隔(推荐5秒)
auto.commit.interval.ms = 5000

手动提交

先关闭自动提交后,在Consumer客户端的代码中,通过调用方法函数提交,通常的方法名:

# 同步提交,等提交完成才可下一次再消费
.CommitSync
# 异步提交,可直接进行下一个消费,也有可能提交失败
.CommitAync

指定消费

在Consumer客户端的代码中,手动指定offset的位置进行消费,关联到的方法函数名:

# 按指定时间得出offset值
.offsetsForTimes
# 按指定offset值继续消费
.seek

初始策略

# earliest:	早消费;无offset时,从头开始消费。
# latest:	新消费;无offset时,从新的数据开始消费。
# none:	无offset时,引发异常。
auto.offset.reset = earliest | latest | none

消费现象

重复消费:offset未提交成功,下次消费还是旧的offset。

漏消费:offset提交成功,消费者端后续的数据处理未完成(建议下游步骤事务处理)。

 

消费者组

为了实现横向扩展,应用程序需要创建一个消费者群组,然后往群组里添加消费者来提高处理效率,群组里的每个消费者只处理一部分消息。

消费者组是逻辑上的一个消费者,是由一个或多个消费者实例组成,具有可扩展性和可容错性,消费者组内的消费者共享一个GroupId组成;组内每个消费者负责消费不同分区数据,并行消费数据;当组内一个消费者挂了之后,其它消费者要自动承担它的消费任务 - 组内再平衡

 

触发再平衡

消费成员与Broker分区保持心跳连接,或者消费成员处理消息时间过长,会被认为此消费者需要被移除,触发组内消费成员任务再分配。以下配置任其一条件触发再平衡:

# 心跳连接超时的 移除条件(建议45秒)
session.timeout.ms
# 消息处理超时的 移除条件(建议5分钟)
max.poll.interval.ms

再平衡策略

# 再平衡策略配置项(可多策略组合)
partition.assignment.strategy = Range | RoundRobin | Sticky | CooperativeSticky
  • Range:单个Topic内的重新平均分配
  • RoundRobin:所有Topic的全部消费者,一起重新分配
  • Sticky:一次小范围重新分配;仅调整需要的,避免大规模重新分配
  • CooperativeSticky:可多次小范围重新调整,直至终效果

 

提升吞吐量

  • 增加分区,增加消费者,两者一一对应起来,并行消费
  • 调整一次多拉取的消息条数(500条)
  • 调整单次抓取的数据大容量(50M)
分享好友

分享这个小栈给你的朋友们,一起进步吧。

Kafka
创建时间:2020-05-22 09:55:12
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • ?
    栈主

小栈成员

查看更多
  • wangdabin1216
  • 小雨滴
  • chenglinjava0501
  • 时间不说话
戳我,来吐槽~