Spring 的响应式事务

工程 | Mark Paluch | 2019年5月16日 | ...

早在2016年,我们的响应式旅程就从Spring Framework 5以及一些响应式集成开始。在我们的旅程中,其他项目也加入了响应式运动。借助R2DBC,我们现在还为SQL数据库提供了响应式集成。随着具备事务能力的集成的增长,我们不断被问到

Spring Framework 是否支持响应式 @Transaction?

在我们旅程开始的时候,我们没有响应式形式的事务集成,所以这个问题很容易回答:不需要响应式事务管理。

随着时间的推移,MongoDB 开始使用 MongoDB Server 4.0 支持多文档事务。R2DBC(响应式 SQL 数据库驱动程序的规范)开始出现,我们决定使用 Spring Data R2DBC 来选择 R2DBC。这两个项目都希望公开事务行为,因此它们最终在其模板 API 上提供了inTransaction(…)方法来执行受本地事务保护的工作单元。

虽然对于较小的工作块使用inTransaction(…)方法很方便,但这并不反映 Spring 支持事务的方式。在使用命令式编程模型时,Spring Framework 允许两种事务管理安排:@TransactionalTransactionTemplate(声明式和编程式事务管理)。

这两种事务管理方法都是建立在PlatformTransactionManager之上的,后者管理事务资源的事务。PlatformTransactionManager可以是 Spring 提供的事务管理器实现,也可以是基于 JTA 的 Java EE 事务管理器。

这两种方法的共同点是它们将事务状态绑定到ThreadLocal存储,这允许在不传递TransactionStatus对象的情况下进行事务状态管理。事务管理应该以非侵入性的方式在后台进行。由于底层假设我们不使用线程在事务中继续工作,因此ThreadLocal在命令式编程安排中有效。

命令式事务管理的工作原理

事务管理需要将其事务状态与执行关联起来。在命令式编程中,这通常是ThreadLocal存储——事务状态绑定到一个Thread。其基本假设是事务代码在容器调用它的同一线程上执行。

响应式编程模型消除了命令式(同步/阻塞)编程模型的这一基本假设。仔细观察响应式执行,我们可以观察到代码在不同的线程上执行。当使用进程间通信时,这一点更加明显。我们不能再安全地假设我们的代码完全在同一线程上执行。

假设的这种变化使依赖于ThreadLocal的事务管理实现无效。

由于集成和优化(例如运算符融合),线程切换会在任意时间发生。这种变化会破坏所有依赖于ThreadLocal的代码。结果是,我们需要不同的安排来反映事务状态,而无需始终传递TransactionStatus对象。

在响应式领域,关联带外数据并不是一个新的需求。我们在其他领域也面临着这个需求,例如 Spring Security 中用于响应式方法安全的SecurityContext(仅举一个例子)。Project Reactor 是 Spring 构建其响应式支持的基础响应式库,自 3.1 版本起就提供了对订阅者上下文的支持。

Reactor Context 对于响应式编程来说就像ThreadLocal对于命令式编程一样:上下文允许将上下文数据绑定到特定的执行。对于响应式编程,这是一个Subscription。Reactor 的Context允许 Spring 将事务状态以及所有资源和同步绑定到特定的Subscription。现在,所有使用 Project Reactor 的响应式代码都可以参与响应式事务。要访问事务详细信息并返回标量值的代码必须重写为使用响应式类型才能参与事务。否则,Context不可用。

响应式事务管理

从 Spring Framework 5.2 M2 开始,Spring 通过ReactiveTransactionManager SPI 支持响应式事务管理。

ReactiveTransactionManager是用于使用事务资源的响应式和非阻塞集成的事务管理抽象。它是返回Publisher类型的响应式@Transactional方法以及使用TransactionalOperator进行编程式事务管理的基础。

前两个响应式事务管理器实现是:

  • 通过 Spring Data R2DBC 1.0 M2 的 R2DBC
  • 通过 Spring Data MongoDB 2.2 M4 的 MongoDB

让我们看看响应式事务是什么样的

class TransactionalService {

  final DatabaseClient db

  TransactionalService(DatabaseClient db) {
    this.db = db;
  }

