Spring Cloud Stream Kafka 应用中的事务介绍

工程 | Soby Chacko | 2023年9月27日 | ...

我们正在启动一个新的博客系列,重点介绍在 Spring Cloud Stream Kafka 应用中使用事务。本博客系列涵盖了使用 Spring Cloud Stream 和 Apache Kafka 编写事务应用程序的许多底层细节。在本博客系列结束时,我们希望为您提供足够的信息,以便您可以为各种业务用例编写事务性 Spring Cloud Stream Kafka 应用程序。

基本构建块

Spring Cloud Stream Kafka 应用程序中事务的基础支持主要来自 Apache Kafka 本身和 Spring for Apache Kafka 库。但是,本博客系列是关于在 Spring Cloud Stream 中专门使用此支持。如果您熟悉 Apache Kafka 中事务的工作方式以及 Spring for Apache Kafka 如何以 Spring 友好的方式使用它,那么本系列将让您感觉如鱼得水。

虽然 Apache Kafka 提供了事务的基础支持,但 Spring for Apache Kafka(又名 Spring Kafka)库在 Spring 端扩展了此支持,使其更易于 Spring 开发人员使用,方法是依赖于 Spring Framework 中可用的传统事务支持。Spring Cloud Stream 中的 Kafka 绑定器进一步构建了对 Spring for Apache Kafka 的此支持,从而可以在 Spring Cloud Stream Kafka 应用程序中使用相同的支持。在本博客系列的第一部分中,我们将简要介绍 Kafka 事务、一些使用案例分析(其中依赖事务会很有帮助)以及 Apache Kafka 和 Spring 生态系统中的事务构建块。

在许多用例中,以事务方式发布、使用和处理 Apache Kafka 中的记录变得必要。当在生产者启动的应用程序或以事务方式实现使用-处理-生产模式的进程中以事务方式生成记录时,它们会以原子方式写入 Kafka。如果出现问题,整个过程将回滚,并且事务不会提交。需要注意的是,与支持事务的关系数据库不同(在事务回滚时不会持久化任何记录),Apache Kafka 仍然会将记录发布到主题分区。此行为是由于 Apache Kafka 的基本追加式不变日志架构所致,该架构不允许任何记录修改,例如在将记录添加到记录日志后删除记录。有人可能会想知道使用事务的好处是什么,因为当事务中止时,记录可能会发布到主题分区,这可能会导致使用者看到它们。但是,具有正确隔离级别的使用者永远不会看到回滚的记录,即使回滚事务的记录位于主题分区中。因此,从端到端角度来看,整个过程保证是完全事务性的。

事务用例

事务通常会在 Kafka 应用程序中增加大量的开销。在 Apache Kafka 中使用事务时,每个记录都必须向记录中添加特殊的事务日志,向特殊的交易状态主题发送事务标记,等等。所有这些步骤都需要时间和空间,从而增加了整体延迟。因此,每个应用程序都必须通过分析用例来仔细检查对事务支持的需求。

事务提供了一种主要保护数据以提供ACID能力的方式。它通过提供原子性、一致性、数据隔离性和持久性来确保数据完整性。

当今企业中存在一些关键任务用例,在这些用例中,使用事务并依赖它们带来的 ACID 语义是非常理想的。关于何时使用事务以及证明其带来的开销是合理的,并没有简单的、直接的答案。您必须查看应用程序并评估风险。事务的典型示例是任何需要处理财务数据的操作。Bob 向 Alice 发送钱,这是一个从 Bob 的帐户中借记的动作,Alice 获得贷记。如果此过程中出现任何问题,则整个过程将回滚,就像什么也没发生一样,因为我们不希望流程处于混乱状态。如果该过程从 Bob 的帐户中借记,但 Alice 没有获得贷记(反之亦然),那就是一个问题。从 Apache Kafka 的角度来看,这里发生了一些事情。首先,一条消息到达 Kafka 处理器,以从 Bob 的帐户和接收者信息中借记。处理器处理信息,然后向另一个主题发送消息,指示从 Bob 的帐户中发生了借记。在此之后,另一条消息指示 Alice 现在已获得贷记。此过程中的各种操作需要复杂的协调才能确保一切按预期发生。任何时候我们都有这样的多个相关事件,事务可能有助于确保数据完整性并提供 ACID 语义。在此示例中,单个事件本身没有太多意义,但它们结合在一起形成了整个流程,并需要事务性才能确保数据完整性。

