理解响应式类型

工程 | Sébastien Deleuze | 2016年4月19日 | ...

继之前的 Reactive SpringReactor Core 3.0 博客文章之后,我想基于我们在 Spring Framework 5 即将推出的响应式支持方面学到的知识,解释为什么响应式类型很有用,以及它们与其他异步类型相比有何优势。

为什么使用响应式类型?

响应式类型的目的不是让你更快地处理请求或数据,实际上它们相比常规的阻塞处理会引入少量开销。它们的优势在于能够并发地服务更多请求,并更高效地处理涉及延迟的操作,例如从远程服务器请求数据。它们通过原生处理时间和延迟而无需消耗更多资源,从而为你提供更好的服务质量和可预测的容量规划。与等待结果时阻塞当前线程的传统处理方式不同,等待中的响应式 API 不消耗资源,只请求其能够处理的数据量,并带来了处理数据流(而不仅仅是单个元素)的新能力。

Java 8 之前

在 Java 8 之前,异步非阻塞行为的实现并不直观,至少有两个原因。第一个原因是基于回调的 API 需要冗长的匿名类,并且不容易链式调用。第二个原因是 Future 类型是异步的,但是当你尝试使用 get() 方法获取结果时,它会阻塞当前线程直到计算完成。这就是 Spring Framework 4.0 引入 ListenableFuture 的原因,它是一个 Future 的实现,增加了基于非阻塞回调的能力。

Lambda、CompletableFuture 和 Stream

然后 Java 8 引入了 lambda 表达式和 CompletableFuture。Lambda 允许编写简洁的回调函数,而 CompletionStage 接口和 CompletableFuture 类最终实现了以非阻塞、基于推送的方式处理 future,并提供了链式处理延迟结果的能力。

Java 8 还引入了 Stream,它被设计用来高效地处理可以以极低延迟访问的数据流(包括基本类型)。它是拉取式的,只能使用一次,缺乏时间相关的操作,可以执行并行计算但无法指定使用的线程池。正如 Brian Goetz 所解释的,它并非为处理有延迟的操作(如 I/O 操作)而设计。这正是像 Reactor 或 RxJava 这样的响应式 API 发挥作用的地方。

响应式 API

诸如 Reactor 之类的响应式 API 也提供了类似 Java 8 Stream 的操作符,但它们更普遍地适用于任何流序列(不仅仅是集合),并且可以通过便捷的流式 API 和 lambda 表达式定义一个转换操作管道,该管道将应用于流经其中的数据。它们被设计用来处理同步或异步操作,并允许你对数据进行缓冲、合并、连接或应用各种转换。

最初,响应式 API 仅被设计用来处理数据流,即 N 个元素,例如使用 Reactor 的 Flux

reactiveService.getResults()
    .mergeWith(Flux.interval(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .consume(System.out::println);

但在我们开发 Spring Framework 5 的过程中,发现有明确的需求需要区分 1 个或 N 个元素的流,这就是 Reactor 提供 Mono 类型的原因。MonoCompletableFuture 类型的响应式等价物,它提供了一个一致的 API,以响应式方式处理单个和多个元素。

Mono.any(reactiveServiceA.findRecent(time), reactiveServiceB.findRecent(time)
    .timeout(Duration.ofSeconds(3), errorHandler::fallback)
    .doOnSuccess(r -> reactiveServiceC.incrementSuccess())
    .consume(System.out::println);

如果你深入了解 FluxMono,你会注意到这些类型实现了 Reactive Streams 规范中的 Publisher 接口。

Reactive Streams

Reactor 基于 Reactive Streams 规范构建。Reactive Streams 由 4 个简单的 Java 接口PublisherSubscriberSubscriptionProcessor)、一份文本规范和一个 TCK 组成。它是所有现代响应式库的基石,并且是实现互操作性的必备要素。

Reactive Streams 的核心关注点是处理背压(backpressure)。简而言之,背压是一种机制,允许接收者请求从发射者那里接收多少数据。它使得

  • 接收者只有在准备好处理数据时才开始接收
  • 控制正在传输的数据量
  • 高效处理慢发射者/快接收者或快发射者/慢接收者的情况
  • 如果你请求 Long.MAX_VALUE 个元素,则可以从动态的推拉(push-pull)策略切换到纯粹的推送(push-based)策略

乍一看,Publisher 接口实现起来似乎非常简单;但完全符合规范地实现它实际上非常困难,而且用户除了订阅原始的 Publisher 外,什么也做不了!这就是为什么通常最好依赖于 Reactive Streams 的实现(如 Reactor)来帮助你完成这项工作的原因。

请注意,Java 9 将会在 java.util.concurrent.Flow 容器类中包含 Reactive Streams 接口,这进一步表明了 Reactive Streams 在 JDK 中的重要性。

同样值得注意的是,向 Reactive Streams 的趋同以及 Reactor 的转换能力 使得在运行时轻松高效地实现响应式类型之间的转换成为可能。

总结

希望这篇博客文章能帮助您更好地理解响应式类型。

我们正在 Spring Framework、Spring Boot、Spring Data、Spring Security 和 Spring Cloud 等各种 Spring 项目中努力实现对 Reactor MonoFlux 等响应式类型的支持。

但您即将构建的响应式应用程序也将直接使用这些类型,例如在 @Repository@Service@Controller 方法层面,因为构建响应式应用程序意味着在必须处理延迟或流的地方使用响应式语义(我们还将提供一些指导,以便集成阻塞 API)。

我们将在未来几个月内发布更多关于响应式的博客文章。请随意通过 这个测试驱动的精简版 Rx API 实操 来熟悉如何使用 FluxMono,一如既往,欢迎您的反馈!

如果您恰好在五月中旬到访巴塞罗那(无论如何,那都是一个不错的季节!),不要错过参加 Spring I/O 大会 的机会。此外,SpringOne Platform(八月初,拉斯维加斯)的注册也已开放,如果您想享受早鸟票优惠,请抓紧时间。

订阅 Spring 新闻邮件

通过 Spring 新闻邮件保持联系

订阅

取得领先

VMware 提供培训和认证,助您快速进步。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单订阅。

了解更多

即将举行的活动

查看 Spring 社区的所有即将举行的活动。

查看全部