KafkaConsumer消息消费过程


发布于 2024-07-31 / 51 阅读 / 0 评论 /
本文基于kafka3.6.0源码,从源码级别解析Kafka消息消费过程

KafkaConsumer是一个从Kafka集群消费消息的客户端。这个客户端可以明显处理Kafka Broker的失败,也能明显适应在集群内获取的topic分区。这个客户端也与broker交互,允许一群consumer之间均衡消费。

KafkaConsumer引用链

KafkaConsumer维护了与必要的Broker之间的TCP连接,用于提取数据。如果使用之后没有关闭consumer,就会失去这些连接。

KafkaConsumer线程不安全

KafkaConsumer不是线程安全的,在执行每个公用方法之前,KafkaConsumer会调用acquire()方法,该方法用于检测是否只有一个线程在进行操作。

    /**
     * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
     * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
     * supported).
     * @throws ConcurrentModificationException if another thread already has the lock
     */
    private void acquire() {
        final Thread thread = Thread.currentThread();
        final long threadId = thread.getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +
                    "currentThread(name: " + thread.getName() + ", id: " + threadId + ")" +
                    " otherThread(id: " + currentThread.get() + ")"
            );
        refcount.incrementAndGet();
    }

这实际上通过CAS的方式来获取当前KafkaConsumer的使用权。如果获取不到,则抛出异常。

