领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多在这篇文章中,我们将继续关于响应式编程的系列文章,重点不再是学习基本 API,而是更关注具体的用例和编写真正有用的代码。我们将看到响应式是如何成为并发编程的有用抽象的,但也看到它具有一些非常底层的特性,我们应该以尊重和谨慎的态度对待它们。如果我们开始充分发挥这些特性的潜力,我们就可以控制应用程序中以前不可见的层,这些层隐藏在容器、平台和框架中。
成为响应式迫使你以不同的方式看待世界。与其请求某些东西并获取它(或无法获取它),所有内容都作为序列 (Publisher
) 传递,并且你必须订阅它。与其等待答案,你必须注册回调。当你习惯了它后,它并不难,但是除非整个世界发生翻天覆地的变化并变得响应式,否则你会发现你需要与旧式的阻塞式 API 进行交互。
假设我们有一个返回 HttpStatus
的阻塞式方法
private RestTemplate restTemplate = new RestTemplate();
private HttpStatus block(int value) {
return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
.getStatusCode();
}
并且我们想用不同的参数重复调用它并聚合结果。这是一个经典的“散布-收集”用例,例如,如果你有一个需要汇总多个页面上的“前 N”项的分页后端,你就会遇到这种情况。由于非响应式(阻塞式)操作的细节与散布-收集模式无关,我们可以将其推送到名为 block()
的方法中,并在稍后实现它。这是一个(糟糕的)示例,它调用后端并将结果聚合到 Result
类型的对象中。
Flux.range(1, 10) // (1)
.log()
.map(this::block) // (2)
.collect(Result::new, Result::add) // (3)
.doOnSuccess(Result::stop) // (4)
进行 10 次调用
此处为阻塞代码
收集结果并聚合成单个对象
最后停止计时器(结果是 Mono<Result>
)
不要在家这样做。这是一个“糟糕的”示例,因为虽然 API 在技术上被正确使用,但我们知道它将阻塞调用线程;此代码或多或少等同于一个 for 循环,循环体中包含对 block()
的调用。更好的实现将对 block()
的调用推送到后台线程。我们可以通过将其包装在一个返回 Mono<HttpStatus>
的方法中来做到这一点。
private Mono<HttpStatus> fetch(int value) {
return Mono.fromCallable(() -> block(value)) // (1)
.subscribeOn(this.scheduler); // (2)
}
此处为阻塞代码,位于 Callable 中以延迟执行
在后台线程上订阅慢速发布者
scheduler
作为共享字段单独声明:Scheduler scheduler = Schedulers.parallel()
。然后我们可以声明我们希望 flatMap()
序列而不是使用 map()
。
Flux.range(1, 10)
.log()
.flatMap( // (1)
this::fetch, 4) // (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop)
下降到一个新的发布者以并行处理
flatMap 中的并发提示
如果我们想在非响应式环境中运行上面的散布-收集代码,我们可以使用 Spring MVC,如下所示
@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
return Flux.range(1, 10)
...
.doOnSuccess(Result::stop)
.toFuture();
}
如果你阅读 @RequestMapping
的 Javadoc,你会发现一个方法可以返回一个 CompletableFuture
,“应用程序使用它在它自己选择的单独线程中生成返回值”。在这种情况下,单独的线程由“scheduler”(一个线程池)提供,因此处理发生在多个线程上,一次 4 个,因为 flatMap()
的调用方式。
带有后台线程的散布-收集是一个有用的模式,但它并不完美——它没有阻塞调用方,但它阻塞了一些东西,所以它只是将问题转移到了别处。有一些实际的影响。我们有一个 HTTP 服务器,它具有(可能是)非阻塞式 IO 处理程序,将工作传递回线程池,每个 HTTP 请求一个线程——所有这些都发生在 servlet 容器(例如 Tomcat)内部。请求被异步处理,因此 Tomcat 中的工作线程没有被阻塞,并且我们在“scheduler”中创建的线程池最多在 4 个并发线程上进行处理。我们正在处理 10 个后端请求(对 block()
的调用),因此使用 scheduler 的最大理论优势是延迟降低了 4 倍。换句话说,如果在单个线程中按顺序处理所有 10 个请求需要 1000 毫秒,我们可能会看到我们 HTTP 服务的单个传入请求的处理时间为 250 毫秒。不过,我们应该强调“可能”:只有在没有争用处理线程的情况下(在两个阶段,Tomcat 工作线程和应用程序调度程序),它才能达到如此快的速度。如果你有一个具有大量内核、非常低并发性(即连接到你的应用程序的客户端数量很少)并且几乎没有两个客户端同时发出请求的可能性,那么你可能会看到接近理论上的改进。一旦有多个客户端尝试连接,它们将争用相同的 4 个线程,延迟将增加,甚至可能比没有后台处理的单个客户端经历的延迟更糟。我们可以通过使用更大的线程池创建调度程序来提高并发客户端的延迟,例如。
private Scheduler scheduler = Schedulers.newParallel("sub", 16);
(16 个线程。)现在我们为线程及其堆栈使用了更多的内存,并且我们可以预期在低并发性下看到更低的延迟,但在硬件内核少于 16 个的情况下,在高并发性下不一定能看到更低的延迟。我们也不期望在负载下获得更高的吞吐量:如果存在线程争用,管理这些资源的成本很高,这必须反映在某个重要的指标中。如果你有兴趣对这种权衡进行更详细的分析,可以在 Rob Harrop 的博客系列中找到关于性能指标如何在负载下扩展的一些详细分析。
提示
Tomcat 默认分配 100 个线程来处理 HTTP 请求。如果我们知道所有处理都将在我们的调度程序线程池上进行,那么这是过度的。存在阻抗不匹配:调度程序线程池可能成为瓶颈,因为它拥有的线程少于上游 Tomcat 线程池。这突出了这样一个事实,即性能调整可能非常困难,并且,虽然你可能控制着所有配置,但这是一个微妙的平衡。
如果我们使用根据需求调整其容量的调度程序,我们可以做得比固定线程池更好。Reactor 为此提供了一个便利,因此,如果你使用 Schedulers.elastic()
尝试相同的代码(你可以在任何地方调用它;只有一个实例),你会看到在负载下会创建更多线程。
从阻塞到响应式之间的桥接是一个很有用的模式,并且使用 Spring MVC 中现有的技术很容易实现(如上所示)。响应式旅程的下一阶段是完全摆脱应用程序线程中的阻塞,而做到这一点需要新的 API 和新的工具。最终,我们必须在整个堆栈中都保持响应式,包括服务器和客户端。这是 Spring Reactive 的目标,它是一个新的框架,与 Spring MVC 正交,但满足相同的需求,并使用类似的编程模型。
注意
Spring Reactive 最初是一个独立的项目,但在 5.0 版本(2016 年 6 月第一个里程碑版本)中被整合到 Spring Framework 中。
在我们散布-收集示例中,完全实现响应式的第一步是将类路径上的 spring-boot-starter-web
替换为 spring-boot-starter-webflux
。在 Maven 中
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
或在 Gradle 中
dependencies {
compile('org.springframework.boot:spring-boot-starter-webflux')
...
}
然后在控制器中,我们可以简单地去除到 CompletableFuture
的桥接,并返回类型为 Mono
的对象
@RequestMapping("/parallel")
public Mono<Result> parallel() {
return Flux.range(1, 10)
.log()
.flatMap(this::fetch, 4)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop);
}
将此代码放入 Spring Boot 应用程序中,它将在 Tomcat、Jetty 或 Netty 上运行,具体取决于它在类路径上找到的内容。Tomcat 是该启动器中的默认服务器,因此如果您想切换,则必须将其排除并提供另一个服务器。这三个服务器在启动时间、内存使用和运行时资源使用方面具有非常相似的特性。
我们仍然在 block()
中有阻塞的后端调用,因此我们仍然必须在线程池上订阅以避免阻塞调用方。如果我们有一个非阻塞客户端,我们可以更改它,例如,而不是使用 RestTemplate
,我们使用新的 WebClient
,然后我们可以这样做以使用非阻塞客户端
private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());
private Mono<HttpStatus> fetch(int value) {
return this.client.perform(HttpRequestBuilders.get("http://example.com"))
.extract(WebResponseExtractors.response(String.class))
.map(response -> response.getStatusCode());
}
请注意,WebClient.perform()
(或更准确地说是 WebResponseExtractor
)具有响应式返回类型,我们已将其转换为 Mono<HttpStatus>
,但我们尚未订阅它。我们希望框架执行所有订阅操作,因此现在我们一直都是响应式的。
警告
Spring Reactive 中返回 Publisher
的方法**是非阻塞的**,但通常,返回 Publisher
(或 Flux
、Mono
或 Observable
)的方法只是一个提示,表明它可能是非阻塞的。如果您正在编写此类方法,务必分析(最好是测试)它们是否阻塞,并在可能阻塞时明确告知调用方。
注意
我们刚才使用非阻塞客户端来简化 HTTP 堆栈的技巧在普通的 Spring MVC 中也适用。上面 fetch()
方法的结果可以转换为 CompletableFuture
并从普通的 @RequestMapping
方法(例如在 Spring Boot 1.3 中)传递出去。
现在,我们可以删除 HTTP 请求处理程序中 fetch()
调用后的并发提示
@RequestMapping("/netty")
public Mono<Result> netty() {
return Flux.range(1, 10) // (1)
.log() //
.flatMap(this::fetch) // (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop);
}
进行 10 次调用
下降到一个新的发布者以并行处理
考虑到我们根本不需要额外的可调用线程和订阅线程,这段代码比我们必须桥接到阻塞客户端时要简洁得多,这可以归因于代码从头到尾都是响应式的。响应式 WebClient
返回一个 Mono
,这立即促使我们在转换链中选择 flatMap()
,我们需要的代码自然就出现了。编写起来体验更好,可读性也更高,因此更易于维护。此外,由于没有线程池和并发提示,因此无需将 4 的神奇因子插入到我们的性能预期中。某个地方存在限制,但它不再由我们在应用程序层的选择施加,也不受服务器“容器”中的任何内容限制。它不是魔法,物理定律仍然存在,因此后端调用仍然需要大约 100 毫秒,但如果争用较低,我们甚至可能看到所有 10 个请求在大约与一个请求相同的时间内完成。随着服务器负载的增加,延迟和吞吐量自然会下降,但下降的方式受缓冲区争用和内核网络控制,而不是应用程序线程管理。这是一种控制反转,到应用程序代码以下的较低堆栈级别。
请记住,相同的应用程序代码可以在 Tomcat、Jetty 或 Netty 上运行。目前,Tomcat 和 Jetty 的支持是在 Servlet 3.1 异步处理的基础上提供的,因此它不再局限于每个线程一个请求。它构建在自适应 Servlet 3.1 概念以适应响应式范式的响应式桥接之上。在 Reactor Netty 的情况下,背压和响应式支持是内置的。根据您选择的 HTTP 客户端库,服务器和客户端可能共享相同的 HTTP 资源并进一步优化内容。我们将在本系列的另一篇文章中对此进行详细介绍。
提示
在示例代码中,“reactive”示例具有 Maven 配置文件“tomcat”、“tomcatNext”(用于 Tomcat 8.5)、“jetty”和“netty”,因此您可以轻松尝试所有不同的服务器选项,而无需更改一行代码。
注意
许多应用程序中的阻塞代码不是 HTTP 后端调用,而是数据库交互。目前,很少有数据库支持非阻塞客户端(MongoDB 和 Couchbase 是值得注意的例外,但即使这些客户端也没有 HTTP 客户端那么成熟)。在所有数据库供应商在客户端方面赶上之前,线程池和阻塞到响应式模式将长期存在。
我们已经将基本的散布-收集用例简化到代码非常简洁,并且非常适合其运行的硬件。我们编写了一些简单的代码,并且它被很好地堆叠和编排成一个使用 Spring 的工作 HTTP 服务。在阳光明媚的日子里,每个人都对结果非常满意。但是,一旦出现错误,例如网络连接行为不端,或后端服务延迟较差,我们就会受到影响。
首先,最明显的痛苦是,我们编写的代码是声明式的,因此难以调试。发生错误时,诊断信息可能非常不透明。使用原始的底层 API,如没有 Spring 的 Reactor,甚至降到没有 Reactor 的 Netty 层级,可能会使情况变得更糟,因为那样的话,我们必须自己构建许多错误处理,每次与网络交互时都要重复样板代码。至少在 Spring 和 Reactor 的混合中,我们可以预期会看到对未捕获异常的堆栈跟踪日志。但是,它们可能不容易理解,因为它们发生在不受我们控制的线程上,并且有时它们表现为来自堆栈不熟悉部分的非常底层的关注点。
另一个痛苦的来源是,如果我们犯了错误并在我们的某个响应式回调中阻塞,我们将阻塞**所有**相同线程上的请求。对于基于 servlet 的容器,每个请求都隔离到一个线程,并且阻塞不会阻止其他请求,因为它们将在不同的线程上处理。阻塞所有请求仍然是麻烦的根源,但它只会表现为延迟增加,每个请求的延迟大致保持不变。在响应式世界中,阻塞单个请求会导致所有请求的延迟增加,而阻塞所有请求会导致服务器瘫痪,因为额外的缓冲区和线程层不存在来弥补空缺。
能够控制异步处理中的所有活动部件是一件好事:每一层都有线程池大小和队列。我们可以使其中一些层具有弹性,并尝试根据它们执行的工作量进行调整。但在某些时候,它会成为负担,我们开始寻找更简单或更精简的东西。对可扩展性的分析得出的结论是,通常最好放弃额外的线程,并使用物理硬件施加的约束。这是一个“机械同情”的例子,正如 LMAX 在Disruptor 模式中所做的那样,取得了巨大的成功。
我们已经开始看到响应式方法的强大功能,但请记住,权力伴随着责任。它具有根本性和激进性。它属于“推倒重来”的范畴。因此,您也希望了解响应式并不是所有问题的解决方案。实际上,它不是任何问题的解决方案,它仅仅促进了特定类别问题的解决。您从中获得的好处可能超过了学习它、修改 API 以使其从头到尾都成为响应式以及之后维护代码的成本,因此请谨慎行事。