Flux 之旅 1 - 组装与订阅

工程 | Simon Baslé | 2019 年 3 月 6 日 | ...

这篇博文是系列文章的第一篇,旨在深入探讨 Reactor 更高级的概念和内部工作原理。

它源自我关于 Flight of the Flux 的演讲,我发现其内容更适合博客文章的形式。

其他文章发布后,我会更新下面的表格,但以下是计划中的内容:

  1. 组装与订阅(本文)
  2. 调试注意事项
  3. 线程跳跃与调度器
  4. 内部工作原理:任务窃取
  5. 内部工作原理:操作符融合

如果你还不了解 Reactive Streams 和 Reactor 的基本概念,请前往网站的学习区参考指南

事不宜迟,我们开始吧

组装时机

初次接触 JVM 上的 Reactive Streams 和响应式编程时,你首先了解的是 PublisherSubscriber 之间的高级关系:一个产生数据,另一个消费数据。很简单对吧?此外,Publisher 似乎将数据 推送Subscriber

但在使用像 Reactor (或 RxJava2) 这样的 Reactive Streams 库时,你很快就会遇到这句格言:

不订阅,一切都不会发生

有时,你可能会读到这两个库实现了“推-拉混合模型”。等一下!

稍后我们会回到这一点,但要理解这句话,首先需要认识到,默认情况下,Reactor 的响应式类型是 懒惰的

调用 FluxMono 上的方法(即 操作符)不会立即触发行为。相反,它会返回一个新的 Flux(或 Mono)实例,你可以在其上继续组合更多操作符。因此,你创建了一个 操作符链(或操作符的非循环图),它代表你的 异步处理管道

这个 声明式 阶段称为 组装时机

我们来看一个例子,客户端应用向服务器发起 HTTP 请求,期望获得 HttpResponse:

Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made

使用流畅 API 可以简化如下:

Mono<String> quote = makeHttpRequest()
    .map(req -> parseJson(req))
    .map(json -> json.getString("quote"));

声明完管道后,有两种情况:要么将表示处理管道的 Flux/Mono 传递给另一段代码,要么触发管道。

前一种情况意味着你返回 Mono 的代码可能会应用其他操作符,从而产生一个新的派生管道。由于操作符创建新实例(就像洋葱一样),你自己的 Mono 不会被修改,因此它可以被多次进一步装饰,产生截然不同的结果:

//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);

//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));

//by this point, none of the 3 "pipelines" have triggered an HTTP request

CompletableFuture 相比,CompletableFuture 本质上不是懒惰的:一旦你获得了 CompletableFuture 的引用,就意味着处理已经在进行中了……

考虑到这一点,让我们来看看如何触发响应式管道。

订阅时机

到目前为止,我们已经 组装了一个异步管道。也就是说,我们通过使用操作符实例化了 FluxMono 变量,从而生成了行为像洋葱一样层层叠加的其他 Flux/Mono 实例。

但数据还没有开始流经这些声明好的管道。

这是因为数据流动的触发点不是管道的声明,而是对它的 订阅。请记住:

不订阅,一切都不会发生

订阅的行为就是在说“好的,这个管道代表数据的转换,我对最终形式的数据感兴趣”。最常见的方式是调用 Flux.subscribe(valueConsumer, errorConsumer)

这种兴趣信号会通过操作符链反向传播,直到源操作符,即实际产生初始数据的 Publisher

makeHttpRequest() //<5>
    .map(req -> parseJson(req)) //<4>
    .map(json -> json.getString("quote")) //<3>
    .flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
    .subscribe(System.out::println, Throwable::printStackTrace); //<1>
  1. 我们订阅 words Flux,表示希望将每个单词打印到控制台(并打印任何错误的堆栈跟踪)。
  2. 这个兴趣信号被传递给 flatMapMany 步骤……
  3. ……然后沿着链向上传递给 json map 步骤……
  4. ……再然后是 request map 步骤……
  5. ……最终到达 makeHttpRequest()(我们将其视为源)。

此时,源被触发。它以适当的方式生成数据:在这里,它会向一个产生 JSON 的端点发起 HTTP 请求,然后发出 HTTP 响应。

从那时起,我们处于 执行时机。数据已经开始流经管道(按照更自然的从上到下顺序,即从上游到下游)。

  1. HttpResponse 被发出到 parseJson map
  2. 它提取 JSON 主体并将其发出到 getString map
  3. 它提取引用并将其传递给 flatMapMany
  4. flatMapMany 将引用拆分成单词,并单独发出每个单词。
  5. subscribe 中的值处理器会收到每个单词的通知,并将它们打印到控制台,每行一个。

希望这能帮助你理解组装时机、订阅/执行时机之间的区别!

冷流 vs 热流

解释完区别并介绍了这句格言之后,可能是引入一个例外的好时机 :laughing

不订阅,一切都不会发生……直到某些事情发生

冷流

到目前为止,我们一直在处理一种称为 冷流 PublisherFluxMono 源。正如我们所解释的,这些 Publisher 是懒惰的,只有在有 Subscription 时才会生成数据。此外,它们会为每个独立的 Subscription 重新生成数据。

在我们关于 HTTP 响应 Mono 的例子中,每次订阅都会执行 HTTP 请求。

Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));

evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request

顺带一提,某些操作符的行为意味着多次订阅。例如,retry 会在发生错误(onError 信号)时重新订阅其源,而 repeat 会在 onComplete 信号时做同样的事情。

因此,对于像 HTTP 请求这样的冷流源,像 retry 这样的操作符会重新执行请求,从而可以从短暂的服务器端错误中恢复过来。

热流

另一方面,热流 Publisher 则不那么明确:它不一定需要 Subscriber 来开始推送数据。它也不一定为每个新的 Subscriber 重新生成专门的数据。

为了说明这一点,我们引入一个新的冷流 publisher 示例,然后展示如何将该冷流 publisher 转换为热流:

Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");

输出如下:

clock1 1s
clock1 2s
clock1 3s
    clock2 1s
clock1 4s
    clock2 2s
clock1 5s
    clock2 3s
clock1 6s
    clock2 4s

我们可以通过调用 share()clockTicks 源转换为热流。

Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");

结果变为如下:

clock1 1s
clock1 2s
clock1 3s
    clock2 3s
clock1 4s
    clock2 4s
clock1 5s
    clock2 5s
clock1 6s
    clock2 6s

你会看到这两个订阅现在共享时钟的相同 tick。share() 将冷流转换为热流,通过让源将元素多播给新的 Subscribers,但仅多播在这些新订阅之后发出的元素。由于 clock2 晚订阅了 2 秒,它错过了早期的 1s2s 的发出。

所以热流 publishers 可以不那么懒惰,尽管它们通常至少需要一个初始 Subscription 来触发数据流动。

结论

在本文中,我们了解了实例化 Flux / 链式调用操作符(即 组装时机)、触发它(即 订阅时机)和执行它(即 执行时机)之间的区别。

因此,我们了解到 FluxMono 大部分是懒惰的(即 冷流 Publisher):不订阅,一切都不会发生。

最后,我们了解了另一种风味的 FluxMono,称为 热流 Publisher,它的行为略有不同,并且不那么懒惰。

在下一篇中,我们将看到这三个阶段为何在你调试基于 reactor 的代码时会产生重大差异。

祝你响应式编程愉快!

获取 Spring 资讯

订阅 Spring 资讯,保持连接

订阅

领先一步

VMware 提供培训和认证,助你加速进步。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一次简单订阅。

了解更多

即将到来的活动

查看 Spring 社区所有即将到来的活动。

查看全部