KafkaProducer消息生产过程


发布于 2024-07-31 / 38 阅读 / 0 评论 /
基于kafka3.6.0源码,解析KafkaProducer生产消息过程

KafkaProducer是kafka最重要的几个api之一。

1.KafkaProducer类结构

我们通过KafkaProducer中的属性链路来解析KafkaProducer的类结构。

属性链路对我们解析KafkaProducer的消息生产过程十分重要。

2.KafkaProducer线程模型

KafkaProducer进程中包含两个重要的线程:main和producer

2.1.KafkaProducer主线程

主线程名为main,执行过程如下图所示:

主线程最终把消息写入到了内存中,而内存中的数据需要通过io线程发送给Kafka Broker。

2.2.KafkaProducer IO线程

线程名前缀为kafka-producer-network-thread,IO线程执行过程如下图所示:

主要通过Sender来完成,kafka自身通过socket编程,以NIO方式实现对数据的发送。

Produce过程源码解析

基于Kafka-3.9.0源码解析Produce过程。

整个Produce过程分为以下几个部分。

Producer初始化

过程如下所示:

KafkaProducer(ProducerConfig config) {
	// 从配置中获取事务id
	String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
	// 从配置中获取客户端id编号
	this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
	// 初始化监控
	List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
	// 初始化分区器
	this.partitioner
	// 初始化拦截器列表
	List<ProducerInterceptor<K, V>> interceptorList
	// 初始化事务管理器
	this.transactionManager = configureTransactionState(config);
	// 初始化消息累积器,accumulator负责将消息缓存并组装成批次
	this.accumulator = new RecordAccumulator()
	// 初始化生产者元数据管理器,metadata用于获取Kafka集群的元数据信息,如Topic分区分布、Broker地址
	this.metadata
	// 初始化消息发送器,sender专门负责将批次消息发送到Broker
	this.sender = newSender()
	this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
	// 启动发送线程
	this.ioThread.start();
}

调用send方法发送消息

通过KafkaProducer类的send方法可向topic生产消息,send执行过程如下所示:

org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord<K, V> record, Callback callback)
	// 按消息发送
	KafkaProducer.doSend(record, callback)
		// 阻塞获取元数据,直到超时,超时时间为max.block.ms,默认60秒
		waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
		// 对key进行序列化
		byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
		// 对value进行序列化
		byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
		// 分区器为消息分配分区编号
		int partition = partition(record, serializedKey, serializedValue, cluster);
		// 消息收集器收集消息
		RecordAccumulator.RecordAppendResult result = accumulator.append(record, partition, serializedKey, serializedValue)
		if (result.abortForNewBatch)
			// 需要开启新的批次再往消息收集器发送
			result = accumulator.append(record, partition, serializedKey, serializedValue)
		if (result.batchIsFull || result.newBatchCreated)
			// 批次已满,或者说新的批次刚创建,则唤醒sender
			Sender.wakeup();
				KafkaClient.wakeup() // 实际是NetworkClient
					org.apache.kafka.common.network.Selector#wakeup
						java.nio.channels.Selector#wakeup
		return result.future;

往消息收集器添加消息

RecordAccumulator是生产者实现高性能写的关键组件,其核心是缓存信息并构建消息批次。RecordAccumulator内部维护了一个Deque<ProducerBatch>队列,用于存储待发送的批次,通过BufferPool管理内存缓冲区,避免频繁的内存分配与释放。

RecordAccumulator的append方法往对应的队列中添加消息,执行过程如下所示:

org.apache.kafka.clients.producer.internals.RecordAccumulator#append(String topic, int partition, long timestamp, byte[] key, byte[] value, Header[] headers, AppendCallbacks callbacks, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs, Cluster cluster)
	// 当前正在append的消息数量加一,同一个生产者被多个线程调用生产消息
	appendsInProgress.incrementAndGet();
	ByteBuffer buffer = null;
	while (true) // 无限循环
		// 计算有效分区号,如果方法调用时的partition为-1,则将最近发送到broker批次对应的分区作为有效分区号,否则使用方法调用时的partition
		int effectivePartition
		// 检查有效分区号是否有对应的消息批次,没有则新增
		Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
		synchronized (dq) // 对dq加锁,防止多个线程同时往一个topic-partition中添加消息
			// 尝试添加消息
			RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
				// 获取最后一个消息批次
				ProducerBatch last = deque.peekLast();
				if (last != null)
					// 尝试往消息批次添加消息
					FutureRecordMetadata future = last.tryAppend
					int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
					// 当dq队列中的数量大于1,或者最后一个批次已满,则表示需要通过网络线程将消息发送到broker
	                return new RecordAppendResult(future, dq.size() > 1 || last.isFull(), false, false, appendedBytes);
			if (appendResult != null) 
				表示添加成功
		if (abortOnNewBatch)
			return new RecordAppendResult(null, false, false, true, 0);
		if (buffer == null)
			// 计算缓冲区大小,当前消息量与batch.size之间取最大值
			int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
			// 阻塞式申请缓冲区大小,
			buffer = free.allocate(size, maxTimeToBlock);
		synchronized (dq) 
			// 判断是否跟上一次的添加的分区号一致,如果一致,则continue进入下一轮循环
			// 新增ProducerBatch
			RecordAppendResult appendResult = appendNewBatch
			// 更新最近一次新增ProducerBatch对应的分区号
			topicInfo.builtInPartitioner.updatePartitionInfo()
			return appendResult;
