Spring Cloud Stream Kafka 应用中的事务介绍

工程 | Soby Chacko | 2023 年 9 月 27 日 | ...

我们将开始一个新的博客系列,重点介绍如何在 Spring Cloud Stream Kafka 应用中使用事务。本系列博客将涵盖使用 Spring Cloud Stream 和 Apache Kafka 编写事务性应用的许多底层细节。通过本系列博客,我们希望为您提供足够的信息,帮助您为各种业务场景编写事务性的 Spring Cloud Stream Kafka 应用。

基本组成部分

Spring Cloud Stream Kafka 应用中的事务基础支持主要来自 Apache Kafka 本身以及 Spring for Apache Kafka 库。然而,本系列博客是关于如何特别结合 Spring Cloud Stream 使用这种支持的。如果您熟悉 Apache Kafka 中事务的工作原理,以及 Spring for Apache Kafka 如何以 Spring 友好的方式使其成为可能,那么本系列会让您感到得心应手。

虽然 Apache Kafka 提供了基础的事务支持,但 Spring for Apache Kafka (也称为 Spring Kafka) 库在 Spring 端扩展了这种支持,使 Spring 开发者可以通过依赖 Spring Framework 中传统的事务支持更自然地使用它。Spring Cloud Stream 中的 Kafka binder 在 Spring for Apache Kafka 的支持之上进一步构建,使得在 Spring Cloud Stream Kafka 应用中使用相同的支持成为可能。在本系列博客的第一部分,我们将简要介绍 Kafka 事务,分析一些依赖事务会有帮助的用例,以及 Apache Kafka 和 Spring 生态系统中的事务组成部分。

在许多用例中,在 Apache Kafka 中以事务方式发布、消费和处理记录是必要的。当在生产者发起或实现消费-处理-生产模式的应用中以事务方式生产记录时,它们会被原子地写入 Kafka。如果出现问题,整个过程会被回滚,并且事务不会被提交。需要记住的一点是,与支持事务的关系型数据库不同,在关系型数据库中事务回滚时不会持久化任何记录,而 Apache Kafka 仍然会将记录发布到主题分区。这种行为是由于 Apache Kafka 基本的只追加的不可变日志架构所致,该架构不允许对记录进行任何修改,例如在将记录添加到记录日志后将其删除。有人可能想知道使用事务的好处是什么,因为在事务被中止时记录可能会被发布到主题分区,从而可能导致消费者看到它们。然而,具有正确隔离级别的消费者永远不会看到回滚的记录,即使来自回滚事务的记录位于主题分区中。因此,从端到端的角度来看,整个过程保证是完全事务性的。

事务性用例

事务通常会给 Kafka 应用带来显著的开销。在 Apache Kafka 中使用事务时,每条记录都必须向记录添加特殊的事务日志,向特殊的事务状态主题发送事务标记等等。所有这些步骤都需要时间和空间,从而增加了总体延迟。因此,每个应用都必须通过分析用例来仔细权衡对事务支持的需求。

事务主要提供了一种保护数据的方式,以提供 ACID 能力。它通过提供原子性 (atomicity)、一致性 (consistency)、数据隔离 (data isolation) 和持久性 (durability) 来确保数据完整性。

在当今的企业中,有几个关键任务用例非常需要使用事务并依赖它们带来的 ACID 语义。关于何时使用事务并证明其开销是合理的,并没有一个简单直接的答案。您必须审视应用并评估所涉及的风险。事务的常见典型示例是任何需要处理财务数据的场景。Bob 向 Alice 汇款,这个操作会从 Bob 的账户中扣款,然后 Alice 的账户会增加金额。如果此过程中出现任何问题,整个过程会像什么都没发生一样回滚,因为我们不希望流程处于杂乱的状态。如果从 Bob 的账户中扣款了,但 Alice 的账户没有增加金额(或反之),那就会出现问题。从 Apache Kafka 的角度来看,这里有一些事情发生。首先,一条消息发送到 Kafka 处理器,以从 Bob 的账户中扣款并包含接收方的 TTP 信息。处理器处理该信息,然后向另一个主题发送一条消息,指示已从 Bob 的账户中扣款。之后,另一条消息指示 Alice 的账户现已增加金额。此过程中的各种操作需要复杂的协调以确保一切按预期进行。每当我们有多个像这样相关的事件时,事务可能有助于确保数据完整性并提供 ACID 语义。在此示例中,单个事件本身没有太大意义,但它们组合在一起形成整个流程,并且需要事务性来确保数据完整性。

