Project Reactor 3 的上下文传播 - 统一连接响应式与命令式

工程 | Dariusz Jędrzejczyk | 2023 年 3 月 30 日 | ...

本文是系列文章的一部分

  1. 基础知识
  2. Spring Cloud Sleuth 的坎坷之路
  3. 统一连接响应式与命令式

我们以 Spring Cloud Sleuth 的 MANUAL 上下文传播策略既高效又提供正确语义的思考结束了上一篇文章。基于诸多经验,Spring、Micrometer 和 Reactor 团队创建了一个新的上下文传播库。其目标是封装在 ThreadLocal 值和类似 Map 的结构之间传输上下文数据的关注点。Micrometer 1.10 和 Reactor 3.5 都以此为基础,在 Reactor 和命令式代码之间提供一流的体验。通过使用 Reactor Context,我们隐式暴露了 Micrometer 用于追踪库以及填充 SLF4J 的 MDC 以提供包含追踪标识符的日志所使用的 ThreadLocal 值。

在本文中,我们将采取与之前不同的方法。我们将从现有的顶层 API 开始,然后解释其背后的原理。最后,您将能够:

  • 理解这些机制为何如此运作。
  • 决定在您的应用或库中采取哪种方法。
  • 了解何时以及为何您无需做任何事情,并期望一切开箱即用。

响应式 Context 和 ThreadLocals

让我们回顾第一篇文章中的示例,其中我们展示了 delayElement 操作符如何导致响应式链丢失关联标识符。让我们回顾一下代码,从我们的操作开始:

Mono<Void> addProduct(String productName) {
  log("Adding product: " + productName);
  return Mono.empty(); // Assume we’re actually storing the product
}

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return Mono.just(true); // Assume we’re actually notifying the shop
}

然后我们需要回顾一下绑定请求处理器:

Mono<Void> handleRequest() {
  initRequest(); <1>
  log("Assembling the chain"); // <2>

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1)) // <3>
    .flatMap(product ->
      Flux.concat(
        addProduct(product), // <4>
        notifyShop(product)).then())
}

从 Reactor 3.5.0 开始,Reactor Context 能够与 Micrometer 旗下的一个新库集成,该库名为 context-propagation。我们将在本文末尾更详细地描述该集成。在 Reactor 3.5+ 中,当 context-propagation 库位于类路径上时,我们可以期望在 handle 操作符以及新的 tap 操作符中进行日志记录时,我们的 ThreadLocal 值能够存在。

为了传播我们的自定义 ThreadLocal,我们需要注册一个 ThreadLocalContextAccessor

ContextRegistry.getInstance()
  .registerThreadLocalAccessor("CORRELATION_ID",
    CORRELATION_ID::get,
    CORRELATION_ID::set,
    CORRELATION_ID::remove);

目前,context-propagation 库的细节对于实现我们需要的功能并不重要。我们只需要知道我们使用了键 CORRELATION_ID,它将与 Reactor Context 一起用于在我们特殊的 operators 中恢复 ThreadLocal。让我们修改其余代码来使用它们并在指定位置记录日志。

我们只需要对请求处理器进行一项更改:

Mono<Void> handleRequest() {
  initRequest(); // <1>
  log("Assembling the chain");

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product)).then())
    .contextCapture(); // <2>
}

我们所做的唯一修改是返回给调用者的链末尾的 <2> contextCapture 操作符。此操作符的任务是捕获当前 ThreadLocal 值(ThreadLocalAccessor 实例已在 ContextRegistry 中注册)并将其存储在 Reactor Context 中,使用相同的键。在此特定实现中,我们唯一的希望是订阅在装配阶段立即发生,就像我们在 <1> 中设置 ThreadLocal 值一样。

接下来,我们将使用 tap 操作符添加日志记录:

Mono<Void> addProduct(String productName) {
  return Mono.<Void>empty()
    .tap(() -> new DefaultSignalListener<>() {
      @Override
      public void doOnComplete() throws Throwable {
        log("Adding product: " + productName);
      }
  });
}

这里,我们扩展了 reactor-corereactor.core.observability 包中的 DefaultSignalListener。我们只对订阅完成信号感兴趣,在此处执行日志操作。

