在 Spring Cloud Stream Kafka 应用中与外部事务管理器同步

工程 | Soby Chacko | 2023年10月4日 | ...

本博客系列中的其他部分

第一部分:Spring Cloud Stream Kafka 应用中的事务入门

第二部分:Spring Cloud Stream Kafka 应用中的生产者发起的事务

在本博客系列的上一部分中,我们了解了事务管理的基础知识,主要是在使用生产者发起式 Spring Cloud Stream Kafka 应用时。在那次讨论中,我们还简要地了解了 Spring Cloud Stream Kafka 消费者应用程序如何使用适当的隔离级别来消费以事务方式生成的记录。当您与外部事务管理器(例如关系数据库的事务管理器)同步时,我们提到必须使用事务来确保数据完整性。在本部分中,我们将了解如何在使用外部事务管理器时在 Spring Cloud Stream 中实现事务保证。

在我们开始探究之前,务必记住,在实践中实现分布式事务极其困难。您必须依赖于两阶段提交 (2PC) 策略和适当的分布式事务管理器(例如兼容 JTA 的事务管理器)才能正确执行此操作。然而,大多数企业用例可能不需要这种复杂性,而且我们考虑到的以及在实践中看到人们使用的多数用例,最好坚持使用非分布式事务方法,正如我们在本博客中所述。这篇文章由 Spring 工程团队的Dave Syer 博士于 2009 年发表,即使在 14 年后,对于理解分布式事务的挑战以及 Spring 中推荐的替代方法仍然具有相关性。

让我们回到我们的讨论:在使用外部事务管理器的生产者发起式和消费-处理-生产 (读-处理-写) 应用中,如何在 Spring Cloud Stream Kafka 应用中实现事务性。

现在,我们可以通过勾勒出一些代码(我们可以在讨论中逐步完成)来为我们的示例域中的讨论设置舞台。我们使用一些域对象来驱动演示,并为它们创建了伪代码。

假设消息系统处理“事件”域类型 - 我们使用一个**PersonEvent**

class PersonEvent {

   String name;
   String type;

   //Rest omitted for brevity
}

我们还需要**Person** 对象的域实体

@Entity
@Table(name = "person")
public class Person {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;

   private String name;

   // Rest omitted for brevity
}

最后,我们需要**Person** 域对象的**CrudRepository**

public interface PersonRepository extends CrudRepository<Person, String> {}

在生产者发起式场景中,假设当调用一个方法(例如通过 REST)时,会创建一个**Person** 域对象,持久化到数据库,并作为**PersonEvent** 通过`StreamBridge`发送到输出 Kafka 主题。

在**消费-处理-生产**场景中,假设输入主题接收一个**PersonEvent**,处理器从中生成一个**Person** 域对象以持久化到数据库。最后,它将另一个**PersonEvent** 发送到输出 Kafka 主题。

在这里,我们也使用 JPA 进行讨论。Spring Cloud Stream 应用是 Boot 应用,您可以在应用中包含 spring-boot-starter-jpa 依赖项,并包含适当的 spring.jpa.* 属性来驱动必要的自动配置。假设 Spring Boot 将为我们自动配置一个`JPATransactionManager`。

让我们将我们的用例分解成各种场景。

场景 1:生产者发起的事务

在生产者发起式场景中,我们有两个必须以事务方式执行的操作:数据库操作和 Kafka 发布操作。这是基本思路。请记住,此代码仅显示了所涉及内容的关键部分。在实际设置中,代码几乎肯定比这复杂得多。

@Autowired
Sender sender;

@PostMapping("/send-data")
public void sendData() {
   sender.send(streamBridge, repository);
}

@Component
static class Sender {

