Kafka基本操作和场景


发布于 2024-05-11 / 61 阅读 / 0 评论 /
描述Kafka命令行操作,以及当前支持的使用场景,这对评估客户需求以及实现方案的可行性特别重要。

下面是LinkedIn内部kafka线上环境的操作和实践案例。

1.创建topic

新增topic有两种方式:

(1)一种是手动创建

(2)另一种是在发布消息到一个不存在的topic时可自动创建此topic(auto.create.topics.enable参数为true)。

手动创建topic的命令如下所示:

bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y

replication-factor参数表示每个消息会写入到多少个broker中。如果broker数量为2,而replication-factor参数为3,则会写入失败。我们建议replication-factor设为2或3,这样的话你就可以大张旗鼓地启停一台服务器,而不会中断数据的消费。

partitions参数表示分区数,控制着这些消息会被分配到多少个日志文件中。分区数有一些重要的注意事项:首先,每个分区必须在一台broker中,如果整个数据集有20个分区,那么这些分区会被分配到不超过20台broker中;然后,分区数限制了消费进程的最大并发数。

每个分区的日志都会在kafka-log目录的文件夹中,文件夹名称的组成为“{topic名称}-{分区号}”。因为系统的文件夹名称不能超过255字符,所以对topic的名称的长度也有一定的限制。假定分区数小于100000,因此topic名称就不能超过249个字符。

命令行中的其他配置可以覆盖broker级别的配置,比如数据的保留时间。

2.修改Topic

修改Topic包含修改分区数或者其他配置参数。

特别要注意的是:kafka当前不支持减少topic的分区。

2.1.添加分区

命令行案例如下:

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name --partitions 40

需要注意的是,以上案例只是修改了topic的分区数,而不会更改原有分区数据的分布。如果生产数据依赖分区,那么这个操作会干扰消费者消费数据。也就是说,如果数据分区策略是“hash(key) % number_of_partitions”,那么增加分区后kafka会对刷新此之后的数据分区,但还是不会自动化重分布数据。

2.2.增加配置

命令行案例如下所示:

bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y

2.3.删除配置

命令行案例如下所示:

bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x

2.4.删除topic

命令行案例如下所示:

bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name

3.优雅地关闭集群

kafka集群会自动检测broker是否宕机,并为此宕机的broker中的分区选举新的leader。当服务器宕机或broker因维护状态或配置改变导致的内部进程挂掉的时候,也会执行一样的选举动作。

对于主动关闭集群,kafka支持一种更佳优雅的机制来停止broker,而不是kill进程。服务器被优雅关闭有以下两个重要优势:

(1)broker会同步所有日志到磁盘中,以避免重启时进行日志的恢复,例如校验日志尾部所有消息的校验和。因为日志恢复会花费时间,所以这加速了内部服务的重启。

(2)broker也会在关闭之前迁移本服务器上所有的分区到其他broker上,这能更快速地把leader关系传递到其他副本上,降低了分区不可用的时间(毫秒级)。

无论服务器被stop还是kill,日志的同步都会自动发生,但leader关系的迁移需要添加配置:

controlled.shutdown.enable=true

需要注意的是,controlled shutdown只有在所有的分区都有副本时才会发生,也就是说replication-factor参数要大于1且至少有一个副本是可用的。

一般来说只有停用了最后一个副本,那么此topic就不可用了。

4.leader的均衡

无论broker停止或是宕机,broker中分区的leader关系都会被转移到副本中。当broker重启时,此broker的分区就会成为follower,这意味着这些分区不会被kafka-client读或者写。

为了避免这种失衡,Kafka有一个preferred-replicas(assigned-replicas)参数。如果某个分区的preferred-replicas参数为“1,5,9”,那么比起broker5和broker9,broker1会被优先选为leader,因为1在preferred-replicas参数列表的前面。默认情况下,kafka集群会试着按照已保存的副本顺序进行leader关系的修复,这个行为需要设置一个参数,如下所示:

auto.leader.rebalance.enable=true

我们可以把这个参数设为false,但我们就需要手动才能使leader关系符合已保存的副本顺序,命令如下所示:

bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

这个命令的实现逻辑是:对于每一个topic的partition,controller先获得preferred replica(就是assigned replicas列表中的第一个replica),如果这个preferred replica不是leader,controller则向preferred replica所在的那个broker发送一个请求,使之成为partition的leader。

