在 Spring Cloud Stream Kafka 应用中与外部事务管理器同步

工程 | Soby Chacko | 2023 年 10 月 04 日 | ...

本博客系列的其余部分

第一部分:Spring Cloud Stream Kafka 应用中的事务介绍

第二部分:Spring Cloud Stream Kafka 应用中的生产者启动事务

在本博客系列的上一部分中,我们了解了事务管理的基础知识,主要是在使用生产者启动的 Spring Cloud Stream Kafka 应用时。在那次讨论中,我们还简要介绍了 Spring Cloud Stream Kafka 消费者应用如何以适当的隔离级别消费以事务方式产生的记录。当您与外部事务管理器(例如关系数据库的事务管理器)同步时,我们提到必须使用事务来确保数据完整性。在本部分中,我们将了解在使用外部事务管理器时,如何在 Spring Cloud Stream 中实现事务保证。

在开始探讨之前,务必记住,在实践中实现分布式事务是极其困难的。必须依赖两阶段提交 (2PC) 策略和适当的分布式事务管理器(例如兼容 JTA 的事务管理器)才能正确执行此操作。然而,大多数企业用例可能不需要这种复杂程度,并且我们考虑和在实践中看到的大多数用例可能最好坚持使用非分布式事务方法,正如我们在本博客中描述的那样。Spring 工程团队的Dave Syer 博士于 2009 年发表的这篇文章,对于理解分布式事务的挑战以及 Spring 中推荐的替代方法仍然具有现实意义(即使在 14 年后)。

让我们回到我们的讨论:在使用外部事务管理器时,如何在生产者启动和消费-处理-生产(读-处理-写)应用中实现 Spring Cloud Stream Kafka 应用的事务性。

现在,我们可以通过勾勒出一些代码示例来为我们的讨论奠定基础,这些代码示例将贯穿讨论过程。我们将使用一些领域对象来驱动演示,并为其创建了伪代码。

假设消息系统处理“事件”领域类型 - 让我们使用一个 PersonEvent

class PersonEvent {

   String name;
   String type;

   //Rest omitted for brevity
}

我们还需要一个用于 Person 对象的领域实体

@Entity
@Table(name = "person")
public class Person {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;

   private String name;

   // Rest omitted for brevity
}

最后,我们需要一个用于 Person 领域对象的 CrudRepository

public interface PersonRepository extends CrudRepository<Person, String> {}

在生产者启动的场景中,假设当调用一个方法时(例如通过 REST),会创建一个 Person 领域对象,将其持久化到数据库,并通过 StreamBridge 作为 PersonEvent 发送到一个出站 Kafka 主题。

消费-处理-生产的场景中,假设输入主题接收到一个 PersonEvent,处理器由此生成一个 Person 领域对象并将其持久化到数据库。最后,它会生成另一个 PersonEvent 发送到一个出站 Kafka 主题。

在这里,我们也将使用 JPA 进行讨论。Spring Cloud Stream 应用是 Boot 应用,您可以在应用中包含 spring-boot-starter-jpa 依赖,并包含适当的 spring.jpa.* 属性以驱动必要的自动配置。假设 Spring Boot 会为我们自动配置一个 JPATransactionManager

让我们将用例分解为各种场景。

场景 1:生产者启动事务

在生产者启动的场景中,我们需要进行两个事务性操作:一个数据库操作,紧接着一个 Kafka 发布操作。这是基本思路。请记住,这段代码只展示了关键部分。在实际应用中,代码几乎肯定会比这复杂得多。

@Autowired
Sender sender;

@PostMapping("/send-data")
public void sendData() {
   sender.send(streamBridge, repository);
}

@Component
static class Sender {