   @Transactional
   public void send(StreamBridge streamBridge, PersonRepository repository) {
       Person person = new Person();
       person.setName("Some Person");

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

上述生产者发起式代码完全是事务性的。在本博客的前面部分,我们看到,如果只有 Kafka 事务,那么仅仅添加`Transactional`注解是不够的。如前所述,`Transactional`注解没有事务管理器,我们需要一个使用相同底层事务资源的自定义事务管理器来实现事务性。然而,这里的情况不同。我们有 Spring Boot 自动配置的`JpaTransactionManager`,事务拦截器使用它来启动事务。由于我们配置了**transaction-id-prefix**,因此可以以事务方式执行`StreamBridge`发送操作。但是,`KafkaTemplate`通过`TransactionSynchronizationManager`将 Kafka 事务与已存在的 JPA 事务同步。方法退出时,首先提交主事务,然后提交同步事务,在本例中为 Kafka 事务。

此流程中的顺序如下。
  1. JPA 事务管理器启动一个新的 JPA 事务。
  2. 数据库操作开始,但此处不执行提交,因为我们仍在方法执行中。
  3. `StreamBridge`发送操作触发一个新的 Kafka 事务,通过事务同步管理器与 JPA 事务同步。
  4. 方法退出时,首先提交 JPA 事务,然后提交 Kafka 事务。

**关于在 Spring 中同步事务的一般说明:**它听起来好像是在幕后执行复杂的事务同步。但是,正如我们在本文开头暗示的那样,这里没有进行分布式事务同步,更不用说任何同步各种事务的智能方法了。事务本身不知道任何同步信息。Spring 的`TransactionSynchronizatonManager`只是协调多个事务的提交和回滚。在此上下文中同步事务在功能上类似于嵌套两个或多个`@Transactional`方法或`TransactionTempate`对象。配置较少,因为 Spring 会为您进行嵌套。

场景 2:反转提交顺序

假设由于流程中的一些新需求,我们需要反转提交顺序,Kafka 事务先提交,而不是 JPA 事务。我们该如何做呢?一个我们可能会直觉想到的解决方案是明确地为`@Transactional`注解提供一个 Kafka 事务管理器,并让 JPA 事务与 Kafka 事务(这是主事务)同步。代码如下所示

@Transactional(“customKafkaTransactionManager)
public void send(StreamBridge streamBridge, PersonRepository repository) {
    Person person = new Person();
    person.setName("Some Person");

    Person savedPerson = repository.save(person);

    PersonEvent event = new PersonEvent();
    event.setName(savedPerson.getName());
    event.setType("PersonSaved");
    streamBridge.send("process-out-0", event);
}

我们需要提供一个自定义的 Kafka 事务管理器

@Bean
KafkaTransactionManager customKafkaTransactionManager() {
   KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder) this.binderFactory.getBinder("kafka", MessageChannel.class);
   ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
   KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
   return kafkaTransactionManager;
}

由于 Spring Boot 如果检测到已存在的事务管理器则不会配置事务管理器,因此我们必须自己配置 JPA 事务管理器

@Bean
public PlatformTransactionManager transactionManager(
       ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
   JpaTransactionManager transactionManager = new JpaTransactionManager();
   transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
   return transactionManager;
}

我们的直觉在这里起作用了吗?我们是否成功地改变了应用事务的顺序?不幸的是,没有。它不起作用,因为 JPA 事务管理器不允许其事务与其他事务同步,例如在这种情况下来自主事务管理器的那个事务(自定义 Kafka 事务管理器)。在我们的例子中,尽管我们创建了一个自定义的 Kafka 事务管理器作为主事务管理器,但在执行存储库保存方法时,JPA 事务会自行启动并提交,而不会与主事务同步。

此流程中的事件顺序如下:
  1. Kafka事务管理器启动拦截器使用的新的事务。
  2. 当存储库保存方法执行时,JpaTransactionManager会创建一个JPA事务,而不会与主事务同步。
  3. JPA事务在方法执行过程中提交。
  4. 拦截器将在方法退出时提交Kafka事务。

那么,我们如何反转事务呢?有两种方法。

首先,我们可以尝试链接事务管理器。ChainedTransactionManager 是来自Spring Data项目的交易管理器实现。您可以将事务管理器的列表指定给ChainedTransactionManager,它会按照事务管理器列表中的顺序启动事务。在退出时(即方法退出时),事务将按照事务管理器列表的反向顺序提交。

虽然这听起来像是一个合理的策略,但需要注意一个重要的警告,那就是ChainedTransactionManager目前已被弃用,并且不是推荐的选项。弃用的原因在Javadoc中。其要点是,人们通常期望ChainedTransactionManager是一个神奇的灵丹妙药,可以解决所有事务问题,包括具有两阶段提交的分布式事务和其他问题,而事实并非如此。ChainedTransactionManager仅确保事务以特定顺序启动和提交。它不保证任何事务同步,更不用说任何分布式事务协调了。如果您能接受ChainedTransactionManager的局限性并需要特定的顺序(就像我们的用例一样),那么只要您记住正在使用框架中已被弃用的类,使用此事务管理器是合理的。

让我们在我们的场景中尝试ChainedTransactionManager,看看会发生什么。Spring for Apache Kafka提供了一个名为ChainedKafkaTransactionManager的子类,它也已被弃用,因为父类已被弃用。

我们使用之前在链式事务中看到的相同的自定义KafkaTransactionManagerbean。

我们还需要像以前一样创建JpaTransactionManager bean,因为Spring Boot不会自动配置它,因为它已经检测到自定义KafkaTransactionManager bean。

添加这两个bean后,让我们创建ChainedKafkaTransactionManager bean

@Bean
public ChainedKafkaTransactionManager chainedKafkaTransactionManager(KafkaTransactionManager kafkaTransactionManager, PlatformTransactionManager transactionManager) {
   return new ChainedKafkaTransactionManager(jpaTransactionManager, kafkaTransactionManager);
}

有了这些,让我们修改我们的Transactional注解

@Transactional("chainedKafkaTransactionManager")
public void send(StreamBridge streamBridge, PersonRepository repository) {
..
}

上述配置实现了我们想要的结果。当您运行此应用程序时,我们将按预期反转事务——也就是说,Kafka将首先提交,然后是JPA。

以下是流程中的步骤:
  1. TransactionInterceptor使用自定义的ChainedKafkaTransactionManager启动事务。它使用JpaTransactionManager启动JPA事务,并对KafkaTransactionManager执行相同的操作。
  2. 当方法调用数据库操作时,因为它已经在JPA事务中运行,所以它不会启动另一个事务。这里不会发生提交或回滚,因为这不是一个新的事务。
  3. 接下来,方法通过StreamBridge执行Kafka发布。在这里,我们看到了与上面JPA相同的处理方式。由于已经存在Kafka事务,因此它不会启动新的Kafka事务。StreamBridge发送操作是使用与初始Kafka事务相同的可事务性生产者工厂进行的。这里不会发生提交或回滚。
  4. 当方法退出时,链式事务管理器会反向运行,首先是Kafka事务提交(或回滚),然后是JPA事务。

如果您能接受链式事务管理器的局限性,则此方法有效。请记住,这里没有事务同步。事务管理器在事务开始时按给定的顺序应用,并在提交或回滚时按相反的顺序退出。如果您选择此方法,因为您正在使用框架中已被弃用的类,那么最好将它们复制到您的项目中使用,而不是依赖于框架。由于它们已被弃用,因此无法保证新的功能和错误修复。未来的版本可能会完全删除它们。也有可能永远不会删除它,并且弃用状态是为了劝退其使用(因为人们认为它具有比实际更大的功能)。

如果您不想依赖框架中已被弃用的类,或者不想复制和维护它们,您可以尝试另一种方法。您可以创建两个事务方法并嵌套调用。这是一个关于这个想法的蓝图

@Component
static class Sender {

