kafka监控指标


发布于 2025-05-23 / 17 阅读 / 0 评论 /
Kafka监控指标体系介绍

Kafka JMX监控指标可参考官方文档。

Broker监控指标

有以下指标

(1)kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec

Broker每秒接收到的消息数量。包含客户端的消息生产,以及本Broker向其他Broker进行数据同步的消息数量。

(2)kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec

Broker每秒从客户端接收到的消息量(单位是bytes)

(3)kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec

Broker每秒从其他Broker同步到的的消息量(单位是bytes)

(4)kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower},version=([0-9]+)

Broker接收到的请求速率,请求类型有Produce、FetchConsumer、FetchFollower三种。

(5)kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+)

Broker每秒接收到请求错误的数量,如果Broker响应中有多个错误,则会统计多次。根据请求类型和错误类型进行分组。error为NONE表示请求成功

(6)kafka.network:type=RequestMetrics,name=RequestBytes,request=([-.\w]+)

请求报文的大小,根据请求类型进行分组,可得到Broker启动以来不同类型所有请求报文的总大小。

(7)kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request={Produce|Fetch}

用于消息格式转换和解密的临时内存大小。

(8)kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch}

消息格式转换花费的时间总和,毫秒为单位。

(9)kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+)

每秒需要进行消息格式转换的消息数量。

(10)kafka.network:type=RequestChannel,name=RequestQueueSize

请求队列的大小。

(11)kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec

Broker每秒向客户端发送的数据量(单位是bytes)

(12)kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec

Broker每秒向其他Broker发送的数据量(单位是bytes)

(13)kafka.server:type=BrokerTopicMetrics,name=NoKeyCompactedTopicRecordsPerSec

对于compacted topic,如果未指定key,消息验证失败的速率(条/秒)

(14)kafka.server:type=BrokerTopicMetrics,name=InvalidMagicNumberRecordsPerSec

因无效的magic值导致消息校验失败的速率(条/秒)

(15)kafka.server:type=BrokerTopicMetrics,name=InvalidMessageCrcRecordsPerSec

因无效的crc校验值导致消息校验失败的速率(条/秒)

(16)kafka.server:type=BrokerTopicMetrics,name=InvalidOffsetOrSequenceRecordsPerSec

因不连续的offset或batch中不连续的sequence number导致消息校验失败的速率(条/秒)

(17)kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs

Log Flush的速率和时间。

(18)kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions

被踢出ISR的副本数量。正常值为0.

Kafka 分区 Leader 节点收到消息后,会同步给 Follower 节点。集群健康的情况下,UnderReplicatedPartitions 值等于 0,这时同步正常的 Follower 节点数量(也就是 ISR)等于总的 Follower 节点数量。如果这个指标值大于0,比如等于 1,说明有一个 Follower 同步异常,被踢出ISR。

(19)kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount

ISR副本数小于min.insync.replicas的分区数量。正常值为0。

如果某个分区的ISR副本数小于min.insync.replicas,则此分区不可用。

(20)kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount

ISR副本数等于min.insync.replicas的分区数量。

(21)kafka.log:type=LogManager,name=OfflineLogDirectoryCount

Log目录Offline的数量。正常值为0。

(22)kafka.controller:type=KafkaController,name=ActiveControllerCount

当前活跃的Controller数量。只有1个Broker此指标值为1,其他的Broker此指标值为0。

Kafka Broker 集群中有一个节点是 Controller 节点,这个节点非常重要,负责监听 Partition、Topic 和 Broker 的变化,以及元数据管理。

ActiveControllerCount 指标表示当前 Broker 节点是否是 Controller 节点,集群健康的情况下,有且仅有一个 Broker 节点这个指标值是 1。如果有多个 Broker 这个指标值是 1,或者所有 Broker 指标值都是 0,就需要进行故障排查。

(23)kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs

Leader选举的速率和耗时。非0表示有broker有异常。

当分区 Leader 节点挂了之后,就会触发选举新的 Leader。这个指标值表示选举新 Leader 的频率(每秒多少次)和集群中无 Leader 节点的时长。触发 Leader 选举,肯定是旧的 Leader 下线,所以需要定位分析原因。

(24)kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec

每秒进行的Unclean Leader选举的频率。正常值为0。

当 Broker 集群找不到分区 Leader 时,需要从 ISR 集合中选出新的 Leader 节点。而如果 ISR 集合没有节点,那就得从未同步的 Follower 中选出 Leader 节点,让集群处于可用状态,但这个时候因为消息未同步,会有消息丢失。所以这个指标有数据时,代表可能有消息丢失。

(25)kafka.controller:type=KafkaController,name=TopicsToDeleteCount

等待删除的Topic的数量。

