飞行 Flux 2 - 调试注意事项

工程 | Simon Baslé | April 16, 2019 | ...

这篇博客文章是系列文章的第二篇,旨在更深入地探讨 Reactor 的高级概念和内部工作原理。

它源于我的 Flight of the Flux 演讲,我发现其内容更适合博客文章的形式。

其他文章发布后,我将更新下表中的链接,但这是计划中的内容

  1. 装配 vs 订阅
  2. 调试注意事项(本文)
  3. 线程和调度器切换
  4. 内部工作原理:工作窃取
  5. 内部工作原理:操作符融合

如果您想了解 Reactive Streams 的介绍以及 Reactor 的基本概念,请前往网站的 学习部分参考指南

事不宜迟,让我们开始吧

在响应式世界中调试

从命令式、阻塞范式切换到响应式、非阻塞范式带来了好处,但也伴随一些注意事项。其中之一就是调试体验。为什么呢?

主要是因为您已经习惯依赖于经典的 堆栈跟踪,但由于响应式编程的 异步 特性,这个宝贵的工具突然变得价值大减。然而,这并非响应式编程所特有:一旦引入异步代码,您就会在程序中创建调度代码和异步执行代码之间的边界。

用普通异步代码演示问题

让我们来看一个使用 ExecutorServiceFuture 的例子(这里没有 Reactor 代码)

	private static void imperative() throws ExecutionException, InterruptedException {
		final ScheduledExecutorService executor =
				Executors.newSingleThreadScheduledExecutor();

		int seconds = LocalTime.now().getSecond();
		List<Integer> source;
		if (seconds % 2 == 0) {
			source = IntStream.range(1, 11).boxed().collect(Collectors.toList());
		}
		else if (seconds % 3 == 0) {
			source = IntStream.range(0, 4).boxed().collect(Collectors.toList());
		}
		else {
			source = Arrays.asList(1, 2, 3, 4);
		}

		executor.submit(() -> source.get(5))  //line 76
		        .get();
	}

这个例子有点牵强,但让我们想象一下,在代码中有三条路径中的两条可能导致异步任务抛出 IndexOutOfBoundsException... 堆栈跟踪会有多大帮助呢?

java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at Scratch.imperative(Scratch.java:77)
	at Scratch.main(Scratch.java:50)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
	at java.base/java.util.Arrays$ArrayList.get(Arrays.java:4351)
	at Scratch.lambda$imperative$0(Scratch.java:76)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

我们看到

  • Futureget() 方法抛出了 ExecutionException
  • 原因是 IndexOutOfBoundsException
  • 抛出异常的代码位于 submit(() -> source.get(5)) lambda 表达式 第 76 行
  • 它在 FutureTask 中执行,来自于一个叫做 ThreadPoolExecutor 的东西,它本身又运行在 Thread 中...
  • 我们有两个潜在的源可能导致这个问题,但不知道是哪一个罪魁祸首(在调用 submit() 之前的测试中采用了哪条路径)。

不太有用 :-(

在 Reactor 中演示问题

如果我们在 Reactor 中寻找上述代码的等价实现,我们可以得到这个

	private static void reactive() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5);
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5);
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5);
		}

		source.subscribeOn(Schedulers.parallel())
		      .block(); //line 97
	}

这会触发以下堆栈跟踪

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Mono.block(Mono.java:1495)
		at Scratch.reactive(Scratch.java:97)
		at Scratch.main(Scratch.java:51)
  • 我们再次看到了 ArrayIndexOutOfBoundsException,这暗示着源对于 MonoElementAt 操作符来说太短了
  • 我们看到它来自于一个 onComplete 事件,该事件本身由 request 触发... 以及 reactor.core.publisher 中的一堆其他步骤
  • 对这些 reactor 方法稍加熟悉后,我们或许可以推断出管道由 range (FluxRange.subscribe)、elementAtsubscribeOn 组成...
  • 抛出异常的代码似乎是在 ThreadPoolExecutor 的工作 Thread 中执行的
  • 线索在这里中断了...

更糟的是,即使我们去掉了 subscribeOn,我们仍然无法发现触发的是两种可能的错误路径中的哪一个

	private static void reactiveNoSubscribeOn() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5);
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5);
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5);
		}

		source.block(); //line 116
	}

产生堆栈跟踪

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
	at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
	at reactor.core.publisher.Mono.block(Mono.java:1494)
	at Scratch.reactiveNoSubscribeOn(Scratch.java:116)
	at Scratch.main(Scratch.java:52)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Mono.block(Mono.java:1495)
		... 2 more

