Spring 中的响应式事务

工程 | Mark Paluch | 2019年5月16日 | ...

早在2016年,我们就以Spring Framework 5为起点,开始了响应式之旅,并伴随着几个响应式集成。在我们的旅程中,其他项目也加入了响应式运动。随着R2DBC的出现,我们现在也为SQL数据库提供了响应式集成。随着支持事务的集成不断增长,我们不断被问到

Spring Framework 是否支持响应式 @Transaction?

在我们旅程开始的时候,我们没有响应式的事务集成形式,所以这个问题很简单:不需要响应式事务管理。

随着时间的推移,MongoDB 从 4.0 版本开始支持多文档事务。R2DBC(响应式 SQL 数据库驱动程序的规范)开始出现,我们决定通过 Spring Data R2DBC 来支持 R2DBC。这两个项目都希望暴露事务行为,因此它们最终在其 Template API 上提供了 inTransaction(…) 方法,以执行受原生事务保护的工作单元。

虽然对于较小的工作块使用 inTransaction(…) 方法很方便,但它并不反映 Spring 支持事务的方式。在处理命令式编程模型时,Spring Framework 允许两种事务管理安排:@TransactionalTransactionTemplate(声明式和编程式事务管理)。

这两种事务管理方法都构建在 PlatformTransactionManager 之上,该管理器负责事务性资源的事务管理。PlatformTransactionManager 可以是 Spring 提供的事务管理器实现,也可以是基于 JTA 的 Java EE 实现。

这两种方法都有一个共同点,那就是将事务状态绑定到 ThreadLocal 存储,这使得在不传递 TransactionStatus 对象的情况下进行事务状态管理成为可能。事务管理应该在后台以非侵入性的方式进行。ThreadLocal 在命令式编程安排中有效,因为它基于一个基本假设,即我们不会在事务内继续执行工作来占用线程。

命令式事务管理的工作原理

事务管理需要将其事务状态与执行关联起来。在命令式编程中,这通常是 ThreadLocal 存储 - 事务状态绑定到 Thread。基本假设是,事务性代码在容器调用它的同一线程上执行。

响应式编程模型消除了命令式(同步/阻塞)编程模型的这一基本假设。仔细查看响应式执行,我们可以发现代码在不同的线程上执行。当使用进程间通信时,这一点会更加明显。我们不再能安全地假设我们的代码在同一线程上完全执行。

这种假设的变化使依赖于 ThreadLocal 的事务管理实现失效。

由于集成和优化(如操作符融合),线程切换会随时发生。这种变化打破了所有依赖于 ThreadLocal 的代码。结果是,我们需要一种不同的安排来反映事务状态,而无需一直传递 TransactionStatus 对象。

在响应式领域,关联带外数据并不是一个新需求。我们在其他领域也面临这个需求,例如 Spring Security 中的 SecurityContext 用于响应式方法安全(举个例子)。Spring 构建其响应式支持的响应式库 Project Reactor 自 3.1 版本以来就提供了对订阅者上下文的支持。

Reactor Context 对于响应式编程来说,就像 ThreadLocal 对于命令式编程一样:Contexts 允许将上下文数据绑定到特定的执行。对于响应式编程,这是一个 Subscription。Reactor 的 Context 允许 Spring 将事务状态以及所有资源和同步绑定到特定的 Subscription。所有使用 Project Reactor 的响应式代码现在都可以参与响应式事务。想要访问事务性细节的代码,如果返回标量值,则必须重写以使用响应式类型来参与事务。否则,将无法使用 Context

响应式事务管理

从 Spring Framework 5.2 M2 开始,Spring 通过 ReactiveTransactionManager SPI 支持响应式事务管理。

ReactiveTransactionManager 是一个用于响应式和非阻塞集成(使用事务性资源)的事务管理抽象。它是返回 Publisher 类型的响应式 @Transactional 方法以及使用 TransactionalOperator 的编程事务管理的基础。

前两个响应式事务管理器实现是

  • R2DBC 通过 Spring Data R2DBC 1.0 M2
  • MongoDB 通过 Spring Data MongoDB 2.2 M4

让我们看看响应式事务是什么样子的

class TransactionalService {

  final DatabaseClient db

  TransactionalService(DatabaseClient db) {
    this.db = db;
  }