   @Transactional
   public void send(StreamBridge streamBridge, PersonRepository repository) {
       Person person = new Person();
       person.setName("Some Person");

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

上述生产者启动的代码是完全事务性的。在本博客的上一部分中,我们看到如果只有 Kafka 事务,仅添加 Transactional 注解是不够的。正如讨论过的,Transactional 注解没有关联事务管理器,我们需要一个使用相同底层事务资源的自定义事务管理器来实现事务性。然而,在这里情况不同。我们有 Spring Boot 自动配置的 JpaTransactionManager,并且事务拦截器使用它来启动一个事务。由于我们配置了 transaction-id-prefixStreamBridge 的发送操作可以事务性地完成。此外,KafkaTemplate 通过 TransactionSynchronizationManager 将 Kafka 事务与已有的 JPA 事务同步。方法退出时,主事务首先提交,然后是同步的事务,在本例中是 Kafka 事务。

此流程中的顺序如下。
  1. JPA 事务管理器启动一个新的 JPA 事务。
  2. 数据库操作开始,但由于我们仍在方法执行中,此处不会发生提交。
  3. StreamBridge 发送操作触发一个新的 Kafka 事务,通过事务同步管理器与 JPA 事务同步。
  4. 方法退出时,JPA 事务首先提交,然后是 Kafka 事务。

关于 Spring 中事务同步的一般说明:这听起来好像在后台进行复杂的事务同步。然而,正如我们在本文开头所暗示的,这里并没有进行分布式事务同步,更不用说在各种事务之间进行智能同步的方式了。事务本身对同步一无所知。Spring 的 TransactionSynchronizatonManager 只是协调多个事务的提交和回滚。在此上下文中同步事务的功能类似于嵌套两个或更多 @Transactional 方法或 TransactionTempate 对象。配置量更少,因为 Spring 为您完成了嵌套。

场景 2:反转提交顺序

假设由于流程中的一些新需求,我们需要反转提交顺序,让 Kafka 事务先于 JPA 事务提交。我们如何做到这一点?一个直观的想法是显式地为 @Transactional 注解提供一个 Kafka 事务管理器,并让 JPA 事务与作为主事务的 Kafka 事务同步。代码如下所示:

@Transactional(“customKafkaTransactionManager)
public void send(StreamBridge streamBridge, PersonRepository repository) {
    Person person = new Person();
    person.setName("Some Person");

    Person savedPerson = repository.save(person);

    PersonEvent event = new PersonEvent();
    event.setName(savedPerson.getName());
    event.setType("PersonSaved");
    streamBridge.send("process-out-0", event);
}

我们需要提供一个自定义的 Kafka 事务管理器

@Bean
KafkaTransactionManager customKafkaTransactionManager() {
   KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder) this.binderFactory.getBinder("kafka", MessageChannel.class);
   ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
   KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
   return kafkaTransactionManager;
}

由于 Spring Boot 在检测到已存在事务管理器时不会自动配置事务管理器,我们必须自己配置 JPA 事务管理器

@Bean
public PlatformTransactionManager transactionManager(
       ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
   JpaTransactionManager transactionManager = new JpaTransactionManager();
   transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
   return transactionManager;
}

我们的直觉在这里奏效了吗?我们成功改变了事务应用的顺序了吗?很遗憾,没有。它不起作用,因为 JPA 事务管理器不允许其事务与其他事务同步,例如本例中主事务管理器(自定义 Kafka 事务管理器)的事务。在我们的情况下,尽管我们将自定义 Kafka 事务管理器设置为主事务管理器,但 JPA 事务在执行 repository save 方法时会自行启动和提交,而不会与主事务同步。

此流程中的事件顺序如下:
  1. Kafka 事务管理器启动一个拦截器使用的新事务。
  2. 执行 repository save 方法时,JpaTransactionManager 会创建一个 JPA 事务,而不会与主事务同步。
  3. JPA 事务在方法执行过程中提交。
  4. 拦截器将在方法退出时提交 Kafka 事务。

那么,我们如何反转事务呢?有两种方法可以做到这一点。

首先,我们可以尝试链式事务管理器。ChainedTransactionManagerSpring Data 项目中的一个事务管理器实现。您可以向 ChainedTransactionManager 指定事务管理器列表,它会按照列表中的顺序启动事务。方法退出时,事务将按照事务管理器列表的相反顺序提交。

虽然这听起来是一个合理的策略,但需要记住的一个重要注意事项是,ChainedTransactionManager 目前已被弃用,不建议使用。弃用的原因可以在 Javadoc 中找到。核心思想是,人们常常期望 ChainedTransactionManager 是一款神奇的灵丹妙药,能够解决所有事务问题,包括带有两阶段提交的分布式事务以及其他问题,而这与事实相去甚远。ChainedTransactionManager 仅确保事务按照特定顺序启动和提交。它不保证任何事务同步,更不用说任何分布式事务协调了。如果您对 ChainedTransactionManager 的局限性感到满意,并且像我们的用例一样需要特定的顺序,那么使用这个事务管理器是合理的,只要您记住正在使用框架中已弃用的类。

让我们在我们的场景中尝试使用 ChainedTransactionManager,看看效果如何。Spring for Apache Kafka 提供了一个名为 ChainedKafkaTransactionManager 的子类,该子类也已被弃用,因为其父类已被弃用。

我们使用之前在链式事务中看到的相同的自定义 KafkaTransactionManager bean。

我们还需要像之前一样创建 JpaTransactionManager bean,因为 Spring Boot 不会自动配置它,因为它已经检测到自定义的 KafkaTransactionManager bean。

添加这两个 bean 后,让我们创建 ChainedKafkaTransactionManager bean

@Bean
public ChainedKafkaTransactionManager chainedKafkaTransactionManager(KafkaTransactionManager kafkaTransactionManager, PlatformTransactionManager transactionManager) {
   return new ChainedKafkaTransactionManager(jpaTransactionManager, kafkaTransactionManager);
}

完成这些配置后,让我们修改 Transactional 注解

@Transactional("chainedKafkaTransactionManager")
public void send(StreamBridge streamBridge, PersonRepository repository) {
..
}

上述配置实现了我们想要的结果。运行此应用时,事务会如预期般反转 - 即 Kafka 先提交,然后是 JPA。

以下是流程中的步骤:
  1. TransactionInterceptor 使用自定义的 ChainedKafkaTransactionManager 来启动事务。它使用 JpaTransactionManager 启动 JPA 事务,并对 KafkaTransactionManager 执行相同的操作。
  2. 当方法调用数据库操作时,由于它已经在 JPA 事务中运行,因此不会启动另一个事务。此处不会发生提交或回滚,因为这不是一个新事务。
  3. 接下来,方法通过 StreamBridge 执行 Kafka 发布。我们在这里看到的情况与上面 JPA 的情况相同。由于存在一个已有的 Kafka 事务,它不会启动新的 Kafka 事务。StreamBridge 的发送操作会使用与初始 Kafka 事务相同的事务性生产者工厂进行。此处不会发生提交或回滚。
  4. 方法退出时,链式事务管理器会以相反的顺序进行,先是 Kafka 事务提交(或回滚),然后是 JPA 事务。

如果您对链式事务管理器的局限性感到满意,这种方法是可行的。请记住,这里没有事务同步。事务管理器在事务开始时按给定顺序应用,并在提交或回滚时以相反顺序应用。如果您选择此路线,由于您使用的是框架中已弃用的类,最好将它们复制到您的项目中而不是依赖框架。因为它们已被弃用,不保证新的功能和错误修复。未来的版本可能会完全删除它们。也可能永远不会删除它们,并且弃用状态的存在是为了阻止其使用(因为人们认为它具有比实际更多的功能)。

如果您不想依赖框架中已弃用的类,也不想在您的项目中复制和维护它们,您还有另一种选择可以尝试。您可以创建两个事务性方法并嵌套调用。这是一个实现该想法的蓝图:

@Component
static class Sender {

