下面是LinkedIn内部kafka线上环境的操作和实践案例。
1.创建topic
新增topic有两种方式:
(1)一种是手动创建
(2)另一种是在发布消息到一个不存在的topic时可自动创建此topic(auto.create.topics.enable参数为true)。
手动创建topic的命令如下所示:
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
replication-factor参数表示每个消息会写入到多少个broker中。如果broker数量为2,而replication-factor参数为3,则会写入失败。我们建议replication-factor设为2或3,这样的话你就可以大张旗鼓地启停一台服务器,而不会中断数据的消费。
partitions参数表示分区数,控制着这些消息会被分配到多少个日志文件中。分区数有一些重要的注意事项:首先,每个分区必须在一台broker中,如果整个数据集有20个分区,那么这些分区会被分配到不超过20台broker中;然后,分区数限制了消费进程的最大并发数。
每个分区的日志都会在kafka-log目录的文件夹中,文件夹名称的组成为“{topic名称}-{分区号}”。因为系统的文件夹名称不能超过255字符,所以对topic的名称的长度也有一定的限制。假定分区数小于100000,因此topic名称就不能超过249个字符。
命令行中的其他配置可以覆盖broker级别的配置,比如数据的保留时间。
2.修改Topic
修改Topic包含修改分区数或者其他配置参数。
特别要注意的是:kafka当前不支持减少topic的分区。
2.1.添加分区
命令行案例如下:
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name --partitions 40
需要注意的是,以上案例只是修改了topic的分区数,而不会更改原有分区数据的分布。如果生产数据依赖分区,那么这个操作会干扰消费者消费数据。也就是说,如果数据分区策略是“hash(key) % number_of_partitions”,那么增加分区后kafka会对刷新此之后的数据分区,但还是不会自动化重分布数据。
2.2.增加配置
命令行案例如下所示:
bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
2.3.删除配置
命令行案例如下所示:
bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
2.4.删除topic
命令行案例如下所示:
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
3.优雅地关闭集群
kafka集群会自动检测broker是否宕机,并为此宕机的broker中的分区选举新的leader。当服务器宕机或broker因维护状态或配置改变导致的内部进程挂掉的时候,也会执行一样的选举动作。
对于主动关闭集群,kafka支持一种更佳优雅的机制来停止broker,而不是kill进程。服务器被优雅关闭有以下两个重要优势:
(1)broker会同步所有日志到磁盘中,以避免重启时进行日志的恢复,例如校验日志尾部所有消息的校验和。因为日志恢复会花费时间,所以这加速了内部服务的重启。
(2)broker也会在关闭之前迁移本服务器上所有的分区到其他broker上,这能更快速地把leader关系传递到其他副本上,降低了分区不可用的时间(毫秒级)。
无论服务器被stop还是kill,日志的同步都会自动发生,但leader关系的迁移需要添加配置:
controlled.shutdown.enable=true
需要注意的是,controlled shutdown只有在所有的分区都有副本时才会发生,也就是说replication-factor参数要大于1且至少有一个副本是可用的。
一般来说只有停用了最后一个副本,那么此topic就不可用了。
4.leader的均衡
无论broker停止或是宕机,broker中分区的leader关系都会被转移到副本中。当broker重启时,此broker的分区就会成为follower,这意味着这些分区不会被kafka-client读或者写。
为了避免这种失衡,Kafka有一个preferred-replicas(assigned-replicas)参数。如果某个分区的preferred-replicas参数为“1,5,9”,那么比起broker5和broker9,broker1会被优先选为leader,因为1在preferred-replicas参数列表的前面。默认情况下,kafka集群会试着按照已保存的副本顺序进行leader关系的修复,这个行为需要设置一个参数,如下所示:
auto.leader.rebalance.enable=true
我们可以把这个参数设为false,但我们就需要手动才能使leader关系符合已保存的副本顺序,命令如下所示:
bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
这个命令的实现逻辑是:对于每一个topic的partition,controller先获得preferred replica(就是assigned replicas列表中的第一个replica),如果这个preferred replica不是leader,controller则向preferred replica所在的那个broker发送一个请求,使之成为partition的leader。
5.通过机架实现replica的均衡
机架感知特性可以把相同分区的不同副本扩散到不同的机架上。这个进一步扩展了kafka的高可用特性,用broker-failure来掩盖rack-failure,降低了机架崩溃时数据丢失的风险。这个特性也会应用到broker分组,例如EC2的可用区。
我们可以在broker配置中加上以下配置来指定此broker属于哪个机架。
broker.rack=my-rack-id
当topic更创建、修改或副本被重分布时,机架的限制就会出现,确保副本尽可能分不到不同的机架上。
用于分配副本所在broker的算法确保了每个broker上leader的数量的限制,尽管broker被分布在不同的机架上,这也确保了吞吐量的均衡。
但是,如果某个机架上broker数量与其他机架不同,副本的分配就会不均衡,拥有更少broker的机架会维护更多的副本,这也就意味着这些机架需要为这些副本提供更多的存储资源。因此,在每个机架上配置相同数量的broker才是明智的。
6.管理消费组
在ConsumerGroupCommand工具的帮助下,我们可以list、describe、delete消费组。消费组可被手动删除,也可被自动删除。当此消费组最后一次提交的offset过期之后此消费组会被自动删除。只有当消费组中没有活跃的消费者时,手动删除消费组才能生效。案例如下:
# list所有的消费组信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 —-list
# describe某个消费组详细信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
6.1.describe
describe命令可以查看消费组的offset信息,结果如下图所示:
结果中有三个比较重要的数据:
(1)CURRENT-OFFSET:表示当前消费的offset
(2)LOG-END-OFFSET:表示日志中最后一条消息的offset
(3)LAG:表示还有多少条消息未被消费。此参数可以描述当前分区消息积压情况,判断当前消费组速率是否能匹配上生产者速率。
6.2.describe members
列举消费组中所有活跃的消费者。案例命令如下:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
结果如下图所示:
可以看到某个消费者在分配分区的个数,但是看不到消费者正在消费哪个topic分区。
6.3.describe members verbose
列出详细消费组信息,包括分配的具体分区信息。案例如下图所示:
可以看到消费者消费分区的个数,也能看到消费的topic分区列表。
6.4.describe offsets
输出结果与describe选项相同,是describe操作的默认选项。
6.5.describe state
提供详细的消费组级别的信息。案例结果如下图所示:
6.6.delete group
如果要手动删除一个或多个消费组,可以使用“--delete”选项,例如下图所示:
6.7.reset-offsets
如果要重置消费组的offset,我们可以使用“--reset-offsets”选项。这个选项只支持一次删除一个消费组。还需要使用定义的域:--all-topics或--topic。使用时必须使用一个域,除非我们使用“--from-file”这个场景。
同时,我们首先要确保此消费实例是否处于inactive状态。“--reset-offsets”还有三个执行选项:
(1)default:展示被重置的offsets
(2)execute:运行
(3)reset-offsets:进程
(4)export:把结果导出到csv文件。
--reset-offsets还有以下应用场景可以选择,且必须选择一种以上:
需要注意的是,如果重置的offset超出了当前offset的最大范围,则会被置为最后的offset值。例如,如果最后的offset为10,把offset向后移动15,那么移动后的offset不是15,而是10。
例如,我们把消费组的offset重置到最新的offset,如下图所示:
如果你使用的是旧的高等级消费者,把消费组元数据信息保存在zookeeper中(offsets.storage =zookeeper),那么我们需要传递“--zookeeper”,而不是“--bootstrap-server”。
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
7.集群扩容
往kafka集群添加broker是非常简单的,只需要给这个broker设置设置一个唯一id,并在新服务器中启动这个broker即可。但是新的服务器不会被自动分配分区数据,所以,只有把分区移到这个broker,或新topic创建时,这个broker才会起作用。一般来说,我们在集群中添加新机器的时候,一般会从其他旧的broker中迁移数据到新的broker。
8.集群内分区重分布——数据迁移
迁移数据的过程通常是手动初始化,但自动完成的。
8.1.分区重分布原理
在底层,kafka会把新的broker作为迁移分区的follower,并让这个新broker上的分区完整备份原分区所有数据。当新broker上的分区已经完整备份了数据,并加入到了in-sync副本列表后,kafka会删除原分区的数据。
8.2.分区重分布工具
分区重分布工具可用于broker间分区的迁移。一次理想的分区重分布要确保所有broker间负载均衡及分区数量保持一致。分区重分布工具没办法自主学习数据在kafka集群中的分布来实现平负载均衡。因此,管理员需要在迁移时指定迁移哪些topic的哪些分区。
分区重分布工具可工作于三种独立的模式:
第一种:--generate。这个模式中,给定topic列表和broker列表,工具会生成候补分布任务,用于迁移所有制定的topic的所有分区到制定的新broker中。这个选项仅仅提供了便利的方式来生成一个分区重分布计划,需要指定迁移的topic列表以及目标broker列表。
第二种:- -execute。在这种模式中,工具根据用户提供的重分布计划开始分区的重分布,一般使用“- -reassignment-json-file”选项来指定重分布计划,这可以是管理员自定义的重分布计划,也可以是使用“- -generate”模式生成的重分布计划。
第三种:- -verify。在这种模式中,工具检验上一次“- -execute”重分布的状态,列举出所有的分区。状态可以是:successfully completed、failed、in process三者之一。
8.3.数据自动迁移到新broker
分区重分布工具可以用于把现有broker中的topic数据迁移到新的broker中。这在kafka集群扩容时经常用到,因为比起一次迁移一个分区数据,迁移整个topic数据更简单。当使用topic迁移时,我们需要在命令中指定迁移的topic列表以及目的broker列表。这个工具能够负载均衡地把topic数据迁移到新的broker中,在迁移过程中topic的副本数保持不变。实际上,指定topic列表中所有的分区的副本都会从老的broker迁移到新的broker。
举例来说,下面的案例会把foo1、foo2的所有分区迁移到新的broker5和6。在结束这次迁移后,foo1和foo2的所有分区都只会存在于broker5和6中。
因为工具接受把topic列表以json文件的形式作为输入参数,我们首先需要指定需要迁移的topic,以及创建如下所示的json文件。
> cat topics-to-move.json
{
"topics": [
{"topic": "foo1"},
{"topic": "foo2"}
],
"version":1
}
当json文件准备完毕后,使用分区重分布工具生成候补重分布任务,命令如下所示。
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment
{
"version":1,
"partitions":[
{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}
]
}
Proposed partition reassignment configuration
{
"version":1,
"partitions":[
{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}
]
}
这个候把任务把foo1和foo2这两个topic的所有分区数据迁移到5和6这两个topic中。但有一点需要注意,此时任务还没开始执行,只是告诉你当前的分布状态以及提出了一个新任务。你应当保存当前的分布状态,以便于之后可能发生的回滚。新的任务应当保存在一个json文件中,例如expand-cluster-reassignment.json文件,并在工具执行“- -execute”模式时把这个json文件作为参数传递给工具。执行命令如下所示:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
{
"version":1,
"partitions":[
{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}
]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{
"version":1,
"partitions":[
{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}
]
}
最后,可以通过“- -verify”选项查看分区重分布任务的状态。需要注意的是,“- -verify”模式的运行需要用到“- -execute”模式运行时用到的expand-cluster-reassignment.json文件,内容是相同的。
8.4.自定义分区重分布和迁移
分区从分布工具可用于选择性地把某些分区移动到其他broker中。如果我们在这种场景下进行分区重分布,那么用户需要知道重分布计划,但不需要通过工具生成候补重分布任务,有效地跳过了“- -generate”步骤,直接使用“- -execute”进行迁移。
举例来说,我们需要把foo1这个topic的0号分区迁移到5和6号broker,把foo2这个topic的1号分区迁移到2和3号broker。
第一步,手动编写自定义重分布计划,写入到json文件,文件内容如下所示:
> cat custom-reassignment.json
{
"version":1,
"partitions":[
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}
]
}
第二步,使用custom-reassignment.json作为“- -execute”模式的参数,启动重分布任务。
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment
{
"version":1,
"partitions":[
{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}
]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{
"version":1,
"partitions":[
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}
]
}
可通过“- -verify”模式来检查分区重分布任务当前的状态。需要注意的是,在使用“- -verify”模式时,你需要传递与“- -execute”模式一样的“- -reassignment-json-file”入参。使用案例如下所示:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
9.从集群移除broker
目前,分区重分布工具还无法支持从集群中移除broker而自动生成分区重分布计划。因此,管理员一定要自己提出重分布计划,以便把这个即将移除的broker中所有分区的副本都迁移到其他的broker中。相对来说,这个将会更加繁琐,重分布需要确保所有的副本不是从即将移除的broker中迁移到另外的一个broker,而应当是其他的多个broker。为了使迁移进程更加简单,我们计划在未来的版本中添加工具来支持从集群中移除broker。
10.增加副本
为某个已存在的分区增加副本是非常简单的,只需要在自定义重分布json文件中指定额外的副本,并使用“- -execute”模式来增加指定分区的副本数即可。
举例来说,把foo这个topic的0号分区的副本数从1增加到3。那么在增加副本数之前,分区的唯一副本在5号broker上。增加副本数后,我们会在6号和7号broekr中增加另外两个副本。
第一步,手动编写自定义重分布配置文件,如下所示:
> cat increase-replication-factor.json
{
"version":1,
"partitions":[
{"topic":"foo","partition":0,"replicas":[5,6,7]}
]
}
第二步,使用“--execute”模式开启重分布任务,命令如下所示:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{
"version":1,
"partitions":[
{"topic":"foo","partition":0,"replicas":[5]}
]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{
"version":1,
"partitions":[
{"topic":"foo","partition":0,"replicas":[5,6,7]}
]
}
第三步,使用“- -verify”模式检查分区重分布任务的状态,命令中使用的increase-replication-factor.json配置文件与“- -execute”模式是一样的。命令如下所示:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully
我们也可以通过kafka-topics.sh工具来检验副本数的新增情况。命令如下所示:
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
11.数据迁移期间限制带宽
kafka允许我们为副本流量限流,为broker间副本迁移设置带宽的上限。这在集群的再平衡场景中非常有用,比如启动新broker、新增或移除broker。这限制了数据敏感型操作的影响。
11.1.限流的方法
限流有两种方式。第一种,最简单也是最安全的方式,就是运行kafka-reassign-partitions.sh脚本时设置限流。第二种,kafka-configs.sh也可以用于直接查看或修改限流的值。
11.1.1.kafka-reassign-partitions.sh设置限流
我们可以在运行kafka-reassign-partitions.sh脚本时设置限流。
举例来说,如果你想要再平衡,通过以下命令,你可以在不超过50MB/s的网络速率下移动分区数据。
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000
当我们执行上述脚本时,我们可以看到限流信息,如下所示。
The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.
如果我们想在再平衡期间修改限流值,以便增加吞吐量,让再平衡更快执行。我们可以执行如下命令。
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
There is an existing assignment running.
The throttle limit was set to 700000000 B/s
一旦再平衡完成,管理员可以通过“--verify”模式查看再平衡任务的状态。如果再平衡确实已经完成,“--verify”命令可以把限流设置移除。对于管理员来说,在再平衡完成后,执行“--verify”命令来移除限流设置是非常重要的。如果不这样做,之后正常的副本传输流量也会被限流。
当“--verify”命令被执行时,重分布已经完成,下面的脚本可以保证限流设置被移除。
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --verify --reassignment-json-file bigger-cluster.json
Status of partition reassignment:
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [mytopic,0] completed successfully
Throttle was removed.
11.1.2.kafka-configs.sh设置限流
管理员也需要使用kafka-configs.sh命令检查分区的配置。有两类和限流有关的配置
(1)第一类是限流值,是broker级别的属性,也是一个动态属性,属性名如下所示。
leader.replication.throttled.rate
follower.replication.throttled.rate
(2)第二类属性是限流的副本集合,需要为每个topic进行配置,属性名如下所示。
leader.replication.throttled.replicas
follower.replication.throttled.replicas
上面的四个配置都是由kafka-reassign-partitions.sh命令自动分配的。
查看限流值配置的命令如下所示:
> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000, follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000, follower.replication.throttled.rate=700000000
以上两个属性展示了leader和follower端的备份协议应用的限流值。默认情况下,两端配置的是相同的限流值。
查看限流的副本集合的命令如下所示:
> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
follower.replication.throttled.replicas=1:101,0:102
从结果我们可以看出:上面限制只针对my-topic,对分区leader的限制只作用于102号broker的分区1以及101号broker的分区0。而对分区follower的限制只作用于101号broker的分区1亿记102号broker的分区0。
默认情况下,再平衡之前,kafka-reassign-partitions.sh会把针对leader的限流配置应用于所有的副本,因为这些副本之一会成为leader。而当再平衡之后,kafka-reassign-partitions.sh会把针对follower的限流配置应用于所有再平衡后的副本(不包括之前副本所在的broker)。所以,如果某个分区在101和102号broker上有副本,重分布到102和103号broker后,此分区的leader限流会应用于101和102号broker,而follower限流只会应用于103号broker。
如果有需要的话,你可以使用kafka-configs.sh工具的“--alter”模式来手动更改限流配置。
11.2.已限流备份的安全使用
使用限流的备份时,有以下注意事项需要关注。
(1)移除限流:在重分布完成后,需要手动移除限流,方法是执行kafka-reassign-partitions.sh命令的“--verify”模式。
(2)确保进程:如果限流设置的太低,比流入的写入效率都低的话,备份的进程将不起作用。也就是“max(BytesInPerSec) > throttle”,BytesInPerSec表示producers写入每个broker的数据流量速率。
管理员可以监控是否可以在限流情况下进行备份,使用以下监控数据:
kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
lag在备份期间应当能够不断减少,如果监控到的数据不在减少,那么就需要增大限流值。
12.设置配额(quotas)
配额的参数配置是user或client-id级别的,默认情况下,客户端会收到一个无限的配额。我们也可以为每个user、client-id设置配额。
配置自定义配额的方法如下所示:
# for (user=user1, client-id=clientA)
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.
# for user=user1
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.
# for client-id=clientA
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
Updated config for entity: client-id ‘clientA'.
我们也可以通过“--entity-default”而不是“--entity-name”来设置默认配额,案例如下所示:
# Configure default client-id quota for user=userA
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.
# Configure default quota for user
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
Updated config for entity: default user-principal.
# Configure default quota for client-id
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
Updated config for entity: default client-id.
如何展示配额信息?案例如下:
# for a given (user, client-id)
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
# for a given user
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
# for a given client-id
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
如果没有指定entity-name,那么所有指定类型的entity都会被列举,例如:
# describe all users
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
# describe for (user, client)
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
通过在broker中添加配置,我们也可以为所有的client-id设置默认配额。这些属性只有在配额未被覆盖和默认值未在zookeeper中设置时才有效。默认情况下,每个client-id都会有一个无限的配额。下面是为每个producer和consumer的client-id设置配额,值为10MB/sec。
quota.producer.default=10485760
quota.consumer.default=10485760
需要注意的是,这些属性在未来的版本中可能会被移除,kafka-configs.sh配置的默认值的优先级比这些属性高。
13.kafka metrics
kafka中有一套自有的监控数据,可通过jmx端口暴露给外部系统。连接方式如下所示:
bin/kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://nieo-9-109-182-160:8713/jmxrmi --object-name kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
14.数据中心
在某些生产应用环境中,我们需要管理一个数据管道,用于发布数据到不同的数据中心。我们建议的方法是为每个数据中心部署一个本地的kafka集群,每个数据中心的应用实例只能与本地的集群通信,集群间生成数据镜像。
这种部署模式允许数据中心作为独立的主体而存在,也允许我们管理和调整数据中心内部的备份,也允许每个基础服务独立运行和操作,即使数据中心内部的连接不可用。当生成镜像失败时,直到链接恢复,服务才能正常。
对于需要所有数据的全局视图的应用来说,你可以使用镜像来提供集群,集群聚合了所有数据中心的本地集群的镜像,这些聚合的集群可作为应用所需的完整数据集。
这不是唯一的部署模式,也可能基于WAN来读取和写入到远程Kafka集群,这种方式很明显会有一定的延时。
kafka会很自然地在producer和consumer端进行数据的批量处理,这也是获取吞吐量的一种方式,即使是基于高延迟的连接。为了允许这种批量处理,我们有必要为producer、consumer和broker增加TCP socket buffer size,使用的参数是socket.send.buffer.bytes和socket.receive.buffer.bytes。
我们不建议运行单节点的kafka集群来扩散数据到高延迟的数据中心,对于写kafka和zookeeper来说,这都会导致非常高的备份延迟。如果两地之间的网络不可用,本地的kafka和zookeeper都会是可用的。