Spring Cloud Stream和Apache Kafka的事务回滚策略

工程 | Soby Chacko | 2023年10月11日 | ...

本博客系列中的其他部分

第一部分:Spring Cloud Stream Kafka 应用中的事务介绍

第二部分:Spring Cloud Stream Kafka 应用中的生产者启动的事务

第三部分:Spring Cloud Stream Kafka 应用中与外部事务管理器同步

在本博客系列的前三部分中,我们分析了Spring Cloud Stream Kafka应用程序中的事务工作方式。我们遇到了事务有帮助的不同上下文,包括生产者和消费者应用程序,以及应用程序如何正确使用它们。现在这些基本要素已经完成,让我们继续讨论事务的另一个方面:**当发生错误时回滚事务**。当发生错误并且事务系统无法提交事务时,事务管理器会回滚事务,并且不会将任何内容持久化供下游消费者查看。应用程序应该能够指示这种回滚机制的工作方式。Spring Cloud Stream 通过 Spring 对 Apache Kafka 的基本支持来促进这种回滚自定义。我们必须注意关于生产者和消费者(**消费-处理-生产**)事务应用程序的几件事。我们将对此进行探讨。

生产者启动的事务

这是一个我们在之前的文章中看到的代码片段。

@Transactional        
public void send(StreamBridge streamBridge)      
{
    for (int i = 0; i < 5; i++) {
      streamBridge.send("mySupplier-out-0", "my data: " + i);           
    }
}

如果事务方法抛出异常,我们该怎么办?答案是,从 Spring Cloud Stream 的角度来看,我们什么都不需要做。事务拦截器会启动回滚,最终 Kafka 中的事务协调器会中止事务。最终,异常会传播到调用者,然后调用者可以决定如果错误是瞬态的则重新触发事务方法。框架不会重试,因为这是一个生产者启动的事务。这种情况很简单,因为在事务回滚期间,我们不需要从应用程序或框架的角度做任何事情。如果发生错误,则保证会回滚。但是,请记住,即使事务已回滚,Kafka 日志中也可能存在未提交的记录。隔离级别为 `read_uncommitted`(默认值)的消费者仍然会接收这些记录。因此,消费者应用程序必须确保它们使用 `read_committed` 的隔离级别,这样它们就不会接收上游事务回滚的任何记录。

与外部事务同步的生产者启动的事务

我们在本博客系列的最后一部分看到了这种情况。与第一种情况一样,如果方法抛出异常并发生回滚,即使 Kafka 事务与数据库事务同步,应用程序也不需要执行任何操作来处理错误。事务会从数据库和 Kafka 发布中回滚。

消费者启动的事务回滚

如果生产者启动的事务回滚如此简单,您可能会好奇有什么大不了的,为什么我们必须为此主题专门撰写一整篇文章。在什么情况下应用程序需要提供特定的回滚策略?当您正在进行消费者启动的事务时,这是有意义的,因为我们需要特别注意如何处理已消费记录的状态及其偏移量。让我们重新检查一下我们在本系列之前的博客中运行的消费者启动的事务方法代码。

public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

您可能还记得,这是一个端到端事务处理的**消费-处理-生产**模式。如果事务方法抛出异常会发生什么?在这里,我们需要了解框架在回滚事务时如何处理已消费的记录。Spring for Apache Kafka 中的基础消息侦听器容器允许设置回滚处理器

消息侦听器容器使用列表开头处的失败记录调用 `AfterRollbackProcessor` API,其中包含上次消费者轮询中剩余的记录。这些实现使用主题/分区信息来确保在下次轮询期间再次获取失败的记录。当应用程序在 Spring Cloud Stream 中启用事务时,我们使用名为 `DefaultAfterRollbackProcessor` 的默认实现来实现 `AfterRollbackProcessor` API。因此,当事务回滚时,默认情况下会启用此实现。让我们检查一下 `AfterRollbackProcessor` 运行时会发生什么。

Spring Cloud Stream 允许您通过消费者绑定设置方法调用重试的最大次数。例如,`spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts`。最大尝试次数包括初始尝试。默认值为**三次**。如果要禁用重试,可以将此值设置为**一次**。在这种情况下,框架只尝试一次记录。此值包含记录的第一次尝试。因此,在默认情况下为三次的情况下,绑定器在第一次尝试后会重试两次。

当用户方法抛出异常时,容器最初启动的事务将回滚。由于我们处于事务上下文中,因此容器随后使用事务模板在新的事务中调用 `AfterRollbackProcessor` 的 process 方法,该方法启动一个新的 Kafka 事务。在运行 `AfterRollbackProcessor` 的 process 方法时,它会根据最大尝试次数配置检查是否还有任何挂起的重试。如果发现更多重试,它会提交当前事务,这是一个无操作,因为在检查期间没有任何事情发生。发生消费者 seek 操作,其中包含失败的记录,以便下次轮询返回此失败的记录。然后消费者轮询更多记录,这些记录会重新传递失败的记录。整个流程重新开始并继续。如果再次失败,则会重复此过程,直到用尽所有可用的重试次数。一旦所有重试都用尽,`AfterRollbackProcessor` 就会调用已注册的恢复程序。Spring Cloud Stream 注册一个恢复程序,该程序会将出错的记录发送到错误通道。之后,输入(已恢复)记录的偏移量将发送到新事务。在此之后,当前事务提交,这会将偏移量原子地发送到事务并提交记录的偏移量。现在流程已完成。已恢复的记录不包含在消费者 seek 中,下次轮询将返回新记录。

如果恢复由于任何原因失败,容器的行为就像重试未用尽一样,并进入无限重试。如上所述,当恢复成功时,失败的记录不包含在 seek 中,因此下次轮询不会返回该记录。

