使用 Project Reactor 3 进行上下文传播 - 反应式和命令式之间的统一桥接

工程 | Dariusz Jędrzejczyk | 2023 年 3 月 30 日 | ...

这篇文章是系列文章的一部分

  1. 基础知识
  2. Spring Cloud Sleuth 的坎坷之路
  3. 反应式和命令式之间的统一桥接

我们在上一篇文章中得出结论,Spring Cloud Sleuth 的MANUAL上下文传播策略既高效又提供了正确的语义。通过许多经验,Spring、Micrometer 和 Reactor 团队创建了一个新的上下文传播库。其目标是封装在线程局部变量值和类似映射结构之间传输上下文数据的关注点。Micrometer 1.10 和 Reactor 3.5 都基于它构建,以在 Reactor 和命令式代码之间提供一流的体验。通过使用 Reactor Context,我们隐式地暴露了ThreadLocal值,这些值被 Micrometer 用于检测跟踪库以及填充 SLF4J 的MDC,以提供包含跟踪标识符的日志。

在本文中,我们将采用与之前不同的方法。我们不会从头开始构建我们的知识,而是从可用的顶级 API 开始,然后解释幕后发生的事情。最后,您将能够

  • 了解这些机制为何以这种方式工作。
  • 针对您的应用程序或库做出关于采用哪种方法的正确决策。
  • 了解何时以及为何无需执行任何操作并期望一切都能开箱即用。

反应式上下文和线程局部变量

让我们重新审视第一篇文章中的示例,我们在其中展示了delayElement运算符如何导致反应式链丢失关联标识符。让我们回顾一下代码,从我们的操作开始

Mono<Void> addProduct(String productName) {
  log("Adding product: " + productName);
  return Mono.empty(); // Assume we’re actually storing the product
}

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return Mono.just(true); // Assume we’re actually notifying the shop
}

然后我们需要回顾绑定请求处理程序

Mono<Void> handleRequest() {
  initRequest(); <1>
  log("Assembling the chain"); // <2>

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1)) // <3>
    .flatMap(product ->
      Flux.concat(
        addProduct(product), // <4>
        notifyShop(product)).then())
}

从 Reactor 3.5.0 开始,Reactor Context能够与一个新的库集成,该库位于 Micrometer 的保护伞下,称为context-propagation。我们将在本文末尾更详细地描述这种集成。在 Reactor 3.5+ 中,当context-propagation库在类路径上时,我们可以预期我们的ThreadLocal值在我们使用handle运算符以及新的tap运算符进行日志记录时存在。

为了传播我们自定义的ThreadLocal,我们需要注册一个ThreadLocalContextAccessor

ContextRegistry.getInstance()
  .registerThreadLocalAccessor("CORRELATION_ID",
    CORRELATION_ID::get,
    CORRELATION_ID::set,
    CORRELATION_ID::remove);

目前,context-propagation库的细节对于实现我们的目标并不重要。我们唯一需要知道的是我们使用了键CORRELATION_ID,它将与 Reactor Context一起用于在我们的特殊运算符中恢复ThreadLocal。让我们修改代码的其余部分以使用它们并在指定位置记录日志。

我们只需要对请求处理程序进行一次更改

Mono<Void> handleRequest() {
  initRequest(); // <1>
  log("Assembling the chain");

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product)).then())
    .contextCapture(); // <2>
}

我们所做的唯一修改是在我们返回给调用者的链的末尾使用<2> contextCapture运算符。此运算符的作用是捕获当前的ThreadLocal值,这些值已在ContextRegistry中注册了ThreadLocalAccessor实例,并将它们存储在 Reactor Context中的相同键下。在此特定实现中,我们唯一的希望是订阅发生在组装阶段之后,如<1>中,我们在其中设置了ThreadLocal值。

接下来,我们将使用tap运算符添加日志记录

Mono<Void> addProduct(String productName) {
  return Mono.<Void>empty()
    .tap(() -> new DefaultSignalListener<>() {
      @Override
      public void doOnComplete() throws Throwable {
        log("Adding product: " + productName);
      }
  });
}

