Kafka 2.4.0 Release Information


Published on 2019-12-21
Kafka 2.4.0 release contents and timeline

The Kafka 2.4.0 release plan can be found at https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125307901

1. Release timeline

KIP Freeze: September 25, 2019

Feature Freeze: October 4, 2019

Code Freeze: October 16, 2019

Release: December 16, 2019

2. KIPs in this release

Kafka 2.4.0 includes development work on the following 37 KIPs.

KIP-213: Support non-key joining in KTable

KIP-307: Allow to define custom processor names with KStreams DSL

KIP-309: Add toUpperCase support to sasl.kerberos.principal.to.local rule

KIP-369: Alternative Partitioner to Support "Always Round-Robin" Selection

KIP-379: Multiple Consumer Group Management

KIP-382: MirrorMaker 2.0

KIP-392: Allow consumers to fetch from closest replica

Fetching data from follower replicas

In Kafka's earlier design, to keep consumer reads consistent, consumers were only allowed to read data from the leader replica; follower replicas served purely as backups. So what is the background behind introducing follower replica fetching?

Why fetch from followers

In a multi-datacenter Kafka deployment, the datacenters sit in different facilities. When one datacenter needs to consume data from another, it previously had no choice but to fetch across datacenters, because data could only be consumed from the leader replica, and that cross-datacenter bandwidth is usually expensive (especially on cloud servers). In other words, locality could not be exploited to avoid costly cross-datacenter traffic.

Kafka introduced this feature precisely for such scenarios, to save on bandwidth costs.

How to enable it

On the broker side, set replica.selector.class to the fully qualified name of a ReplicaSelector implementation. The interface currently has two implementations: LeaderSelector, which reads from the leader replica, and RackAwareReplicaSelector, which reads from a replica in the specified rack.

On the consumer side, set client.rack. Its value must match the broker.rack configured on the brokers in the rack you want to fetch from.
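As a rough illustration, here is a minimal consumer sketch; the bootstrap address "broker1:9092", topic "demo-topic", group id and rack id "rack-1" are placeholder assumptions, not values from the release notes, and the matching broker-side settings are shown as comments:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RackAwareConsumerExample {
    public static void main(String[] args) {
        // Broker side (server.properties), for reference:
        //   broker.rack=rack-1
        //   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder address
        props.put("group.id", "rack-aware-demo");          // placeholder group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // Tell the brokers which rack this consumer lives in; must match a broker.rack value
        props.put("client.rack", "rack-1");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}

With this setup, fetch requests can be served by the in-rack replica instead of a leader in another datacenter, which is where the bandwidth savings come from.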

KIP-412: Extend Admin API to support dynamic application log levels

KIP-429: Kafka Consumer Incremental Rebalance Protocol (Partially implemented)

KIP-434: Add Replica Fetcher and Log Cleaner Count Metrics

KIP-440: Extend Connect Converter to support headers

KIP-444: Augment metrics for Kafka Streams (Partially Implemented)

KIP-455: Admin API for Replica Reassignment (Partially implemented)

KIP-460: Admin Leader Election RPC

KIP-464: Defaults for AdminClient#createTopic

KIP-467: Augment ProduceResponse error messaging for specific culprit records (Partially implemented)

KIP-470: TopologyTestDriver test input and output usability improvements

KIP-471: Expose RocksDB Metrics in Kafka Streams (Partially Implemented)

KIP-474: To deprecate WindowStore#put(key, value)

KIP-475: New Metrics to Measure Number of Tasks on a Connector

KIP-479: Add StreamJoined config object to Join

KIP-480: Sticky Partitioner

When the Kafka producer sends messages, it tries to fill a batch as much as possible before sending it to a partition. One factor that determines how batches actually form is the partitioner strategy. Before Kafka 2.4, the producer's default strategy for records without a key was round-robin, which could split what would otherwise be one large batch into many small batches.

Since small batches tend to increase latency, the previous partitioning strategy for keyless messages was inefficient. Kafka 2.4 introduces a new strategy, the Sticky Partitioning Strategy: it picks a partition at random and then sticks with it for as long as possible, hence the name.

Sticky Partitioner implementation

The sticky partitioner is implemented in the UniformStickyPartitioner class. Using it helps improve message batching, reduce latency, and lower the load on brokers.

package org.apache.kafka.clients.producer;

import java.util.Map;

import org.apache.kafka.clients.producer.internals.StickyPartitionCache;
import org.apache.kafka.common.Cluster;

public class UniformStickyPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return stickyPartitionCache.partition(topic, cluster);
    }

    public void close() {}
    
    /**
     * If a batch completed for the current sticky partition, change the sticky partition. 
     * Alternately, if no sticky partition has been determined, set one.
     */
    @SuppressWarnings("deprecation")
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

UniformStickyPartitioner#partition() simply looks up the current sticky partition from a cache (StickyPartitionCache), so new records keep going to the same partition; only when a new batch is created, which triggers UniformStickyPartitioner#onNewBatch(), does the partition change.
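For illustration, here is a minimal producer sketch that opts into the sticky behavior explicitly by setting partitioner.class to UniformStickyPartitioner (the bootstrap address and topic name are placeholders, not values from the source). Note that in 2.4 the default partitioner already applies the sticky logic to records without a key, while UniformStickyPartitioner ignores keys entirely:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.UniformStickyPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class StickyProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Ignore keys when choosing partitions and stick to one partition per batch
        props.put("partitioner.class", UniformStickyPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // These records all go to the same partition until the current batch is
            // completed; onNewBatch() then picks a new sticky partition.
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("demo-topic", "value-" + i));
            }
        }
    }
}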

Sticky Partitioner advantages

The biggest advantage of the sticky partitioner is performance: according to the official test results, it can reduce latency by about 50%, with a corresponding improvement in throughput.

KIP-481: SerDe Improvements for Connect Decimal type in JSON

KIP-482: The Kafka Protocol should Support Optional Fields

KIP-484: Expose metrics for group and transaction metadata loading duration

KIP-488: Clean up Sum,Count,Total Metrics

KIP-492: Add java security providers in Kafka Security config

KIP-495: Dynamically Adjust Log Levels in Connect

KIP-496: Administrative API to delete consumer offsets

KIP-503: Add metric for number of topics marked for deletion

KIP-504: Add new Java Authorizer Interface

KIP-507: Securing Internal Connect REST Endpoints

KIP-511: Collect and Expose Client's Name and Version in the Brokers (Partially implemented)

KIP-517: Add consumer metrics to observe user poll behavior

KIP-521: Enable redirection of Connect's log4j messages to a file by default

KIP-525: Return topic metadata and configs in CreateTopics response

KIP-528: Deprecate PartitionGrouper configuration and interface