(26)kafka.controller:type=KafkaController,name=ReplicasToDeleteCount

等待删除的副本数。

(27)kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount

表示当前无法删除的Topic数量。

这个指标反映了在尝试删除主题时,由于某些原因(如主题正在使用中、存在活跃的消费者等)而无法删除的主题数量。在 Kafka中,主题的删除操作并不是立即完成的,而是需要等待所有相关的数据和操作完成。如果存在以下情况之一,主题将无法被删除:

‌主题正在使用中‌:如果有消费者正在订阅该主题,或者有生产者正在向该主题发送消息,那么该主题将无法被删除。

‌存在活跃的消费者‌:如果主题有活跃的消费者,即有消费者正在读取该主题的消息,那么该主题也无法被删除。

‌存在未完成的日志段‌:如果主题中存在未完成的日志段,即有些消息还没有被完全写入日志段,那么该主题也无法被删除。

(28)kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount

表示当前无法删除的副本数量。

(29)kafka.server:type=ReplicaManager,name=PartitionCount

当前Broker节点分区数。

(30)kafka.server:type=ReplicaManager,name=LeaderCount

当前Broker Leader 副本数。

(31)kafka.server:type=ReplicaManager,name=IsrShrinksPerSec

表示 ISR 收缩的频率。

如果这个指标的值很高,那集群中必定有 Follower 节点频繁地退出 ISR。这个时候就需要定位有 Follower 频繁退出 ISR 的原因。

(32)kafka.server:type=ReplicaManager,name=IsrExpandsPerSec

表示 ISR 扩容的频率。

如果这个指标的值很高,那集群中必定有 Follower 节点频繁地进入ISR。这个时候就需要定位有 Follower 频繁进入 ISR 的原因。

(33)kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica

fellower与leader之间最大的未同步消息量(条)。

此监控值应当和生产者的最大批量相当。

(34)kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

某个分区副本的消息滞后量(条)

(35)kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce

Produce请求的purgatory队列大小,如果acks是-1,则此值非0

purgatory作为一个临时存放的区域,使得生产(produce)和消费(fetch)的请求在那里等待直到被需要的时候。留意 purgatory 的大小有助于确定潜伏期的根本原因。例如,如果 purgatory 队列中获取请求的数量相应增加,则可以很容易地解释消费者获取时间的增加。

(36)kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch

Fetch请求的purgatory队列大小,与消费者fetch.wait.max.ms参数相关

(37)kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower}

Broker 处理一笔请求的总时间。比如处理 Producer 发送请求、Consumer 拉取请求、Follower 拉取请求。

这个时间如果出现了比较大的波动,需要查看 Broker 的资源情况并考虑应对方案。

(38)kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request={Produce|FetchConsumer|FetchFollower}

请求队列中,不同类型的请求等待的时间。

(39)kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce|FetchConsumer|FetchFollower}

不同请求在leader中处理的耗时。

(40)kafka.network:type=RequestMetrics,name=RemoteTimeMs,request={Produce|FetchConsumer|FetchFollower}

follower发送的请求等待的时间。

对于produce请求,如果acks是-1,则此值非0

(41)kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request={Produce|FetchConsumer|FetchFollower}

响应队列中,不同请求类型的等待耗时。

(42)kafka.network:type=RequestMetrics,name=ResponseSendTimeMs,request={Produce|FetchConsumer|FetchFollower}

发送响应的耗时。如果此值变大,可能是网络繁忙或者网络限速。

(43)kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent

network processor处于空闲的时间比率,值在0~1之间,理想状态下是大于0.3

(44)kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount

表示因为连接过期而被关闭的客户端连接数。

当客户端与Kafka集群建立连接后,出现以下场景,那么这个连接就被认为是“过期”的。在这种情况下,Kafka服务器可以选择关闭这个连接以释放资源。

  • 客户端不活动:客户端可能因为某些原因(如应用程序崩溃、网络问题、客户端代码逻辑错误等)而没有向服务器发送任何数据或请求。

  • 配置问题:Kafka服务器或客户端的配置可能不正确,例如socket.timeout.ms和connections.max.idle.ms的设置可能不合适。

  • 网络问题:网络延迟或不稳定可能导致客户端无法及时发送心跳或数据,从而被服务器判定为过期。

(45)kafka.server:type=socket-server-metrics,listener=[SASL_PLAINTEXT|SASL_SSL],networkProcessor=<#>,name=expired-connections-killed-count

表示因为连接过期而被关闭的客户端连接数。根据listener、processor进行分组。

(46)kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent

接口处理线程平均空闲率。范围是0~1,理想状态是大于0.3

(47)kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+)

针对某个客户端或某个用户的请求的带宽限额。

