Kafka3.6.0发布信息可参考https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.6.0
1.发布时间
KIP Freeze时间:2023年7月26日
Feature Freeze时间:2023年8月16日
Code Freeze时间:2023年8月23日
2.版本需求
Kafka3.6.0对以下16个特性进行了开发。
2.1.KIP-405: Kafka分级存储
这是个重要特性,当前版本完成了主要的功能,Kafka3.7.0会对此功能做一些补充。
2.1.1.为什么要设计Kafka分级存储
Kafka是数据基础设施的重要组成部分,并得到了广泛运用。随着kafka集群规模的增长,越来越多的数据在kafka中存储的时间也越来越长,与可扩展性、效率和可操作性成为需要解决的重要问题。
Kafka将消息以append的形式保存到kafka broker的本地磁盘中。消息的保留周期由log.retention配置决定,这可以是broker级和topic级的参数。保留周期为consumer提供了保证,即使应用程序发生故障或因维护而停机,它也可以在保留其内正常返回,从停止的地方读取数据,不会丢失任何数据。
集群的总存储量与topic或分区数量、消息生产速率、以及保留周期(最重要)成正比。kafka broker通畅需要大量的磁盘来提供总量达数10TB的数据存储容量。Kafka broker本地存储的数据量带来了诸多操作上的挑战。
2.1.1.1.Kafka作为长期存储服务
kafka的应用场景越来越多,成为了所有数据的接入点。它不仅允许用户实时消费数据,还允许用户根据保留策略灵活地获取旧数据。考虑到kafka协议的简单性和consumer api的广泛使用,允许用户以更长的保留期存储和获取数据,这就让kafka成为了一个真正的数据源。
目前,kafka配置了较短的保留期,一般为3天,并且使用数据管道将过期的数据复制到可扩展的外部存储中,供长期使用,比如HDFS。这导致数据消费者必须根据数据的年龄构建不同版本的应用程序来消费不同文件系统的数据。
Kafka集群存储通常通过增加broker节点数量来进行横向扩展。但这也给集群增加了不必要的内存和CPU消耗,与将旧数据存储在外部文件系统中相比,整体存储成本增加了。具有更多节点的更大集群也增加了部署的复杂性,同时增加运营成本。
2.1.1.2.kafka本地存储和运营复杂性
当broker发生故障时,新节点取代故障节点时,新节点必须把故障节点中所有的数据从其他副本中同步到当前节点。同样,当添加一个新的broker节点来扩展集群存储时,集群重分布会将分区分配给新节点,这也需要拷贝大量的数据。恢复和再平衡时间也是和本地数据存储量成正比的。在上百个broker节点的集群设置中,节点故障是常见的现象,恢复过程花费了大量时间,使操作变得困难和耗时。
减少存储在每个broker上的数据量,也就减少了恢复和再平衡的时间。这也需要缩短日志保留时间,从而影响应用程序维护和故障恢复的可用时间。
2.1.1.4.云上Kafka
本地kafka部署使用具有多个高容量磁盘的硬件SKU来最大限度地提高IO吞吐量,并在保留期内存储数据。具有类似本地存储的等效SKU要么不可用,要么在云上非常昂贵。对于本地存储容量较小的SKY,有更多的可用选项作为kafka broker节点,他们更适合在云上使用。
2.1.2.解决方案——kafka分级存储
kafka数据主要是以流的方式从尾部读取进行消费。尾部读取利用了操作系统的页缓存来提供数据,而不是磁盘读取。出于回填和故障恢复的目的,从磁盘读取旧数据是不常见的。
在分层存储的方法论中,kafka集群配置了两层存储——本地和远程。本地存储和当前kafka一样,使用broker本地磁盘存储消息数据。新的远程存储使用了文件系统来存储已完成的消息数据,例如HDFS或S3。根据不同的存储层设置不同的保留期限。启用远程存储后,本地存储的保留期限可缩短到几小时,远程存储的保留期限可一更长,可以是几天甚至是几个月。当一个日志段在本地存储滚动时,它会与相应的index一起复制到远程存储。对延迟敏感的应用程序执行尾部读取,并从本地存储提供服务,利用现有的kafka机制高效地使用页缓存来提供数据。
该解决方案允许kafka集群单独扩展存储,而不依赖内存和CPU的同步扩展,使kafka成为长期存储解决方案。这也减少了kafka broker上本地存储使用量,从减少了恢复和再平衡过程中需要拷贝的数据量。远程存储中的日志段数据不需要在broker上进行还原或延迟还原,而是直接从远程存储中提供服务。有了这个机制,增加保留期限不再需要扩展kafka集群存储,也不需要增加新的broker节点。与此同时,总体数据保留时间仍然可以更长,从而消除了数据从kafka复制到外部存储的单独数据管道的需求,就像当前大部分部署中所做的那样。
2.1.2.1.本期目标
通过将旧数据保留在外部存储(如HDFS或S3),将kafka的存储扩展到kafka集群本地存储之外,对kafka内部的影响最小。对于未配置分层存储的现有用户,kafka行为和操作复杂性不得改变。
2.1.2.2.非目标
分层存储不能替代ETL管道和作业,现有的ETL管道一如既往继续消费kafka数据,尽管kafka中的数据保留期要长得多。
它不支持通过分层存储来压缩topic,在创建topic时设置remote.storage.enable为true,并不能把topic保留期外的清理机制改为压缩。
分层存储也不支持JBOD特性。
2.2.KIP-937: 优化kafka消息时间戳验证
在kafka中,消息的时间戳与两个配置参数有关:log.message.timestamp.type和log.message.timestamp.difference.max.ms,这是KIP-32引入的概念。
默认场景下,log.message.timestamp.type为CreateTime,log.message.timestamp.difference.max.ms为Long.MAX_VALUE。这就意味着producer生产的消息的最小时间戳和最大时间戳之间可相差数百年。我们可修改log.message.timestamp.difference.max.ms配置值以适配更敏感的场景,但这不是本期的目标。
为了重新演绎旧消息,具有过去时间戳的消息会有有效的案例,具有未来时间戳的消息本质上是不准确的,可能会导致异常的日志滚动行为。kafka用户遇到了生产者配置错误而导致的问题,例如使用纳秒而不是毫秒作为时间戳。相关的证据来自于Medium的文章,凸显了这个问题带来的影响。
这个KIP的目标是优化消息时间戳校验的逻辑,主要是通过拒绝未来时间戳的消息以及提供描述性异常。这有助于提升数据的完整性,以及防止因时间戳不准确而导致的潜在陷阱。
2.3.KIP-797: 允许同一端口上绑定多个listener,用于适配IPv4和IPv6双栈场景
当前kafka中,我们不能把多个listener绑定到同一个端口中。如果只是单IP栈环境是没有问题的,但现在我们要支持IPV4和IPV6双栈的场景,就没有理由在同一个端口不能同时绑定一个IPV4地址和IPV6地址了。
需要注意的是,当前提议只适用于listeners,而advertised.listeners已经支持这个特性了。
2.4.KIP-902: 升级Zookeeper到3.8.2版本
kafka目前依赖zookeeper3.6.3,这个zk版本的生命在2022年12月就已经结束了。我们想把zookeeper升级到3.8.2版本,这也是目前zookeeper3.8.x系列中最新的版本。
ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x servers.
The upgrade from 3.6.x and 3.7.x can be executed as usual, no particular additional upgrade procedure is needed.
ZooKeeper 3.8.x clients are compatible with 3.5.x, 3.6.x and 3.7.x servers as long as you are not using new APIs not present these versions.
ZooKeeper clients from 3.4 and 3.5 branch are fully compatible with 3.6 servers.
The upgrade from 3.5.7 to 3.6.0 can be executed as usual, no particular additional upgrade procedure is needed.
ZooKeeper 3.6.0 clients are compatible with 3.5 servers as long as you are not using new APIs not present in 3.5.
2.5.KIP-941: 范围查询中允许null的上下限
在交互式查询API中的RangeQuery类(允许我们从应用程序外部利用应用程序的状态)中,有一些方法用于获取上限和下限。当web客户端请求带有查询参数(成为这些边界)时,这些参数为null是常见的。我们希望开发人员能够根据需要传递上限/下限,而不是实现自己的逻辑来避免获得整个范围。
这个特性实现后,下面这样的逻辑我们是可以避免的。
private RangeQuery<String, ValueAndTimestamp<StockTransactionAggregation>> createRangeQuery(String lower, String upper) {
if (isBlank(lower) && isBlank(upper)) {
return RangeQuery.withNoBounds();
} else if (!isBlank(lower) && isBlank(upper)) {
return RangeQuery.withLowerBound(lower);
} else if (isBlank(lower) && !isBlank(upper)) {
return RangeQuery.withUpperBound(upper);
} else {
return RangeQuery.withRange(lower, upper);
}
}
KIP-793: Allow sink connectors to be used with topic-mutating SMTs
KIP-868 Metadata Transactions
KIP-875: First-class offsets support in Kafka Connect
KIP-890: Transactions Server-Side Defense
KIP-898: Modernize Connect plugin discovery
KIP-917: Additional custom metadata for remote log segment
KIP-923: Add A Grace Period to Stream Table Join
KIP-925: Rack aware task assignment in Kafka Streams
KIP-938: Add more metrics for measuring KRaft performance
KIP-863: Reduce CompletedFetch#parseRecord() memory copy
KIP-930: Rename ambiguous Tiered Storage Metrics