在这里,我们正在扩展reactor-corereactor.core.observability包中的DefaultSignalListener。我们只对点击完成信号感兴趣,在完成信号中,我们执行日志操作。

对于handle运算符,我们将更改notifyShop方法

Mono<Boolean> notifyShop(String productName) {
  return Mono.just(true)
    .handle((result, sink) -> {
      log("Notifying shop about: " + productName);
      sink.next(result);
    });
}

让我们看看现在,当我们调用处理程序时,是否会得到正确的输出

handleRequest().block();

结果如下

[      main][  643805344761543048] Assembling the chain
[parallel-1][  643805344761543048] Adding product: test-product
[parallel-1][  643805344761543048] Notifying shop about: test-product

太好了!这实际上与 Spring Cloud Sleuth 的MANUAL策略相同,但已集成到 Reactor 的内部,因此您无需手动恢复ThreadLocal值。我们选择taphandle是因为这些运算符可以访问Subscriber绑定的Context,并允许在具体的 Reactive Streams 信号上采取行动。

请记住:Reactor Context用于写入,ThreadLocals用于读取。

事实上,我们的请求处理程序有点危险。如果我们延迟订阅操作,我们将丢失关联标识符。考虑

Mono<Void> requestHandler = handleRequest(); // <1>

Thread subscriberThread = new Thread(requestHandler::block); // <2>
subscriberThread.start();
subscriberThread.join();

输出结果如下

[      main][ 1388809065574796038] Assembling the chain
[parallel-1][                null] Adding product: test-product
[parallel-1][                null] Notifying shop about: test-product

组装发生在<1>中,ThreadLocalmain中设置。但是,订阅发生在<2>中的新Thread上,该线程没有要捕获的ThreadLocal值。因此,我们的日志没有关联标识符。我们可以用Mono.defer()包装处理程序的主体来解决此问题。但是,与其这样做,不如考虑我们是否首先需要实际设置ThreadLocal

在调用 Reactor 链的命令式应用程序中,例如调用WebClient的 Spring MVC 控制器方法,ThreadLocal值已经建立,并且contextCapture将拾取它们并将其存储在Context中。

另一方面,在像 WebFlux 这样的反应式堆栈中,直接使用contextWrite更有意义。

我们知道 Reactor 将使用其Context的内容来恢复ThreadLocal值。如果我们直接将预期的值存储在Context中,而不是从当前状态捕获它们,我们将稍微提高性能,但我们还将提高与函数式编程范式的兼容性。让我们试试看

Mono<Void> handleRequest() {
  // initRequest(); -- no write to ThreadLocal
  log("Assembling the chain");

  return Mono.just("test-product")
    // <1>
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product)).then())
    .contextWrite(
      Context.of("CORRELATION_ID", correlationId())); // <2>
}

让我们运行它

[      main][                null] Assembling the chain
[parallel-1][ 7059587638538899074] Adding product: test-product
[parallel-1][ 7059587638538899074] Notifying shop about: test-product

不错!我们的实际反应式链包含正确的关联标识符。

不幸的是,我们在组装阶段丢失了一个。其中一个原因是日志没有发生在handletap运算符中。如果我们在<1>中使用tap运算符添加了初始日志,我们将一切正常。带有关联标识符的Context绑定到<2>上游的链。如果我们在contextWrite调用之后添加了一个日志记录tap运算符,我们将看不到正确的关联标识符 - 此时附加的Context是不同的,并且没有我们的标识符。我们稍后会回到这个问题,但首先,让我们考虑一下是否可以简化我们的代码并避免使用特殊运算符。

自动上下文传播

当发布reactor-core 3.5.0 时,它包含在 Spring Framework 6.0 和 Spring Boot 3.0 中。使用 Spring Cloud Sleuth 进行跟踪的现有 Spring 用户习惯于使用填充了trace-idspan-id值的日志(类似于我们的关联标识符)。切换到新的范式(其中可观察性是 Spring 产品核心套件的一部分)将要求现有应用程序重写其日志记录以使用handletap运算符。我们继续思考如何使更多运算符能够恢复ThreadLocal值。

