Kafka Authorizer和审计日志


发布于 2017-06-25 / 68 阅读 / 0 评论 /
服务设计的5A标准中,可授权(Authorization)和可审计(auditable)是重要的组成部分。Kafka中,authorizer和audit是相互关联的。

1.Authorizer概述

Kafka支持多种不同的Authorizer,通过参数authorizer.class.name配置,配置kafka.security.auth.Authorizer接口(kafka2.4版本以后,使用org.apache.kafka.server.authorizer.Authorizer接口)实现类的全名,默认为null,表示没有授权管理。

当前常见的Authorizer有两种:

(1)AclAuthorizer

(2)RangerKafkaAuthorizer

每种Authorizer都定义了对应审计日志输出模式。

1.1.Authorizer初始化过程

Authorizer的初始化是在KafkaServer的启动过程中完成的,具体过程如下:

kafka.Kafka#main
       kafka.Kafka#buildServer
              config = kafka.server.KafkaConfig#fromProps(props, false)
                     kafka.server.KafkaConfig初始化过程中需要初始化authorizer变量
                     kafka.security.authorizer.AuthorizerUtils#createAuthorizer(className)

至于使用哪个Authorizer,则由参数authorizer.class.name决定。

1.2.Authorizer鉴权过程

kafka服务端收到请求后,最终调用Authorizer实例进行鉴权,过程如下:

kafka.server.KafkaApis#handle // 服务端收到请求,并进行处理
    KafkaApis.handleXXXRequest(request)
              authHelper.authorize
                     org.apache.kafka.server.authorizer.Authorizer.authorize

需要注意的是,并不是所有的请求都会经过authorize过程。

2.AclAuthorizer

ACL的全称Access Control List,访问控制表。AclAuthorizer是kafka原生支持的授权管理器,通过zookeeper来管理授权信息。

AclAuthorizer的全称为kafka.security.authorizer.AclAuthorizer。

AclAuthorizer的前身是kafka.security.auth.SimpleAclAuthorizer,在kafka2.4版本之后,就不推荐使用SimpleAclAuthorizer,而推荐使用AclAuthorizer。虽然类替换了,但是实现的功能基本相同。

2.1.AclAuthorizer类结构

kafka.security.authorizer.AclAuthorizer类图关系如下图所示:

2.2.AclAuthorizer审计日志输出

当前kafka已实现了标准的AclAuthorizer审计日志输出,只需要我们配置logger输出文件即可。

2.2.1.logger配置

logger变量申明代码如下所示:

val authorizerLogger = Logger("kafka.authorizer.logger")

这里定义了logger名称为“kafka.authorizer.logger”,下面只需要把这个logger的日志输出到指定文件即可。

2.2.2.日志文件配置

日志文件的配置在kafka的log4j2.properties文件中,对应logger的配置如下所示:

appender.authorizerAppender.type = RollingFile
appender.authorizerAppender.name = authorizerAppender
appender.authorizerAppender.fileName = /data/kafka/kafka-authorizer.log
appender.authorizerAppender.filePattern = /data/kafka/kafka-authorizer-%d{yyyyMMdd}-%i.log
appender.authorizerAppender.layout.type = PatternLayout
appender.authorizerAppender.layout.pattern = [%d] [%-5p] - %m%n
appender.authorizerAppender.policies.type = Policies
appender.authorizerAppender.policies.time.type = TimeBasedTriggeringPolicy
appender.authorizerAppender.policies.time.interval = 1
appender.authorizerAppender.policies.time.modulate = true
appender.authorizerAppender.policies.size.type = SizeBasedTriggeringPolicy
appender.authorizerAppender.policies.size.size = 50MB
appender.authorizerAppender.strategy.type = DefaultRolloverStrategy
appender.authorizerAppender.strategy.delete.type = Delete
appender.authorizerAppender.strategy.delete.basePath = /data/kafka/
appender.authorizerAppender.strategy.delete.maxDepth = 5
appender.authorizerAppender.strategy.delete.ifLastModified.type = IfLastModified
appender.authorizerAppender.strategy.delete.ifLastModified.age = 30d

logger.authorizer.name = kafka.authorizer.logger
logger.authorizer.level = INFO
logger.authorizer.appenderRef.authorizerAppender.ref = authorizerAppender
logger.authorizer.additivity = false

注意,logger的名称需要与源码中logger声明的相同,上述代码即为“kafka.authorizer.logger”。

输出的审计日志文件名称为kafka-authorizer.log,可在kafka服务端日志存放目录中查看。

2.2.3.日志格式

所有的审计日志都通过logAutidMessage方法输出,格式如下图所示:

  def logAuditMessage(requestContext: AuthorizableRequestContext, action: Action, authorized: Boolean): Unit = {
    def logMessage: String = {
      val principal = requestContext.principal
      val operation = SecurityUtils.operationName(action.operation)
      val host = requestContext.clientAddress.getHostAddress
      val resourceType = SecurityUtils.resourceTypeName(action.resourcePattern.resourceType)
      val resource = s"$resourceType$ResourceSeparator${action.resourcePattern.patternType}$ResourceSeparator${action.resourcePattern.name}"
      val authResult = if (authorized) "Allowed" else "Denied"
      val apiKey = if (ApiKeys.hasId(requestContext.requestType)) ApiKeys.forId(requestContext.requestType).name else requestContext.requestType
      val refCount = action.resourceReferenceCount

      s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount"
    }

    if (authorized) {
      if (action.logIfAllowed)
        authorizerLogger.debug(logMessage)
      else
        authorizerLogger.trace(logMessage)
    } else {
      if (action.logIfDenied)
        authorizerLogger.info(logMessage)
      else
        authorizerLogger.trace(logMessage)
    }
  }