对于 handle 操作符,我们将更改 notifyShop 方法:

Mono<Boolean> notifyShop(String productName) {
  return Mono.just(true)
    .handle((result, sink) -> {
      log("Notifying shop about: " + productName);
      sink.next(result);
    });
}

让我们看看现在当我们调用处理器时,是否能得到正确的输出:

handleRequest().block();

结果如下:

[      main][  643805344761543048] Assembling the chain
[parallel-1][  643805344761543048] Adding product: test-product
[parallel-1][  643805344761543048] Notifying shop about: test-product

太棒了!这实际上与 Spring Cloud Sleuth 的 MANUAL 策略相同,但已集成到 Reactor 的内部机制中,因此您无需手动恢复 ThreadLocal 值。我们选择 taphandle 是因为这些操作符可以访问绑定到 SubscriberContext,并允许在具体的 Reactive Streams 信号上采取行动。

请记住:Reactor Context 用于写入,ThreadLocals 用于读取。

实际上,我们的请求处理器有点危险。如果我们延迟订阅行为,我们将丢失关联标识符。考虑一下:

Mono<Void> requestHandler = handleRequest(); // <1>

Thread subscriberThread = new Thread(requestHandler::block); // <2>
subscriberThread.start();
subscriberThread.join();

输出如下:

[      main][ 1388809065574796038] Assembling the chain
[parallel-1][                null] Adding product: test-product
[parallel-1][                null] Notifying shop about: test-product

装配发生在 <1> 中,并且在 main 中设置了 ThreadLocal。然而,订阅发生在 <2> 中的新 Thread 上,该 Thread 没有可捕获的 ThreadLocal 值。因此,我们的日志没有关联标识符。我们可以用 Mono.defer() 包装处理器的主体来解决此问题。但是,与其那样做,不如考虑我们是否一开始就需要实际设置 ThreadLocal

在调用 Reactor 链的命令式应用中(例如调用 WebClient 的 Spring MVC 控制器方法),ThreadLocal 值已经建立,contextCapture 将会捕获它们并将其存储在 Context 中。

另一方面,在像 WebFlux 这样的响应式栈中,直接使用 contextWrite 更有意义。

我们知道 Reactor 会使用其 Context 的内容来恢复 ThreadLocal 值。如果我们直接将预期值存储在 Context 中,而不是从当前状态捕获它们,我们将稍微提高性能,同时也将提高与函数式编程范式的兼容性。让我们试试看:

Mono<Void> handleRequest() {
  // initRequest(); -- no write to ThreadLocal
  log("Assembling the chain");

  return Mono.just("test-product")
    // <1>
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product)).then())
    .contextWrite(
      Context.of("CORRELATION_ID", correlationId())); // <2>
}

让我们运行它:

[      main][                null] Assembling the chain
[parallel-1][ 7059587638538899074] Adding product: test-product
[parallel-1][ 7059587638538899074] Notifying shop about: test-product

很好!我们的实际响应式链包含正确的关联标识符。

不幸的是,我们在装配阶段丢失了一个。原因之一是日志记录并非发生在 handletap 操作符内部。如果我们在 <1> 中使用 tap 操作符添加初始日志,就不会出现问题。包含关联标识符的 Context 绑定到 <2> 上游的链。如果我们紧随 contextWrite 调用后添加一个记录日志的 tap 操作符,我们将看不到正确的关联标识符——在该阶段附加的 Context 是不同的,不包含我们的标识符。我们稍后将回到这个问题,但首先,让我们考虑是否可以简化代码并避免使用特殊的操作符。

自动上下文传播

reactor-core 3.5.0 发布时,它被包含在 Spring Framework 6.0 和 Spring Boot 3.0 中。使用 Spring Cloud Sleuth 进行追踪的现有 Spring 用户习惯于日志中填充 trace-idspan-id 值(类似于我们的关联标识符)。切换到新范式(可观察性是 Spring 核心产品套件的一部分)将要求现有应用重写其日志逻辑以使用 handletap 操作符。我们继续思考如何使更多操作符能够恢复 ThreadLocal 值。

