Project Reactor 上下文传播 1 - 基础

工程 | Dariusz Jędrzejczyk | 2023年3月28日 | ...

本文属于系列文章

  1. 基础
  2. Spring Cloud Sleuth 的坎坷之路
  3. 响应式与命令式编程的统一桥接

Spring Boot 3 和 Spring Framework 6 带来了一种统一且一致的方式,以在使用了 Micrometer 的应用程序中启用可观测性 (Observability)。从 Spring Cloud Sleuth 到 Micrometer 的演进,以及 Observation API 和 Micrometer Tracing,促使我们整合了各种上下文传播 (Context Propagation) 的方法。在本系列博文中,我们将解释我们是如何在 Project Reactor 中实现上下文传播支持,以满足命令式 (Imperative) 库的需求。通过从基础开始构建理解,您将能够使用这些构造并了解底层发生的情况。我们假设您对响应式编程概念有基本了解。如果您是新手或想复习知识,请参阅 Project Reactor 文档中的响应式编程简介

在本文中,我们将开发一个简单的电商应用。我们的讨论角度限定于这样一个请求:添加一个产品并通知商店库存中新增了产品。作为负责任的开发者,我们希望记录针对特定请求所采取的所有步骤,以便在调查问题时,我们可以查看日志并了解发生了什么。我们将探讨如何在命令式风格下实现为日志工具提供关于请求的上下文元数据的目标,并将其与 Project Reactor 更具函数式、声明式的风格进行比较。接下来的文章将更详细地探讨我们为何以及如何需要在两种编程风格之间建立桥梁。

ThreadLocal

为了识别属于特定请求的日志,我们需要一种关联它们的方式。我们可以生成一个简单的随机标识符,如下所示:

static long correlationId() {
  return Math.abs(ThreadLocalRandom.current().nextLong());
}

我们需要一种方法,使关联标识符在日志工具中可用。我们可以将关联作为业务逻辑中每个方法调用的一部分,但这会非常侵入且冗长。

通常,第三方库使用 JDK 的 ThreadLocal 来传递并非应用程序业务逻辑主要关注点的隐式信息。

让我们为关联标识符声明一个静态字段

static final ThreadLocal<Long> CORRELATION_ID = new ThreadLocal<>();

这是我们的日志方法。它打印当前 Thread 的名称并格式化输出

static void log(String message) {
  String threadName = Thread.currentThread().getName();
  String threadNameTail = threadName.substring(
    Math.max(0, threadName.length() - 10));
  System.out.printf("[%10s][%20s] %s%n",
    threadNameTail, CORRELATION_ID.get(), message);
}

现在我们拥有了处理请求并使用隐式关联标识符记录每个步骤所需的一切。

在每个请求开始时,应用程序会调用以下方法来初始化关联

static void initRequest() {
  CORRELATION_ID.set(correlationId()));
}

我们简化的请求处理器执行以下操作

void handleRequest() {
  initRequest();

  addProduct("test-product");
  notifyShop("test-product");
}

业务逻辑中的日志记录如下所示

void addProduct(String productName) {
  log("Adding product: " + productName);
  // ...
}

void notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  // ...
}

我们可以预期应用程序会记录这些行

[      main][ 8592000019542134146] Adding product: test-product
[      main][ 8592000019542134146] Notifying shop about: test-product

只要特定请求的执行发生在同一个 Thread 上,并且不与其他关注点交错,ThreadLocal 就允许我们将业务逻辑与用于日志记录的元数据解耦。

异步处理

假设此应用程序开始面临更高的负载,需要处理大量并发请求。想象一下,我们可以使用一个异步且非阻塞的服务器实现,它要求我们提供异步声明,而不是命令式和阻塞步骤。

我们的请求处理器可以返回一个 CompletableFuture,以异步且非阻塞的方式处理请求

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(() -> addProduct("test-product"))
    .thenRunAsync(() -> notifyShop("test-product"))
}

不幸的是,当我们执行异步版本时,日志不再包含关联标识符

