领先一步
VMware 提供培训和认证,助您加速发展。
了解更多Reactor 3.6.0 即将发布,并将在 11 月 14 日正式发布 (GA)。这篇博文介绍了此即将发布的版本中包含的新功能!
如今,每个人都在谈论 Java 21 和 Project Loom。Project Reactor 团队听取了这些意见,并看到了该项目在我们生态系统中的价值。在此即将发布的版本中,我们引入了对 VirtualThread
实现的支持。
考虑以下代码示例
package io.projectreactor.samples;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class LoomSample {
public static void main(String[] args) {
Flux.using(
() -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
Flux::fromStream,
Stream::close
)
.subscribeOn(Schedulers.boundedElastic())
.map(v -> Thread.currentThread() + " " + v)
.log()
.blockLast();
}
}
上面的代码以响应式方式读取文本文件中的所有行。不幸的是,Files.lines
方法是系统 I/O 调用,已知会阻塞。因此,我们将所有这些操作调度到共享的 Schedulers.boundedElastic()
调度器上。Schedulers.boundedElastic()
是用于卸载系统中可能进行的任何阻塞调用的主要共享调度器,这已不是秘密。它既用于简单的 HTTP 阻塞调用,也用于包装一些不可避免的阻塞系统交互,例如生成随机的 UUID
。但是,它默认使用 平台 Thread
实例,这可能会增加系统的竞争。
现在,借助 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler
实现可以替换默认实现,以便在 Schedulers.boundedElastic()
中使用虚拟线程而不是平台线程。您只需在 Java 21+ 上运行您的应用并设置 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true
系统属性即可。
您可能已经注意到,您将拥有一个承载计划工作的 VirtualThread
实例。
正如您可能从我们自 Reactor 3.5.0 开始的早期博客中听到的那样,我们引入了一种机制,用于在 handle
和 tap
等操作符中,从 Reactor Context
自动恢复 ThreadLocal
。后来,在 reactor 3.5.3 中,我们在 Project Reactor 中可用的所有操作符中添加了 ThreadLocal
值的自动恢复功能。
static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1>
}
public static void main(String[] args) {
logger.info("Setting Trace ID test-123-567-890");
TRACE_ID.set("test-123-567-890"); <1>
Hooks.enableAutomaticContextPropagation(); <2>
Mono.fromCallable(() -> {
logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
return UUID.randomUUID();
})
.subscribeOn(Schedulers.boundedElastic()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
.block();
}
上面的代码生成一个随机的 UUID
,它将阻塞生成过程 <3> 卸载到专门的工作器上。要启用自动 ThreadLocal
传播魔法,您需要在运行时提供 Micrometer Context Propagation 库,注册 <1> 所需的 ThreadLocal
实例,然后调用 Hooks
API <2> 来激活此特定的传播模式。如果我们检查上面代码的输出,我们会看到指定的 <1> TRACE_ID
ThreadLocal
在所有位置 <3> <4> 始终可用,无论 Thread
如何切换。
[ INFO] (main) Setting Trace ID test-123-567-890 <1>
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>
Thread main
上设置了 Trace IDThread boundedElastic-1
上可用的相同 Trace ID尽管此机制已足够接近每个人想要的,但它受到 Reactor 自有的生产者和转换器的限制。要了解它可能无法完美工作的地方,让我们修改上面的示例,并添加与外部基于 Reactive Streams 的库(例如 JDK11 HttpClient
)的集成
static HttpClient jdkHttpClient = HttpClient.newHttpClient();
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}
public static void main(String[] args) {
logger.info("Setting Trace ID");
TRACE_ID.set("test-123-567-890");
Hooks.enableAutomaticContextPropagation();
Mono.fromFuture(() -> {
logger.info("[" + TRACE_ID.get() + "] Preparing request");
return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
.uri(URI.create("https://httpbin.org/drip"))
.GET()
.build(),
HttpResponse.BodyHandlers.ofPublisher());
})
.flatMapMany(r -> {
logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
return FlowAdapters.toPublisher(r.body()); <2>
})
.collect(new ByteBufferToStringCollector()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
.block();
}
在修改后的示例中,我们进行网络调用 <1>,然后读回响应。响应体表示为 Flow.Publisher
<2>,我们将其展平并转换为字符串表示形式 <3>。此代码运行后,可能的输出之一可能如下所示
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>
从输出中我们可以观察到,使用 reactor 3.5.3+,消费外部 Publisher
可能会导致上下文丢失 <1>,因为我们不知道是否需要进行额外的提升来恢复丢失的 ThreadLocal
实例。
使用 reactor 3.6.x,此输出始终一致
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>
在此版本中,我们增强了 ThreadLocal
值恢复机制,并添加了额外的逻辑来检测任何外部 Publisher
实现。一旦检测到这些实现,我们就会对它们进行装饰,以确保您在管道中操作时永远不会丢失 ThreadLocal
值。
在 reactor 3.6.x 中,我们采纳了 multi-release jar (MRJ) 支持,并已经添加了改进,在可能的情况下消除了反射。我们计划在未来的版本中扩展 MRJ 的使用,并利用所有 JDK9+ 的特性!
敬请关注!所有源码都可以在 Github 上找到