Spring Cloud Stream Kafka 应用中的生产者启动事务

工程 | Soby Chacko | 2023年9月28日 | ...

本博客系列中的其他部分

第一部分:Spring Cloud Stream Kafka 应用中的事务简介

本文是博客系列的第二部分,我们将详细探讨 Spring Cloud Stream 和 Apache Kafka 中的事务。在上一部分中,我们对事务进行了总体介绍,涉及基本概念。在本系列的这一部分中,我们将深入了解一些实现细节及其实际应用。

在本文中,我们将主要关注生产者端,了解 Spring Cloud Stream 和 Apache Kafka 中事务的工作方式。

Spring Cloud Stream 中的生产者

在深入探讨生产者启动的事务之前,让我们先了解一下简单的生产者。在 Spring Cloud Stream 中,有几种方法可以编写生产者(在消息传递领域也称为发布者)。如果您需要定期生成数据,则可以编写一个java.util.function.Supplier方法,如下所示。

@Bean
public Supplier<Pojo> mySupplier() {
  return () -> {
        new Pojo();
  };
}

当提供上述 Supplier 作为 Spring bean 时,如代码所示,Spring Cloud Stream 将其视为发布者,并且由于我们在这里使用的是 Apache Kafka,因此它会将 POJO 记录发送到 Kafka 主题。

默认情况下,Spring Cloud Stream 每秒调用一次 supplier,但是您可以通过配置更改该计划。请参阅参考文档以了解更多详细信息。

如果您不想轮询 supplier,而是想控制其发布频率呢?Spring Cloud Stream 通过StreamOperations API 及其名为StreamBridge的开箱即用实现提供了一种便捷的方式。这是一个示例。

@Autowired
StreamBridge streamBridge;

@PostMapping("/send-data")
public void publishData() {
   streamBridge.send("mySupplier-out-0", new Pojo());
}

在这种情况下,应用程序使用 REST 端点通过StreamBridge触发数据发布。由于框架不会按计划调用该函数,因此任何外部方都可以通过调用 REST 端点来启动数据的发布。

在这些基本生产者中使用事务是否合适?

现在我们已经看到了 Spring Cloud Stream 提供的两种发布记录的策略,让我们回到我们主要讨论的话题:**事务性发布**。假设我们需要在使用一个或多个这些生产者时确保数据完整性和获得事务保证。在这种情况下,问题是我们是否需要首先使用事务来实现它们。在上面的两个示例中,您如何确保记录以事务方式发布?简短的答案是,对于此类发布用例,您应该避免使用事务。这些示例中的记录发布是单次发送场景。使用同步生产者,我们可以实现相同的语义事务保证。默认情况下,生产者是异步的,当使其以同步模式运行时,生产者确保在向客户端发送响应之前将记录写入领导者和所有副本。您可以通过将spring.cloud.stream.kafka.bindings.<binding-name>.producer.sync属性设置为true来启用同步发布。

总而言之,在设计仅生产者应用程序时,应谨慎使用事务。如果您使用Supplier或通过StreamBridge一次发送一条记录,我们不建议使用事务,因为将生产者转换为同步模式运行可以达到相同的结果,而不会产生事务开销。然后,这导致了一个有趣的问题。对于仅生产者应用程序,何时需要使用事务并获得好处?正如在本博客系列的上一部分中所讨论的,这完全取决于应用程序的用例。在生产者的上下文中,这意味着我们只需要在进行多个相关的发布时才需要使用事务,或者除了发布之外,还需要与外部事务管理器同步。本文的下一节将介绍前一种情况,本博客系列的下一篇文章将介绍后一种情况。

在 Spring Cloud Stream Kafka Binder 中启用事务

在 Spring Cloud Stream 的 Kafka binder 中启用事务的主要驱动因素是一个属性:spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix。当此属性具有有效的 tiền tố 字符串时,Spring Cloud Stream 中的 Kafka binder 确保底层KafkaTemplate使用事务发布数据。顺便说一句,此属性指示 Spring Cloud Stream 在使用处理器模式(**消费-处理-生产**或**读取-处理-写入**模式)时使使用者具有事务感知能力。

事务实战

虽然违反直觉,但让我们回到我们之前描述的单个SupplierStreamBridge示例,并引入事务来理解事务组件的主要用法。如前所述,我们不需要在这些情况下使用事务,因为这会增加更多开销。但是,这样做有助于我们理解事情。

代码如下所示

@SpringBootApplication
@RestController
public class SimpleSpringCloudStreamProducer {

  @Bean
  public Supplier<Pojo> mySupplier() {
    return () -> {
      new Pojo();
    };
  }

  @Autowired
  StreamBridge streamBridge;