       @Transactional("jpaTransactionManager")
       public void send(StreamBridge streamBridge, PersonRepository repository, KafkaSender kafkaSender) {
           Person person = new Person();
           person.setName("Some Person");

           Person savedPerson = repository.save(person);

           PersonEvent event = new PersonEvent();
           event.setName(savedPerson.getName());
           event.setType("PersonSaved");
           kafkaSender.send(streamBridge, event);
       }
}

@Component
static class KafkaSender {
       @Transactional("customKafkaTransactionManager")
       public void send(StreamBridge streamBridge, PersonEvent event) {
           streamBridge.send("process-out-0", event);
       }
}

确保嵌套调用位于不同的类中,原因我们在本博客系列的第二部分中已经详细阐述过,这是由于 Spring 中 AOP 代理的工作方式所致。

在这种情况下,这两个方法都是事务性的,并且是嵌套的。当事务拦截器拦截到第一个方法调用时,它会启动 JPA 事务。在执行过程中,嵌套调用(其方法也带有 @Transactional 注解)进入。由于此 bean 方法具有 @Transactional 注解,Spring AOP 会将该 bean 包装在一个 AOP advice 中。因为我们从另一个不同类中的 bean 调用这个被代理的 bean,所以代理机制会正确地调用被代理的 bean。另一个事务拦截器会使用不同的事务管理器(即 KafkaTransactionManager)启动一个新的事务。当进行 Kafka 发布时,事务不会立即提交或回滚,因为事务是作为方法的一部分启动的,提交或回滚发生在方法退出时。此时,控制权返回到第一个方法并继续执行。一旦原始方法退出,JPA 事务就会通过拦截器提交。如果发布到 Kafka 的方法抛出异常,它会回滚该事务。在这种情况下,回滚后,异常会传播回第一个事务性方法(JPA 方法),该方法也会因为异常而回滚其事务。

使用此技术时的一个重要注意事项 对嵌套方法的调用应该是第一个方法所做的最后一步,因为如果第一个方法在成功执行 Kafka 调用之后执行某些代码时失败,Kafka 事务已经提交。第一个方法中的失败不会自动回滚 Kafka 事务。

场景 3:消费-处理-生产

凭借我们在本系列中迄今为止对事务获得的核心理解,让我们来看一下事件驱动和流处理应用中的一个重要模式,称为消费-处理-生产模式。在 Spring Cloud Stream 中,这种模式的一个实现如下所示:

@Bean
public Function<PersonEvent, PersonEvent> process(TxCode txCode) {
  return pe -> txCode.run(pe);
}

@Component
class TxCode {

