使用 Project Reactor 2 进行上下文传播 - Spring Cloud Sleuth 的坎坷之路

工程 | Dariusz Jędrzejczyk | 2023年3月29日 | ...

这篇文章是系列文章的一部分

  1. 基础知识
  2. Spring Cloud Sleuth 的坎坷之路
  3. 响应式和命令式之间的统一桥接

Spring Cloud Sleuth 最近成为 Micrometer Tracing,它是 Micrometer 项目的一部分。大多数跟踪检测都集中在 Micrometer 中,位于新的可观察性 API之下。这些项目的目的是实现任何应用程序的可观察性——以指标、跟踪和包含相关标识符的日志的形式。为了实现这一目标,库需要一种传输上下文信息的方法。当应用程序以任何形式处理异步时,此任务就会变得非常具有挑战性。在上一篇文章中,我们介绍了使用ThreadLocal和 Reactor Context进行上下文传播的基础知识。

Spring Cloud Sleuth 在异步上下文传播的方法方面经历了许多转变。由于 Sleuth 处理不需要具有响应式 API 的第三方检测库,因此建立一种方法使上下文可供它们使用至关重要。这些库通常不假设异步性,而是依赖于静态ThreadLocal状态。多年来,ThreadLocal为 JVM 应用程序提供了隐式、上下文存储,用于驱动可观察性功能。随着时间的推移,Project Reactor 在底层原语之上引入了各种钩子和包装机制,以使响应式和命令式之间的桥接成为可能。在本文中,我们旨在探讨将上下文传播到ThreadLocal值的方法,并讨论它们可能存在的错误。我们将探讨 Sleuth 采用的方法,并以我们发现的良好折衷方案的总结作为结尾,该方案具有良好的性能和语义完整性。

在描述 Sleuth 引入的方法之前,我们应该考虑在命令式和响应式世界之间桥接所存在的危险。

隐藏并发情况下副作用的陷阱

我们在上一篇文章中讨论了Thread切换和相关副作用的一些潜在问题。现在,我们将使用 Reactor 的插件机制进一步探讨响应式编程的特性,以解决我们可能遇到的问题。

总而言之,Spring Cloud Sleuth 遇到的所有问题都是一个不断变化的目标。此外,组织中还有许多实现,它们实现了自己的上下文传播机制,例如,用于填充 SLF4J 的MDC。本文并非旨在全面总结所有潜在的陷阱。它更旨在培养一些直觉,帮助您理解最终的真相:要么遵循响应式编程规则,要么准备好在最意想不到的时刻失败。

调度程序钩子

众所周知,响应式链可以使用不同的Thread传播信号。根据我们在上一篇文章中了解到的内容,当在另一个Thread上继续执行时,在运行任务时恢复上下文是有意义的。Project Reactor 将管理Thread的任务委托给Scheduler。它还提供了一个专用的钩子,允许拦截特定工作单元的调度和运行:Schedulers.onScheduleHook。它的工作方式类似于上一篇文章中的WrappedExecutor。让我们看看可能考虑使用它的场景。

清理

在第 1 部分中,我们了解到我们不能依赖ThreadLocal值在响应式链中始终可用。如果我们尝试在订阅时初始化它,并在doFinally操作符中清除它会怎么样?我们的应用程序可以使用有限数量的Thread处理许多请求,其中一些是并发的。由于这些平台Thread可以重复使用,因此我们需要在处理另一个请求之前清除与一个请求关联的任何ThreadLocal状态,以防止不同的请求使用残留的相关标识符。

以下代码示例是对我们在上一部分编写的代码的修改,在其中我们没有使用 Reactor Context

handleRequest方法的潜在实现可能如下所示

Mono<Void> handleRequest() {
  return Mono.fromSupplier(() -> {
    initRequest(); // <1>
    return "test-product";
  }).flatMap(product ->
    Flux.concat(
      addProduct(product),
      notifyShop(product)).then())
    .doOnSuccess(v -> log("Done."))
    .doFinally(signalType ->
      CORRELATION_ID.remove()); // <2>
}

