1.文件存储机制
前面提到消息在逻辑上以topic存在,而在物理存储上是partition,在实际存储中partition是作为目录存在于broker上。
假如server.properties中log.dirs和num.partitions配置如下:
log.dirs=/tmp/kafka-logs
num.partitions=4
同时创建一个名为my_test_topic的topic,则会在/tmp/kafka-logs/下面生成四个目录,分别为:
my_test_topic-0
my_test_topic-1
my_test_topic-2
my_test_topic-3
这四个目录对应就是my_test_topic的四个分区,由此也可以看到分区的命名规则为topic名称+以0开头的序号。
每个partition又包含多个segment文件,一个segment文件由.index文件和.log两个文件组成,分别为segment的索引
文件和数据文件,这两个文件的命名基于同一个topic下的所有partition,即每个partition下的所有segment文件命名
都不一样。具体的命名规则为:partition全局的个segment从0开始,后续每个segment文件名为上一个segment文件后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
index索引文件用来存储元数据,log文件用来存储消息,索引文件中的元数据指向数据文件中消息的物理偏移地址。
两者关系如下图示:
以index文件中(64 785)这组记录为例,它表示在00000000000000239430.log这个log文件中是第64个消息,
在全局为第239494个消息,即offset,它的物理偏移地址为785。consumer使用offset从partition读取消息依赖于上述关系。
举例说明:
假如要读取offset=239432的消息,使用二分查找定位到到00000000000000239430.index,然后根据偏移量定位到
消息的物理地址偏移量为10,然后读取Message239432。因为消息中记录了offset、magic、crc32、length等信息
所以可以直到在哪里结束读取。
2.消息发送机制
2.1消息发送模式
Kafka的发送模式由producer端的配置参数producer.type来设置,这个参数指定了在后台线程中消息的发送方式是同步的还是异步的,默认是同步的方式,即producer.type=sync。如果设置成异步的模式,即producer.type=async,可以是producer以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须要将producer.type设置为sync。
对于异步模式,有如下四个参数可以设置:
参数 | 描述
--- | --
queue.buffering.max.ms | 默认值:5000。启用异步模式是,producer缓存消息的时间。比如设置成1000,
它会缓存1s的数据然后发送出去,这样可以极大的增加broker的吞吐量,但是会降低数据的失效性。
queue.buffering.max.message | 默认值:10000。producer缓存队列里的大缓存消息数,如果超过这个值,producer就会阻塞或者丢弃消息。
queue.enqueue.timeout.ms | 默认值:-1。当达到上面参数时producer会阻塞等待的的时间。如果设置为0,
队列满时producer不会阻塞,消息会被直接丢弃。若设置为-1,producer会阻塞,不会丢弃消息。
batch.num.message | 默认值:200。启用异步模式时,一个batch缓存消息的数量,当达到这个数值时producer才会发送。
2.2数据可靠性级别
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别。
参数设值 | 描述
--- | --
1(默认) | producer在数据replication的leader收到数据并且得到确认后发送下一条数据。如果leader宕机,消息丢失。
0 | producer无需确认leader的回复而继续发送,这种情况下数据可靠性低,但是传输效率高。
-1 | producer需要等待replication中所有的follower都确认收到数据后才发送下一条数据。可靠性高,但是当只有leader而没有follower的时候和request.required.acks=1一样。
3.副本同步机制
kafka每个topic的partition有N(N>=1)个副本,其中N由replica factor这个配置项决定。kafka通过多个副本实现故障
的自动转移,当kafka集群中一个节点挂掉后可以保证服务仍然正常。多个副本中有一个副本为leader,其余为follower。
leader负责处理针对该partition的所有读写请求,follower能自动的从leader复制数据。下图是一个有4个broker、3个partition(同一个颜色深度为一个partition)、replica factor=3的kafka集群中leader和follower之间关系的示意图。
关于leader和follower之间的数据复制,考虑下面几个问题:
1.当leader挂掉后,会通过选举机制从follower中产生一个新的leader。加入之前的leader已有的数据条数为5,被选举
的follower只复制了其中三条,那么consumer从新的leader获取数据就会出现问题,这种情况下如何保证数据正确性?
2.若某个follower出现故障,导致复制数据出现异常,该如何保证系统可靠性?
针对上述问题,kafka引入了HW(HighWaterMark)和ISR(In Sync Replicas),下面分别介绍。
3.1 HW
和HW相关的还有LEO,LEO是LogEndOffset的缩写,标识每个partition的log后一条message的位置。HW是指
consumer能够读到的此partition新的位置。
3.2 ISR
ISR是指副本同步,副本对于kafka的吞吐率有一定的影响,但是极大的增强了可用性,默认情况下Kafka的replica数量为1,即每个partition都有一个的leader,为了确保消息的可靠性,通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比如3。所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中,AR=ISR+OSR。
3.3基于HW和ISR的消息同步
一个partition中取ISR中小的LEO作为HW,consumer多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。
(1) follower和leader数据一致,HW和LEO也一致,无需复制。
(2)leader接收到producer写入的数据,此时LEO移动,两个follower开始复制数据。
(3)follower1完全复制了消息,follower2只复制了一部分,HW移动一位。
(4)follower2完全复制消息,HW和LEO一致。
由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。
3.4文件截断机制
如上图,某个topic的某partition有三个副本,分别为A、B、C。A作为leader肯定是LEO高,B紧随其后,C机器由于配置比较低,网络比较差,故而同步慢。这个时候A机器宕机,这时候如果B成为leader,假如没有HW,在A重新恢复之后会做同步(makeFollower)操作,在宕机时log文件之后直接做追加操作,而假如B的LEO已经达到了A的LEO,会产生数据不一致的情况,所以使用HW来避免这种情况。 A在做同步操作的时候,先将log文件截断到之前自己的HW的位置,即3,之后再从B中拉取消息进行同步。
如果失败的follower恢复过来,它首先将自己的log文件截断到上次checkpointed时刻的HW的位置,之后再从leader中同步消息。leader挂掉会重新选举,新的leader会发送“指令”让其余的follower截断至自身的HW的位置然后再拉取新的消息。
4.consumer消息保证
有三种级别的消息保证:
At most once: 消息可能会丢,但绝不会重复传输
At least once:消息绝不会丢,但可能会重复传输
* Exactly once:每条消息肯定会被传输一次且仅传输一次
(1)先commit offset再消费消息,如果在offset被commit之后但消息没有被消费时consumer宕机,则会丢失消息,实现at most once。
(2)先消费消息,再commit offset,如果在offset被commit之前consumer宕机,当重启时会重复消费。实现at least once。
(3)要实现exactly once则需要对commit offset和消息消费加上事务,或者对at least once加上去重机制。