输出有两个属性

  • throttle-time:表示客户端被限制的时间,毫秒为单位。

  • byte-rate:表示生产/消费的速率,单位是bytes/sec。

对于(user, client-id)限额,需要同时指定user和client-id。如果只需要使用client-id进行限额,则不需要指定user。如果只需要使用user进行限额,则不需要指定client-id。

(48)kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+)

对user或client-id的请求限额。

输出有两个属性

  • throttle-time:表示客户端被限制的时间,单位是毫秒。理想状态下是0

  • request-time:表示broker内network和I/O线程处理客户端请求的时间比例。

对于(user, client-id)限额,需要同时指定user和client-id。如果只需要使用client-id进行限额,则不需要指定user。如果只需要使用user进行限额,则不需要指定client-id。

(49)kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs

Broker发送zookeeper客户端请求的延时

(50)kafka.server:type=SessionExpireListener,name=SessionState

zookeeper连接的状态,取值可能是Disconnected、SyncConnected、AuthFailed、ConnectedReadOnly、SaslAuthenticated、Expired之一

(51)kafka.server:type=group-coordinator-metrics,name=partition-load-time-max

加载消费组信息的最大耗时,单位是毫秒。

过去30秒coordinator从__consumer_offsets分区中拉去消费组offset和元数据的时间,包括调度的时间。

(52)kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg

加载消费组信息的平均耗时,单位是毫秒。

(53)kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max

加载事务元数据信息的最长耗时,单位是毫秒。

过去30秒coordinator从__transactions_offsets分区中拉取事务元数据的时间,包括调度的时间。

(54)kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg

加载事务元数据信息的平均耗时,单位是毫秒。

(55)kafka.server:type=GroupMetadataManager,name=NumOffsets

已提交的消费组offset信息。

(56)kafka.server:type=GroupMetadataManager,name=NumGroups

已提交的消费组数量

(57)kafka.server:type=GroupMetadataManager,name=NumGroups[PreparingRebalance,CompletingRebalance,Empty,Stable,Dead]

不同状态消费组的数量

(58)kafka.server:type=ReplicaManager,name=ReassigningPartitions

正在重分布的分区的数量

(59)kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesOutPerSec

Broker输出的分区重分布流量。

(60)kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec

输入到Broker的分区重分布流量。

(61)kafka.log:type=Log,name=Size,topic=([-.\w]+),partition=([0-9]+)

Broker磁盘中某个分区的消息量,单位是bytes

(62)kafka.log:type=Log,name=NumLogSegments,topic=([-.\w]+),partition=([0-9]+)

Broker磁盘中某个分区的segement数量。

(63)kafka.log:type=Log,name=LogStartOffset,topic=([-.\w]+),partition=([0-9]+)

Broker中某个分区的开始offset

(64)kafka.log:type=Log,name=LogEndOffset,topic=([-.\w]+),partition=([0-9]+)

Broker中某个分区的最新offset

Kafka客户端相关的监控指标

producer、consumer、connector、streams实例中通用的监控指标

(1)kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)

这是在客户端实例中的监控指标

包含以下属性

(2)kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

这是在Broker中的监控指标

包含以下属性

  • outgoing-byte-rate:一个Broker节点发送的数据流量速率,单位是bytes/sec

  • outgoing-byte-total:一个Broker节点发送的数据流量,单位是bytes

  • request-rate:一个Broker节点发送请求的速率,单位是个/秒。

  • request-total:一个Broker节点发送请求的总量,单位是个。

  • request-size-avg:一个Broker节点的某个时间窗口中,发送请求的平均量,单位是bytes。

  • request-size-max:一个Broker节点的某个时间窗口中,发送请求的最大量,单位是bytes。

  • incoming-byte-rate:一个Broker节点接收的数据流量速率,单位是bytes/sec

  • incoming-byte-total:一个Broker节点接收的数据流量,单位是bytes

  • request-latency-avg:一个Broker节点请求的平均延时,单位是毫秒。

  • request-latency-max:一个Broker节点请求的最大延时,单位是毫秒。

  • response-rate:一个Broker节点接收到响应的速率,单位是个/秒。

  • response-total:一个Broker节点接收到的响应总和,单位是个

Producer监控指标

kafka.producer:type=producer-metrics,client-id="{client-id}"

kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"

Consumer监控指标

kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)

kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"

kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"

Connector监控指标

kafka.connect:type=connect-worker-metrics

kafka.connect:type=connect-worker-metrics,connector="{connector}"

kafka.connect:type=connect-worker-rebalance-metrics

kafka.connect:type=connector-metrics,connector="{connector}"

kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"

kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"

kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"

kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"

Stream监控指标

kafka.streams:type=stream-metrics,client-id=([-.\w]+)

kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)

kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)

kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)

kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)

kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)