Apache Kafka 在 Spring Cloud Stream Kafka 应用中的 exactly-once 语义

工程 | Soby Chacko | 2023 年 10 月 16 日 | ...

本博客系列的其它部分

第 1 部分:Spring Cloud Stream Kafka 应用中的事务介绍

第 2 部分:Spring Cloud Stream Kafka 应用中的生产者发起事务

第 3 部分:在 Spring Cloud Stream Kafka 应用中与外部事务管理器同步

第 4 部分:Spring Cloud Stream 和 Apache Kafka 的事务回滚策略

在本系列之前的讨论中,我们已经对事务在 Spring Cloud Stream Kafka 应用中的工作方式进行了基础分析,现在终于到了房间里的大象(指显而易见但被忽视的问题):exactly-once 语义,这是流处理应用中一个备受讨论且必需的功能。在本博客系列的这部分中,我们将探讨如何在 Spring Cloud Stream 应用中通过 Apache Kafka 事务实现 exactly-once 语义。了解前几节中事务的工作原理,将有助于相对容易地理解 Spring Cloud Stream Kafka 应用如何实现 exactly-once 语义。

这里需要注意一件重要的事情是,为了实现 exactly-once 语义,我们不需要编写超出本博客系列之前文章中已展示代码之外的新代码。本博客将澄清充分支持 Spring Cloud Stream Kafka 应用中 exactly-once 语义所需的某些预期。

exactly-once 语义在分布式计算中很难实现。本文不打算回顾所有技术细节来探讨为何这是一项如此困难的任务。对 exactly-once 语义的底层原理以及其在分布式系统中为何如此难以实现的细节感兴趣的读者,可以参考该主题的更广泛文献。Confluent 的这篇博客是一个很好的起点,可以帮助理解这些技术挑战以及 Apache Kafka 为实现它们所实施的解决方案。

尽管我们不会深入细节,但了解 Apache Kafka 提供的不同交付保证是值得的。主要有三种交付保证:

  • 至少一次语义(At-least-once)
  • 至多一次语义(At-most-once)
  • 恰好一次语义(Exactly-once)

至少一次(at-least-once)的交付语义中,应用程序可能会收到数据一次或多次,但保证至少收到一次。在至多一次(at-most-once)的交付保证中,应用程序可能会收到数据零次或一次,这意味着存在数据丢失的可能性。另一方面,恰好一次(exactly-once)语义则保证,正如其名称所示,只交付一次。根据应用程序的使用场景,您可能可以选择其中任何一种保证。默认情况下,Apache Kafka 提供至少一次的交付保证,这意味着一条记录可能会被多次交付。如果您的应用程序可以处理重复记录或没有记录的后果,那么选择非恰好一次的保证可能是可以接受的。相反,如果您处理关键任务数据,例如金融系统或医疗数据,您必须保证恰好一次的交付和处理,以避免严重后果。由于像 Apache Kafka 这样的系统具有分布式特性,由于涉及许多活动部件,通常很难实现恰好一次语义。

Spring Cloud Stream Kafka 和 Exactly-Once 语义

在本博客系列之前的文章中,我们看到了许多不同的场景。Apache Kafka 中的 exactly-once 语义针对的是读取-处理-写入(或消费-转换-生产)应用程序。有时会困惑于我们究竟是“一次性”做了什么?是初始消费、数据处理还是最终的生产部分?Apache Kafka 为整个读取->处理-写入序列保证 exactly-once 语义。在此序列中,读取和处理部分始终是至少一次(at-least-once)——例如,如果处理或写入的某个部分因任何原因失败。当您依赖 exactly-once 交付时,事务至关重要,这样数据的最终发布才能成功完成或回滚。一个潜在的副作用是初始消费和处理可能会发生多次。例如,如果事务回滚,消费者偏移量就不会更新,下次轮询(如果是在 Spring Cloud Stream 内部重试或应用程序重启时)将重新发送相同的记录并再次处理。因此,在消费和处理(转换)部分,保证是至少一次,这是理解的关键点。任何使用 read_committed 隔离级别运行的下游消费者将只从上游处理器获得一次准确的消息。因此,必须理解在 exactly-once 交付的世界中,处理器和下游消费者都必须协调,才能从 exactly-once 语义中受益。任何使用 read_uncommitted 隔离级别运行的生产主题消费者可能会看到重复数据。

另一个需要记住的点是,由于记录的消费和处理可能会发生多次,应用程序代码需要遵循幂等模式。这主要是当您的代码与外部系统(例如数据库)交互时需要考虑的问题。在这种情况下,由应用程序有责任确保用户代码没有副作用。

让我们回顾一下之前看到的用于简单消费-处理-生产循环的代码。

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