5.通过机架实现replica的均衡

机架感知特性可以把相同分区的不同副本扩散到不同的机架上。这个进一步扩展了kafka的高可用特性,用broker-failure来掩盖rack-failure,降低了机架崩溃时数据丢失的风险。这个特性也会应用到broker分组,例如EC2的可用区。

我们可以在broker配置中加上以下配置来指定此broker属于哪个机架。

broker.rack=my-rack-id

当topic更创建、修改或副本被重分布时,机架的限制就会出现,确保副本尽可能分不到不同的机架上。

用于分配副本所在broker的算法确保了每个broker上leader的数量的限制,尽管broker被分布在不同的机架上,这也确保了吞吐量的均衡。

但是,如果某个机架上broker数量与其他机架不同,副本的分配就会不均衡,拥有更少broker的机架会维护更多的副本,这也就意味着这些机架需要为这些副本提供更多的存储资源。因此,在每个机架上配置相同数量的broker才是明智的。

6.管理消费组

在ConsumerGroupCommand工具的帮助下,我们可以list、describe、delete消费组。消费组可被手动删除,也可被自动删除。当此消费组最后一次提交的offset过期之后此消费组会被自动删除。只有当消费组中没有活跃的消费者时,手动删除消费组才能生效。案例如下:

# list所有的消费组信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 —-list

# describe某个消费组详细信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

6.1.describe

describe命令可以查看消费组的offset信息,结果如下图所示:

结果中有三个比较重要的数据:

(1)CURRENT-OFFSET:表示当前消费的offset

(2)LOG-END-OFFSET:表示日志中最后一条消息的offset

(3)LAG:表示还有多少条消息未被消费。此参数可以描述当前分区消息积压情况,判断当前消费组速率是否能匹配上生产者速率。

6.2.describe members

列举消费组中所有活跃的消费者。案例命令如下:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group  --members

结果如下图所示:

可以看到某个消费者在分配分区的个数,但是看不到消费者正在消费哪个topic分区。

6.3.describe members verbose

列出详细消费组信息,包括分配的具体分区信息。案例如下图所示:

可以看到消费者消费分区的个数,也能看到消费的topic分区列表。

6.4.describe offsets

输出结果与describe选项相同,是describe操作的默认选项。

6.5.describe state

提供详细的消费组级别的信息。案例结果如下图所示:

6.6.delete group

如果要手动删除一个或多个消费组,可以使用“--delete”选项,例如下图所示:

6.7.reset-offsets

如果要重置消费组的offset,我们可以使用“--reset-offsets”选项。这个选项只支持一次删除一个消费组。还需要使用定义的域:--all-topics或--topic。使用时必须使用一个域,除非我们使用“--from-file”这个场景。

同时,我们首先要确保此消费实例是否处于inactive状态。“--reset-offsets”还有三个执行选项:

(1)default:展示被重置的offsets

(2)execute:运行

(3)reset-offsets:进程

(4)export:把结果导出到csv文件。

--reset-offsets还有以下应用场景可以选择,且必须选择一种以上:

场景

说明

--to-datetime <String:datetime>

把offset重置到某个时间点,datetime格式为“YYYY-MM-DDTHH:mm:SS.sss”

--to-earliest

重置到最早的offset

--to-latest

重置到最新的offset

--shift-by <Long: number-of-offsets>

把offset推移number-of-offsets条消息。number-of-offsets为正表示向未消费方向推移;为负表示向已消费方向推移。

--from-file

根据csv文件重置offset

--to-current

把offset重置到当前offset

--by-duration <String: duration>

把offset重置到当前时间间隔duration的时间点,duration的格式为“PnDTnHnMnS”

--to-offset <offset id>

把offset重置到一个指定的offset

需要注意的是,如果重置的offset超出了当前offset的最大范围,则会被置为最后的offset值。例如,如果最后的offset为10,把offset向后移动15,那么移动后的offset不是15,而是10。

例如,我们把消费组的offset重置到最新的offset,如下图所示:

如果你使用的是旧的高等级消费者,把消费组元数据信息保存在zookeeper中(offsets.storage =zookeeper),那么我们需要传递“--zookeeper”,而不是“--bootstrap-server”。

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

