使用 Spring Data 响应式编程

工程 | Mark Paluch | 2016年11月28日 | ...

上周发布的 Spring Data Kay M1 是首个支持响应式数据访问的版本。它最初支持的存储——MongoDB、Apache Cassandra 和 Redis——都已自带响应式驱动程序,这使得它们成为此类原型的理想选择。让我们更详细地了解一下新的编程模型以及构成该支持的 API。

响应式仓库

仓库编程模型是 Spring Data 用户通常处理的最高级别抽象。它们通常包含一组在 Spring Data 提供的接口和特定于域的查询方法中定义的 CRUD 方法。下面是一个响应式 Spring Data 仓库定义的示例:

public interface ReactivePersonRepository
  extends ReactiveCrudRepository<Person, String> {

  Flux<Person> findByLastname(Mono<String> lastname);

  @Query("{ 'firstname': ?0, 'lastname': ?1}")
  Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
}

正如您所看到的,它与您习惯的差别不大。但是,与传统的仓库接口相比,响应式仓库使用响应式类型作为返回类型,并且也可以将其用于参数类型。新引入的 `ReactiveCrudRepository` 中的 CRUD 方法当然也使用了这些类型。

默认情况下,响应式仓库使用 Project Reactor 类型,但也可以使用其他响应式库。我们为这些库提供了一个自定义的仓库基接口(例如 `RxJava2CrudRepository`),并且还会根据需要自动调整查询方法的类型,例如 RxJava 的 `Observable` 和 `Single`。其余部分基本保持不变。但是请注意,当前里程碑版本尚不支持分页,并且您当然必须在类路径上拥有必要的响应式库才能激活对特定库的支持。

激活响应式 Spring Data

与我们在阻塞式世界中一样,对响应式 Spring Data 的支持是通过 `@Enable…` 注解以及一些基础设施设置来激活的。

@EnableReactiveMongoRepositories
public class AppConfig extends AbstractReactiveMongoConfiguration {

  @Bean
  public MongoClient mongoClient() {
    return MongoClients.create();
  }

  @Override
  protected String getDatabaseName() {
    return "reactive";
  }
}

请注意我们如何为基础设施配置使用不同的基类,因为我们需要使用 MongoDB 异步驱动程序。

使用响应式仓库

现在可以使用仓库,就像使用阻塞式仓库一样,只是现在可以以响应式方式处理结果。

@RestController
class PersonController {

  private final PersonRepository people;

  public PersonController(PersonRepository people) {
    this.people = people;
  }

  @GetMapping("/people")
  Flux<String> namesByLastname(@RequestParam Mono<String> lastname) {

    Flux<Person> result = repository.findByLastname(lastname);
    return result.map(it -> it.getFullName());
  }
}

请注意我们如何转发 Spring Web Reactive 提供的响应式参数,将它们传递到仓库中,依次获取 `Flux`,然后以响应式方式处理执行结果。一般来说,响应式查询方法遵循与已知仓库相同的查询创建思路。传递给查询方法的参数可以是普通参数(例如 `String`)、包装参数(`Optional`、`Stream`)或响应式包装参数(`Mono`、`Flux`)。如果您使用响应式包装器作为参数类型,则实现会推迟实际的查询创建和执行,直到实际订阅。

响应式模板

就像传统的仓库基于传统的模板实现一样,响应式仓库也是建立在响应式模板之上的。阻塞式模板 API 中的大多数可用操作在响应式模板中都有对应的操作。我们将把阻塞式世界中的更多功能移植到响应式模板 API 中,但某些操作根本无法通过响应式驱动程序(目前)使用,或者在响应式世界中根本没有意义。

这是 Spring Data MongoDB 中 `ReactiveMongoOperations` 的摘录。它由 `ReactiveMongoTemplate` 实现,并使用 Project Reactor 的响应式类型(如 `Mono` 和 `Flux`)来包装响应。某些方法还接受响应式类型以将数据流式传输到您的数据存储中。

