使用 Project Reactor 2 进行上下文传播 - Spring Cloud Sleuth 的坎坷之路

工程 | Dariusz Jędrzejczyk | 2023年3月29日 | ...

本文是系列文章的一部分

  1. 基础知识
  2. Spring Cloud Sleuth 的坎坷之路
  3. 响应式与命令式之间的统一桥接

Spring Cloud Sleuth 最近成为了 Micrometer Tracing,它是 Micrometer 项目的一部分。大多数跟踪插桩都集中在 Micrometer 中,位于新的可观测性 API之下。这些项目的目标是让任何应用程序都具备可观测性——以包含关联标识符的指标、跟踪和日志的形式。为了实现这一目标,库需要一种方式来传输上下文信息。当应用程序以任何形式处理异步时,这项任务就变得相当具有挑战性。在上一篇文章中,我们回顾了使用 ThreadLocal 和 Reactor Context 进行上下文传播的基础知识。

Spring Cloud Sleuth 在处理异步上下文传播的方式上经历了多次重大调整。由于 Sleuth 需要与不一定具有响应式 API 的第三方插桩库打交道,因此拥有一种既定的方式来使上下文对它们可用至关重要。这些库通常不假设异步,而是依赖静态的 ThreadLocal 状态。多年来,ThreadLocal 为 JVM 应用程序提供了隐式的上下文存储,用于驱动可观测性功能。随着时间的推移,Project Reactor 在底层原语之上引入了各种钩子(hook)和包装(wrapping)机制,以便实现响应式和命令式之间的桥接。在本文中,我们将探讨将上下文传播到 ThreadLocal 值的方法,并讨论它们可能出现的问题。我们将探讨 Sleuth 采取的方法,并总结我们发现的一种兼顾性能和语义合理性的良好折衷方案。

在描述 Sleuth 引入的方法之前,我们应该考虑在命令式世界和响应式世界之间进行桥接所存在的危险。

隐藏并发下的副作用陷阱 (Pitfalls of Side Effects in the Face of Hidden Concurrency)

在上一篇文章中,我们讨论了一些 Thread 切换和相关副作用的潜在问题。现在,我们将利用 Reactor 的插件机制来解决我们可能遇到的问题,从而更深入地探索响应式编程的特性。

总结 Spring Cloud Sleuth 遇到的所有问题是一个不断变化的目标。此外,许多组织都有自己的上下文传播机制实现,例如用于填充 SLF4J 的 MDC。本文无意全面总结所有潜在陷阱。它旨在建立一些直觉,帮助你理解最终的真理:要么遵循响应式编程规则,要么准备在最意想不到的时刻失败。

调度器钩子 (Scheduler Hook)

正如我们所知,响应式链可以使用不同的 Thread 来传播信号。从上一篇文章中我们了解到,当执行在另一个 Thread 上继续时,在任务运行时恢复上下文是合理的。Project Reactor 将管理 Thread 的任务委托给 Scheduler。它还提供了一个专用的钩子,允许拦截特定工作单元的调度和运行:Schedulers.onScheduleHook。它的工作方式类似于上一篇文章中的 WrappedExecutor。让我们来看一个可能考虑使用它的场景。

清理 (Cleanup)

在第一部分中,我们了解到在响应式链中不能持续依赖 ThreadLocal 值可用。如果我们尝试在订阅时初始化它,并在 doFinally 操作符中清除它呢?我们的应用程序可以使用有限数量的 Thread 来处理许多请求,其中一些是并发的。由于这些平台 Thread 可以被重用,我们需要在处理另一个请求之前对与前一个请求相关的任何 ThreadLocal 状态进行清理,以确保不同的请求不会使用残留的关联标识符。

接下来的代码示例是对上一部分中我们编写的未使用 Reactor Context 的代码进行的修改。

handleRequest 方法的一种潜在实现可能如下所示

Mono<Void> handleRequest() {
  return Mono.fromSupplier(() -> {
    initRequest(); // <1>
    return "test-product";
  }).flatMap(product ->
    Flux.concat(
      addProduct(product),
      notifyShop(product)).then())
    .doOnSuccess(v -> log("Done."))
    .doFinally(signalType ->
      CORRELATION_ID.remove()); // <2>
}

