Kafka事务设计


发布于 2024-08-20 / 68 阅读 / 0 评论 /
随着业务需求的增长,Kafka 的事务消息功能应运而生,它允许应用程序以一种原子的方式处理消息

本文参考kafka3.6.0源码

1.Kafka 的 Exactly Once 语义

事务消息最主要的动机是在流处理中实现 Exactly Once 的语义,根据kafka的操作流程,Exactly Once语义可分为:

(1)仅发送一次:这是从producer端来考虑,单分区仅发送一次由生产者幂等保证,多分区仅发送一次由事务机制保证。

(2)仅消费一次:这是从consumer端来考虑,Kafka 通过offset的提交来控制消费进度,而offset的提交被抽象成向系统 topic 发送消息。这就使得发送和消费行为统一起来,只要解决了多分区发送消息的一致性就能实现 Exactly Once 语义。

2.Producer幂等性

Kafka 的发送幂等是通过序列号来实现的,每个消息都会被分配一个序列号,序列号是递增的,这样就可以保证消息的顺序性。当生产者发送消息时,会将消息的序列号和消息内容一起写入到日志文件中,下次收到非预期序列号的消息就会返回 OutOfOrderSequenceException 异常。

kafka开启producer幂等性需设置 enable.idempotence 参数,同时,producer内部会进行生产者参数校验,ProducerConfig.postProcessAndValidateIdempotenceConfigs方法对以下三个参数进行检查。

(1)max.in.flight.requests.per.connection 必须小于 5

(2)retries 必须大于 0

(3)acks 必须设置为 all

Kafka 将消息的序列号信息保存在分区维度的 .snapshot 文件中,文件内容格式如下图所示:

格式在ProducerStateManager.PRODUCER_SNAPSHOT_ENTRY_SCHEMA中定义。

每条记录都有ProducerId、ProducerEpoch 和 LastSequence三个字段。所以幂等的约束为:相同分区、相同 Producer(id 和 epoch) 发送的消息序列号需递增。即 Kafka 的生产者幂等性只在单连接、单分区生效。Producer 重启或消息发送到其他分区就失去了幂等性的约束。

snapshot 文件在 log segment 滚动时更新,发生重启后通过读取 .snapshot 文件和最新的日志文件即可恢复 Producer 的状态。Broker 的重启或分区迁移并不会影响幂等性。

3.事务流程

事务初始化

Kafka Producer 启动后我们使用两个 API 来初始化事务:initTransactions 和 beginTransaction。

在 initTransactions 中,Producer 首先发送 ApiKeys.FIND_COORDINATOR 请求获取 TransactionCoordinator。

之后即可向其发送 ApiKeys.INIT_PRODUCER_ID 请求获取 ProducerId 及 ProducerEpoch(也是上文中用于幂等的字段)。此步骤生成的 id 和 epoch 会写入内部 Topic __transaction_state 中,并且将事务的状态置为 Empty。

__transaction_state 是 compaction Topic,其中消息的 key 为客户端设置的 transactional.id(详见 TransactionStateManager#appendTransactionToLog)。

区别于 ProducerId 是服务端生成的内部属性;TransactionId 由用户设置,用于标识业务视角认为的“同一个应用”,启动具有相同 TransactionId 的新 Producer 会使得未完成的事务被回滚并且来自旧 Producer(具有较小 epoch)的请求被拒绝掉。

后续 beginTransaction 用于开始一个事务,该方法会创建一个 Producer 内部事务状态,标识这一个事务的开始,并不会有 RPC 产生。

消息发送

上一节说到 beginTransaction 只是更改 Producer 内部状态,那么在第一条消息发送时才隐式开启了事务:

首先,Producer 会发送 ApiKeys.ADD_PARTITIONS_TO_TXN 请求到 TransactionCoordinator。TransactionCoordinator 会将这个分区加入到事务中,并更改事务的状态为 Ongoing,这些信息被持久化到 __transaction_state 中。

然后 Producer 使用 ApiKeys.PRODUCE 请求正常发送消息到对应的分区中。这条消息的可见性控制在下文消息消费一节中会详细讨论。

事务提交与回滚

当所有消息发送完成后,Producer 可以选择提交或回滚事务,此时:

TransactionCoordinator:具有当前事务所有相关分区的信息

其他 Broker:已经将消息持久化到日志文件中

接下来 Producer 调用 commitTransaction 会发送 ApiKeys.END_TXN 请求将事务状态更改为 PrepareCommit(回滚事务对应状态 PrepareAbort)并持久化到 __transaction_state 中,此时从 Producer 的视角来看整个事务已经结束了。

TransactionCoordinator 会异步向各个 Broker 发送 ApiKeys.WRITE_TXN_MARKERS 请求,当所有参加事务的 Broker 都返回成功后,TransactionCoordinator 会将事务状态更改为 CompleteCommit(回滚事务对应状态 CompleteAbort)并持久化到 __transaction_state 中。

消息的消费

某个分区的消息可能是事务消息与非事务消息混杂的,如下图所示:

在 Broker 处理 ApiKeys.PRODUCE 请求时,完成消息持久化会更新 LSO 到第一条未提交的事务消息的 offset。这样在消费者消费消息时,可以通过 LSO 来判断消息是否可见:如果设置了 isolation.level 为 read_committed 则只会消费 LSO 之前的消息。

LSO(log stable offset): 它表示的是已经被成功复制到所有副本(replicas)并且可以被消费者安全消费的消息的最大偏移量。

但是我们可以发现 LSO 之前存在已回滚的消息(图中红色矩形)这些消息应该被过滤掉:在 Broker 处理 ApiKeys.WRITE_TXN_MARKERS 请求时,会将已回滚的消息索引写入到 .txnindex 文件中(LogSegmentKafka#updateTxnIndex)。