       @Transactional("jpaTransactionManager")
       public void send(StreamBridge streamBridge, PersonRepository repository, KafkaSender kafkaSender) {
           Person person = new Person();
           person.setName("Some Person");

           Person savedPerson = repository.save(person);

           PersonEvent event = new PersonEvent();
           event.setName(savedPerson.getName());
           event.setType("PersonSaved");
           kafkaSender.send(streamBridge, event);
       }
}

@Component
static class KafkaSender {
       @Transactional("customKafkaTransactionManager")
       public void send(StreamBridge streamBridge, PersonEvent event) {
           streamBridge.send("process-out-0", event);
       }
}

出于我们在本博客系列的第二部分中讨论的原因,请确保嵌套调用位于不同的类中,因为Spring中的AOP代理的工作方式。

在这种情况下,两种方法都是事务性的,并且它们是嵌套的。当事务拦截器拦截第一个方法调用时,它会启动JPA事务。在执行过程中,嵌套调用(其方法也具有@Transactional注解)出现。由于此bean方法具有@Transactional注解,Spring AOP会将bean包装在AOP建议中。因为我们从不同类的另一个bean调用此建议bean,所以代理机制正确地调用了建议bean。另一个事务拦截器使用不同的事务管理器(即KafkaTransactionManager)启动一个新事务。当Kafka发布发生时,事务不会立即提交或回滚,因为事务是作为方法的一部分启动的,并且提交或回滚发生在方法退出时。此时,控制权返回到第一个方法并继续。一旦退出原始方法,JPA事务将通过拦截器提交。如果发布到Kafka的方法抛出异常,它将回滚该事务。在这种情况下,回滚后,异常会传播回第一个事务方法(JPA方法),由于异常,它也会回滚其事务。

使用此技术时的一个重要说明:对嵌套方法的调用应该是第一个方法做的最后一件事,因为如果第一个方法未能执行Kafka调用后的一些代码(该调用已成功),则Kafka事务已被提交。第一个方法的失败不会自动回滚Kafka事务。

场景3:消费-处理-生产

通过在本系列中到目前为止我们对事务的核心理解,让我们来看一下事件驱动和流应用程序中一个至关重要的模式,称为**消费-处理-生产**模式。在Spring Cloud Stream中,此模式的实现如下所示

@Bean
public Function<PersonEvent, PersonEvent> process(TxCode txCode) {
  return pe -> txCode.run(pe);
}

@Component
class TxCode {

