绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Kafka丢失数据问题优化总结以及重复消费原因分析(一)
2022-09-16 17:35:55

数据丢失是一件非常严重的事情事,针对数据丢失的问题我们需要有明确的思路来确定问题所在,针对这段时间的总结,我个人面对kafka数据丢失问题的解决思路如下:

1. 是否真正的存在数据丢失问题

比如有很多时候可能是其他同事操作了测试环境,所以首先确保数据没有第三方干扰。


2. 理清你的业务流程,数据流向

数据到底是在什么地方丢失的数据,在Kafka之前的环节或者Kafka之后的流程丢失?比如Kafka的数据是由flume提供的,也许是flume丢失了数据,Kafka自然就没有这一部分数据。


3. 如何发现有数据丢失,又是如何验证的?

从业务角度考虑,例如:教育行业,每年高考后数据量巨大,但是却反常的比高考前还少,或者源端数据量和目的端数据量不符。


4. 定位数据是否在Kafka之前就已经丢失还是消费端丢失数据的

Kafka支持数据的重新回放功能(换个消费group),清空目的端所有数据,重新消费。

如果是在消费端丢失数据,那么多次消费结果完全一模一样的几率很低。

如果是在写入端丢失数据,那么每次结果应该完全一样(在写入端没有问题的前提下)。


5. Kafka环节丢失数据

常见的Kafka环节丢失数据的原因有:

如果auto.commit.enable=true,当consumer fetch了一些数据但还没有完全处理掉的时候,刚好到commit interval出发了提交offset操作,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。

网络负载很高或者磁盘很忙写入失败的情况下,没有自动重试重发消息。没有做限速处理,超出了网络带宽限速。kafka一定要配置上消息重试的机制,并且重试的时间间隔一定要长一些,默认1秒钟并不符合生产环境(网络中断时间有可能超过1秒)。如果磁盘坏了,会丢失已经落盘的数据。


单批数据的长度超过限制会丢失数据,报kafka.common.MessageSizeTooLargeException异常解决:

Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.
Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.
Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)

6. partition leader在未完成副本数follows的备份时就宕机的情况,即使选举出了新的leader但是已经push的数据因为未备份就丢失了。

Kafka是多副本的,当你配置了同步复制之后。多个副本的数据都在PageCache里面,出现多个副本同时挂掉的概率比1个副本挂掉的概率就很小了。(官方推荐是通过副本来保证数据的完整性的)

7. Kafka的数据一开始就是存储在PageCache上的,定期flush到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache上的数据就丢失了。

可以通过log.flush.interval.messages和log.flush.interval.ms来配置flush间隔,interval大丢的数据多些,小会影响性能但在0.8版本,可以通过replica机制保证数据不丢,代价就是需要更多资源,尤其是磁盘资源,Kafka当前支持GZip和Snappy压缩,来缓解这个问题是否使用replica取决于在可靠性和资源代价之间的balance。

同时Kafka也提供了相关的配置参数,来让你在性能与可靠性之间权衡(一般默认):

当达到下面的消息数量时,会将数据flush到日志文件中。默认10000

log.flush.interval.messages=10000

当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms

log.flush.interval.ms=1000

检查是否需要将日志flush的时间间隔

log.flush.scheduler.interval.ms = 3000

推荐文章:

Kafka集群消息积压问题及处理策略

如何为Kafka集群确定合适的分区数以及分区数过多带来的弊端

Kafka分区分配策略(Partition Assignment Strategy)

Kafka中sequence IO、PageCache、SendFile的应用详解

关注微信公众号大数据学习与分,获取更多技术干货

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Kafka
创建时间:2020-05-22 09:55:12
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • ?
    栈主

小栈成员

查看更多
  • wangdabin1216
  • 小雨滴
  • chenglinjava0501
  • 时间不说话
戳我,来吐槽~