<1>中,我们设置ThreadLocal值,在<2>中,我们尝试清除它。

我们还修改了我们执行的操作,以便能够在addProduct方法中添加人工延迟

Mono<Void> addProduct(String productName) {
  return Mono.defer(() -> {
    log("Adding product: " + productName);
    return Mono.<Void>empty()
      .delaySubscription(Duration.ofMillis(10),
        Schedulers.single()); // <1>
  });
}

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return Mono.just(true);
  });
}

请注意,在<1>中,我们通过延迟订阅并使用Schedulers.single()在 10 毫秒后启动订阅来引入异步性。delaySubscription将使用该Scheduler的底层ScheduledExecutorService并在延迟后在另一个Thread上启动订阅。

从上一篇文章中,我们知道在这种情况下我们需要恢复ThreadLocals,因此我们使用提到的Scheduler插件来实现这一点

Schedulers.onScheduleHook("context.propagation", WrappedRunnable::new);

在 Reactor 的Scheduler上执行的每个任务都将恢复ThreadLocal值,因此我们应该是安全的。

现在,让我们模拟两个顺序请求,它们之间用一个日志分隔,该日志验证CORRELATION_ID是否已正确清除

log("Got first request, calling handler");
handleRequest().block();

log("Got second request, calling handler");
log("There should be no correlationId on this line!");

handleRequest().block();

日志如下所示

[      main][                null] Got first request, calling handler // <1>
[      main][ 8658769170992364531] Adding product: test-product
[  single-1][ 8658769170992364531] Notifying shop about: test-product
[  single-1][ 8658769170992364531] Done.
[      main][ 8658769170992364531] Got second request, calling handler
[      main][ 8658769170992364531] There should be no correlationId on this line!
[      main][  711436174608061530] Adding product: test-product
[  single-1][  711436174608061530] Notifying shop about: test-product
[  single-1][  711436174608061530] Done.

“test-product”处理相关的日志具有正确相关标识符。但是,请求之间发生了什么?我们期望在doFinally中清除ThreadLocal。不幸的是,请求之间的日志仍然包含标识符。那么发生了什么呢?

请注意“Notifying shop about”日志发生在Thread single-1上。信号是在该Thread上传递的,因此我们清除了那里的ThreadLocal,但留下了主Thread被污染(在<1>中)。现在,我们处理程序之外的执行可以将错误的相关标识符用于不同的目的。我们可以尝试通过在服务器层(调度请求)添加清理逻辑来缓解此问题,并确保用于请求的每个Thread都没有被污染。如果我们的管道更复杂,这将无法保存所有其他潜在的Scheduler Thread

这种方法在允许应用程序在响应式链中透明地使用ThreadLocal值方面取得了很大进展。从性能的角度来看,它也是合理的,因为它不会在每个操作符周围设置和重置ThreadLocal,而只会在处理项目时发生Thread切换时设置和重置。但是,它也表明有一些副作用仍未解决。在接下来的示例中,我们将体验并尝试解决不同的场景。

外部源和接收器的困难

使用ThreadLocal作为上下文元数据传输机制的策略的另一个常见问题是,当使用与 Reactor 不同的异步库并且它自行切换Thread时。当执行更改为不受包装的ExecutorService控制的不同Thread时,上下文将丢失。

让我们看看它的实际操作。我们将重用迄今为止看到的大部分代码,并在notifyShop方法中进行一项更改。它现在通过使用以下方法模拟远程调用

Mono<Boolean> makeRequest(String productName) {
  return Mono.fromFuture(CompletableFuture.supplyAsync(() -> true,
    CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)));
}

因此notifyShop如下所示

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return makeRequest(productName);
  });
}

如果我们触发一次处理程序

handleRequest().block();

我们将获得以下输出

[      main][  683056557275963407] Adding product: test-product
[  single-1][  683056557275963407] Notifying shop about: test-product
[l-worker-1][                null] Done!