   @Transactional
   PersonEvent run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       return event;
   }
}

我们有一个Spring Cloud Stream函数,它从输入主题消费PersonEvent,然后调用一个函数在函数lambda表达式的正文中进行处理。此函数返回另一个PersonEvent,我们将其发布到输出Kafka主题。如果我们不在事务上下文中,我们可以将上面的run方法内联为函数lambda表达式的一部分。但是,为了实现事务语义,@Transactional注解必须位于不同类中的方法上。

要使绑定器具有事务性,请确保使用有效值提供spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix

上面的代码完全是事务性的吗?然而,现实情况是它只是部分地端到端事务性的。让我们看看事件的顺序。

绑定器是事务性的,因为我们提供了transaction-id-prefix。当使用者在消息侦听器容器中轮询记录时,它会在其TrasactionTemplate#execute方法中调用内部侦听器方法。因此,执行侦听器方法(调用用户方法)的整个端到端过程都在由KafkaTransactionManager启动的事务中运行。当事务启动时,TransactionSynchronizationManager将资源(生产者)绑定到事务。当调用用户方法(用@Transactional注解的方法)时,事务拦截器会拦截该调用,让包装的AOP建议负责实际的调用。因为我们有JpaTransactionManager,所以事务拦截器使用该管理器并启动一个新事务。由每个事务管理器实现来决定它是否要与现有事务同步。在JpaTransactionManager(以及许多其他类似的数据库事务管理器实现)的情况下,它不允许与现有事务同步,正如我们在上面已经讨论过的那样。因此,JPA事务独立运行,如上述部分所示。当run方法退出时,事务拦截器使用JPA事务管理器执行提交或回滚操作。这样,JPA事务管理器就完成了它的工作。此时,方法调用的响应返回给调用者,即Spring Cloud Stream基础设施。Spring Cloud Stream中的此机制获取此响应并将其发送到Kafka中的输出主题。它使用在初始事务开始时绑定的相同事务性生产者。发送记录后,控制权返回到消息侦听器容器,然后提交或回滚事务。

此序列的步骤如下所示
  1. Kafka消费者接收记录。
  2. Spring Kafka中的容器使用TransactionTemplateexecute方法调用监听器。

KafkaTransactionManager启动一个新事务。3. Kafka资源绑定(生产者)。4. 当执行到用户代码时,事务拦截器最终会拦截该调用并启动一个新的JPA事务。5. AOP代理然后调用实际方法。方法退出时,JpaTransactionManager提交或回滚。6. 方法的输出返回给Spring Cloud Stream中的调用者。7. 然后使用步骤4中相同的事务资源将响应发送到Kafka输出。8. 控制权返回到消息监听器容器,KafkaTransactionManager提交或回滚。

