Kafka 幂等生产者和事务生产者
幂等生产者和事务生产者
消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:
最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
至少一次(at least once):消息不会丢失,但有可能被重复发送。
精确一次(exactly once):消息不会丢失,也不会被重复发送。
kafka提供的交付可靠性保障是第二种,即至少一次。
“已提交”的含 义,即只有 Broker 成功“提交”消息且 Producer 接到 Broker 的应答才会认为该消息成功发送。不过倘若消息成 功“提交”,但 Broker 的应答没有成功发送回 Producer 端(比如网络出现瞬时抖动),那么 Producer 就无法确定 消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。
kafka也可以提供最多一次交付保障,只要让producer禁止重试就好。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。。我们通常不会希望出现消息丢失的 情况,但一些场景里偶发的消息丢失其实是被允许的,相 反,消息重复是绝对要避免的。此时,使用最多一次交付保 障就是最恰当的。
kafka同样可以做到第三种,即精确一次,通过两种机制,幂等性和事务性。
幂等性Producer
其最大的优势在于我们可以安全地重试 任何幂等性操作,反正它们也不会破坏我们的系统状态。
在 Kafka 中,Producer 默认不是幂等性的,但我们可以创 建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功 能。在此之前,Kafka 向分区发送数据时,可能会出现同一 条消息被发送了多次,导致消息重复的情况。在 0.11 之 后,指定 Producer 幂等性的方法很简单,仅需要设置一个 参数即可,即 props.put(“enable.idempotence”, ture),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CO NFIG, true)。enable.idempotence 被设置成 true 后,Producer 自动升 级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。
幂等性Producer的作用范围
首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消 息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你 可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。
如果想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或 者依赖事务型 Producer。这也是幂等性 Producer 和事务 型 Producer 的最大区别!
事务性Producer
事务型 Producer 能够保证将消息原子性地写入到多个分区 中。这批消息要么全部写入成功,要么全部失败。另外,事 务型 Producer 也不惧进程的重启。Producer 重启回来 后,Kafka 依然保证它们发送消息的精确一次处理。
设置事务型 Producer 的方法也很简单,满足两个要求即 可:
- 和幂等性 Producer 一样,开启 enable.idempotence = true。
- 设置 Producer 端参数 transctional. id。最好为其设置一 个有意义的名字。
- 修改代码内容,调用了一些事务 API,如 initTransaction、 beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开 始、事务提交以及事务终止。
1 | producer.initTransactions(); |
和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:
- read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
- read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
Author: corn1ng
Link: https://corn1ng.github.io/2019/10/25/kafka/5幂等生产者和事务生产者/
License: 知识共享署名-非商业性使用 4.0 国际许可协议