拥抱 Spring Data 的响应式编程

工程 | Mark Paluch | 2016 年 11 月 28 日 | ...

上周发布的 Spring Data Kay M1 是首个支持响应式数据访问的版本。其初始支持的存储 — MongoDB、Apache Cassandra 和 Redis — 都已具备响应式驱动,这使得它们成为这类原型的自然选择。让我们更详细地了解一下支持响应式编程的新编程模型和 API。

响应式 Repositories

Repositories 编程模型是 Spring Data 用户通常打交道的最高级抽象。它们通常由一组在 Spring Data 提供的接口中定义的 CRUD 方法以及领域特定的查询方法组成。下面是一个响应式 Spring Data repository 定义的样子:

public interface ReactivePersonRepository
  extends ReactiveCrudRepository<Person, String> {

  Flux<Person> findByLastname(Mono<String> lastname);

  @Query("{ 'firstname': ?0, 'lastname': ?1}")
  Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
}

正如你所见,与你习惯的相比,并没有太大的区别。然而,与传统的 repository 接口相比,响应式 repository 使用响应式类型作为返回值,并且也可以用于参数类型。新引入的 ReactiveCrudRepository 中的 CRUD 方法当然也使用了这些类型。

默认情况下,响应式 repository 使用 Project Reactor 类型,但也可以使用其他响应式库。我们为这些库提供了自定义的 repository 基础接口(例如 RxJava2CrudRepository),并且还会根据需要自动适配查询方法所需的类型,例如 RxJava 的 ObservableSingle。其余部分基本保持不变。但请注意,当前的里程碑版本还不支持分页,并且你当然需要将必要的响应式库添加到类路径中才能激活对特定库的支持。

激活响应式 Spring Data

与阻塞式编程中的情况类似,响应式 Spring Data 的支持是通过 @Enable… 注解以及一些基础设施设置来激活的。

@EnableReactiveMongoRepositories
public class AppConfig extends AbstractReactiveMongoConfiguration {

  @Bean
  public MongoClient mongoClient() {
    return MongoClients.create();
  }

  @Override
  protected String getDatabaseName() {
    return "reactive";
  }
}

可以看到,我们为基础设施配置使用了不同的基类,因为我们需要利用 MongoDB 的异步驱动。

使用响应式 Repositories

现在可以使用 repository 了,就像使用阻塞式 repository 一样,只是结果的处理现在可以以响应式的方式进行。

@RestController
class PersonController {

  private final PersonRepository people;

  public PersonController(PersonRepository people) {
    this.people = people;
  }

  @GetMapping("/people")
  Flux<String> namesByLastname(@RequestParam Mono<String> lastname) {

    Flux<Person> result = repository.findByLastname(lastname);
    return result.map(it -> it.getFullName());
  }
}

请看我们如何转发 Spring Web Reactive 提供的响应式参数,将它们传入 repository,然后获得一个 Flux,并以响应式的方式处理执行结果。总的来说,响应式查询方法遵循与已知 repository 相同的查询创建思路。传递给查询方法的参数可以是普通的(例如 String)、包装的(Optional<String>Stream<String>)或响应式包装的参数(Mono<String>Flux<String>)。如果你使用响应式包装器作为参数类型,实现会将实际的查询创建和执行推迟到实际订阅时。

响应式 Templates

正如传统的 repository 基于传统的 template 实现一样,响应式的 repository 是构建在响应式 template 之上的。阻塞式 template API 中可用的大多数操作在响应式 template 中都有对应的功能。我们将把阻塞式编程中的更多功能移植到响应式 template API 中,但有些操作(目前)只是无法通过响应式驱动程序获得,或者在响应式编程中没有意义。

这是 Spring Data MongoDB 中 ReactiveMongoOperations 的一部分。它由 ReactiveMongoTemplate 实现,并使用 Project Reactor 的响应式类型,如 MonoFlux 来包装响应。一些方法还接受响应式类型,以便将数据流式传输到你的数据存储中。

public interface ReactiveMongoOperations {

  // …

  /**
   * Map the results of an ad-hoc query on the specified collection to a
   * single instance of an object of the specified type.
   */
  <T> Mono<T> findOne(Query query, Class<T> entityClass);

  /**
   * Map the results of an ad-hoc query on the collection for the entity
   * class to a List of the specified type.
   */
  <T> Flux<T> find(Query query, Class<T> entityClass);

  /**
   * Insert the object into the specified collection.
   */
  <T> Mono<T> insert(T objectToSave, String collectionName);

  /**
   * Insert the object into the collection for the entity type of the object
   * to save.
   */
  <T> Mono<T> insert(Mono<? extends T> objectToSave);

  // …
}

请注意,所有方法都遵循响应式执行模型,在调用时不会执行任何涉及 I/O 的操作,而只在订阅返回的值时执行。

让我们通过 template 插入一些数据:

Flux<Person> flux = Flux.just(new Person("Walter", "White"),
  new Person("Skyler", "White"),
  new Person("Saul", "Goodman"),
  new Person("Jesse", "Pinkman"));

template.insertAll(flux).subscribe();

一些方法 — 例如 insertAll(…) — 接受响应式类型,以便将传入的数据异步流式传输到你的 MongoDB 数据库,例如,这些数据可能来自你在 Spring Web Reactive 控制器中接收到的 Flux,该控制器将通过 Jackson 异步映射一个 JSON 数组。

@PostMapping("/people")
Flux<People> namesByLastname(@RequestBody Flux<Person> people) {

  return template.insertAll(people);
}

正如你所见,repository 和 template API 都允许你以响应式、非阻塞的方式描述请求处理。说了这些,让我们更深入地研究一下 Redis 对响应式数据访问的支持。

Spring Data Redis 的响应式连接

Spring Data Redis 在连接层面提供了初步的响应式支持,目前仅限于 Lettuce,因为它是唯一支持响应式数据访问的 Redis 驱动。由于 Redis 通常在更低的抽象级别上使用,Kay M1 版本从更低级别的响应式抽象开始。LettuceConnectionFactory 允许访问 ReactiveRedisConnection,进而提供对 Redis 命令响应式版本的访问。

通过操作符进行函数式链式调用,创建链来以响应式的方式访问 Redis 数据。同样,所有 I/O 都是异步的。

ReactiveKeyCommands keyCommands = connection.keyCommands();
keyCommands.randomKey()
  .flatMap(keyCommands::type)
  .flatMap(System.out::println)
  .subscribe();

这段代码获取一个随机键并打印其数据类型。一个不存在的随机键会被完成为一个空的 Mono

响应式 Redis 命令有两种形式:接受普通参数和接受命令发布者。命令发布者会发出特定的 Redis 命令,将数据流式传输到 Redis。每个发出的命令在执行后都会发出一个命令响应。

public interface ReactiveStringCommands {

  // …

  Mono<Boolean> set(ByteBuffer key, ByteBuffer value);

  Flux<BooleanResponse<SetCommand>> set(Publisher<SetCommand> commands);

  // …
}

传统的 Spring Data Redis 在其阻塞式 API 中使用 byte[] 来交换数据。byte[] 会强制进行数据复制,如果数据已经在缓冲区中,例如 ByteBuffer 或 Netty 的 ByteBuf。响应式支持很大程度上是为了高效利用资源,因此我们决定暴露接受和返回 ByteBuffer 的方法。

总结

我希望这篇博文能让你了解 Kay 在各种抽象级别上提供的响应式功能。你可以在我们的示例仓库中找到所有这些功能的可用示例。

我们期待在 2017 年 1 月份发布另一个里程碑版本,然后走向发布候选版本。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有