领先一步
VMware 提供培训和认证,助您加速进步。
了解更多我非常高兴地宣布 **Reactor Bismuth** 的 GA 版本,其中包含了 reactor-core **3.1.0.RELEASE** 和 reactor-netty **0.7.0.RELEASE** ?
随着 Spring Framework 5.0 的发布,您可以想象这对 Project Reactor 来说是一个巨大的进步 :)

正如铋晶体一样,这个版本是错综复杂的。
本次发布包含大量更改和 API 优化。有关 reactor-core 的详尽列表,请参阅发布说明。
重要
reactor-core 和 reactor-test 的 源码 jar 包 存在一个已知(小)问题:它们包含重复的 Java 源码条目。更多细节请参阅 issue #887。
让我们来概览一下此版本中的新增内容和亮点。
在 3.0.x 系列的最后一个版本和 3.1.0 之间进行了一些 API 更改。我们希望与 Spring Framework 5 长期共存的最佳 API,因此这些破坏性更改是必要的。发布说明包含此类更改的更详尽列表,但让我们来看其中的一些。
Flux 和 Mono 中的错误处理操作符已更加一致。所有操作符都使用 onError 前缀,并且 API 在两个类中都已对齐。
onErrorReturn 用于切换到备用值(以前在 Mono 中是 otherwiseReturn)。
onErrorResume 用于切换到备用 Publisher(以前在 Flux 中是 onErrorResumeWith,在 Mono 中是 otherwise)。
onErrorMap 用于将一个 Exception 转换为另一个(以前在 Flux 和 Mono 中都是 mapError)。
switchIfEmpty 用于在源为空时切换到备用序列(以前在 Mono 中是 otherwiseIfEmpty)。
Flux#switchOnError 已被移除,可以通过 onErrorResume 使用忽略其参数的 lambda 来实现相同的功能。
Flux 和 Mono 概念与操作符对齐Mono API 过去有时会偏离 Flux API,尽管概念相似。在有意义的地方,这些分离已被修复。
when, and, zip, 和 zipWith 对齐例如,Mono 有一个 and 操作符和 when 静态方法,它们曾经用于组合元素并产生 Mono<Tuple2>。本质上,它们基本上是 Flux 中 zipWith 和 zip 的特例,但名称不同,导致概念难以关联。这就是为什么这些方法在 Mono 中被重命名了。
另一方面,您会注意到 Mono API 中仍然存在 when 和 and 的另一种形式。与我们之前看到的相反,这些返回 Mono<Void>。处理任务完成是 Mono 的一个基本用例,而这些方法现在专门为此类应用进行了调整:它们将组合并执行多个任务(表示为源 Monos),忽略它们潜在的 onNext 信号,并且只传播 onComplete 信号的组合,从而产生一个在 N 个任务完成时完成的 Mono<Void>。
then 前缀的语义一致性同样,then 前缀现在一致地表示源的 onNext 将被丢弃,而是建立在终止信号之上。这一点在 Flux 和 Mono 中都已实现。
then() 返回一个 Mono<Void>,它仅传播源的 onComplete 或 onError 信号。
then(Mono<V>) 返回一个 Mono<V>:它等待原始 onComplete 信号,然后切换到提供的另一个 Mono,只发出那个其他 Mono 的元素。
thenMany(Publisher<V>) 类似,除了它继续到一个 Flux<V>。
thenEmpty(Publisher<Void>) 返回一个 Mono<Void>,它在原始 Mono *然后* Publisher 完成后完成。也就是说,它代表顺序完成,不像 and 会立即订阅两个序列。
请注意,接受 Supplier 参数的变体已被完全移除(它们的惰性语义可以通过 Mono.defer 替代)。此外,上面描述的 Mono#thenEmpty 是从 then(Publisher<Void>) 重命名的。
then 曾经是 flatMapMono 曾经有另一个有趣的 then 变体。
Mono<V> then(Function<T, Mono<V>> thenFunction);
仔细观察,这与 then* 关注终止信号的新语义不符。相反,它会根据源的 onNext 将源转换为另一个 Mono。听起来很熟悉?这确实与 Flux#flatMap 的作用一致!
不过,有一个问题:Mono 已经有一个 flatMap 操作符。
Flux<V> flatMap(Function<T, Publisher<V>> mapper);
经过进一步思考,我们认识到 flatMap 的经典语义是返回与应用 flatMap 的类型相同的值。因此,让 Mono#flatMap 返回一个 Mono 更为正确……
结果,我们重命名了这个 then 变体为 flatMap,并将 Many 后缀用于返回 Flux 的变体。
Mono<V> flatMap(Function<T, Mono<V>> mapper);
Flux<V> flatMapMany(Function<T, Publisher<V>> mapper);
提示
为了便于迁移,我们建议您搜索所有旧的 Mono.flatMap 的用法,并首先将它们替换为 flatMapMany。然后搜索 then(Function) 的用法,并将它们替换为 flatMap。
时间:在此版本中,所有涉及时间的运算符都仅通过 Duration 类型进行处理。大多数以前有一个带 *Millis 后缀的变体,它使用 long 和 TimeUnit 来表示时长。这些变体在 3.1.0 中已被移除。
延迟错误:另一个处理后缀的横切性更改:一些运算符具有可选配置,可以将多个错误合并并在最后发出 onError 信号,从而允许一些值仍然进入结果序列。例如,flatMap 可以在内部序列发出 onError 时立即停止,也可以在传播该错误之前继续合并其他内部序列的元素。
在某些情况下,这种可选行为在 API 中表示为 boolean 标志参数。在其他情况下,它是一个以 DelayError 作为后缀的单独变体。在 3.1.0 中,所有这些变体都已对齐,以一致地使用 *DelayError 后缀,而不是布尔标志。
简化接口:一些专用接口也已被移除,以支持更简单或更通用的替代方案。
Cancellation 接口已被移除,以支持更通用的 Disposable 接口。
TimedScheduler 接口已被移除。少数非时间功能的 Scheduler 在尝试对其使用 schedulePeriodically 时将抛出 RejectedExecutionException 来指示这一点。此外,Scheduler#shutdown 已被移除,以支持 Disposable 的 dispose()。
用于自省的几个接口已被简化为一个 Scannable 接口,该接口通过其 scan(Scannable.Attr) 方法公开运算符当前状态的信息(尽最大努力)。
QueueSupplier 已重命名为 Queues,现在纯粹是一个实用类(不再是 Supplier)。
Kotlin 扩展和 Reactor Test 工件已直接集成到 reactor-core 的主仓库中。
Kotlin 扩展是 reactor-core 工件的一部分。不再需要依赖 io.projectreactor:reactor-kotlin-extensions。
reactor-test 现在与 reactor-core 属于同一 GroupId。
将 io.projectreactor.addons:reactor-test 替换为 io.projectreactor:reactor-test。
在此发行周期中,我们致力于更好地与 Kotlin 等语言集成。
这尤其体现在一些 API 重构上,以避免与 lambda 产生歧义签名。当一个方法有两个重载,仅仅是它们接受的函数式接口类型不同时,我们将其中一个变体提取为操作符的新后缀。
例如,考虑 buffer(Publisher, Supplier) 和 buffer(Publisher, Function)。第二个变体已重命名为 bufferWhen,因为它会在 Function 产生的伴随 Publisher 发射时创建一个缓冲区。
如上所述,Kotlin 扩展也已直接集成到 reactor-core 仓库中。
此外,通过引入三个注解,改进了对 null-safety 分析的支持。这些注解基于 JSR 305,尽管 JSR 305 已休眠,但包括 IntelliJ IDE 在内的多个静态分析工具都依赖它。以下注解在 reactor.util.annotation 包中提供:
@NonNull 表示特定的参数、返回值或字段不能为 null。(在应用了 @NonNullApi 的参数和返回值上不需要)。
@Nullable 表示参数、返回值或字段可以为 null。
@NonNullApi 是一个包级别注解,表示非 null 是参数和返回值的默认行为。
我们利用这些注解在所有公共 Reactor Core API 上表达明确且可操作的 null-safety 合约。
文档也得到了一些关注:参考指南终于完成,Javadoc 已被审查和改写,以便更清晰地描述某些方法。
随着 Kotlin 支持的直接集成到 reactor-core 中,参考指南中已添加了一个新部分,并且 KotlinDocs 也已发布。
Context现在可以将上下文数据附加到每个订阅的 Flux 或 Mono,作为一个类 Map 的 Context 对象!
这是一个高级功能,主要会引起库开发者的兴趣,但我们知道它在迁移以前依赖于命令式代码中 ThreadLocal 的功能等方面将非常宝贵。
在 Spring 产品组合中,我们预计 spring-security 和 spring-cloud-sleuth 将从中受益匪浅。
为了将信息添加到 Context,请使用 subscriberContext(Context) 操作符,例如下面的示例。
Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
.subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
.filter(t -> t.getT1() < 300)
.map(Tuple2::getT2);
提示
Context 是不可变的,并且在 *订阅* 阶段传播,该阶段从链的末端(subscribe() 调用)向链的开始运行。
因此,您通过 subscriberContext 放入 Context 的内容会产生一个 Context 的丰富副本,仅对它上游的操作符可见。
为了检索和使用 Context 中放入的信息,上游操作符链可以使用 Mono.subscriberContext(),它会具体化可见的 Context(例如,在 flatMap 中)。
它可能看起来像下面的片段。
Mono<Tuple2<String, Optional<Object>>> dataAndContext =
data.zipWith(Mono.subscriberContext()
.map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))
);
前往参考指南以了解更多关于 Context 的信息。
expand 操作符此操作符可用于递归地将源元素*扩展*为嵌套的 Publishers,从而生成深度优先或广度优先的 Publisher 图。
null Provider 映射到空 Mono当将返回 null 的源 Callable 或 Supplier 转换为 Mono 时,fromCallable 和 fromSupplier 过去会发出错误。它们现在会处理 null 结果,并生成一个空 Mono。
name 和 tag 的能力可以为 Flux 指定一个 name(String)。可以使用 Scannable.name() 检索它,该方法会向上遍历操作符链,直到找到第一个声明的名称。
同样,可以将几个 tag(String, String) 键值对与 Flux 或 Mono 关联起来。这些可以通过 Scannable#tags() 方法作为 Stream<Tuple2<String, String>> 检索,该方法会遍历整个操作符链。
BiPredicate 的 distinctUntilChanged 变体BiPredicate 用于评估是否有变化,而不是使用 Set 语义,它被应用于当前源元素和最后一个发射的元素。这使得在元素*过于接近*最后一个发射的元素时可以跳过它们(例如,差值小于 1 的 Doubles)。
Mono.cache(Duration)这允许轻松缓存一个难以计算的单个值(或错误)一段时间。在 TTL 期限后到达的第一个订阅者将重新触发对源的订阅。
checkpoint(String) 的成本优化checkpoint(String) 变体现在默认是轻量级的,这意味着在实例化时没有填充堆栈跟踪(从而降低了操作符的使用成本)。我们现在假设 String 标识符足够独特,足以找到导致错误终止的序列的实例化点。
refCount 添加了宽限期使用 refCount 时,现在可以提供一个 Duration。当引用计数序列的订阅者数量降至阈值以下时,操作符将等待该持续时间,而不是立即取消对源的订阅。如果在宽限期内有足够多的订阅者返回,则不会发生取消。
delayUntil 替换 untilOtherdelayUntil 操作符会将 Mono 的发射延迟,直到由源值生成的伴随 Publisher 完成。
请注意,有一个 untilOther 操作符已被移除。它曾经也进行延迟,但会在伴随的第一个 onNext 时触发。delayUntil 更灵活,因为可以通过附加 take(1) 到伴随来达到相同的行为。
reactor-test 工件也有一些新特性:
新的错误验证期望,与断言兼容:expectErrorSatisfies 和 verifyErrorSatisfies。
为 StepVerifier#verify() 添加了可选的可配置默认超时。通过使用静态 StepVerifier#setDefaultTimeout 方法进行设置。
添加了 PublisherProbe,用于轻松检查具有条件切换(例如 switchIfEmpty)的复杂操作符链是否会通过逻辑分支,同时通过包装任何 Flux 或 Mono 来为测试发出有意义的数据。
如果您是 Reactor 新手,现在是与 Spring Framework 5.0 一起开始响应式旅程的激动人心之时。如果您不是,我们希望在所有这些更改到位后,您会更喜欢使用 Reactor。
如果您在迁移过程中遇到任何困难,请查看发布说明或在我们的Gitter上寻求帮助。
祝您编码愉快!
bismuth crystal photo CC-By-SA David Abercrombie via Flickr