  @Transactional
  Mono<Void> insertRows() {

    return db.execute()
      .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
      .fetch().rowsUpdated()
      .then(db.execute().sql("INSERT INTO contacts (name) VALUES('Joe')")
      .then();
  }
}

响应式事务在注解驱动的安排中看起来非常类似于命令式事务。但主要区别在于我们使用的是 DatabaseClient,这是一个响应式资源抽象。所有事务管理都在后台进行,利用 Spring 的事务拦截器和 ReactiveTransactionManager

Spring 根据方法返回类型区分要应用的事务管理类型

  • 方法返回 Publisher 类型:响应式事务管理
  • 所有其他返回类型:命令式事务管理

这种区分很重要,因为您仍然可以使用命令式组件,例如 JPA 或 JDBC 查询。将这些结果包装到 Publisher 类型中会指示 Spring 应用响应式而非命令式事务管理。也就是说,响应式事务安排不会打开 ThreadLocal 绑定的事务,而这对于 JPA 或 JDBC 是必需的。

TransactionalOperator

下一步,让我们通过使用 TransactionalOperator 来了解编程事务管理

ConnectionFactory factory = …
ReactiveTransactionManager tm = new R2dbcTransactionManager(factory);
DatabaseClient db = DatabaseClient.create(factory);

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomicOperation = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('joe', 'Joe')")
  .fetch().rowsUpdated()
  .then(db.execute()
    .sql("INSERT INTO contacts (name) VALUES('Joe')")
    .then())
  .as(rxtx::transactional);

上面的代码包含一些值得注意的组件

  • R2dbcTransactionManager:这是 R2DBC ConnectionFactory 的响应式事务管理器。
  • DatabaseClient:该客户端使用 R2DBC 驱动程序访问 SQL 数据库。
  • TransactionalOperator:此操作符将所有上游 R2DBC 发布者与事务上下文关联起来。您可以使用操作符样式 as(…::transactional) 或回调样式 execute(txStatus -> …)

响应式事务在订阅时惰性启动。该操作符启动一个事务,设置适当的隔离级别,并将数据库连接与订阅者上下文关联起来。所有参与的(上游)Publisher 实例都使用一个 Context 绑定的事务性连接。

响应式函数式操作符链可以是线性的(通过使用单个 Publisher)或非线性的(通过合并多个流)。当使用操作符样式时,响应式事务会影响所有上游 Publisher。要将事务范围限制在特定的一组 Publisher 上,请应用回调样式,如下所示

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> outsideTransaction = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('Jack', 31)")
  .then();

Mono<Void> insideTransaction = rxtx.execute(txStatus -> {
  return db.execute()
    .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
    .fetch().rowsUpdated()
    .then(db.execute()
      .sql("INSERT INTO contacts (name) VALUES('Joe Black')")
      .then());
  }).then();

Mono<Void> completion = outsideTransaction.then(insideTransaction);

在上面的示例中,事务管理仅限于在 execute(…) 中订阅的 Publisher 实例。换句话说,事务是分范围的。execute(…) 中的 Publisher 实例参与事务,而名为 outsideTransactionPublisher 在事务外部执行其工作。

R2DBC 是 Spring 与响应式事务的集成之一。另一个集成是 MongoDB(通过 Spring Data MongoDB),您可以使用它通过响应式编程参与多文档事务。

Spring Data MongoDB 附带 ReactiveMongoTransactionManager 作为 ReactiveTransactionManager 实现。它创建一个会话并管理事务,以便在受管理事务中执行的代码参与多文档事务。

下面的示例展示了使用 MongoDB 的编程事务管理

ReactiveTransactionManager tm 
  = new ReactiveMongoTransactionManager(databaseFactory);
ReactiveMongoTemplate template = …
template.setSessionSynchronization(ALWAYS);                                          

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomic = template.update(Step.class)
  .apply(Update.set("state", …))
  .then(template.insert(EventLog.class).one(new EventLog(…))
  .as(rxtx::transactional)
  .then();

上面的代码设置了一个 ReactiveTransactionManager,并使用 TransactionalOperator 在单个事务中执行多个写入操作。ReactiveMongoTemplate 被配置为参与响应式事务。

下一步

响应式事务管理随 Spring Framework 5.2 M2、Spring Data MongoDB 2.2 M4 和 Spring Data R2DBC 1.0 M2 里程碑版本一起发布。您可以获取这些版本并在代码中开始集成响应式事务管理。我们期待社区的反馈,以便在六月初发布候选版本之前平滑任何不完善之处。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有