7.集群扩容

往kafka集群添加broker是非常简单的,只需要给这个broker设置设置一个唯一id,并在新服务器中启动这个broker即可。但是新的服务器不会被自动分配分区数据,所以,只有把分区移到这个broker,或新topic创建时,这个broker才会起作用。一般来说,我们在集群中添加新机器的时候,一般会从其他旧的broker中迁移数据到新的broker。

8.集群内分区重分布——数据迁移

迁移数据的过程通常是手动初始化,但自动完成的。

8.1.分区重分布原理

在底层,kafka会把新的broker作为迁移分区的follower,并让这个新broker上的分区完整备份原分区所有数据。当新broker上的分区已经完整备份了数据,并加入到了in-sync副本列表后,kafka会删除原分区的数据。

8.2.分区重分布工具

分区重分布工具可用于broker间分区的迁移。一次理想的分区重分布要确保所有broker间负载均衡及分区数量保持一致。分区重分布工具没办法自主学习数据在kafka集群中的分布来实现平负载均衡。因此,管理员需要在迁移时指定迁移哪些topic的哪些分区。

分区重分布工具可工作于三种独立的模式:

第一种:--generate。这个模式中,给定topic列表和broker列表,工具会生成候补分布任务,用于迁移所有制定的topic的所有分区到制定的新broker中。这个选项仅仅提供了便利的方式来生成一个分区重分布计划,需要指定迁移的topic列表以及目标broker列表。

第二种:- -execute。在这种模式中,工具根据用户提供的重分布计划开始分区的重分布,一般使用“- -reassignment-json-file”选项来指定重分布计划,这可以是管理员自定义的重分布计划,也可以是使用“- -generate”模式生成的重分布计划。

第三种:- -verify。在这种模式中,工具检验上一次“- -execute”重分布的状态,列举出所有的分区。状态可以是:successfully completed、failed、in process三者之一。

8.3.数据自动迁移到新broker

分区重分布工具可以用于把现有broker中的topic数据迁移到新的broker中。这在kafka集群扩容时经常用到,因为比起一次迁移一个分区数据,迁移整个topic数据更简单。当使用topic迁移时,我们需要在命令中指定迁移的topic列表以及目的broker列表。这个工具能够负载均衡地把topic数据迁移到新的broker中,在迁移过程中topic的副本数保持不变。实际上,指定topic列表中所有的分区的副本都会从老的broker迁移到新的broker。

举例来说,下面的案例会把foo1、foo2的所有分区迁移到新的broker5和6。在结束这次迁移后,foo1和foo2的所有分区都只会存在于broker5和6中。

因为工具接受把topic列表以json文件的形式作为输入参数,我们首先需要指定需要迁移的topic,以及创建如下所示的json文件。

> cat topics-to-move.json
{
      "topics": [
            {"topic": "foo1"},
            {"topic": "foo2"}
      ],
      "version":1
}

当json文件准备完毕后,使用分区重分布工具生成候补重分布任务,命令如下所示。

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate

  Current partition replica assignment

  {
    "version":1,
    "partitions":[
      {"topic":"foo1","partition":2,"replicas":[1,2]},
      {"topic":"foo1","partition":0,"replicas":[3,4]},
      {"topic":"foo2","partition":2,"replicas":[1,2]},
      {"topic":"foo2","partition":0,"replicas":[3,4]},
      {"topic":"foo1","partition":1,"replicas":[2,3]},
      {"topic":"foo2","partition":1,"replicas":[2,3]}
    ]
  }

  Proposed partition reassignment configuration

  {
    "version":1,
    "partitions":[
      {"topic":"foo1","partition":2,"replicas":[5,6]},
      {"topic":"foo1","partition":0,"replicas":[5,6]},
      {"topic":"foo2","partition":2,"replicas":[5,6]},
      {"topic":"foo2","partition":0,"replicas":[5,6]},
      {"topic":"foo1","partition":1,"replicas":[5,6]},
      {"topic":"foo2","partition":1,"replicas":[5,6]}
    ]
  }

