Flux 3 的飞行 - 跳跃线程和调度器

工程 | Simon Baslé | 2019 年 12 月 13 日 | ...

这篇博文是博文系列的第三篇,旨在深入了解Reactor更高级的概念和内部工作原理。

在本篇博文中,我们将探讨线程模型、一些(大多数)操作符如何与并发无关、Scheduler 抽象以及如何使用 publishOn 等操作符在序列中间从一个线程跳到另一个线程。

本系列文章源自 Flight of the Flux 演讲,我发现其内容更适合博客文章格式。

下表将在其他博文发布后更新链接,但以下是计划内容

  1. 组装与订阅
  2. 调试注意事项
  3. 跳跃线程和调度器(本篇博文)
  4. 内部工作原理:工作窃取
  5. 内部工作原理:操作符融合

如果您缺少对 Reactive Streams 和 Reactor 基本概念的介绍,请访问网站的学习部分参考指南

事不宜迟,让我们开始吧

线程模型

Reactor 操作符通常是与并发无关的:它们不强加特定的线程模型,而只是在其 onNext 方法被调用的 Thread 上运行。

正如我们在本系列的第一篇博文中看到的那样,执行订阅调用的 Thread 也会产生影响:subscribe 调用会一直链接,直到到达一个生成数据的 Publisher(操作符链的最左侧部分),然后这个 Publisher 通过 onSubscribe 提供一个 Subscription,依次传递到链中,被请求等等… 默认情况下,此数据生成过程再次从启动订阅的 Thread 开始。

有一个普遍的例外:处理时间概念的操作符。任何此类操作符都将默认在 Schedulers.parallel() 调度器上运行计时器/延迟/等…

还存在一些其他例外,它们也将在 parallel() Scheduler 上运行。可以通过至少有一个带有 Scheduler 参数的重载来识别它们。

但是什么是 Scheduler,为什么我们需要它呢?

Scheduler 抽象

在 Reactor 中,Scheduler 是一个抽象,它允许用户控制线程。Scheduler 可以生成 Worker,从概念上讲,WorkerThread,但不一定由 Thread 支持(我们稍后将看到一个示例)。Scheduler 还包含时钟的概念,而 Worker 纯粹是关于调度任务的。

interface Scheduler extends Disposable {
    
  Disposable schedule(Runnable task);
  Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
  Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  
  long now(TimeUnit unit);
  
  Worker createWorker();
  
  interface Worker extends Disposable {
    Disposable schedule(Runnable task);
    Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
    Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  }
}

Reactor 带有几个默认的 Scheduler 实现,每个实现都有其自身关于如何管理 Worker 的特性。可以通过 Schedulers 工厂方法实例化它们。以下是其典型用法的经验法则

  • Schedulers.immediate() 可用作空对象,用于当 API 需要 Scheduler 但您不想更改线程时
  • Schedulers.single() 用于可以在唯一 ExecutorService 上运行的一次性任务
  • Schedulers.parallel() 适用于 CPU 密集型但短暂的任务。它可以并行执行 N 个此类任务(默认情况下,N == CPU 数量
  • Schedulers.elastic()Schedulers.boundedElastic() 适用于更长时间的任务(例如,阻塞 IO 任务)。elastic 会根据需要无限地生成线程,而最近引入的 boundedElastic 会在创建的线程数量上设置上限。

每种 Scheduler 类型都有一个由上述方法返回的默认全局实例,但可以使用 Schedulers.new*** 工厂方法(例如,Schedulers.newParallel("myParallel", 10)) 创建新实例以创建自定义并行 Scheduler,其中 N = 10)。

parallel 类型由 N 个工作器支持,每个工作器都基于 ScheduledExecutorService。如果您向其提交 N 个长期任务,则无法执行更多工作,因此它更适合短期任务。

elastic 类型也由基于 ScheduledExecutorService 的工作器支持,只是它根据需要创建这些工作器并将其放入池中。不再使用的 Worker 会在 dispose() 时返回到池中,并在此处保留配置的 TTL 时长,因此新传入的任务可能会重用空闲的工作器。但是,如果无法获得空闲的 Worker,它会继续创建新的工作器。

