KafkaConsumer消息消费过程


发布于 2024-07-31 / 41 阅读 / 0 评论 /
本文基于kafka3.6.0源码,从源码级别解析Kafka消息消费过程

KafkaConsumer是一个从Kafka集群消费消息的客户端。这个客户端可以明显处理Kafka Broker的失败,也能明显适应在集群内获取的topic分区。这个客户端也与broker交互,允许一群consumer之间均衡消费。

KafkaConsumer引用链

KafkaConsumer维护了与必要的Broker之间的TCP连接,用于提取数据。如果使用之后没有关闭consumer,就会失去这些连接。

KafkaConsumer线程不安全

KafkaConsumer不是线程安全的,在执行每个公用方法之前,KafkaConsumer会调用acquire()方法,该方法用于检测是否只有一个线程在进行操作。

    /**
     * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
     * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
     * supported).
     * @throws ConcurrentModificationException if another thread already has the lock
     */
    private void acquire() {
        final Thread thread = Thread.currentThread();
        final long threadId = thread.getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +
                    "currentThread(name: " + thread.getName() + ", id: " + threadId + ")" +
                    " otherThread(id: " + currentThread.get() + ")"
            );
        refcount.incrementAndGet();
    }

这实际上通过CAS的方式来获取当前KafkaConsumer的使用权。如果获取不到,则抛出异常。

在执行线程完成后,会调用release()方法来释放KafkaConsumer的使用权。

    /**
     * Release the light lock protecting the consumer from multi-threaded access.
     */
    private void release() {
        if (refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }

poll机制

Kafka采用消息拉取模型,要求消费者通过主动调用KafkaConsumer#poll(java.time.Duration)方法向broker拉取数据。虽然Kafka并未限制获取数据后的消费方式,但为了平衡完备的功能和客户端易用性,将consumer设计为以单线程持续调用poll方法的形式来拉取消息。

poll方法内部并非简单地发送请求给broker并等待响应,然后将消息数据返回给调用方。实际上Fetch内部维护了一个链表ConcurrentLinkedQueue completedFetches,用来缓存已经拉取到的消息数据。

每次当Kafka Consumer调用poll方法的时候会先从completedFetches缓存中(线程安全的链表)查找是否存在未消费的数据,如果存在未消费的数据,Kafka直接解码后返回。

如果缓冲区中没有未消费数据,则根据订阅的情况向所有相关的broker节点发送异步请求,异步响应的结果都会存储在缓存冲区。

消费者线程会等到知道缓冲区有可用数据或者超时,循环解析缓冲链表中的数据,返回不超过(max.poll.records)的消息。

Fetch过程

KafkaConsumer实例中会创建一个Fetcher实例,用于发送FetchRequest,针对订阅到的Partition发送FetchRequest。

与四个参数相关

(1)fetch.max.bytes:一个FetchRequest最大请求的数据量。

(2)fetch.min.bytes:一个FetchRequest最少请求的数据量。

(3)max.partition.fetch.bytes:一个FetchRequest在每个分区上最多拉取多少数据。

(4)fetch.max.wait.ms:服务端返回FetchRequest请求最长等待时间。