正如我们在前一篇文章中看到的,恢复可以跨多个操作符的 ThreadLocal 值并非易事。选择 handletap 是因为它们不会导致 ThreadLocal 值泄露。用户代码运行后不会传播任何信号。用户代码运行时 ThreadLocal 值存在。然后捕获结果。最后,清除 ThreadLocal 上下文。只有在此之后,响应式信号才会传播到下游操作符。此外,我们希望更加有选择性,因为在每个操作符中执行恢复会产生大量开销,正如第 2 部分中所讨论的。

我们仔细重新思考了所有内容,并提出了一个想法,可以将其结合到以下调用中(从 reactor-core 3.5.3 开始):

Hooks.enableAutomaticContextPropagation();

我们可以将其添加到我们应用的 main 方法中。

现在我们可以恢复我们最初的操作方法实现:

Mono<Void> addProduct(String productName) {
  log("Adding product: " + productName);
  return Mono.empty();
}

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return Mono.just(true);
}

我们保持 handleRequest 方法和在新 Thread 上的订阅不变。让我们运行它:

[      main][                null] Assembling the chain
[parallel-1][ 8362586195225273235] Adding product: test-product
[parallel-1][ 8362586195225273235] Notifying shop about: test-product

成功!

有了这项功能,我们可以将使用 Spring Cloud Sleuth 的现有代码库迁移到新的 Spring Framework,而无需更改日志记录方式。通过上述钩子,如果您使用带有 Micrometer Tracing 的 Spring Boot Actuator,SLF4J 日志将填充追踪信息,而无需进行任何操作。很快,Spring Boot 将自动为您调用该钩子。

编写框架代码

我们提到过,我们将回到装配时日志记录的问题。到目前为止,我们一直在请求处理逻辑中启动关联标识符生成过程。理想情况下,我们的处理器应该由服务器调用,并且结果 Publisher (FluxMono) 由调用代码订阅。我们的处理器回到了最初的形状:

Mono<Void> handleRequest() {
  log("Assembling the chain");

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product)).then());
}

让我们通过将上下文附加到返回的 Mono 来模仿服务器代码:

Mono<Void> requestHandler = handleRequest()
  .contextWrite(Context.of("CORRELATION_ID", correlationId()));

然后我们需要运行它:

requestHandler.block();

装配时仍然缺少关联标识符:

[      main][                null] Assembling the chain
[parallel-1][ 5508113792645841519] Adding product: test-product
[parallel-1][ 5508113792645841519] Notifying shop about: test-product

contextWrite 操作符在订阅时(以及其他生命周期事件)恢复 ThreadLocal 值。要在装配时让用户代码拥有日志,调用该代码的整个过程需要成为响应式链的一部分。这样,用户代码在外部 Mono 订阅时执行,并且返回的内部 Mono 立即被订阅。在整个执行过程中,如果我们这样做在我们的“框架”代码中,外部 MonoContext 就可以在 ThreadLocal 中可用:

Mono<Void> requestHandler = Mono.defer(() -> handleRequest())
  .contextWrite(Context.of("CORRELATION_ID", correlationId()));

我们只需要做的是使用 Mono.defer() 并将 Context 附加到它。

幸运的是,Spring Framework 做得很好,并且也在订阅阶段处理了我们的装配。

上下文传播问题是否已解决?

这种新方法看起来非常令人鼓舞。有人可能会想,考虑到过去采用的方法,这种新机制将如何打破?我们对这种方法更有信心,因为它更符合 Reactive Streams 的本质。那些不基于 Reactor Context 的方法内嵌着一个主要的误解——它们将 ThreadLocal 值向下传播——希望在某个时刻进行清理。然而,传播并没有语义上的边界来停止。

依赖 ThreadLocal 值向下传播也可能是错误的来源。响应式库向上游和下游传播信号。一个信号可能触发另一个信号,但并非必须如此。不同的 Thread 可以继续处理。像 flatMap 之类的操作符进行的某些优化(例如预取)可以从上游请求并排队值,而无需我们下游传播机制的参与。如果我们想在背压或取消时也能获得上下文信息,我们需要考虑所有可能的信号。

一个重要的观察来自 Context 规定逻辑边界的方式。当你调用 contextWrite 并将值存储在 Context 中时,所有上游操作符都可以访问修改后的版本。所有下游操作符将看不到修改,但会看到你的修改所基于的状态。

