Flux 的飞行 3 - 线程切换与调度器 (Hopping Threads and Schedulers)

工程 | Simon Baslé | 2019 年 12 月 13 日 | ...

这篇博文是系列文章中的第三篇,旨在深入探讨 Reactor 的更高级概念和内部工作原理。

在这篇文章中,我们将探讨线程模型、大多数操作符如何与并发无关、Scheduler 抽象以及如何使用 publishOn 等操作符在序列中间切换线程。

本系列文章源自 Flight of the Flux 演讲,我发现其内容更适合博文形式。

下表将在其他帖子发布时更新链接,以下是计划中的内容:

  1. 组装 (Assembly) vs 订阅 (Subscription)
  2. 调试注意事项
  3. 线程切换与调度器 (本文)
  4. 内部工作原理:工作窃取 (work stealing)
  5. 内部工作原理:操作符融合 (operator fusion)

如果您还不了解 Reactive Streams 和 Reactor 的基本概念,请前往网站的 学习部分参考指南

事不宜迟,让我们开始吧。

线程模型

Reactor 的操作符通常与并发无关:它们不强制特定的线程模型,只在其 onNext 方法被调用的 Thread 上运行。

正如本系列第一篇帖子中看到的,执行订阅调用的 Thread 也会产生影响:subscribe 调用会一直链式传递,直到到达数据生成者 Publisher (链中最左边的部分),然后此 Publisher 通过 onSubscribe 提供一个 Subscription,该 Subscription 反过来向下传递,被请求等等。默认情况下,同样,此数据生成过程在启动订阅的 Thread 上开始。

对此有一个普遍例外:处理时间概念的操作符。任何此类操作符默认将在 Schedulers.parallel() 调度器上运行计时器/延迟等。

还存在一些其他例外,它们也运行在 parallel() Scheduler 上。可以通过其至少有一个接受 Scheduler 参数的重载方法来识别它们。

但是 Scheduler 是什么?我们为什么需要它?

Scheduler 抽象

在 Reactor 中,Scheduler 是一个抽象,它赋予用户对线程的控制。一个 Scheduler 可以生成 Worker,这些 Worker 在概念上是 Thread,但不一定由一个 Thread 支持(稍后会看到一个例子)。Scheduler 也包含一个时钟的概念,而 Worker 纯粹是关于任务调度。

interface Scheduler extends Disposable {
    
  Disposable schedule(Runnable task);
  Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
  Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  
  long now(TimeUnit unit);
  
  Worker createWorker();
  
  interface Worker extends Disposable {
    Disposable schedule(Runnable task);
    Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
    Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  }
}

Reactor 提供了几种默认的 Scheduler 实现,每种都有其管理 Workers 的特殊性。它们可以通过 Schedulers 工厂方法实例化。以下是它们典型用法的经验法则:

  • Schedulers.immediate() 可以用作一个空对象 (null object),用于当 API 需要一个 Scheduler 但您不想改变线程时。
  • Schedulers.single() 用于可以在一个唯一的 ExecutorService 上运行的一次性任务。
  • Schedulers.parallel() 适合 CPU 密集但生命周期短的任务。它可以并行执行 N 个此类任务(默认情况下,N == CPU 数量)。
  • Schedulers.elastic()Schedulers.boundedElastic() 适用于生命周期较长的任务(例如,阻塞式 I/O 任务)。elastic 按需生成线程,没有限制,而最近引入的 boundedElastic 也这样做,但对创建的线程数量有上限。

每种 Scheduler 类型都有一个由上述方法返回的默认全局实例,但可以使用 Schedulers.new*** 工厂方法创建新实例(例如,Schedulers.newParallel("myParallel", 10)) 用于创建一个自定义并行 Scheduler,其中 N = 10)。

parallel 类型由 N 个基于 ScheduledExecutorService 的工作线程支持。如果您向其提交 N 个长期任务,则无法再执行其他工作,因此它更适合短生命周期任务。

elastic 类型也由基于 ScheduledExecutorService 的工作线程支持,不同之处在于它按需创建这些工作线程并将它们放入池中。不再使用的 Workerdispose() 时返回到池中,并在此处保留配置的 TTL 时长,因此新的入站任务可以重用空闲的工作线程。但是,如果没有任何空闲的 Worker 可用,它会继续创建新的工作线程。