如果我们想概括这种模式,我们可以说,任何时候我们都有一个关键任务的 consume-process-publish 模式,如果一个组件失败,整个处理器需要表现得好像什么都没发生一样,使用事务是一个需要考虑的潜在解决方案。

其他领域更高级别的示例

  • 想象一下一个航空公司预订系统,它需要发布有关具有多个航段的预订的信息。如果由于任何原因,系统无法发布整个预订,则需要中止流程并重新开始。
  • 一家经纪公司发送包含多个买入订单的请求,以发送到清算所。假设该过程无法将各个订单作为单个原子单元从清算所消费的消息系统发布。在这种情况下,经纪公司必须重新发送订单。
  • 一个医疗账单系统,它将患者测试数据发送给保险公司,必须将患者的各种相关测试发布到消息系统。
  • 一个在线游戏系统需要跟踪玩家在游戏中的位置,并将它们以事务方式发送到中央服务器,以确保所有玩家都能看到正确的坐标,而不是部分更新的位置。
  • 零售商的库存补充系统需要将有关各种相关产品状态的信息作为单个原子单元发送。
  • 一个在线电子商务订单系统,它在单个原子聚合操作中发布订单详细信息(例如订单条目、帐户持有人信息、运输信息等)。

与外部数据库同步

事务变得方便的另一类用例是当您必须与其他事务系统同步时。除了发布到 Kafka 之外,假设您必须在单个原子操作中将记录或某些派生信息持久化到关系数据库中。如果一个系统未能发送数据,我们必须回滚。如果您每次只向 Kafka 发布一条记录,没有其他操作,那么您不需要使用事务,正如我们将在本博客系列的下一部分中看到的那样。但是,即使您只向 Kafka 主题发布一次,但使用关系数据库操作作为同一过程的一部分,也需要使用事务来确保数据完整性。

发布到多个 Kafka 主题

仅生产者应用程序中使用事务的另一个用例是发布到多个 Kafka 主题。假设您有一些关键业务数据,以关键通知(例如订单详细信息)的形式出现,您希望将其发布到多个 Kafka 主题,订单详细信息的一部分到订单主题,另一部分到运输主题。在这种情况下,我们可以使用事务来确保端到端数据完整性。

以上事务用例的泛化

以上用例集并非详尽无遗,它列举了需要事务处理的情况。在当今各个领域的企业中,还有许多其他用例,与我们所考察的用例的总体方向并无太大区别,这些用例都需要消息系统中的事务处理。

以下列表总结了 Apache Kafka 中的事务可能有所帮助的泛化用例:

  • 消费-处理-发布系统,其中需要将记录作为单个原子单元发布,并提供精确一次语义的交付保证。
  • 多个相关的发布事件,单独来看没有意义。
  • 将数据作为单个原子单元发布到多个主题。
  • 与外部事务管理器同步。

这是一个所有这些不同情况的图示。它涵盖了我们上面考虑的场景,例如消费-处理-生产、多个生产者、与外部事务同步等等。处理器从入站主题消费数据,执行业务逻辑,将某些信息持久化到数据库系统,并发布到多个 Kafka 主题。

scst-kafka-txn-overview

Apache Kafka 中的事务

有很多文献可以研究 Apache Kafka 中事务工作的底层细节,这里有一篇文章可以介绍这些细节。但是,从非常高的层面简要了解 Kafka 客户端 API 如何实现事务性仍然是有价值的。需要注意的是,对于普通的消费者来说,Kafka 中没有事务性消费者,但有事务感知型消费者。消费者通过设置隔离级别来实现这种事务感知。默认情况下,Kafka 中的消费者可以看到所有记录,即使是上游生产者未提交的记录,因为 Kafka 消费者的默认隔离级别是 **read_uncommitted**。Kafka 消费者必须使用 **read_committed** 隔离级别才能提供端到端的事务语义。我们将在本博客系列的后续部分中看到如何在 Spring Cloud Stream 中实现这一点。