[l-worker-1][                null] Adding product: test-product
[l-worker-1][                null] Notifying shop about: test-product

任务包装

解决此问题的一个已知缓解方法是包装异步 API 执行的任务。包装是指一种执行 ThreadLocal 上下文恢复的实现。当任务创建时,捕获当前上下文。当工作 Thread 实际执行任务时,恢复该上下文。让我们看看这在我们使用 Runnable 的示例中如何工作

class WrappedRunnable implements Runnable {

  private final Long correlationId;
  private final Runnable wrapped;

  public WrappedRunnable(Runnable wrapped) {
    this.correlationId = CORRELATION_ID.get();
    this.wrapped = wrapped;
  }

  @Override
  public void run() {
    Long old = CORRELATION_ID.get();
    CORRELATION_ID.set(this.correlationId);
    try {
      wrapped.run();
    } finally {
      CORRELATION_ID.set(old);
    }
  }
}

我们可以重新实现我们的处理器,如下所示

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(new WrappedRunnable(
      () -> addProduct("test-product")))
    .thenRunAsync(new WrappedRunnable(
      () -> notifyShop("test-product")));
}

不幸的是,这带来了很多开销。幸运的是,JDK 有一个用于执行异步任务的 API:Executor 接口。在实际场景中,我们希望使用更全面的 API,即 ExecutorService。然而,为了说明目的,Executor 就足够了。

让我们来看看

static class WrappedExecutor implements Executor {

  private final Executor actual;

  WrappedExecutor(Executor actual) {
    this.actual = actual;
  }

  @Override
  public void execute(Runnable command) {
    actual.execute(new WrappedRunnable(command));
  }
}

让我们重用 CompletableFuture 框架默认使用的通用 ForkJoinPool,但用我们的实现对其进行包装。现在我们的代码看起来像这样

static Executor executor = new WrappedExecutor(ForkJoinPool.commonPool());

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(() -> addProduct("test-product"), executor)
    .thenRunAsync(() -> notifyShop("test-product"), executor);
}

我们的日志再次正常工作

[l-worker-1][ 7321040639376081961] Adding product: test-product
[l-worker-2][ 7321040639376081961] Notifying shop about: test-product

在某些场景下,CompletableFuture 框架可以提供以非阻塞方式处理异步任务的手段。然而,在许多情况下,其有限的 API 表面及其行为特性可能会受到限制。例如,我们可能希望延迟处理并在系统达到容量时稍后恢复。使用 CompletableFuture,所有创建的实例一旦创建就立即开始计算。我们可能还希望对数据流应用更精细的操作,而不是仅对单一计算单元进行操作。出于这些原因以及更多原因,我们可能会考虑使用响应式编程库。我们将考虑使用 Project Reactor,它是 Spring 产品组合中默认的响应式实现。

Project Reactor

为了提供一个健壮的异步处理框架,Java 社区提出了 Reactive Streams 规范。它帮助建立了 JDK 之前所缺乏的通用词汇——明确的信号传播、错误处理、终止和生命周期管理语义。它还允许内置背压 (backpressure)。Spring 通过引入 WebFlux 采用了这种方法,使 Project Reactor 及其响应式类型成为 API 的一等公民。

Reactive Streams 为异步流处理带来了优雅且极简的解决方案。然而,上下文传播 (Context Propagation) 并非规范的一部分。响应式库的非阻塞和异步特性,加上潜在复杂的实现,使得使用 ThreadLocal 变得极其困难。原因是无法保证用户的代码可以在哪个 Thread 上运行。只要实现保证串行交付,就允许进行各种优化,从而使用户的代码与并发无关 (concurrency-agnostic),将处理并发的负担转移到库的内部。

为了履行其保证,Java 中的响应式编程假定使用函数式编程范式来形成声明式且可组合的流,这与不同 Thread 可以执行用户提供的代码这一事实无关。只要用户代码中没有假设在特定 Thread 内执行的副作用,响应式库就可以提供极其高性能的运行时,同时遵守规范。ThreadLocal 显然违反了这一要求。

