Flux 1 的飞行 - 组装与订阅

工程 | Simon Baslé | 2019 年 3 月 6 日 | ...

这篇博文是文章系列的第一篇,旨在深入了解 Reactor 的更高级概念和内部工作原理。

它源于我的 Flight of the Flux 演讲,我发现其内容更适合博客文章格式。

其他文章发布后,我会在下面的表格中更新链接,但以下是计划的内容

  1. 组装与订阅(本文)
  2. 调试注意事项
  3. 线程和调度器跳转
  4. 内部工作原理:工作窃取
  5. 内部工作原理:操作符融合

如果您缺少对 Reactive Streams 和 Reactor 基本概念的介绍,请访问站点的 学习部分参考指南

事不宜迟,让我们开始吧

组装时间

当您第一次学习 JVM 上的 Reactive Streams响应式编程 时,您首先学习的是 PublisherSubscriber 之间的高级关系:一个生成数据,另一个消费数据。很简单,对吧?此外,似乎 Publisher 会将数据推送到 Subscriber

但是当使用 Reactor(或 RxJava2)等 Reactive Streams 库时,您会很快遇到以下格言

在您订阅之前,什么也不会发生

有时,您可能会读到这两个库都实现了“推拉混合模型”。等等!拉取

我们稍后再讨论它,但要理解这句话,您首先需要意识到,默认情况下,Reactor 的响应式类型是惰性的

FluxMono 上调用方法(操作符)不会立即触发行为。相反,会返回 Flux(或 Mono)的新实例,您可以在其上继续组合更多操作符。因此,您创建了一个操作符链(或操作符无环图),它表示您的异步处理管道

这个声明式阶段称为组装时间

让我们举一个客户端应用程序向服务器发出 HTTP 请求,并期望获得 HttpResponse 的例子

Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made

这可以使用流畅的 API 进行简化

Mono<String> quote = makeHttpRequest()
    .map(req -> parseJson(req))
    .map(json -> json.getString("quote"));

声明管道完成后,有两种情况:要么您将表示处理管道的 Flux/Mono 传递给另一段代码,要么您触发管道。

前者意味着您返回 Mono 的代码可能会应用其他操作符,从而产生派生的新管道。由于操作符创建新实例(就像洋葱一样),因此您自己的 Mono 不会被修改,因此可以使用多种不同的结果对其进行多次修饰

//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);

//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));

//by this point, none of the 3 "pipelines" have triggered an HTTP request

将其与 CompletableFuture 进行比较,后者本质上不是惰性的:一旦您拥有对 CompletableFuture 的引用,就意味着处理已经开始……

考虑到这一点,让我们了解如何触发响应式管道。

订阅时间

到目前为止,我们已经组装了一个异步管道。也就是说,我们通过使用操作符实例化了 FluxMono 变量,这会导致其他 Flux/Mono 的行为像洋葱一样分层。

但是数据还没有开始流过这些已声明的管道中的每一个。

这是因为数据流动的触发器不是管道的声明,而是对它的订阅。记住

在您订阅之前,什么也不会发生

订阅是指“好的,这个管道表示数据的转换,我对数据的最终形式感兴趣”的行为。最常见的做法是调用 Flux.subscribe(valueConsumer, errorConsumer)

这种兴趣信号会反向传播到操作符链中,直到操作符,即实际生成初始数据的 Publisher

makeHttpRequest() //<5>
    .map(req -> parseJson(req)) //<4>
    .map(json -> json.getString("quote")) //<3>
    .flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
    .subscribe(System.out::println, Throwable::printStackTrace); //<1>
  1. 我们订阅了单词 Flux,表明我们希望将每个单词打印到控制台(并打印任何错误的堆栈跟踪)
  2. 该兴趣信号传递到 flatMapMany 步骤……
  3. ……它将信号向上传递到 json map 步骤……
  4. ……然后是请求 map 步骤……
  5. ……最终到达 makeHttpRequest()(我们将其视为我们的源)

此时,源被触发。它以适当的方式生成数据:这里它将向生成 JSON 的端点发出 HTTP 请求,然后发出 HTTP 响应。

从那时起,我们就进入了执行时间。数据已开始流经管道(以更自然的自上而下的顺序,或上游下游

  1. HttpResponse 发射到 parseJson map
  2. 它提取 JSON 主体并将其发射到 getString map
  3. 它提取引用并将它传递给 flatMapMany
  4. flatMapMany 将引用拆分为单词并分别发出每个单词
  5. subscribe 中的值处理程序会收到每个单词的通知,并将它们逐行打印到控制台

希望这可以帮助您了解组装时间和订阅/执行时间之间的区别!

冷与热

在解释差异并介绍这个格言之后,现在是时候介绍一个例外了 :laughing

在您订阅之前,什么也不会发生……直到某些事情发生

到目前为止,我们一直在处理一种名为PublisherFluxMono 源。正如我们所解释的,这些 Publisher 是惰性的,只有在存在 Subscription 时才会生成数据。此外,它们会为每个单独的 Subscription 重新生成数据。

在我们的 HTTP 响应 Mono 示例中,将为每个订阅执行 HTTP 请求

Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));

evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request

顺便说一句,某些操作符的行为意味着多个订阅。例如,retry 在发生错误(onError 信号)时会重新订阅其源,而 repeatonComplete 信号执行相同的操作。

因此,对于像 HTTP 请求这样的冷源,类似 retry 的操作将重新执行请求,从而能够从瞬态服务器端错误中恢复,例如。

另一方面,Publisher 没有那么明确:它不一定需要 Subscriber 才能开始泵送数据。它也不一定为每个新的 Subscriber 重新生成专用数据。

为了说明这一点,让我们介绍一个新的冷发布者示例,然后我们将展示如何将该发布者转换为发布者

Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");

这将打印

clock1 1s
clock1 2s
clock1 3s
    clock2 1s
clock1 4s
    clock2 2s
clock1 5s
    clock2 3s
clock1 6s
    clock2 4s

我们可以通过调用 share()clockTicks 源转换为热源

Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");

它会产生以下结果

clock1 1s
clock1 2s
clock1 3s
    clock2 3s
clock1 4s
    clock2 4s
clock1 5s
    clock2 5s
clock1 6s
    clock2 6s

您会看到这两个订阅现在共享时钟的相同刻度。share() 通过让源将元素多播到新的 Subscribers 来将冷转换为热,但仅限于这些新订阅后发出的元素。由于 clock2 在 2 秒后订阅,因此它错过了早期的发射 1s2s

因此,热发布者可以不那么惰性,即使它们通常至少需要一个初始 Subscription 来触发数据流。

结论

在本文中,我们学习了实例化 Flux/链接操作符(也称为组装时间)、触发它(也称为订阅时间)和执行它(也称为执行时间)之间的区别。

因此,我们了解到 FluxMono 大多是惰性的(也称为Publisher):在您订阅它们之前,什么也不会发生

最后,我们了解了 FluxMono 的另一种类型,称为Publisher,它的行为略有不同,并且不那么惰性。

在下一期中,我们将了解这三个阶段为何会对您作为开发人员调试基于 Reactor 的代码的方式产生重大影响。

在此期间,祝您反应式编程愉快!

获取 Spring Newsletter

关注 Spring Newsletter

订阅

抢先一步

VMware 提供培训和认证,助您快速提升技能。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部