Reactor 调试体验

工程 | Sergei Egorov | 2019年3月28日 | ...

Project Reactor 团队中,我们认为你所依赖库的调试体验与例如功能集或性能同样重要。

今天,我们很高兴地宣布 Reactor 家族中的两个新的实验性项目!

BlockHound - 初来乍到

初学者最常见的错误之一是阻塞那些应该只运行非阻塞代码的 Java 线程(例如,Schedulers.parallel())。
这是最有害的问题之一,因为它可能会阻塞不相关的处理,甚至导致死锁!

考虑以下代码

Flux.range(0, Runtime.getRuntime().availableProcessors() * 2)
        .subscribeOn(Schedulers.parallel())
        .map(i -> {
            CountDownLatch latch = new CountDownLatch(1);

            Mono.delay(Duration.ofMillis(i * 100))
                .subscribe(it -> latch.countDown());

            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return i;
        })
        .blockLast();

运行这段代码需要多久?1 秒?10 秒?
如果我告诉你它永远不会退出并会创建死锁呢?原因如下:

  1. 我们创建 N * 2 个信号,其中 N 是 JVM 可以使用的 CPU 数量。
  2. 我们使用 Schedulers.parallel 进行订阅 - 这是一个限定大小的线程池,限制为 N 个线程。
  3. 对于每个信号,我们都为 parallel 调度器安排另一个任务(Mono.delay 隐式地使用 Schedulers.parallel)。
  4. 我们的逻辑假设延迟很快就会被处理,并且闩锁最终会解除阻塞。
  5. 然而,所有 N 个线程都会等待它们的闩锁,而延迟任务将永远不会被执行!

即使你没有阻塞所有线程,而只阻塞了一部分,你也会阻止其他不相关的任务前进。最可能的结果是性能会下降。

当你正在 将旧的阻塞代码迁移到响应式方法时,这个问题尤其突出。即使是最有经验的代码审查员也可能在你的 函数颜色相同时未能发现阻塞调用!

这就是我们创建 BlockHound 的原因 - 它是一个 Java Agent,用于检测来自非阻塞线程的阻塞调用。与其他解决方案不同,它会修改原始方法(甚至是 native 方法!),使得即使使用反射也无法调用阻塞方法!

现在,如果按照 文档中的描述 将其添加到我们的应用程序中,我们将得到以下异常:

java.lang.Error: Blocking call! sun.misc.Unsafe#park
  at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
  at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
  at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
  at sun.misc.Unsafe.park(Unsafe.java)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
  at com.example.demo.BlockingCode.lambda$main$1(BlockingCode.java:24)

请注意,await 在内部调用了 Unsafe#park 来实现等待逻辑。我们不希望我们的线程被 park 或阻塞,BlockHound 会保护我们免受此害!

如果你想了解实现细节,请阅读 工作原理 页面。
TL;DR: 它包装了原始方法,并且只添加了两个方法调用。

你可以在测试环境或 QA/staging 环境中运行它,而不会损失性能。天哪(Holy Atom),考虑到它的低开销,你甚至可以尝试在生产环境中运行它! :)

BlockHound 可用于 Project Reactor 和 RxJava 2,并且你可以 编写自己的集成

Reactor Debug agent - 生产就绪的组装回溯

由于其函数式编程特性,调试响应式代码有时可能具有挑战性:你不是精确地命令如何处理数据,而是声明数据应该如何在系统中流动。这意味着声明和执行发生在不同的时间点。

你可以在 Simon 的精彩文章中了解更多信息:https://springframework.org.cn/blog/2019/03/06/flight-of-the-flux-1-assembly-vs-subscription

在 Reactor 中,我们称之为“组装时(Assembly time)”和“执行时(Execution time)”。在组装时,你通过调用 myFlux.map(i -> i * 2).filter(5 % i == 1).single() 和其他操作符来“设计”你的管道。稍后,这个“管道定义”将被用来处理由 myFlux 发布的信号。但是当发生错误时会怎样?

你们中的一些人可能已经知道 Hooks.onOperatorDebug()。这是 reactor-core 中一个非常有用的钩子。它可以将堆栈跟踪从这样转换

java.lang.IndexOutOfBoundsException: Source emitted more than one item
  at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
  at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:129)
  at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:107)
  at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
  at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
  at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
  at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
  at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
  at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
  at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
  at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

转换成这样

java.lang.IndexOutOfBoundsException: Source emitted more than one item
  at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
   ...
  at java.lang.Thread.run(Thread.java:748)
  Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
  reactor.core.publisher.Flux.single(Flux.java:7380)
  com.example.demo.Example.run(Example.java:13)
Error has been observed by the following operator(s):
  |_	Flux.single ⇢ com.example.demo.Example.run(Example.java:13)
  |_	Mono.subscribeOn ⇢ com.example.demo.Example.run(Example.java:14)

对于以下代码

9:  public class Example {
10:
11:     public static void run() {
12:        Flux.range(0, 5)
13:            .single() // <-- Aha!
14:            .subscribeOn(Schedulers.parallel())
15:            .block();
16:     }
17: }

如你所见,启用调试模式后,我们可以清楚地识别出发生错误的组装操作。它就像一个堆栈跟踪,但(由于执行与组装分离)是一个回溯。

你可能会想:“太棒了,现在我想在生产环境中使用它!”——我们也是这么想的。但是当你使用 Hooks.onOperatorDebug() 时,我们必须在组装时进行繁重的堆栈遍历,以便在你每次调用诸如 .map(...) 这样的操作符时捕获调用点,即使你的代码永远不会抛出错误!这是因为 Java 缺乏调用点跟踪功能,唯一的替代方案是 new Exception().getStackTrace()StackWalker(在 Java 9+ 中)。

显然,我们不能在生产环境中使用这种方法,所以我们为此创建了一个工具!

ReactorDebugAgent 是来自 reactor-tools 项目 的一个 Java agent,它可以帮助你在应用程序中调试异常,而无需付出运行时开销(与 Hooks.onOperatorDebug() 不同)。

⚠️ 该项目正处于孵化阶段,未来可能成为一个独立项目或 https://github.com/reactor/reactor-core 的一个模块,也可能不会。

它(通过字节码转换)转换链式调用,例如

Flux.range(0, 5)
       .single()

转换为

Flux flux = Flux.range(0, 5);
flux = Hooks.addCallSiteInfo(flux, "Flux.range\n foo.Bar.baz(Bar.java:21)"));
flux = flux.single();
flux = Hooks.addCallSiteInfo(flux, "Flux.single\n foo.Bar.baz(Bar.java:22)"));

要启用它,你需要先初始化 agent

ReactorDebugAgent.init();

ℹ️ 由于此实现会在加载你的类时对其进行修改,因此最佳位置是将其放在你的 main(String[]) 方法中的所有其他内容之前

public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}

结论

我们希望这些工具能让你的开发者生涯更轻松,并让你在使用 Project Reactor 时感到更舒适!

获取 Spring 新闻简报

通过 Spring 新闻简报保持联系

订阅

先行一步

VMware 提供培训和认证,为你的进步加速。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部