让我们尝试重写我们的处理器以使用 Project Reactor。单个操作变为

Mono<Void> addProduct(String productName) {
  log("Adding product: " + productName);
  return Mono.empty(); // Assume we’re actually storing the product
}

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return Mono.just(true); // Assume we’re actually notifying the shop
}

让我们尝试使用上述代码

Mono<Void> handleRequest() {
  initRequest();
  log("Assembling the chain");

  return Mono.just("test-product")
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product))
      .then())
}

我们幼稚的实现产生了预期的输出

[      main][ 7224499961623309444] Assembling the chain
[      main][ 7224499961623309444] Adding product: test-product
[      main][ 7224499961623309444] Notifying shop about: test-product

上述实现在 main Thread 中调用,且执行被限制在该 Thread 中。但是我们不应该做这样的假设。

在处理器中,我们在传播处理结果之前引入了轻微延迟。我们这样做是为了演示幕后发生的隐式 Thread 切换。

Mono<Void> handleRequest() {
  initRequest(); <1>
  log("Assembling the chain"); // <2>

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1)) // <3>
    .flatMap(product ->
      Flux.concat(
        addProduct(product), // <4>
        notifyShop(product))
      .then())
}

运行时,会打印以下内容

[      main][ 6265915299594887150] Assembling the chain
[parallel-1][                null] Adding product: test-product
[parallel-1][                null] Notifying shop about: test-product

发生了什么?为什么其中一个日志有关联标识符而其他日志没有?

当服务器调用我们的处理器时,<1> 处的初始化设置了 ThreadLocal 关联标识符,并且 <2> 处的日志能够使用它。有响应式编程经验的人会告诉您问题在于执行发生在不同的阶段。ThreadLocal 是在组装时 (assembly time) 设置的。“您也应该在订阅时 (subscription time) 恢复它”会是一个建议。稍后我们会回到这一点。如果“组装时”、“订阅时”和“执行时 (execution time)”这些术语让您感到困惑,请参阅 Simon 的博文或观看同名演讲中的精彩解释。

虽然方法会立即返回,但这不保证执行已经开始。这是因为返回的 Mono 必须被订阅 (subscribe) 才能触发处理。它可能在不同的 Thread 中发生。<3> 处的 delayElement 操作符隐式地使用了 Reactor 中的一个共享 Scheduler(线程池的抽象)在指定的延迟后将信号发送到另一个 Thread 上。该信号传播到下游操作符,这让我们能够按顺序先添加产品再通知商店。我们组装的管道还有更多令人惊讶的方面,但我们先不要过于困惑。

问题在于,在 <4> 处,如果我们记录日志,我们无法真正确定调用将发生在哪个 Thread 上。像 flatMap 这样的操作符可以引入它们自己的异步性。

在通常情况下,当链被订阅时,值开始被发送。因此,我们可以在每次订阅时恢复 ThreadLocal 的值。但这并非总是最好的主意。Subscription 可以在不同的 Thread 上异步发送。值也可以在不同的 Thread 上发送。在背压 (backpressure) 的情况下,信号可以作为请求更多数据的结果发送到执行请求的 Thread 上,而不是数据 Publisher 使用的 Thread 上。有很多活动部件和奇怪之处需要考虑!要了解更多关于 Reactor 中的线程和异步执行的信息,请查阅我们之前系列博文的另一部分

Reactor Context

Project Reactor 引入了一种与函数式编程良好契合的机制,用于传输上下文元数据 (contextual metadata)。它简称为 Context。尽管幕后会发生 Thread 切换,但它始终附着在响应式链上。

正如我们所见,Project Reactor 允许声明式地指定意图,同时保持与并发无关。它确实在必要时提供了控制并发的手段,通过使用专用的操作符或配置参数(例如 publishOn, subscribeOnflatMap 的高级参数),但该级别的控制已从核心处理逻辑中抽象出来。

