reactor-core 3.6.0 中有哪些新特性?

工程 | Oleh Dokuka | 2023 年 10 月 31 日 | ...

Reactor 3.6.0 即将发布,并将于 11 月 14 日全面上市。这篇博文介绍了此即将发布的版本中包含的新功能!

虚拟线程支持

如今,每个人都在谈论 Java 21Project Loom。Project Reactor 团队也听到了这些声音,并看到了该项目在我们的生态系统中的价值。通过此即将发布的版本,我们引入了对 VirtualThread 实现的支持。

为什么它很方便?

让我们考虑以下代码示例

package io.projectreactor.samples;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class LoomSample {

   public static void main(String[] args) {
      Flux.using(
                () -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
                Flux::fromStream,
                Stream::close
           )
          .subscribeOn(Schedulers.boundedElastic())
          .map(v -> Thread.currentThread() + " " + v)
          .log()
          .blockLast();
   }
}

上面的代码以响应式方式读取文本文件中的所有行。不幸的是,Files.lines 方法是一个系统 I/O 调用,已知是阻塞的。因此,我们将所有这些操作安排在共享的 Schedulers.boundedElastic() 调度器上。众所周知,Schedulers.boundedElastic() 是主要的共享调度器,用于 卸载 系统中可能进行的所有阻塞调用。它既用于简单的 HTTP 阻塞调用,也用于封装一些不可避免的阻塞系统交互,例如生成随机的 UUID。然而,它默认使用 平台 Thread 实例,这可能会增加系统的争用。

现在,随着 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler 实现可以取代默认实现,以便在 Schedulers.boundedElastic() 中使用虚拟线程而不是平台线程。您所需要做的就是将您的应用程序运行在 Java 21+ 上,并设置 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true 系统属性。

Reactive Bounded Elastic on VirtualThreads

正如您可能已经注意到的,您将有一个 VirtualThread 实例来承载计划的工作。

更好的自动上下文传播

正如您可能从我们之前的博客中听到的,从 Reactor 3.5.0 开始,我们引入了一种机制,用于在 handletap 等操作符中,从 Reactor Context 自动恢复 ThreadLocal。后来,在 reactor 3.5.3 中,我们在 Project Reactor 中可用的所有操作符集中添加了 ThreadLocal 值的自动恢复。

static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1> 
}

public static void main(String[] args) {
   logger.info("Setting Trace ID test-123-567-890");
   TRACE_ID.set("test-123-567-890"); <1>

   Hooks.enableAutomaticContextPropagation(); <2> 

   Mono.fromCallable(() -> {
          logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
          return UUID.randomUUID();
       })
       .subscribeOn(Schedulers.boundedElastic()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
       .block();
}

上面的代码生成一个随机的 UUID,它将阻塞的生成过程 <3> 卸载到一个专门的工作器上。要启用 ThreadLocal 自动传播的魔力,您需要在运行时提供 Micrometer Context Propagation 库,注册 <1> 所需的 ThreadLocal 实例,然后调用 Hooks API <2> 来激活这种特定的 传播模式。如果我们检查上面的代码输出,我们会看到指定的 <1> TRACE_ID ThreadLocal 在所有地方 <3> <4> 都始终可用,无论 Thread 是否切换。

[ INFO] (main) Setting Trace ID test-123-567-890 <1> 
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>

  1. 跟踪 ID 在 Thread main 上设置
  2. 相同的跟踪 ID 在 Thread boundedElastic-1 上可用

尽管这种机制已经足够接近每个人想要的,但它受到 Reactor 自己的生产者和转换器的限制。为了理解它可能无法完美工作的地方,让我们修改上面的示例并添加与外部基于 Reactive Streams 的库(例如 JDK11 HttpClient)的集成。

static HttpClient jdkHttpClient = HttpClient.newHttpClient();

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}

public static void main(String[] args) {
   logger.info("Setting Trace ID");
   TRACE_ID.set("test-123-567-890");

   Hooks.enableAutomaticContextPropagation();

   Mono.fromFuture(() -> {
          logger.info("[" + TRACE_ID.get() + "] Preparing request");
          return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
                                             .uri(URI.create("https://httpbin.org/drip"))
                                             .GET()
                                             .build(),
                HttpResponse.BodyHandlers.ofPublisher());
       })
       .flatMapMany(r -> {
          logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
          return FlowAdapters.toPublisher(r.body()); <2>
       })
       .collect(new ByteBufferToStringCollector()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
       .block();
}

在修改后的示例中,我们进行网络调用 <1>,然后读取响应。响应体表示为 Flow.Publisher <2>,我们将其展平并转换为字符串表示 <3>。一旦这段代码运行,其中一种可能的输出可能如下所示:

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>

从输出中我们可以观察到,在 reactor 3.5.3+ 中,消费外部 Publisher 可能会导致上下文丢失 <1>,因为我们不知道是否需要额外的工作来恢复丢失的 ThreadLocal 实例。

使用 reactor 3.6.x,此输出始终一致

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>

在此版本中,我们增强了 ThreadLocal 值恢复机制,并添加了额外的逻辑,用于检测任何外部 Publisher 实现。一旦检测到这些实现,我们就会对其进行装饰,以确保您在我们的管道中操作时永远不会丢失 ThreadLocal 值。

还有什么?多版本 Jar 支持!

在 reactor 3.6.x 中,我们采纳了 多版本 jar (MRJ) 支持,并已添加 改进,尽可能消除了反射。我们 计划 在即将发布的版本中扩展 MRJ 使用,并使用所有 JDK9+ 特性!

敬请期待!所有源代码都可以在 Github 上找到。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有