领先一步
VMware 提供培训和认证,助您加速进步。
了解更多这篇文章是系列文章的一部分
Spring Cloud Sleuth 最近成为 Micrometer Tracing,它是 Micrometer 项目的一部分。大多数跟踪检测都集中在 Micrometer 中,位于新的可观察性 API之下。这些项目的目的是实现任何应用程序的可观察性——以指标、跟踪和包含相关标识符的日志的形式。为了实现这一目标,库需要一种传输上下文信息的方法。当应用程序以任何形式处理异步时,此任务就会变得非常具有挑战性。在上一篇文章中,我们介绍了使用ThreadLocal
和 Reactor Context
进行上下文传播的基础知识。
Spring Cloud Sleuth 在异步上下文传播的方法方面经历了许多转变。由于 Sleuth 处理不需要具有响应式 API 的第三方检测库,因此建立一种方法使上下文可供它们使用至关重要。这些库通常不假设异步性,而是依赖于静态ThreadLocal
状态。多年来,ThreadLocal
为 JVM 应用程序提供了隐式、上下文存储,用于驱动可观察性功能。随着时间的推移,Project Reactor 在底层原语之上引入了各种钩子和包装机制,以使响应式和命令式之间的桥接成为可能。在本文中,我们旨在探讨将上下文传播到ThreadLocal
值的方法,并讨论它们可能存在的错误。我们将探讨 Sleuth 采用的方法,并以我们发现的良好折衷方案的总结作为结尾,该方案具有良好的性能和语义完整性。
在描述 Sleuth 引入的方法之前,我们应该考虑在命令式和响应式世界之间桥接所存在的危险。
我们在上一篇文章中讨论了Thread
切换和相关副作用的一些潜在问题。现在,我们将使用 Reactor 的插件机制进一步探讨响应式编程的特性,以解决我们可能遇到的问题。
总而言之,Spring Cloud Sleuth 遇到的所有问题都是一个不断变化的目标。此外,组织中还有许多实现,它们实现了自己的上下文传播机制,例如,用于填充 SLF4J 的MDC
。本文并非旨在全面总结所有潜在的陷阱。它更旨在培养一些直觉,帮助您理解最终的真相:要么遵循响应式编程规则,要么准备好在最意想不到的时刻失败。
众所周知,响应式链可以使用不同的Thread
传播信号。根据我们在上一篇文章中了解到的内容,当在另一个Thread
上继续执行时,在运行任务时恢复上下文是有意义的。Project Reactor 将管理Thread
的任务委托给Scheduler
。它还提供了一个专用的钩子,允许拦截特定工作单元的调度和运行:Schedulers.onScheduleHook
。它的工作方式类似于上一篇文章中的WrappedExecutor
。让我们看看可能考虑使用它的场景。
在第 1 部分中,我们了解到我们不能依赖ThreadLocal
值在响应式链中始终可用。如果我们尝试在订阅时初始化它,并在doFinally
操作符中清除它会怎么样?我们的应用程序可以使用有限数量的Thread
处理许多请求,其中一些是并发的。由于这些平台Thread
可以重复使用,因此我们需要在处理另一个请求之前清除与一个请求关联的任何ThreadLocal
状态,以防止不同的请求使用残留的相关标识符。
以下代码示例是对我们在上一部分编写的代码的修改,在其中我们没有使用 Reactor Context
。
handleRequest
方法的潜在实现可能如下所示
Mono<Void> handleRequest() {
return Mono.fromSupplier(() -> {
initRequest(); // <1>
return "test-product";
}).flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.doOnSuccess(v -> log("Done."))
.doFinally(signalType ->
CORRELATION_ID.remove()); // <2>
}
在<1>
中,我们设置ThreadLocal
值,在<2>
中,我们尝试清除它。
我们还修改了我们执行的操作,以便能够在addProduct
方法中添加人工延迟
Mono<Void> addProduct(String productName) {
return Mono.defer(() -> {
log("Adding product: " + productName);
return Mono.<Void>empty()
.delaySubscription(Duration.ofMillis(10),
Schedulers.single()); // <1>
});
}
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return Mono.just(true);
});
}
请注意,在<1>
中,我们通过延迟订阅并使用Schedulers.single()
在 10 毫秒后启动订阅来引入异步性。delaySubscription
将使用该Scheduler
的底层ScheduledExecutorService
并在延迟后在另一个Thread
上启动订阅。
从上一篇文章中,我们知道在这种情况下我们需要恢复ThreadLocals
,因此我们使用提到的Scheduler
插件来实现这一点
Schedulers.onScheduleHook("context.propagation", WrappedRunnable::new);
在 Reactor 的Scheduler
上执行的每个任务都将恢复ThreadLocal
值,因此我们应该是安全的。
现在,让我们模拟两个顺序请求,它们之间用一个日志分隔,该日志验证CORRELATION_ID
是否已正确清除
log("Got first request, calling handler");
handleRequest().block();
log("Got second request, calling handler");
log("There should be no correlationId on this line!");
handleRequest().block();
日志如下所示
[ main][ null] Got first request, calling handler // <1>
[ main][ 8658769170992364531] Adding product: test-product
[ single-1][ 8658769170992364531] Notifying shop about: test-product
[ single-1][ 8658769170992364531] Done.
[ main][ 8658769170992364531] Got second request, calling handler
[ main][ 8658769170992364531] There should be no correlationId on this line!
[ main][ 711436174608061530] Adding product: test-product
[ single-1][ 711436174608061530] Notifying shop about: test-product
[ single-1][ 711436174608061530] Done.
与“test-product”
处理相关的日志具有正确相关标识符。但是,请求之间发生了什么?我们期望在doFinally
中清除ThreadLocal
。不幸的是,请求之间的日志仍然包含标识符。那么发生了什么呢?
请注意“Notifying shop about”
日志发生在Thread
single-1
上。信号是在该Thread
上传递的,因此我们清除了那里的ThreadLocal
,但留下了主Thread
被污染(在<1>
中)。现在,我们处理程序之外的执行可以将错误的相关标识符用于不同的目的。我们可以尝试通过在服务器层(调度请求)添加清理逻辑来缓解此问题,并确保用于请求的每个Thread
都没有被污染。如果我们的管道更复杂,这将无法保存所有其他潜在的Scheduler
Thread
。
这种方法在允许应用程序在响应式链中透明地使用ThreadLocal
值方面取得了很大进展。从性能的角度来看,它也是合理的,因为它不会在每个操作符周围设置和重置ThreadLocal
,而只会在处理项目时发生Thread
切换时设置和重置。但是,它也表明有一些副作用仍未解决。在接下来的示例中,我们将体验并尝试解决不同的场景。
使用ThreadLocal
作为上下文元数据传输机制的策略的另一个常见问题是,当使用与 Reactor 不同的异步库并且它自行切换Thread
时。当执行更改为不受包装的ExecutorService
控制的不同Thread
时,上下文将丢失。
让我们看看它的实际操作。我们将重用迄今为止看到的大部分代码,并在notifyShop
方法中进行一项更改。它现在通过使用以下方法模拟远程调用
Mono<Boolean> makeRequest(String productName) {
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> true,
CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)));
}
因此notifyShop
如下所示
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return makeRequest(productName);
});
}
如果我们触发一次处理程序
handleRequest().block();
我们将获得以下输出
[ main][ 683056557275963407] Adding product: test-product
[ single-1][ 683056557275963407] Notifying shop about: test-product
[l-worker-1][ null] Done!
日志缩短了Thread
名称以提高可见性,但l-worker-1
实际上是ForkJoinPool.commonPool-worker-1
的缩写版本。
如我们所见,我们的执行在不受我们控制的公共ForkJoinPool
上继续进行。一个问题是我们从该Thread
切换开始不再看到我们的相关标识符,但另一个问题是我们在一个实际上缺少相关信息的Thread
上执行清理。
我们可以通过Executor
或任务包装(如上一篇文章中所示)来潜在地改进(部分)情况,但我们并不总是拥有这样的控制权——例如,如果我们调用使用CompletableFuture
的外部库。
我们即将讨论 Sleuth 的策略。Schedulers.onScheduleHook
在处理反应式过程中可能发生的非明显的 Thread
切换方面,能力有限。我们需要更多地控制操作的执行。我们将通过引入两种外部服务通信的方式来演示这些限制。
addProduct
方法现在发出远程请求,并在我们控制的 Scheduler
上发布结果。将繁重的计算卸载到不同的 Thread
上是很常见的做法。为此,我们使用 publishOn
运算符
Mono<Void> addProduct(String productName) {
return Mono.defer(() -> {
log("Adding product: " + productName);
return makeRequest(productName)
.publishOn(Schedulers.single())
.then();
});
}
notifyShop
方法模拟将结果映射到可能多个 Publisher
中。如果响应是复合结果,这可能是一个典型的场景 - 例如,如果响应是 JSON 数组,而我们打算将每个项目作为对另一个服务的单独调用进行处理或丰富单个结果。让我们使用一个简化的版本,只获取单个结果
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return makeRequest(productName)
.flatMapMany(result ->
Flux.just("result")
.map(x -> result))
.take(1)
.single();
});
}
让我们暂时跳过处理程序,手动初始化关联标识符,然后订阅这些链
initRequest();
addProduct("test-product")
.doOnSuccess(v -> log("Added."))
.block();
initRequest();
notifyShop("test-product")
.doOnSuccess(v -> log("Notified."))
.block();
让我们看看输出
[ main][ 6606077262934500649] Adding product: test-product
[ single-1][ null] Added.
[ main][ 182687922231622589] Notifying shop about: test-product
[l-worker-1][ null] Notified.
这是预期的,因为 doOnSuccess
中发生的两个日志都是由于 CompletableFuture
在 ForkJoinPool
Thread
上传递值而触发的。即使我们有 Scheduler
包装,结果首先是在我们无法控制的 Thread
上传递的,因此即使在 addProduct
中使用的 publishOn
也无济于事。
我们能做些什么来改善这种情况呢?Reactor 拥有一个细粒度的插件系统,它允许我们在任何管道中装饰任何运算符。我们可以尝试将其用于恢复关联标识符的目的。
插件将使用自定义的 Subscriber
实现,该实现会在订阅时捕获关联标识符
static class CorrelatingSubscriber<T> implements CoreSubscriber<T> {
final CoreSubscriber<T> delegate;
Long correlationId;
public CorrelatingSubscriber(CoreSubscriber<T> delegate) {
this.delegate = delegate;
}
@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
this.correlationId = CORRELATION_ID.get();
}
@Override
public void onNext(T t) {
CORRELATION_ID.set(this.correlationId);
delegate.onNext(t);
}
@Override
public void onError(Throwable t) {
CORRELATION_ID.set(this.correlationId);
delegate.onError(t);
}
@Override
public void onComplete() {
CORRELATION_ID.set(this.correlationId);
delegate.onComplete();
}
}
为了更改运算符以使我们的实现将调用委托给实际的 Subscriber
实例,我们可以使用 Operators.lift
方法
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber));
首先,我们将尝试一个插件,它允许我们更改链中的每个运算符
Hooks.onEachOperator(
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber)));
让我们再次运行我们的示例并检查输出
[ main][ 7295088917002526647] Adding product: test-product
[ single-1][ 7295088917002526647] Added.
[ main][ 383851863754448684] Notifying shop about: test-product
[l-worker-1][ 383851863754448684] Notified.
哇!即使在如此复杂的场景中,我们也设法获得了关联标识符。订阅的初始操作捕获了 ThreadLocal
值并在每个步骤中将其恢复。即使在 notifyShop
方法中使用的 flatMap
(它自己订阅)也能正常工作,因为在另一个 Thread
上订阅之前,ThreadLocal
会从之前的捕获中填充!这确实听起来很棒,但是这种方法也有一些缺点。第一个也是最明显的缺点是性能。传播发生在每个运算符上。使用这种技术,我们首先装饰每个对象,并在每个步骤中进行 ThreadLocal
访问。所有这些都是昂贵的。要了解更多信息,请观看 Oleh 关于反应式性能的演讲。
所以让我们尝试一种不同的方法。这次,我们将使用一个插件,该插件附加到链中被认为是最后一个的每个运算符 - subscribe()
调用之前的直接运算符。
关于反应式链可以做出一个观察:在同步运算符的情况下,我们不需要在每次单独的操作(例如,filter
或 map
)中恢复最初捕获的上下文,而只需要在链中的最后一个运算符被订阅时恢复。只要不涉及 Thread
边界交叉,此机制就能正常工作。为了支持可能跨越这些边界的运算符(例如 flatMap
,它涉及订阅新的 Publisher
),需要使用一个特殊的技巧。它将映射的结果视为其操作的内部 Publishers
的最后一个运算符。
让我们尝试这种方法
Hooks.onLastOperator(
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber)));
并运行
[ main][ 2122332013640150746] Adding product: test-product
[ single-1][ 2122332013640150746] Added.
[ main][ 459477771449275997] Notifying shop about: test-product
[l-worker-1][ null] Notified.
它在 addProduct
中的 publishOn
上有效,但在 notifyShop
中的 flatMap
上失败了。
让我们分析一下 notifyShop
为什么失败。我们对 block()
的调用捕获了 ThreadLocal
并为向下游传播的每个信号恢复它。通过在 flatMapMany
中完成的映射,我们正在处理之前提到的异步边界。实际上,我们的插件应用于内部源 (Flux.just().map().single()
)。
但是,尽管 flatMapMany
内部调用了自定义的 Subscriber
并尝试恢复 ThreadLocal
值,但这些努力仍然没有奏效。触发内部订阅的信号是在我们无法控制的 Thread
上发起的,因此我们首先没有 ThreadLocal
可以捕获。
在 publishOn
运算符的情况下有所不同。对它的订阅始于我们控制的 Thread
。因此,当信号作为 makeRequest()
方法的结果进行处理时,它只会传递到我们控制的 Thread
上。.doOnSuccess(v -> log("Added."))
的执行发生在与 flatMapMany
情况不同的 Thread
边界之后。
这就是 onEachOperator
涵盖更多情况的原因 - 它在每个步骤中恢复初始值,而不管异步边界如何。不过,onLastOperator
的性能略好于 onEachOperator
。
如果我们将其与之前的钩子结合使用,还有一个插件可以用来完全控制反应式传递。它也由 Spring Cloud Sleuth 使用。我们正在考虑一个最近引入的插件,Hooks.addQueueWrapper
。不过,我们不会详细探讨它。它可以解决 Reactor 中工作窃取机制引入的问题。异步运算符(如 flatMap
)可以在各种传递信号到运算符的 Thread
上取得进展。想象一个背压场景,其中处理暂停了一段时间。在某个时刻,一个新的 Thread
可以接管并发出 Subscription.request(n)
调用,这会导致累积的值立即传递。现在您可以问自己:“哪些累积的值?”这是一个好问题。Reactor 中的许多运算符使用内部 Queue
来实现背压或保留串行传递语义。因为这些 Queue
的清空可能发生在任何 Thread
上,所以上下文信息应该附加到存储在 Queue
中的每个信号上 - 即,出于我们的关联目的的 ThreadLocal
值。这就是我们需要 Queue
包装器的原因 - 在将值提交到 Queue
时,我们捕获 ThreadLocal
状态。当从 Queue
中检索值时,状态将被恢复。
在展示了在反应式流术语之外操作的风险以及我们可以使用哪些机制来传播 ThreadLocal
上下文之后,让我们总结一下 Spring Cloud Sleuth 使用的四种 策略
DECORATE_ON_EACH
DECORATE_ON_LAST
DECORATE_QUEUES
MANUAL
前三种策略试图利用反应式运算符的一些特性,以及 Reactor 的 插件机制,并使用 ThreadLocal
作为内部传输机制以及与检测库共享上下文数据的方法。前三种策略还假设使用 Schedulers.onScheduleHook 进行 Scheduler
包装。另一方面,最后一种策略利用了 Reactor 的 Subscriber
绑定的 Context
。
此策略使用我们之前看到的 Hooks.onEachOperator
插件。即使 Sleuth 添加了许多优化以避免在不需要时进行恢复,性能影响也是巨大的。通常,此方法非常有效。不过,它非常激进,因此如果运算符需要更改上下文,则可能难以应对。下游运算符将看不到更改,因为初始订阅的上下文在每个步骤中都会被恢复。
Hooks.onLastOperator
用于提高性能。这种方法可能会失败,因为它提供了灵活性。如果上游运算符修改了上下文,则下游操作会看到更改。这存在风险,如果某个运算符清除了该上下文,则该上下文将丢失,直到另一个信号被调度到包装的 Scheduler
。另一个风险是我们之前示例中看到的,订阅发生在某个 Thread
上,但请求数据发生在另一个 Thread
上,而该 Thread
不在 Reactor 的控制之下。
相较于之前的策略,DECORATE_QUEUES
解决了部分错误场景(请求数据发生在带外或多个线程
发布数据),但并非所有场景。我们使用前面描述的方式使用 Hooks.addQueueWrapper
插件。队列
包装的一个已知问题是,没有可靠的方法来清理项目处理后的状态。从队列
中检索项目时会恢复上下文。在通过下游操作符传递的项目的处理过程中,没有范围。因此,这种方法也容易污染ThreadLocal
存储。最近在排水过程中有一些改进,以限制其影响。
在这种策略中,Sleuth 唯一做的事情是在订阅时将ThreadLocal
中的值作为快照捕获到 Reactor 的Context
中。用户需要在相关位置提取该快照并填充ThreadLocal
,使其可供检测库使用。对于支持的跟踪检测,例如 Zipkin 和 Brave,Sleuth 通过使用范围的概念恢复ThreadLocal
- ThreadLocal
会在检测过程中恢复,并在快照关闭后立即消失。虽然需要用户手动(顾名思义)处理,但这是性能最高的方案。
使用 Reactor Context 在本地范围内填充ThreadLocal
被证明既高效又符合反应式链的工作方式。将上下文与订阅者
关联是一种行之有效的方法,不会意外导致上下文数据丢失。在下一篇文章中,我们将展示 Reactor 3.5 和 Micrometer 1.10 如何将手动方法提升到一个新的水平,并提供一种跨反应式和命令式边界传播上下文的有条理的方法。