抢占先机
VMware 提供培训和认证,助您快速进步。
了解更多这篇博文是系列文章中的第三篇,旨在深入探讨 Reactor 的更高级概念和内部工作原理。
在这篇文章中,我们将探讨线程模型、大多数操作符如何与并发无关、Scheduler
抽象以及如何使用 publishOn
等操作符在序列中间切换线程。
本系列文章源自 Flight of the Flux
演讲,我发现其内容更适合博文形式。
下表将在其他帖子发布时更新链接,以下是计划中的内容:
如果您还不了解 Reactive Streams 和 Reactor 的基本概念,请前往网站的 学习部分 和 参考指南。
事不宜迟,让我们开始吧。
Reactor 的操作符通常与并发无关:它们不强制特定的线程模型,只在其 onNext
方法被调用的 Thread
上运行。
正如本系列第一篇帖子中看到的,执行订阅调用的 Thread
也会产生影响:subscribe
调用会一直链式传递,直到到达数据生成者 Publisher
(链中最左边的部分),然后此 Publisher
通过 onSubscribe
提供一个 Subscription
,该 Subscription
反过来向下传递,被请求等等。默认情况下,同样,此数据生成过程在启动订阅的 Thread
上开始。
对此有一个普遍例外:处理时间概念的操作符。任何此类操作符默认将在 Schedulers.parallel()
调度器上运行计时器/延迟等。
还存在一些其他例外,它们也运行在 parallel()
Scheduler
上。可以通过其至少有一个接受 Scheduler
参数的重载方法来识别它们。
但是 Scheduler
是什么?我们为什么需要它?
Scheduler
抽象在 Reactor 中,Scheduler
是一个抽象,它赋予用户对线程的控制。一个 Scheduler
可以生成 Worker
,这些 Worker
在概念上是 Thread
,但不一定由一个 Thread
支持(稍后会看到一个例子)。Scheduler
也包含一个时钟的概念,而 Worker
纯粹是关于任务调度。
interface Scheduler extends Disposable {
Disposable schedule(Runnable task);
Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
long now(TimeUnit unit);
Worker createWorker();
interface Worker extends Disposable {
Disposable schedule(Runnable task);
Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
}
}
Reactor 提供了几种默认的 Scheduler
实现,每种都有其管理 Workers
的特殊性。它们可以通过 Schedulers
工厂方法实例化。以下是它们典型用法的经验法则:
Schedulers.immediate()
可以用作一个空对象 (null object),用于当 API 需要一个 Scheduler
但您不想改变线程时。Schedulers.single()
用于可以在一个唯一的 ExecutorService
上运行的一次性任务。Schedulers.parallel()
适合 CPU 密集但生命周期短的任务。它可以并行执行 N
个此类任务(默认情况下,N == CPU 数量
)。Schedulers.elastic()
和 Schedulers.boundedElastic()
适用于生命周期较长的任务(例如,阻塞式 I/O 任务)。elastic
按需生成线程,没有限制,而最近引入的 boundedElastic
也这样做,但对创建的线程数量有上限。每种 Scheduler
类型都有一个由上述方法返回的默认全局实例,但可以使用 Schedulers.new***
工厂方法创建新实例(例如,Schedulers.newParallel("myParallel", 10))
用于创建一个自定义并行 Scheduler
,其中 N
= 10
)。
parallel
类型由 N
个基于 ScheduledExecutorService
的工作线程支持。如果您向其提交 N
个长期任务,则无法再执行其他工作,因此它更适合短生命周期任务。
elastic
类型也由基于 ScheduledExecutorService
的工作线程支持,不同之处在于它按需创建这些工作线程并将它们放入池中。不再使用的 Worker
在 dispose()
时返回到池中,并在此处保留配置的 TTL 时长,因此新的入站任务可以重用空闲的工作线程。但是,如果没有任何空闲的 Worker
可用,它会继续创建新的工作线程。
boundedElastic
类型在概念上与 elastic
非常相似,不同之处在于它对创建的基于 ScheduledExecutorService
的 Worker
数量设置了上限。超过此上限后,其 createWorker()
方法将返回一个门面 (facade) Worker
,该门面 Worker
会将任务排队而不是立即提交。一旦有具体的 Worker
可用,它就会与门面互换,并开始实际提交任务(使其表现得就像您刚刚提交了任务一样,包括延迟的任务)。此外,可以设置所有该 Scheduler
实例的门面工作线程可以排队的延迟任务总数的上限。
正如我们上面所说,不是。我们已经看到一个例子:immediate() Scheduler
。这个调度器不会改变代码运行的 Thread
。
但在 reactor-test
库中有一个更有用的例子:VirtualTimeScheduler
。这个 Scheduler
在当前 Thread
上执行,但会将提交给它的所有任务都打上它们应该运行的时间戳。
然后它管理一个虚拟时钟(得益于 Scheduler
也具有时钟的职责),可以手动推进。推进时,所有排队等待在新虚拟时间戳之前或之时执行的任务都将被执行。
这在测试场景中非常有用,当您有一个带有长间隔/延迟的 Flux
或 Mono
,并且您想测试逻辑而不是时序时。例如,像 Mono.delay(Duration.ofHours(4))
这样的代码可以在 100ms
内运行...
我们也可以想象围绕 Actor 系统、ForkJoinPool
、即将推出的 Loom 纤维等实现一个 Scheduler
...
关于主线程
人们经常询问如何在
Scheduler
的线程和主线程之间来回切换。从主线程切换到调度器显然是可能的,**但从任意线程切换到主线程是不可能的**。这完全是 Java 的限制,因为没有办法向主线程提交任务(例如,没有 MainThreadExecutorService)。
既然我们熟悉了 Reactor 中线程的基本构建块,让我们看看这在操作符世界中如何体现。
我们已经确定,大多数操作符继续在其发出信号的 Thread
上工作,时间类操作符(如 Mono.delay
、bufferTimeout()
等)除外。
Reactor 的理念是为您提供正确的工具,通过组合操作符来实现。线程也不例外:请看 subscribeOn
和 publishOn
。
这两个操作符只是接受一个 Scheduler
,并将执行切换到该调度器的一个 Worker
上。当然,两者之间有重大区别:)
publishOn(Scheduler s)
操作符这是您想要切换线程时需要的基本操作符。来自其源的入站信号将在给定的 Scheduler
上发布,从而有效地将线程切换到该调度器的一个工作线程。
这适用于 onNext
、onComplete
和 onError
信号。也就是说,从上游源流向下游订阅者的信号。
因此,从本质上讲,此操作符下方出现的每个处理步骤都将在新的 Scheduler
s
上执行,直到另一个操作符再次切换(例如,另一个 publishOn
)。
让我们看一个故意粗糙的阻塞调用示例。但请记住,响应式链中的阻塞调用总是粗糙的!:)
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
在上面的示例中,假设此代码在主线程上执行,每个 Flux.fromIterable
都会在该同一个 Thread
上发出其 List
的内容。然后我们在一个 map
内部使用一个命令式阻塞式 Web 客户端来获取每个 url
的主体,这“继承”了该线程(从而阻塞了它)。因此,每个 subscribe
中的数据消费 lambda 也在主线程上运行。
结果是,所有这些 URL 都将在主线程上依次处理。
main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E
如果我们引入 publishOn
,我们可以使此代码更具性能,这样 Flux
之间就不会相互阻塞。
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
这可能产生类似以下输出:
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
现在,第一个列表和第二个列表交错出现,太棒了!
subscribeOn(Scheduler s)
操作符在前面的例子中,我们看到了如何使用 publishOn
将阻塞工作转移到单独的线程上,方法是将这些阻塞工作的触发器(要抓取的 URL)的发布切换到提供的 Scheduler
上。
由于 map
操作符在其源线程上运行,因此通过在 map
之前放置 publishOn
来切换该源线程可以按预期工作。
但是,如果这个 URL 抓取方法是别人写的,并且他们遗憾地忘记添加 publishOn
怎么办?有没有办法影响上游的 Thread
?
在某种程度上,有。这就是 subscribeOn
可以派上用场的地方。
此操作符更改了 subscribe
方法的执行位置。并且由于订阅信号向上流动,它直接影响源 Flux
在何处订阅并开始生成数据。
因此,它似乎可以作用于响应式操作符链的上下游部分(只要链中没有抛出 publishOn
)
//code provided in library you have no write access to
final Flux<String> fetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.map(url -> blockingWebClient.get(url)); //oops!
}
//your code:
fetchUrls(A, B, C)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
fetchUrls(D, E)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
就像我们的第二个 publishOn
示例一样,这段代码将正确地输出类似以下内容:
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
那么发生了什么?
subscribe
调用仍然在主线程上运行,但它们迅速将 subscribe
信号传播给了它们的源 subscribeOn
。然后,subscribeOn
将该相同的信号从 fetchUrls
传播给它自己的源,**但这次是在一个 boundedElastic Worker
上**。
在 fetchUrls
返回的 Flux
序列中,map
在 boundedElastic 工作线程上订阅,range
也是如此。range
开始生成数据,仍然在 boundedElastic 工作线程上。
数据路径沿着这条路线继续,每个订阅者都在其源线程(即 boundedElastic
线程)上执行 onNext
。
最后,在 subscribe(...)
调用中配置的 lambdas 也将在 boundedElastic
线程上执行。
重要提示
区分订阅的行为和传递给
subscribe()
方法的 lambda 是很重要的。此方法订阅其源Flux
,但 lambda 在处理结束时执行,当数据流经所有步骤(包括切换到另一个线程的步骤)后。因此,执行 lambda 的
Thread
可能与订阅的Thread
不同,即调用subscribe
方法的线程。
如果我们是 fetchUrls
库的作者,我们可以通过以稍微不同的方式利用 subscribeOn
,让每次抓取都在自己的 Worker
上运行,从而使代码更具性能:
final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.flatMap(url ->
//wrap the blocking call in a Mono
Mono.fromCallable(() -> blockingWebClient.get(url))
//ensure that Mono is subscribed in an boundedElastic Worker
.subscribeOn(Schedulers.boundedElastic())
); //each individual URL fetch runs in its own thread!
}
subscribeOn
将贯穿整个订阅阶段,从下到上,然后在数据路径上作用,直到遇到 publishOn
(或时间类操作符)。
让我们考虑以下示例:
Flux.just("hello")
.doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
.publishOn(Scheduler.boundedElastic())
.doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
.delayElements(Duration.ofMillis(500))
.subscribeOn(Schedulers.elastic())
.subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));
这将打印:
just elastic-1
publish boundedElastic-1
hello delayed parallel-1
我们应该一步步拆解发生了什么:
subscribe
在主线程上调用,但由于紧邻其上的 subscribeOn
,订阅迅速切换到 elastic
调度器。elastic
上订阅。just
在 elastic
调度器上发出其值。doOnNext
在同一线程上接收该值并打印出来:just elastic-1
publishOn
:来自 doOnNext
的数据在 boundedElastic
调度器上向下游传播。doOnNext
在 boundedElastic
上接收其数据,并相应地打印 publish bounderElastic-1
。delayElements
是一个时间操作符,因此默认情况下它会在 Schedulers.parallel()
调度器上发布数据。subscribeOn
只负责在同一线程上传播信号,不做其他事情。subscribe(...)
的 lambda(s) 在接收数据信号的线程中执行,因此 lambda 打印 hello delayed parallel-1
。在本文中,我们了解了 Scheduler
抽象以及它如何支持高级用法,如 VirtualTimeScheduler
。
然后我们学习了如何在响应式序列中间切换线程(或者更确切地说是 Scheduler
工作线程),以及 publishOn
和 subscribeOn
之间的区别。
在下一篇文章中,我们将深入探讨库的内部机制,以描述为确保 Reactor 性能而采取的一些优化措施。
在此期间,祝您响应式编程愉快!