使用 Project Reactor 1 进行上下文传播 - 基础知识

工程 | Dariusz Jędrzejczyk | 2023年3月28日 | ...

此帖子是系列的一部分

  1. 基础知识
  2. Spring Cloud Sleuth 的坎坷之路
  3. 响应式和命令式之间的统一桥接

Spring Boot 3 和 Spring Framework 6 为我们带来了一种统一且一致的方式来启用使用 Micrometer 的应用程序中的可观察性。从 Spring Cloud Sleuth 到 Micrometer 的演变,以及 Observation API 和 Micrometer Tracing,使我们能够整合各种上下文传播方法。在本博客文章系列中,我们的目标是解释我们如何在 Project Reactor 中支持上下文传播以满足命令式库的需求。通过从头开始构建您的理解,您将能够使用这些构造并了解底层发生的事情。我们假设您了解基本的响应式编程概念。如果您是新手或想复习您的知识,请查看 Project Reactor 文档中的响应式编程简介

在本文中,我们开发了一个简单的电子商务应用程序。我们有限的角度考虑了一个添加产品的请求,并通知商店已将新产品添加到库存中。作为负责任的开发人员,我们希望记录特定请求执行的所有步骤,以便如果我们要调查问题,我们可以查看日志并了解发生了什么。我们将探讨如何在命令式风格中使用上下文元数据为日志实用程序提供日志记录功能,并将此与 Project Reactor 更具功能性和声明式的风格进行比较。接下来的文章将更详细地探讨为什么以及如何需要在两种编程风格之间建立桥梁。

ThreadLocal

为了识别属于特定请求的日志,我们需要一种关联它们的方法。我们可以生成一个简单的随机标识符,如下所示

static long correlationId() {
  return Math.abs(ThreadLocalRandom.current().nextLong());
}

我们需要一种方法使关联标识符在日志记录实用程序中可用。我们可以将关联作为业务逻辑中每个方法调用的部分,但这会非常侵入性和冗长。

通常,第三方库使用 JDK 的 ThreadLocal 来传达与我们应用程序业务逻辑的主要关注点无关的隐式信息。

让我们为我们的关联标识符声明一个静态字段

static final ThreadLocal<Long> CORRELATION_ID = new ThreadLocal<>();

这是我们的日志方法。它打印当前的 Thread 名称并格式化输出

static void log(String message) {
  String threadName = Thread.currentThread().getName();
  String threadNameTail = threadName.substring(
    Math.max(0, threadName.length() - 10));
  System.out.printf("[%10s][%20s] %s%n",
    threadNameTail, CORRELATION_ID.get(), message);
}

现在我们拥有了处理请求并使用隐式关联标识符记录每个步骤所需的一切。

在每个请求的开头,应用程序都会调用以下方法来启动关联

static void initRequest() {
  CORRELATION_ID.set(correlationId()));
}

我们简化的请求处理程序执行以下操作

void handleRequest() {
  initRequest();

  addProduct("test-product");
  notifyShop("test-product");
}

业务逻辑中的日志记录如下所示

void addProduct(String productName) {
  log("Adding product: " + productName);
  // ...
}

void notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  // ...
}

我们可以预期我们的应用程序记录以下行

[      main][ 8592000019542134146] Adding product: test-product
[      main][ 8592000019542134146] Notifying shop about: test-product

只要特定请求的执行发生在同一个 Thread 上并且没有与其他关注点交织在一起,ThreadLocal 就可以让我们将业务逻辑与用于日志记录的元数据解耦。

异步处理

让我们想象一下,此应用程序开始承受更高的负载,需要处理许多并发请求。想象一下,我们可以使用异步和非阻塞服务器实现,该实现要求我们提供异步声明而不是命令式和阻塞步骤。

我们的请求处理程序可以返回一个 CompletableFuture 以异步和非阻塞的方式处理请求

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(() -> addProduct("test-product"))
    .thenRunAsync(() -> notifyShop("test-product"))
}

不幸的是,当我们执行异步版本时,日志不再包含关联标识符

[l-worker-1][                null] Adding product: test-product
[l-worker-1][                null] Notifying shop about: test-product

任务包装

此问题的一个已知缓解措施是包装异步 API 执行的任务。包装是指执行 ThreadLocal 上下文恢复的实现。创建任务时,会捕获当前上下文。当工作 Thread 实际执行任务时,该上下文将被恢复。让我们看看这在我们的示例案例中如何使用 Runnable 工作

class WrappedRunnable implements Runnable {

  private final Long correlationId;
  private final Runnable wrapped;

  public WrappedRunnable(Runnable wrapped) {
    this.correlationId = CORRELATION_ID.get();
    this.wrapped = wrapped;
  }

  @Override
  public void run() {
    Long old = CORRELATION_ID.get();
    CORRELATION_ID.set(this.correlationId);
    try {
      wrapped.run();
    } finally {
      CORRELATION_ID.set(old);
    }
  }
}

我们可以像这样重新实现我们的处理程序

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(new WrappedRunnable(
      () -> addProduct("test-product")))
    .thenRunAsync(new WrappedRunnable(
      () -> notifyShop("test-product")));
}