boundedElastic 类型在概念上与 elastic 非常相似,不同之处在于它对创建的基于 ScheduledExecutorServiceWorker 数量设置了上限。超过此上限后,其 createWorker() 方法将返回一个门面 (facade) Worker,该门面 Worker 会将任务排队而不是立即提交。一旦有具体的 Worker 可用,它就会与门面互换,并开始实际提交任务(使其表现得就像您刚刚提交了任务一样,包括延迟的任务)。此外,可以设置所有该 Scheduler 实例的门面工作线程可以排队的延迟任务总数的上限。

调度器总是由 ExecutorService 支持吗?

正如我们上面所说,不是。我们已经看到一个例子:immediate() Scheduler。这个调度器不会改变代码运行的 Thread

但在 reactor-test 库中有一个更有用的例子:VirtualTimeScheduler。这个 Scheduler 在当前 Thread 上执行,但会将提交给它的所有任务都打上它们应该运行的时间戳。

然后它管理一个虚拟时钟(得益于 Scheduler 也具有时钟的职责),可以手动推进。推进时,所有排队等待在新虚拟时间戳之前或之时执行的任务都将被执行。

这在测试场景中非常有用,当您有一个带有长间隔/延迟的 FluxMono,并且您想测试逻辑而不是时序时。例如,像 Mono.delay(Duration.ofHours(4)) 这样的代码可以在 100ms 内运行...

我们也可以想象围绕 Actor 系统、ForkJoinPool、即将推出的 Loom 纤维等实现一个 Scheduler...

关于线程

人们经常询问如何在 Scheduler 的线程和线程之间来回切换。从主线程切换到调度器显然是可能的,**但从任意线程切换到线程是不可能的**。这完全是 Java 的限制,因为没有办法向线程提交任务(例如,没有 MainThreadExecutorService)。

将调度器应用于操作符

既然我们熟悉了 Reactor 中线程的基本构建块,让我们看看这在操作符世界中如何体现。

我们已经确定,大多数操作符继续在其发出信号的 Thread 上工作,时间类操作符(如 Mono.delaybufferTimeout() 等)除外。

Reactor 的理念是为您提供正确的工具,通过组合操作符来实现。线程也不例外:请看 subscribeOnpublishOn

这两个操作符只是接受一个 Scheduler,并将执行切换到该调度器的一个 Worker 上。当然,两者之间有重大区别:)

publishOn(Scheduler s) 操作符

这是您想要切换线程时需要的基本操作符。来自其源的入站信号将在给定的 Scheduler发布,从而有效地将线程切换到该调度器的一个工作线程。

这适用于 onNextonCompleteonError 信号。也就是说,从上游源流向下游订阅者的信号。

因此,从本质上讲,此操作符下方出现的每个处理步骤都将在新的 Scheduler s 上执行,直到另一个操作符再次切换(例如,另一个 publishOn)。

让我们看一个故意粗糙的阻塞调用示例。但请记住,响应式链中的阻塞调用总是粗糙的!:)

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

在上面的示例中,假设此代码在线程上执行,每个 Flux.fromIterable 都会在该同一个 Thread 上发出其 List 的内容。然后我们在一个 map 内部使用一个命令式阻塞式 Web 客户端来获取每个 url 的主体,这“继承”了该线程(从而阻塞了它)。因此,每个 subscribe 中的数据消费 lambda 也在主线程上运行。

结果是,所有这些 URL 都将在主线程上依次处理。

main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E

如果我们引入 publishOn,我们可以使此代码更具性能,这样 Flux 之间就不会相互阻塞。

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

这可能产生类似以下输出:

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

现在,第一个列表和第二个列表交错出现,太棒了!

subscribeOn(Scheduler s) 操作符

在前面的例子中,我们看到了如何使用 publishOn 将阻塞工作转移到单独的线程上,方法是将这些阻塞工作的触发器(要抓取的 URL)的发布切换到提供的 Scheduler 上。

由于 map 操作符在其源线程上运行,因此通过在 map 之前放置 publishOn 来切换该源线程可以按预期工作。

但是,如果这个 URL 抓取方法是别人写的,并且他们遗憾地忘记添加 publishOn 怎么办?有没有办法影响上游Thread

在某种程度上,有。这就是 subscribeOn 可以派上用场的地方。

