Kafka Server性能模型


发布于 2024-06-12 / 58 阅读 / 0 评论 /
本文基于kafka3.6

Kafka是一个高性能的消息队列,下面从Kafka Server侧来解析Kafka高性能的原因。

1.Kafka服务端线程模型

这是最基本的高性能设计,通过池设计和高并发来实现。

Kafka线程模型如下图所示:

线程模型中包含四种角色:Acceptor、Processor、RequestChannel、RequestHandlerPool。

模型中各角色分工明确、多线程协作,使Kafka能够同时处理大量有效请求,保持高吞吐量。

1.1.Acceptor

Acceptor是请求接收器,用于接收来自客户端的请求,以及返回响应。

一个Listener对应一个Endpoint,一个Endpoint对应两个Acceptor,分别是controlPlaneAcceptor和dataPlaneAcceptor。

controlPlaneAcceptor和dataPlaneAcceptor是两套相似的处理系统,分别处理不同的请求。

1.2.Processor

Acceptor接收到客户端请求后,把请求交给Processor进行处理。可通过num.network.threads参数配置Processor个数,提升请求并发处理效率,默认为3。

Processor中包含一个responseQueue队列。

1.3.RequestChannel

RequestChannel中封装了一个requestQueue队列,队列大小由queued.max.requests配置决定。如果是control-plane,队列大小固定为20。

RequestChannel中封装了多个Processor,相当于多个responseQueue。

1.4.KafkaRequestHandlerPool

KafkaRequestHandlerPool是请求处理器池,处理器数量由num.io.threads决定,默认为8。如果是control-plane,池中处理器数量为1。

1.5.KafkaScheduler

KafkaScheduler是Kafka Server的后台定时调度器,用于执行LogManager、AlterPartitionManager和ReplicaManager提交的定时任务。

KafkaScheduler中核心线程数由background.threads参数决定,默认为1。

2.文件系统管理

kafka文件结构如下图所示:

一个kafka集群中,可根据需要创建Topic,所以Topic数量时不固定的。

在创建Topic时,需指定该Topic的分区数,以及每个分区的副本数。

每个副本在磁盘中有一个日志目录来存储该副本的数据,副本日志根据消息的序号,可分为不同的Segment。Segment的数量由副本的数据量和Topic的保留时间决定。

每个Segment由三个文件组成。

2.1.Segment log文件

log文件名为“[baseOffset].log”,baseOffset是该Segment中第一条消息的offset,从0开始。这样的设计允许Kafka在恢复时从特定的offset开始重新读取消息。

写入新消息时,数据直接以追加(append)的方式添加到log文件的末尾。由于是追加写入,无论文件多大,写入的时间复杂度都是O(1)。

2.2.Segment index文件

index文件用于提供快速的消息查找。通过index文件,Kafka可以快速定位到消息在log文件中的位置,提高读取性能。

index文件的设计有助于加速消息的检索和定位。

index文件中每条记录都可以转换为一个OffsetIndex对象,OffsetIndex对象中具有两个属性:相对位移和对应消息在log文件中的物理位置。相对位移是个Integer,4字节;消息物理位置也是一个Integer,4字节。因此一个索引记录(OffsetIndex)占8字节。

Kafka​索引文件采用了分段和稀疏索引的方式。这种设计使得通过二分查找快速定位到日志位点成为可能,而且返回的是低位点。与日志文件不同,由于索引文件相对较小,Kafka使用了mmap的方式进行操作,以提高速度。

2.2.1.分段和稀疏索引

index文件采用了分段和稀疏索引的方式。和log文件一样,index文件被划分为多个段,每个log文件都对应一个index文件。每个index文件中中包含对应log文件中部分消息数据的索引项。

注意,索引项不是每个消息都有,而是按照一定规律设置,使得索引文件的大小相对较小。这就是稀疏索引的含义。

如上图所示,log文件中每隔3条消息设置一个索引,而不是一条消息对应一个索引。这样,index文件中索引记录数将会是log文件中消息记录数的1/3。

2.2.2.二分查找

采用二分查找的方式,通过index文件快速定位到特定的日志位点。这样的查找算法具有较高的效率,能够在日志文件很大的情况下快速定位到目标位置。

上图中,如果要读取message5,加入index记录数为10个,则查找顺序为index5--->index3--->index2,这样就能读取到index2对应的message4。

2.2.3.返回低位点

在查找时,索引文件返回的是低位点,即最接近但不超过目标位点的索引项。这样的设计有助于准确定位到目标位置,从而提高读取的准确性。