  @PostMapping("/send-data")
  public void publishData() {
   streamBridge.send("mySupplier-out-0", new Pojo());
  }
}

现在让我们提供所需的属性。

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: my-transactional-producer-

由于我们在应用程序的配置中提供了该属性,因此每次调用此示例中的 supplier(通过框架)或有人调用StreamBridge#send后面的 REST 端点时,对 Kafka 主题的底层发布都会完全事务化。

当触发 supplier 时,Kafka binder 使用KafkaTemplate发布数据。当 binder 检测到应用程序提供了transaction-id-prefix属性时,每个KafkaTemplate#send调用都是通过KafkaTemplate#executeInTransaction方法完成的。因此,请放心,框架会以事务方式对 Kafka 主题进行底层发布。从应用程序的角度来看,应用程序开发人员只需要为事务目的提供transaction-id-prefix属性。

在开发或调试事务性应用程序时,将日志级别设置为TRACE通常是值得的,以便相关的底层事务性类可以向我们提供有关正在发生什么的详细信息。

例如,如果您将以下包的日志级别设置为 TRACE,您将在日志中看到大量活动。

logging:
 level:
   org.springframework.transaction: TRACE
   org.springframework.kafka.transaction: TRACE
   org.springframework.kafka.producer: TRACE
   org.springframework.kafka.core: TRACE

每次框架调用 supplier 方法时,我们都可以观察到日志中的以下内容

o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] beginTransaction()
o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord
o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=myTopic1, partition=null, headers=RecordHeaders(headers = …
o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] commitTransaction()

从跟踪日志中可以看到,每次以事务方式发布记录时,它都会形成一个序列:**beginTransaction**、**Sending**、**Sent**和**commitTransaction**。如果您运行应用程序,您会观察到每秒都会看到这些序列,因为这是 Spring Cloud Stream 调用Supplier方法的默认计划。

相同的事务流程也适用于StreamBridge#send情况。当 Spring Cloud Stream 调用 send 方法时,输出绑定使用的底层KafkaTemplate确保记录在事务中发布,因为我们提供了transaction-id-prefix

多记录发布的事务

在介绍了这些入门知识之后,让我们继续讨论使用事务有意义的情况。正如我们之前讨论的那样,需要将多个记录发布为单个原子单元是一个有效的场景,在这种场景中,使用事务变得必要。

让我们看下面的代码示例

