响应式编程与关系型数据库

工程 | Mark Paluch | 2018年12月7日 | ...

随着软件吞噬世界,命令式代码以传入请求的速度消耗线程。本文讨论了 JVM 上响应式编程的假设,以及这对集成(特别是关系型数据库)意味着什么。

撰写这篇文章的动机是响应式编程采用的持续增长,而一些主要的构建块尚未可用——特别是这个问题:关系型数据库怎么办?

什么是响应式编程

关于什么是响应式编程以及它与响应式系统的比较,有很多答案。我认为响应式编程是一种编程模型,它通过创建对资源的可用性和可处理性做出反应的事件驱动的非阻塞函数管道来促进可扩展性和稳定性。延迟执行、并发和异步只是底层编程模型的结果。

只有当整个堆栈都是响应式的,并且所有参与的组件(应用程序代码、运行时容器、集成)都遵守延迟执行、非阻塞 API 和数据流的流式性质时,响应式编程才能充分发挥其优势——基本上遵循底层假设。

虽然可以将非响应式组件引入以函数式响应式风格编写的应用程序,但最终结果是可扩展性和稳定性效果(实际的预期收益)会降低。在最坏的情况下,运行时行为几乎没有或根本没有区别。但是,响应式编程有助于提高代码的可读性。

如果我们纵观响应式生态系统,我们会发现多个框架、库和集成。它们各自都有其特定的优势。许多功能领域都得到了很好的覆盖,无论是通过通用方法还是在特定响应式框架的上下文中。所以让我们讨论关系型数据库集成。

关系型数据库与响应式

众所周知关系型数据库很流行,并且可以推测,大多数企业项目都严重依赖关系型数据库的使用。无论如何,最常被问到的问题是:我们什么时候才能获得用于响应式关系型数据库集成的 API?

Java 使用 JDBC 作为与关系型数据库集成的主要技术。JDBC 本质上是阻塞式的——没有什么合理的方法可以缓解 JDBC 的阻塞特性。使调用非阻塞的第一个想法是将 JDBC 调用卸载到 Executor(通常是 Thread 池)。虽然这种方法在某种程度上有效,但它也存在一些缺点,这些缺点忽略了响应式编程模型的优势。

线程池需要——毫不奇怪——线程来运行。响应式运行时通常使用数量有限的线程,这些线程与 CPU 核心的数量相匹配。额外的线程会引入开销并降低线程限制的效果。此外,JDBC 调用通常会在队列中堆积,并且一旦线程被请求饱和,池将再次阻塞。因此,JDBC 目前不是一个选项。

响应式数据库的努力

有一些独立的驱动程序,例如Reactiverse 的 reactive-pg-client。这些驱动程序带有供应商特定的 API,并不真正适合更广泛的采用。客户端集成商需要提供额外的层来公开通用 API。新的驱动程序无法轻松地插入客户端库。相比之下,拥有标准 API 可以允许可插拔性,同时将客户端与特定于数据库的解决方案解耦——这对所有人来说都是巨大的价值。

Oracle 宣布了ADBA,这是一项旨在通过使用 future 为 Java 提供标准化异步数据库访问 API 的计划。ADBA 中的所有内容都正在开发中,ADBA 背后的团队很乐意获得反馈。一群 Postgres 人员正在开发一个Postgres ADBA 驱动程序,可用于首次实验。PgNio 是另一个用于 Postgres 的异步驱动程序,它开始尝试使用 ADBA。

ADBA 的可用性尚不清楚。它肯定不会随 Java 12 一起发布。ADBA 计划首次亮相的 Java 版本目前坦率地说尚不清楚。

以下代码片段显示了使用 INSERTSELECT 语句的 ADBA 用法

DataSource ds = dataSource();
CompletableFuture<Long> t;

try (Session session = ds.getSession()) {

  Submission<Long> submit = session
  .<Long>rowCountOperation(
    "INSERT INTO legoset (id, name, manual) " +
    "VALUES($1, $2, $3)")
    .set("$1", 42055, AdbaType.INTEGER)
    .set("$2", "Description", AdbaType.VARCHAR)
    .set("$3", null, AdbaType.INTEGER)
    .apply(Result.RowCount::getCount)
    .submit();

  t = submit.getCompletionStage().toCompletableFuture();
}

t.join();

CompletableFuture<List<Map<String, Object>>> t;
try (Session session = ds.getSession()) {

  Submission<List<Map<String, Object>>> submit = session
    .<List<Map<String, Object>>> rowOperation(
      "SELECT id, name, manual FROM legoset")
    .collect(collectToMap()) // custom collector
    .submit();
  t = submit.getCompletionStage().toCompletableFuture();
}

t.join();

请注意,collectToMap(…) 是应用程序提供的函数的一个示例,该函数将结果提取到所需的返回类型中。

总之,尚无可用于访问关系型数据库的响应式 API。

R2DBC 救援!