如果我们想概括这种模式,我们可以说,任何时候,如果存在一个关键任务的消费-处理-发布模式,并且其中一个组件失败时,整个处理器需要表现得好像什么都没发生一样,那么使用事务是一种可以考虑的潜在解决方案。

其他领域的一些高级示例

  • 想象一个航空预订系统需要发布一个包含多段行程的预订信息。如果由于任何原因,系统无法发布整个预订,则需要中止该过程并重新开始。
  • 一个经纪公司发送包含多个买入订单的请求到结算所。假设该流程无法将这些单独的订单作为一个原子单元发布到供结算所消费的消息系统中。在这种情况下,经纪公司必须重新发送订单。
  • 一个医疗账单系统向保险公司发送患者测试数据时,必须将来自患者的各种相关测试作为一个整体发布到消息系统。
  • 一个在线游戏系统需要跟踪玩家在游戏中的位置,并以事务方式将其发送到中央服务器,以确保所有玩家看到的是正确的坐标,而不是部分更新的位置。
  • 零售商的库存补货系统需要将各种相关产品状态信息作为一个原子单元发送。
  • 一个在线电子商务订购系统,在单个原子聚合操作中发布订单详细信息(如订单项、账户持有人信息、配送信息等)。

与外部数据库同步

另一种需要事务的用例是您必须与其他事务系统同步时。除了发布到 Kafka 之外,假设您必须将记录或一些派生信息持久化到关系型数据库中,所有这些操作都必须在一个原子操作内完成。如果其中一个系统发送数据失败,我们必须回滚。正如本系列博客下一部分将介绍的那样,如果您每次只向 Kafka 发布一条记录,并且没有其他相关操作,则无需使用事务。但是,即使您只向 Kafka 主题发布一次,但作为同一过程的一部分使用了关系型数据库操作,为了确保数据完整性,使用事务就变得必要了。

发布到多个 Kafka 主题

生产者独有的应用中事务的另一个用例是发布到多个 Kafka 主题。假设您有一些业务关键型数据,例如一个重要的通知(如订单详情),您希望将其发布到多个 Kafka 主题,订单详情的一部分发布到一个订单主题,另一部分发布到一个发货主题。在这种情况下,我们可以使用事务来确保端到端的数据完整性。

泛化上述事务性用例

上述用例集合并非需要事务的穷举列表。在当今企业的各种领域中,还有许多其他用例与我们所 рассмотре 的总体方向类似,需要消息系统中的事务处理。

以下列表总结了 Apache Kafka 中事务可能有用的通用用例:

  • 消费-处理-发布系统,其中需要将记录作为一个原子单元发布,并提供 exactly-once-semantics(恰好一次)的交付保证。
  • 多个相关的发布事件,单独来看没有意义。
  • 将数据作为一个原子单元发布到多个主题。
  • 与外部事务管理器同步。

这是所有这些不同情况的图示表示。它涵盖了我们上面考虑的场景,例如消费-处理-生产、多个生产者、与外部事务同步等。一个处理器从一个入站主题消费数据,执行业务逻辑,将一些信息持久化到数据库系统,并发布到多个 Kafka 主题。

scst-kafka-txn-overview

Apache Kafka 中的事务

有大量文献可供学习 Apache Kafka 中事务如何工作的底层细节,这里有一篇文章可以作为这些细节的介绍。然而,从一个非常高的层面简要了解 Kafka 客户端 API 如何实现事务性仍然是值得的。需要注意的一点是,对于普通消费者来说,Kafka 中没有所谓的事务性消费者,但有事务感知型消费者。消费者通过设置隔离级别来实现这种事务感知。默认情况下,Kafka 中的消费者会看到所有记录,包括上游生产者未提交的记录,因为 Kafka 消费者默认的隔离级别是 read_uncommitted。Kafka 消费者必须使用 read_committed 的隔离级别才能提供端到端的事务语义。在本系列博客的后续章节中,我们将看到如何在 Spring Cloud Stream 中实现这一点。

