领先一步
VMware 提供培训和认证,帮助您快速提升技能。
了解更多Reactor 3.6.0 即将发布,并将于 11 月 14 日正式发布。这篇博文描述了即将发布的版本中包含的新特性!
如今,每个人都在谈论 Java 21 和 Loom 项目。Reactor 团队也注意到了这一点,并认为该项目对我们的生态系统很有价值。在这个即将发布的版本中,我们引入了对 VirtualThread
实现的支持。
让我们考虑以下代码示例
package io.projectreactor.samples;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class LoomSample {
public static void main(String[] args) {
Flux.using(
() -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
Flux::fromStream,
Stream::close
)
.subscribeOn(Schedulers.boundedElastic())
.map(v -> Thread.currentThread() + " " + v)
.log()
.blockLast();
}
}
上面的代码以响应式方式读取文本文件中的所有行。不幸的是,Files.lines
方法是一个系统 I/O 调用,已知会阻塞。因此,我们将所有这些操作调度到共享的 Schedulers.boundedElastic()
调度器上。众所周知,Schedulers.boundedElastic()
是 卸载系统中可能执行的所有阻塞调用 的主要共享调度器。它用于简单的 HTTP 阻塞调用,以及包装一些不可避免的阻塞系统交互,例如生成随机 UUID
。但是,它默认使用 平台 Thread
实例,这可能会给您的系统带来更多竞争。
现在,使用 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler
实现可以替换默认实现,以便使用虚拟线程而不是平台线程与 Schedulers.boundedElastic()
协同工作。您只需要在 Java 21+ 上运行您的应用程序并设置 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true
系统属性。
您可能已经注意到,您将拥有一个 VirtualThread
实例来执行计划的任务。
您可能已经从我们之前的博客中了解到,从 Reactor 3.5.0 开始,我们引入了一种机制,用于从 Reactor Context
中自动恢复 ThreadLocal
,例如在 handle
和 tap
等操作符中。后来,在 reactor 3.5.3 中,我们在 Project Reactor 中所有可用操作符的整个集合中添加了 ThreadLocal
值的自动恢复功能。
static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1>
}
public static void main(String[] args) {
logger.info("Setting Trace ID test-123-567-890");
TRACE_ID.set("test-123-567-890"); <1>
Hooks.enableAutomaticContextPropagation(); <2>
Mono.fromCallable(() -> {
logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
return UUID.randomUUID();
})
.subscribeOn(Schedulers.boundedElastic()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
.block();
}
上面的代码生成一个随机 UUID
,它将 <3> 阻塞生成过程卸载到专用的工作线程上。要启用自动 ThreadLocal
传播功能,您需要在运行时提供 Micrometer 上下文传播 库,注册 <1>所需的 ThreadLocal
实例,然后调用 Hooks
API <2> 以激活此特定的 传播模式。如果我们检查上面代码的输出,我们会看到指定的 <1> TRACE_ID
ThreadLocal
在所有地方 <3> <4> 始终可用,无论 Thread
如何切换。
[ INFO] (main) Setting Trace ID test-123-567-890 <1>
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>
Thread main
上设置跟踪 IDThread boundedElastic-1
上可用的相同跟踪 ID尽管此机制非常接近于每个人的期望,但它受 Reactor 拥有的生产者和转换器限制。要了解它在哪里可能无法完美工作,让我们修改上面的示例并添加与外部基于 Reactive Streams 的库(例如 JDK11 HttpClient
)的集成。
static HttpClient jdkHttpClient = HttpClient.newHttpClient();
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}
public static void main(String[] args) {
logger.info("Setting Trace ID");
TRACE_ID.set("test-123-567-890");
Hooks.enableAutomaticContextPropagation();
Mono.fromFuture(() -> {
logger.info("[" + TRACE_ID.get() + "] Preparing request");
return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
.uri(URI.create("https://httpbin.org/drip"))
.GET()
.build(),
HttpResponse.BodyHandlers.ofPublisher());
})
.flatMapMany(r -> {
logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
return FlowAdapters.toPublisher(r.body()); <2>
})
.collect(new ByteBufferToStringCollector()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
.block();
}
在修改后的示例中,我们进行网络调用 <1>,然后读取响应。响应体表示为 Flow.Publisher
<2>,我们将其展平并转换为字符串表示形式 <3>。一旦此代码运行,可能的输出之一可能如下所示
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>
从输出中我们可以观察到,使用 reactor 3.5.3+,外部 Publisher
的使用可能会导致上下文丢失 <1>,因为我们不知道是否需要执行额外的提升来恢复丢失的 ThreadLocal
实例。
使用 reactor 3.6.x,此输出始终一致
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>
在此版本中,我们加强了 ThreadLocal
值恢复机制,并添加了检测任何外部 Publisher
实现的额外逻辑。一旦检测到这些实现,我们就会对其进行装饰,以确保在我们的管道中操作时永远不会丢失 ThreadLocal
值。
在 reactor 3.6.x 中,我们采用了 多版本 Jar (MRJ) 支持,并已添加 改进 以在可能的情况下消除反射。我们 计划 在即将发布的版本中扩展 MRJ 的使用并使用所有 JDK9+ 功能!
敬请期待!所有源代码都可以在 Github 上找到。