boundedElastic 类型的概念与 elastic 类型非常相似,只是它对创建的 ScheduledExecutorService 支持的 Worker 数量设置了上限。超过此点,其 createWorker() 方法将返回一个外观 Worker,它将排队任务而不是立即提交它们。一旦具体的 Worker 可用,它将与外观进行交换并开始实际提交任务(使其行为就像您刚刚提交了任务一样,包括延迟的任务)。此外,可以限制所有外观工作器可以排队的所有延迟任务的总数。Scheduler 实例。

调度器是否始终由 ExecutorService 支持?

正如我们上面所说,不是。我们实际上已经看到一个示例:immediate() Scheduler。此调度器不会修改代码正在运行的 Thread

但在 reactor-test 库中还有一个更有用的示例:VirtualTimeScheduler。此 Scheduler 在当前 Thread 上执行,但会为提交到它的所有任务加上它们应该运行的时间戳。

然后它管理一个虚拟时钟(由于 Scheduler 也承担着时钟的责任),可以手动推进它。这样做时,排队在新的虚拟时间戳之前或在新的虚拟时间戳执行的任务将被执行。

这在测试场景中非常有用,在这些场景中,您有一个带有长间隔/延迟的 FluxMono,并且您希望测试逻辑而不是时间。例如,类似于 Mono.delay(Duration.ofHours(4)) 的内容可以在不到 100ms 的时间内运行…

也可以想象围绕 Actor 系统、ForkJoinPool、即将推出的 Loom 纤维等实现 Scheduler

关于Thread

通常,人们会询问在 Scheduler 的线程和线程之间来回切换的问题。从主线程到调度器显然是可能的,但从任意线程到线程是不可能的。这纯粹是 Java 的限制,因为无法将任务提交到线程(例如,没有 MainThreadExecutorService)。

将调度器应用于操作符

现在我们熟悉了 Reactor 中线程的基本构建块,让我们看看这如何在操作符的世界中体现。

我们已经确定,大多数操作符会继续在其被发出信号的 Thread 上工作,除了基于时间的操作符(如 Mono.delaybufferTimeout() 等…)。

Reactor 的理念是通过组合操作符为您提供执行正确操作的工具。线程也不例外:遇到 subscribeOnpublishOn

这两个操作符简单地获取一个 Scheduler,并将执行切换到该调度器的一个工作器上。当然,两者之间存在主要差异 :)

publishOn(Scheduler s) 操作符

当您想跳跃线程时,这是您需要的基本操作符。来自其源的传入信号将在给定的 Scheduler发布,有效地将线程切换到该调度器的一个工作器上。

这对于onNextonCompleteonError信号有效。也就是说,从上游源到下游订阅者的信号流。

因此,从本质上讲,出现在此操作符下方的每个处理步骤都将在新的Scheduler s上执行,直到另一个操作符再次切换(例如,另一个publishOn)。

让我们举一个故意含糊的例子,其中包含阻塞调用,但请记住,在反应式链中进行阻塞调用始终是危险的! :)

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

在上面的示例中,假设此代码在线程上执行,每个Flux.fromIterable都在同一个Thread上发出其List的内容。然后,我们在map内部使用一个命令式阻塞式 Web 客户端来获取每个url的主体,它“继承”了该线程(从而阻塞了它)。因此,每个subscribe中的数据消费 lambda 也在主线程上运行。

结果,所有这些 url 都在主线程上按顺序处理。

main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E

如果我们引入publishOn,我们可以使此代码性能更高,以便Flux不会相互阻塞。

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

这可能会给我们类似以下的输出。

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

第一个列表和第二个列表现在交错在一起了,太棒了!

subscribeOn(Scheduler s) 操作符

在前面的示例中,我们看到了如何使用publishOn通过在提供的Scheduler上切换用于该阻塞工作的触发器(要获取的 url)的发布来将阻塞工作偏移到单独的线程上。

由于map操作符在其源线程上运行,因此通过在map之前放置一个publishOn来切换该源线程可以按预期工作。