  @Transactional
  Mono<Void> insertRows() {

    return db.execute()
      .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
      .fetch().rowsUpdated()
      .then(db.execute().sql("INSERT INTO contacts (name) VALUES('Joe')")
      .then();
  }
}

在注解驱动的安排中,响应式事务看起来与命令式事务非常相似。但是,主要区别在于我们使用DatabaseClient,这是一个响应式资源抽象。所有事务管理都在后台发生,利用 Spring 的事务拦截器和ReactiveTransactionManager

Spring 区分(基于方法返回类型)要应用哪种类型的事务管理:

  • 方法返回Publisher类型:响应式事务管理
  • 所有其他返回类型:命令式事务管理

这种区别很重要,因为您仍然可以使用命令式组件,例如 JPA 或 JDBC 查询。将这些结果包装到Publisher类型中会向 Spring 发出信号,指示应用响应式事务管理而不是命令式事务管理。也就是说,响应式事务安排不会打开ThreadLocal绑定的事务,而这对于 JPA 或 JDBC 是必需的。

TransactionalOperator

接下来,让我们看看如何使用TransactionalOperator进行编程式事务管理。

ConnectionFactory factory = …
ReactiveTransactionManager tm = new R2dbcTransactionManager(factory);
DatabaseClient db = DatabaseClient.create(factory);

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomicOperation = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('joe', 'Joe')")
  .fetch().rowsUpdated()
  .then(db.execute()
    .sql("INSERT INTO contacts (name) VALUES('Joe')")
    .then())
  .as(rxtx::transactional);

上面的代码包含一些值得注意的组件:

  • R2dbcTransactionManager:这是用于 R2DBC ConnectionFactory 的响应式事务管理器。
  • DatabaseClient:客户端使用 R2DBC 驱动程序访问 SQL 数据库。
  • TransactionalOperator:此运算符将所有上游 R2DBC 发布者与事务上下文关联。您可以使用运算符样式as(…::transactional)或使用execute(txStatus -> …)的回调样式。

响应式事务在订阅时延迟启动。运算符启动事务,设置适当的隔离级别并将数据库连接与其订阅者上下文关联。所有参与的(上游)Publisher实例都使用单个Context绑定的事务连接。

响应式函数式运算符链可以是线性的(使用单个Publisher)或非线性的(合并多个流)。当使用运算符样式时,响应式事务会影响所有上游Publisher。要将事务范围限制为特定的一组Publisher,请应用回调样式,如下所示:

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> outsideTransaction = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('Jack', 31)")
  .then();

Mono<Void> insideTransaction = rxtx.execute(txStatus -> {
  return db.execute()
    .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
    .fetch().rowsUpdated()
    .then(db.execute()
      .sql("INSERT INTO contacts (name) VALUES('Joe Black')")
      .then());
  }).then();

Mono<Void> completion = outsideTransaction.then(insideTransaction);

在上面的示例中,事务管理仅限于在execute(…)中订阅的Publisher实例。或者,换句话说,事务是有作用域的。execute(…)中的Publisher实例参与事务,而名为outsideTransactionPublisher在其事务之外执行其工作。

R2DBC 是 Spring 与响应式事务集成的其中一种。另一种集成是通过 Spring Data MongoDB 的 MongoDB,您可以使用它通过响应式编程参与多文档事务。

Spring Data MongoDB 附带ReactiveMongoTransactionManager作为ReactiveTransactionManager实现。它创建会话并管理事务,以便在托管事务中执行的代码参与多文档事务。

以下示例显示了使用 MongoDB 的编程式事务管理:

ReactiveTransactionManager tm 
  = new ReactiveMongoTransactionManager(databaseFactory);
ReactiveMongoTemplate template = …
template.setSessionSynchronization(ALWAYS);                                          

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomic = template.update(Step.class)
  .apply(Update.set("state", …))
  .then(template.insert(EventLog.class).one(new EventLog(…))
  .as(rxtx::transactional)
  .then();

上面的代码设置了ReactiveTransactionManager并使用TransactionalOperator在单个事务中执行多个写入操作。ReactiveMongoTemplate被配置为参与响应式事务。

后续步骤

响应式事务管理已随Spring Framework 5.2 M2、Spring Data MongoDB 2.2 M4和Spring Data R2DBC 1.0 M2里程碑版本发布。您可以获取这些版本并开始在您的代码中集成响应式事务管理。我们期待社区的反馈,以便在6月初发布候选版本之前,我们可以解决任何问题。

获取Spring新闻

关注Spring新闻

订阅

抢先一步

VMware提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部