public interface ReactiveMongoOperations {

  // …

  /**
   * Map the results of an ad-hoc query on the specified collection to a
   * single instance of an object of the specified type.
   */
  <T> Mono<T> findOne(Query query, Class<T> entityClass);

  /**
   * Map the results of an ad-hoc query on the collection for the entity
   * class to a List of the specified type.
   */
  <T> Flux<T> find(Query query, Class<T> entityClass);

  /**
   * Insert the object into the specified collection.
   */
  <T> Mono<T> insert(T objectToSave, String collectionName);

  /**
   * Insert the object into the collection for the entity type of the object
   * to save.
   */
  <T> Mono<T> insert(Mono<? extends T> objectToSave);

  // …
}

请注意,所有方法都遵循响应式执行模型,在调用时不执行任何包含任何 I/O 的操作,而只在订阅返回的值时执行。

让我们通过模板插入一些数据:

Flux<Person> flux = Flux.just(new Person("Walter", "White"),
  new Person("Skyler", "White"),
  new Person("Saul", "Goodman"),
  new Person("Jesse", "Pinkman"));

template.insertAll(flux).subscribe();

某些方法(例如 `insertAll(…)`)接受响应式类型以将传入数据异步地流式传输到您的 MongoDB 数据库中,例如来自 Spring Web Reactive 控制器中接收到的 `Flux`,该控制器将通过 Jackson 异步地映射 JSON 数组。

@PostMapping("/people")
Flux<People> namesByLastname(@RequestBody Flux<Person> people) {

  return template.insertAll(people);
}

正如您所看到的,仓库和模板 API 都允许您以响应式、非阻塞的方式描述请求处理。也就是说,让我们更深入地了解一下 Redis 对响应式数据访问的支持。

Spring Data Redis 的响应式连接

Spring Data Redis 在连接级别提供了初始的响应式支持,目前仅在 Lettuce 上提供,因为它是唯一支持响应式数据访问的 Redis 驱动程序。由于 Redis 通常在更低的抽象级别上使用,因此 Kay M1 版本从该较低级别的响应式抽象开始。`LettuceConnectionFactory` 允许访问 `ReactiveRedisConnection`,后者又提供对 Redis 命令的响应式版本的访问。

使用运算符进行函数式链可以创建链以响应式的方式访问 Redis 数据。同样,所有 I/O 都是异步的。

ReactiveKeyCommands keyCommands = connection.keyCommands();
keyCommands.randomKey()
  .flatMap(keyCommands::type)
  .flatMap(System.out::println)
  .subscribe();

这段代码获取一个随机键并打印其数据类型。不存在的随机键将完成为空 `Mono`。

响应式 Redis 命令有两种形式:接受普通参数和接受命令发布者。命令发布者会发出特定的 Redis 命令,以将数据直接流式传输到 Redis。每个发出的命令都会在命令执行后发出命令响应。

public interface ReactiveStringCommands {

  // …

  Mono<Boolean> set(ByteBuffer key, ByteBuffer value);

  Flux<BooleanResponse<SetCommand>> set(Publisher<SetCommand> commands);

  // …
}

传统的 Spring Data Redis 在其阻塞式 API 上使用 `byte[]` 来交换数据。如果数据存在于缓冲区(例如 `ByteBuffer` 或 Netty 的 `ByteBuf`)中,则 `byte[]` 会强制进行数据复制。响应式支持很大程度上与高效的资源使用有关,因此我们决定公开接受和返回 `ByteBuffer` 的方法。

总结

我希望这篇博文能让您了解 Kay 在不同抽象级别上提供的响应式功能。您可以在我们的示例存储库中找到所有这些的可执行示例。

我们期待在 2017 年 1 月发布另一个里程碑版本,然后转向发布候选版本。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区所有即将举行的活动。

查看全部