但是,如果该 url 获取方法是由其他人编写的,并且他们遗憾地忘记添加publishOn呢?有没有办法影响上游Thread

在某种程度上,确实有。这就是subscribeOn派上用场的地方。

此操作符更改了subscribe方法的执行位置。由于 subscribe 信号向上流动,因此它直接影响源Flux订阅和开始生成数据的位置。

结果,它似乎可以作用于操作符反应式链的上部和下部(只要混合中没有publishOn)。

//code provided in library you have no write access to
final Flux<String> fetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .map(url -> blockingWebClient.get(url)); //oops!
}

//your code:
fetchUrls(A, B, C)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

fetchUrls(D, E)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

就像我们在第二个publishOn示例中一样,该代码将正确输出类似以下内容。

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

发生了什么?

subscribe调用仍在线程上运行,但它们将subscribe信号传播到其源subscribeOn。反过来,subscribeOn将其传播到来自fetchUrls的自己的源,**但在boundedElastic Worker上**。

fetchUrls返回的Flux序列中,map 在 boundedElastic 工作线程上订阅,range也是如此。range开始生成数据,仍然在 boundedElastic 工作线程上。

这会一直持续到数据路径的末端,每个订阅者都在其源线程(即boundedElastic线程)上执行onNext

最后,在subscribe(...)调用中配置的 lambda 也在boundedElastic线程上执行。

重要

区分订阅的行为和传递给subscribe()方法的 lambda 非常重要。此方法订阅其源Flux,但 lambda 在处理结束时执行,此时数据已流经所有步骤(包括跳到另一个线程的步骤)。

因此,lambda 执行所在的Thread可能与订阅Thread不同,即调用subscribe方法所在的线程。

如果我们是fetchUrls库的作者,我们可以通过以稍微不同的方式利用subscribeOn让每个获取操作在其自己的Worker上运行,从而使代码的性能更高。

final Flux<String> betterFetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .flatMap(url -> 
             //wrap the blocking call in a Mono
             Mono.fromCallable(() -> blockingWebClient.get(url))
             //ensure that Mono is subscribed in an boundedElastic Worker
             .subscribeOn(Schedulers.boundedElastic())
    ); //each individual URL fetch runs in its own thread!
}

如果我混合使用两者会怎样?

subscribeOn将在整个订阅阶段起作用,从下到上,然后在数据路径上起作用,直到遇到publishOn(或基于时间的操作符)。

让我们考虑以下示例。

Flux.just("hello")
    .doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
    .publishOn(Scheduler.boundedElastic())
    .doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
    .delayElements(Duration.ofMillis(500))
    .subscribeOn(Schedulers.elastic())
    .subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));

这将打印。

just elastic-1
publish boundedElastic-1
hello delayed parallel-1

我们应该逐步分解发生的事情。

  • 这里subscribe线程上被调用,但由于正上方的subscribeOn,订阅迅速切换到elastic调度程序。
  • 它上方的所有操作符也在elastic上订阅,从下到上。
  • justelastic调度程序上发出其值。
  • 第一个doOnNext在同一线程上接收该值并将其打印出来:just elastic-1
  • 然后在从上到下的数据路径上,我们遇到了publishOn:来自doOnNext的数据在下游的boundedElastic调度程序上传播。
  • 第二个doOnNext在其boundedElastic上接收数据并相应地打印publish bounderElastic-1
  • delayElements是一个时间操作符,因此默认情况下它在Schedulers.parallel()调度程序上发布数据。
  • 在数据路径上,subscribeOn除了在同一线程上传播信号之外,什么也不做。
  • 在数据路径上,传递给subscribe(...)的 lambda 在接收数据信号的线程上执行,因此 lambda 打印hello delayed parallel-1

结论

在本文中,我们学习了Scheduler抽象以及它如何支持高级用法,例如VirtualTimeScheduler

然后,我们学习了如何在反应式序列的中间切换线程(或更确切地说Scheduler工作线程),以及publishOnsubscribeOn之间的区别。

在下一期中,我们将更深入地研究库的内部结构,以描述一些用于确保 Reactor 性能的优化措施。

在此期间,祝您反应式编程愉快!

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部