   @Transactional
   PersonEvent run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       return event;
   }
}

我们有一个 Spring Cloud Stream 函数,它从输入主题消费 PersonEvent,然后在函数 lambda 表达式的主体中调用一个函数进行处理。此函数返回另一个 PersonEvent,我们将其发布到出站 Kafka 主题。如果我们不在事务上下文中,可以将上面的 run 方法内联为函数 lambda 表达式的一部分。但是,为了实现事务语义,@Transactional 注解必须位于另一个类的方法上。

为了使 binder 具有事务性,请确保为 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix 提供一个有效值。

上述代码是否完全事务性?然而,实际情况是,它只是端到端部分事务性。让我们看看事件序列。

Binder 是事务性的,因为我们提供了 transaction-id-prefix。当消费者在消息监听器容器中轮询记录时,它会在其 TrasactionTemplate#execute 方法内部调用内部监听器方法。因此,执行监听器方法(该方法会调用用户方法)的整个端到端过程都在由 KafkaTransactionManager 启动的事务中运行。事务启动时,TransactionSynchronizationManager 会将资源(生产者)绑定到该事务。当用户方法(带有 @Transactional 注解的方法)被调用时,事务拦截器会拦截该调用,让包装的 AOP advice 处理实际的调用。因为我们有一个 JpaTransactionManager,事务拦截器会使用该管理器并启动一个新的事务。每个事务管理器实现会自行决定是否要与现有事务同步。对于 JpaTransactionManager(以及许多其他类似的数据库事务管理器实现),它不允许与现有事务同步,正如我们上面已经讨论过的。因此,JPA 事务是独立运行的,如上面各节所示。当 run 方法退出时,事务拦截器会使用 JPA 事务管理器执行提交或回滚操作。至此,JPA 事务管理器完成了它的工作。此时,方法调用的响应会返回给调用者,即 Spring Cloud Stream 基础设施。Spring Cloud Stream 中的这个机制会获取此响应并将其发送到 Kafka 的出站主题。它使用了最初事务开始时绑定的同一个事务性生产者。发送记录后,控制权返回给消息监听器容器,然后消息监听器容器会提交或回滚事务。

以下是此序列中的步骤:
  1. Kafka 消费者接收到记录。
  2. Spring Kafka 中的容器使用 TransactionTemplateexecute 方法调用监听器。

1. KafkaTransactionManager 启动一个新的事务。2. Kafka 资源被绑定(生产者)。3. 当到达用户代码时,事务拦截器最终会拦截该调用并启动一个新的 JPA 事务。4. AOP 代理然后调用实际方法。方法退出时,JpaTransactionManager 会提交或回滚。5. 方法的输出返回给 Spring Cloud Stream 中的调用者。6. 然后使用步骤 4 中的同一个事务性资源将响应发送到 Kafka 出站。7. 控制权返回给消息监听器容器,并且 KafkaTransactionManager 会提交或回滚。