logMessage是最终生成日志信息的方法。

3.RangerKafkaAuthorizer

RangerKafkaAuthorizer类全称为org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer,是kafka自定义的授权管理器,权限信息由ranger进行管理。

RangerKafkaAuthorizer类在两个jar包中有两处定义,类全限定名都一样,分别是:ranger-kafka-plugin-*.jar和ranger-kafka-plugin-shim-*.jar。其中:

(1)ranger-kafka-plugin-shim包中的类只是暴露方法,被kafka的插件用于权限校验并返回校验结果。

(2)ranger-kafka-plugin包中的类定义了具体实现、具体认证的逻辑。

3.1.RangerKafkaAuthorizer类结构图

如下图所示:

2.2.审计日志输出-本地日志文件

默认情况下,RangerKafkaAuthorizer的日志会输出到server.log中,如果需要的话,需要制定

3.2.1.logger配置

在RangerKafkaAuthorizer中,有两个logger变量,如下所示:

// 第一个logger的名称为org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer,输出的是RangerKafkaAuthorizer的工作日志,日志是碎片化的。会把RangerKafkaAuthorizer的整个生命周期活动,以及authorize的过程都在日志中表现出来
private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);

//第二个logger的名称为org.apache.ranger.perf.kafkaauth.request,主要输出的是authorize过程的耗时,没有记录权限信息,只记录了资源和耗时信息
private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger(“kafkaauth.request");

主要关注第一个logger。

3.2.2.日志文件配置

需要在kafka的log4j2.properties配置文件中添加以下appender的配置:

appender.authorizerAppender.type = RollingFile
appender.authorizerAppender.name = authorizerAppender
appender.authorizerAppender.fileName = /data/kafka/kafka-authorizer.log
appender.authorizerAppender.filePattern = /data/kafka/kafka-authorizer-%d{yyyyMMdd}-%i.log
appender.authorizerAppender.layout.type = PatternLayout
appender.authorizerAppender.layout.pattern = [%d] [%-5p] - %m%n
appender.authorizerAppender.policies.type = Policies
appender.authorizerAppender.policies.time.type = TimeBasedTriggeringPolicy
appender.authorizerAppender.policies.time.interval = 1
appender.authorizerAppender.policies.time.modulate = true
appender.authorizerAppender.policies.size.type = SizeBasedTriggeringPolicy
appender.authorizerAppender.policies.size.size = 50MB
appender.authorizerAppender.strategy.type = DefaultRolloverStrategy
appender.authorizerAppender.strategy.delete.type = Delete
appender.authorizerAppender.strategy.delete.basePath = /data/kafka/
appender.authorizerAppender.strategy.delete.maxDepth = 5
appender.authorizerAppender.strategy.delete.ifLastModified.type = IfLastModified
appender.authorizerAppender.strategy.delete.ifLastModified.age = 30d

logger.authorizer.name = org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer
logger.authorizer.level = INFO
logger.authorizer.appenderRef.authorizerAppender.ref = authorizerAppender
logger.authorizer.additivity = false

日志文件名称为kafka-authorizer.log。

3.2.3.日志格式

org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer输出的日志有以下几种:

(1)第一种,RangerKafkaAuthorizer生命周期管理日志

logger.info("Calling plugin.init()");
logger.info("close() called on authorizer.");
logger.error("Error closing RangerPlugin.", exception);

(2)第二种,授权管理相关日志

logger.error("Unsupported access type. session=" + session + ", operation=" + operation + ", resource=" + resource);
logger.error("Unsupported resourceType=" + resource.resourceType());
logger.debug("rangerRequest=" + rangerRequest + ", return=" + returnValue);

(3)第三种,其他日志,包括接口调用

logger.error("Ranger Plugin returned null. Returning false");
logger.error("Error while calling isAccessAllowed(). request=" + rangerRequest, t);

这里需要关注的是第二种,需要注意的是,第二种日志输出级别需要设置为debug,才能看到那些被allowed的审计日志,否则只能看到denied的审计日志。

3.3.审计日志输出-外部存储系统

Ranger针对审计日志做了特殊的设计,通过插件的形式来实现审计日志的多模式存储。

整个org.apache.ranger.audit.provider.AuditHandler体系如下图所示:

审计日志的输出到外部系统的过程如下所示:

org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer#authorize
       org.apache.ranger.plugin.service.RangerBasePlugin#isAccessAllowed
              org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuditHandler# processResult
                     org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuditHandler# isAuditingNeeded
                     org.apache.ranger.plugin.audit.RangerDefaultAuditHandler#getAuthzEvents
                            这里构建AuthzAuditEvent授权审计事件
       org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuditHandler#flushAudit//授权审计事件不为空时的逻辑
              org.apache.ranger.plugin.audit.RangerDefaultAuditHandler#logAuthzAudit
                     org.apache.ranger.plugin.audit.RangerDefaultAuditHandler#populateDefaults
                     根据RepositoryName获取AuditHandler
                     org.apache.ranger.audit.provider.AuditHandler#log//这里对生成的授权审计事件刷新到配置的存储介质中

鉴权之后即刻输出审计日志。