reactor-core 3.6.0 有什么新变化?

工程 | Oleh Dokuka | 2023年10月31日 | ...

Reactor 3.6.0 即将发布,并将在 11 月 14 日正式发布 (GA)。这篇博文介绍了此即将发布的版本中包含的新功能!

支持虚拟线程

如今,每个人都在谈论 Java 21Project Loom。Project Reactor 团队听取了这些意见,并看到了该项目在我们生态系统中的价值。在此即将发布的版本中,我们引入了对 VirtualThread 实现的支持。

为何如此方便?

考虑以下代码示例

package io.projectreactor.samples;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class LoomSample {

   public static void main(String[] args) {
      Flux.using(
                () -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
                Flux::fromStream,
                Stream::close
           )
          .subscribeOn(Schedulers.boundedElastic())
          .map(v -> Thread.currentThread() + " " + v)
          .log()
          .blockLast();
   }
}

上面的代码以响应式方式读取文本文件中的所有行。不幸的是,Files.lines 方法是系统 I/O 调用,已知会阻塞。因此,我们将所有这些操作调度到共享的 Schedulers.boundedElastic() 调度器上。Schedulers.boundedElastic() 是用于卸载系统中可能进行的任何阻塞调用的主要共享调度器,这已不是秘密。它既用于简单的 HTTP 阻塞调用,也用于包装一些不可避免的阻塞系统交互,例如生成随机的 UUID。但是,它默认使用 平台 Thread 实例,这可能会增加系统的竞争。

现在,借助 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler 实现可以替换默认实现,以便在 Schedulers.boundedElastic() 中使用虚拟线程而不是平台线程。您只需在 Java 21+ 上运行您的应用并设置 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true 系统属性即可。

Reactive Bounded Elastic on VirtualThreads

您可能已经注意到,您将拥有一个承载计划工作的 VirtualThread 实例。

更好的自动上下文传播

正如您可能从我们自 Reactor 3.5.0 开始的早期博客中听到的那样,我们引入了一种机制,用于在 handletap 等操作符中,从 Reactor Context 自动恢复 ThreadLocal。后来,在 reactor 3.5.3 中,我们在 Project Reactor 中可用的所有操作符中添加了 ThreadLocal 值的自动恢复功能。

static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1> 
}

public static void main(String[] args) {
   logger.info("Setting Trace ID test-123-567-890");
   TRACE_ID.set("test-123-567-890"); <1>

   Hooks.enableAutomaticContextPropagation(); <2> 

   Mono.fromCallable(() -> {
          logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
          return UUID.randomUUID();
       })
       .subscribeOn(Schedulers.boundedElastic()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
       .block();
}

上面的代码生成一个随机的 UUID,它将阻塞生成过程 <3> 卸载到专门的工作器上。要启用自动 ThreadLocal 传播魔法,您需要在运行时提供 Micrometer Context Propagation 库,注册 <1> 所需的 ThreadLocal 实例,然后调用 Hooks API <2> 来激活此特定的传播模式。如果我们检查上面代码的输出,我们会看到指定的 <1> TRACE_ID ThreadLocal 在所有位置 <3> <4> 始终可用,无论 Thread 如何切换。

[ INFO] (main) Setting Trace ID test-123-567-890 <1> 
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>

  1. Thread main 上设置了 Trace ID
  2. Thread boundedElastic-1 上可用的相同 Trace ID

尽管此机制已足够接近每个人想要的,但它受到 Reactor 自有的生产者和转换器的限制。要了解它可能无法完美工作的地方,让我们修改上面的示例,并添加与外部基于 Reactive Streams 的库(例如 JDK11 HttpClient)的集成

static HttpClient jdkHttpClient = HttpClient.newHttpClient();

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}

public static void main(String[] args) {
   logger.info("Setting Trace ID");
   TRACE_ID.set("test-123-567-890");

   Hooks.enableAutomaticContextPropagation();

   Mono.fromFuture(() -> {
          logger.info("[" + TRACE_ID.get() + "] Preparing request");
          return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
                                             .uri(URI.create("https://httpbin.org/drip"))
                                             .GET()
                                             .build(),
                HttpResponse.BodyHandlers.ofPublisher());
       })
       .flatMapMany(r -> {
          logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
          return FlowAdapters.toPublisher(r.body()); <2>
       })
       .collect(new ByteBufferToStringCollector()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
       .block();
}

在修改后的示例中,我们进行网络调用 <1>,然后读回响应。响应体表示为 Flow.Publisher <2>,我们将其展平并转换为字符串表示形式 <3>。此代码运行后,可能的输出之一可能如下所示

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>

从输出中我们可以观察到,使用 reactor 3.5.3+,消费外部 Publisher 可能会导致上下文丢失 <1>,因为我们不知道是否需要进行额外的提升来恢复丢失的 ThreadLocal 实例。

使用 reactor 3.6.x,此输出始终一致

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>

在此版本中,我们增强了 ThreadLocal 值恢复机制,并添加了额外的逻辑来检测任何外部 Publisher 实现。一旦检测到这些实现,我们就会对它们进行装饰,以确保您在管道中操作时永远不会丢失 ThreadLocal 值。

还有什么?支持 Multi-Release Jar!

在 reactor 3.6.x 中,我们采纳了 multi-release jar (MRJ) 支持,并已经添加了改进,在可能的情况下消除了反射。我们计划在未来的版本中扩展 MRJ 的使用,并利用所有 JDK9+ 的特性!

敬请关注!所有源码都可以在 Github 上找到

获取 Spring 新闻简报

订阅 Spring 新闻简报以保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速发展。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区所有即将到来的活动。

查看全部