日志缩短了Thread名称以提高可见性,但l-worker-1实际上是ForkJoinPool.commonPool-worker-1的缩写版本。

如我们所见,我们的执行在不受我们控制的公共ForkJoinPool上继续进行。一个问题是我们从该Thread切换开始不再看到我们的相关标识符,但另一个问题是我们在一个实际上缺少相关信息的Thread上执行清理。

我们可以通过Executor或任务包装(如上一篇文章中所示)来潜在地改进(部分)情况,但我们并不总是拥有这样的控制权——例如,如果我们调用使用CompletableFuture的外部库。

操作符钩子

我们即将讨论 Sleuth 的策略。Schedulers.onScheduleHook 在处理反应式过程中可能发生的非明显的 Thread 切换方面,能力有限。我们需要更多地控制操作的执行。我们将通过引入两种外部服务通信的方式来演示这些限制。

addProduct 方法现在发出远程请求,并在我们控制的 Scheduler 上发布结果。将繁重的计算卸载到不同的 Thread 上是很常见的做法。为此,我们使用 publishOn 运算符

Mono<Void> addProduct(String productName) {
  return Mono.defer(() -> {
    log("Adding product: " + productName);
    return makeRequest(productName)
      .publishOn(Schedulers.single())
      .then();
  });
}

notifyShop 方法模拟将结果映射到可能多个 Publisher 中。如果响应是复合结果,这可能是一个典型的场景 - 例如,如果响应是 JSON 数组,而我们打算将每个项目作为对另一个服务的单独调用进行处理或丰富单个结果。让我们使用一个简化的版本,只获取单个结果

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return makeRequest(productName)
      .flatMapMany(result ->
        Flux.just("result")
          .map(x -> result))
          .take(1)
          .single();
    });
}

让我们暂时跳过处理程序,手动初始化关联标识符,然后订阅这些链

initRequest();
addProduct("test-product")
  .doOnSuccess(v -> log("Added."))
  .block();

initRequest();
notifyShop("test-product")
  .doOnSuccess(v -> log("Notified."))
  .block();

让我们看看输出

[      main][ 6606077262934500649] Adding product: test-product
[  single-1][                null] Added.
[      main][  182687922231622589] Notifying shop about: test-product
[l-worker-1][                null] Notified.

这是预期的,因为 doOnSuccess 中发生的两个日志都是由于 CompletableFutureForkJoinPool Thread 上传递值而触发的。即使我们有 Scheduler 包装,结果首先是在我们无法控制的 Thread 上传递的,因此即使在 addProduct 中使用的 publishOn 也无济于事。

我们能做些什么来改善这种情况呢?Reactor 拥有一个细粒度的插件系统,它允许我们在任何管道中装饰任何运算符。我们可以尝试将其用于恢复关联标识符的目的。

插件将使用自定义的 Subscriber 实现,该实现会在订阅时捕获关联标识符

static class CorrelatingSubscriber<T> implements CoreSubscriber<T> {
  final CoreSubscriber<T> delegate;
  Long correlationId;