根据稀疏索引的特性,需要找到低于目标offset对应的消息的,也就是上例中index2,通过index2索引到的log文件中的消息(message4),再顺序往后读,就能读取到message5这条消息。

2.2.4.mmap方式操作

由于index文件相对较小,Kafka使用mmap(内存映射)的方式进行操作。mmap将文件映射到虚拟内存,使得文件的读取和访问可以直接在内存中进行,避免了磁盘IO的开销,提高了操作速度。

2.3.Segment timeindex文件

timeindex文件用于根据时间戳快速查找特定消息的位移值。

timeindex文件中保存的是“<时间戳,相对位移值>”键值对。时间戳是Long类型,相对偏移值是Integer类型。因此,TimeIndex单个索引项需要占12字节。存储同数量索引项,TimeIndex(12字节)比OffsetIndex(8字节)占更多磁盘空间。

虽然OffsetIndex和TimeIndex是不同类型索引,但Kafka内部把二者结合使用。通常先使用TimeIndex寻找满足时间戳要求的消息位移值,然后再利用OffsetIndex定位该位移值所在的物理文件位置。也就是说,如果要使用timeindex文件中的TimeIndex索引,则必须用到index文件中的OffsetIndex索引。

3.顺序磁盘IO

写入新消息时,数据直接以追加(append)的方式添加到log文件的末尾。由于是追加写入,无论文件多大,写入的时间复杂度都是O(1)。

kafka读取消息是根据offset从小到大读取的,因为写入消息是顺序的,所以读取肯定也是顺序的。

机械硬盘的IO测试中发现:顺序读IO的性能是随机读IO的40~400倍;顺序写IO的性能是随机写IO的10~100倍。原因在于随机IO的寻道时间比顺序IO要长。

4.页缓存机制

Kafka并不太依赖JVM内存,更注重充分发挥Page Cache的作用。

Page Cache是操作系统中的一种内存管理技术,它通过将磁盘上的数据块缓存在内存中,提供了对数据的快速访问。如果使用应用层缓存(JVM堆内存)可能会加重垃圾回收GC的负担,导致额外的停顿和延迟增加。

而Page Cache则将磁盘上的数据块缓存在Page Cache中,使得读取操作能够直接在Page Cache上进行,而无需每次都访问物理磁盘。这样的设计在消费和生产速度相当时尤为有效,甚至在某些情况下可以避免直接在物理磁盘上进行数据交换。当数据块已存在于Page Cache中时,读写操作可以在内存中直接进行,避免了较慢的物理磁盘访问。当Page Cache写满的时候,才会由Kafka进行统一的刷盘操作,来完成数据写入磁盘。

另外,即使Kafka发生重启,Page Cache仍然可用,因为Page Cache是由操作系统管理的,而不是由应用程序控制的。这使得Kafka在重启后能够迅速恢复读取性能,而不必等待缓存重新加载。

5.零拷贝机制

在Kafka中,大量的网络数据经过两个关键过程进行持久化和传输,直接影响了整个系统的吞吐量。生产者(Producer)将数据通过网络传输到Broker,并在Broker端进行持久化到磁盘的操作。经过磁盘文件,这些数据再通过网络发送给消费者(Consumer)。这两个环节的性能对Kafka整体的吞吐量产生直接而深刻的影响。

5.1.传统的数据文件拷贝

传统的数据文件拷贝过程如下图所示

大概可以分成四个过程:

(1)操作系统将数据从磁盘中加载到内核空间的Read Buffer(页缓存区)中。

(2)应用程序将Read Buffer中的数据拷贝到应用空间的应用缓冲区中。

(3)应用程序将应用缓冲区的数据拷贝到内核的Socket Buffer中。

(4)操作系统将数据从Socket Buffer中发送到网卡,通过网卡发送给数据接收方。

传统的数据文件传输需要多次在用户态和核心态进行切换,并且需要把数据在用户态和核心态之间拷贝多次,最终才到达网卡,传输到接收方。

5.2.零拷贝(zero copy)

零拷贝中,数据传输的的过程就简化了,如下图所示

共分为三个步骤:

(1)操作系统将数据从磁盘中加载到内核空间的Read Buffer(页缓存区)中。

(2)操作系统之间将数据从内核空间的Read Buffer(页缓存区)传输到网卡中,并通过网卡将数据发送给接收方。

(3)操作系统将数据的描述符拷贝到Socket Buffer中。Socket 缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到 Socket 缓存。