正如我们在上一篇文章中看到的,恢复可能跨越多个运算符的ThreadLocal值并非易事。选择handletap是因为它们不允许ThreadLocal值泄漏。运行用户代码不会传播任何信号。当用户代码运行时,ThreadLocal值存在。然后捕获结果。最后,ThreadLocal上下文被清除。只有在那之后,信号的反应式传播才会发生到下游运算符。此外,我们希望更具选择性,因为在每个运算符中执行恢复会产生大量开销,如第 2 部分中所述。

我们仔细重新思考了所有内容,并提出了一个可以组合到以下调用中的想法(从reactor-core 3.5.3 开始)

Hooks.enableAutomaticContextPropagation();

我们可以将其添加到应用程序的main方法中。

我们现在可以恢复操作方法的初始实现

Mono<Void> addProduct(String productName) {
  log("Adding product: " + productName);
  return Mono.empty();
}

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return Mono.just(true);
}

我们保持handleRequest方法和在新Thread上进行订阅不变。让我们运行它

[      main][                null] Assembling the chain
[parallel-1][ 8362586195225273235] Adding product: test-product
[parallel-1][ 8362586195225273235] Notifying shop about: test-product

成功!

使用此功能,我们可以将使用 Spring Cloud Sleuth 的现有代码库迁移到新的 Spring Framework,而无需对我们的日志记录方式进行任何更改。使用上述挂钩,如果您将 Spring Boot Actuator 与 Micrometer Tracing 一起使用,则 SLF4J 日志将填充跟踪信息,而无需执行任何操作。很快,Spring Boot 将自动为您调用挂钩。

编写框架代码

我们提到过,我们会回到组装时日志的问题。到目前为止,我们一直在请求处理逻辑中启动相关标识符生成过程。理想情况下,我们的处理程序应该由服务器调用,并且从调用代码订阅生成的PublisherFluxMono)。我们的处理程序恢复到初始形状

Mono<Void> handleRequest() {
  log("Assembling the chain");

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product)).then());
}

让我们通过将上下文附加到返回的Mono来模拟服务器代码

Mono<Void> requestHandler = handleRequest()
  .contextWrite(Context.of("CORRELATION_ID", correlationId()));

然后我们需要运行它

requestHandler.block();

组装时间仍然缺少相关标识符

[      main][                null] Assembling the chain
[parallel-1][ 5508113792645841519] Adding product: test-product
[parallel-1][ 5508113792645841519] Notifying shop about: test-product

contextWrite操作符在订阅时(以及其他生命周期事件)恢复ThreadLocal值。为了让用户代码在组装时拥有日志,对该代码的整个调用需要成为反应式链的一部分。这样,用户的代码在外部Mono的订阅期间执行,并且立即订阅返回的内部Mono。如果我们在“框架”代码中执行此操作,则在整个执行过程中,外部MonoContextThreadLocal中可用

Mono<Void> requestHandler = Mono.defer(() -> handleRequest())
  .contextWrite(Context.of("CORRELATION_ID", correlationId()));

我们只需要使用Mono.defer()并将Context附加到它。

幸运的是,Spring Framework 很好地完成了它的工作,并在订阅阶段也处理了我们的组装。

我们是否已经解决了上下文传播问题?

这种新方法看起来非常有希望。有人可能会想,使用过去采用的方法,这种新机制将如何失效?我们对这种方法更有信心,因为它更符合 Reactive Streams 的本质。那些没有基于 Reactor Context 的方法都包含着一个主要的误解——它们将ThreadLocal值传播到下游——希望在某个时候进行清理。但是,传播停止没有语义边界。

依赖于ThreadLocal值的向下游传播也可能成为错误的来源。反应式库向上游和下游传播信号。一个信号可能会触发另一个信号,但它不必。不同的Thread可以继续处理。由flatMap类操作符完成的某些优化(例如预取),可以在没有我们的下游传播机制参与的情况下,从上游请求和排队值。如果我们希望即使在反压或取消时记录日志也能拥有上下文信息,我们需要考虑所有可能的信号。

