Kafka数据文件系统结构


发布于 2024-05-15 / 112 阅读 / 0 评论 /
基于kafka2.8.0版本,描述kafka文件系统结构

kafka是一个对磁盘空间严重依赖的服务,因为大部分数据都存储在磁盘中,对内存的要求不高。下面我们看看kafka在磁盘的文件系统结构是怎么样的,以及磁盘怎么实现磁盘存储层面的负载均衡,以及存储层面的横向扩展。

1.数据文件存储目录配置

kafka中,有两个参数和数据存储目录相关。

(1)log.dir:默认值为/tmp/kafka-logs,如果只配置一个目录,则可以使用此参数。

(2)log.dirs:默认值为null,可通过此参数配置配置多个目录,也可以说是多块盘,为空时,使用log.dir。

从这里可以推断出,通过在log.dirs配置多块盘的挂载目录,可以把kafka消息数据均衡地分配到这些磁盘上。

还有一点,kafka的每个数据目录结构都是一样的,log.dirs配置的每块盘在kakfa启动的时候,都会初始化同样的文件系统结构。

下面,我们就通过实践,看看kafka数据文件存储目录下的文件系统结构。

2.存储目录元数据

元数据主要是指:在kafka-logs存储目录中,分区目录以外的文件。

例如

# ls -al /data1/kafka-logs/

drwxrwxr-x 56 nieo nieo 4096 May 13 20:23 .
drwxrwxr-x  5 nieo nieo   49 Apr 28 11:41 ..
-rw-rw-r--  1 nieo nieo   52 May  8 20:15 cleaner-offset-checkpoint
-rw-rw-r--  1 nieo nieo    0 Apr 28 11:41 .lock
-rw-rw-r--  1 nieo nieo    4 May 13 20:22 log-start-offset-checkpoint
-rw-rw-r--  1 nieo nieo   88 May  8 11:38 meta.properties
-rw-rw-r--  1 nieo nieo 1559 May 13 20:22 recovery-point-offset-checkpoint
-rw-rw-r--  1 nieo nieo 1559 May 13 20:23 replication-offset-checkpoint

主要包含5个文件。

2.1.meta.properties

记录当前节点的元数据信息。内容如下所示:

# cat /data/emr/kafka/data/meta.properties
#
#Wed May 08 11:38:04 CST 2024
broker.id=1
version=0
cluster.id=AAbC23EEE2iAAdd3R89L7q

第一行表示写文件的时间。

第二行表示当前broker的id。是一个随机UUID的base64编码。

第三行表示当前的版本号。

第四行表示集群id。

2.2.cleaner-offset-checkpoint

记录当前cleaner清理完成的各个分区的offset信息。内容如下所示:

# cat /data1/kafka-logs/cleaner-offset-checkpoint
0
2
__consumer_offsets 25 9
__consumer_offsets 37 7

文件的读写kafka.server.checkpoints.CheckpointFile类中。

通过write方法,我们可以知道文件的组织结构是什么样的。

(1)第一行表示版本号

(2)第二行表示记录的分区清理offset的记录数

(3)从第三行开始,每一行的格式是“{topic名称} {分区号} {offset值}” 。总行数与第二行的整数是相匹配的。记录格式可参考kafka.server.checkpoints.OffsetCheckpointFile。比如案例中第二行是2,那么就有2条分区清理记录。

2.3.log-start-offset-checkpoint

此文件对应logStartOffset,用来标识日志的起始偏移量。kafka中有一个定时任务负责将所有分区的logStartOffset刷写到起始点文件log-start-offset-checkpoint中,定时周期由broker端参数log.flush.start.offset.checkpoint.interval.ms配置,默认值60000,即60s。

内容如下所示:

# cat /data1/kafka-logs/log-start-offset-checkpoint
0
0

2.4.recovery-point-offset-checkpoint

负责记录topic被写入磁盘的offset信息。内容如下所示:

# cat /data/emr/kafka/data/recovery-point-offset-checkpoint
0
54
test-perf-producer-consumer 0 102266
……

(1)第一行表示OffsetCheckpointFile文件的版本号,当前固定为0。

(2)第二行表示此文件有多少个分区记录。

(3)第三行开始,用“{TOPIC} {PARTITION} {OFFSET}”的格式表示,列举当前broker保存的每个分区的分区号和相应的offset信息。总行数与第二行的整数相匹配。注意,为了简化,案例中省略了其余53条记录。

2.5.replication-offset-checkpoint

用来存储每个replica的HW,表示已经被commited的offset信息。失败的follower开始恢复时,会首先将自己的日志截断到上次的checkpointed时刻的HW,然后向leader拉取消息。kafka有一个定时任务负责将所有分区的HW刷写到复制点文件replication-offset-checkpoint中,定时周期由broker端参数replica.high.watermark.checkpoint.interval.ms配置,默认值5000,即5s。

内容如下所示:

# cat /data1/kafka-logs/replication-offset-checkpoint
0
54
test-perf-producer-consumer 0 102266
……

格式与recovery-point-offset-checkpoint类似,只不过offset表示的是HighWatermark对应的offset。

注意,为了简化,案例中省略了其余53条记录。

3.分区目录

分区目录是存储目录下的一个字目录,具体应该称之为分区副本目录,目录名称格式为“{topic名称}-{分区号}”

分区目录下保存的是分区数据文件,如下所示:

