KafkaBroker问题合集


发布于 2024-06-09 / 66 阅读 / 0 评论 /
KafkaBroker问题合集

KafkaBroker节点网络丢包导致节点不可用问题

问题描述

在高可用的测试场景中,有一个案例是通过tc命令模拟计算机节点80%网络丢包率,测试结果发现该节点上的KafkaBroker不可用,对应的9092服务端口不处于监听状态。

问题分析

首先,从kafka.log日志入手,发现以下异常日志

Failed to process feature ZK node change event. The broker will eventually exit.

根据日志中的broker exit,我们初步判定,这个日志后就是broker退出的过程。

日志输出过程

下面,我们通过源码解析这个日志触发的过程,如下所示:

KafkaServer.startup()
    featureChangeListener = new FinalizedFeatureChangeListener()
        thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
    featureChangeListener.initOrThrow()
        thread.start()
            info("Starting")
            ChangeNotificationProcessorThread.doWork() // 循环执行while (isRunning)
                queue.take.updateLatestOrThrow()
                如果上述命令遇到异常,则打印错误日志,抛出FatalExitError异常
                    error("Failed to process feature ZK node change event. The broker will eventually exit.", e)
                    throw new FatalExitError(1)
            在ChangeNotificationProcessorThread.doWork方法中,如果遇到FatalExitError,则执行以下操作
                shutdownInitiated.countDown()
                shutdownComplete.countDown()
                info("Stopped")
                Exit.exit(e.statusCode())
            在ChangeNotificationProcessorThread.doWork方法中,如果遇到其他异常,则忽略,仅打印错误日志
                error("Error due to", e)
            info("Stopped")

这里FinalizedFeatureChangeListener是对/feature这个zknode的监听,包括StateChangeHandler和ZNodeChangeHandler。

输出了对应的日志后,通过Exit.exit方法给KafkaBroker进程发送终止的信号。而这个中止信号会引发kafka shutdown hook钩子函数的执行。

kafka-shutdown-hook执行过程

下面是kafka-shutdown-hook注册和执行过程。

Kafka#main()
    给当前进程添加kafka-shutdown-hook,对应的钩子函数为KafkaServer.shutdown()
    遇到Exit.exit函数调用后,就会执行shutdown钩子函数
    KafkaServer#shutdown()
        info("shutting down")
        CoreUtils.swallow(controlledShutdown(), this)
            info("Starting controlled shutdown")
            如果shutdown成功
                info("Controlled shutdown succeeded")
            如果shutdown不成功
                info("Remaining partitions to move")
                info(s"Error from controller")
                warn("Retrying controlled shutdown after the previous attempt failed...")
            如果shutdown过程中出现异常
                warn("Error during controlled shutdown, possibly because leader movement took longer than the configured controller.socket.timeout.ms and/or request.timeout.ms: ")
                warn("Retrying controlled shutdown after the previous attempt failed...")
        CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
            info("Stopping socket server request processors")
            info("Stopped socket server request processors")
        CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
            info("shutting down")
            info("shut down completely")
        CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
            info("shutting down")
            info("shut down completely")
        CoreUtils.swallow(kafkaScheduler.shutdown(), this)
            debug("Shutting down task scheduler.")
        CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
            info("Shutdown complete.")
        CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
            info("Shutdown complete.")
        CoreUtils.swallow(authorizer.foreach(_.close()), this)
            logger.info("close() called on authorizer."); // 在RangerKafkaAuthorizer中输出
        CoreUtils.swallow(kafkaController.shutdown(), this)
            debug("Resigning")
            info("Resigned")
        CoreUtils.swallow(socketServer.shutdown(), this)
            info("Shutting down socket server")
            SocketServer.stopProcessingRequests()
                info("Stopping socket server request processors")
                info("Stopped socket server request processors")
            info("Shutdown completed")
        info("shut down completed")

这里主要是通过日志的形式,展现整个过程。跟我们看到的kafka.log日志完全吻合。

Kafka丢数场景

kafka作为一个数据中间件,虽然不能当成数据库来使用,但是中间会保存一段时间的数据,如果操作不善,可能导致数据丢失。

主要有以下几种情况。

磁盘损坏无法恢复

磁盘损坏是最常见的情况,具体有以下两个场景:

如果说我们的topic副本数为1,当消费者还没有消费完这个分区所有的数据,如果此时发生磁盘损坏,且无法恢复的情况,那么剩余未被消费的数据就永久丢失了。

如果我们的topic副本数大于1,当磁盘损坏且无法恢复时,所有分区leader副本在该磁盘上,且没有follower副本追上leader副本的情况下,则leader和follower之间的数据gap就永久丢失了。这种情况有一种解决方案,那就是生产者开启事务,把所有的副本都写完才算消息生产完成。

消费端自动提交丢失数据

这种情况出现在消费者自动提交的场景中,比如消费者消费了一批数据,正在进行业务逻辑处理时,消费者客户端自动提交了offset,此时kafkabroker认为当前消费已完成,但消费者却没有完成,而是正在处理,如果此时消费者宕机,则未被处理的数据就会被丢失。