Spring Cloud Stream Kafka 应用中的生产者启动事务

工程 | Soby Chacko | 2023 年 9 月 28 日 | ...

本博客系列的其他部分

第一部分:Spring Cloud Stream Kafka 应用中的事务简介

本文是博客系列的第二部分,我们将详细探讨 Spring Cloud Stream 和 Apache Kafka 中的事务。在前一部分中,我们对事务进行了总体介绍,触及了基本思想。在本部分博客系列中,我们将深入了解一些实现细节及其实际应用。

在本文中,我们主要从生产者的角度来理解事务如何在 Spring Cloud Stream 和 Apache Kafka 中工作。

Spring Cloud Stream 中的生产者

在我们深入探讨生产者启动的事务之前,让我们先通过一个简单的生产者了解一些基础知识。在 Spring Cloud Stream 中,有几种方式可以编写生产者(在消息领域也称为发布者)。如果您需要在预定时间生成数据,可以编写一个 java.util.function.Supplier 方法,如下所示。

@Bean
public Supplier<Pojo> mySupplier() {
  return () -> {
        new Pojo();
  };
}

当将上述 Supplier 作为 Spring bean 提供时(如代码所示),Spring Cloud Stream 将其视为发布者,并且由于我们在这里是在 Apache Kafka 的上下文中,它会将 POJO 记录发送到 Kafka topic。

默认情况下,Spring Cloud Stream 每秒调用一次 supplier,但您可以通过配置更改该计划。有关更多详细信息,请参阅参考文档

如果您不想轮询 supplier,而是想控制它的发布频率怎么办?Spring Cloud Stream 通过 StreamOperations API 及其开箱即用的实现 StreamBridge 提供了一种便捷的方式。示例如下。

@Autowired
StreamBridge streamBridge;

@PostMapping("/send-data")
public void publishData() {
   streamBridge.send("mySupplier-out-0", new Pojo());
}

在这种情况下,应用程序使用 REST 端点通过 StreamBridge 触发数据发布。由于框架不是按计划调用该函数,因此任何外部方都可以通过调用 REST 端点来启动数据发布。

在这些基本生产者中使用事务是否合适?

既然我们已经了解了 Spring Cloud Stream 提供的两种记录发布策略,让我们回到主要讨论主题:事务性发布。假设一个场景,在使用一个或多个这些生产者时,我们希望确保数据完整性并获得事务保证。在这种情况下,问题是首先是否需要使用事务来实现这些目标。在上面的这两个示例中,如何确保记录是事务性发布的?简而言之,您应该避免将事务用于这些类型的发布用例。这些示例中的记录发布是单次发送场景。使用同步生产者,我们可以获得相同的语义事务保证。默认情况下,生产者是异步的,当使其在同步模式下运行时,生产者会确保在向客户端发送响应之前将记录写入 leader 和所有副本。您可以通过将 spring.cloud.stream.kafka.bindings.<binding-name>.producer.sync 属性设置为 true 来启用同步发布。

总而言之,在设计纯生产者应用程序时,请谨慎使用事务。如果您使用 Supplier 或通过 StreamBridge 一次发送一条记录,我们不建议使用事务,因为将生产者转换为同步模式运行可以在没有事务开销的情况下达到相同的效果。这次讨论引出了一个有趣的问题。对于纯生产者应用程序,何时有必要使用事务并获得收益?正如本博客系列的前一部分所讨论的,这完全取决于应用程序的用例。在生产者的上下文中,这意味着我们只有在进行多个相关的发布,或者除了发布之外还需要与外部事务管理器同步时,才需要使用事务。本文的下一部分将介绍前一种场景,本博客系列的下一篇文章将介绍后一种场景。

在 Spring Cloud Stream Kafka Binder 中启用事务

在 Spring Cloud Stream 的 Kafka binder 中启用事务的主要驱动因素是单个属性:spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix。当此属性具有有效的字符串前缀时,Spring Cloud Stream 中的 Kafka binder 会确保底层 KafkaTemplate 使用事务发布数据。顺便提一下,此属性会指示 Spring Cloud Stream 在使用处理器模式(consume-process-produceread-process-write 模式)时使消费者具有事务感知能力。

事务实操

尽管这有点反直觉,但让我们回到之前的单例 SupplierStreamBridge 示例,并引入事务来理解事务组件的主要用法。如前所述,在那些情况下我们不需要使用事务,因为这会增加额外的开销。然而,这样做有助于我们理解事物。

代码如下所示

@SpringBootApplication
@RestController
public class SimpleSpringCloudStreamProducer {

  @Bean
  public Supplier<Pojo> mySupplier() {
    return () -> {
      new Pojo();
    };
  }

  @Autowired
  StreamBridge streamBridge;

  @PostMapping("/send-data")
  public void publishData() {
   streamBridge.send("mySupplier-out-0", new Pojo());
  }
}