此操作符更改了 subscribe 方法的执行位置。并且由于订阅信号向上流动,它直接影响源 Flux 在何处订阅并开始生成数据。

因此,它似乎可以作用于响应式操作符链的上下游部分(只要链中没有抛出 publishOn

//code provided in library you have no write access to
final Flux<String> fetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .map(url -> blockingWebClient.get(url)); //oops!
}

//your code:
fetchUrls(A, B, C)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

fetchUrls(D, E)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

就像我们的第二个 publishOn 示例一样,这段代码将正确地输出类似以下内容:

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

那么发生了什么?

subscribe 调用仍然在线程上运行,但它们迅速将 subscribe 信号传播给了它们的源 subscribeOn。然后,subscribeOn 将该相同的信号从 fetchUrls 传播给它自己的源,**但这次是在一个 boundedElastic Worker 上**。

fetchUrls 返回的 Flux 序列中,map 在 boundedElastic 工作线程上订阅,range 也是如此。range 开始生成数据,仍然在 boundedElastic 工作线程上。

数据路径沿着这条路线继续,每个订阅者都在其源线程(即 boundedElastic 线程)上执行 onNext

最后,在 subscribe(...) 调用中配置的 lambdas 也将在 boundedElastic 线程上执行。

重要提示

区分订阅的行为和传递给 subscribe() 方法的 lambda 是很重要的。此方法订阅其源 Flux,但 lambda 在处理结束时执行,当数据流经所有步骤(包括切换到另一个线程的步骤)后。

因此,执行 lambda 的 Thread 可能与订阅的 Thread 不同,即调用 subscribe 方法的线程。

如果我们是 fetchUrls 库的作者,我们可以通过以稍微不同的方式利用 subscribeOn,让每次抓取都在自己的 Worker 上运行,从而使代码更具性能:

final Flux<String> betterFetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .flatMap(url -> 
             //wrap the blocking call in a Mono
             Mono.fromCallable(() -> blockingWebClient.get(url))
             //ensure that Mono is subscribed in an boundedElastic Worker
             .subscribeOn(Schedulers.boundedElastic())
    ); //each individual URL fetch runs in its own thread!
}

如果我混合使用两者呢?

subscribeOn 将贯穿整个订阅阶段,从下到上,然后在数据路径上作用,直到遇到 publishOn(或时间类操作符)。

让我们考虑以下示例:

Flux.just("hello")
    .doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
    .publishOn(Scheduler.boundedElastic())
    .doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
    .delayElements(Duration.ofMillis(500))
    .subscribeOn(Schedulers.elastic())
    .subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));

这将打印:

just elastic-1
publish boundedElastic-1
hello delayed parallel-1

我们应该一步步拆解发生了什么:

  • 在这里,subscribe线程上调用,但由于紧邻其上的 subscribeOn,订阅迅速切换到 elastic 调度器。
  • 其上面的所有操作符也都从下到上在 elastic 上订阅。
  • justelastic 调度器上发出其值。
  • 第一个 doOnNext 在同一线程上接收该值并打印出来:just elastic-1
  • 然后,在从上到下的数据路径上,我们遇到了 publishOn:来自 doOnNext 的数据在 boundedElastic 调度器上向下游传播。
  • 第二个 doOnNextboundedElastic 上接收其数据,并相应地打印 publish bounderElastic-1
  • delayElements 是一个时间操作符,因此默认情况下它会在 Schedulers.parallel() 调度器上发布数据。
  • 在数据路径上,subscribeOn 只负责在同一线程上传播信号,不做其他事情。
  • 在数据路径上,传递给 subscribe(...) 的 lambda(s) 在接收数据信号的线程中执行,因此 lambda 打印 hello delayed parallel-1

结论

在本文中,我们了解了 Scheduler 抽象以及它如何支持高级用法,如 VirtualTimeScheduler

然后我们学习了如何在响应式序列中间切换线程(或者更确切地说是 Scheduler 工作线程),以及 publishOnsubscribeOn 之间的区别。

在下一篇文章中,我们将深入探讨库的内部机制,以描述为确保 Reactor 性能而采取的一些优化措施。

在此期间,祝您响应式编程愉快!

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

抢占先机

VMware 提供培训和认证,助您快速进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看所有