Kafka JMX监控指标可参考官方文档。
Broker监控指标
有以下指标
(1)kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
Broker每秒接收到的消息数量。包含客户端的消息生产,以及本Broker向其他Broker进行数据同步的消息数量。
(2)kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
Broker每秒从客户端接收到的消息量(单位是bytes)
(3)kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec
Broker每秒从其他Broker同步到的的消息量(单位是bytes)
(4)kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower},version=([0-9]+)
Broker接收到的请求速率,请求类型有Produce、FetchConsumer、FetchFollower三种。
(5)kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+)
Broker每秒接收到请求错误的数量,如果Broker响应中有多个错误,则会统计多次。根据请求类型和错误类型进行分组。error为NONE表示请求成功
(6)kafka.network:type=RequestMetrics,name=RequestBytes,request=([-.\w]+)
请求报文的大小,根据请求类型进行分组,可得到Broker启动以来不同类型所有请求报文的总大小。
(7)kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request={Produce|Fetch}
用于消息格式转换和解密的临时内存大小。
(8)kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch}
消息格式转换花费的时间总和,毫秒为单位。
(9)kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+)
每秒需要进行消息格式转换的消息数量。
(10)kafka.network:type=RequestChannel,name=RequestQueueSize
请求队列的大小。
(11)kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
Broker每秒向客户端发送的数据量(单位是bytes)
(12)kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
Broker每秒向其他Broker发送的数据量(单位是bytes)
(13)kafka.server:type=BrokerTopicMetrics,name=NoKeyCompactedTopicRecordsPerSec
对于compacted topic,如果未指定key,消息验证失败的速率(条/秒)
(14)kafka.server:type=BrokerTopicMetrics,name=InvalidMagicNumberRecordsPerSec
因无效的magic值导致消息校验失败的速率(条/秒)
(15)kafka.server:type=BrokerTopicMetrics,name=InvalidMessageCrcRecordsPerSec
因无效的crc校验值导致消息校验失败的速率(条/秒)
(16)kafka.server:type=BrokerTopicMetrics,name=InvalidOffsetOrSequenceRecordsPerSec
因不连续的offset或batch中不连续的sequence number导致消息校验失败的速率(条/秒)
(17)kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
Log Flush的速率和时间。
(18)kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
被踢出ISR的副本数量。正常值为0.
Kafka 分区 Leader 节点收到消息后,会同步给 Follower 节点。集群健康的情况下,UnderReplicatedPartitions 值等于 0,这时同步正常的 Follower 节点数量(也就是 ISR)等于总的 Follower 节点数量。如果这个指标值大于0,比如等于 1,说明有一个 Follower 同步异常,被踢出ISR。
(19)kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
ISR副本数小于min.insync.replicas的分区数量。正常值为0。
如果某个分区的ISR副本数小于min.insync.replicas,则此分区不可用。
(20)kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount
ISR副本数等于min.insync.replicas的分区数量。
(21)kafka.log:type=LogManager,name=OfflineLogDirectoryCount
Log目录Offline的数量。正常值为0。
(22)kafka.controller:type=KafkaController,name=ActiveControllerCount
当前活跃的Controller数量。只有1个Broker此指标值为1,其他的Broker此指标值为0。
Kafka Broker 集群中有一个节点是 Controller 节点,这个节点非常重要,负责监听 Partition、Topic 和 Broker 的变化,以及元数据管理。
ActiveControllerCount 指标表示当前 Broker 节点是否是 Controller 节点,集群健康的情况下,有且仅有一个 Broker 节点这个指标值是 1。如果有多个 Broker 这个指标值是 1,或者所有 Broker 指标值都是 0,就需要进行故障排查。
(23)kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
Leader选举的速率和耗时。非0表示有broker有异常。
当分区 Leader 节点挂了之后,就会触发选举新的 Leader。这个指标值表示选举新 Leader 的频率(每秒多少次)和集群中无 Leader 节点的时长。触发 Leader 选举,肯定是旧的 Leader 下线,所以需要定位分析原因。
(24)kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
每秒进行的Unclean Leader选举的频率。正常值为0。
当 Broker 集群找不到分区 Leader 时,需要从 ISR 集合中选出新的 Leader 节点。而如果 ISR 集合没有节点,那就得从未同步的 Follower 中选出 Leader 节点,让集群处于可用状态,但这个时候因为消息未同步,会有消息丢失。所以这个指标有数据时,代表可能有消息丢失。
(25)kafka.controller:type=KafkaController,name=TopicsToDeleteCount
等待删除的Topic的数量。
(26)kafka.controller:type=KafkaController,name=ReplicasToDeleteCount
等待删除的副本数。
(27)kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount
表示当前无法删除的Topic数量。
这个指标反映了在尝试删除主题时,由于某些原因(如主题正在使用中、存在活跃的消费者等)而无法删除的主题数量。在 Kafka中,主题的删除操作并不是立即完成的,而是需要等待所有相关的数据和操作完成。如果存在以下情况之一,主题将无法被删除:
主题正在使用中:如果有消费者正在订阅该主题,或者有生产者正在向该主题发送消息,那么该主题将无法被删除。
存在活跃的消费者:如果主题有活跃的消费者,即有消费者正在读取该主题的消息,那么该主题也无法被删除。
存在未完成的日志段:如果主题中存在未完成的日志段,即有些消息还没有被完全写入日志段,那么该主题也无法被删除。
(28)kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount
表示当前无法删除的副本数量。
(29)kafka.server:type=ReplicaManager,name=PartitionCount
当前Broker节点分区数。
(30)kafka.server:type=ReplicaManager,name=LeaderCount
当前Broker Leader 副本数。
(31)kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
表示 ISR 收缩的频率。
如果这个指标的值很高,那集群中必定有 Follower 节点频繁地退出 ISR。这个时候就需要定位有 Follower 频繁退出 ISR 的原因。
(32)kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
表示 ISR 扩容的频率。
如果这个指标的值很高,那集群中必定有 Follower 节点频繁地进入ISR。这个时候就需要定位有 Follower 频繁进入 ISR 的原因。
(33)kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
fellower与leader之间最大的未同步消息量(条)。
此监控值应当和生产者的最大批量相当。
(34)kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
某个分区副本的消息滞后量(条)
(35)kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
Produce请求的purgatory队列大小,如果acks是-1,则此值非0
purgatory作为一个临时存放的区域,使得生产(produce)和消费(fetch)的请求在那里等待直到被需要的时候。留意 purgatory 的大小有助于确定潜伏期的根本原因。例如,如果 purgatory 队列中获取请求的数量相应增加,则可以很容易地解释消费者获取时间的增加。
(36)kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
Fetch请求的purgatory队列大小,与消费者fetch.wait.max.ms参数相关
(37)kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower}
Broker 处理一笔请求的总时间。比如处理 Producer 发送请求、Consumer 拉取请求、Follower 拉取请求。
这个时间如果出现了比较大的波动,需要查看 Broker 的资源情况并考虑应对方案。
(38)kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request={Produce|FetchConsumer|FetchFollower}
请求队列中,不同类型的请求等待的时间。
(39)kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce|FetchConsumer|FetchFollower}
不同请求在leader中处理的耗时。
(40)kafka.network:type=RequestMetrics,name=RemoteTimeMs,request={Produce|FetchConsumer|FetchFollower}
follower发送的请求等待的时间。
对于produce请求,如果acks是-1,则此值非0
(41)kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request={Produce|FetchConsumer|FetchFollower}
响应队列中,不同请求类型的等待耗时。
(42)kafka.network:type=RequestMetrics,name=ResponseSendTimeMs,request={Produce|FetchConsumer|FetchFollower}
发送响应的耗时。如果此值变大,可能是网络繁忙或者网络限速。
(43)kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
network processor处于空闲的时间比率,值在0~1之间,理想状态下是大于0.3
(44)kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount
表示因为连接过期而被关闭的客户端连接数。
当客户端与Kafka集群建立连接后,出现以下场景,那么这个连接就被认为是“过期”的。在这种情况下,Kafka服务器可以选择关闭这个连接以释放资源。
客户端不活动:客户端可能因为某些原因(如应用程序崩溃、网络问题、客户端代码逻辑错误等)而没有向服务器发送任何数据或请求。
配置问题:Kafka服务器或客户端的配置可能不正确,例如socket.timeout.ms和connections.max.idle.ms的设置可能不合适。
网络问题:网络延迟或不稳定可能导致客户端无法及时发送心跳或数据,从而被服务器判定为过期。
(45)kafka.server:type=socket-server-metrics,listener=[SASL_PLAINTEXT|SASL_SSL],networkProcessor=<#>,name=expired-connections-killed-count
表示因为连接过期而被关闭的客户端连接数。根据listener、processor进行分组。
(46)kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent
接口处理线程平均空闲率。范围是0~1,理想状态是大于0.3
(47)kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+)
针对某个客户端或某个用户的请求的带宽限额。
输出有两个属性
throttle-time:表示客户端被限制的时间,毫秒为单位。
byte-rate:表示生产/消费的速率,单位是bytes/sec。
对于(user, client-id)限额,需要同时指定user和client-id。如果只需要使用client-id进行限额,则不需要指定user。如果只需要使用user进行限额,则不需要指定client-id。
(48)kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+)
对user或client-id的请求限额。
输出有两个属性
throttle-time:表示客户端被限制的时间,单位是毫秒。理想状态下是0
request-time:表示broker内network和I/O线程处理客户端请求的时间比例。
对于(user, client-id)限额,需要同时指定user和client-id。如果只需要使用client-id进行限额,则不需要指定user。如果只需要使用user进行限额,则不需要指定client-id。
(49)kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs
Broker发送zookeeper客户端请求的延时
(50)kafka.server:type=SessionExpireListener,name=SessionState
zookeeper连接的状态,取值可能是Disconnected、SyncConnected、AuthFailed、ConnectedReadOnly、SaslAuthenticated、Expired之一
(51)kafka.server:type=group-coordinator-metrics,name=partition-load-time-max
加载消费组信息的最大耗时,单位是毫秒。
过去30秒coordinator从__consumer_offsets分区中拉去消费组offset和元数据的时间,包括调度的时间。
(52)kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg
加载消费组信息的平均耗时,单位是毫秒。
(53)kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max
加载事务元数据信息的最长耗时,单位是毫秒。
过去30秒coordinator从__transactions_offsets分区中拉取事务元数据的时间,包括调度的时间。
(54)kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg
加载事务元数据信息的平均耗时,单位是毫秒。
(55)kafka.server:type=GroupMetadataManager,name=NumOffsets
已提交的消费组offset信息。
(56)kafka.server:type=GroupMetadataManager,name=NumGroups
已提交的消费组数量
(57)kafka.server:type=GroupMetadataManager,name=NumGroups[PreparingRebalance,CompletingRebalance,Empty,Stable,Dead]
不同状态消费组的数量
(58)kafka.server:type=ReplicaManager,name=ReassigningPartitions
正在重分布的分区的数量
(59)kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesOutPerSec
Broker输出的分区重分布流量。
(60)kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec
输入到Broker的分区重分布流量。
(61)kafka.log:type=Log,name=Size,topic=([-.\w]+),partition=([0-9]+)
Broker磁盘中某个分区的消息量,单位是bytes
(62)kafka.log:type=Log,name=NumLogSegments,topic=([-.\w]+),partition=([0-9]+)
Broker磁盘中某个分区的segement数量。
(63)kafka.log:type=Log,name=LogStartOffset,topic=([-.\w]+),partition=([0-9]+)
Broker中某个分区的开始offset
(64)kafka.log:type=Log,name=LogEndOffset,topic=([-.\w]+),partition=([0-9]+)
Broker中某个分区的最新offset
Kafka客户端相关的监控指标
producer、consumer、connector、streams实例中通用的监控指标
(1)kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
这是在客户端实例中的监控指标
包含以下属性
(2)kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
这是在Broker中的监控指标
包含以下属性
outgoing-byte-rate:一个Broker节点发送的数据流量速率,单位是bytes/sec
outgoing-byte-total:一个Broker节点发送的数据流量,单位是bytes
request-rate:一个Broker节点发送请求的速率,单位是个/秒。
request-total:一个Broker节点发送请求的总量,单位是个。
request-size-avg:一个Broker节点的某个时间窗口中,发送请求的平均量,单位是bytes。
request-size-max:一个Broker节点的某个时间窗口中,发送请求的最大量,单位是bytes。
incoming-byte-rate:一个Broker节点接收的数据流量速率,单位是bytes/sec
incoming-byte-total:一个Broker节点接收的数据流量,单位是bytes
request-latency-avg:一个Broker节点请求的平均延时,单位是毫秒。
request-latency-max:一个Broker节点请求的最大延时,单位是毫秒。
response-rate:一个Broker节点接收到响应的速率,单位是个/秒。
response-total:一个Broker节点接收到的响应总和,单位是个
Producer监控指标
kafka.producer:type=producer-metrics,client-id="{client-id}"
kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}"
Consumer监控指标
kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)
kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}"
kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
Connector监控指标
kafka.connect:type=connect-worker-metrics
kafka.connect:type=connect-worker-metrics,connector="{connector}"
kafka.connect:type=connect-worker-rebalance-metrics
kafka.connect:type=connector-metrics,connector="{connector}"
kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
Stream监控指标
kafka.streams:type=stream-metrics,client-id=([-.\w]+)
kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)
kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)
kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)
kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)