  public CorrelatingSubscriber(CoreSubscriber<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void onSubscribe(Subscription s) {
    delegate.onSubscribe(s);
    this.correlationId = CORRELATION_ID.get();
  }

  @Override
  public void onNext(T t) {
    CORRELATION_ID.set(this.correlationId);
    delegate.onNext(t);
  }

  @Override
  public void onError(Throwable t) {
    CORRELATION_ID.set(this.correlationId);
    delegate.onError(t);
  }

  @Override
  public void onComplete() {
    CORRELATION_ID.set(this.correlationId);
    delegate.onComplete();
  }
}

为了更改运算符以使我们的实现将调用委托给实际的 Subscriber 实例,我们可以使用 Operators.lift 方法

Operators.lift((scannable, subscriber) ->
  new CorrelatingSubscriber<>(subscriber));

onEachOperator Hook

首先,我们将尝试一个插件,它允许我们更改链中的每个运算符

Hooks.onEachOperator(
  Operators.lift((scannable, subscriber) ->
    new CorrelatingSubscriber<>(subscriber)));

让我们再次运行我们的示例并检查输出

[      main][ 7295088917002526647] Adding product: test-product
[  single-1][ 7295088917002526647] Added.
[      main][  383851863754448684] Notifying shop about: test-product
[l-worker-1][  383851863754448684] Notified.

哇!即使在如此复杂的场景中,我们也设法获得了关联标识符。订阅的初始操作捕获了 ThreadLocal 值并在每个步骤中将其恢复。即使在 notifyShop 方法中使用的 flatMap(它自己订阅)也能正常工作,因为在另一个 Thread 上订阅之前,ThreadLocal 会从之前的捕获中填充!这确实听起来很棒,但是这种方法也有一些缺点。第一个也是最明显的缺点是性能。传播发生在每个运算符上。使用这种技术,我们首先装饰每个对象,并在每个步骤中进行 ThreadLocal 访问。所有这些都是昂贵的。要了解更多信息,请观看 Oleh 关于反应式性能的演讲

onLastOperator Hook

所以让我们尝试一种不同的方法。这次,我们将使用一个插件,该插件附加到链中被认为是最后一个的每个运算符 - subscribe() 调用之前的直接运算符。

关于反应式链可以做出一个观察:在同步运算符的情况下,我们不需要在每次单独的操作(例如,filtermap)中恢复最初捕获的上下文,而只需要在链中的最后一个运算符被订阅时恢复。只要不涉及 Thread 边界交叉,此机制就能正常工作。为了支持可能跨越这些边界的运算符(例如 flatMap,它涉及订阅新的 Publisher),需要使用一个特殊的技巧。它将映射的结果视为其操作的内部 Publishers 的最后一个运算符。

让我们尝试这种方法

Hooks.onLastOperator(
  Operators.lift((scannable, subscriber) ->
    new CorrelatingSubscriber<>(subscriber)));

并运行

[      main][ 2122332013640150746] Adding product: test-product
[  single-1][ 2122332013640150746] Added.
[      main][  459477771449275997] Notifying shop about: test-product
[l-worker-1][                null] Notified.

它在 addProduct 中的 publishOn 上有效,但在 notifyShop 中的 flatMap 上失败了。

让我们分析一下 notifyShop 为什么失败。我们对 block() 的调用捕获了 ThreadLocal 并为向下游传播的每个信号恢复它。通过在 flatMapMany 中完成的映射,我们正在处理之前提到的异步边界。实际上,我们的插件应用于内部源 (Flux.just().map().single())。

但是,尽管 flatMapMany 内部调用了自定义的 Subscriber 并尝试恢复 ThreadLocal 值,但这些努力仍然没有奏效。触发内部订阅的信号是在我们无法控制的 Thread 上发起的,因此我们首先没有 ThreadLocal 可以捕获。

publishOn 运算符的情况下有所不同。对它的订阅始于我们控制的 Thread。因此,当信号作为 makeRequest() 方法的结果进行处理时,它只会传递到我们控制的 Thread 上。.doOnSuccess(v -> log("Added.")) 的执行发生在与 flatMapMany 情况不同的 Thread 边界之后。

这就是 onEachOperator 涵盖更多情况的原因 - 它在每个步骤中恢复初始值,而不管异步边界如何。不过,onLastOperator 的性能略好于 onEachOperator

addQueueWrapper Hook

如果我们将其与之前的钩子结合使用,还有一个插件可以用来完全控制反应式传递。它也由 Spring Cloud Sleuth 使用。我们正在考虑一个最近引入的插件,Hooks.addQueueWrapper。不过,我们不会详细探讨它。它可以解决 Reactor 中工作窃取机制引入的问题。异步运算符(如 flatMap)可以在各种传递信号到运算符的 Thread 上取得进展。想象一个背压场景,其中处理暂停了一段时间。在某个时刻,一个新的 Thread 可以接管并发出 Subscription.request(n) 调用,这会导致累积的值立即传递。现在您可以问自己:“哪些累积的值?”这是一个好问题。Reactor 中的许多运算符使用内部 Queue 来实现背压或保留串行传递语义。因为这些 Queue 的清空可能发生在任何 Thread 上,所以上下文信息应该附加到存储在 Queue 中的每个信号上 - 即,出于我们的关联目的的 ThreadLocal 值。这就是我们需要 Queue 包装器的原因 - 在将值提交到 Queue 时,我们捕获 ThreadLocal 状态。当从 Queue 中检索值时,状态将被恢复。

Context Propagation in Spring Cloud Sleuth

在展示了在反应式流术语之外操作的风险以及我们可以使用哪些机制来传播 ThreadLocal 上下文之后,让我们总结一下 Spring Cloud Sleuth 使用的四种 策略

