理解响应式类型

工程 | Sébastien Deleuze | 2016年4月19日 | ...

继之前的响应式 SpringReactor Core 3.0博文之后,我想解释一下响应式类型为何有用,以及它们与其它异步类型相比有何不同,这是基于我们在 Spring Framework 5 即将推出的响应式支持方面所学到的知识。

为什么要使用响应式类型?

响应式类型并非旨在让你更快地处理请求或数据,事实上,与常规的阻塞式处理相比,它们会引入一些轻微的开销。它们的优势在于能够更高效地同时处理更多请求,并处理那些具有延迟的操作,例如从远程服务器请求数据。它们使你能够提供更好的服务质量和可预测的容量规划,因为它们原生处理时间和延迟,而不会消耗更多资源。与等待结果时阻塞当前线程的传统处理方式不同,等待的响应式 API 几乎不消耗成本,仅请求它能够处理的数据量,并且由于它处理的是数据流,而不仅仅是单个元素,因此带来了新的功能。

Java 8 之前

在 Java 8 之前,实现异步非阻塞行为至少有两个原因不够明显。第一个原因是基于回调的 API 需要冗余的匿名类,并且不易于链式调用。第二个原因是 Future 类型是异步的,**但是**当你尝试使用 get() 方法获取结果时,它会阻塞当前线程直到计算完成。这就是为什么 Spring Framework 4.0 引入了 ListenableFuture,一个增加了非阻塞回调功能的 Future 实现。

Lambda 表达式、CompletableFuture 和 Stream

然后 Java 8 引入了 Lambda 表达式和 CompletableFuture。Lambda 表达式允许编写简洁的回调,而 CompletionStage 接口和 CompletableFuture 类最终允许以非阻塞和基于推的方式处理 Future,同时提供对这种延迟结果处理进行链式调用的能力。

Java 8 还引入了 Stream,它被设计用于高效地处理可以以几乎零延迟访问的数据流(包括原始类型)。它是基于拉取的,只能使用一次,缺少与时间相关的操作,并且可以执行并行计算,但无法指定使用的线程池。正如 Brian Goetz 所解释的,它并没有被设计用于处理具有延迟的操作,例如 I/O 操作。而这正是 Reactor 或 RxJava 等响应式 API 发挥作用的地方。

响应式 API

Reactor 这样的响应式 API 也提供了类似 Java 8 Stream 的操作符,但它们更通用地处理任何流序列(不仅仅是集合),并且允许定义一个转换操作管道,通过方便的链式 API 并使用 Lambda 表达式将这些操作应用于经过的数据。它们被设计用于处理同步或异步操作,并允许你对数据进行缓冲、合并、连接或应用广泛的转换。

最初,响应式 API 只设计用于处理数据流,即 N 个元素,例如使用 Reactor 的 Flux

reactiveService.getResults()
    .mergeWith(Flux.interval(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .consume(System.out::println);

但在我们为 Spring Framework 5 所做的工作中,很明显需要区分单个元素流和多个元素流,这就是为什么 Reactor 提供了 Mono 类型。MonoCompletableFuture 类型的响应式等价物,并允许提供一个统一的 API,以响应式的方式处理单个和多个元素。

Mono.any(reactiveServiceA.findRecent(time), reactiveServiceB.findRecent(time)
    .timeout(Duration.ofSeconds(3), errorHandler::fallback)
    .doOnSuccess(r -> reactiveServiceC.incrementSuccess())
    .consume(System.out::println);

如果你更深入地研究 FluxMono,你会发现这些类型实现了 Reactive Streams 规范中的 Publisher 接口。

Reactive Streams

Reactor 构建于 Reactive Streams 规范之上。Reactive Streams 由 4 个简单的 Java 接口PublisherSubscriberSubscriptionProcessor)、一个 文本规范和一个 TCK 组成。它是所有现代响应式库的基石,并且对于互操作性至关重要。

Reactive Streams 的核心关注点是处理背压。简而言之,背压是一种机制,允许接收者询问它希望从发射器接收多少数据。它允许:

  • 接收者在准备好处理数据后才开始接收数据
  • 控制在途数据的量
  • 高效处理慢发射器/快接收器或快发射器/慢接收器用例
  • 如果你请求 Long.MAX_VALUE 个元素,则可以从动态的推-拉策略切换到仅基于推的策略

乍一看,Publisher 接口似乎出奇地简单;但要完全符合规范来实现它却相当困难,用户除了订阅它之外,无法对原始 Publisher 做任何事情!因此,通常最好依赖 Reactive Streams 的实现,例如 Reactor,来提供帮助。

请注意,Java 9 将把 Reactive Streams 接口包含在 java.util.concurrent.Flow 容器类中,这进一步表明了 Reactive Streams 在 JDK 中的重要性。

另外,值得注意的是,向 Reactive Streams 的融合以及 Reactor 的转换能力允许在运行时轻松有效地将一种响应式类型转换为另一种。

结论

我希望这篇博文能帮助你更好地理解响应式类型。

我们正在 Spring Framework、Spring Boot、Spring Data、Spring Security 和 Spring Cloud 等多个 Spring 项目中,使用 Reactor MonoFlux 等类型来开发响应式支持。

但你即将使用的响应式应用程序也将直接使用这些类型,例如在 @Repository@Service@Controller 方法级别,因为构建响应式应用程序意味着使用响应式语义,你必须处理延迟或流(我们还将提供一些集成阻塞 API 的指导)。

我们将在接下来的几个月内发布更多关于响应式的博文。欢迎你通过 这个测试驱动的 Lite Rx API 实战教程 来熟悉如何使用 FluxMono,一如既往,欢迎提出宝贵意见!

如果你碰巧在五月中旬(无论如何,在巴塞罗那总是个好时候!)在巴塞罗那,请不要错过参加 Spring I/O 大会的机会。此外,SpringOne Platform(八月初,拉斯维加斯)的注册最近已经开放,如果你想享受早鸟票价。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有