<1> 中,我们设置了 ThreadLocal 的值,在 <2> 中,我们尝试清除它。

我们还修改了执行的操作,以便能够在 addProduct 方法中添加一个人工延迟

Mono<Void> addProduct(String productName) {
  return Mono.defer(() -> {
    log("Adding product: " + productName);
    return Mono.<Void>empty()
      .delaySubscription(Duration.ofMillis(10),
        Schedulers.single()); // <1>
  });
}

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return Mono.just(true);
  });
}

注意,在 <1> 中,我们通过延迟订阅并使用 Schedulers.single() 在 10ms 后启动订阅来引入异步。delaySubscription 将使用该 Scheduler 底层的 ScheduledExecutorService 并在延迟后在另一个 Thread 上启动订阅。

从上一篇文章中我们知道,在这种情况下我们需要恢复 ThreadLocals,因此我们使用提到的 Scheduler 插件来实现这一点

Schedulers.onScheduleHook("context.propagation", WrappedRunnable::new);

在 Reactor 的 Scheduler 上执行的每个任务都会恢复 ThreadLocal 的值,因此我们应该安全无虞。

现在,让我们模拟两个顺序请求,它们之间有一个日志,用于验证 CORRELATION_ID 是否已正确清除

log("Got first request, calling handler");
handleRequest().block();

log("Got second request, calling handler");
log("There should be no correlationId on this line!");

handleRequest().block();

日志如下所示

[      main][                null] Got first request, calling handler // <1>
[      main][ 8658769170992364531] Adding product: test-product
[  single-1][ 8658769170992364531] Notifying shop about: test-product
[  single-1][ 8658769170992364531] Done.
[      main][ 8658769170992364531] Got second request, calling handler
[      main][ 8658769170992364531] There should be no correlationId on this line!
[      main][  711436174608061530] Adding product: test-product
[  single-1][  711436174608061530] Notifying shop about: test-product
[  single-1][  711436174608061530] Done.

“test-product” 处理相关的日志具有正确的关联标识符。然而,在请求之间发生了什么?我们期望 ThreadLocaldoFinally 中被清除。不幸的是,请求之间的日志仍然包含一个标识符。那么发生了什么?

注意 “Notifying shop about” 的日志发生在 Thread single-1 上。信号是在该 Thread 上传递的,所以我们在那里清除了 ThreadLocal,但主 Thread 仍然被污染了(在 <1> 中)。现在,处理程序之外的执行可以使用错误的关联标识符用于不同的目的。我们可以尝试通过在服务器层(负责分派请求)添加清理逻辑来缓解此问题,并确保用于请求的每个 Thread 不被污染。但是,如果我们的管道更复杂,这并不能挽救所有其他潜在的 Scheduler Thread

这种方法在允许应用程序在响应式链中透明地使用 ThreadLocal 值方面取得了相当大的进展。从性能角度来看,它也是合理的,因为它不会在每个操作符周围设置和重置 ThreadLocal,而只在处理项目时发生 Thread 切换时进行。然而,它也表明仍然存在一些未解决的副作用。在接下来的示例中,我们将体验并尝试解决不同的场景。

外部源和汇合的困难 (Difficulties with External Sources and Sinks)

使用 ThreadLocal 作为上下文元数据传输机制的策略还有一个常见问题,即使用了 Reactor 之外的异步库,并且该库自行切换了 Thread。当执行切换到不受包装的 ExecutorService 控制的不同 Thread 时,上下文就会丢失。

让我们实际看看。我们将重用目前为止看过的大部分代码,但对 notifyShop 方法进行一项更改。它现在通过使用以下方法来模拟远程调用

Mono<Boolean> makeRequest(String productName) {
  return Mono.fromFuture(CompletableFuture.supplyAsync(() -> true,
    CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)));
}

所以 notifyShop 看起来像这样

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return makeRequest(productName);
  });
}

如果我们触发处理程序一次

handleRequest().block();

我们得到以下输出

