领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多这篇博文是系列文章中的第二篇,旨在更深入地了解Reactor更高级的概念和内部工作原理。
它源于我的“Flux 的飞行”演讲,我发现其内容更适合博客文章格式。
其他文章发布后,我会在下面的表格中更新链接,但这里列出了计划的内容
如果您缺少对 *Reactive Streams* 和 Reactor 基本概念的介绍,请访问该站点的学习部分和参考指南。
事不宜迟,让我们开始吧
从命令式、阻塞范式切换到响应式、非阻塞范式带来了好处,但也带来了一些注意事项。其中之一就是调试体验。这是为什么呢?
主要是因为您已经习惯了依赖良好的旧 *堆栈跟踪*,但由于响应式编程的**异步**特性,这个宝贵的工具突然变得不那么有价值了。但这并非响应式编程所独有:一旦引入异步代码,您就在程序中创建了一个边界,该边界位于 *调度* 代码和 *异步执行* 代码之间。
让我们以 ExecutorService
和 Future
为例(这里没有 Reactor 代码)
private static void imperative() throws ExecutionException, InterruptedException {
final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
int seconds = LocalTime.now().getSecond();
List<Integer> source;
if (seconds % 2 == 0) {
source = IntStream.range(1, 11).boxed().collect(Collectors.toList());
}
else if (seconds % 3 == 0) {
source = IntStream.range(0, 4).boxed().collect(Collectors.toList());
}
else {
source = Arrays.asList(1, 2, 3, 4);
}
executor.submit(() -> source.get(5)) //line 76
.get();
}
这个例子有点牵强,但让我们假设代码中的这两条路径中有三条可以导致异步任务抛出 IndexOutOfBoundsException
... 堆栈跟踪会有多有用呢?
java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at Scratch.imperative(Scratch.java:77)
at Scratch.main(Scratch.java:50)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
at java.base/java.util.Arrays$ArrayList.get(Arrays.java:4351)
at Scratch.lambda$imperative$0(Scratch.java:76)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
我们看到
Future
的 get()
方法抛出了 ExecutionException
IndexOutOfBoundsException
submit(() -> source.get(5))
**lambda** 中FutureTask
中执行,来自名为 ThreadPoolExecutor
的某个东西,它本身在一个 Thread
中运行...submit()
之前测试中采取了哪条路径)。不是很有用 :-(
如果我们寻找上面代码的 Reactor 等价物,我们可以得到以下代码
private static void reactive() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5);
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5);
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5);
}
source.subscribeOn(Schedulers.parallel())
.block(); //line 97
}
这会触发以下堆栈跟踪
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1495)
at Scratch.reactive(Scratch.java:97)
at Scratch.main(Scratch.java:51)
ArrayIndexOutOfBoundsException
,暗示源对于 MonoElementAt
操作符来说太短了onComplete
,它本身由 request
触发...以及 reactor.core.publisher
中的其他一些步骤range
(FluxRange.subscribe
)、elementAt
和 subscribeOn
组成...ThreadPoolExecutor
的工作线程 Thread
中执行的更糟糕的是,即使我们去掉了 subscribeOn
,我们仍然无法发现触发了哪两条可能的错误路径中的哪一条
private static void reactiveNoSubscribeOn() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5);
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5);
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5);
}
source.block(); //line 116
}
给出堆栈跟踪
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
at reactor.core.publisher.Mono.block(Mono.java:1494)
at Scratch.reactiveNoSubscribeOn(Scratch.java:116)
at Scratch.main(Scratch.java:52)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1495)
... 2 more
这是因为,正如我们前面看到的,在 *组装* 和 *订阅* 之间代码中存在一个额外的“边界”。线索只追溯到**订阅**点(这里为 block()
):-(
因此,在异步世界中,使用堆栈跟踪进行分析和调试更难,在 Reactor 中甚至更难一点(因为它既是异步的,又具有组装与订阅的默认惰性方法)。但幸运的是,库中有一些工具可以尝试缓解这一事实。
log
还记得您在命令式代码中使用 print
语句的时候吗?它可能不像启动步骤调试器那样酷,但有时它是您需要的快速而简陋的解决方案。
在 Reactor 中,您有 log()
操作符
onNext
、onComplete
、onError
(以及**甚至** onSubscribe
、cancel
和 request
!)Logger
简而言之,log
是快速获得序列中某一步正在发生什么的简要概览的简陋解决方案。在开发过程中随意使用它,并有可能为每个 log
调用指定一个“名称”以区分它们。
使用 log(String)
可以被转移以获得关于哪个源导致错误的提示
private static void log() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5)
.log("source A");
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5)
.log("source B");
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5)
.log("source C");
}
source.block(); //line 138
}
堆栈跟踪本身并没有更有趣(除了提到 MonoLogFuseable
类之外,但日志本身包含这个有趣的片段
17:01:23.711 [main] INFO source C - | onSubscribe([Fuseable] MonoElementAt.ElementAtSubscriber)
17:01:23.716 [main] INFO source C - | request(unbounded)
17:01:23.717 [main] ERROR source C - | onError(java.lang.IndexOutOfBoundsException)
17:01:23.721 [main] ERROR source C -
java.lang.IndexOutOfBoundsException: null
至少我们得到了硬编码的 source C
标签...
Reactor 中另一种可用的方法是尝试在运行时堆栈跟踪中获取回组装信息。
这可以通过 Hooks
类激活所谓的“调试模式”来完成
Hooks.onOperatorDebug();
它做了什么?它使每个操作符实例化(即组装)捕获堆栈跟踪并将其保留以供以后使用。
如果 onError
达到某个操作符,它会将该组装堆栈跟踪附加到 onError
的 Throwable
(作为**被抑制的 Exception
**)。因此,当您看到堆栈跟踪时,您将获得运行时和组装的更完整图片。
在调试模式开启的情况下,在我们之前的示例中,我们将能够看到采取了哪条组装路径以及实际处理了哪个源
private static void hook() {
Hooks.onOperatorDebug();
try {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5); //line 149
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5); //line 153
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5); //line 157
}
source.block(); //line 160
}
finally {
Hooks.resetOnOperatorDebug();
}
}
产生以下堆栈跟踪
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
(...)
at reactor.core.publisher.Mono.block(Mono.java:1494)
at Scratch.hook(Scratch.java:160)
at Scratch.main(Scratch.java:54)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoElementAt] :
reactor.core.publisher.Flux.elementAt(Flux.java:4367)
Scratch.hook(Scratch.java:157)
Error has been observed by the following operator(s):
|_ Flux.elementAt ⇢ Scratch.hook(Scratch.java:157)
注意最后一行?耶 :-D
checkpoint
降低成本使用Hooks.onOperatorDebug()
的一个缺点是,它会对应用程序中使用的每个操作符都进行汇编堆栈跟踪捕获。填充单个堆栈跟踪是一个代价高昂的操作,因此不言而喻,这可能会对性能产生重大影响。因此,这仅推荐在开发环境中使用。
幸运的是,如果您确定代码库中容易出现此类源代码歧义的部分,则可以稍微降低成本。
通过使用checkpoint()
操作符,可以仅在代码库中的特定点激活汇编跟踪捕获。如果您使用checkpoint(String)
为检查点提供唯一且有意义的名称,则甚至可以完全不用填充堆栈跟踪。
private static void checkpoint() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5)
.checkpoint("source range(1,10)");
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5)
.checkpoint("source range(0,4)");
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5)
.checkpoint("source just(1,2,3,4)");
}
source.block(); //line 186
}
这会产生以下堆栈跟踪
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:438)
at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:422)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
at reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:61)
at reactor.core.publisher.Mono.block(Mono.java:1494)
at Scratch.checkpoint(Scratch.java:186)
at Scratch.main(Scratch.java:55)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly site of producer [reactor.core.publisher.MonoElementAt] is identified by light checkpoint [source just(1,2,3,4)].
注意is identified by light checkpoint [source just(1,2,3,4)].
,它告诉我们罪魁祸首(因为我们为检查点使用了有意义的描述)。
在本文中,我们了解到堆栈跟踪在异步编程中可能不太有用。Reactor 让你以延迟的方式构建反应式序列,这进一步加剧了这种影响。
我们查看了可能遇到的最坏情况以及解决此问题的一些方法。
整个代码可以在 此处 的 gist 中找到。
在下一期中,我们将学习调度器以及如何从一个线程跳到另一个线程。
在此期间,祝您反应式编程愉快!