不幸的是,这有很多开销。幸运的是,JDK 具有用于执行异步任务的 API:Executor 接口。在现实世界中,我们希望使用更全面的 API,即 ExecutorService。但是,出于我们的解释目的,Executor 就足够了。

让我们看一看

static class WrappedExecutor implements Executor {

  private final Executor actual;

  WrappedExecutor(Executor actual) {
    this.actual = actual;
  }

  @Override
  public void execute(Runnable command) {
    actual.execute(new WrappedRunnable(command));
  }
}

让我们重用 CompletableFuture 框架默认使用的通用 ForkJoinPool,但用我们的实现将其包装起来。现在我们的代码如下所示

static Executor executor = new WrappedExecutor(ForkJoinPool.commonPool());

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(() -> addProduct("test-product"), executor)
    .thenRunAsync(() -> notifyShop("test-product"), executor);
}

我们的日志再次正常工作

[l-worker-1][ 7321040639376081961] Adding product: test-product
[l-worker-2][ 7321040639376081961] Notifying shop about: test-product

在某些情况下,CompletableFuture 框架可以提供以非阻塞方式处理异步任务的方法。但是,在许多情况下,有限的 API 表面及其行为特征可能会受到限制。例如,我们可能希望延迟处理并在我们的系统达到其容量时恢复。使用 CompletableFuture,所有创建的实例都在创建后立即开始计算。我们可能还想在数据流之上应用更细粒度的操作,而不是对单个计算单元进行操作。由于其中一些原因以及更多其他原因,我们可能会考虑使用响应式编程库。我们将考虑 Project Reactor,它是 Spring 产品组合中的默认响应式实现。

Project Reactor

为了提供一个弹性的异步处理框架,Java 社区提出了 Reactive Streams 规范。它帮助建立了 JDK 之前缺少的通用词汇表——信号传播、错误处理、终止和生命周期管理的清晰语义。它还允许内置背压。Spring 通过引入 WebFlux 采用了这种方法,使 Project Reactor 及其响应式类型成为 API 的一等公民。

Reactive Streams 为异步流处理提供了优雅且极简的解决方案。但是,上下文传播不是规范的一部分。响应式库的非阻塞和异步特性以及潜在的复杂实现使得使用 ThreadLocal 变得极其困难。原因是,关于哪个 Thread 可以运行用户代码没有保证。实现被允许执行各种优化,只要它们保证串行交付,从而使用户代码与并发无关,并将处理并发的负担转移到库内部。

为了交付其保证,Java 中的响应式编程假设使用函数式编程范式来形成一个声明式且可组合的流,该流与不同 Thread 可以执行用户提供的代码这一事实无关。响应式库可以提供极其高效的运行时,同时遵守规范,只要用户代码中没有假设在特定 Thread 中执行的副作用。ThreadLocal 明显违反了此要求。

让我们尝试重写我们的处理程序以使用 Project Reactor。各个操作变为

Mono<Void> addProduct(String productName) {
  log("Adding product: " + productName);
  return Mono.empty(); // Assume we’re actually storing the product
}

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return Mono.just(true); // Assume we’re actually notifying the shop
}

让我们尝试使用以上方法

Mono<Void> handleRequest() {
  initRequest();
  log("Assembling the chain");

  return Mono.just("test-product")
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product))
      .then())
}

我们简单的实现产生了所需的输出

[      main][ 7224499961623309444] Assembling the chain
[      main][ 7224499961623309444] Adding product: test-product
[      main][ 7224499961623309444] Notifying shop about: test-product

上述实现是在 main Thread 中调用的,并且执行限于该 Thread。但是,我们不应该做出这样的假设。

在处理程序中,我们在传播处理结果之前引入了一个轻微的延迟。我们这样做是为了演示幕后发生的隐式 Thread 切换。

Mono<Void> handleRequest() {
  initRequest(); <1>
  log("Assembling the chain"); // <2>

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1)) // <3>
    .flatMap(product ->
      Flux.concat(
        addProduct(product), // <4>
        notifyShop(product))
      .then())
}

运行时,将打印以下内容

[      main][ 6265915299594887150] Assembling the chain
[parallel-1][                null] Adding product: test-product
[parallel-1][                null] Notifying shop about: test-product

发生了什么?为什么一个日志有关联标识符而其他日志没有?

当服务器调用我们的处理程序时,<1> 处的初始化设置了 ThreadLocal 关联标识符,并且 <2> 处的日志能够使用它。那些有响应式编程经验的人会告诉你问题在于执行发生在不同的阶段。ThreadLocal 在组装时设置。“您也应该在订阅时恢复它”将是一条建议。我们稍后会回到这一点。如果“组装”、“订阅”和“执行时间”等术语让您感到困惑,请查看Simon 的博客文章或观看同名演讲中的出色解释。

虽然方法会立即返回,但它并不能保证执行已开始。这是因为返回的Mono必须被订阅才能触发处理。它可能在不同的Thread中发生。<3>处的delayElement操作符隐式地使用Reactor中的共享Scheduler(一个线程池的抽象)来在指定延迟后,在另一个Thread上传递信号。该信号会传播到下游操作符,这让我们能够按顺序先添加产品,然后再通知商店。我们组装的管道还有更多令人惊讶的方面,但我们不要太困惑。