现在让我们提供所需的属性。

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: my-transactional-producer-

由于我们在应用程序的配置中提供了该属性,因此在此示例中,每次 supplier 被调用(通过框架)或有人调用 StreamBridge#send 方法背后的 REST 端点时,底层向 Kafka topic 的发布都将完全具备事务性。

当 supplier 触发时,Kafka binder 使用 KafkaTemplate 发布数据。当 binder 检测到应用程序提供了 transaction-id-prefix 属性时,每次 KafkaTemplate#send 调用都会通过 KafkaTemplate#executeInTransaction 方法完成。因此,请放心,框架会事务性地完成底层向 Kafka topic 的发布。从应用程序开发者的角度来看,为了事务目的唯一需要提供的是 transaction-id-prefix 属性。

在开发或调试事务性应用程序时,将日志级别设置为 TRACE 通常很有价值,这样相关的底层事务类可以为我们提供正在发生的事情的详细信息。

例如,如果您将以下包的日志级别设置为 TRACE,您将在日志中看到大量活动。

logging:
 level:
   org.springframework.transaction: TRACE
   org.springframework.kafka.transaction: TRACE
   org.springframework.kafka.producer: TRACE
   org.springframework.kafka.core: TRACE

每次框架调用 supplier 方法时,我们可以在日志中观察到以下内容

o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] beginTransaction()
o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord
o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=myTopic1, partition=null, headers=RecordHeaders(headers = …
o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] commitTransaction()

从跟踪日志中可以看出,每次事务性地发布记录时,都会形成一个序列:beginTransactionSendingSentcommitTransaction。如果您运行应用程序,您会观察到每秒都会看到这些序列,因为这是 Spring Cloud Stream 调用 Supplier 方法的默认计划。

同样的事务流程也适用于 StreamBridge#send 的情况。当 Spring Cloud Stream 调用 send 方法时,输出绑定使用的底层 KafkaTemplate 会确保记录在事务内发布,因为我们提供了 transaction-id-prefix

多记录发布的事务

了解了基础知识后,我们来看看何时使用事务是合理的。正如我们之前讨论的,将多个记录作为单个原子单元发布的需求是一个有效的场景,在这种情况下使用事务是必要的。

让我们看下面的代码示例