这是因为,正如我们之前所见,代码在 装配订阅 之间存在一个额外的“边界”。线索只能追溯到 订阅 的点(这里是 block()):-(

因此,在异步世界中使用堆栈跟踪进行分析和调试会更困难,而在 Reactor 中甚至更难(因为它既是异步的,又采用默认惰性的装配 vs 订阅方式)。但幸运的是,库中提供了一些工具来尝试缓解这一问题。

改进方法

回归经典:log

还记得您在命令式代码中散布 print 语句的日子吗?它可能不像启动步进调试器那样酷炫,但有时它正是您需要的快速而粗糙的解决方案。

在 Reactor 中,您有 log() 操作符

  • 它会记录 Reactive Stream 信号:onNextonCompleteonError(甚至包括 onSubscribecancelrequest!)
  • 您可以调整它,只白名单其中的一部分信号
  • 您也可以选择一个特定的 Logger

简而言之,log 是获取序列中某一步骤正在发生的情况的便捷概览(鸟瞰图)的快速而粗糙的解决方案。在开发过程中请随意使用它,并可以通过为每个 log 调用指定一个“名称”来区分它们。

使用 log(String) 可以用来获取导致错误的源的提示

	private static void log() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5)
			             .log("source A");
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5)
			             .log("source B");
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5)
			             .log("source C");
		}

		source.block(); //line 138
	}

堆栈跟踪本身并没有更有趣的信息(除了提到了 MonoLogFuseable 类之外),但日志本身包含了这个有趣的小细节

17:01:23.711 [main] INFO  source C - | onSubscribe([Fuseable] MonoElementAt.ElementAtSubscriber)
17:01:23.716 [main] INFO  source C - | request(unbounded)
17:01:23.717 [main] ERROR source C - | onError(java.lang.IndexOutOfBoundsException)
17:01:23.721 [main] ERROR source C - 
java.lang.IndexOutOfBoundsException: null

至少我们得到了硬编码的 source C 标签...

使用调试模式丰富堆栈跟踪

Reactor 中另一种可用的方法是尝试在运行时堆栈跟踪中找回装配信息。

这可以通过 Hooks 类激活所谓的“调试模式”来实现

Hooks.onOperatorDebug();

它有什么作用?它使得每个操作符实例化(即装配)捕获一个堆栈跟踪并保留下来供以后使用。

如果一个 onError 到达某个操作符,它会将该装配堆栈跟踪附加到 onErrorThrowable 上(作为 suppressed Exception)。结果是,当您查看堆栈跟踪时,您将获得运行时和装配时更完整的视图。

开启调试模式后,在我们之前的例子中,我们将能够看到采用了哪条装配路径以及实际处理的是哪个源

	private static void hook() {
		Hooks.onOperatorDebug();
		try {
			int seconds = LocalTime.now().getSecond();
			Mono<Integer> source;
			if (seconds % 2 == 0) {
				source = Flux.range(1, 10)
				             .elementAt(5); //line 149
			}
			else if (seconds % 3 == 0) {
				source = Flux.range(0, 4)
				             .elementAt(5); //line 153
			}
			else {
				source = Flux.just(1, 2, 3, 4)
				             .elementAt(5); //line 157
			}

			source.block(); //line 160
		}
		finally {
			Hooks.resetOnOperatorDebug();
		}
	}

这会产生以下堆栈跟踪

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
(...)
	at reactor.core.publisher.Mono.block(Mono.java:1494)
	at Scratch.hook(Scratch.java:160)
	at Scratch.main(Scratch.java:54)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoElementAt] :
	reactor.core.publisher.Flux.elementAt(Flux.java:4367)
	Scratch.hook(Scratch.java:157)
Error has been observed by the following operator(s):
	|_	Flux.elementAt ⇢ Scratch.hook(Scratch.java:157)

注意最后一行了吗?耶 :-D

使用 checkpoint 降低开销

使用 Hooks.onOperatorDebug() 的一个缺点是它会 为应用程序中使用的每一个操作符 执行装配堆栈跟踪捕获。填充单个堆栈跟踪是一项昂贵的操作,因此毫无疑问这会对性能产生严重影响。因此,这只推荐在开发环境中使用。

幸运的是,如果您能识别出代码库中容易出现此类源模糊性的部分,可以稍微降低开销。

通过使用 checkpoint() 操作符,可以在代码库的特定点才激活装配跟踪捕获。如果您使用 checkpoint(String) 为检查点提供一个唯一且有意义的名称,甚至可以完全不做堆栈跟踪填充。

	private static void checkpoint() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5)
			             .checkpoint("source range(1,10)");
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5)
			             .checkpoint("source range(0,4)");
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5)
			             .checkpoint("source just(1,2,3,4)");
		}

		source.block(); //line 186
	}

这会产生以下堆栈跟踪

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:438)
	at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:422)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
	at reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:61)
	at reactor.core.publisher.Mono.block(Mono.java:1494)
	at Scratch.checkpoint(Scratch.java:186)
	at Scratch.main(Scratch.java:55)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly site of producer [reactor.core.publisher.MonoElementAt] is identified by light checkpoint [source just(1,2,3,4)].

请注意 is identified by light checkpoint [source just(1,2,3,4)].,这指出了我们的罪魁祸首(因为我们为检查点使用了有意义的描述)。

结论

在本文中,我们了解到堆栈跟踪在异步编程中可能不太有用。Reactor 允许以惰性方式构建响应式序列,这使得这种影响进一步加剧。

我们研究了可能遇到的最糟糕情况,以及几种可以减轻此问题的方法。

完整的代码可以在一个 gist 这里找到。

在下一部分中,我们将了解调度器以及如何在线程之间切换。

与此同时,祝您响应式编码愉快!

订阅 Spring 电子报

通过 Spring 电子报保持联系

订阅

抢占先机

VMware 提供培训和认证,助您飞速进步。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅,为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部