问题在于,在<4>中,如果我们进行日志记录,我们实际上无法确定调用将在哪个Thread上发生。像flatMap这样的操作符可能会引入自己的异步性。

在常规情况下,当链被订阅时,值就开始传递。因此,我们可以在每次订阅时恢复ThreadLocal值。但这并不总是最好的主意。Subscription可以在不同的Thread上异步传递。此外,值也可以在不同的Thread上传递。在背压的情况下,信号可能作为对更多数据的请求的结果,在执行请求的Thread上传递,而不是Publisher传递数据所使用的Thread上。需要考虑很多移动部件和特性!要了解更多关于Reactor中线程和异步执行的信息,请查看我们之前博客文章系列的另一部分

Reactor上下文

Project Reactor引入了一种与函数式编程非常契合的机制,用于提供传输上下文元数据的方法。它简单地称为Context。并且它始终附加到反应式链上,即使在幕后发生的线程切换也是如此。

正如我们所看到的,Project Reactor允许声明性地指定意图,同时保持与并发无关。它确实提供了在必要时控制并发的手段,方法是使用专用的操作符或配置参数(例如publishOnsubscribeOnflatMap的高级参数),但这种控制级别从核心处理逻辑中抽象了出来。

我们之前提到了副作用。我们如何摆脱这些副作用,同时仍然能够传输上下文元数据?

为了与函数式编程良好配合,Context绑定到Subscriber,即由Publisher发出的信号的使用者。订阅时,Subscriber对组装管道中的所有先前操作符可见。当我们将一个不可变的类似Map的数据结构关联到Subscriber实例时,它允许在反应式管道的各个部分附加和检索上下文信息。

通过控制影响并提供反应式链中步骤之间的继承手段,Reactor Context是一个无副作用的概念,可用于为处理提供元信息。“正是我们需要关联我们的请求!”。

让我们重写我们的应用程序,使用Reactor Context代替ThreadLocal

首先,我们需要使关联标识符成为日志方法的显式参数

static void log(String message, long correlationId) {
  String threadName = Thread.currentThread().getName();
  String threadNameTail = threadName.substring(
    Math.max(0, threadName.length() - 10));
  System.out.printf("[%10s][%20s] %s%n",
    threadNameTail, correlationId, message);
}

我们的操作如下

Mono<Void> addProduct(String productName) {
  return Mono.deferContextual(ctx -> {
    log("Adding product: " + productName, ctx.get("CORRELATION_ID"));
    return Mono.empty(); // Assume we’re actually storing the product
  });
}

Mono<Boolean> notifyShop(String productName) {
  return Mono.deferContextual(ctx -> {
    log("Notifying shop about: " + productName,
      ctx.get("CORRELATION_ID"));
    return Mono.just(true);
  });
}

有趣的是我们如何提供关联标识符。我们使用一个特殊的操作符Mono.deferContextual,它可以访问Context。从ContextView(一个简化的、只读的Context版本)中,我们在返回实际的Mono(供调用者订阅)之前提取关联标识符。

我们的处理程序如下所示

Mono<Void> handleRequest() {
  long correlationId = correlationId();
  log("Assembling the chain", correlationId);

  Mono.just("test-product")
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(addProduct(product), notifyShop(product))
          .then())
    .contextWrite(Context.of("CORRELATION_ID", correlationId));

订阅后,输出符合预期

[      main][ 6328001264807824115] Assembling the chain
[parallel-1][ 6328001264807824115] Adding product: test-product
[parallel-1][ 6328001264807824115] Notifying shop about: test-product

信息流的反转是显而易见的。与任何反应式链一样,我们通过组装一系列操作符来定义处理流程。一旦我们(或者实际上是服务器)订阅了这个链,信息就会从下游操作符流向上游操作符以启动处理。之后,实际的数据信号从上游传递到下游——例如,“test-product”值传递到flatMap操作符,然后传递到concat操作符,后者又将该值提供给addProductnotifyShop。由于这种逻辑流,我们在最后(使用contextWrite方法)写入Context,就在任何Subscriber订阅链之前。我们可以想象,Context然后与Subscriber一起对上游操作符中的所有阶段都变得可用。

无论反应式管道在执行用户业务逻辑的过程中进行了多少次线程跳转,上下文都不会丢失。

您可以在我们的文档中阅读更多关于Reactor Context的信息

第三方库

不幸的是,我们不能期望第三方库使用Reactor Context来提供可观察性功能。传播隐式元数据的实际货币是ThreadLocal。像SLF4J这样的库使用命令式风格,并且在Java社区中占据着稳定的地位。如果我们能让它们与反应式范式一起工作,而不是期望它们适应它,那将是一个明显的胜利。在下一部分中,我们将讨论在Spring Cloud Sleuth(一个可与Reactor一起使用的跟踪库)中传播反应式链中ThreadLocal值的背景和挑战。

获取Spring通讯

与Spring通讯保持联系

订阅

领先一步

VMware提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部