领先一步
VMware 提供培训和认证,助您加速进步。
了解更多此文章是系列文章的一部分
Spring Cloud Sleuth 最近更名为 Micrometer Tracing,它是 Micrometer 项目的一部分。大部分追踪检测都集中在 Micrometer 中新的可观察性 API 下。这些项目的目标是使任何应用程序都可观察——以包含关联标识符的指标、追踪和日志的形式。为了实现这个目标,库需要一种传输上下文信息的方法。当应用程序以任何形式处理异步时,这项任务就变得相当具有挑战性。在上一篇文章中,我们介绍了使用 ThreadLocal 和 Reactor Context 进行上下文传播的基础知识。
Spring Cloud Sleuth 在异步上下文传播的方法上经历了多次调整。由于 Sleuth 处理不需要响应式 API 的第三方检测库,因此建立一种向它们提供上下文的稳定方式至关重要。这些库通常不假设异步,而是依赖于静态的 ThreadLocal 状态。多年来,ThreadLocal 为 JVM 应用程序提供了隐式的上下文存储,以驱动可观察性功能。随着时间的推移,Project Reactor 在底层原语之上引入了各种钩子和包装机制,以使响应式和命令式之间的桥接成为可能。在本文中,我们将探讨将上下文传播到 ThreadLocal 值的方法,并讨论其中可能存在的错误。我们将探讨 Sleuth 采取的方法,并总结我们发现的既高性能又语义健全的良好折衷方案。
在我们描述 Sleuth 引入的方法之前,我们应该考虑在命令式和响应式世界之间进行桥接所带来的危险。
我们在上一篇文章中讨论了 Thread 切换和相关副作用的一些潜在问题。现在我们将通过使用 Reactor 的插件机制来解决可能遇到的问题,从而更多地探索响应式编程的属性。
总结 Spring Cloud Sleuth 遇到的所有问题是一个不断变化的目标。此外,许多组织都有自己的上下文传播机制实现,例如,用于填充 SLF4J 的 MDC。本文无意全面总结所有潜在陷阱。它旨在建立一些直觉,帮助你理解最终的真理:你要么遵守响应式编程规则,要么就准备好在最意想不到的时刻失败。
众所周知,响应式链可以使用不同的 Thread 来传播信号。从我们上一篇文章中学到的,当执行在另一个 Thread 上继续时,在任务运行时恢复上下文是有意义的。Project Reactor 将管理 Thread 的任务委托给 Scheduler。它还提供了一个专用的钩子,允许拦截特定工作单元的调度和运行:Schedulers.onScheduleHook。它的工作方式与上一篇文章中的 WrappedExecutor 类似。让我们看看何时可能考虑使用它的场景。
在第一部分中,我们了解到不能依赖 ThreadLocal 值在响应式链中始终可用。如果我们尝试在订阅时初始化它,并在 doFinally 操作符中清除它呢?我们的应用程序可以使用有限数量的 Thread 处理许多请求,其中一些是并发的。由于这些平台 Thread 可以重用,我们需要在处理另一个请求之前清理与一个请求相关的任何 ThreadLocal 状态,以免不同的请求使用残留的关联标识符。
接下来的代码示例是对我们之前编写的未使用 Reactor Context 的代码的修改。
handleRequest 方法的潜在实现可能如下所示
Mono<Void> handleRequest() {
return Mono.fromSupplier(() -> {
initRequest(); // <1>
return "test-product";
}).flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.doOnSuccess(v -> log("Done."))
.doFinally(signalType ->
CORRELATION_ID.remove()); // <2>
}
在 <1> 中我们设置 ThreadLocal 值,在 <{2}gt; 中我们尝试清除它。
我们还修改了我们执行的操作,以便能够在 addProduct 方法中添加人工延迟
Mono<Void> addProduct(String productName) {
return Mono.defer(() -> {
log("Adding product: " + productName);
return Mono.<Void>empty()
.delaySubscription(Duration.ofMillis(10),
Schedulers.single()); // <1>
});
}
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return Mono.just(true);
});
}
请注意,在 <1> 中,我们通过延迟订阅引入了异步性,并使用 Schedulers.single() 在 10ms 后启动订阅。delaySubscription 将使用该 Scheduler 的底层 ScheduledExecutorService,并在延迟后在另一个 Thread 上启动订阅。
从上一篇文章中我们知道,在这种情况下我们需要恢复 ThreadLocals,所以我们使用提到的 Scheduler 插件来实现
Schedulers.onScheduleHook("context.propagation", WrappedRunnable::new);
在 Reactor 的 Scheduler 上执行的每个任务都将恢复 ThreadLocal 值,所以我们应该很安全。
现在,让我们模仿两个连续的请求,中间用一条日志验证 CORRELATION_ID 是否已正确清除
log("Got first request, calling handler");
handleRequest().block();
log("Got second request, calling handler");
log("There should be no correlationId on this line!");
handleRequest().block();
日志如下:
[ main][ null] Got first request, calling handler // <1>
[ main][ 8658769170992364531] Adding product: test-product
[ single-1][ 8658769170992364531] Notifying shop about: test-product
[ single-1][ 8658769170992364531] Done.
[ main][ 8658769170992364531] Got second request, calling handler
[ main][ 8658769170992364531] There should be no correlationId on this line!
[ main][ 711436174608061530] Adding product: test-product
[ single-1][ 711436174608061530] Notifying shop about: test-product
[ single-1][ 711436174608061530] Done.
与 “test-product” 处理相关的日志具有正确的关联标识符。然而,请求之间发生了什么?我们期望 ThreadLocal 在 doFinally 中被清除。不幸的是,请求之间的日志仍然包含一个标识符。那么发生了什么?
请注意,“Notifying shop about” 日志发生在 single-1 Thread 上。信号是在那个 Thread 上传递的,所以我们在那里清除了 ThreadLocal,但主 Thread 仍然被污染(在 <1>)。现在,我们处理程序之外的执行可以使用错误的关联标识符用于不同的目的。我们可以尝试通过在服务器层(分派请求)添加清理逻辑来缓解这个问题,并确保用于请求的每个 Thread 都未被污染。如果我们的管道更复杂,这并不能保存所有其他潜在的 Scheduler Thread。
这种方法在允许应用程序在响应式链中透明地使用 ThreadLocal 值方面取得了相当大的进展。从性能角度来看,它也是合理的,因为它不会在每个操作符周围设置和重置 ThreadLocal,而只在处理项目时发生 Thread 切换时才进行。然而,它也表明仍有一些未解决的副作用。在接下来的示例中,我们将体验并尝试解决不同的场景。
使用 ThreadLocal 作为上下文元数据传输机制的策略,另一个常见问题是当使用了不同于 Reactor 的异步库并且它自己切换 Thread 时。当执行切换到不受包装的 ExecutorService 控制的不同 Thread 时,上下文会丢失。
让我们看看实际情况。我们将重用目前为止看到的大部分代码,只对 notifyShop 方法进行一次更改。它现在使用以下方法模拟远程调用:
Mono<Boolean> makeRequest(String productName) {
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> true,
CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)));
}
所以 notifyShop 看起来像这样
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return makeRequest(productName);
});
}
如果我们触发一次处理程序
handleRequest().block();
我们得到以下输出
[ main][ 683056557275963407] Adding product: test-product
[ single-1][ 683056557275963407] Notifying shop about: test-product
[l-worker-1][ null] Done!
为了更好的可读性,日志缩短了 Thread 名称,但 l-worker-1 实际上是 ForkJoinPool.commonPool-worker-1 的缩写。
正如我们所看到的,我们的执行在一个我们不控制的通用 ForkJoinPool 上继续。一个问题是,从那个 Thread 切换开始,我们就不再看到我们的关联标识符了,而另一个问题是,我们在一个实际上缺少关联信息的 Thread 上执行了清理。
我们可能可以通过 Executor 或任务包装来(部分)改善这种情况,如上一篇文章所述,但我们并不总是拥有这种控制权——例如,如果我们调用使用 CompletableFuture 的外部库。
我们已经准备好讨论 Sleuth 的策略了。Schedulers.onScheduleHook 在响应式处理中可能发生的非显式 Thread 切换方面提供了有限的能力。我们需要对操作的执行有更多的控制。我们将通过引入两种外部服务通信方式来演示其局限性。
addProduct 方法现在发出远程请求,并在我们控制的 Scheduler 上发布结果。将繁重计算卸载到不同的 Thread 上是很常见的。为此,我们使用 publishOn 操作符
Mono<Void> addProduct(String productName) {
return Mono.defer(() -> {
log("Adding product: " + productName);
return makeRequest(productName)
.publishOn(Schedulers.single())
.then();
});
}
notifyShop 方法模拟将结果映射到可能多个 Publisher。这在响应是复合结果的典型场景中很常见——例如,如果响应是 JSON 数组,我们打算将每个项目作为对另一个服务的单独调用进行处理或丰富单个结果。让我们使用简化版本,只取一个结果
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return makeRequest(productName)
.flatMapMany(result ->
Flux.just("result")
.map(x -> result))
.take(1)
.single();
});
}
现在让我们跳过处理程序,手动启动关联标识符,然后订阅这些链
initRequest();
addProduct("test-product")
.doOnSuccess(v -> log("Added."))
.block();
initRequest();
notifyShop("test-product")
.doOnSuccess(v -> log("Notified."))
.block();
我们看看输出
[ main][ 6606077262934500649] Adding product: test-product
[ single-1][ null] Added.
[ main][ 182687922231622589] Notifying shop about: test-product
[l-worker-1][ null] Notified.
这是预期的,因为 doOnSuccess 中发生的两个日志都是由于 CompletableFuture 在 ForkJoinPool Thread 上传递值而触发的。尽管我们有 Scheduler 包装,但结果首先在不受我们控制的 Thread 上传递,因此即使 addProduct 中使用的 publishOn 也没有帮助。
我们能做些什么来改善这种情况吗?Reactor 有一个细粒度的插件系统,它允许我们装饰任何管道中的任何操作符。我们可以尝试使用它来恢复关联标识符。
插件将使用自定义的 Subscriber 实现,它在订阅时捕获关联标识符
static class CorrelatingSubscriber<T> implements CoreSubscriber<T> {
final CoreSubscriber<T> delegate;
Long correlationId;
public CorrelatingSubscriber(CoreSubscriber<T> delegate) {
this.delegate = delegate;
}
@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
this.correlationId = CORRELATION_ID.get();
}
@Override
public void onNext(T t) {
CORRELATION_ID.set(this.correlationId);
delegate.onNext(t);
}
@Override
public void onError(Throwable t) {
CORRELATION_ID.set(this.correlationId);
delegate.onError(t);
}
@Override
public void onComplete() {
CORRELATION_ID.set(this.correlationId);
delegate.onComplete();
}
}
要更改操作符,使其实现将调用委托给实际的 Subscriber 实例,我们可以使用 Operators.lift 方法
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber));
首先,我们将尝试一个插件,它允许我们更改链中的每个操作符
Hooks.onEachOperator(
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber)));
我们再运行一次示例,检查输出
[ main][ 7295088917002526647] Adding product: test-product
[ single-1][ 7295088917002526647] Added.
[ main][ 383851863754448684] Notifying shop about: test-product
[l-worker-1][ 383851863754448684] Notified.
哇!我们甚至在如此复杂的场景中也成功获取了关联标识符。订阅的初始动作捕获了 ThreadLocal 值,并在每一步恢复了它。即使 notifyShop 方法中使用的 flatMap (它自己订阅)也有效,因为在订阅到另一个 Thread 之前,ThreadLocal 会从之前的捕获中填充!这听起来确实很棒,但这种方法也有缺点。第一个也是最明显的缺点是性能。传播发生在每个操作符中。使用这种技术,我们首先装饰每个对象,并在每一步进行 ThreadLocal 访问。所有这些操作都很昂贵。要了解更多信息,请观看Oleh 关于响应式性能的演讲。
所以我们尝试另一种方法。这次,我们将使用一个插件,它附加到链中被认为是最后一个操作符——一个直接在 subscribe() 调用之前的操作符。
关于响应式链,可以观察到一点:在同步操作符的情况下,我们不需要在每个单独的操作(例如,filter 或 map)中恢复最初捕获的上下文,而只需在链中的最后一个操作符被订阅时恢复。只要不涉及 Thread 边界交叉,这种机制就有效。为了支持可能跨越这些边界的操作符(例如,flatMap,它涉及订阅一个新的 Publisher),有一个特殊的技巧。它将映射结果视为它们所操作的内部 Publishers 的最后一个操作符。
我们试试这个方法
Hooks.onLastOperator(
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber)));
并运行
[ main][ 2122332013640150746] Adding product: test-product
[ single-1][ 2122332013640150746] Added.
[ main][ 459477771449275997] Notifying shop about: test-product
[l-worker-1][ null] Notified.
它在 addProduct 中使用了 publishOn 成功了,但在 notifyShop 中的 flatMap 失败了。
让我们分析一下 notifyShop 失败的原因。我们对 block() 的调用捕获了 ThreadLocal 并为每个向下游传输的信号恢复了它。在 flatMapMany 中完成的映射,我们正在处理我们之前提到的异步边界。我们的插件实际上应用于内部源(Flux.just().map().single())。
然而,尽管自定义 Subscriber 在 flatMapMany 内部被调用并尝试恢复 ThreadLocal 值,这些努力仍然没有帮助。触发内部订阅的信号是在我们不控制的 Thread 上启动的,所以我们根本没有 ThreadLocal 可供捕获。
在 publishOn 操作符的情况下则不同。它的订阅开始于我们控制的 Thread。因此,当由于 makeRequest() 方法而处理信号时,它只在受我们控制的 Thread 上传递。.doOnSuccess(v -> log("Added.")) 的执行发生在与 flatMapMany 不同步的 Thread 边界之后。
这就是为什么 onEachOperator 涵盖了更多情况——它在每一步恢复初始值,无论异步边界如何。尽管如此,onLastOperator 的性能略优于 onEachOperator。
还有一个插件,我们可以用它来完全控制响应式传递,如果我们将它与之前的钩子结合使用的话。Spring Cloud Sleuth 也使用了它。我们正在考虑最近引入的一个插件,Hooks.addQueueWrapper。不过,我们不会详细探讨它。它可以解决 Reactor 中工作窃取机制引入的问题。像 flatMap 这样的异步操作符,可以在向操作符传递信号的各种 Thread 上取得进展。想象一个背压场景,处理过程暂停了一段时间。在某个时刻,一个新的 Thread 可以接管并发出 Subscription.request(n) 调用,这会导致累积的值立即传递。现在你可能会问自己:“什么累积值?”这是一个好问题。Reactor 中的许多操作符都使用内部 Queue 来实现背压或保持串行传递语义。因为这些 Queue 的 draining 可以在任何 Thread 上发生,所以上下文信息应该附加到存储在 Queue 中的每个信号——也就是说,我们用于关联目的的 ThreadLocal 值。这就是我们需要 Queue 包装器的原因——当一个值被提交到 Queue 时,我们捕获 ThreadLocal 状态。当一个值从 Queue 中检索时,状态就会恢复。
在展示了在响应式流之外操作的风险以及我们可以使用哪些机制来传播 ThreadLocal 上下文之后,让我们总结一下 Spring Cloud Sleuth 使用的四种策略
DECORATE_ON_EACHDECORATE_ON_LASTDECORATE_QUEUESMANUAL前三种策略试图利用响应式操作符的一些特性,结合 Reactor 的插件机制,并将 ThreadLocal 用作内部传输机制以及与检测库共享上下文数据的方式。前三种策略也假设使用Schedulers.onScheduleHook进行 Scheduler 包装。另一方面,最后一种策略利用了 Reactor 的 Subscriber 绑定的 Context。
此策略使用我们之前看过的Hooks.onEachOperator插件。尽管 Sleuth 添加了许多优化以避免不必要的恢复,但性能影响仍然非常显著。通常,这种方法非常有效。不过,它非常激进,因此如果操作符需要更改上下文,就可能难以应对。下游操作符将看不到更改,因为每个步骤都会恢复初始订阅的上下文。
Hooks.onLastOperator 用于提高性能。这种方法可能因为其提供的灵活性而失败。如果上游操作符修改了上下文,下游操作将看到此更改。这带来了风险,如果某个操作符清除该上下文,那么在该上下文丢失后,直到另一个信号调度到包装的 Scheduler,该上下文都将丢失。另一个风险是我们早期示例中看到的,订阅发生在某个 Thread 上,但请求数据发生在另一个不受 Reactor 控制的 Thread 上。
作为前一种策略的演进,DECORATE_QUEUES 纠正了一些错误场景(请求数据带外发生或多个 Thread 发布数据),但并非所有场景。我们之前描述过Hooks.addQueueWrapper插件的使用方式。Queue 包装的一个已知问题是,在处理项目之后没有可靠的清理方式。上下文在从 Queue 中检索项目时恢复。没有围绕项目处理的范围,该项目会通过下游操作符传播。因此,这种方法也容易污染 ThreadLocal 存储。最近在 draining 过程中进行了一些改进,以限制其影响。
在此策略中,Sleuth 所做的唯一事情是,在订阅时将 ThreadLocal 中的值作为快照捕获到 Reactor 的 Context 中。用户需要自行在相关位置提取该快照,并填充 ThreadLocal,以便检测库可以使用它们。对于受支持的追踪检测,例如 Zipkin 和 Brave,Sleuth 通过使用范围概念来恢复 ThreadLocal——ThreadLocal 在检测期间恢复,并在快照关闭后立即消失。这是性能最高的方法,尽管它需要用户手动处理(正如其名称所示)。
在局部范围内使用 Reactor Context 填充 ThreadLocal 既高效又符合响应式链的工作方式。将上下文与 Subscriber 关联是一种行之有效的方法,它不会意外地导致上下文数据丢失。在下一篇文章中,我们将展示 Reactor 3.5 和 Micrometer 1.10 如何将手动方法提升到新的水平,并提供一种跨响应式和命令式边界进行上下文传播的结构化方法。