[      main][  683056557275963407] Adding product: test-product
[  single-1][  683056557275963407] Notifying shop about: test-product
[l-worker-1][                null] Done!

日志缩短了 Thread 名称以提高可见性,但 l-worker-1 实际上是 ForkJoinPool.commonPool-worker-1 的缩短版。

正如我们所见,我们的执行在一个我们无法控制的通用 ForkJoinPool 上继续进行。一个问题是,从该 Thread 切换开始,我们就再也看不到我们的关联标识符了;另一个问题是,我们在一个实际上缺少关联信息的 Thread 上执行了清理操作。

我们可以通过 Executor 或任务包装来潜在地改善(部分)情况,如前一篇文章所述,但我们并非总是拥有这样的控制权——例如,如果我们调用一个使用 CompletableFuture 的外部库。

操作符钩子 (Operator Hooks)

我们几乎准备好讨论 Sleuth 的策略了。Schedulers.onScheduleHook 对于响应式处理中可能发生的非显而易见的 Thread 切换提供了有限的能力。我们需要对操作的执行有更多的控制。我们将通过引入两种外部服务通信方式来演示这些限制。

addProduct 方法现在发起一个远程请求,并在我们控制的 Scheduler 上发布结果。将繁重的计算转移到不同的 Thread 上是很常见的做法。为此,我们使用了 publishOn 操作符

Mono<Void> addProduct(String productName) {
  return Mono.defer(() -> {
    log("Adding product: " + productName);
    return makeRequest(productName)
      .publishOn(Schedulers.single())
      .then();
  });
}

notifyShop 方法模拟将结果映射到可能多个 Publisher 中。这在响应是一个复合结果的情况下是典型的场景——例如,如果响应是一个 JSON 数组,我们打算将每个项作为对另一个服务的单独调用进行处理,或者丰富单个结果。我们使用一个简化版本,只取一个结果

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return makeRequest(productName)
      .flatMapMany(result ->
        Flux.just("result")
          .map(x -> result))
          .take(1)
          .single();
    });
}

现在我们跳过处理程序,手动初始化关联标识符,然后订阅这些链

initRequest();
addProduct("test-product")
  .doOnSuccess(v -> log("Added."))
  .block();

initRequest();
notifyShop("test-product")
  .doOnSuccess(v -> log("Notified."))
  .block();

让我们看看输出

[      main][ 6606077262934500649] Adding product: test-product
[  single-1][                null] Added.
[      main][  182687922231622589] Notifying shop about: test-product
[l-worker-1][                null] Notified.

这是预料之中的,因为 doOnSuccess 中发生的两个日志都是由 CompletableFutureForkJoinPool Thread 上传递值触发的。即使我们有 Scheduler 包装,结果首先是在我们无法控制的 Thread 上传递的,因此即使在 addProduct 中使用 publishOn 也无济于事。

我们能做些什么来改善这种情况吗?Reactor 有一个细粒度的插件系统,它允许我们在任何管道中装饰任何操作符。我们可以尝试使用它来恢复关联标识符。

这些插件将使用自定义的 Subscriber 实现,该实现在订阅时捕获关联标识符

static class CorrelatingSubscriber<T> implements CoreSubscriber<T> {
  final CoreSubscriber<T> delegate;
  Long correlationId;

