领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多这篇博文是博文系列的第三篇,旨在深入了解Reactor更高级的概念和内部工作原理。
在本篇博文中,我们将探讨线程模型、一些(大多数)操作符如何与并发无关、Scheduler
抽象以及如何使用 publishOn
等操作符在序列中间从一个线程跳到另一个线程。
本系列文章源自 Flight of the Flux
演讲,我发现其内容更适合博客文章格式。
下表将在其他博文发布后更新链接,但以下是计划内容
如果您缺少对 Reactive Streams 和 Reactor 基本概念的介绍,请访问网站的学习部分和参考指南。
事不宜迟,让我们开始吧
Reactor 操作符通常是与并发无关的:它们不强加特定的线程模型,而只是在其 onNext
方法被调用的 Thread
上运行。
正如我们在本系列的第一篇博文中看到的那样,执行订阅调用的 Thread
也会产生影响:subscribe
调用会一直链接,直到到达一个生成数据的 Publisher
(操作符链的最左侧部分),然后这个 Publisher
通过 onSubscribe
提供一个 Subscription
,依次传递到链中,被请求等等… 默认情况下,此数据生成过程再次从启动订阅的 Thread
开始。
有一个普遍的例外:处理时间概念的操作符。任何此类操作符都将默认在 Schedulers.parallel()
调度器上运行计时器/延迟/等…
还存在一些其他例外,它们也将在 parallel()
Scheduler
上运行。可以通过至少有一个带有 Scheduler
参数的重载来识别它们。
但是什么是 Scheduler
,为什么我们需要它呢?
Scheduler
抽象在 Reactor 中,Scheduler
是一个抽象,它允许用户控制线程。Scheduler
可以生成 Worker
,从概念上讲,Worker
是 Thread
,但不一定由 Thread
支持(我们稍后将看到一个示例)。Scheduler
还包含时钟的概念,而 Worker
纯粹是关于调度任务的。
interface Scheduler extends Disposable {
Disposable schedule(Runnable task);
Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
long now(TimeUnit unit);
Worker createWorker();
interface Worker extends Disposable {
Disposable schedule(Runnable task);
Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
}
}
Reactor 带有几个默认的 Scheduler
实现,每个实现都有其自身关于如何管理 Worker
的特性。可以通过 Schedulers
工厂方法实例化它们。以下是其典型用法的经验法则
Schedulers.immediate()
可用作空对象,用于当 API 需要 Scheduler
但您不想更改线程时Schedulers.single()
用于可以在唯一 ExecutorService
上运行的一次性任务Schedulers.parallel()
适用于 CPU 密集型但短暂的任务。它可以并行执行 N
个此类任务(默认情况下,N == CPU 数量
)Schedulers.elastic()
和 Schedulers.boundedElastic()
适用于更长时间的任务(例如,阻塞 IO 任务)。elastic
会根据需要无限地生成线程,而最近引入的 boundedElastic
会在创建的线程数量上设置上限。每种 Scheduler
类型都有一个由上述方法返回的默认全局实例,但可以使用 Schedulers.new***
工厂方法(例如,Schedulers.newParallel("myParallel", 10))
创建新实例以创建自定义并行 Scheduler
,其中 N
= 10
)。
parallel
类型由 N
个工作器支持,每个工作器都基于 ScheduledExecutorService
。如果您向其提交 N
个长期任务,则无法执行更多工作,因此它更适合短期任务。
elastic
类型也由基于 ScheduledExecutorService
的工作器支持,只是它根据需要创建这些工作器并将其放入池中。不再使用的 Worker
会在 dispose()
时返回到池中,并在此处保留配置的 TTL 时长,因此新传入的任务可能会重用空闲的工作器。但是,如果无法获得空闲的 Worker
,它会继续创建新的工作器。
boundedElastic
类型的概念与 elastic
类型非常相似,只是它对创建的 ScheduledExecutorService
支持的 Worker
数量设置了上限。超过此点,其 createWorker()
方法将返回一个外观 Worker
,它将排队任务而不是立即提交它们。一旦具体的 Worker
可用,它将与外观进行交换并开始实际提交任务(使其行为就像您刚刚提交了任务一样,包括延迟的任务)。此外,可以限制所有外观工作器可以排队的所有延迟任务的总数。Scheduler
实例。
正如我们上面所说,不是。我们实际上已经看到一个示例:immediate() Scheduler
。此调度器不会修改代码正在运行的 Thread
。
但在 reactor-test
库中还有一个更有用的示例:VirtualTimeScheduler
。此 Scheduler
在当前 Thread
上执行,但会为提交到它的所有任务加上它们应该运行的时间戳。
然后它管理一个虚拟时钟(由于 Scheduler
也承担着时钟的责任),可以手动推进它。这样做时,排队在新的虚拟时间戳之前或在新的虚拟时间戳执行的任务将被执行。
这在测试场景中非常有用,在这些场景中,您有一个带有长间隔/延迟的 Flux
或 Mono
,并且您希望测试逻辑而不是时间。例如,类似于 Mono.delay(Duration.ofHours(4))
的内容可以在不到 100ms
的时间内运行…
也可以想象围绕 Actor 系统、ForkJoinPool
、即将推出的 Loom 纤维等实现 Scheduler
…
关于主
Thread
通常,人们会询问在
Scheduler
的线程和主线程之间来回切换的问题。从主线程到调度器显然是可能的,但从任意线程到主线程是不可能的。这纯粹是 Java 的限制,因为无法将任务提交到主线程(例如,没有 MainThreadExecutorService)。
现在我们熟悉了 Reactor 中线程的基本构建块,让我们看看这如何在操作符的世界中体现。
我们已经确定,大多数操作符会继续在其被发出信号的 Thread
上工作,除了基于时间的操作符(如 Mono.delay
、bufferTimeout()
等…)。
Reactor 的理念是通过组合操作符为您提供执行正确操作的工具。线程也不例外:遇到 subscribeOn
和 publishOn
。
这两个操作符简单地获取一个 Scheduler
,并将执行切换到该调度器的一个工作器上。当然,两者之间存在主要差异 :)
publishOn(Scheduler s)
操作符当您想跳跃线程时,这是您需要的基本操作符。来自其源的传入信号将在给定的 Scheduler
上发布,有效地将线程切换到该调度器的一个工作器上。
这对于onNext
、onComplete
和onError
信号有效。也就是说,从上游源到下游订阅者的信号流。
因此,从本质上讲,出现在此操作符下方的每个处理步骤都将在新的Scheduler
s
上执行,直到另一个操作符再次切换(例如,另一个publishOn
)。
让我们举一个故意含糊的例子,其中包含阻塞调用,但请记住,在反应式链中进行阻塞调用始终是危险的! :)
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
在上面的示例中,假设此代码在主线程上执行,每个Flux.fromIterable
都在同一个Thread
上发出其List
的内容。然后,我们在map
内部使用一个命令式阻塞式 Web 客户端来获取每个url
的主体,它“继承”了该线程(从而阻塞了它)。因此,每个subscribe
中的数据消费 lambda 也在主线程上运行。
结果,所有这些 url 都在主线程上按顺序处理。
main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E
如果我们引入publishOn
,我们可以使此代码性能更高,以便Flux
不会相互阻塞。
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
这可能会给我们类似以下的输出。
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
第一个列表和第二个列表现在交错在一起了,太棒了!
subscribeOn(Scheduler s)
操作符在前面的示例中,我们看到了如何使用publishOn
通过在提供的Scheduler
上切换用于该阻塞工作的触发器(要获取的 url)的发布来将阻塞工作偏移到单独的线程上。
由于map
操作符在其源线程上运行,因此通过在map
之前放置一个publishOn
来切换该源线程可以按预期工作。
但是,如果该 url 获取方法是由其他人编写的,并且他们遗憾地忘记添加publishOn
呢?有没有办法影响上游的Thread
?
在某种程度上,确实有。这就是subscribeOn
派上用场的地方。
此操作符更改了subscribe
方法的执行位置。由于 subscribe 信号向上流动,因此它直接影响源Flux
订阅和开始生成数据的位置。
结果,它似乎可以作用于操作符反应式链的上部和下部(只要混合中没有publishOn
)。
//code provided in library you have no write access to
final Flux<String> fetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.map(url -> blockingWebClient.get(url)); //oops!
}
//your code:
fetchUrls(A, B, C)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
fetchUrls(D, E)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
就像我们在第二个publishOn
示例中一样,该代码将正确输出类似以下内容。
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
发生了什么?
subscribe
调用仍在主线程上运行,但它们将subscribe
信号传播到其源subscribeOn
。反过来,subscribeOn
将其传播到来自fetchUrls
的自己的源,**但在boundedElastic Worker
上**。
在fetchUrls
返回的Flux
序列中,map 在 boundedElastic 工作线程上订阅,range
也是如此。range
开始生成数据,仍然在 boundedElastic 工作线程上。
这会一直持续到数据路径的末端,每个订阅者都在其源线程(即boundedElastic
线程)上执行onNext
。
最后,在subscribe(...)
调用中配置的 lambda 也在boundedElastic
线程上执行。
重要
区分订阅的行为和传递给
subscribe()
方法的 lambda 非常重要。此方法订阅其源Flux
,但 lambda 在处理结束时执行,此时数据已流经所有步骤(包括跳到另一个线程的步骤)。因此,lambda 执行所在的
Thread
可能与订阅Thread
不同,即调用subscribe
方法所在的线程。
如果我们是fetchUrls
库的作者,我们可以通过以稍微不同的方式利用subscribeOn
让每个获取操作在其自己的Worker
上运行,从而使代码的性能更高。
final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.flatMap(url ->
//wrap the blocking call in a Mono
Mono.fromCallable(() -> blockingWebClient.get(url))
//ensure that Mono is subscribed in an boundedElastic Worker
.subscribeOn(Schedulers.boundedElastic())
); //each individual URL fetch runs in its own thread!
}
subscribeOn
将在整个订阅阶段起作用,从下到上,然后在数据路径上起作用,直到遇到publishOn
(或基于时间的操作符)。
让我们考虑以下示例。
Flux.just("hello")
.doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
.publishOn(Scheduler.boundedElastic())
.doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
.delayElements(Duration.ofMillis(500))
.subscribeOn(Schedulers.elastic())
.subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));
这将打印。
just elastic-1
publish boundedElastic-1
hello delayed parallel-1
我们应该逐步分解发生的事情。
subscribe
在主线程上被调用,但由于正上方的subscribeOn
,订阅迅速切换到elastic
调度程序。elastic
上订阅,从下到上。just
在elastic
调度程序上发出其值。doOnNext
在同一线程上接收该值并将其打印出来:just elastic-1
publishOn
:来自doOnNext
的数据在下游的boundedElastic
调度程序上传播。doOnNext
在其boundedElastic
上接收数据并相应地打印publish bounderElastic-1
。delayElements
是一个时间操作符,因此默认情况下它在Schedulers.parallel()
调度程序上发布数据。subscribeOn
除了在同一线程上传播信号之外,什么也不做。subscribe(...)
的 lambda 在接收数据信号的线程上执行,因此 lambda 打印hello delayed parallel-1
。在本文中,我们学习了Scheduler
抽象以及它如何支持高级用法,例如VirtualTimeScheduler
。
然后,我们学习了如何在反应式序列的中间切换线程(或更确切地说Scheduler
工作线程),以及publishOn
和subscribeOn
之间的区别。
在下一期中,我们将更深入地研究库的内部结构,以描述一些用于确保 Reactor 性能的优化措施。
在此期间,祝您反应式编程愉快!