通过零拷贝基数,不需要把内核空间页缓存中的数据拷贝到应用层缓存,再从应用层缓存拷贝到Socket缓存,这两次拷贝都省略了,所以叫做零拷贝。

这个过程大大地提升了数据消费时读取文件数据的性能。Kafka从磁盘读取数据的时候,会先看看内核空间的页缓存中是否存在,如果存在,则直接通过网关发送出去。

5.3.DMA技术

DMA全称Direct Memory Access,直接内存访问,是零拷贝基数的基础。

DMA传输将数据从一个地址空间复制到另一个地址空间。当CPU初始化这个传输动作时,传输动作本身是由DMA控制器来实行和完成的。因此通过DMA,硬件可以绕过CPU,自己去直接访问系统主内存。

很多硬件都支持DMA,包括网卡、声卡、磁盘驱动控制器等。

有了DMA技术的支持后,网卡就可以直接去访问内核空间的饿内存,可以实现内核空间和应用空间之间的零拷贝,极大地提升传输性能。

6.其他设计

以下设计也为Kafka高性能表现提供了支持。

6.1.异步处理设计

Kafka采用全异步的设计,确保在发送和接收消息、以及复制数据等操作中基本上没有阻塞操作。

异步发送: 在Kafka中,调用发送方法(send)会立即返回,而不会等待消息实际被发送到服务器。发送的消息会首先被放入生产者的缓冲区(buffer)中。

缓冲区管理: 缓冲区管理是异步操作的关键。Kafka使用一个内部的缓冲区来暂时保存待发送的消息,当缓冲区满了或者达到一定的条件时,消息会被批量发送。这样可以最大程度地减少网络开销和提高吞吐量。

轮询机制: 发送和接收消息、以及复制数据的过程都是通过NetworkClient封装的poll方式进行的。这种轮询机制是异步操作的关键。在生产者和消费者内部,都有一个后台线程负责轮询缓冲区中的消息并将其发送到目标。这种方式充分利用了异步I/O的特性,不阻塞主线程的执行。

回调机制: 在异步发送的过程中,Kafka提供了回调机制,允许你注册回调函数以处理消息发送的结果。这样,你可以在消息成功发送或发送失败时执行相应的逻辑。

6.2.批量操作设计

批量操作在Kafka中是非常关键的性能优化策略之一。结合磁盘顺序写入和异步发送,批量操作可以显著提高Kafka的性能和吞吐量

RecordAccumulator: 在Kafka中,RecordAccumulator是一个用于聚合记录(records)的缓冲区。当生产者发送消息时,消息会首先进入RecordAccumulator,而不是立即发送到服务器。这个缓冲区的存在允许多个消息被批量处理,以减少网络开销。

批量压缩: Kafka支持在消息发送时进行批量压缩,以减小网络传输的数据量。这通过配置Producer的压缩类型(例如snappy、gzip等)来实现。批量压缩可以减轻网络负担,特别是在处理大量数据时,能够显著提高效率。

批量刷盘: Kafka的生产者通常会有一个配置,控制消息何时被批量刷写到磁盘。这可以通过配置batch.size参数,表示RecordAccumulator中的消息数量达到一定阈值时触发批量刷盘。

总的来说,批量操作有助于减少网络开销、提高磁盘顺序写入效率,并在一定程度上降低了系统的延迟。这种机制充分利用了Kafka的异步和缓冲特性,是构建高性能、高吞吐量消息系统的关键设计之一。

6.3.数据压缩机制

文件压缩是Kafka中的一个重要性能优化策略,通过减小数据的存储空间和降低网络传输的数据量,可以提高整体系统的效率。以下是有关Kafka文件压缩的一些关键概念和实现:

压缩类型: Kafka支持多种压缩算法,例如snappy、gzip、lz4等。你可以根据具体的需求选择合适的压缩类型。这可以在Producer和Consumer的配置中进行设置。

Producer端压缩: 在Producer端,你可以配置消息在发送时进行压缩。这可以通过设置Producer的compression.type属性来实现。压缩后的消息将占用更小的存储空间,并在网络上传输时减小数据量。

Consumer端解压缩: 在Consumer端,Kafka会自动解压缩消息,无需额外的配置。Consumer从Broker接收到的消息已经是解压缩后的原始数据。这意味着Consumer不需要关心消息是否经过了压缩。

Broker配置: 在Broker端,你可以配置允许或禁止消息压缩。在Kafka的Broker配置文件中,你可以设置compression.type属性,以决定Broker是否接受和存储压缩后的消息。