cmak获取消费组信息
源码过程如下
controllers.Consumer#consumers
kafkaManager.getConsumerListExtended(cluster)
KMClusterQueryRequest(clusterName, BVGetConsumerIdentities))
KMClusterQueryRequest(clusterName, KSGetConsumers)
consumerList.scala.html
consumerListContent.scala.html
最终以html内容返回
BVGetConsumerIdentities请求
BVGetConsumerIdentities请求处理过程如下:
BVGetConsumerIdentities请求处理
kafka.manager.actor.cluster.BrokerViewCacheActor#processActorRequest
case BVGetTopicIdentities =>
sender ! topicIdentities
KSGetConsumers请求处理
如下所示:
如下所示KSGetConsumers请求处理
kafka.manager.actor.cluster.BrokerViewCacheActor#processActorRequest
asyncPipeToSender {
offsetCache.getConsumerList
kafka.manager.actor.cluster.OffsetCache#getKafkaManagedConsumerList
从kafkaManagedOffsetCache中获取
kafka.manager.actor.cluster.OffsetCache#getZKManagedConsumerList
不通过zookeeper进行消费组管理,此方法结果集为空。
将以上两者结果集合并
}
这里最重要的就是从kafkaManagedOffsetCache获取对应的offset缓存信息。
kafkaManagedOffsetCache维护机制
kafkaManagedOffsetCache是KafkaStateActor.scala文件中用于维护kafka消费组偏移量缓存的核心组件。
相关配置
配置有三个
object KafkaManagedOffsetCacheConfig {
val defaultGroupMemberMetadataCheckMillis: Int = 30000
val defaultGroupTopicPartitionOffsetMaxSize: Int = 1000000
val defaultGroupTopicPartitionOffsetExpireDays: Int = 7
}
对应的配置值为
kafka-managed-offset-metadata-check-millis:表示多长时间拉取一次offset信息。
kafka-managed-offset-group-cache-size:表示缓存的大小,默认1MB
kafka-managed-offset-group-expire-days:表示消费组未更新的过期时间,过期时间为7天。