领先一步
VMware 提供培训和认证,助你加速进步。
了解更多这篇博文是系列文章的第一篇,旨在深入探讨 Reactor 更高级的概念和内部工作原理。
它源自我关于 Flight of the Flux 的演讲,我发现其内容更适合博客文章的形式。
其他文章发布后,我会更新下面的表格,但以下是计划中的内容:
如果你还不了解 Reactive Streams 和 Reactor 的基本概念,请前往网站的学习区和参考指南。
事不宜迟,我们开始吧
初次接触 JVM 上的 Reactive Streams 和响应式编程时,你首先了解的是 Publisher
和 Subscriber
之间的高级关系:一个产生数据,另一个消费数据。很简单对吧?此外,Publisher
似乎将数据 推送 给 Subscriber
。
但在使用像 Reactor (或 RxJava2) 这样的 Reactive Streams 库时,你很快就会遇到这句格言:
不订阅,一切都不会发生
有时,你可能会读到这两个库实现了“推-拉混合模型”。等一下!拉?
稍后我们会回到这一点,但要理解这句话,首先需要认识到,默认情况下,Reactor 的响应式类型是 懒惰的。
调用 Flux
或 Mono
上的方法(即 操作符)不会立即触发行为。相反,它会返回一个新的 Flux
(或 Mono
)实例,你可以在其上继续组合更多操作符。因此,你创建了一个 操作符链(或操作符的非循环图),它代表你的 异步处理管道。
这个 声明式 阶段称为 组装时机。
我们来看一个例子,客户端应用向服务器发起 HTTP 请求,期望获得 HttpResponse:
Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made
使用流畅 API 可以简化如下:
Mono<String> quote = makeHttpRequest()
.map(req -> parseJson(req))
.map(json -> json.getString("quote"));
声明完管道后,有两种情况:要么将表示处理管道的 Flux
/Mono
传递给另一段代码,要么触发管道。
前一种情况意味着你返回 Mono
的代码可能会应用其他操作符,从而产生一个新的派生管道。由于操作符创建新实例(就像洋葱一样),你自己的 Mono
不会被修改,因此它可以被多次进一步装饰,产生截然不同的结果:
//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
//by this point, none of the 3 "pipelines" have triggered an HTTP request
与 CompletableFuture
相比,CompletableFuture
本质上不是懒惰的:一旦你获得了 CompletableFuture
的引用,就意味着处理已经在进行中了……
考虑到这一点,让我们来看看如何触发响应式管道。
到目前为止,我们已经 组装了一个异步管道。也就是说,我们通过使用操作符实例化了 Flux
和 Mono
变量,从而生成了行为像洋葱一样层层叠加的其他 Flux
/Mono
实例。
但数据还没有开始流经这些声明好的管道。
这是因为数据流动的触发点不是管道的声明,而是对它的 订阅。请记住:
不订阅,一切都不会发生
订阅的行为就是在说“好的,这个管道代表数据的转换,我对最终形式的数据感兴趣”。最常见的方式是调用 Flux.subscribe(valueConsumer, errorConsumer)
。
这种兴趣信号会通过操作符链反向传播,直到源操作符,即实际产生初始数据的 Publisher
:
makeHttpRequest() //<5>
.map(req -> parseJson(req)) //<4>
.map(json -> json.getString("quote")) //<3>
.flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
.subscribe(System.out::println, Throwable::printStackTrace); //<1>
Flux
,表示希望将每个单词打印到控制台(并打印任何错误的堆栈跟踪)。flatMapMany
步骤……map
步骤……map
步骤……makeHttpRequest()
(我们将其视为源)。此时,源被触发。它以适当的方式生成数据:在这里,它会向一个产生 JSON 的端点发起 HTTP 请求,然后发出 HTTP 响应。
从那时起,我们处于 执行时机。数据已经开始流经管道(按照更自然的从上到下顺序,即从上游到下游)。
HttpResponse
被发出到 parseJson map
。map
。flatMapMany
。flatMapMany
将引用拆分成单词,并单独发出每个单词。subscribe
中的值处理器会收到每个单词的通知,并将它们打印到控制台,每行一个。希望这能帮助你理解组装时机、订阅/执行时机之间的区别!
解释完区别并介绍了这句格言之后,可能是引入一个例外的好时机 :laughing
不订阅,一切都不会发生……直到某些事情发生
到目前为止,我们一直在处理一种称为 冷流 Publisher 的 Flux
和 Mono
源。正如我们所解释的,这些 Publisher
是懒惰的,只有在有 Subscription
时才会生成数据。此外,它们会为每个独立的 Subscription
重新生成数据。
在我们关于 HTTP 响应 Mono
的例子中,每次订阅都会执行 HTTP 请求。
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request
顺带一提,某些操作符的行为意味着多次订阅。例如,retry
会在发生错误(onError
信号)时重新订阅其源,而 repeat
会在 onComplete
信号时做同样的事情。
因此,对于像 HTTP 请求这样的冷流源,像 retry
这样的操作符会重新执行请求,从而可以从短暂的服务器端错误中恢复过来。
另一方面,热流 Publisher 则不那么明确:它不一定需要 Subscriber
来开始推送数据。它也不一定为每个新的 Subscriber
重新生成专门的数据。
为了说明这一点,我们引入一个新的冷流 publisher 示例,然后展示如何将该冷流 publisher 转换为热流:
Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
输出如下:
clock1 1s
clock1 2s
clock1 3s
clock2 1s
clock1 4s
clock2 2s
clock1 5s
clock2 3s
clock1 6s
clock2 4s
我们可以通过调用 share()
将 clockTicks
源转换为热流。
Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
结果变为如下:
clock1 1s
clock1 2s
clock1 3s
clock2 3s
clock1 4s
clock2 4s
clock1 5s
clock2 5s
clock1 6s
clock2 6s
你会看到这两个订阅现在共享时钟的相同 tick。share()
将冷流转换为热流,通过让源将元素多播给新的 Subscribers
,但仅多播在这些新订阅之后发出的元素。由于 clock2
晚订阅了 2 秒,它错过了早期的 1s
和 2s
的发出。
所以热流 publishers 可以不那么懒惰,尽管它们通常至少需要一个初始 Subscription
来触发数据流动。
在本文中,我们了解了实例化 Flux
/ 链式调用操作符(即 组装时机)、触发它(即 订阅时机)和执行它(即 执行时机)之间的区别。
因此,我们了解到 Flux
和 Mono
大部分是懒惰的(即 冷流 Publisher):不订阅,一切都不会发生。
最后,我们了解了另一种风味的 Flux
和 Mono
,称为 热流 Publisher,它的行为略有不同,并且不那么懒惰。
在下一篇中,我们将看到这三个阶段为何在你调试基于 reactor 的代码时会产生重大差异。
祝你响应式编程愉快!