抢先一步
VMware 提供培训和认证,助您快速提升技能。
了解更多这篇博文是文章系列的第一篇,旨在深入了解 Reactor 的更高级概念和内部工作原理。
它源于我的 Flight of the Flux
演讲,我发现其内容更适合博客文章格式。
其他文章发布后,我会在下面的表格中更新链接,但以下是计划的内容
如果您缺少对 Reactive Streams 和 Reactor 基本概念的介绍,请访问站点的 学习部分 和 参考指南。
事不宜迟,让我们开始吧
当您第一次学习 JVM 上的 Reactive Streams 和 响应式编程 时,您首先学习的是 Publisher
和 Subscriber
之间的高级关系:一个生成数据,另一个消费数据。很简单,对吧?此外,似乎 Publisher
会将数据推送到 Subscriber
。
但是当使用 Reactor(或 RxJava2)等 Reactive Streams 库时,您会很快遇到以下格言
在您订阅之前,什么也不会发生
有时,您可能会读到这两个库都实现了“推拉混合模型”。等等!拉取?
我们稍后再讨论它,但要理解这句话,您首先需要意识到,默认情况下,Reactor 的响应式类型是惰性的。
在 Flux
或 Mono
上调用方法(操作符)不会立即触发行为。相反,会返回 Flux
(或 Mono
)的新实例,您可以在其上继续组合更多操作符。因此,您创建了一个操作符链(或操作符无环图),它表示您的异步处理管道。
这个声明式阶段称为组装时间。
让我们举一个客户端应用程序向服务器发出 HTTP 请求,并期望获得 HttpResponse 的例子
Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made
这可以使用流畅的 API 进行简化
Mono<String> quote = makeHttpRequest()
.map(req -> parseJson(req))
.map(json -> json.getString("quote"));
声明管道完成后,有两种情况:要么您将表示处理管道的 Flux
/Mono
传递给另一段代码,要么您触发管道。
前者意味着您返回 Mono
的代码可能会应用其他操作符,从而产生派生的新管道。由于操作符创建新实例(就像洋葱一样),因此您自己的 Mono
不会被修改,因此可以使用多种不同的结果对其进行多次修饰
//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
//by this point, none of the 3 "pipelines" have triggered an HTTP request
将其与 CompletableFuture
进行比较,后者本质上不是惰性的:一旦您拥有对 CompletableFuture
的引用,就意味着处理已经开始……
考虑到这一点,让我们了解如何触发响应式管道。
到目前为止,我们已经组装了一个异步管道。也就是说,我们通过使用操作符实例化了 Flux
和 Mono
变量,这会导致其他 Flux
/Mono
的行为像洋葱一样分层。
但是数据还没有开始流过这些已声明的管道中的每一个。
这是因为数据流动的触发器不是管道的声明,而是对它的订阅。记住
在您订阅之前,什么也不会发生
订阅是指“好的,这个管道表示数据的转换,我对数据的最终形式感兴趣”的行为。最常见的做法是调用 Flux.subscribe(valueConsumer, errorConsumer)
。
这种兴趣信号会反向传播到操作符链中,直到源操作符,即实际生成初始数据的 Publisher
makeHttpRequest() //<5>
.map(req -> parseJson(req)) //<4>
.map(json -> json.getString("quote")) //<3>
.flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
.subscribe(System.out::println, Throwable::printStackTrace); //<1>
Flux
,表明我们希望将每个单词打印到控制台(并打印任何错误的堆栈跟踪)flatMapMany
步骤……map
步骤……map
步骤……makeHttpRequest()
(我们将其视为我们的源)此时,源被触发。它以适当的方式生成数据:这里它将向生成 JSON 的端点发出 HTTP 请求,然后发出 HTTP 响应。
从那时起,我们就进入了执行时间。数据已开始流经管道(以更自然的自上而下的顺序,或上游到下游)
HttpResponse
发射到 parseJson
map
getString
map
flatMapMany
flatMapMany
将引用拆分为单词并分别发出每个单词subscribe
中的值处理程序会收到每个单词的通知,并将它们逐行打印到控制台希望这可以帮助您了解组装时间和订阅/执行时间之间的区别!
在解释差异并介绍这个格言之后,现在是时候介绍一个例外了 :laughing
在您订阅之前,什么也不会发生……直到某些事情发生
到目前为止,我们一直在处理一种名为冷 Publisher
的 Flux
和 Mono
源。正如我们所解释的,这些 Publisher
是惰性的,只有在存在 Subscription
时才会生成数据。此外,它们会为每个单独的 Subscription
重新生成数据。
在我们的 HTTP 响应 Mono
示例中,将为每个订阅执行 HTTP 请求
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request
顺便说一句,某些操作符的行为意味着多个订阅。例如,retry
在发生错误(onError
信号)时会重新订阅其源,而 repeat
对 onComplete
信号执行相同的操作。
因此,对于像 HTTP 请求这样的冷源,类似 retry
的操作将重新执行请求,从而能够从瞬态服务器端错误中恢复,例如。
另一方面,热 Publisher
没有那么明确:它不一定需要 Subscriber
才能开始泵送数据。它也不一定为每个新的 Subscriber
重新生成专用数据。
为了说明这一点,让我们介绍一个新的冷发布者示例,然后我们将展示如何将该冷发布者转换为热发布者
Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
这将打印
clock1 1s
clock1 2s
clock1 3s
clock2 1s
clock1 4s
clock2 2s
clock1 5s
clock2 3s
clock1 6s
clock2 4s
我们可以通过调用 share()
将 clockTicks
源转换为热源
Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
它会产生以下结果
clock1 1s
clock1 2s
clock1 3s
clock2 3s
clock1 4s
clock2 4s
clock1 5s
clock2 5s
clock1 6s
clock2 6s
您会看到这两个订阅现在共享时钟的相同刻度。share()
通过让源将元素多播到新的 Subscribers
来将冷转换为热,但仅限于这些新订阅后发出的元素。由于 clock2
在 2 秒后订阅,因此它错过了早期的发射 1s
和 2s
。
因此,热发布者可以不那么惰性,即使它们通常至少需要一个初始 Subscription
来触发数据流。
在本文中,我们学习了实例化 Flux
/链接操作符(也称为组装时间)、触发它(也称为订阅时间)和执行它(也称为执行时间)之间的区别。
因此,我们了解到 Flux
和 Mono
大多是惰性的(也称为冷 Publisher
):在您订阅它们之前,什么也不会发生。
最后,我们了解了 Flux
和 Mono
的另一种类型,称为热 Publisher
,它的行为略有不同,并且不那么惰性。
在下一期中,我们将了解这三个阶段为何会对您作为开发人员调试基于 Reactor 的代码的方式产生重大影响。
在此期间,祝您反应式编程愉快!