假设应用程序将最大尝试次数设置为两次,并且记录两次都失败,**使用事务时,事件顺序如下所示。**

  1. 消费者轮询记录,Spring Kafka 中的侦听器在 `TransactionTemplate` 的 execute 方法中被调用,这会触发 `KafkaTransactionManager` 启动一个新事务。
  2. 最终,侦听器调用用 `@Transactional` 注解的用户方法。
  3. 事务拦截器拦截事务方法,并通过其事务管理器启动一个新的 JPA 事务。
  4. 当它到达数据库操作时,不会发生提交或回滚,因为我们正在方法执行的中间。
  5. `StreamBridge` 调用 send 方法,该方法发布到 Kafka 主题。此处不会启动新的 Kafka 事务,因为已经有 Kafka 事务正在进行中。`KafkaTemplate` 使用相同的事务资源(生产者)进行发布。
  6. 该方法从任何操作中抛出异常,事务拦截器捕获该异常并对 JPA 事务执行回滚。
  7. 异常传播回 Spring Kafka 中的消息侦听器容器,侦听器通过 `TransactionTemplate` 的 execute 方法调用用户方法。然后它回滚 Kafka 事务。
  8. 此时,容器在新的事务中调用 `AfterRollbackProcessor`,因为我们处于事务上下文中。它在其 `TransactionTemplate` 上启动另一个 execute 操作,由 `KafkaTransactionManager` 创建一个新的 Kafka 事务。
  9. `TransactionTemplate` 的 execute 方法调用 `AfterRollbackProcessor` API 中的 process 方法并立即返回,因为还剩一次重试(因为我们最多有两次尝试)。
  10. 然后容器提交新的 Kafka 事务,关闭事务而无需执行任何操作——本质上是一个无操作。
  11. 下一次消费者轮询重新传递失败的记录,容器通过在新的事务中再次调用侦听器来重试(步骤 1)。
  12. 步骤 2-8 重复。
  13. `TransactionTemplate` 的 execute 方法调用 `AfterRollbackProcessor` 的 process 方法,并发现没有更多重试。
  14. **process** 方法调用已注册的恢复程序。由于我们将其作为 Spring Cloud Stream 应用程序运行,因此默认恢复程序会发送到错误通道。
  15. 记录恢复后,使用生产者在事务中发送已恢复记录的偏移量(最初由消费者消费)偏移量。
  16. 一旦AfterRollbackProcessor中的process方法返回,容器就会在事务上调用提交操作,该操作会原子地将偏移量发送到事务并执行消费者偏移量提交。

为什么我们需要在上面步骤 8中使用新的事务,以及在每次失败尝试后调用AfterRollbackProcessor时都需要新的事务?为什么我们不能在提交原始 Kafka 事务之前调用AfterRollbackProcessor?虽然在每次失败尝试执行回滚后任务后创建新的 Kafka 事务听起来像是多余的开销,但这却是必要的。当原始事务发生回滚时,它不会将偏移量发送到事务。如果需要重试,容器会在新的事务中再次调用监听器,这个循环会持续进行,直到重试次数用尽并且记录被恢复。容器可能创建并回滚与最大尝试次数一样多的事务,而不会将偏移量发送到事务。每次原始事务回滚时,容器都会为AfterRollbackProcessor调用启动一个对应的新的事务,其提交操作是空操作(最后一个在恢复之后的操作除外)。恢复记录后,最后一次调用会将偏移量发送到事务中,以便原子地提交偏移量并在 Kafka 端进行必要的交易清理。因此,正如我们所看到的,为了将偏移量发送到事务,我们需要在新的事务中调用AfterRollbackProcessor

自定义 AfterRollbackProcessor

如果应用程序想要自定义回滚后任务,而不是使用 Spring Cloud Stream 使用的默认 - DefaultAfterRollbackProcessor -,则可以使用ListenerContainerCustomizer提供自定义的AfterRollbackProcessor。下面的列表显示了如何操作。

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
   return (container, destination, group) -> container.setAfterRollbackProcessor(
           new DefaultAfterRollbackProcessor<byte[], byte[]>(
                   (record, exception) -> System.out.println("Discarding failed record: " + record),
                   new FixedBackOff(0L, 1)));
}

在提供上述自定义配置时,恢复程序会记录错误并继续执行。DefaultAfterRollbackProcessor的构造函数也包含一个没有重试的回退。因此,在这个例子中,一旦方法中第一次发生异常,记录就会通过日志记录来恢复。

记录恢复期间的事务性 DLQ 发布

Spring Cloud Stream 允许您在用尽所有重试后将失败的记录发送到一个唯一的DLQ(死信队列)主题,作为恢复过程的一部分。我们提到 Spring Cloud Stream Kafka 绑定器使用的DefaultAfterRollbackProcessor会将记录发送到错误通道。当应用程序启用DLQ时,绑定器会将失败的记录发送到一个特殊的DLT主题。这其中的具体细节不在我们事务讨论的范围内。然而,问题是DLT发布是否是事务性的。在设置DLQ基础设施时,如果应用程序使用事务(即,它提供transaction-id-prefix),则绑定器将使用KafkaTransactionManager中使用的相同原始事务性生产者工厂。因此,框架保证以事务方式发布到DLT

通过本文的讨论,我们涵盖了在 Spring Cloud Stream Kafka 应用程序中使用事务时所有主要的构建块。在本博客系列的下一节中,我们将探讨 Kafka 中事务的实际应用、流行的精确一次语义以及如何在 Spring Cloud Stream Kafka 应用程序中启用它们。

获取 Spring 新闻通讯

关注 Spring 新闻通讯

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部