如前所述,为了使此应用程序具有事务性,我们必须提供具有适当值的 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix 配置属性。提供此属性是 Spring Cloud Stream 中启用上述代码段完全具备 exactly-once 交付能力所需的全部操作。整个端到端过程在事务边界内运行(尽管在上述示例中我们有两个事务)。我们有一个外部的 Kafka 事务,它在容器调用监听器时启动,还有一个由事务拦截器启动的 JPA 事务。当 StreamBridge 发送发生时,使用来自初始 Kafka 事务的相同事务资源,但在控制权返回容器之前不会提交。当方法退出时,JPA 事务会被提交。假设这里出了问题,数据库操作抛出异常。在这种情况下,JPA 不会提交,它将回滚,异常会传播回监听器容器,此时 Kafka 事务也会回滚。另一方面,如果 JPA 操作成功,但 Kafka 发布失败并抛出异常,JPA 不会提交但会回滚,并且异常会传播到监听器。

在上面的代码中,如果我们不与外部事务管理器同步,而只是发布到 Kafka,那么我们不需要使用 @Transactional 注解,甚至可以将 txCode 方法中的代码内联到消费者 lambda 中。

@Bean
public Consumer<PersonEvent> process() {
   return pe -> {
	  Person person = new Person();
       person.setName(pe.getName());
       PersonEvent event = new PersonEvent();
       event.setName(person.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);

   }
}

在这种情况下,我们只有在容器调用监听器时由容器发起的 Kafka 事务。当代码通过 StreamBridge 的 send 方法发布记录时,KafkaTemplate 使用来自初始事务的相同事务性生产者工厂。

在这两种场景下,情况是:我们是完全事务性的,并且最终的发布对于该事务而言只执行一次。使用 read_committed 隔离级别的下游消费者应该恰好消费一次。

Kafka Streams 和 Exactly-Once 语义

到目前为止,在本系列中我们还没有讨论 Kafka Streams。有点讽刺的是,最初 Kafka Streams 应用是 Apache Kafka 添加事务支持和 exactly-once 语义的原因,但我们至今尚未提及。原因是,在 Kafka Streams 应用中实现 exactly-once 语义非常简单,几乎是微不足道的。正如他们所说,它只是一个配置开关。要了解更多关于 Kafka Streams 中 exactly-once 语义的信息,请参阅Confluent 的这篇博客

与基于常规 Kafka 客户端的应用一样,在 Kafka Streams 的情况下,当您在消费-处理-生产模式中产生最终输出时,exactly-once 保证就会生效,这意味着只要使用 read_committed 隔离级别,下游消费者就会恰好消费一次产生的数据。

Kafka Streams 配置属性 processing.guarantee 属性可以在 Kafka Streams 应用中启用 exactly-once 语义。您可以在 Spring Cloud Stream 中通过设置 spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee 属性来配置它。您需要将值设置为 exactly_once。默认情况下,Kafka Streams 使用的值是 at_least_once

有状态的 Kafka Streams 应用中通常会发生三个主要活动:

  1. 记录的初始消费
  2. 通过变更日志主题更新状态存储
  3. 生产数据

模式是:记录被接收并处理。在此过程中,任何状态信息都会具体化到状态存储中,本质上是更新特定的变更日志主题。最后,出站记录被发布到另一个 Kafka 主题。如果您注意到这个模式,它看起来类似于我们已经见过的许多场景,除了状态存储部分。当将 processing.guarantee 设置为 exactly_once 时,Kafka Streams 保证如果在这些活动期间发生异常或应用程序崩溃,整个单元将原子地回滚,就像什么都没发生一样。应用程序重启后,处理器会再次消费该记录,处理它,并最终发布数据。由于这种发布在幕后是事务性的,所以使用 read_committed 隔离级别的下游消费者在记录完全发布之前不会消费它,并且会处理所有实现事务性所需的事务(例如提交已消费记录的偏移量等等),从而保证 exactly-once 交付。

Kafka Streams 的 exactly-once 交付保证是从 Kafka 相关活动的角度来看,针对记录的端到端消费、处理和发布。当存在外部系统时,它不提供此保证。例如,假设您的代码与外部系统(例如数据库插入或更新操作)有交互。在这种情况下,由应用程序决定如何参与事务。Spring 的事务支持在这种情况下再次派上用场。我们不想在这里重复代码。但是,正如我们在本系列中多次看到的那样,您可以将与数据库交互的代码封装在一个单独的方法中,使用 @Transactional 注解进行标注,并提供适当的事务管理器,例如我们已经见过的 JPA 事务管理器。当这样的方法抛出异常时,JPA 事务会回滚,并且异常会传播到 Kafka Streams 处理器代码,最终再传播回 Kafka Streams 框架本身,然后框架回滚原始的 Kafka 事务。这里值得再次强调的是,重要的是要理解这些从流拓扑中的处理器调用的操作必须编写成处理幂等性,因为“exactly once”仅适用于整个过程,而不是序列中的单个读取和处理。

结论

正如我们在本文开头已经提到的,exactly-once 交付语义是分布式计算中的一个复杂主题。然而,借助 Kafka 原生提供的实现 exactly-once 语义的解决方案以及 Spring 在 Spring for Apache Kafka 和 Spring Cloud Stream 框架中的支持,在 Spring Cloud Stream Kafka 应用中实现 exactly-once 交付语义变得相对容易。

订阅 Spring 新闻简报

通过 Spring 新闻简报保持联系

订阅

抢占先机

VMware 提供培训和认证,助您加速发展。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举办的活动

查看 Spring 社区所有即将举办的活动。

查看全部