领先一步
VMware 提供培训和认证,助您加速前进。
了解更多本文是系列文章的一部分
Spring Cloud Sleuth 最近成为了 Micrometer Tracing,它是 Micrometer 项目的一部分。大多数跟踪插桩都集中在 Micrometer 中,位于新的可观测性 API之下。这些项目的目标是让任何应用程序都具备可观测性——以包含关联标识符的指标、跟踪和日志的形式。为了实现这一目标,库需要一种方式来传输上下文信息。当应用程序以任何形式处理异步时,这项任务就变得相当具有挑战性。在上一篇文章中,我们回顾了使用 ThreadLocal
和 Reactor Context
进行上下文传播的基础知识。
Spring Cloud Sleuth 在处理异步上下文传播的方式上经历了多次重大调整。由于 Sleuth 需要与不一定具有响应式 API 的第三方插桩库打交道,因此拥有一种既定的方式来使上下文对它们可用至关重要。这些库通常不假设异步,而是依赖静态的 ThreadLocal
状态。多年来,ThreadLocal
为 JVM 应用程序提供了隐式的上下文存储,用于驱动可观测性功能。随着时间的推移,Project Reactor 在底层原语之上引入了各种钩子(hook)和包装(wrapping)机制,以便实现响应式和命令式之间的桥接。在本文中,我们将探讨将上下文传播到 ThreadLocal
值的方法,并讨论它们可能出现的问题。我们将探讨 Sleuth 采取的方法,并总结我们发现的一种兼顾性能和语义合理性的良好折衷方案。
在描述 Sleuth 引入的方法之前,我们应该考虑在命令式世界和响应式世界之间进行桥接所存在的危险。
在上一篇文章中,我们讨论了一些 Thread
切换和相关副作用的潜在问题。现在,我们将利用 Reactor 的插件机制来解决我们可能遇到的问题,从而更深入地探索响应式编程的特性。
总结 Spring Cloud Sleuth 遇到的所有问题是一个不断变化的目标。此外,许多组织都有自己的上下文传播机制实现,例如用于填充 SLF4J 的 MDC
。本文无意全面总结所有潜在陷阱。它旨在建立一些直觉,帮助你理解最终的真理:要么遵循响应式编程规则,要么准备在最意想不到的时刻失败。
正如我们所知,响应式链可以使用不同的 Thread
来传播信号。从上一篇文章中我们了解到,当执行在另一个 Thread
上继续时,在任务运行时恢复上下文是合理的。Project Reactor 将管理 Thread
的任务委托给 Scheduler
。它还提供了一个专用的钩子,允许拦截特定工作单元的调度和运行:Schedulers.onScheduleHook
。它的工作方式类似于上一篇文章中的 WrappedExecutor
。让我们来看一个可能考虑使用它的场景。
在第一部分中,我们了解到在响应式链中不能持续依赖 ThreadLocal
值可用。如果我们尝试在订阅时初始化它,并在 doFinally
操作符中清除它呢?我们的应用程序可以使用有限数量的 Thread
来处理许多请求,其中一些是并发的。由于这些平台 Thread
可以被重用,我们需要在处理另一个请求之前对与前一个请求相关的任何 ThreadLocal
状态进行清理,以确保不同的请求不会使用残留的关联标识符。
接下来的代码示例是对上一部分中我们编写的未使用 Reactor Context
的代码进行的修改。
handleRequest
方法的一种潜在实现可能如下所示
Mono<Void> handleRequest() {
return Mono.fromSupplier(() -> {
initRequest(); // <1>
return "test-product";
}).flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.doOnSuccess(v -> log("Done."))
.doFinally(signalType ->
CORRELATION_ID.remove()); // <2>
}
在 <1>
中,我们设置了 ThreadLocal
的值,在 <2>
中,我们尝试清除它。
我们还修改了执行的操作,以便能够在 addProduct
方法中添加一个人工延迟
Mono<Void> addProduct(String productName) {
return Mono.defer(() -> {
log("Adding product: " + productName);
return Mono.<Void>empty()
.delaySubscription(Duration.ofMillis(10),
Schedulers.single()); // <1>
});
}
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return Mono.just(true);
});
}
注意,在 <1>
中,我们通过延迟订阅并使用 Schedulers.single()
在 10ms 后启动订阅来引入异步。delaySubscription
将使用该 Scheduler
底层的 ScheduledExecutorService
并在延迟后在另一个 Thread
上启动订阅。
从上一篇文章中我们知道,在这种情况下我们需要恢复 ThreadLocals
,因此我们使用提到的 Scheduler
插件来实现这一点
Schedulers.onScheduleHook("context.propagation", WrappedRunnable::new);
在 Reactor 的 Scheduler
上执行的每个任务都会恢复 ThreadLocal
的值,因此我们应该安全无虞。
现在,让我们模拟两个顺序请求,它们之间有一个日志,用于验证 CORRELATION_ID
是否已正确清除
log("Got first request, calling handler");
handleRequest().block();
log("Got second request, calling handler");
log("There should be no correlationId on this line!");
handleRequest().block();
日志如下所示
[ main][ null] Got first request, calling handler // <1>
[ main][ 8658769170992364531] Adding product: test-product
[ single-1][ 8658769170992364531] Notifying shop about: test-product
[ single-1][ 8658769170992364531] Done.
[ main][ 8658769170992364531] Got second request, calling handler
[ main][ 8658769170992364531] There should be no correlationId on this line!
[ main][ 711436174608061530] Adding product: test-product
[ single-1][ 711436174608061530] Notifying shop about: test-product
[ single-1][ 711436174608061530] Done.
与 “test-product”
处理相关的日志具有正确的关联标识符。然而,在请求之间发生了什么?我们期望 ThreadLocal
在 doFinally
中被清除。不幸的是,请求之间的日志仍然包含一个标识符。那么发生了什么?
注意 “Notifying shop about”
的日志发生在 Thread
single-1
上。信号是在该 Thread
上传递的,所以我们在那里清除了 ThreadLocal
,但主 Thread
仍然被污染了(在 <1>
中)。现在,处理程序之外的执行可以使用错误的关联标识符用于不同的目的。我们可以尝试通过在服务器层(负责分派请求)添加清理逻辑来缓解此问题,并确保用于请求的每个 Thread
不被污染。但是,如果我们的管道更复杂,这并不能挽救所有其他潜在的 Scheduler
Thread
。
这种方法在允许应用程序在响应式链中透明地使用 ThreadLocal
值方面取得了相当大的进展。从性能角度来看,它也是合理的,因为它不会在每个操作符周围设置和重置 ThreadLocal
,而只在处理项目时发生 Thread
切换时进行。然而,它也表明仍然存在一些未解决的副作用。在接下来的示例中,我们将体验并尝试解决不同的场景。
使用 ThreadLocal
作为上下文元数据传输机制的策略还有一个常见问题,即使用了 Reactor 之外的异步库,并且该库自行切换了 Thread
。当执行切换到不受包装的 ExecutorService
控制的不同 Thread
时,上下文就会丢失。
让我们实际看看。我们将重用目前为止看过的大部分代码,但对 notifyShop
方法进行一项更改。它现在通过使用以下方法来模拟远程调用
Mono<Boolean> makeRequest(String productName) {
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> true,
CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)));
}
所以 notifyShop
看起来像这样
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return makeRequest(productName);
});
}
如果我们触发处理程序一次
handleRequest().block();
我们得到以下输出
[ main][ 683056557275963407] Adding product: test-product
[ single-1][ 683056557275963407] Notifying shop about: test-product
[l-worker-1][ null] Done!
日志缩短了 Thread
名称以提高可见性,但 l-worker-1
实际上是 ForkJoinPool.commonPool-worker-1
的缩短版。
正如我们所见,我们的执行在一个我们无法控制的通用 ForkJoinPool
上继续进行。一个问题是,从该 Thread
切换开始,我们就再也看不到我们的关联标识符了;另一个问题是,我们在一个实际上缺少关联信息的 Thread
上执行了清理操作。
我们可以通过 Executor
或任务包装来潜在地改善(部分)情况,如前一篇文章所述,但我们并非总是拥有这样的控制权——例如,如果我们调用一个使用 CompletableFuture
的外部库。
我们几乎准备好讨论 Sleuth 的策略了。Schedulers.onScheduleHook
对于响应式处理中可能发生的非显而易见的 Thread
切换提供了有限的能力。我们需要对操作的执行有更多的控制。我们将通过引入两种外部服务通信方式来演示这些限制。
addProduct
方法现在发起一个远程请求,并在我们控制的 Scheduler
上发布结果。将繁重的计算转移到不同的 Thread
上是很常见的做法。为此,我们使用了 publishOn
操作符
Mono<Void> addProduct(String productName) {
return Mono.defer(() -> {
log("Adding product: " + productName);
return makeRequest(productName)
.publishOn(Schedulers.single())
.then();
});
}
notifyShop
方法模拟将结果映射到可能多个 Publisher
中。这在响应是一个复合结果的情况下是典型的场景——例如,如果响应是一个 JSON 数组,我们打算将每个项作为对另一个服务的单独调用进行处理,或者丰富单个结果。我们使用一个简化版本,只取一个结果
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return makeRequest(productName)
.flatMapMany(result ->
Flux.just("result")
.map(x -> result))
.take(1)
.single();
});
}
现在我们跳过处理程序,手动初始化关联标识符,然后订阅这些链
initRequest();
addProduct("test-product")
.doOnSuccess(v -> log("Added."))
.block();
initRequest();
notifyShop("test-product")
.doOnSuccess(v -> log("Notified."))
.block();
让我们看看输出
[ main][ 6606077262934500649] Adding product: test-product
[ single-1][ null] Added.
[ main][ 182687922231622589] Notifying shop about: test-product
[l-worker-1][ null] Notified.
这是预料之中的,因为 doOnSuccess
中发生的两个日志都是由 CompletableFuture
在 ForkJoinPool
Thread
上传递值触发的。即使我们有 Scheduler
包装,结果首先是在我们无法控制的 Thread
上传递的,因此即使在 addProduct
中使用 publishOn
也无济于事。
我们能做些什么来改善这种情况吗?Reactor 有一个细粒度的插件系统,它允许我们在任何管道中装饰任何操作符。我们可以尝试使用它来恢复关联标识符。
这些插件将使用自定义的 Subscriber
实现,该实现在订阅时捕获关联标识符
static class CorrelatingSubscriber<T> implements CoreSubscriber<T> {
final CoreSubscriber<T> delegate;
Long correlationId;
public CorrelatingSubscriber(CoreSubscriber<T> delegate) {
this.delegate = delegate;
}
@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
this.correlationId = CORRELATION_ID.get();
}
@Override
public void onNext(T t) {
CORRELATION_ID.set(this.correlationId);
delegate.onNext(t);
}
@Override
public void onError(Throwable t) {
CORRELATION_ID.set(this.correlationId);
delegate.onError(t);
}
@Override
public void onComplete() {
CORRELATION_ID.set(this.correlationId);
delegate.onComplete();
}
}
要更改操作符,使其实现将调用委托给实际的 Subscriber
实例,我们可以使用 Operators.lift
方法
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber));
首先,我们尝试一个插件,它允许我们修改链中的每一个操作符
Hooks.onEachOperator(
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber)));
让我们再次运行示例并查看输出
[ main][ 7295088917002526647] Adding product: test-product
[ single-1][ 7295088917002526647] Added.
[ main][ 383851863754448684] Notifying shop about: test-product
[l-worker-1][ 383851863754448684] Notified.
哇!即使在如此复杂的场景中,我们也设法获取了关联标识符。最初的订阅行为捕获了 ThreadLocal
值并在每一步都恢复了它。即使 notifyShop
方法中使用的 flatMap
(它会自行订阅)也奏效了,因为在订阅另一个 Thread
之前,ThreadLocal
已从先前的捕获中填充!这听起来确实很棒,但这种方法也有缺点。第一个也是最明显的缺点是性能。传播发生在每个操作符上。使用这种技术,我们首先包装(decorate)每个对象,并在每一步都进行 ThreadLocal
访问。所有这些操作都很昂贵。要了解更多信息,请观看Oleh 关于响应式性能的演讲。
所以我们尝试另一种方法。这次,我们将使用一个插件,它会附着到链中被视为最后一个操作符的操作符上——也就是紧接在 subscribe()
调用之前的操作符。
对于响应式链可以得出一个观察结论:对于同步操作符,我们不需要在每个单独的操作(例如 filter
或 map
)中恢复最初捕获的上下文,而只需要在链中最后一个操作符被订阅时恢复。只要不涉及 Thread
边界的跨越,这种机制就能奏效。为了支持可能跨越这些边界的操作符(例如 flatMap
,它涉及订阅一个新的 Publisher
),需要一个特殊的技巧。它将映射的结果视为它们操作的内部 Publisher
的最后一个操作符。
让我们尝试这种方法
Hooks.onLastOperator(
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber)));
并运行
[ main][ 2122332013640150746] Adding product: test-product
[ single-1][ 2122332013640150746] Added.
[ main][ 459477771449275997] Notifying shop about: test-product
[l-worker-1][ null] Notified.
它在 addProduct
中与 publishOn
一起工作正常,但在 notifyShop
中的 flatMap
上失败了。
让我们分析一下为什么 notifyShop
会失败。我们调用 block()
会捕获 ThreadLocal
并为每个向下游传播的信号恢复它。通过在 flatMapMany
中完成的映射,我们正在处理之前提到的异步边界。我们的插件实际上应用于内部源 (Flux.just().map().single()
)。
然而,这些努力仍然没有奏效,尽管自定义的 Subscriber
在 flatMapMany
内部被调用并尝试恢复 ThreadLocal
的值。触发内部订阅的信号是在我们无法控制的 Thread
上启动的,因此我们根本没有 ThreadLocal
可以捕获。
在使用 publishOn
操作符的情况下则不同。对其的订阅始于我们控制的 Thread
。因此,当处理来自 makeRequest()
方法的结果信号时,它只会在我们控制的 Thread
上传递。.doOnSuccess(v -> log("Added."))
的执行发生在与 flatMapMany
不同步的 Thread
边界之后。
这就是为什么 onEachOperator
能覆盖更多情况——它在每一步都恢复初始值,无论是否存在异步边界。尽管如此,onLastOperator
的性能略优于 onEachOperator
。
还有一个插件,如果将其与之前的钩子结合使用,我们可以完全控制响应式传递。Spring Cloud Sleuth 也使用了它。我们考虑的是最近引入的插件 Hooks.addQueueWrapper
。不过,我们将不详细探讨它。它可以解决 Reactor 中工作窃取机制引入的问题。异步操作符,例如 flatMap
,可以在将信号传递给操作符的各种 Thread
上取得进展。想象一个背压场景,处理暂停了一段时间。在某个时刻,一个新的 Thread
可以接管并发出 Subscription.request(n)
调用,这将导致累积的值立即传递。现在你可以问自己:“什么累积的值?”这是一个好问题。Reactor 中的许多操作符使用内部 Queue
来实现背压或保留串行传递语义。因为这些 Queue
的 draining 可以在任何 Thread
上发生,所以上下文信息应该附加到存储在 Queue
中的每个信号上——也就是说,用于我们关联目的的 ThreadLocal
值。这就是为什么我们需要一个 Queue
包装器——在将值提交到 Queue
时,我们捕获 ThreadLocal
状态。当从 Queue
中检索值时,状态就会恢复。
在展示了在 reactive-streams 术语之外操作的风险以及我们可以用来传播 ThreadLocal
上下文的机制后,让我们总结一下 Spring Cloud Sleuth 使用的四种策略
DECORATE_ON_EACH
DECORATE_ON_LAST
DECORATE_QUEUES
MANUAL
前三种策略试图利用响应式操作符的一些特性,结合 Reactor 的插件机制,并使用 ThreadLocal
作为内部传输机制以及与插桩库共享上下文数据的方式。前三种策略还假定使用 Schedulers.onScheduleHook 进行 Scheduler
包装。另一方面,最后一种策略则利用了 Reactor 绑定到 Subscriber
的 Context
。
此策略使用我们之前见过的 Hooks.onEachOperator
插件。性能影响巨大,尽管 Sleuth 添加了许多优化以在非必要时不进行恢复。通常,这种方法非常有效。但它也非常激进,因此如果某个操作符需要更改上下文,就可能难以处理。下游操作符将看不到更改,因为来自初始订阅的上下文在每一步都被恢复了。
使用 Hooks.onLastOperator
是为了提高性能。这种方法可能会失败,因为它提供了灵活性。如果上游操作符修改了上下文,下游操作会看到更改。这带来了风险,如果某个操作符清除了该上下文,那么在该上下文被调度到包装的 Scheduler
之前,上下文就会丢失。另一个风险是我们之前示例中看到的情况,即订阅发生在某个 Thread
上,但请求数据发生在另一个不受 Reactor 控制的 Thread
上。
DECORATE_QUEUES
是前一种策略的演进,它纠正了一些错误场景(请求数据带外发生或多个 Thread
发布数据),但并非所有场景。我们之前描述过如何使用 Hooks.addQueueWrapper
插件。Queue
包装的一个已知问题是,在处理完一个项目后,没有可靠的方法进行清理。上下文在从 Queue
中检索项目时恢复。没有围绕通过下游操作符传输的项目处理的范围。因此,这种方法也容易污染 ThreadLocal
存储。最近在 draining 过程中进行了一些改进以限制影响。
在这种策略中,Sleuth 唯一做的事情是在订阅时将 ThreadLocal
的值作为快照捕获到 Reactor 的 Context
中。用户需要在相关位置提取该快照并填充 ThreadLocal
,以便将其提供给插桩库。对于支持的跟踪插桩,例如 Zipkin 和 Brave,Sleuth 通过使用作用域(scoping)的概念来恢复 ThreadLocal
—— ThreadLocal
会为插桩而恢复,并在快照关闭后立即消失。这是性能最高的方法,尽管它需要用户进行手动(正如其名称所示)处理。
在局部作用域中使用 Reactor Context 填充 ThreadLocal
被证明既具有高性能,又符合响应式链的工作方式。将上下文与 Subscriber
关联是一种经过验证的方法,不会意外导致上下文数据丢失。在下一篇文章中,我们将展示 Reactor 3.5 和 Micrometer 1.10 如何将手动方法提升到新的水平,并提供一种结构化的方法来跨越响应式和命令式边界传播上下文。