# ls -al /data1/kafka-logs/test-perf-producer-consumer-0
total 52004
drwxrwxr-x  2 nieo nieo      204 May  8 21:48 .
drwxrwxr-x 56 nieo nieo     4096 May 13 20:29 ..
-rw-rw-r--  1 nieo nieo 10485760 May  8 21:48 00000000000000051131.index
-rw-rw-r--  1 nieo nieo 53134775 May  8 01:57 00000000000000051131.log
-rw-rw-r--  1 nieo nieo 10485756 May  8 21:48 00000000000000051131.timeindex
-rw-rw-r--  1 nieo nieo       10 May  8 18:07 00000000000000102266.snapshot
-rw-rw-r--  1 nieo nieo       13 May  8 21:48 leader-epoch-checkpoint
-rw-rw-r--  1 nieo nieo       43 Apr 28 19:54 partition.metadata

其中包含两个分区元数据文件,以及3个segment文件。

3.1.partition.metadata

分区元数据信息,文件内容如下:

# cat /data1/kafka-logs/test-perf-producer-consumer-0/partition.metadata
version: 0
topic_id: ABcdE0T1FFFdLL8BBjiLzc

第一行表示版本号。

第二行表示topic的id。

3.2.leader-epoch-checkpoint

用于保存leader的epoch信息。内容如下所示:

# cat /data1/kafka-logs/test-perf-producer-consumer-0/leader-epoch-checkpoint
0
1
57 51131

该文件在kafka.server.checkpoints.LeaderEpochCheckpointFile类中定义,文件格式也在类中进行了说明。

(1)第一行表示版本号。

(2)第二行表示记录数。

(3)第三行表示真正的记录,格式为“{epoch} {startOffset}”。epoch是leader版本,它是一个单调递增的一个正整数值,每次leader变更,epoch版本都会+1,小版本号的Leader被认为是过期Leader,不能再行使Leader权力。startOffset是每一代leader写入的第一条消息的位移值,即Leader副本在该Epoch值上写入的首条消息的位移。

3.3.segment段

每个分区都是由不同的segment组成,而一个segment由4个文件组成。

因为kafka中每个分区内数据都是顺序的,所以segment段文件也是顺序写入的,可以通过offset来命名段文件。

3.3.1.segment log文件

命名格式为“{起始offset}.log”。其中,{起始offset}是20位的十进制数字,不足20位的话前序填“0”补齐。

文件例如

0000000000000000000.log -> 有50条,offset为 0-49
0000000000000000050.log -> 有10条,offset为 50-59
0000000000000000060.log -> 有xx条,offset从60-xx

文件中按照offset顺序记录了每一条消息。

我们把log文件内存储的消息集合称为record batch,每条消息就是一个record,segment格式以及record的格式如下图所示:

每条消息除了实际内容的value外,伴随着每条消息的产生,还会产生这条消息的额外附带信息,比如偏移量、时间戳等。

图中有大量的varint和varlong的字段,这就是可变长度的类型。

比如一条消息的偏移量是int存储容量是4字节,比如存储10这个偏移量,虽然前面都是0,但实际存储还是需要4个字节。

而varint则可以根据数据的范围选择合适的存储,比如还是10,那么实际记录这个值只需要1个字节就够了。

kafka这么做,就是为了尽最大的可能利用存储空间。

可以通过以下命令查看log文件中存放的是具体的每条消息

kafka-run-class.sh kafka.tools.DumpLogSegments --files XXXXXX.log --print-data-log

输出以下字段信息:

(1)baseOffset:当前batch中第一条消息的位移。

(2)lastOffset:最新消息的位移相对于第一条消息的唯一增量。

(3)count:当前batch有的数据数量,kafka在进行消息遍历的时候,可以通过该字段快速的跳跃到下一个batch进行数据读取。

(4)partitionLeaderEpoch:记录了当前消息所在分区的 leader 的服务器版本,主要用于进行一些数据版本的校验和转换工作。

(5)crc:当前整个batch的数据crc校验码,主要用于对数据进行差错校验的。

(6)compresscode:数据压缩格式,主要有GZIP、LZ4、Snappy三种。

(7)Sequence、producerId、producerEpoch:这三个参数主要是为了实现事务和幂等性而使用的,其中producerId和producerEpoch用于确定当前 producer 是否合法,而起始序列号则主要用于进行消息的幂等校验。

(8)isTransactional:是否开启事务。

(9)magic:Kafka服务程序协议版本号。

(10)CreateTime:数据创建的时间戳。

(11)payload:实际存储的数据。

3.3.2.segment index文件

命名格式为“{起始offset}.index”

文件内容格式为:(offset, 内存偏移地址)

下面,我们通过一个案例来说明index文件和log文件在检索过程中的作用。

需求:consumer发起请求要求从offset=36的消息开始消费
第一步:kafka直接offset的大小,找到对应的log文件,发现36号消息在0000000000000000000.log这个log文件中。
第二步:log文件找到后,需要确认具体的消息在哪?这时需要根据log文件名确定index文件名,找到0000000000000000000.index文件,找到offset为36和37的文件偏移量,发现(36, 12345)和(37, 12400)。
第三步:从0000000000000000000.log文件的12345开始读,读到12400(不包含)为止。也就是读到下一条消息的偏移量停止就可以。这样就读取了一整条消息。

如果是批量消息,也是类似的。

通过以下命令查看index文件的内容

kafka-run-class.sh kafka.tools.DumpLogSegments --files XXXXXX.index --print-data-log

输出内容包含以下字段

(1)offset:这条数据在这个 Segment 文件中的位置,是这个文件的第几条。

(2)position : 这条数据在 Segment 文件中的物理偏移量。

3.3.3.segment timeindex文件

命名格式为“{起始offset}.timeindex”

通过以下命令查看index文件的内容

kafka-run-class.sh kafka.tools.DumpLogSegments --files XXXXXX.timeindex --print-data-log

输出内容包含以下字段

(1)timestamp:该日志段目前为止最大时间戳。

(2)offset:记录的是插入新的索引条目时,当前消息的偏移量。

3.3.4.segment snapshot文件

命名格式为“{}.snapshot”