先行一步
VMware 提供培训和认证,为你的进步加速。
了解更多在 Project Reactor 团队中,我们认为你所依赖库的调试体验与例如功能集或性能同样重要。
今天,我们很高兴地宣布 Reactor 家族中的两个新的实验性项目!
初学者最常见的错误之一是阻塞那些应该只运行非阻塞代码的 Java 线程(例如,Schedulers.parallel()
)。
这是最有害的问题之一,因为它可能会阻塞不相关的处理,甚至导致死锁!
考虑以下代码
Flux.range(0, Runtime.getRuntime().availableProcessors() * 2)
.subscribeOn(Schedulers.parallel())
.map(i -> {
CountDownLatch latch = new CountDownLatch(1);
Mono.delay(Duration.ofMillis(i * 100))
.subscribe(it -> latch.countDown());
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return i;
})
.blockLast();
运行这段代码需要多久?1 秒?10 秒?
如果我告诉你它永远不会退出并会创建死锁呢?原因如下:
N * 2
个信号,其中 N
是 JVM 可以使用的 CPU 数量。Schedulers.parallel
进行订阅 - 这是一个限定大小的线程池,限制为 N
个线程。Mono.delay
隐式地使用 Schedulers.parallel
)。即使你没有阻塞所有线程,而只阻塞了一部分,你也会阻止其他不相关的任务前进。最可能的结果是性能会下降。
当你正在 将旧的阻塞代码迁移到响应式方法时,这个问题尤其突出。即使是最有经验的代码审查员也可能在你的 函数颜色相同时未能发现阻塞调用!
这就是我们创建 BlockHound 的原因 - 它是一个 Java Agent,用于检测来自非阻塞线程的阻塞调用。与其他解决方案不同,它会修改原始方法(甚至是 native 方法!),使得即使使用反射也无法调用阻塞方法!
现在,如果按照 文档中的描述 将其添加到我们的应用程序中,我们将得到以下异常:
java.lang.Error: Blocking call! sun.misc.Unsafe#park
at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
at sun.misc.Unsafe.park(Unsafe.java)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at com.example.demo.BlockingCode.lambda$main$1(BlockingCode.java:24)
请注意,await
在内部调用了 Unsafe#park
来实现等待逻辑。我们不希望我们的线程被 park 或阻塞,BlockHound 会保护我们免受此害!
如果你想了解实现细节,请阅读 工作原理 页面。
TL;DR: 它包装了原始方法,并且只添加了两个方法调用。
你可以在测试环境或 QA/staging 环境中运行它,而不会损失性能。天哪(Holy Atom),考虑到它的低开销,你甚至可以尝试在生产环境中运行它! :)
BlockHound 可用于 Project Reactor 和 RxJava 2,并且你可以 编写自己的集成。
由于其函数式编程特性,调试响应式代码有时可能具有挑战性:你不是精确地命令如何处理数据,而是声明数据应该如何在系统中流动。这意味着声明和执行发生在不同的时间点。
你可以在 Simon 的精彩文章中了解更多信息:https://springframework.org.cn/blog/2019/03/06/flight-of-the-flux-1-assembly-vs-subscription
在 Reactor 中,我们称之为“组装时(Assembly time)”和“执行时(Execution time)”。在组装时,你通过调用 myFlux.map(i -> i * 2).filter(5 % i == 1).single()
和其他操作符来“设计”你的管道。稍后,这个“管道定义”将被用来处理由 myFlux
发布的信号。但是当发生错误时会怎样?
你们中的一些人可能已经知道 Hooks.onOperatorDebug()
。这是 reactor-core
中一个非常有用的钩子。它可以将堆栈跟踪从这样转换
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:129)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:107)
at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
转换成这样
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
...
at java.lang.Thread.run(Thread.java:748)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
reactor.core.publisher.Flux.single(Flux.java:7380)
com.example.demo.Example.run(Example.java:13)
Error has been observed by the following operator(s):
|_ Flux.single ⇢ com.example.demo.Example.run(Example.java:13)
|_ Mono.subscribeOn ⇢ com.example.demo.Example.run(Example.java:14)
对于以下代码
9: public class Example {
10:
11: public static void run() {
12: Flux.range(0, 5)
13: .single() // <-- Aha!
14: .subscribeOn(Schedulers.parallel())
15: .block();
16: }
17: }
如你所见,启用调试模式后,我们可以清楚地识别出发生错误的组装操作。它就像一个堆栈跟踪,但(由于执行与组装分离)是一个回溯。
你可能会想:“太棒了,现在我想在生产环境中使用它!”——我们也是这么想的。但是当你使用 Hooks.onOperatorDebug()
时,我们必须在组装时进行繁重的堆栈遍历,以便在你每次调用诸如 .map(...)
这样的操作符时捕获调用点,即使你的代码永远不会抛出错误!这是因为 Java 缺乏调用点跟踪功能,唯一的替代方案是 new Exception().getStackTrace()
或 StackWalker
(在 Java 9+ 中)。
显然,我们不能在生产环境中使用这种方法,所以我们为此创建了一个工具!
ReactorDebugAgent
是来自 reactor-tools 项目 的一个 Java agent,它可以帮助你在应用程序中调试异常,而无需付出运行时开销(与 Hooks.onOperatorDebug()
不同)。
⚠️ 该项目正处于孵化阶段,未来可能成为一个独立项目或 https://github.com/reactor/reactor-core 的一个模块,也可能不会。
它(通过字节码转换)转换链式调用,例如
Flux.range(0, 5)
.single()
转换为
Flux flux = Flux.range(0, 5);
flux = Hooks.addCallSiteInfo(flux, "Flux.range\n foo.Bar.baz(Bar.java:21)"));
flux = flux.single();
flux = Hooks.addCallSiteInfo(flux, "Flux.single\n foo.Bar.baz(Bar.java:22)"));
要启用它,你需要先初始化 agent
ReactorDebugAgent.init();
ℹ️ 由于此实现会在加载你的类时对其进行修改,因此最佳位置是将其放在你的 main(String[])
方法中的所有其他内容之前
public static void main(String[] args) {
ReactorDebugAgent.init();
SpringApplication.run(Application.class, args);
}
我们希望这些工具能让你的开发者生涯更轻松,并让你在使用 Project Reactor 时感到更舒适!