KafkaConsumer是一个从Kafka集群消费消息的客户端。这个客户端可以明显处理Kafka Broker的失败,也能明显适应在集群内获取的topic分区。这个客户端也与broker交互,允许一群consumer之间均衡消费。
KafkaConsumer引用链
KafkaConsumer维护了与必要的Broker之间的TCP连接,用于提取数据。如果使用之后没有关闭consumer,就会失去这些连接。
KafkaConsumer线程不安全
KafkaConsumer不是线程安全的,在执行每个公用方法之前,KafkaConsumer会调用acquire()方法,该方法用于检测是否只有一个线程在进行操作。
/**
* Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
* when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
* supported).
* @throws ConcurrentModificationException if another thread already has the lock
*/
private void acquire() {
final Thread thread = Thread.currentThread();
final long threadId = thread.getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +
"currentThread(name: " + thread.getName() + ", id: " + threadId + ")" +
" otherThread(id: " + currentThread.get() + ")"
);
refcount.incrementAndGet();
}
这实际上通过CAS的方式来获取当前KafkaConsumer的使用权。如果获取不到,则抛出异常。
在执行线程完成后,会调用release()方法来释放KafkaConsumer的使用权。
/**
* Release the light lock protecting the consumer from multi-threaded access.
*/
private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}
poll机制
Kafka采用消息拉取模型,要求消费者通过主动调用KafkaConsumer#poll(java.time.Duration)方法向broker拉取数据。虽然Kafka并未限制获取数据后的消费方式,但为了平衡完备的功能和客户端易用性,将consumer设计为以单线程持续调用poll方法的形式来拉取消息。
poll方法内部并非简单地发送请求给broker并等待响应,然后将消息数据返回给调用方。实际上Fetch内部维护了一个链表ConcurrentLinkedQueue completedFetches,用来缓存已经拉取到的消息数据。
每次当Kafka Consumer调用poll方法的时候会先从completedFetches缓存中(线程安全的链表)查找是否存在未消费的数据,如果存在未消费的数据,Kafka直接解码后返回。
如果缓冲区中没有未消费数据,则根据订阅的情况向所有相关的broker节点发送异步请求,异步响应的结果都会存储在缓存冲区。
消费者线程会等到知道缓冲区有可用数据或者超时,循环解析缓冲链表中的数据,返回不超过(max.poll.records)的消息。
Fetch过程
KafkaConsumer实例中会创建一个Fetcher实例,用于发送FetchRequest,针对订阅到的Partition发送FetchRequest。
与四个参数相关
(1)fetch.max.bytes:一个FetchRequest最大请求的数据量。
(2)fetch.min.bytes:一个FetchRequest最少请求的数据量。
(3)max.partition.fetch.bytes:一个FetchRequest在每个分区上最多拉取多少数据。
(4)fetch.max.wait.ms:服务端返回FetchRequest请求最长等待时间。