public void publish(StreamBridge streamBridge {
  for (int i = 0; i < 5; i++) {
    streamBridge.send("mySupplier-out-0", "data-" + i);
  }
}

如您所见,这是一个特意设计的示例,以演示问题的关键。我们不是发布一次,而是发布多条记录。发布到多个 topic 在这里也是同样有效的方法。我们可能会认为,通过设置 transaction-id-prefix 属性,我们可以快速将多个记录的发布包装在单个事务中。然而,仅凭这一点是不够的。我们仍然需要提供前缀属性。但是,仅仅提供前缀属性,每次发送仍然发生在各自的事务中。为了确保所有五条记录的整个端到端发布原子性地发生,我们需要在方法上应用核心 Spring Framework 中的 @Transactional 注解。此外,我们必须提供一个使用 Spring Cloud Stream Kafka binder 创建的相同生产者工厂的事务管理器 bean——KafkaTransactionManager。下面是我们的代码和应用程序配置现在看起来的样子:

@SpringBootApplication
@RestController
public class SpringCloudStreamProducer {

   @Autowired
   StreamBridge streamBridge;

   @Autowired Sender sender;

   @Autowired
   DefaultBinderFactory binderFactory;

   public static void main(String[] args) {
       SpringApplication.run(SpringCloudStreamProducer.class, args);
   }

   @PostMapping("/send-data")
   public void publishData() throws InterruptedException {
       sender.send(streamBridge);
   }

   @Component
   static class Sender {

     @Transactional        
     public void send(StreamBridge streamBridge)      
     {
       for (int i = 0; i < 5; i++) {
     	   streamBridge.send("mySupplier-out-0", "data-" + i);           
       }
     }
   }

  @Bean
  KafkaTransactionManager customKafkaTransactionManager() {
     KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka", MessageChannel.class);
     ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
     KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
     return kafkaTransactionManager;
  }
}

以及相应的配置

spring:
  cloud:
   stream:
     bindings:
       mySupplier-out-0:
         destination: my-topic
     kafka:
       binder:
         Transaction:
		transaction-id-prefix: mySupplier-
producer:
             configuration:
               retries: 1
               acks: all

请注意,前面代码中的事务性方法(带有 @Transactional 注解的方法)必须与调用该方法的类不同。如果调用发生在同一类的方法之间或不是 Spring 管理的 bean 的不同类之间,则不会创建代理,并且事务拦截器不会生效。JVM 在运行时不知道代理和拦截器机制。当在方法上添加 @Transactional 注解时,Spring 会在幕后为该方法创建一个事务代理。当 Spring Cloud Stream 调用事务性方法时,代理会拦截该调用,然后通过代理对象发生实际调用。

我们提供的自定义 KafkaTransactionManager bean 有两个作用。首先,它使 Spring Boot 应用 @EnableTransactionManagerment。它还提供 binder 内部使用的相同生产者工厂,以便 Transactional 注解在应用事务时使用正确的资源。

当 Spring Boot 检测到可用的事务管理器 bean 时,它会自动为我们应用 @EnableTransactionManagement 注解,该注解负责检测 @Transactional 注解,然后通过 Spring AOP 代理和通知机制添加拦截器。换句话说,Spring AOP 会为 @Transactional 方法创建一个代理并包含 AOP 通知。如果没有应用 @EnableTransactionManagement 注解,Spring 不会触发任何这些代理和拦截机制。由于 EnableTransactionManagement 注解对于这些各种原因至关重要,我们必须提供一个事务管理器 bean。否则,方法上的 Transactional 注解将不起作用。

请注意,我们从 binder 获取事务性生产者工厂,并在 KafkaTransactionManager 的构造函数中使用它。当应用程序中存在此 bean 时,所有记录的整个发布现在都在单个事务的范围内进行。我们在跟踪日志中只看到一个 beginTransaction…commitTransaction 序列,这意味着只有一个真正的事务执行了所有发布操作。

在幕后,事件序列如下

  1. 带有 Transactional 注解的方法被调用后,事务拦截器通过 AOP 代理机制启动,并使用自定义的 KafkaTransactionManager 开启一个新事务。
  2. 当事务管理器开始事务时,事务管理器使用的资源——事务性资源持有者(即从生产者工厂获得的生产者)——将绑定到该事务。
  3. 当方法调用 StreamBridge#send 方法时,底层的 KafkaTemplate 将使用自定义 KafkaTransactionManager 创建的相同事务性资源。由于事务已经进行中,它不会开启另一个事务,而是在同一个事务性生产者上进行发布。
  4. 当它调用更多 send 方法时,它不会开启新的事务。相反,它会通过原始事务中使用的相同生产者资源进行发布。
  5. 当方法退出时,如果没有错误,拦截器会要求事务管理器提交事务。如果在方法中的任何发送操作或其他任何地方抛出异常,拦截器会要求事务管理器回滚事务。这些调用最终会到达 KafkaResourceHoldercommitrollback 方法,从而调用 Kafka 生产者来 commitrollback 事务。

由于我们的示例中只有一个自定义 KafkaTransactionManager bean,我们可以直接使用 Transactional 注解。另一方面,如果我们有多个自定义 KafkaTransactionManager bean,我们就必须使用正确的 bean 名称来限定 @Transactional 注解。

如果我们运行应用程序时不使用自定义 KafkaTransactionManager 会怎样?

如果我们移除自定义的 KafkaTransactionManager 并运行此应用程序,您会看到它创建了五个独立的事务,而不是单个事务。如果您启用 TRACE 日志记录,您可以在日志中看到五个 beginTransaction…commitTransaction 序列。

您可以通过编写一个事务性消费者 Spring Cloud Stream 应用程序,并将其隔离级别设置为 read_committed 来验证此行为。您可以使用 spring.cloud.stream.kafka.binder.configuration.isolation.level 属性并将其值设置为 read_committed 来实现。为了测试目的,在 for 循环中的每个 StreamBridge#send 之后添加一个 Thread.sleep 或其他等待机制来模拟行为。您会看到,无论等待多久,只要每个 send 方法调用返回,消费者就会收到数据,这证明了并非单个事务执行了整个操作,而是每个 send 都发生在自己的事务中。

我们看到每次发送都有独立的事务,因为 Transactional 注解没有达到我们预期的效果。Transactional 注解只有在存在可用的事务管理器 bean 并且其生产者工厂与 binder 使用的生产者工厂相同时才会起作用。

如果 Spring Boot 通过 spring.kafka.producer.transaction-id-prefix 在配置中检测到 transaction-id-prefix 属性,它会自动配置一个 KafkaTransactionManager。然而,由于我们处于 Spring Cloud Stream 上下文中,我们必须使用 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix,因为这是我们指示框架为 binder 创建内部事务管理器和相关事务性生产者工厂的方式。如果我们提供正确的 spring.kafka 前缀,让 Spring Boot 自动配置一个 KakaTransactionManager 会怎样?虽然这很诱人,但它不起作用,因为自动配置的事务管理器使用的生产者工厂与 binder 使用的不同。因此,我们必须提供一个使用 binder 使用的相同生产者工厂的自定义 KafkaTransactionManager。这正是我们在上面所做的。

在本博客系列的下一部分中,我们将学习如何与外部事务管理器同步,以实现生产者和消费者启动的事务。

订阅 Spring 邮件列表

通过 Spring 邮件列表保持联系

订阅

领先一步

VMware 提供培训和认证,助力您快速成长。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部