在生产者端,应用依赖于 Kafka 客户端提供的一些 API 方法。我们来看一些重要的方法。

为了使应用具有事务性,Kafka 客户端需要一个事务 ID。应用通过 Kafka 生产者属性 transactional.id 来提供此 ID,事务协调器使用此 ID 来注册并启动事务。事务协调器使用此 ID 来跟踪事务的所有方面,例如初始化、进行中的进度、提交等。

以下列表总结了与事务相关的关键生产者 API 方法。

Producer#initTransactions() - 每个生产者调用一次以初始化事务支持。初始化 Kafka 事务。

Producer#beginTransaction() - 在发送记录之前开始事务。

Producer#sendOffsetsToTransaction() - 将消费记录的偏移量发送到事务。

Producer#commitTransaction() - 提交事务。

Producer#abortTransaction() - 中止事务。

在发送记录之前,我们需要初始化并开始事务。然后,继续进行数据处理。如果为了执行此发布而消费了记录,我们必须使用生产者将消费记录的偏移量发送到事务。在此之后,可以继续执行事务提交或中止操作 (commitTransaction 或 abortTransaction)。当我们调用 commitTransaction 方法时,正是 Kafka 客户端将偏移量原子地发送到 consumer_offsets 主题的时候。

Spring for Apache Kafka 中的事务支持

使用像 Spring for Apache Kafka 或依赖于它的 Spring Cloud Stream Kafka binder 这样的框架时,它们带来的好处是允许应用主要关注业务逻辑,因为框架处理了我们上面看到的底层事务性模板代码序列。使用 Spring for Apache Kafka 或其他框架(例如使用它的 Spring Cloud Stream)是有益的,因为它使我们不必担心编写(上面描述的)底层模板代码序列来确保所有事务步骤都成功。正如您可以想象的那样,这里有很多活动部件,如果您遗漏一个步骤或未按预期执行一个步骤,可能会导致应用容易出错。在使用 Spring 的情况下,我们提到的这些框架代表应用开发者处理了这些问题。让我们简要看看它是如何做到的。

Spring for Apache Kafka 框架通过提供 Spring 开发者熟悉的、一致的事务性编程模型来隐藏所有这些底层细节。结果是,使用 Spring for Apache Kafka 或 Spring Cloud Stream 等其他框架的应用,可以简单地专注于应用的业务逻辑,而不必处理复杂的底层事务相关事务。

KafkaTransactionManager

Spring for Apache Kafka 如何提供这种一致的事务性编程模型?简而言之,Spring 开发者传统上使用 @Transactional 注解或编程式方法(例如直接在应用中使用 TransactionTemplate)来创建本地事务。这些机制需要一个事务管理器实现来驱动事务方面。Spring for Apache Kafka 提供了一个事务管理器实现。KafkaTransactionManager 是 Spring Framework 中 PlatformTransactionManager 的一个实现。您可以将此事务管理器与 @Transactional 注解一起使用,或通过使用 TransactionTemplate 在本地事务中使用它。KafkaTransactionManager 使用生产者工厂创建 Kafka 生产者,并提供 API 来开始、提交和回滚事务。

KafkaResourceHolder

Spring for Apache Kafka 还提供了一个 KafkaResourceHolder,用于持有 Kafka 生产者资源。Spring for Apache Kafka 中的 KafkaTemplate 为给定的生产者工厂在当前线程上触发 KafkaResourceHolder 的绑定。在消费者发起的事务中,消息监听容器执行此绑定,并且生产者工厂与 KafkaTransactionManager 使用的生产者工厂相同。这样,事务为所有发布需求都使用同一个事务性生产者。

除了上述组件外,Spring for Apache Kafka 还提供了其他用于处理事务相关问题的实用工具。在后续系列章节中,我们会在必要时看到其中的一些工具。

在本系列博客的第二部分,我们将深入探讨在 Spring Cloud Stream 应用中使用事务的更实际的实现细节。

订阅 Spring 时事通讯

保持与 Spring 时事通讯的联系

订阅

抢先一步

VMware 提供培训和认证,助您加速前进。

了解更多

获得支持

Tanzu Spring 通过一个简单的订阅提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部