  public CorrelatingSubscriber(CoreSubscriber<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void onSubscribe(Subscription s) {
    delegate.onSubscribe(s);
    this.correlationId = CORRELATION_ID.get();
  }

  @Override
  public void onNext(T t) {
    CORRELATION_ID.set(this.correlationId);
    delegate.onNext(t);
  }

  @Override
  public void onError(Throwable t) {
    CORRELATION_ID.set(this.correlationId);
    delegate.onError(t);
  }

  @Override
  public void onComplete() {
    CORRELATION_ID.set(this.correlationId);
    delegate.onComplete();
  }
}

要更改操作符,使其实现将调用委托给实际的 Subscriber 实例,我们可以使用 Operators.lift 方法

Operators.lift((scannable, subscriber) ->
  new CorrelatingSubscriber<>(subscriber));

onEachOperator 钩子 (onEachOperator Hook)

首先,我们尝试一个插件,它允许我们修改链中的每一个操作符

Hooks.onEachOperator(
  Operators.lift((scannable, subscriber) ->
    new CorrelatingSubscriber<>(subscriber)));

让我们再次运行示例并查看输出

[      main][ 7295088917002526647] Adding product: test-product
[  single-1][ 7295088917002526647] Added.
[      main][  383851863754448684] Notifying shop about: test-product
[l-worker-1][  383851863754448684] Notified.

哇!即使在如此复杂的场景中,我们也设法获取了关联标识符。最初的订阅行为捕获了 ThreadLocal 值并在每一步都恢复了它。即使 notifyShop 方法中使用的 flatMap(它会自行订阅)也奏效了,因为在订阅另一个 Thread 之前,ThreadLocal 已从先前的捕获中填充!这听起来确实很棒,但这种方法也有缺点。第一个也是最明显的缺点是性能。传播发生在每个操作符上。使用这种技术,我们首先包装(decorate)每个对象,并在每一步都进行 ThreadLocal 访问。所有这些操作都很昂贵。要了解更多信息,请观看Oleh 关于响应式性能的演讲

onLastOperator 钩子 (onLastOperator Hook)

所以我们尝试另一种方法。这次,我们将使用一个插件,它会附着到链中被视为最后一个操作符的操作符上——也就是紧接在 subscribe() 调用之前的操作符。

对于响应式链可以得出一个观察结论:对于同步操作符,我们不需要在每个单独的操作(例如 filtermap)中恢复最初捕获的上下文,而只需要在链中最后一个操作符被订阅时恢复。只要不涉及 Thread 边界的跨越,这种机制就能奏效。为了支持可能跨越这些边界的操作符(例如 flatMap,它涉及订阅一个新的 Publisher),需要一个特殊的技巧。它将映射的结果视为它们操作的内部 Publisher 的最后一个操作符。

让我们尝试这种方法

Hooks.onLastOperator(
  Operators.lift((scannable, subscriber) ->
    new CorrelatingSubscriber<>(subscriber)));

并运行

[      main][ 2122332013640150746] Adding product: test-product
[  single-1][ 2122332013640150746] Added.
[      main][  459477771449275997] Notifying shop about: test-product
[l-worker-1][                null] Notified.

它在 addProduct 中与 publishOn 一起工作正常,但在 notifyShop 中的 flatMap 上失败了。

让我们分析一下为什么 notifyShop 会失败。我们调用 block() 会捕获 ThreadLocal 并为每个向下游传播的信号恢复它。通过在 flatMapMany 中完成的映射,我们正在处理之前提到的异步边界。我们的插件实际上应用于内部源 (Flux.just().map().single())。

然而,这些努力仍然没有奏效,尽管自定义的 SubscriberflatMapMany 内部被调用并尝试恢复 ThreadLocal 的值。触发内部订阅的信号是在我们无法控制的 Thread 上启动的,因此我们根本没有 ThreadLocal 可以捕获。

在使用 publishOn 操作符的情况下则不同。对其的订阅始于我们控制的 Thread。因此,当处理来自 makeRequest() 方法的结果信号时,它只会在我们控制的 Thread 上传递。.doOnSuccess(v -> log("Added.")) 的执行发生在与 flatMapMany 不同步的 Thread 边界之后。

这就是为什么 onEachOperator 能覆盖更多情况——它在每一步都恢复初始值,无论是否存在异步边界。尽管如此,onLastOperator 的性能略优于 onEachOperator

addQueueWrapper 钩子 (addQueueWrapper Hook)

还有一个插件,如果将其与之前的钩子结合使用,我们可以完全控制响应式传递。Spring Cloud Sleuth 也使用了它。我们考虑的是最近引入的插件 Hooks.addQueueWrapper。不过,我们将不详细探讨它。它可以解决 Reactor 中工作窃取机制引入的问题。异步操作符,例如 flatMap,可以在将信号传递给操作符的各种 Thread 上取得进展。想象一个背压场景,处理暂停了一段时间。在某个时刻,一个新的 Thread 可以接管并发出 Subscription.request(n) 调用,这将导致累积的值立即传递。现在你可以问自己:“什么累积的值?”这是一个好问题。Reactor 中的许多操作符使用内部 Queue 来实现背压或保留串行传递语义。因为这些 Queue 的 draining 可以在任何 Thread 上发生,所以上下文信息应该附加到存储在 Queue 中的每个信号上——也就是说,用于我们关联目的的 ThreadLocal 值。这就是为什么我们需要一个 Queue 包装器——在将值提交到 Queue 时,我们捕获 ThreadLocal 状态。当从 Queue 中检索值时,状态就会恢复。

Spring Cloud Sleuth 中的上下文传播 (Context Propagation in Spring Cloud Sleuth)

在展示了在 reactive-streams 术语之外操作的风险以及我们可以用来传播 ThreadLocal 上下文的机制后,让我们总结一下 Spring Cloud Sleuth 使用的四种策略