由于缺乏标准 API 以及驱动程序的不可用性,Pivotal 的一个团队开始研究响应式关系 API 的想法,该 API 将非常适合响应式编程目的。他们提出了R2DBC,它代表响应式关系数据库连接。截至目前,R2DBC 是一个孵化器项目,用于评估可行性并开始讨论驱动程序供应商是否有兴趣支持响应式/非阻塞/异步驱动程序。

截至目前,有三个驱动程序实现

R2DBC 带有 API 规范 (r2dbc-spi) 和一个客户端 (r2dbc-client),使 SPI 可用于应用程序。

以下代码片段显示了使用 INSERTSELECT 语句的 R2DBC SPI 用法

ConnectionFactory connectionFactory = null;

Mono<Integer> count = Mono.from(connectionFactory.create())
  .flatMapMany(it ->
    it.createStatement(
    "INSERT INTO legoset (id, name, manual) " +
    "VALUES($1, $2, $3)")
      .bind("$1", 42055)
      .bind("$2", "Description")
      .bindNull("$3", Integer.class)
      .execute())
  .flatMap(io.r2dbc.spi.Result::getRowsUpdated)
  .next();

Flux<Map<String, Object>> rows = Mono.from(connectionFactory.create())
  .flatMapMany(it -> it.createStatement(
    "SELECT id, name, manual FROM legoset").execute())
  .flatMap(it -> it.map((row, rowMetadata) -> collectToMap(row, rowMetadata)));

虽然上面的代码有点笨拙,但 R2DBC 还带有一个客户端库项目,用于更人性化的用户 API。R2DBC SPI 不打算直接使用,而是通过客户端库使用。

使用 R2DBC 客户端重写的相同代码将是

R2dbc r2dbc = new R2dbc(connectionFactory);

Flux<Integer> count = r2dbc.inTransaction(handle ->
  handle.createQuery(
    "INSERT INTO legoset (id, name, manual) " +
    "VALUES($1, $2, $3)")
    .bind("$1", 42055)
    .bind("$2", "Description")
    .bindNull("$3", Integer.class)
    .mapResult(io.r2dbc.spi.Result::getRowsUpdated));

Flux<Map<String, Object>> rows = r2dbc
  .inTransaction(handle -> handle.select(
    "SELECT id, name, manual FROM legoset")
  .mapRow((row, rowMetadata) -> collectToMap(row, rowMetadata));

请注意,collectToMap(…) 是应用程序提供的函数的一个示例,该函数将结果提取到所需的返回类型中。

Spring Data 团队启动了Spring Data R2DBC 作为孵化器,以通过数据库客户端提供响应式 API 并支持响应式存储库。使用 Spring Data R2DBC 重写的示例代码将是

DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);

Mono<Integer> count = databaseClient.execute()
  .sql(
    "INSERT INTO legoset (id, name, manual) " +
    "VALUES($1, $2, $3)")
  .bind("$1", 42055)
  .bind("$2", "Description")
  .bindNull("$3", Integer.class)
  .fetch()
  .rowsUpdated();

Flux<Map<String, Object>> rows = databaseClient.execute()
  .sql("SELECT id, name, manual FROM legoset")
  .fetch()
  .all();

R2DBC 及其生态系统仍处于早期阶段,需要进行实验和反馈以收集用例,并查看响应式关系型数据库集成是否有意义。

基于纤程的 JDBC

让我们谈谈技术的组合。虽然 JDBC 和其他技术公开了阻塞 API(主要是由于等待 I/O),但正在开发Loom 项目。Loom 引入了 纤程 作为一种轻量级抽象,它将阻塞 API 转换为非阻塞 API。这可以通过在调用遇到阻塞 API 时立即进行栈切换来实现。因此,底层的 纤程 尝试在之前使用阻塞 API 的流程上继续执行。

纤程 执行模型大大减少了所需的原生线程数量。结果是更好的可扩展性和非阻塞行为——通过将阻塞调用卸载到 纤程 支持的 Executor。我们这里需要的是一个合适的 API,允许使用基于 纤程 的非阻塞 JDBC 实现。

结论

响应式编程和关系型数据库的未来会怎样?老实说,我不知道。如果我尝试做一个有根据的猜测,我认为 Loom 项目和基于纤程的 Executor 与完善的 JDBC 驱动程序相结合,可能会成为业界的一个潜在变革者。随着 Java 发布节奏的加快,这可能不会太遥远。

ADBA的目标是纳入Java标准运行时,我预计这最早也要到Java 17才会实现,根据目前的计划,这将在2021年某个时候发生。

相比之下,R2DBC现在就可以使用。它自带驱动程序和客户端,并允许进行实验性使用。R2DBC的一个巧妙的副作用是,它公开了一个完全响应式的API,同时独立于底层数据库引擎。由于版本已经发布,因此无需猜测Project Loom,也无需等待可能三年的时间来试用API。使用R2DBC,现在就可以做到。

获取Spring通讯

关注Spring通讯

订阅

抢先一步

VMware提供培训和认证,助您快速提升。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部