绑定到 SubscriberContext 的性质是我们新方法的基础。我们修改了 contextWrite 操作符,使其在信号在订阅时、取消和请求时向上游传播时设置 ThreadLocal 值以反映当前的 Context。但是,当信号向下游传播时,它会将这些 ThreadLocal 值重置为下游 Context 中表示的值。

我们仍然需要使用 Scheduler 包装方法。我们还需要 Queue 包装方法(为此我们需要改进生命周期语义)。

但我们可以考虑通过在这些情况下传输 Reactor Context 而不是捕获 ThreadLocal 值来改善情况。这可以提高性能。

此外,当我们使用不在 Reactor 控制范围内的 Publishers 或使用我们无法控制的 Threads 的源(例如使用 Mono.fromFuture() 来模拟远程调用)时,我们仍然会丢失 ThreadLocal 值。目前的缓解措施是引入 contextWrite 操作符的语义边界,它并不真正改变 Context,就像 notifyShop 方法的这个变体一样:

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return makeRequest(productName) // <1>
    .contextWrite(Function.identity()) // <2>
    .doOnNext(r -> log("Request done.")); // <3>
}

makeRequest 方法定义在本系列的上一篇文章中。如果我们假设 makeRequest 是第三方库调用,它使用了我们无法控制的 Threads,那么我们也无法包装它在 <1> 中执行的代码以及完成其操作的异步代码。该链部分进行的任何日志记录都不会填充关联标识符。传播此类上下文将是库作者的责任。但是,由于我们在 <2> 中使用了边界,因此我们在 <3> 中的日志包含关联标识符。

我们打算在 reactor-core 中添加必要的功能,以便为那些可能以 Reactor 无法控制的方式改变 Threads 的源提供此类边界。

在命令式场景中,只调用响应式代码以使用阻塞订阅(例如通过使用 block()),我们计划自动执行 contextCapture,以透明地将当前 ThreadLocal 值传播到响应式链中。这在使用 WebClient 的 Spring MVC 应用程序中将非常有用。

Context-propagation 库

捕获 ThreadLocal 状态并在不同位置恢复它本身就是一个有趣的主题。通常,我们会想到多个相互之间具有逻辑连接或与各种关注点相对应的类似 Map 的结构。我们创建了一个专用库,允许通过捕获其状态并将其恢复到各自的目标来在 ThreadLocals 和任意对象之间进行转换。在前面的示例中,我们使用了 context-propagation 库的一些 API。它在 Micrometer 旗下开源,如果您想在代码中使用它,它也有包含示例的参考文档

Project Reactor 使用 ServiceLoader JDK 机制注册了一个处理 Reactor ContextContextAccessor。另一方面,Micrometer 注册了一个 ObservationThreadLocalAccessor,它处理 Micrometer Tracing 和其他仪器机制通过单一 Observation 概念工作所需的 ThreadLocal 状态。

我们强烈建议尝试使用 Spring Boot 和 Spring Boot Actuator 来启用追踪功能,亲身感受一下这种体验的连贯性。

总结

在本系列博客文章中,我们介绍了上下文传播的基础知识,并探讨了连接命令式和响应式编程范例的历史和现状。我们希望您现在可以自信地使用我们实现的功能。在最好的情况下,如果您使用自动上下文传播功能,您几乎无需进行任何工作。此外,在这种有趣的场景下,我们希望您的自定义传播逻辑可以利用我们在本文中描述的原语。如果您有疑问或想在 GitHub 上报告问题,可以联系我们

致谢

如果没有同事们逐字逐句的审阅,本系列文章将无法发表。我要感谢(按字母顺序排列):Simon Baslé, Jay Bryant, Pierre De Rop, Oleh Dokuka, Marcin Grzejszczak, Robert McNees, Rossen Stoyanchev, 和 Tadaya Tsuyukubo。

彩蛋

要玩转使用的示例,请随时使用我的 GitHub 仓库中的相关包

订阅 Spring 时事通讯

通过 Spring 时事通讯保持联系

订阅

保持领先

VMware 提供培训和认证,助力您突飞猛进。

了解更多

获取支持

Tanzu Spring 通过简单的订阅,提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部