领先一步
VMware 提供培训和认证,助您加速进步。
了解更多本博客系列的其他部分
第 1 部分:Spring Cloud Stream Kafka 应用中的事务简介
第 2 部分:Spring Cloud Stream Kafka 应用中的生产者发起事务
第 3 部分:在 Spring Cloud Stream Kafka 应用中与外部事务管理器同步
在本博客系列的最后三部分中,我们分析了事务在 Spring Cloud Stream Kafka 应用中的工作方式。我们遇到了事务在不同场景下(包括生产者和消费者应用)的帮助作用以及应用如何正确使用它们。既然这些基本要素已经掌握,让我们继续探讨事务的另一个方面:错误发生时回滚事务。当发生错误且事务系统无法提交事务时,事务管理器会回滚事务,并且不会持久化任何数据供下游消费者查看。如果应用能够指定此回滚机制的工作方式,那将很有帮助。Spring Cloud Stream 通过 Spring 对 Apache Kafka 的基础支持,使得这种回滚定制成为可能。我们必须了解一些关于生产者和消费者(消费-处理-生产)事务性应用的事项。我们将对此进行讲解。
下面是我们在上一篇文章中看到的代码片段。
@Transactional
public void send(StreamBridge streamBridge)
{
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "my data: " + i);
}
}
如果事务方法抛出异常,我们该怎么办?答案是,从 Spring Cloud Stream 的角度来看,我们什么都不需要做。事务拦截器会发起回滚,最终 Kafka 的事务协调器会中止事务。最终,异常会传播给调用方,然后调用方可以决定是否在错误是瞬时的情况下重新触发事务方法。由于这是生产者发起事务,框架不会进行重试。这种情况很简单,因为在事务回滚期间,我们不需要从应用或框架的角度做任何事情。如果发生错误,事务保证会回滚。然而,请记住,即使事务已回滚,Kafka 日志中可能仍存在未提交的记录。隔离级别为 read_uncommitted
(默认)的消费者仍然会收到这些记录。因此,消费者应用必须确保使用 read_committed
隔离级别,以便它们不会收到上游事务回滚的任何记录。
我们在本博客系列的最后一部分看到了这种情况。与第一种情况一样,如果方法抛出异常并发生回滚,即使 Kafka 事务与数据库事务同步,应用也不需要做任何事情来处理错误。数据库和 Kafka 发布操作的事务都会回滚。
如果生产者发起事务回滚如此简单,您可能会想这有什么大不了的,以及为什么我们必须专门用一整篇文章来讨论这个话题。何时需要应用提供特定的回滚策略?当您有正在进行的消费者发起事务时,这才有意义,因为我们需要特别注意如何处理已消费记录的状态及其偏移量。让我们重新审视本系列上一篇博客中运行的消费者发起事务方法代码。
public Consumer<PersonEvent> process(TxCode txCode) {
return txCode::run;
}
@Component
class TxCode {
@Transactional
void run(PersonEvent pe) {
Person person = new Person();
person.setName(pe.getName());
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
正如您所记得的,这是一个端到端事务性的消费-处理-生产模式。如果事务方法抛出异常怎么办?在这里,我们需要了解框架在回滚事务时如何处理已消费的记录。Spring for Apache Kafka 中的底层消息监听器容器允许设置一个回滚处理器(rollback processor)
消息监听器容器会调用 AfterRollbackProcessor
API,传入上次消费者拉取剩余的记录,失败的记录位于列表开头。实现会使用主题/分区信息,确保在下次拉取时再次获取失败的记录。当应用在 Spring Cloud Stream 中启用事务时,我们默认使用名为 DefaultAfterRollbackProcessor
的实现,它实现了 AfterRollbackProcessor
API。因此,当事务回滚时,此实现会默认生效。让我们看看这个 AfterRollbackProcessor
工作时会发生什么。
Spring Cloud Stream 允许您通过消费者绑定设置方法调用的最大重试次数。例如,spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts
。最大尝试次数值包含初始尝试。此值的默认值为三。如果您想禁用重试,可以将此值设置为一。在这种情况下,框架只会尝试一次记录。此值包含记录的首次尝试。因此,在默认值三的情况下,绑定器在初始尝试后会重试两次。
当用户方法抛出异常时,容器最初启动的事务会回滚。由于我们处于事务上下文中,容器随后会使用事务模板在一个新事务中调用 AfterRollbackProcessor
的 process 方法,这会启动一个新的 Kafka 事务。在运行 AfterRollbackProcessor
的 process 方法时,它会根据最大尝试次数配置检查是否还有待处理的重试。如果发现还有更多重试,它会提交当前事务,这实际上是一个空操作,因为在检查期间没有发生任何事情。消费者会对失败的记录进行 seek 操作,以便下次拉取时返回此失败的记录。然后消费者会拉取更多记录,这会重新投递失败的记录。整个流程再次开始并持续。如果再次失败,它会重复,直到所有可用的重试次数耗尽。一旦所有重试次数耗尽,AfterRollbackProcessor
会调用注册的恢复器。Spring Cloud Stream 注册了一个恢复器,将出错的记录发送到错误通道。之后,输入(已恢复)记录的偏移量会发送到新事务。在此之后,当前事务会提交,这会原子性地将偏移量发送到事务并提交记录的偏移量。现在处理完成。已恢复的记录不会包含在消费者 seek 中,下次拉取会返回新的记录。
如果恢复由于任何原因失败,容器的行为就像重试没有耗尽一样,并进入无限重试。如上所述,当恢复成功时,失败的记录不会包含在 seek 中,因此下次拉取不会返回该记录。
假设应用设置的最大尝试次数为两次,并且记录两次都失败,以下是使用事务时事件发生的顺序。
TransactionTemplate
的 execute 方法内被调用,这会触发 KafkaTransactionManager
启动一个新的事务。@Transactional
注解进行标记。StreamBridge
调用 send 方法,该方法会发布到 Kafka 主题。这里不会启动新的 Kafka 事务,因为已经有一个 Kafka 事务正在进行中。KafkaTemplate
使用相同的事务资源(生产者)来发布消息。TransactionTemplate
的 execute 方法调用了用户方法。然后,它会回滚 Kafka 事务。AfterRollbackProcessor
。它会在其 TransactionTemplate
上开始另一个 execute 操作,由 KafkaTransactionManager
创建一个新的 Kafka 事务。TransactionTemplate
的 execute 方法调用 AfterRollbackProcessor
API 中的 process 方法并立即返回,因为还剩一次重试机会(因为我们最多尝试两次)。TransactionTemplate
的 execute 方法调用 AfterRollbackProcessor
的 process 方法,并发现没有更多的重试机会了。AfterRollbackProcessor
中的 process 方法返回,容器就会调用事务上的 commit 操作,该操作会原子性地将偏移量发送到事务并执行消费者偏移量提交。为什么在上面的步骤 8 中需要一个新的事务,以及在失败尝试后每次调用 AfterRollbackProcessor
时都需要一个新的事务?为什么不能在提交原始 Kafka 事务之前调用 AfterRollbackProcessor
?虽然每次失败尝试后创建一个新的 Kafka 事务来执行回滚后任务可能听起来像是不必要的开销,但这却是必需的。当原始事务发生回滚时,它不会将偏移量发送到事务。如果需要重试,容器会在一个新的事务中再次调用监听器,这个循环会一直持续,直到重试次数耗尽并且记录被恢复。容器创建并回滚的事务数量可能与最大尝试次数一样多,而没有将偏移量发送到事务。每次原始事务回滚时,容器都会为 AfterRollbackProcessor
调用启动一个相应的新事务,其提交都是空操作(no-op)(恢复后的最后一次提交除外)。在恢复记录后,这最后一次调用会将偏移量发送到事务,以便原子地提交偏移量并在 Kafka 端进行必要的事务清理。因此,正如我们所见,为了将偏移量发送到事务,我们需要在一个新的事务中调用 AfterRollbackProcessor
。
如果应用想要定制回滚后任务,而不是使用 Spring Cloud Stream 使用的默认 DefaultAfterRollbackProcessor
,那么可以使用 ListenerContainerCustomizer
提供一个自定义的 AfterRollbackProcessor
。以下列表显示了如何做到这一点
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
return (container, destination, group) -> container.setAfterRollbackProcessor(
new DefaultAfterRollbackProcessor<byte[], byte[]>(
(record, exception) -> System.out.println("Discarding failed record: " + record),
new FixedBackOff(0L, 1)));
}
提供上述定制后,恢复器会记录错误并继续。DefaultAfterRollbackProcessor
的构造函数也接受一个没有重试的 backoff。因此,在本例中,一旦方法中首次发生异常,记录就会通过日志记录的方式得到恢复。
Spring Cloud Stream 允许您在重试次数耗尽后,将失败的记录作为恢复过程的一部分发送到一个唯一的死信队列 (DLQ) 主题。我们提到过,Spring Cloud Stream Kafka 绑定器使用的 DefaultAfterRollbackProcessor
会将记录发送到一个错误通道。当应用启用 DLQ 时,绑定器会将失败的记录发送到一个特殊的 DLT 主题。这方面的具体细节超出了我们事务讨论的范围。然而,问题在于 DLT 发布是否具有事务性。在设置 DLQ 基础设施时,如果应用使用了事务(即提供了 transaction-id-prefix
),绑定器会使用与 KafkaTransactionManager
中使用的相同的原始事务性生产者工厂。因此,框架保证以事务方式发布到 DLT。
通过本文的讨论,我们涵盖了在 Spring Cloud Stream Kafka 应用中使用事务时的所有主要构建块。在本博客系列的下一部分中,我们将探讨 Kafka 中事务的一个实际应用,即流行的精确一次语义(exactly-once-semantics),以及如何在 Spring Cloud Stream Kafka 应用中启用它们。