在执行线程完成后,会调用release()方法来释放KafkaConsumer的使用权。

    /**
     * Release the light lock protecting the consumer from multi-threaded access.
     */
    private void release() {
        if (refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }

poll机制

Kafka采用消息拉取模型,要求消费者通过主动调用KafkaConsumer#poll(java.time.Duration)方法向broker拉取数据。虽然Kafka并未限制获取数据后的消费方式,但为了平衡完备的功能和客户端易用性,将consumer设计为以单线程持续调用poll方法的形式来拉取消息。

poll方法内部并非简单地发送请求给broker并等待响应,然后将消息数据返回给调用方。实际上Fetch内部维护了一个链表ConcurrentLinkedQueue completedFetches,用来缓存已经拉取到的消息数据。

每次当Kafka Consumer调用poll方法的时候会先从completedFetches缓存中(线程安全的链表)查找是否存在未消费的数据,如果存在未消费的数据,Kafka直接解码后返回。

如果缓冲区中没有未消费数据,则根据订阅的情况向所有相关的broker节点发送异步请求,异步响应的结果都会存储在缓存冲区。

消费者线程会等到知道缓冲区有可用数据或者超时,循环解析缓冲链表中的数据,返回不超过(max.poll.records)的消息。

Fetch过程

KafkaConsumer实例中会创建一个Fetcher实例,用于发送FetchRequest,针对订阅到的Partition发送FetchRequest。

与四个参数相关

(1)fetch.max.bytes:一个FetchRequest最大请求的数据量。

(2)fetch.min.bytes:一个FetchRequest最少请求的数据量。

(3)max.partition.fetch.bytes:一个FetchRequest在每个分区上最多拉取多少数据。

(4)fetch.max.wait.ms:服务端返回FetchRequest请求最长等待时间。

Kafka消费过程源码解析

以下内容来自Kafka-3.9.0源码。

Kafka消费的完整过程可分为以下步骤。

KafkaConsumer初始化

KafkaConsumer初始化过程如下所示:

new KafkaConsumer(config)
	ConsumerDelegateCreator#create(config)
		// 根据group.protocol配置进行不同类的初始化
		if 配置值为CONSUMER
			new AsyncKafkaConsumer(config)
		else
			// 配置值为Classic,表示经典的消费者,默认模式
			new ClassicKafkaConsumer(config)
				groupRebalanceConfig = new GroupRebalanceConfig(config)
				this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
				this.clientId = config.getString('client.id');
				boolean enableAutoCommit = config.getBoolean('enable.auto.commit');
				this.requestTimeoutMs = config.getInt('request.timeout.ms');
				this.defaultApiTimeoutMs = config.getInt('default.api.timeout.ms');
				this.retryBackoffMs = config.getLong('retry.backoff.ms');
				this.retryBackoffMaxMs = config.getLong('retry.backoff.max.ms');
				// 初始化消费者元数据管理器
				this.metadata = new ConsumerMetadata(config)
				// 初始化ConsumerNetworkClient
				this.client = createConsumerNetworkClient(config, metadata)
				// 分区分配策略
				this.assignors = ConsumerPartitionAssignor.getAssignorInstances(config)
				// 消费组协调器
				this.coordinator = new ConsumerCoordinator(config, client, metadata)
				this.fetcher = new Fetcher<>(client, metadata)
				this.offsetFetcher = new OffsetFetcher(client, metadata)
				this.topicMetadataFetcher = new TopicMetadataFetcher(client)

ClassicKafkaConsumer是最常用的。

订阅topic或分区

通过subscribe方法进行topic的订阅,也可以通过assign方法指定对应的分区进行消费。

subscribe模式

subscribe模式下,coordinator会为consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给相同group下的不同consumer。

subscribe方法执行过程如下:

org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer#subscribe(topics)
	ClassicKafkaConsumer#acquireAndEnsureOpen()
		// 获取轻量级锁,确保消费者未被关闭
		ClassicKafkaConsumer#acquire()
			// 检测是否只有一个线程在进行操作。如果有其他线程正在操作,acquire()将抛出ConcurrentModificationException异常。
			long threadId = Thread.currentThread().getId();
			if (threadId != currentThread.get() && !currentThread.compareAndSet(-1, threadId))
				// 当前线程不是消费者标记的currentThread,则说明是多线程不安全导致,抛异常
				throw new ConcurrentModificationException()
			refcount.incrementAndGet()
		if (this.closed) 
			// 如果消费者被关闭了,则释放轻量级锁,确保consumer不会被多线程访问
			ClassicKafkaConsumer#release
				currentThread.set(-1);
	// 获取当前订阅的topic-partitions信息
	Set<TopicPartition> currentTopicPartitions
	// 清理未被订阅的topic-partitions信息
	fetcher.clearBufferedDataForUnassignedPartitions(currentTopicPartitions)
	if(this.subscriptions.subscribe(topics))
		// 更新元数据
		metadata.requestUpdateForNewTopics()

主要就是准备和更新订阅的topic-partition元数据信息

执行线程完成后,会调用release()方法来释放KafkaConsumer的使用权。

assign模式

assign模式下,需要开发者明确为consumer指定需要消费的topic-partitions,不受group.id限制,相当于指定的group.id无效。

assign方法执行过程如下:

org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer#assign(partitions)
	ClassicKafkaConsumer#acquireAndEnsureOpen()
		// 获取轻量级锁,确保消费者未被关闭
		ClassicKafkaConsumer#acquire()
			// 检测是否只有一个线程在进行操作。如果有其他线程正在操作,acquire()将抛出ConcurrentModificationException异常。
			long threadId = Thread.currentThread().getId();
			if (threadId != currentThread.get() && !currentThread.compareAndSet(-1, threadId))
				// 当前线程不是消费者标记的currentThread,则说明是多线程不安全导致,抛异常
				throw new ConcurrentModificationException()
			refcount.incrementAndGet()
		if (this.closed) 
			// 如果消费者被关闭了,则释放轻量级锁,确保consumer不会被多线程访问
			ClassicKafkaConsumer#release
				currentThread.set(-1);
	// 清理未被订阅的topic-partitions信息
	fetcher.clearBufferedDataForUnassignedPartitions(partitions)
	if (coordinator != null)
		// 如果enable.auto.commit为true,尝试指定异步自动offset提交,
		this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
	if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
		// 更新元数据
		metadata.requestUpdateForNewTopics()

执行线程完成后,会调用release()方法来释放KafkaConsumer的使用权。

poll拉取消息

通过poll方法阻塞式拉取消息

poll方法执行过程如下所示:

org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer#poll(timeout)
	// 初始化定时器
	Timer timer = time.timer(timeout)
	// Consumer检查
	ClassicKafkaConsumer#acquireAndEnsureOpen()
	while (timer.notExpired()) // 定时器未超时,则循环执行以下逻辑
		// 唤醒NetworkClient
		ConsumerNetworkClient#maybeTriggerWakeup()
		ClassicKafkaConsumer#updateAssignmentMetadataIfNeeded(timer, false)
		Fetch<K, V> fetch = ClassicKafkaConsumer#pollForFetches(timer);
			Fetch<K, V> fetch = fetcher.collectFetch();
			if (!fetch.isEmpty())
				// 如果数据集已然可得,则直接返回
				return fetch;
			ClassicKafkaConsumer#sendFetches();
				return fetcher.sendFetches()
			ConsumerNetworkClient#poll()
				NetworkClient#poll()
			return fetcher.collectFetch();
		if (!fetch.isEmpty()) // fetch结果不为空
			// 返回fetch结果集之前,我们可以发送下一轮的fetch请求,避免用户处理fetch结果集时管道流被阻塞等待
			if (sendFetches() > 0 || client.hasPendingRequests())
				ConsumerNetworkClient#transmitSends();
			return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
	// 定时器超时后,返回空结果集
	return ConsumerRecords.empty();

Fetcher内部维护了一个链表ConcurrentLinkedQueue completedFetches,用来缓存已经拉取到的消息数据。

每次当Kafka Consumer调用poll方法的时候会先从completedFetches缓存中(线程安全的链表)查找是否存在未消费的数据,如果存在未消费的数据,Kafka直接解码后返回。

如果缓冲区中没有未消费数据,则根据订阅的情况向所有相关的broker节点发送异步请求,异步响应的结果都会存储在缓存冲区。

消费者线程会等到知道缓冲区有可用数据或者超时,循环解析缓冲链表中的数据,返回不超过(max.poll.records)的消息。