领先一步
VMware 提供培训和认证,助您加速进步。
了解更多Reactor 3.6.0 即将发布,并将于 11 月 14 日全面上市。这篇博文介绍了此即将发布的版本中包含的新功能!
如今,每个人都在谈论 Java 21 和 Project Loom。Project Reactor 团队也听到了这些声音,并看到了该项目在我们的生态系统中的价值。通过此即将发布的版本,我们引入了对 VirtualThread 实现的支持。
让我们考虑以下代码示例
package io.projectreactor.samples;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class LoomSample {
public static void main(String[] args) {
Flux.using(
() -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
Flux::fromStream,
Stream::close
)
.subscribeOn(Schedulers.boundedElastic())
.map(v -> Thread.currentThread() + " " + v)
.log()
.blockLast();
}
}
上面的代码以响应式方式读取文本文件中的所有行。不幸的是,Files.lines 方法是一个系统 I/O 调用,已知是阻塞的。因此,我们将所有这些操作安排在共享的 Schedulers.boundedElastic() 调度器上。众所周知,Schedulers.boundedElastic() 是主要的共享调度器,用于 卸载 系统中可能进行的所有阻塞调用。它既用于简单的 HTTP 阻塞调用,也用于封装一些不可避免的阻塞系统交互,例如生成随机的 UUID。然而,它默认使用 平台 Thread 实例,这可能会增加系统的争用。
现在,随着 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler 实现可以取代默认实现,以便在 Schedulers.boundedElastic() 中使用虚拟线程而不是平台线程。您所需要做的就是将您的应用程序运行在 Java 21+ 上,并设置 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true 系统属性。

正如您可能已经注意到的,您将有一个 VirtualThread 实例来承载计划的工作。
正如您可能从我们之前的博客中听到的,从 Reactor 3.5.0 开始,我们引入了一种机制,用于在 handle 和 tap 等操作符中,从 Reactor Context 自动恢复 ThreadLocal。后来,在 reactor 3.5.3 中,我们在 Project Reactor 中可用的所有操作符集中添加了 ThreadLocal 值的自动恢复。
static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1>
}
public static void main(String[] args) {
logger.info("Setting Trace ID test-123-567-890");
TRACE_ID.set("test-123-567-890"); <1>
Hooks.enableAutomaticContextPropagation(); <2>
Mono.fromCallable(() -> {
logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
return UUID.randomUUID();
})
.subscribeOn(Schedulers.boundedElastic()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
.block();
}
上面的代码生成一个随机的 UUID,它将阻塞的生成过程 <3> 卸载到一个专门的工作器上。要启用 ThreadLocal 自动传播的魔力,您需要在运行时提供 Micrometer Context Propagation 库,注册 <1> 所需的 ThreadLocal 实例,然后调用 Hooks API <2> 来激活这种特定的 传播模式。如果我们检查上面的代码输出,我们会看到指定的 <1> TRACE_ID ThreadLocal 在所有地方 <3> <4> 都始终可用,无论 Thread 是否切换。
[ INFO] (main) Setting Trace ID test-123-567-890 <1>
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>
Thread main 上设置Thread boundedElastic-1 上可用尽管这种机制已经足够接近每个人想要的,但它受到 Reactor 自己的生产者和转换器的限制。为了理解它可能无法完美工作的地方,让我们修改上面的示例并添加与外部基于 Reactive Streams 的库(例如 JDK11 HttpClient)的集成。
static HttpClient jdkHttpClient = HttpClient.newHttpClient();
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}
public static void main(String[] args) {
logger.info("Setting Trace ID");
TRACE_ID.set("test-123-567-890");
Hooks.enableAutomaticContextPropagation();
Mono.fromFuture(() -> {
logger.info("[" + TRACE_ID.get() + "] Preparing request");
return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
.uri(URI.create("https://httpbin.org/drip"))
.GET()
.build(),
HttpResponse.BodyHandlers.ofPublisher());
})
.flatMapMany(r -> {
logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
return FlowAdapters.toPublisher(r.body()); <2>
})
.collect(new ByteBufferToStringCollector()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
.block();
}
在修改后的示例中,我们进行网络调用 <1>,然后读取响应。响应体表示为 Flow.Publisher <2>,我们将其展平并转换为字符串表示 <3>。一旦这段代码运行,其中一种可能的输出可能如下所示:
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>
从输出中我们可以观察到,在 reactor 3.5.3+ 中,消费外部 Publisher 可能会导致上下文丢失 <1>,因为我们不知道是否需要额外的工作来恢复丢失的 ThreadLocal 实例。
使用 reactor 3.6.x,此输出始终一致
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>
在此版本中,我们增强了 ThreadLocal 值恢复机制,并添加了额外的逻辑,用于检测任何外部 Publisher 实现。一旦检测到这些实现,我们就会对其进行装饰,以确保您在我们的管道中操作时永远不会丢失 ThreadLocal 值。
在 reactor 3.6.x 中,我们采纳了 多版本 jar (MRJ) 支持,并已添加 改进,尽可能消除了反射。我们 计划 在即将发布的版本中扩展 MRJ 使用,并使用所有 JDK9+ 特性!
敬请期待!所有源代码都可以在 Github 上找到。