我们之前提到了副作用。我们如何摆脱这些副作用,同时仍然能够传输上下文元数据 (contextual metadata)?

为了与函数式编程很好地配合,Context 绑定到 Subscriber,即 Publisher 发出的信号的消费者。订阅后,Subscriber 对组装管道中的所有前置操作符可见。当我们为一个 Subscriber 实例关联一个不可变的类似 Map 的数据结构时,它允许在响应式管道的各个部分附加和检索上下文信息。

通过控制影响以及在响应式链的各个步骤之间提供继承的手段,Reactor Context 是一个无副作用的概念,可用于为处理提供元信息。“这正是我们需要关联请求的方式!”

让我们重写我们的应用程序,使用 Reactor Context 代替 ThreadLocal

首先,我们需要将关联标识符作为日志方法的显式参数

static void log(String message, long correlationId) {
  String threadName = Thread.currentThread().getName();
  String threadNameTail = threadName.substring(
    Math.max(0, threadName.length() - 10));
  System.out.printf("[%10s][%20s] %s%n",
    threadNameTail, correlationId, message);
}

我们的操作如下

Mono<Void> addProduct(String productName) {
  return Mono.deferContextual(ctx -> {
    log("Adding product: " + productName, ctx.get("CORRELATION_ID"));
    return Mono.empty(); // Assume we’re actually storing the product
  });
}

Mono<Boolean> notifyShop(String productName) {
  return Mono.deferContextual(ctx -> {
    log("Notifying shop about: " + productName,
      ctx.get("CORRELATION_ID"));
    return Mono.just(true);
  });
}

有趣的是我们如何提供关联标识符。我们使用一个特殊的操作符,Mono.deferContextual,它可以访问 Context。我们从 ContextView(一个简化的只读 Context 版本)中提取关联标识符,然后返回一个实际的 Mono 供调用者订阅。

我们的处理器看起来像这样

Mono<Void> handleRequest() {
  long correlationId = correlationId();
  log("Assembling the chain", correlationId);

  Mono.just("test-product")
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(addProduct(product), notifyShop(product))
          .then())
    .contextWrite(Context.of("CORRELATION_ID", correlationId));

订阅后,输出正如预期

[      main][ 6328001264807824115] Assembling the chain
[parallel-1][ 6328001264807824115] Adding product: test-product
[parallel-1][ 6328001264807824115] Notifying shop about: test-product

信息流的反转非常明显。就像任何响应式链一样,我们通过组装一系列操作符来定义处理流程。一旦我们(或者实际上是服务器)订阅此链,信息就会从下游操作符流向上游操作符以启动处理。之后,实际的数据信号从上游发送到下游——例如,“test-product”值会传递给 flatMap 操作符,然后传递给 concat 操作符,后者又将值提供给 addProductnotifyShop。由于这种逻辑流,我们在非常末端(使用 contextWrite 方法)写入 Context,就在任何 Subscriber 订阅链之前。我们可以想象,Context 随后会与 Subscriber 一起对上游操作符中的所有阶段都可访问。

无论响应式管道在执行用户业务逻辑的过程中进行多少次线程跳转,上下文都不会丢失。

您可以在我们的文档中阅读更多关于 Reactor Context 的信息。

第三方库

不幸的是,我们不能指望第三方库使用 Reactor Context 来提供可观测性 (observability) 能力。传播隐式元信息的实际标准是 ThreadLocal。像 SLF4J 这样的库使用命令式 (imperative) 风格,在 Java 社区中地位稳固。如果我们可以让它们与响应式范式一起工作,而不是期望它们去适应响应式范式,那将是显而易见的胜利。在下一部分中,我们将讨论在 Spring Cloud Sleuth(一个可以与 Reactor 一起使用的追踪库)的响应式链中传播 ThreadLocal 值的历史和挑战。

获取 Spring 简报

订阅 Spring 简报,保持连接

订阅

领先一步

VMware 提供培训和认证,助您快速进步。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅,为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部