append方法的最后需要进行资源释放
	free.deallocate(buffer);
	appendsInProgress.decrementAndGet();

Sender提取ProducerBatch

Sender线程负责从RecordAccumulator中取出满足发送条件的ProducerBatch,并通过NetworkClient将消息发送到Broker。

org.apache.kafka.clients.producer.internals.Sender#run
	if (transactionManager != null)
		// 事务管理器不为空
		transactionManager.setPoisonStateOnInvalidTransition(true);
	while (running) 
		// 线程运行状态,则进入无限循环
		runOnce()
	while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests()))
		// 我们停止线程接收多余请求后,可能还会有一些正在处理的请求或者在收集器未完成的请求,亦或是某些在等待响应的请求,需要将这些处理完线程才能结束
		runOnce()
	while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction())
		// 终止事务管理器后,如果还有些commit或abort的事务没有进入到事务管理器的队列,则需要将这些食物处理完才能结束
		if (!transactionManager.isCompleting())
			// 如果事务未结束,则启动终止事务
			transactionManager.beginAbort()
		runOnce()
	if (forceClose)
		// 停止事务管理器
		transactionManager.close()
		// RecordAccumulator终止未完成的批量
		accumulator.abortIncompleteBatches()
	// 终止网络连接
	this.client.close()

runOnce()方法将启动一次发送消息的发送,执行过程如下

org.apache.kafka.clients.producer.internals.Sender#runOnce
	// 执行一次发送任务
	if (transactionManager != null)
		// 进行事务处理
	long currentTimeMs = time.milliseconds();
	// 准备需要发送的消息
	long pollTimeout = sendProducerData(currentTimeMs);
		// 获取元数据
		metadataSnapshot = metadata.fetchMetadataSnapshot()
		// 获取待发送的分区数据
		result = this.accumulator.ready(metadataSnapshot
		if (!result.unknownLeaderTopics.isEmpty())
			// 如果某些分区的leader为-1,则强制进行对应topic的元数据更新
			this.metadata.requestUpdate(false);
		// 移除所有无法发送消息的节点
		iter = result.readyNodes.iterator()
		while (iter.hasNext())
			node = iter.next()
			if (!this.client.ready(node, now))
				// 节点无法连接,移除改节点
				this.accumulator.updateNodeLatencyStats(node.id(), now, false);
		// 创建Produce请求
		// 获取所有待发送的消息批次,按分区进行分组
		Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain()
		addToInflightBatches(batches);
		// 计算超时时间
		long pollTimeout
		// 发送请求
		sendProduceRequests(batches, now);
			// 针对每个broker,分别发送消息
			for (nodeId : batches.entrySet())
				sendProduceRequest(nodeId, batches.get(nodeId))
					// 事务处理
					// 构建ProduceRequest
					ProduceRequest.Builder requestBuilder
					// 构建回调函数
					RequestCompletionHandler callback
					// 构建客户端请求,acks起作用就在此处,如果acks不为0,表示需要等待响应消息,否则发送完成即可。
					ClientRequest clientRequest = NetworkClient.newClientRequest(nodeId, requestBuilder, callback)
					// 发送请求
					NetworkClient.send(clientRequest, now);
		return pollTimeout;
	// 通过socket发送请求,并等待响应,带有超时时间
	NetworkClient.poll(pollTimeout, currentTimeMs);

NetworkClient发送消息

NewworkClient基于Java NIO实现非阻塞网络通信,通过Selector管理网路连接和I/O操作。

org.apache.kafka.clients.NetworkClient#send
	RequestHeader header = clientRequest.makeHeader(request.version());
	Send send = request.toSend(header);
	selector.send(new NetworkSend(clientRequest.destination(), send));
		String connectionId = send.destinationId();
		KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
		channel.setSend(send);

Selector会不断轮训检查网络连接状态,当连接可写时,将数据写入SocketChannel,实现高效的网络传输。