那么,这里的问题是什么?它看起来是事务性的,但实际上只是部分如此。最初的主要问题是整个端到端过程超出了单个原子事务的范围,这是一个重要问题。这里有两个事务 - Kafka 和 JPA - 并且 JPA 和 Kafka 事务之间没有同步。如果数据库事务提交成功而 Kafka 发送失败,则无法回滚 JPA 事务。

我们可能会认为 ChainedTransactionManager 会在这里有所帮助。虽然这种直觉有一些道理,但它不适用于上面的代码。由于在调用监听器方法时容器中创建了 Kafka 事务,ChainedTransactionManager 不会从提供给它的任何 Kafka 事务管理器中创建任何新的 Kafka 事务。当退出用户方法时,我们仍然只有一个 JPA 事务要提交或回滚。Kafka 事务必须等到调用返回容器才能提交或回滚。

问题在于我们在 Spring Cloud Stream 中使用了一个函数,该函数使框架能够发布到 Kafka。在我们的例子中,任何用户指定的事务(例如 JPA 事务)都发生在 Spring Cloud Stream 进行 Kafka 发布之前。我们需要确保是用户代码来发布到 Kafka,这样我们才能将整个事务代码视为一个单元。为了实现这一点,我们应该切换到 Consumer 而不是 Function,然后使用 StreamBridge API 发布到 Kafka。看看这段修改后的代码:

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

然后我们使用上面相同的 TxCode

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

注意,run 方法不返回任何内容,但我们通过 StreamBridge API 显式发送到出站 Kafka 主题。

让我们看看这些更改后的事件序列:
  1. Kafka 消费者接收到记录。
  2. Spring Kafka 中的容器使用 TransactionTemplate 的 execute 方法调用监听器。
  3. KafkaTransactionManager 启动一个新的事务。
  4. Kafka 资源被绑定(生产者)。
  5. 当到达用户代码时,拦截器会拦截该调用并使用 JpaTransactionManager 启动一个新的事务。
  6. 实际的用户方法被调用。
  7. Kafka 发送操作通过 StreamBridge 作为方法执行的一部分进行。底层的 KafkaTemplate 使用步骤 4 中绑定的同一个事务性生产者工厂。
  8. 方法退出时,JpaTransactionManager 会提交或回滚。
  9. 最后,当 Kafka 事务提交(或回滚)时,控制权返回到 TransactionTemplate#execute 方法。

请特别注意上面的步骤 7。当 KafkaTemplate 检测到已有 Kafka 事务正在进行中(在步骤 3 中开始)时,它不会与 JPA 事务同步,尽管 KafkaTemplate 具有此能力。现有的 Kafka 事务优先,并加入该事务。

即使我们仍然有两个独立的事务,从端到端的事务角度来看,事情是原子性的。如果通过 StreamBridge 进行的 Kafka 发布操作失败,JPA 和 Kafka 事务都不会执行提交操作。两者都会回滚。同样,如果数据库操作失败,两个事务仍然会回滚。然而,始终存在一个事务提交而另一个回滚的可能性,因此应用代码必须处理记录的去重以实现容错。

在讨论消费-处理-生产模式时,另一个关键组成部分是生产者需要将消费记录的偏移量(除了提交偏移量的消费者之外)发送给事务。正如我们在本博客系列的第一部分中看到的那样,Kafka Producer API 有一个名为 sendOffsetToTransaction 的方法,生产者通过 OffsetMetadataConsumerGroupMetadata 为每个分区发送一个偏移量(当前消息的偏移量 + 1)。在使用 Spring Cloud StreamSpring for Apache Kafka 时,应用不需要调用这个底层操作。Spring for Apache Kafka 中的 Kafka 消息监听器容器会代表应用自动处理。尽管框架在事务提交之前会在生产者上调用 sendOffsetToTransaction,但在事务协调器提交事务时,将偏移量发送到事务和实际的消费者偏移量提交是原子性发生的。

通过这次讨论,我们探讨了编写事务性 Spring Cloud Stream 应用的各种选项,这些应用必须与外部事务系统(如数据库)交互,同时消费和生产到 Apache Kafka。

在本系列的下一部分中,我们将探讨事务回滚(在编写事务系统时的另一个关键方面)以及在编写 Spring Cloud Stream Kafka 应用时如何访问各种 Spring 组件。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您快速提升。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区的所有即将到来的活动。

查看全部