领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多这篇文章是系列文章的一部分
我们在上一篇文章中得出结论,Spring Cloud Sleuth 的MANUAL
上下文传播策略既高效又提供了正确的语义。通过许多经验,Spring、Micrometer 和 Reactor 团队创建了一个新的上下文传播库。其目标是封装在线程局部变量值和类似映射结构之间传输上下文数据的关注点。Micrometer 1.10 和 Reactor 3.5 都基于它构建,以在 Reactor 和命令式代码之间提供一流的体验。通过使用 Reactor Context
,我们隐式地暴露了ThreadLocal
值,这些值被 Micrometer 用于检测跟踪库以及填充 SLF4J 的MDC
,以提供包含跟踪标识符的日志。
在本文中,我们将采用与之前不同的方法。我们不会从头开始构建我们的知识,而是从可用的顶级 API 开始,然后解释幕后发生的事情。最后,您将能够
让我们重新审视第一篇文章中的示例,我们在其中展示了delayElement
运算符如何导致反应式链丢失关联标识符。让我们回顾一下代码,从我们的操作开始
Mono<Void> addProduct(String productName) {
log("Adding product: " + productName);
return Mono.empty(); // Assume we’re actually storing the product
}
Mono<Boolean> notifyShop(String productName) {
log("Notifying shop about: " + productName);
return Mono.just(true); // Assume we’re actually notifying the shop
}
然后我们需要回顾绑定请求处理程序
Mono<Void> handleRequest() {
initRequest(); <1>
log("Assembling the chain"); // <2>
return Mono.just("test-product")
.delayElement(Duration.ofMillis(1)) // <3>
.flatMap(product ->
Flux.concat(
addProduct(product), // <4>
notifyShop(product)).then())
}
从 Reactor 3.5.0 开始,Reactor Context
能够与一个新的库集成,该库位于 Micrometer 的保护伞下,称为context-propagation
。我们将在本文末尾更详细地描述这种集成。在 Reactor 3.5+ 中,当context-propagation
库在类路径上时,我们可以预期我们的ThreadLocal
值在我们使用handle
运算符以及新的tap
运算符进行日志记录时存在。
为了传播我们自定义的ThreadLocal
,我们需要注册一个ThreadLocalContextAccessor
ContextRegistry.getInstance()
.registerThreadLocalAccessor("CORRELATION_ID",
CORRELATION_ID::get,
CORRELATION_ID::set,
CORRELATION_ID::remove);
目前,context-propagation
库的细节对于实现我们的目标并不重要。我们唯一需要知道的是我们使用了键CORRELATION_ID
,它将与 Reactor Context
一起用于在我们的特殊运算符中恢复ThreadLocal
。让我们修改代码的其余部分以使用它们并在指定位置记录日志。
我们只需要对请求处理程序进行一次更改
Mono<Void> handleRequest() {
initRequest(); // <1>
log("Assembling the chain");
return Mono.just("test-product")
.delayElement(Duration.ofMillis(1))
.flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.contextCapture(); // <2>
}
我们所做的唯一修改是在我们返回给调用者的链的末尾使用<2>
contextCapture
运算符。此运算符的作用是捕获当前的ThreadLocal
值,这些值已在ContextRegistry
中注册了ThreadLocalAccessor
实例,并将它们存储在 Reactor Context
中的相同键下。在此特定实现中,我们唯一的希望是订阅发生在组装阶段之后,如<1>
中,我们在其中设置了ThreadLocal
值。
接下来,我们将使用tap
运算符添加日志记录
Mono<Void> addProduct(String productName) {
return Mono.<Void>empty()
.tap(() -> new DefaultSignalListener<>() {
@Override
public void doOnComplete() throws Throwable {
log("Adding product: " + productName);
}
});
}
在这里,我们正在扩展reactor-core
的reactor.core.observability
包中的DefaultSignalListener
。我们只对点击完成信号感兴趣,在完成信号中,我们执行日志操作。
对于handle
运算符,我们将更改notifyShop
方法
Mono<Boolean> notifyShop(String productName) {
return Mono.just(true)
.handle((result, sink) -> {
log("Notifying shop about: " + productName);
sink.next(result);
});
}
让我们看看现在,当我们调用处理程序时,是否会得到正确的输出
handleRequest().block();
结果如下
[ main][ 643805344761543048] Assembling the chain
[parallel-1][ 643805344761543048] Adding product: test-product
[parallel-1][ 643805344761543048] Notifying shop about: test-product
太好了!这实际上与 Spring Cloud Sleuth 的MANUAL
策略相同,但已集成到 Reactor 的内部,因此您无需手动恢复ThreadLocal
值。我们选择tap
和handle
是因为这些运算符可以访问Subscriber
绑定的Context
,并允许在具体的 Reactive Streams 信号上采取行动。
请记住:Reactor Context
用于写入,ThreadLocals
用于读取。
事实上,我们的请求处理程序有点危险。如果我们延迟订阅操作,我们将丢失关联标识符。考虑
Mono<Void> requestHandler = handleRequest(); // <1>
Thread subscriberThread = new Thread(requestHandler::block); // <2>
subscriberThread.start();
subscriberThread.join();
输出结果如下
[ main][ 1388809065574796038] Assembling the chain
[parallel-1][ null] Adding product: test-product
[parallel-1][ null] Notifying shop about: test-product
组装发生在<1>
中,ThreadLocal
在main
中设置。但是,订阅发生在<2>
中的新Thread
上,该线程没有要捕获的ThreadLocal
值。因此,我们的日志没有关联标识符。我们可以用Mono.defer()
包装处理程序的主体来解决此问题。但是,与其这样做,不如考虑我们是否首先需要实际设置ThreadLocal
。
在调用 Reactor 链的命令式应用程序中,例如调用WebClient
的 Spring MVC 控制器方法,ThreadLocal
值已经建立,并且contextCapture
将拾取它们并将其存储在Context
中。
另一方面,在像 WebFlux 这样的反应式堆栈中,直接使用contextWrite
更有意义。
我们知道 Reactor 将使用其Context
的内容来恢复ThreadLocal
值。如果我们直接将预期的值存储在Context
中,而不是从当前状态捕获它们,我们将稍微提高性能,但我们还将提高与函数式编程范式的兼容性。让我们试试看
Mono<Void> handleRequest() {
// initRequest(); -- no write to ThreadLocal
log("Assembling the chain");
return Mono.just("test-product")
// <1>
.delayElement(Duration.ofMillis(1))
.flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.contextWrite(
Context.of("CORRELATION_ID", correlationId())); // <2>
}
让我们运行它
[ main][ null] Assembling the chain
[parallel-1][ 7059587638538899074] Adding product: test-product
[parallel-1][ 7059587638538899074] Notifying shop about: test-product
不错!我们的实际反应式链包含正确的关联标识符。
不幸的是,我们在组装阶段丢失了一个。其中一个原因是日志没有发生在handle
或tap
运算符中。如果我们在<1>
中使用tap
运算符添加了初始日志,我们将一切正常。带有关联标识符的Context
绑定到<2>
上游的链。如果我们在contextWrite
调用之后添加了一个日志记录tap
运算符,我们将看不到正确的关联标识符 - 此时附加的Context
是不同的,并且没有我们的标识符。我们稍后会回到这个问题,但首先,让我们考虑一下是否可以简化我们的代码并避免使用特殊运算符。
当发布reactor-core
3.5.0 时,它包含在 Spring Framework 6.0 和 Spring Boot 3.0 中。使用 Spring Cloud Sleuth 进行跟踪的现有 Spring 用户习惯于使用填充了trace-id
和span-id
值的日志(类似于我们的关联标识符)。切换到新的范式(其中可观察性是 Spring 产品核心套件的一部分)将要求现有应用程序重写其日志记录以使用handle
和tap
运算符。我们继续思考如何使更多运算符能够恢复ThreadLocal
值。
正如我们在上一篇文章中看到的,恢复可能跨越多个运算符的ThreadLocal
值并非易事。选择handle
和tap
是因为它们不允许ThreadLocal
值泄漏。运行用户代码不会传播任何信号。当用户代码运行时,ThreadLocal
值存在。然后捕获结果。最后,ThreadLocal
上下文被清除。只有在那之后,信号的反应式传播才会发生到下游运算符。此外,我们希望更具选择性,因为在每个运算符中执行恢复会产生大量开销,如第 2 部分中所述。
我们仔细重新思考了所有内容,并提出了一个可以组合到以下调用中的想法(从reactor-core
3.5.3 开始)
Hooks.enableAutomaticContextPropagation();
我们可以将其添加到应用程序的main
方法中。
我们现在可以恢复操作方法的初始实现
Mono<Void> addProduct(String productName) {
log("Adding product: " + productName);
return Mono.empty();
}
Mono<Boolean> notifyShop(String productName) {
log("Notifying shop about: " + productName);
return Mono.just(true);
}
我们保持handleRequest
方法和在新Thread
上进行订阅不变。让我们运行它
[ main][ null] Assembling the chain
[parallel-1][ 8362586195225273235] Adding product: test-product
[parallel-1][ 8362586195225273235] Notifying shop about: test-product
成功!
使用此功能,我们可以将使用 Spring Cloud Sleuth 的现有代码库迁移到新的 Spring Framework,而无需对我们的日志记录方式进行任何更改。使用上述挂钩,如果您将 Spring Boot Actuator 与 Micrometer Tracing 一起使用,则 SLF4J 日志将填充跟踪信息,而无需执行任何操作。很快,Spring Boot 将自动为您调用挂钩。
我们提到过,我们会回到组装时日志的问题。到目前为止,我们一直在请求处理逻辑中启动相关标识符生成过程。理想情况下,我们的处理程序应该由服务器调用,并且从调用代码订阅生成的Publisher
(Flux
或Mono
)。我们的处理程序恢复到初始形状
Mono<Void> handleRequest() {
log("Assembling the chain");
return Mono.just("test-product")
.delayElement(Duration.ofMillis(1))
.flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then());
}
让我们通过将上下文附加到返回的Mono
来模拟服务器代码
Mono<Void> requestHandler = handleRequest()
.contextWrite(Context.of("CORRELATION_ID", correlationId()));
然后我们需要运行它
requestHandler.block();
组装时间仍然缺少相关标识符
[ main][ null] Assembling the chain
[parallel-1][ 5508113792645841519] Adding product: test-product
[parallel-1][ 5508113792645841519] Notifying shop about: test-product
contextWrite
操作符在订阅时(以及其他生命周期事件)恢复ThreadLocal
值。为了让用户代码在组装时拥有日志,对该代码的整个调用需要成为反应式链的一部分。这样,用户的代码在外部Mono
的订阅期间执行,并且立即订阅返回的内部Mono
。如果我们在“框架”代码中执行此操作,则在整个执行过程中,外部Mono
的Context
在ThreadLocal
中可用
Mono<Void> requestHandler = Mono.defer(() -> handleRequest())
.contextWrite(Context.of("CORRELATION_ID", correlationId()));
我们只需要使用Mono.defer()
并将Context
附加到它。
幸运的是,Spring Framework 很好地完成了它的工作,并在订阅阶段也处理了我们的组装。
这种新方法看起来非常有希望。有人可能会想,使用过去采用的方法,这种新机制将如何失效?我们对这种方法更有信心,因为它更符合 Reactive Streams 的本质。那些没有基于 Reactor Context
的方法都包含着一个主要的误解——它们将ThreadLocal
值传播到下游——希望在某个时候进行清理。但是,传播停止没有语义边界。
依赖于ThreadLocal
值的向下游传播也可能成为错误的来源。反应式库向上游和下游传播信号。一个信号可能会触发另一个信号,但它不必。不同的Thread
可以继续处理。由flatMap
类操作符完成的某些优化(例如预取),可以在没有我们的下游传播机制参与的情况下,从上游请求和排队值。如果我们希望即使在反压或取消时记录日志也能拥有上下文信息,我们需要考虑所有可能的信号。
一个重要的观察结果来自Context
指示逻辑边界的方式。当您调用contextWrite
并在Context
中存储一个值时,所有上游操作符都可以访问修改后的版本。所有下游操作符将看不到修改,但会看到您的修改所基于的状态。
Subscriber
绑定Context
的本质是我们新方法的基础。我们修改了contextWrite
操作符,以便在订阅时以及在取消和请求时,信号向上游传播时,将ThreadLocal
值设置为反映当前Context
。但只要信号向下游传播,它就会将这些ThreadLocal
值重置为下游Context
中表示的值。
我们仍然需要使用Scheduler
包装方法。我们还需要Queue
包装方法(为此我们需要改进生命周期语义)。
但我们可能会考虑通过在这些情况下传输 Reactor Context
来改善这种情况,而不是捕获ThreadLocal
值。这可以提高性能。
此外,当我们使用超出 Reactor 控制范围的Publisher
或使用我们无法控制的Thread
的源(如使用Mono.fromFuture()
示例模拟远程调用)时,我们仍然会丢失ThreadLocal
值。目前的缓解措施是引入contextWrite
操作符的语义边界,它不会真正更改Context
,就像此notifyShop
方法的变体一样
Mono<Boolean> notifyShop(String productName) {
log("Notifying shop about: " + productName);
return makeRequest(productName) // <1>
.contextWrite(Function.identity()) // <2>
.doOnNext(r -> log("Request done.")); // <3>
}
makeRequest
方法是在本系列的上一篇文章中定义的。如果我们假设makeRequest
是第三方库调用,它使用我们无法控制的Thread
,我们也无法将它执行的代码包装在<1>
中,以及在完成其操作的异步代码中。该链的这一部分执行的任何日志都不会填充相关标识符。传播此类上下文将是库作者的责任。但是,因为我们在<2>
中使用了边界,所以我们在<3>
中的日志包含相关标识符。
我们打算在reactor-core
中添加必要的函数,以便为以超出 Reactor 控制范围的方式更改Threads
的源提供这样的边界。
在命令式场景中,该调用仅反应式代码以使用阻塞订阅(例如使用block()
),我们计划自动执行contextCapture
以将当前ThreadLocal
值透明地传播到反应式链中。例如,在与 Spring MVC 应用程序中的 WebClient 交互时,这将非常有用。
捕获ThreadLocal
状态并在各个地方恢复它的任务本身就是一个有趣的话题。通常,我们会想到多个彼此具有逻辑连接的ThreadLocal
值,或者对应于各种关注点的Map
类结构。我们创建了一个专门的库,通过捕获其状态并将其恢复到相应的目标,允许在ThreadLocal
和任意对象之间进行转换。在前面的示例中,我们使用了context-propagation
库的一些 API。它是在 Micrometer 伞下开源的,如果您想在您的代码中使用它,它也有参考文档以及示例。
Project Reactor 注册了一个处理 Reactor Context
的ContextAccessor
,使用ServiceLoader
JDK 机制。另一方面,Micrometer 注册了一个ObservationThreadLocalAccessor
,它处理 Micrometer Tracing 和其他检测机制工作所需的ThreadLocal
状态,使用单个Observation
概念。
我们强烈建议尝试使用 Spring Boot 和 Spring Boot Actuator 来启用跟踪功能,并亲身体验其体验的凝聚力。
在本系列博文中,我们介绍了上下文传播的基础知识,并介绍了命令式和反应式编程范式之间桥接的历史和现状。我们希望您现在能够自信地使用我们实现的功能。在最佳情况下,如果您使用自动上下文传播功能,则您无需执行太多工作。此外,在这种有趣的场景中,我们希望您的自定义传播逻辑能够利用我们本文中描述的基元。如果您有任何疑问,可以联系我们,或在 GitHub 上报告问题。
如果没有同事们的帮助,这个系列将不会发表。我要感谢(按字母顺序排列):Simon Baslé、Jay Bryant、Pierre De Rop、Oleh Dokuka、Marcin Grzejszczak、Robert McNees、Rossen Stoyanchev 和 Tadaya Tsuyukubo。
要使用所用示例,请随时使用我的 GitHub 存储库中的相关包。