在生产者方面,应用程序依赖于 Kafka 客户端的一些 API 方法。让我们看看重要的那些。

为了使应用程序具有事务性,Kafka 客户端需要一个事务 ID。应用程序通过名为 **transactional.id** 的 Kafka 生产者属性提供它,事务协调器使用它通过注册来启动事务。事务协调器使用此 ID 来跟踪事务的所有方面,例如初始化、进行中、提交等。

以下列表总结了与事务相关的关键生产者 API 方法。

**Producer#initTransactions()** - 每个生产者调用一次以启动事务支持。初始化 Kafka 事务。

**Producer#beginTransaction()** - 在发送记录之前开始事务。

**Producer#sendOffsetsToTransaction()** - 将已消费的记录偏移量发送到事务。

**Producer#commitTransaction()** - 提交事务。

**Producer#abortTransaction()** - 中止事务。

在发送记录之前,我们需要初始化并开始事务。然后,它继续进行数据处理。如果我们消费了一条记录来进行此发布,我们必须使用生产者将已消费记录的偏移量发送到事务。之后,事务提交或中止操作可以继续(commitTransaction 或 abortTransaction)。当我们调用 commitTransaction 方法时,Kafka 客户端会将偏移量精确地原子地发送到 consumer_offsets 主题。

Spring for Apache Kafka 的事务支持

当使用像 Spring for Apache Kafka 或依赖于它的 Spring Cloud Stream Kafka 绑定器这样的框架时,它们的好处是允许应用程序主要关注业务逻辑,因为框架处理了我们上面看到的低级样板事务序列。使用 Spring for Apache Kafka 或其他框架(例如使用它的 Spring Cloud Stream)将是有益的,因为它允许我们不必担心编写低级样板序列(如上所述)以确保所有事务步骤都成功。正如您所想象的,这里有很多活动部件,如果您省略了一个步骤或没有按照预期执行一个步骤,它可能会使应用程序容易出错。对于 Spring 来说,我们提到的框架代表应用程序开发人员处理它们。让我们简要地看看它是如何做到的。

Spring for Apache Kafka 框架通过提供 Spring 开发人员熟悉的、一致的事务编程模型来隐藏所有这些底层细节。结果是,当使用 Spring for Apache Kafka 或其他框架(例如 Spring Cloud Stream)时,应用程序可以简单地专注于应用程序的业务逻辑,而不是处理复杂的事务相关问题。

KafkaTransactionManager

Spring for Apache Kafka 如何提供这种一致的事务编程模型?简短的答案是,Spring 开发人员传统上使用 Transactional 注解或编程方法,例如直接在应用程序中使用 TransactionTemplate 来创建本地事务。这些机制需要一个事务管理器实现来驱动事务方面。Spring for Apache Kafka 提供了一个事务管理器实现。**KafkaTransactionManager** 是 Spring Framework 中 **PlatformTransactionManager** 的一个实现。您可以将此事务管理器与 Transactional 注解一起使用,或者通过使用 TransactionTemplate 在本地事务中使用它。KafkaTransactionManager 使用生产者工厂来创建 Kafka 生产者,并提供开始、提交和回滚事务的 API。

KafkaResourceHolder

Spring for Apache Kafka 还提供了一个 **KafkaResourceHolder**,它持有 Kafka 生产者资源。Spring for Apache Kafka 中的 KafkaTemplate 会触发在当前线程上为给定的生产者工厂绑定 KafkaResourceHolder。在消费者启动的事务的情况下,消息侦听器容器会进行此绑定,并且生产者工厂与 KafkaTransactionManager 使用的生产者工厂相同。这样,事务对所有发布需求使用相同的可事务性生产者。

除了上述组件外,Spring for Apache Kafka 还提供了其他用于处理与事务相关的问题的实用程序。当我们阅读本系列的后续部分时,我们将看到其中的一些。

在本博客系列的第 2 部分中,我们将继续讨论在 Spring Cloud Stream 应用程序中使用事务的更多实际实现细节。

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加快您的进步。

了解更多

获得支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部