这个候把任务把foo1和foo2这两个topic的所有分区数据迁移到5和6这两个topic中。但有一点需要注意,此时任务还没开始执行,只是告诉你当前的分布状态以及提出了一个新任务。你应当保存当前的分布状态,以便于之后可能发生的回滚。新的任务应当保存在一个json文件中,例如expand-cluster-reassignment.json文件,并在工具执行“- -execute”模式时把这个json文件作为参数传递给工具。执行命令如下所示:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute

  Current partition replica assignment

  {
    "version":1,
    "partitions":[
      {"topic":"foo1","partition":2,"replicas":[1,2]},
      {"topic":"foo1","partition":0,"replicas":[3,4]},
      {"topic":"foo2","partition":2,"replicas":[1,2]},
      {"topic":"foo2","partition":0,"replicas":[3,4]},
      {"topic":"foo1","partition":1,"replicas":[2,3]},
      {"topic":"foo2","partition":1,"replicas":[2,3]}
    ]
  }

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started reassignment of partitions
  {
    "version":1,
    "partitions":[
      {"topic":"foo1","partition":2,"replicas":[5,6]},
      {"topic":"foo1","partition":0,"replicas":[5,6]},
      {"topic":"foo2","partition":2,"replicas":[5,6]},
      {"topic":"foo2","partition":0,"replicas":[5,6]},
      {"topic":"foo1","partition":1,"replicas":[5,6]},
      {"topic":"foo2","partition":1,"replicas":[5,6]}
    ]
  }

最后,可以通过“- -verify”选项查看分区重分布任务的状态。需要注意的是,“- -verify”模式的运行需要用到“- -execute”模式运行时用到的expand-cluster-reassignment.json文件,内容是相同的。

8.4.自定义分区重分布和迁移

分区从分布工具可用于选择性地把某些分区移动到其他broker中。如果我们在这种场景下进行分区重分布,那么用户需要知道重分布计划,但不需要通过工具生成候补重分布任务,有效地跳过了“- -generate”步骤,直接使用“- -execute”进行迁移。

举例来说,我们需要把foo1这个topic的0号分区迁移到5和6号broker,把foo2这个topic的1号分区迁移到2和3号broker。

第一步,手动编写自定义重分布计划,写入到json文件,文件内容如下所示:

> cat custom-reassignment.json
  {
    "version":1,
    "partitions":[
      {"topic":"foo1","partition":0,"replicas":[5,6]},
      {"topic":"foo2","partition":1,"replicas":[2,3]}
    ]
  }

第二步,使用custom-reassignment.json作为“- -execute”模式的参数,启动重分布任务。

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
  
  Current partition replica assignment

