理解响应式类型

工程 | Sébastien Deleuze | 2016 年 4 月 19 日 | ...

继之前的 Reactive SpringReactor Core 3.0 博客文章之后,我想解释一下为什么响应式类型很有用,以及它们与其他异步类型的比较,这些比较基于我们在 Spring Framework 5 即将来临的响应式支持方面获得的经验。

为什么使用响应式类型?

响应式类型并非旨在让您更快地处理请求或数据,事实上,与常规阻塞处理相比,它们会引入少量开销。它们的优势在于能够并发地服务更多请求,以及更有效地处理存在延迟的操作,例如从远程服务器请求数据。它们允许您通过以原生方式处理时间和延迟而无需消耗更多资源来提供更好的服务质量和可预测的容量规划。与在等待结果时阻塞当前线程的传统处理方式不同,响应式 API 等待不会产生任何成本,仅请求其能够处理的数据量,并带来新功能,因为它处理的是数据流,而不仅仅是一个接一个地处理单个元素。

Java 8 之前

在 Java 8 之前,异步非阻塞行为的实现至少有两个原因并不明显。第一个原因是基于回调的 API 需要冗长的匿名类,并且难以进行链式调用。第二个原因是 Future 类型是异步的,**但是**当您尝试使用 get() 方法获取结果时,它会阻塞当前线程直到计算完成。这就是为什么 Spring Framework 4.0 引入了 ListenableFuture,一个添加了非阻塞基于回调功能的 Future 实现。

Lambda 表达式、CompletableFuture 和 Stream

然后 Java 8 引入了 Lambda 表达式和 CompletableFuture。Lambda 表达式允许编写简洁的回调,而 CompletionStage 接口和 CompletableFuture 类最终允许以非阻塞方式和基于推送的方式处理 Future,同时提供链式调用此类延迟结果处理的功能。

Java 8 还引入了 Stream,它旨在有效地处理可以以零或极少延迟访问的数据流(包括基本类型)。它是基于拉取的,只能使用一次,缺少与时间相关的操作,并且可以执行并行计算,但无法指定要使用的线程池。正如 Brian Goetz 所解释的,它并非旨在处理存在延迟的操作,例如 I/O 操作。这就是 Reactor 或 RxJava 等响应式 API 发挥作用的地方。

响应式 API

诸如 Reactor 之类的响应式 API 也提供类似 Java 8 Stream 的操作符,但它们更普遍地适用于任何流序列(而不仅仅是集合),并允许定义一个转换操作流水线,该流水线将应用于通过它的数据,这要归功于方便的流畅 API 和使用 Lambda 表达式。它们旨在处理同步或异步操作,并允许您缓冲、合并、连接或对您的数据应用各种转换。

最初,响应式 API 仅设计用于处理数据流,即 N 个元素,例如,使用 Reactor 的 Flux

reactiveService.getResults()
    .mergeWith(Flux.interval(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .consume(System.out::println);

但在我们对 Spring Framework 5 的工作中,很明显需要区分 1 个或 N 个元素的流,这就是 Reactor 提供 Mono 类型的原因。MonoCompletableFuture 类型的响应式等价物,并允许以响应式方式为处理单个和多个元素提供一致的 API。

Mono.any(reactiveServiceA.findRecent(time), reactiveServiceB.findRecent(time)
    .timeout(Duration.ofSeconds(3), errorHandler::fallback)
    .doOnSuccess(r -> reactiveServiceC.incrementSuccess())
    .consume(System.out::println);

如果您更深入地了解 FluxMono,您会注意到这些类型实现了来自 Reactive Streams 规范的 Publisher 接口。

Reactive Streams

Reactor 基于 Reactive Streams 规范。Reactive Streams 由 4 个简单的 Java 接口PublisherSubscriberSubscriptionProcessor)、文本规范TCK 组成。它是每个现代响应式库的基石,并且对于互操作性目的来说是必不可少的。

Reactive Streams 的核心关注点是处理背压。简而言之,背压是一种机制,允许接收方询问它希望从发射方接收多少数据。它允许

  • 接收方仅在其准备好处理数据时开始接收数据
  • 控制数据在传输中的数量
  • 有效地处理慢发射方/快接收方或快发射方/慢接收方用例
  • 如果您请求 Long.MAX_VALUE 个元素,则从动态推拉策略切换到仅基于推送的策略

乍一看,Publisher 接口似乎非常简单易于实现;但要完全符合规范地做到这一点却非常困难,并且用户除了订阅它之外,无法对原始 Publisher 做任何事情!因此,通常最好依靠 Reactive Streams 实现(例如 Reactor)来帮助您解决此问题。

请注意,Java 9 将包含 java.util.concurrent.Flow 容器类中的 Reactive Streams 接口,进一步表明 Reactive Streams 在 JDK 中的相关性。

还需要注意的是,向 Reactive Streams 和 Reactor 转换功能 的融合允许在运行时轻松有效地从一种响应式类型转换为另一种响应式类型。

结论

希望这篇博文能帮助您更好地理解响应式类型。

我们正在使用 Reactor 的 MonoFlux 等类型在各种 Spring 项目(如 Spring Framework、Spring Boot、Spring Data、Spring Security 和 Spring Cloud)中进行响应式支持。

但是您即将推出的响应式应用程序也将直接使用这些类型,例如在 @Repository@Service@Controller 方法级别,因为构建响应式应用程序意味着在您必须处理延迟或流时使用响应式语义(我们还将提供一些指导来集成阻塞 API)。

我们将在未来几个月发布更多关于响应式主题的博文。欢迎您熟悉 这个测试驱动的 Lite Rx API 实践指南,它将教会您如何使用 FluxMono,并且像往常一样,欢迎您提供反馈!

如果您碰巧在 5 月中旬在巴塞罗那(无论如何,在巴塞罗那度过时光永远不会是一个坏主意!),不要错过参加 Spring I/O 大会 的机会。此外,SpringOne Platform(8 月初,拉斯维加斯)的注册最近已经开放,如果您想享受早鸟票价,可以考虑一下。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部