那么,这里的问题是什么?它看起来像事务性的,但实际上,它只是部分事务性的。最初的主要问题是整个端到端流程都在单个原子事务的范围之外,这是一个重要的问题。这里有两个事务——Kafka和JPA——并且JPA和Kafka事务之间没有同步。如果数据库事务已提交而Kafka发送失败,则无法回滚JPA事务。

我们可能会认为ChainedTransactionManager可以在这里提供帮助。虽然这种直觉有一定的道理,但它不适用于上面的代码。由于在调用监听器方法时在容器中创建了Kafka事务,因此ChainedTransactionManager不会从提供给它的任何Kafka事务管理器创建任何新的Kafka事务。在退出用户方法时,我们仍然只有一个JPA事务需要提交或回滚。Kafka事务必须等到调用返回到容器才能提交或回滚。

问题是我们使用了Spring Cloud Stream中的一个函数,该函数使框架能够发布到Kafka。在我们的例子中,任何用户指定的事务(例如JPA事务)都发生在Spring Cloud Stream执行Kafka发布之前。我们需要确保用户代码是发布到Kafka的代码,以便我们可以将整个事务代码视为一个单元。为了实现这一点,我们应该切换到Consumer而不是Function,然后使用StreamBridge API发布到Kafka。请查看此修改后的代码

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

然后我们使用与上面相同的TxCode

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

请注意,run方法不返回任何内容,但我们通过StreamBridge API显式地发送到Kafka输出主题。

让我们看看这些更改后的事件顺序
  1. Kafka消费者接收记录。
  2. Spring Kafka中的容器使用TransactionTemplate的execute方法调用监听器。
  3. KafkaTransactionManager启动一个新事务。
  4. Kafka资源绑定(生产者)。
  5. 当执行到用户代码时,拦截器使用JpaTransactionManager拦截该调用并启动一个新事务。
  6. 调用实际用户方法。
  7. Kafka发送操作通过StreamBridge作为方法执行的一部分进行。底层KafkaTemplate使用步骤4中绑定的相同事务生产者工厂。
  8. 方法退出时,JpaTransactionManager提交或回滚。
  9. 最后,当Kafka事务提交(或回滚)时,控制权返回到TransactionTemplate#execute方法。

请特别注意上面的步骤7。当KafkaTemplate检测到已经有Kafka事务正在进行(在步骤3中开始)时,它不会与JPA事务同步,尽管KafkaTemplate能够这样做。现有的Kafka事务具有优先级,它将加入该事务。

即使我们仍然有两个单独的事务,从端到端事务的角度来看,事情也是原子的。如果通过StreamBridge的Kafka发布操作失败,JPA和Kafka事务都不会执行提交操作。两者都会回滚。同样,如果数据库操作失败,这两个事务也会回滚。但是,始终有可能一个事务提交而另一个事务回滚,因此应用程序代码必须处理记录的重复数据删除以实现容错性。

在这个关于**消费-处理-生产**模式的讨论中,另一个关键组成部分是生产者需要将已消费记录的偏移量(除了提交偏移量的消费者之外)发送到事务。正如我们在本博客系列的第一部分中看到的,Kafka Producer API有一个名为sendOffsetToTransaction的方法,其中生产者通过OffsetMetadataConsumerGroupMetadata为每个分区发送一个偏移量(当前消息的偏移量+1)。当使用**Spring Cloud Stream**或**Spring for Apache Kafka**时,应用程序不需要调用此低级操作。Spring for Apache Kafka中的Kafka消息监听器容器会自动代表应用程序处理它。尽管框架在事务提交之前在生产者上调用sendOffsetToTransaction,但当事务协调器提交事务时,将偏移量发送到事务和实际的消费者偏移量提交会原子地发生。

通过本次讨论,我们探讨了编写必须与外部事务系统(例如数据库)交互的事务性Spring Cloud Stream应用程序的各种选项,同时从Apache Kafka消费和生产数据。

在本系列的下一部分中,我们将研究事务回滚(编写事务系统时的另一个关键方面)以及在编写Spring Cloud Stream Kafka应用程序时如何访问各种Spring组件。

获取Spring时事通讯

关注Spring时事通讯

订阅

领先一步

VMware提供培训和认证,以加快您的进度。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部