CMAK消费组信息管理


发布于 2025-05-11 / 3 阅读 / 0 评论 /
CMAK消费组信息管理机制研究

cmak获取消费组信息

源码过程如下

controllers.Consumer#consumers
    kafkaManager.getConsumerListExtended(cluster)
        KMClusterQueryRequest(clusterName, BVGetConsumerIdentities))
        KMClusterQueryRequest(clusterName, KSGetConsumers)
    consumerList.scala.html
        consumerListContent.scala.html

最终以html内容返回

BVGetConsumerIdentities请求

BVGetConsumerIdentities请求处理过程如下:

BVGetConsumerIdentities请求处理
kafka.manager.actor.cluster.BrokerViewCacheActor#processActorRequest
    case BVGetTopicIdentities =>
        sender ! topicIdentities

KSGetConsumers请求处理

如下所示:

如下所示KSGetConsumers请求处理
kafka.manager.actor.cluster.BrokerViewCacheActor#processActorRequest
    asyncPipeToSender {
        offsetCache.getConsumerList
            kafka.manager.actor.cluster.OffsetCache#getKafkaManagedConsumerList
                从kafkaManagedOffsetCache中获取
            kafka.manager.actor.cluster.OffsetCache#getZKManagedConsumerList
                不通过zookeeper进行消费组管理,此方法结果集为空。
            将以上两者结果集合并
    }

这里最重要的就是从kafkaManagedOffsetCache获取对应的offset缓存信息。

kafkaManagedOffsetCache维护机制

kafkaManagedOffsetCache是KafkaStateActor.scala文件中用于维护kafka消费组偏移量缓存的核心组件。

相关配置

配置有三个

object KafkaManagedOffsetCacheConfig {
  val defaultGroupMemberMetadataCheckMillis: Int = 30000
  val defaultGroupTopicPartitionOffsetMaxSize: Int = 1000000
  val defaultGroupTopicPartitionOffsetExpireDays: Int = 7
}

对应的配置值为

kafka-managed-offset-metadata-check-millis:表示多长时间拉取一次offset信息。

kafka-managed-offset-group-cache-size:表示缓存的大小,默认1MB

kafka-managed-offset-group-expire-days:表示消费组未更新的过期时间,过期时间为7天。