  1. DECORATE_ON_EACH
  2. DECORATE_ON_LAST
  3. DECORATE_QUEUES
  4. MANUAL

前三种策略试图利用响应式操作符的一些特性,结合 Reactor 的插件机制,并使用 ThreadLocal 作为内部传输机制以及与插桩库共享上下文数据的方式。前三种策略还假定使用 Schedulers.onScheduleHook 进行 Scheduler 包装。另一方面,最后一种策略则利用了 Reactor 绑定到 SubscriberContext

DECORATE_ON_EACH

此策略使用我们之前见过的 Hooks.onEachOperator 插件。性能影响巨大,尽管 Sleuth 添加了许多优化以在非必要时不进行恢复。通常,这种方法非常有效。但它也非常激进,因此如果某个操作符需要更改上下文,就可能难以处理。下游操作符将看不到更改,因为来自初始订阅的上下文在每一步都被恢复了。

DECORATE_ON_LAST

使用 Hooks.onLastOperator 是为了提高性能。这种方法可能会失败,因为它提供了灵活性。如果上游操作符修改了上下文,下游操作会看到更改。这带来了风险,如果某个操作符清除了该上下文,那么在该上下文被调度到包装的 Scheduler 之前,上下文就会丢失。另一个风险是我们之前示例中看到的情况,即订阅发生在某个 Thread 上,但请求数据发生在另一个不受 Reactor 控制的 Thread 上。

DECORATE_QUEUES

DECORATE_QUEUES 是前一种策略的演进,它纠正了一些错误场景(请求数据带外发生或多个 Thread 发布数据),但并非所有场景。我们之前描述过如何使用 Hooks.addQueueWrapper 插件。Queue 包装的一个已知问题是,在处理完一个项目后,没有可靠的方法进行清理。上下文在从 Queue 中检索项目时恢复。没有围绕通过下游操作符传输的项目处理的范围。因此,这种方法也容易污染 ThreadLocal 存储。最近在 draining 过程中进行了一些改进以限制影响。

手动 (MANUAL)

在这种策略中,Sleuth 唯一做的事情是在订阅时将 ThreadLocal 的值作为快照捕获到 Reactor 的 Context 中。用户需要在相关位置提取该快照并填充 ThreadLocal,以便将其提供给插桩库。对于支持的跟踪插桩,例如 Zipkin 和 Brave,Sleuth 通过使用作用域(scoping)的概念来恢复 ThreadLocal —— ThreadLocal 会为插桩而恢复,并在快照关闭后立即消失。这是性能最高的方法,尽管它需要用户进行手动(正如其名称所示)处理。

演进 (Evolution)

在局部作用域中使用 Reactor Context 填充 ThreadLocal 被证明既具有高性能,又符合响应式链的工作方式。将上下文与 Subscriber 关联是一种经过验证的方法,不会意外导致上下文数据丢失。在下一篇文章中,我们将展示 Reactor 3.5 和 Micrometer 1.10 如何将手动方法提升到新的水平,并提供一种结构化的方法来跨越响应式和命令式边界传播上下文。

获取 Spring 时事通讯

订阅 Spring 时事通讯,保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速前进。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部