一个重要的观察结果来自Context指示逻辑边界的方式。当您调用contextWrite并在Context中存储一个值时,所有上游操作符都可以访问修改后的版本。所有下游操作符将看不到修改,但会看到您的修改所基于的状态。

Subscriber绑定Context的本质是我们新方法的基础。我们修改了contextWrite操作符,以便在订阅时以及在取消和请求时,信号向上游传播时,将ThreadLocal值设置为反映当前Context。但只要信号向下游传播,它就会将这些ThreadLocal值重置为下游Context中表示的值。

我们仍然需要使用Scheduler包装方法。我们还需要Queue包装方法(为此我们需要改进生命周期语义)。

但我们可能会考虑通过在这些情况下传输 Reactor Context来改善这种情况,而不是捕获ThreadLocal值。这可以提高性能。

此外,当我们使用超出 Reactor 控制范围的Publisher或使用我们无法控制的Thread的源(如使用Mono.fromFuture()示例模拟远程调用)时,我们仍然会丢失ThreadLocal值。目前的缓解措施是引入contextWrite操作符的语义边界,它不会真正更改Context,就像此notifyShop方法的变体一样

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return makeRequest(productName) // <1>
    .contextWrite(Function.identity()) // <2>
    .doOnNext(r -> log("Request done.")); // <3>
}

makeRequest方法是在本系列的上一篇文章中定义的。如果我们假设makeRequest是第三方库调用,它使用我们无法控制的Thread,我们也无法将它执行的代码包装在<1>中,以及在完成其操作的异步代码中。该链的这一部分执行的任何日志都不会填充相关标识符。传播此类上下文将是库作者的责任。但是,因为我们在<2>中使用了边界,所以我们在<3>中的日志包含相关标识符。

我们打算在reactor-core中添加必要的函数,以便为以超出 Reactor 控制范围的方式更改Threads的源提供这样的边界。

在命令式场景中,该调用仅反应式代码以使用阻塞订阅(例如使用block()),我们计划自动执行contextCapture以将当前ThreadLocal值透明地传播到反应式链中。例如,在与 Spring MVC 应用程序中的 WebClient 交互时,这将非常有用。

上下文传播库

捕获ThreadLocal状态并在各个地方恢复它的任务本身就是一个有趣的话题。通常,我们会想到多个彼此具有逻辑连接的ThreadLocal值,或者对应于各种关注点的Map类结构。我们创建了一个专门的库,通过捕获其状态并将其恢复到相应的目标,允许在ThreadLocal和任意对象之间进行转换。在前面的示例中,我们使用了context-propagation库的一些 API。它是在 Micrometer 伞下开源的,如果您想在您的代码中使用它,它也有参考文档以及示例。

Project Reactor 注册了一个处理 Reactor ContextContextAccessor,使用ServiceLoader JDK 机制。另一方面,Micrometer 注册了一个ObservationThreadLocalAccessor,它处理 Micrometer Tracing 和其他检测机制工作所需的ThreadLocal状态,使用单个Observation概念。

我们强烈建议尝试使用 Spring Boot 和 Spring Boot Actuator 来启用跟踪功能,并亲身体验其体验的凝聚力。

总结

在本系列博文中,我们介绍了上下文传播的基础知识,并介绍了命令式和反应式编程范式之间桥接的历史和现状。我们希望您现在能够自信地使用我们实现的功能。在最佳情况下,如果您使用自动上下文传播功能,则您无需执行太多工作。此外,在这种有趣的场景中,我们希望您的自定义传播逻辑能够利用我们本文中描述的基元。如果您有任何疑问,可以联系我们,或在 GitHub 上报告问题

鸣谢

如果没有同事们的帮助,这个系列将不会发表。我要感谢(按字母顺序排列):Simon Baslé、Jay Bryant、Pierre De Rop、Oleh Dokuka、Marcin Grzejszczak、Robert McNees、Rossen Stoyanchev 和 Tadaya Tsuyukubo。

额外内容

要使用所用示例,请随时使用我的 GitHub 存储库中的相关包

获取 Spring 电子报

与 Spring 电子报保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部