{
  "version":1,
  "partitions":[
    {"topic":"foo1","partition":0,"replicas":[1,2]},
    {"topic":"foo2","partition":1,"replicas":[3,4]}
  ]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{
  "version":1,
  "partitions":[
    {"topic":"foo1","partition":0,"replicas":[5,6]},
    {"topic":"foo2","partition":1,"replicas":[2,3]}
  ]
}

可通过“- -verify”模式来检查分区重分布任务当前的状态。需要注意的是,在使用“- -verify”模式时,你需要传递与“- -execute”模式一样的“- -reassignment-json-file”入参。使用案例如下所示:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
  
  Status of partition reassignment:
  Reassignment of partition [foo1,0] completed successfully
  Reassignment of partition [foo2,1] completed successfully

9.从集群移除broker

目前,分区重分布工具还无法支持从集群中移除broker而自动生成分区重分布计划。因此,管理员一定要自己提出重分布计划,以便把这个即将移除的broker中所有分区的副本都迁移到其他的broker中。相对来说,这个将会更加繁琐,重分布需要确保所有的副本不是从即将移除的broker中迁移到另外的一个broker,而应当是其他的多个broker。为了使迁移进程更加简单,我们计划在未来的版本中添加工具来支持从集群中移除broker。

10.增加副本

为某个已存在的分区增加副本是非常简单的,只需要在自定义重分布json文件中指定额外的副本,并使用“- -execute”模式来增加指定分区的副本数即可。

举例来说,把foo这个topic的0号分区的副本数从1增加到3。那么在增加副本数之前,分区的唯一副本在5号broker上。增加副本数后,我们会在6号和7号broekr中增加另外两个副本。

第一步,手动编写自定义重分布配置文件,如下所示:

> cat increase-replication-factor.json
  {
    "version":1,
    "partitions":[
      {"topic":"foo","partition":0,"replicas":[5,6,7]}
    ]
  }

第二步,使用“--execute”模式开启重分布任务,命令如下所示:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
  
  Current partition replica assignment

  {
    "version":1,
    "partitions":[
      {"topic":"foo","partition":0,"replicas":[5]}
    ]
  }

  Save this to use as the --reassignment-json-file option during rollback
  Successfully started reassignment of partitions
  {
    "version":1,
    "partitions":[
      {"topic":"foo","partition":0,"replicas":[5,6,7]}
    ]
  }

第三步,使用“- -verify”模式检查分区重分布任务的状态,命令中使用的increase-replication-factor.json配置文件与“- -execute”模式是一样的。命令如下所示:

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify

  Status of partition reassignment:
  Reassignment of partition [foo,0] completed successfully

我们也可以通过kafka-topics.sh工具来检验副本数的新增情况。命令如下所示:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
  
  Topic:foo PartitionCount:1  ReplicationFactor:3 Configs:
    Topic: foo  Partition: 0  Leader: 5 Replicas: 5,6,7 Isr: 5,6,7

11.数据迁移期间限制带宽

kafka允许我们为副本流量限流,为broker间副本迁移设置带宽的上限。这在集群的再平衡场景中非常有用,比如启动新broker、新增或移除broker。这限制了数据敏感型操作的影响。

11.1.限流的方法

限流有两种方式。第一种,最简单也是最安全的方式,就是运行kafka-reassign-partitions.sh脚本时设置限流。第二种,kafka-configs.sh也可以用于直接查看或修改限流的值。

11.1.1.kafka-reassign-partitions.sh设置限流

我们可以在运行kafka-reassign-partitions.sh脚本时设置限流。

举例来说,如果你想要再平衡,通过以下命令,你可以在不超过50MB/s的网络速率下移动分区数据。

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000

当我们执行上述脚本时,我们可以看到限流信息,如下所示。

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

如果我们想在再平衡期间修改限流值,以便增加吞吐量,让再平衡更快执行。我们可以执行如下命令。

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000

  There is an existing assignment running.
  The throttle limit was set to 700000000 B/s

一旦再平衡完成,管理员可以通过“--verify”模式查看再平衡任务的状态。如果再平衡确实已经完成,“--verify”命令可以把限流设置移除。对于管理员来说,在再平衡完成后,执行“--verify”命令来移除限流设置是非常重要的。如果不这样做,之后正常的副本传输流量也会被限流。

当“--verify”命令被执行时,重分布已经完成,下面的脚本可以保证限流设置被移除。

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092  --verify --reassignment-json-file bigger-cluster.json

  Status of partition reassignment:
  Reassignment of partition [my-topic,1] completed successfully
  Reassignment of partition [mytopic,0] completed successfully
  Throttle was removed.

11.1.2.kafka-configs.sh设置限流

管理员也需要使用kafka-configs.sh命令检查分区的配置。有两类和限流有关的配置

(1)第一类是限流值,是broker级别的属性,也是一个动态属性,属性名如下所示。

leader.replication.throttled.rate
follower.replication.throttled.rate

(2)第二类属性是限流的副本集合,需要为每个topic进行配置,属性名如下所示。

leader.replication.throttled.replicas
follower.replication.throttled.replicas

上面的四个配置都是由kafka-reassign-partitions.sh命令自动分配的。

查看限流值配置的命令如下所示:

> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers

  Configs for brokers '2' are leader.replication.throttled.rate=700000000, follower.replication.throttled.rate=700000000
  Configs for brokers '1' are leader.replication.throttled.rate=700000000, follower.replication.throttled.rate=700000000

以上两个属性展示了leader和follower端的备份协议应用的限流值。默认情况下,两端配置的是相同的限流值。

查看限流的副本集合的命令如下所示:

> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102

从结果我们可以看出:上面限制只针对my-topic,对分区leader的限制只作用于102号broker的分区1以及101号broker的分区0。而对分区follower的限制只作用于101号broker的分区1亿记102号broker的分区0。

默认情况下,再平衡之前,kafka-reassign-partitions.sh会把针对leader的限流配置应用于所有的副本,因为这些副本之一会成为leader。而当再平衡之后,kafka-reassign-partitions.sh会把针对follower的限流配置应用于所有再平衡后的副本(不包括之前副本所在的broker)。所以,如果某个分区在101和102号broker上有副本,重分布到102和103号broker后,此分区的leader限流会应用于101和102号broker,而follower限流只会应用于103号broker。

如果有需要的话,你可以使用kafka-configs.sh工具的“--alter”模式来手动更改限流配置。

11.2.已限流备份的安全使用

使用限流的备份时,有以下注意事项需要关注。

(1)移除限流:在重分布完成后,需要手动移除限流,方法是执行kafka-reassign-partitions.sh命令的“--verify”模式。

(2)确保进程:如果限流设置的太低,比流入的写入效率都低的话,备份的进程将不起作用。也就是“max(BytesInPerSec) > throttle”,BytesInPerSec表示producers写入每个broker的数据流量速率。

管理员可以监控是否可以在限流情况下进行备份,使用以下监控数据:

kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

lag在备份期间应当能够不断减少,如果监控到的数据不在减少,那么就需要增大限流值。

12.设置配额(quotas)

配额的参数配置是user或client-id级别的,默认情况下,客户端会收到一个无限的配额。我们也可以为每个user、client-id设置配额。

配置自定义配额的方法如下所示:

# for (user=user1, client-id=clientA)
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  Updated config for entity: user-principal 'user1', client-id 'clientA'.

# for user=user1
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
  Updated config for entity: user-principal 'user1'.

# for client-id=clientA
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
  Updated config for entity: client-id ‘clientA'.

我们也可以通过“--entity-default”而不是“--entity-name”来设置默认配额,案例如下所示:

# Configure default client-id quota for user=userA
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
  Updated config for entity: user-principal 'user1', default client-id.

# Configure default quota for user
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
  Updated config for entity: default user-principal.

# Configure default quota for client-id
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
  Updated config for entity: default client-id.

如何展示配额信息?案例如下:

# for a given (user, client-id)
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

# for a given user
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

# for a given client-id
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
  Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

如果没有指定entity-name,那么所有指定类型的entity都会被列举,例如:

# describe all users
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users
  Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

# describe for (user, client)
> bin/kafka-configs.sh  --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
  Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

通过在broker中添加配置,我们也可以为所有的client-id设置默认配额。这些属性只有在配额未被覆盖和默认值未在zookeeper中设置时才有效。默认情况下,每个client-id都会有一个无限的配额。下面是为每个producer和consumer的client-id设置配额,值为10MB/sec。

quota.producer.default=10485760
quota.consumer.default=10485760

需要注意的是,这些属性在未来的版本中可能会被移除,kafka-configs.sh配置的默认值的优先级比这些属性高。

13.kafka metrics

kafka中有一套自有的监控数据,可通过jmx端口暴露给外部系统。连接方式如下所示:

bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://nieo-9-109-182-160:8713/jmxrmi --object-name kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica

14.数据中心

在某些生产应用环境中,我们需要管理一个数据管道,用于发布数据到不同的数据中心。我们建议的方法是为每个数据中心部署一个本地的kafka集群,每个数据中心的应用实例只能与本地的集群通信,集群间生成数据镜像。

这种部署模式允许数据中心作为独立的主体而存在,也允许我们管理和调整数据中心内部的备份,也允许每个基础服务独立运行和操作,即使数据中心内部的连接不可用。当生成镜像失败时,直到链接恢复,服务才能正常。

对于需要所有数据的全局视图的应用来说,你可以使用镜像来提供集群,集群聚合了所有数据中心的本地集群的镜像,这些聚合的集群可作为应用所需的完整数据集。

这不是唯一的部署模式,也可能基于WAN来读取和写入到远程Kafka集群,这种方式很明显会有一定的延时。

kafka会很自然地在producer和consumer端进行数据的批量处理,这也是获取吞吐量的一种方式,即使是基于高延迟的连接。为了允许这种批量处理,我们有必要为producer、consumer和broker增加TCP socket buffer size,使用的参数是socket.send.buffer.bytes和socket.receive.buffer.bytes。

我们不建议运行单节点的kafka集群来扩散数据到高延迟的数据中心,对于写kafka和zookeeper来说,这都会导致非常高的备份延迟。如果两地之间的网络不可用,本地的kafka和zookeeper都会是可用的。