  1. DECORATE_ON_EACH
  2. DECORATE_ON_LAST
  3. DECORATE_QUEUES
  4. MANUAL

前三种策略试图利用反应式运算符的一些特性,以及 Reactor 的 插件机制,并使用 ThreadLocal 作为内部传输机制以及与检测库共享上下文数据的方法。前三种策略还假设使用 Schedulers.onScheduleHook 进行 Scheduler 包装。另一方面,最后一种策略利用了 Reactor 的 Subscriber 绑定的 Context

DECORATE_ON_EACH

此策略使用我们之前看到的 Hooks.onEachOperator 插件。即使 Sleuth 添加了许多优化以避免在不需要时进行恢复,性能影响也是巨大的。通常,此方法非常有效。不过,它非常激进,因此如果运算符需要更改上下文,则可能难以应对。下游运算符将看不到更改,因为初始订阅的上下文在每个步骤中都会被恢复。

DECORATE_ON_LAST

Hooks.onLastOperator 用于提高性能。这种方法可能会失败,因为它提供了灵活性。如果上游运算符修改了上下文,则下游操作会看到更改。这存在风险,如果某个运算符清除了该上下文,则该上下文将丢失,直到另一个信号被调度到包装的 Scheduler。另一个风险是我们之前示例中看到的,订阅发生在某个 Thread 上,但请求数据发生在另一个 Thread 上,而该 Thread 不在 Reactor 的控制之下。

DECORATE_QUEUES

相较于之前的策略,DECORATE_QUEUES 解决了部分错误场景(请求数据发生在带外或多个线程发布数据),但并非所有场景。我们使用前面描述的方式使用 Hooks.addQueueWrapper 插件。队列包装的一个已知问题是,没有可靠的方法来清理项目处理后的状态。从队列中检索项目时会恢复上下文。在通过下游操作符传递的项目的处理过程中,没有范围。因此,这种方法也容易污染ThreadLocal存储。最近在排水过程中有一些改进,以限制其影响。

MANUAL

在这种策略中,Sleuth 唯一做的事情是在订阅时将ThreadLocal中的值作为快照捕获到 Reactor 的Context中。用户需要在相关位置提取该快照并填充ThreadLocal,使其可供检测库使用。对于支持的跟踪检测,例如 Zipkin 和 Brave,Sleuth 通过使用范围的概念恢复ThreadLocal - ThreadLocal 会在检测过程中恢复,并在快照关闭后立即消失。虽然需要用户手动(顾名思义)处理,但这是性能最高的方案。

Evolution

使用 Reactor Context 在本地范围内填充ThreadLocal被证明既高效又符合反应式链的工作方式。将上下文与订阅者关联是一种行之有效的方法,不会意外导致上下文数据丢失。在下一篇文章中,我们将展示 Reactor 3.5 和 Micrometer 1.10 如何将手动方法提升到一个新的水平,并提供一种跨反应式和命令式边界传播上下文的有条理的方法。

获取 Spring 电子邮件简报

与 Spring 电子邮件简报保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部