数据丢失是一件非常严重的事情事,针对数据丢失的问题我们需要有明确的思路来确定问题所在,针对这段时间的总结,我个人面对kafka数据丢失问题的解决思路如下:
1. 是否真正的存在数据丢失问题
比如有很多时候可能是其他同事操作了测试环境,所以首先确保数据没有第三方干扰。
2. 理清你的业务流程,数据流向
数据到底是在什么地方丢失的数据,在Kafka之前的环节或者Kafka之后的流程丢失?比如Kafka的数据是由flume提供的,也许是flume丢失了数据,Kafka自然就没有这一部分数据。
3. 如何发现有数据丢失,又是如何验证的?
从业务角度考虑,例如:教育行业,每年高考后数据量巨大,但是却反常的比高考前还少,或者源端数据量和目的端数据量不符。
4. 定位数据是否在Kafka之前就已经丢失还是消费端丢失数据的
Kafka支持数据的重新回放功能(换个消费group),清空目的端所有数据,重新消费。
如果是在消费端丢失数据,那么多次消费结果完全一模一样的几率很低。
如果是在写入端丢失数据,那么每次结果应该完全一样(在写入端没有问题的前提下)。
5. Kafka环节丢失数据
常见的Kafka环节丢失数据的原因有:
如果auto.commit.enable=true,当consumer fetch了一些数据但还没有完全处理掉的时候,刚好到commit interval出发了提交offset操作,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。
网络负载很高或者磁盘很忙写入失败的情况下,没有自动重试重发消息。没有做限速处理,超出了网络带宽限速。kafka一定要配置上消息重试的机制,并且重试的时间间隔一定要长一些,默认1秒钟并不符合生产环境(网络中断时间有可能超过1秒)。如果磁盘坏了,会丢失已经落盘的数据。
单批数据的长度超过限制会丢失数据,报kafka.common.MessageSizeTooLargeException异常解决:
Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.
Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.
Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
6. partition leader在未完成副本数follows的备份时就宕机的情况,即使选举出了新的leader但是已经push的数据因为未备份就丢失了。
Kafka是多副本的,当你配置了同步复制之后。多个副本的数据都在PageCache里面,出现多个副本同时挂掉的概率比1个副本挂掉的概率就很小了。(官方推荐是通过副本来保证数据的完整性的)
7. Kafka的数据一开始就是存储在PageCache上的,定期flush到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache上的数据就丢失了。
可以通过log.flush.interval.messages和log.flush.interval.ms来配置flush间隔,interval大丢的数据多些,小会影响性能但在0.8版本,可以通过replica机制保证数据不丢,代价就是需要更多资源,尤其是磁盘资源,Kafka当前支持GZip和Snappy压缩,来缓解这个问题是否使用replica取决于在可靠性和资源代价之间的balance。
同时Kafka也提供了相关的配置参数,来让你在性能与可靠性之间权衡(一般默认):
当达到下面的消息数量时,会将数据flush到日志文件中。默认10000
log.flush.interval.messages=10000
当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
log.flush.interval.ms=1000
检查是否需要将日志flush的时间间隔
log.flush.scheduler.interval.ms = 3000
推荐文章:
如何为Kafka集群确定合适的分区数以及分区数过多带来的弊端
Kafka分区分配策略(Partition Assignment Strategy)
Kafka中sequence IO、PageCache、SendFile的应用详解
关注微信公众号大数据学习与分享,获取更多技术干货