领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多本博客系列中的其他部分
第一部分:Spring Cloud Stream Kafka 应用中的事务介绍
第二部分:Spring Cloud Stream Kafka 应用中的生产者启动的事务
第三部分:Spring Cloud Stream Kafka 应用中与外部事务管理器同步
在本博客系列的前三部分中,我们分析了Spring Cloud Stream Kafka应用程序中的事务工作方式。我们遇到了事务有帮助的不同上下文,包括生产者和消费者应用程序,以及应用程序如何正确使用它们。现在这些基本要素已经完成,让我们继续讨论事务的另一个方面:**当发生错误时回滚事务**。当发生错误并且事务系统无法提交事务时,事务管理器会回滚事务,并且不会将任何内容持久化供下游消费者查看。应用程序应该能够指示这种回滚机制的工作方式。Spring Cloud Stream 通过 Spring 对 Apache Kafka 的基本支持来促进这种回滚自定义。我们必须注意关于生产者和消费者(**消费-处理-生产**)事务应用程序的几件事。我们将对此进行探讨。
这是一个我们在之前的文章中看到的代码片段。
@Transactional
public void send(StreamBridge streamBridge)
{
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "my data: " + i);
}
}
如果事务方法抛出异常,我们该怎么办?答案是,从 Spring Cloud Stream 的角度来看,我们什么都不需要做。事务拦截器会启动回滚,最终 Kafka 中的事务协调器会中止事务。最终,异常会传播到调用者,然后调用者可以决定如果错误是瞬态的则重新触发事务方法。框架不会重试,因为这是一个生产者启动的事务。这种情况很简单,因为在事务回滚期间,我们不需要从应用程序或框架的角度做任何事情。如果发生错误,则保证会回滚。但是,请记住,即使事务已回滚,Kafka 日志中也可能存在未提交的记录。隔离级别为 `read_uncommitted`(默认值)的消费者仍然会接收这些记录。因此,消费者应用程序必须确保它们使用 `read_committed` 的隔离级别,这样它们就不会接收上游事务回滚的任何记录。
我们在本博客系列的最后一部分看到了这种情况。与第一种情况一样,如果方法抛出异常并发生回滚,即使 Kafka 事务与数据库事务同步,应用程序也不需要执行任何操作来处理错误。事务会从数据库和 Kafka 发布中回滚。
如果生产者启动的事务回滚如此简单,您可能会好奇有什么大不了的,为什么我们必须为此主题专门撰写一整篇文章。在什么情况下应用程序需要提供特定的回滚策略?当您正在进行消费者启动的事务时,这是有意义的,因为我们需要特别注意如何处理已消费记录的状态及其偏移量。让我们重新检查一下我们在本系列之前的博客中运行的消费者启动的事务方法代码。
public Consumer<PersonEvent> process(TxCode txCode) {
return txCode::run;
}
@Component
class TxCode {
@Transactional
void run(PersonEvent pe) {
Person person = new Person();
person.setName(pe.getName());
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
您可能还记得,这是一个端到端事务处理的**消费-处理-生产**模式。如果事务方法抛出异常会发生什么?在这里,我们需要了解框架在回滚事务时如何处理已消费的记录。Spring for Apache Kafka 中的基础消息侦听器容器允许设置回滚处理器
消息侦听器容器使用列表开头处的失败记录调用 `AfterRollbackProcessor` API,其中包含上次消费者轮询中剩余的记录。这些实现使用主题/分区信息来确保在下次轮询期间再次获取失败的记录。当应用程序在 Spring Cloud Stream 中启用事务时,我们使用名为 `DefaultAfterRollbackProcessor` 的默认实现来实现 `AfterRollbackProcessor` API。因此,当事务回滚时,默认情况下会启用此实现。让我们检查一下 `AfterRollbackProcessor` 运行时会发生什么。
Spring Cloud Stream 允许您通过消费者绑定设置方法调用重试的最大次数。例如,`spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts`。最大尝试次数包括初始尝试。默认值为**三次**。如果要禁用重试,可以将此值设置为**一次**。在这种情况下,框架只尝试一次记录。此值包含记录的第一次尝试。因此,在默认情况下为三次的情况下,绑定器在第一次尝试后会重试两次。
当用户方法抛出异常时,容器最初启动的事务将回滚。由于我们处于事务上下文中,因此容器随后使用事务模板在新的事务中调用 `AfterRollbackProcessor` 的 process 方法,该方法启动一个新的 Kafka 事务。在运行 `AfterRollbackProcessor` 的 process 方法时,它会根据最大尝试次数配置检查是否还有任何挂起的重试。如果发现更多重试,它会提交当前事务,这是一个无操作,因为在检查期间没有任何事情发生。发生消费者 seek 操作,其中包含失败的记录,以便下次轮询返回此失败的记录。然后消费者轮询更多记录,这些记录会重新传递失败的记录。整个流程重新开始并继续。如果再次失败,则会重复此过程,直到用尽所有可用的重试次数。一旦所有重试都用尽,`AfterRollbackProcessor` 就会调用已注册的恢复程序。Spring Cloud Stream 注册一个恢复程序,该程序会将出错的记录发送到错误通道。之后,输入(已恢复)记录的偏移量将发送到新事务。在此之后,当前事务提交,这会将偏移量原子地发送到事务并提交记录的偏移量。现在流程已完成。已恢复的记录不包含在消费者 seek 中,下次轮询将返回新记录。
如果恢复由于任何原因失败,容器的行为就像重试未用尽一样,并进入无限重试。如上所述,当恢复成功时,失败的记录不包含在 seek 中,因此下次轮询不会返回该记录。
假设应用程序将最大尝试次数设置为两次,并且记录两次都失败,**使用事务时,事件顺序如下所示。**
AfterRollbackProcessor
中的process方法返回,容器就会在事务上调用提交操作,该操作会原子地将偏移量发送到事务并执行消费者偏移量提交。为什么我们需要在上面步骤 8中使用新的事务,以及在每次失败尝试后调用AfterRollbackProcessor
时都需要新的事务?为什么我们不能在提交原始 Kafka 事务之前调用AfterRollbackProcessor
?虽然在每次失败尝试执行回滚后任务后创建新的 Kafka 事务听起来像是多余的开销,但这却是必要的。当原始事务发生回滚时,它不会将偏移量发送到事务。如果需要重试,容器会在新的事务中再次调用监听器,这个循环会持续进行,直到重试次数用尽并且记录被恢复。容器可能创建并回滚与最大尝试次数一样多的事务,而不会将偏移量发送到事务。每次原始事务回滚时,容器都会为AfterRollbackProcessor
调用启动一个对应的新的事务,其提交操作是空操作(最后一个在恢复之后的操作除外)。恢复记录后,最后一次调用会将偏移量发送到事务中,以便原子地提交偏移量并在 Kafka 端进行必要的交易清理。因此,正如我们所看到的,为了将偏移量发送到事务,我们需要在新的事务中调用AfterRollbackProcessor
。
如果应用程序想要自定义回滚后任务,而不是使用 Spring Cloud Stream 使用的默认 - DefaultAfterRollbackProcessor
-,则可以使用ListenerContainerCustomizer
提供自定义的AfterRollbackProcessor
。下面的列表显示了如何操作。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
return (container, destination, group) -> container.setAfterRollbackProcessor(
new DefaultAfterRollbackProcessor<byte[], byte[]>(
(record, exception) -> System.out.println("Discarding failed record: " + record),
new FixedBackOff(0L, 1)));
}
在提供上述自定义配置时,恢复程序会记录错误并继续执行。DefaultAfterRollbackProcessor
的构造函数也包含一个没有重试的回退。因此,在这个例子中,一旦方法中第一次发生异常,记录就会通过日志记录来恢复。
Spring Cloud Stream 允许您在用尽所有重试后将失败的记录发送到一个唯一的DLQ(死信队列)主题,作为恢复过程的一部分。我们提到 Spring Cloud Stream Kafka 绑定器使用的DefaultAfterRollbackProcessor
会将记录发送到错误通道。当应用程序启用DLQ时,绑定器会将失败的记录发送到一个特殊的DLT主题。这其中的具体细节不在我们事务讨论的范围内。然而,问题是DLT发布是否是事务性的。在设置DLQ基础设施时,如果应用程序使用事务(即,它提供transaction-id-prefix
),则绑定器将使用KafkaTransactionManager
中使用的相同原始事务性生产者工厂。因此,框架保证以事务方式发布到DLT。
通过本文的讨论,我们涵盖了在 Spring Cloud Stream Kafka 应用程序中使用事务时所有主要的构建块。在本博客系列的下一节中,我们将探讨 Kafka 中事务的实际应用、流行的精确一次语义以及如何在 Spring Cloud Stream Kafka 应用程序中启用它们。