public void publish(StreamBridge streamBridge {
  for (int i = 0; i < 5; i++) {
    streamBridge.send("mySupplier-out-0", "data-" + i);
  }
}

如您所见,这是一个人为的示例,用于演示问题的关键所在。我们不是发布一次,而是发布多条记录。向多个主题发布也是这里同样有效的方法。我们可能认为我们可以通过设置transaction-id-prefix属性来快速将多个记录的发布包装在单个事务中。但是,我们需要更多内容来帮助我们。我们仍然需要提供前缀属性。但是,仅凭这一点,每次发送仍然在其专用事务中发生。为了确保所有五条记录的端到端发布以原子方式发生,我们需要在方法上应用来自核心 Spring Framework 的@Transactional注释。此外,我们必须提供一个事务管理器 bean - KafkaTransactionManager - 它使用 Spring Cloud Stream Kafka binder 创建的相同生产者工厂。以下是我们的代码现在的样子以及应用程序的配置

@SpringBootApplication
@RestController
public class SpringCloudStreamProducer {

   @Autowired
   StreamBridge streamBridge;

   @Autowired Sender sender;

   @Autowired
   DefaultBinderFactory binderFactory;

   public static void main(String[] args) {
       SpringApplication.run(SpringCloudStreamProducer.class, args);
   }

   @PostMapping("/send-data")
   public void publishData() throws InterruptedException {
       sender.send(streamBridge);
   }

   @Component
   static class Sender {

     @Transactional        
     public void send(StreamBridge streamBridge)      
     {
       for (int i = 0; i < 5; i++) {
     	   streamBridge.send("mySupplier-out-0", "data-" + i);           
       }
     }
   }

  @Bean
  KafkaTransactionManager customKafkaTransactionManager() {
     KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka", MessageChannel.class);
     ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
     KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
     return kafkaTransactionManager;
  }
}

以及相应的配置

spring:
  cloud:
   stream:
     bindings:
       mySupplier-out-0:
         destination: my-topic
     kafka:
       binder:
         Transaction:
		transaction-id-prefix: mySupplier-
producer:
             configuration:
               retries: 1
               acks: all

请注意,前面代码中使用@Transactional注解的事务方法必须与调用该方法的类不同。如果调用发生在同一个类的不同方法之间,或者发生在不是Spring管理的bean的不同类之间,则不会创建代理,事务拦截器也不会生效。JVM在运行时不知道代理和拦截机制。在方法上添加@Transactional注解时,Spring会在后台为该方法创建一个事务代理。当Spring Cloud Stream调用事务方法时,代理会拦截该调用,然后实际调用通过代理对象发生。

我们提供的自定义KafkaTransactionManager bean有两个作用。首先,它使Spring Boot应用@EnableTransactionManagement。它还提供与绑定器内部使用的生产者工厂相同的工厂,以便事务注解在应用事务时使用正确的资源。

当Spring Boot检测到可用的事务管理器bean时,它会自动为我们应用@EnableTransactionManagement注解,该注解负责检测@Transactional注解,然后通过Spring AOP代理和advice机制添加拦截器。换句话说,Spring AOP为@Transactional方法创建代理,并包含AOP advice。如果没有应用@EnableTransactionManagement注解,Spring不会触发任何这些代理和拦截机制。由于EnableTransactionManagement注解至关重要,我们必须提供一个事务管理器bean。否则,方法上的Transactional注解将无效。

请注意,我们从绑定器获取事务生产者工厂,并在KafkaTransactionManager的构造函数中使用它。当这个bean存在于应用程序中时,所有记录的发布都发生在单个事务的范围内。我们在跟踪日志中只看到单个**beginTransaction…commitTransaction**序列,这意味着只有一个事务执行所有发布操作。

幕后,事件顺序如下:

  1. 一旦调用使用Transactional注解的方法,事务拦截器就会通过AOP代理机制启动,并使用自定义KafkaTransactionManager启动一个新事务。
  2. 当事务管理器开始事务时,事务管理器使用的资源——事务资源持有者(又名从生产者工厂获得的生产者)——将绑定到事务。
  3. 当方法调用StreamBridge#send方法时,底层的KafkaTemplate将使用自定义KafkaTransactionManager创建的相同事务资源。由于事务已经在进行中,它不会启动另一个事务,但发布发生在同一个事务生产者上。
  4. 当它调用更多send方法时,它不会启动新事务。相反,它通过在原始事务中使用的相同生产者资源进行发布。
  5. 当方法退出时,拦截器会要求事务管理器提交事务(如果没有错误)。如果任何发送操作或方法中的任何其他操作抛出异常,拦截器会要求事务管理器回滚事务。这些调用最终会命中KafkaResourceHolder的**commit**或**rollback**方法,这些方法会调用Kafka生产者来**commit**或**rollback**事务。

由于在我们的示例中只有一个自定义KafkaTransactionManager bean,我们可以简单地按原样使用Transactional注解。另一方面,如果我们有多个自定义KafkaTransactionManager bean,则必须使用正确的bean名称限定@Transactional注解。

如果没有自定义KafkaTransactionManager会怎样?

如果我们移除自定义KafkaTransactionManager并运行此应用程序,您会看到它创建了五个单独的事务,而不是单个事务。如果启用TRACE日志记录,您可以在日志中看到五个**beginTransaction…commitTransaction**序列。

您可以通过编写一个事务型消费者Spring Cloud Stream应用程序并将它的隔离级别设置为read_committed来验证此行为。您可以使用spring.cloud.stream.kafka.binder.configuration.isolation.level属性并将它的值设置为read_committed来做到这一点。出于测试目的,添加Thread.sleep或其他等待机制来模拟for循环中每个StreamBridge#send之后的行为。您可以看到,一旦每个send方法调用返回,无论等待与否,消费者都会接收数据,这证明不是单个事务执行了整个操作,而是每个send都在它自己的事务中发生。

我们为每次发送看到单独的事务,因为Transactional注解没有像我们预期的那样工作。只有当存在事务管理器bean并且它的生产者工厂与绑定器使用的工厂相同时,Transactional注解才有效。

如果Spring Boot通过spring.kafka.producer.transaction-id-prefix在配置中检测到transaction-id-prefix属性,它会自动配置一个KafkaTransactionManager。但是,由于我们处于Spring Cloud Stream上下文中,我们必须使用spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix,因为这是我们向框架发出信号以创建绑定器和相关事务生产者工厂的内部事务管理器的。如果我们提供正确的spring.kafka前缀以便Spring Boot为我们自动配置一个KakaTransactionManager会怎样?虽然这很诱人,但这不起作用,因为自动配置的事务管理器使用与绑定器使用的不同的生产者工厂。因此,我们必须提供一个自定义KafkaTransactionManager,它使用与绑定器使用的相同的生产者工厂。这正是我们上面所做的。

在本博客系列的下一部分中,我们将学习如何与外部事务管理器同步,以进行生产者和消费者启动的事务。

获取Spring新闻通讯

关注Spring新闻通讯

订阅

领先一步

VMware提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部