本文参考kafka3.6.0源码
1.Kafka 的 Exactly Once 语义
事务消息最主要的动机是在流处理中实现 Exactly Once 的语义,根据kafka的操作流程,Exactly Once语义可分为:
(1)仅发送一次:这是从producer端来考虑,单分区仅发送一次由生产者幂等保证,多分区仅发送一次由事务机制保证。
(2)仅消费一次:这是从consumer端来考虑,Kafka 通过offset的提交来控制消费进度,而offset的提交被抽象成向系统 topic 发送消息。这就使得发送和消费行为统一起来,只要解决了多分区发送消息的一致性就能实现 Exactly Once 语义。
2.Producer幂等性
Kafka 的发送幂等是通过序列号来实现的,每个消息都会被分配一个序列号,序列号是递增的,这样就可以保证消息的顺序性。当生产者发送消息时,会将消息的序列号和消息内容一起写入到日志文件中,下次收到非预期序列号的消息就会返回 OutOfOrderSequenceException 异常。
kafka开启producer幂等性需设置 enable.idempotence 参数,同时,producer内部会进行生产者参数校验,ProducerConfig.postProcessAndValidateIdempotenceConfigs方法对以下三个参数进行检查。
(1)max.in.flight.requests.per.connection 必须小于 5
(2)retries 必须大于 0
(3)acks 必须设置为 all
Kafka 将消息的序列号信息保存在分区维度的 .snapshot 文件中,文件内容格式如下图所示:
格式在ProducerStateManager.PRODUCER_SNAPSHOT_ENTRY_SCHEMA中定义。
每条记录都有ProducerId、ProducerEpoch 和 LastSequence三个字段。所以幂等的约束为:相同分区、相同 Producer(id 和 epoch) 发送的消息序列号需递增。即 Kafka 的生产者幂等性只在单连接、单分区生效。Producer 重启或消息发送到其他分区就失去了幂等性的约束。
snapshot 文件在 log segment 滚动时更新,发生重启后通过读取 .snapshot 文件和最新的日志文件即可恢复 Producer 的状态。Broker 的重启或分区迁移并不会影响幂等性。
3.事务流程
事务初始化
Kafka Producer 启动后我们使用两个 API 来初始化事务:initTransactions 和 beginTransaction。
在 initTransactions 中,Producer 首先发送 ApiKeys.FIND_COORDINATOR 请求获取 TransactionCoordinator。
之后即可向其发送 ApiKeys.INIT_PRODUCER_ID 请求获取 ProducerId 及 ProducerEpoch(也是上文中用于幂等的字段)。此步骤生成的 id 和 epoch 会写入内部 Topic __transaction_state 中,并且将事务的状态置为 Empty。
__transaction_state 是 compaction Topic,其中消息的 key 为客户端设置的 transactional.id(详见 TransactionStateManager#appendTransactionToLog)。
区别于 ProducerId 是服务端生成的内部属性;TransactionId 由用户设置,用于标识业务视角认为的“同一个应用”,启动具有相同 TransactionId 的新 Producer 会使得未完成的事务被回滚并且来自旧 Producer(具有较小 epoch)的请求被拒绝掉。
后续 beginTransaction 用于开始一个事务,该方法会创建一个 Producer 内部事务状态,标识这一个事务的开始,并不会有 RPC 产生。
消息发送
上一节说到 beginTransaction 只是更改 Producer 内部状态,那么在第一条消息发送时才隐式开启了事务:
首先,Producer 会发送 ApiKeys.ADD_PARTITIONS_TO_TXN 请求到 TransactionCoordinator。TransactionCoordinator 会将这个分区加入到事务中,并更改事务的状态为 Ongoing,这些信息被持久化到 __transaction_state 中。
然后 Producer 使用 ApiKeys.PRODUCE 请求正常发送消息到对应的分区中。这条消息的可见性控制在下文消息消费一节中会详细讨论。
事务提交与回滚
当所有消息发送完成后,Producer 可以选择提交或回滚事务,此时:
TransactionCoordinator:具有当前事务所有相关分区的信息
其他 Broker:已经将消息持久化到日志文件中
接下来 Producer 调用 commitTransaction 会发送 ApiKeys.END_TXN 请求将事务状态更改为 PrepareCommit(回滚事务对应状态 PrepareAbort)并持久化到 __transaction_state 中,此时从 Producer 的视角来看整个事务已经结束了。
TransactionCoordinator 会异步向各个 Broker 发送 ApiKeys.WRITE_TXN_MARKERS 请求,当所有参加事务的 Broker 都返回成功后,TransactionCoordinator 会将事务状态更改为 CompleteCommit(回滚事务对应状态 CompleteAbort)并持久化到 __transaction_state 中。
消息的消费
某个分区的消息可能是事务消息与非事务消息混杂的,如下图所示:
在 Broker 处理 ApiKeys.PRODUCE 请求时,完成消息持久化会更新 LSO 到第一条未提交的事务消息的 offset。这样在消费者消费消息时,可以通过 LSO 来判断消息是否可见:如果设置了 isolation.level 为 read_committed 则只会消费 LSO 之前的消息。
LSO(log stable offset): 它表示的是已经被成功复制到所有副本(replicas)并且可以被消费者安全消费的消息的最大偏移量。
但是我们可以发现 LSO 之前存在已回滚的消息(图中红色矩形)这些消息应该被过滤掉:在 Broker 处理 ApiKeys.WRITE_TXN_MARKERS 请求时,会将已回滚的消息索引写入到 .txnindex 文件中(LogSegmentKafka#updateTxnIndex)。