Reactor Core 3.6.0 有哪些新特性?

工程 | Oleh Dokuka | 2023 年 10 月 31 日 | ...

Reactor 3.6.0 即将发布,并将于 11 月 14 日正式发布。这篇博文描述了即将发布的版本中包含的新特性!

虚拟线程支持

如今,每个人都在谈论 Java 21Loom 项目。Reactor 团队也注意到了这一点,并认为该项目对我们的生态系统很有价值。在这个即将发布的版本中,我们引入了对 VirtualThread 实现的支持。

为什么它很方便?

让我们考虑以下代码示例

package io.projectreactor.samples;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class LoomSample {

   public static void main(String[] args) {
      Flux.using(
                () -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
                Flux::fromStream,
                Stream::close
           )
          .subscribeOn(Schedulers.boundedElastic())
          .map(v -> Thread.currentThread() + " " + v)
          .log()
          .blockLast();
   }
}

上面的代码以响应式方式读取文本文件中的所有行。不幸的是,Files.lines 方法是一个系统 I/O 调用,已知会阻塞。因此,我们将所有这些操作调度到共享的 Schedulers.boundedElastic() 调度器上。众所周知,Schedulers.boundedElastic()卸载系统中可能执行的所有阻塞调用 的主要共享调度器。它用于简单的 HTTP 阻塞调用,以及包装一些不可避免的阻塞系统交互,例如生成随机 UUID。但是,它默认使用 平台 Thread 实例,这可能会给您的系统带来更多竞争。

现在,使用 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler 实现可以替换默认实现,以便使用虚拟线程而不是平台线程与 Schedulers.boundedElastic() 协同工作。您只需要在 Java 21+ 上运行您的应用程序并设置 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true 系统属性。

Reactive Bounded Elastic on VirtualThreads

您可能已经注意到,您将拥有一个 VirtualThread 实例来执行计划的任务。

更好的自动上下文传播

您可能已经从我们之前的博客中了解到,从 Reactor 3.5.0 开始,我们引入了一种机制,用于从 Reactor Context 中自动恢复 ThreadLocal,例如在 handletap 等操作符中。后来,在 reactor 3.5.3 中,我们在 Project Reactor 中所有可用操作符的整个集合中添加了 ThreadLocal 值的自动恢复功能。

static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1> 
}

public static void main(String[] args) {
   logger.info("Setting Trace ID test-123-567-890");
   TRACE_ID.set("test-123-567-890"); <1>

   Hooks.enableAutomaticContextPropagation(); <2> 

   Mono.fromCallable(() -> {
          logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
          return UUID.randomUUID();
       })
       .subscribeOn(Schedulers.boundedElastic()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
       .block();
}

上面的代码生成一个随机 UUID,它将 <3> 阻塞生成过程卸载到专用的工作线程上。要启用自动 ThreadLocal 传播功能,您需要在运行时提供 Micrometer 上下文传播 库,注册 <1>所需的 ThreadLocal 实例,然后调用 Hooks API <2> 以激活此特定的 传播模式。如果我们检查上面代码的输出,我们会看到指定的 <1> TRACE_ID ThreadLocal 在所有地方 <3> <4> 始终可用,无论 Thread 如何切换。

[ INFO] (main) Setting Trace ID test-123-567-890 <1> 
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>

  1. Thread main 上设置跟踪 ID
  2. Thread boundedElastic-1 上可用的相同跟踪 ID

尽管此机制非常接近于每个人的期望,但它受 Reactor 拥有的生产者和转换器限制。要了解它在哪里可能无法完美工作,让我们修改上面的示例并添加与外部基于 Reactive Streams 的库(例如 JDK11 HttpClient)的集成。

static HttpClient jdkHttpClient = HttpClient.newHttpClient();

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}

public static void main(String[] args) {
   logger.info("Setting Trace ID");
   TRACE_ID.set("test-123-567-890");

   Hooks.enableAutomaticContextPropagation();

   Mono.fromFuture(() -> {
          logger.info("[" + TRACE_ID.get() + "] Preparing request");
          return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
                                             .uri(URI.create("https://httpbin.org/drip"))
                                             .GET()
                                             .build(),
                HttpResponse.BodyHandlers.ofPublisher());
       })
       .flatMapMany(r -> {
          logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
          return FlowAdapters.toPublisher(r.body()); <2>
       })
       .collect(new ByteBufferToStringCollector()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
       .block();
}

在修改后的示例中,我们进行网络调用 <1>,然后读取响应。响应体表示为 Flow.Publisher <2>,我们将其展平并转换为字符串表示形式 <3>。一旦此代码运行,可能的输出之一可能如下所示

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>

从输出中我们可以观察到,使用 reactor 3.5.3+,外部 Publisher 的使用可能会导致上下文丢失 <1>,因为我们不知道是否需要执行额外的提升来恢复丢失的 ThreadLocal 实例。

使用 reactor 3.6.x,此输出始终一致

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>

在此版本中,我们加强了 ThreadLocal 值恢复机制,并添加了检测任何外部 Publisher 实现的额外逻辑。一旦检测到这些实现,我们就会对其进行装饰,以确保在我们的管道中操作时永远不会丢失 ThreadLocal 值。

还有什么?多版本 Jar 支持!

在 reactor 3.6.x 中,我们采用了 多版本 Jar (MRJ) 支持,并已添加 改进 以在可能的情况下消除反射。我们 计划 在即将发布的版本中扩展 MRJ 的使用并使用所有 JDK9+ 功能!

敬请期待!所有源代码都可以在 Github 上找到。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,帮助您快速提升技能。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部