取得领先
VMware 提供培训和认证,以加速您的进步。
了解更多我很高兴地宣布 Reactor Bismuth 的 GA 版本发布,它特别包含 reactor-core 3.1.0.RELEASE 和 reactor-netty 0.7.0.RELEASE ?
随着 Spring Framework 5.0 的发布,你可以想象这对于 Project Reactor 来说是一个巨大的进步 :)

就像铋晶体一样,这个版本非常复杂
此版本包含大量更改和 API 改进。 对于 reactor-core,您可以在 发行说明 中找到详尽的列表。
重要提示
reactor-core 和 reactor-test 源代码 jar 存在一个已知(小)问题:它们包含重复的 java 源代码条目。 有关更多详细信息,请参见问题 #887。
让我们概述一下此版本中的新增功能和值得注意的地方
在 3.0.x 周期中的最后一个版本和 3.1.0 之间,已进行了许多 API 更改。 我们希望为 Spring Framework 5 推出最佳 API,因此这些重大更改是必要的。 发行说明包含此类更改的更详尽列表,但让我们看一下其中的一些更改。
错误处理运算符在 Flux 和 Mono 中都更加一致。 所有都使用 onError 前缀,并且 API 在两个类中对齐
onErrorReturn 切换到备用值(以前在 Mono 中为 otherwiseReturn)
onErrorResume 切换到备用 Publisher(以前在 Flux 中为 onErrorResumeWith,在 Mono 中为 otherwise)
onErrorMap 将 Exception 转换为另一个异常(以前在 Flux 和 Mono 中为 mapError)
switchIfEmpty 如果源为空,则切换到备用序列(以前在 Mono 中为 otherwiseIfEmpty)
Flux#switchOnError 已删除,可以使用一个忽略其参数的 lambda 的 onErrorResume 来实现相同的目的。
Flux 和 Mono 概念与运算符之间的对齐尽管概念相似,但 Mono API 有时会偏离 Flux API。 在有意义的地方,这些分离已被修复。
when、and、zip 和 zipWith 的对齐例如,Mono 有一个 and 运算符和 when 静态方法用于组合元素并生成 Mono<Tuple2>。 本质上,它们是 Flux 中 zipWith 和 zip 的一种专门化,但是具有不同的名称使得难以附加概念。 这就是为什么这些方法已在 Mono 中重命名。
另一方面,您会注意到 Mono API 中仍然存在不同风格的 when 和 and。 与我们之前看到的相反,这些返回 Mono<Void>。 处理任务的完成是 Mono 的一个重要用例,这些方法现在专门为此类应用程序量身定制:它们将组合和执行多个任务(表示为源 Mono),忽略其潜在的 onNext 信号,并且仅传播 onComplete 信号的组合,从而导致一个 Mono<Void>,该信号在 N 个任务完成时完成。
then 前缀的一致语义同样,then 前缀现在始终表示要丢弃源的 onNext,而是建立在终端信号上。 这在 Flux 和 Mono 中都保持一致
then() 返回一个 Mono<Void>,该信号仅传播来自源的 onComplete 或 onError 信号。
then(Mono<V>) 返回一个 Mono<V>:它等待原始的 onComplete 信号,然后切换到另一个提供的 Mono,仅发出来自其他 Mono 的元素。
thenMany(Publisher<V>) 与此类似,不同之处在于它继续进入 Flux<V>。
thenEmpty(Publisher<Void>) 返回一个 Mono<Void>,该信号在原始 Mono 然后 Publisher 完成后完成。 也就是说,它表示顺序完成,与立即订阅两个序列的 and 不同。
请注意,采用 Supplier 参数的变体已完全删除(它们的惰性语义可以由 Mono.defer 替换)。 此外,上面描述的 Mono#thenEmpty 已从 then(Publisher<Void>) 重命名。
then 实际上是如何成为 flatMap 的Mono 过去还有另一个有趣的 then 变体
Mono<V> then(Function<T, Mono<V>> thenFunction);
仔细观察,这与 then* 关注终端信号的新语义不符。 相反,它会根据源的 onNext 将源转换为另一个 Mono。 听起来是不是很熟悉? 这确实与 Flux#flatMap 的功能一致!
不过,有一个问题:Mono 已经有了一个 flatMap 运算符
Flux<V> flatMap(Function<T, Publisher<V>> mapper);
在进一步思考之后,我们认识到 flatMap 的经典语义是返回与应用 flatMap 的类型相同的类型的值。 因此,更正的做法是使 Mono#flatMap 返回一个 Mono…
因此,我们将这个 then 变体重命名为 flatMap,并在返回 Flux 的变体上使用了 Many 后缀。
Mono<V> flatMap(Function<T, Mono<V>> mapper);
Flux<V> flatMapMany(Function<T, Publisher<V>> mapper);
提示
为了方便迁移,我们建议您首先搜索所有旧的 Mono.flatMap 的用法,并将其替换为 flatMapMany。然后搜索 then(Function) 的用法,并将其替换为 flatMap。
时间:在此版本中,所有处理时间的运算符都专门使用 Duration 类型。大多数曾经有一个带有 *Millis 后缀的变体,该变体使用 long 和 TimeUnit 来表示持续时间。这些变体已在 3.1.0 中删除。
延迟错误:另一个涉及后缀的交叉性更改:一些运算符具有可选配置,它们可以组合多个错误,并在最后发出 onError 信号,从而允许某些值仍然出现在结果序列中。例如,flatMap 可以立即停止,如果内部序列发出 onError,或者继续合并来自其他内部序列的元素,然后再传播该错误。
在某些情况下,该可选行为在 API 中表示为 boolean 标志参数。在其他情况下,它是一个单独的变体,带有 DelayError 后缀。在 3.1.0 中,所有这些变体都已对齐,以始终使用 *DelayError 后缀,而不是布尔标志。
简化的接口:为了支持更简单或更通用的替代方案,一些专门的接口也被删除。
Cancellation 接口已被删除,取而代之的是更通用的 Disposable 接口。
TimedScheduler 接口已被删除。如果尝试在不具备时间功能的 Scheduler 上使用 schedulePeriodically,少数几个 Scheduler 将会抛出 RejectedExecutionException,指示这一点。此外,Scheduler#shutdown 已被删除,取而代之的是 Disposable 中的 dispose()。
用于内省的几个接口已被简化为单个 Scannable 接口,该接口通过其 scan(Scannable.Attr) 方法公开关于运算符当前状态的信息(尽最大努力)。
QueueSupplier 已重命名为 Queues,现在纯粹是一个实用程序类(不再是 Supplier)。
Kotlin 扩展和 Reactor Test 工件都已直接集成到 reactor-core 的主存储库中。
Kotlin 扩展是 reactor-core 工件的一部分。不再需要依赖 io.projectreactor:reactor-kotlin-extensions。
reactor-test 现在与 reactor-core 位于同一 groupId 下。
将 io.projectreactor.addons:reactor-test 替换为 io.projectreactor:reactor-test。
在此版本周期中,已经做出持续的努力,以更好地与 Kotlin 等语言集成。
这主要转化为一些 API 重构,以避免使用 lambda 的模糊签名。每当一个方法有两个仅因其采用的函数式接口类型而不同的重载时,我们都会将其中一个变体提取为运算符的新后缀。
例如,考虑 buffer(Publisher, Supplier) 和 buffer(Publisher, Function)。第二个变体已重命名为 bufferWhen,因为它在来自该 Function 的配套 Publisher 发出时创建缓冲区。
正如我们上面看到的,Kotlin 扩展也已直接集成到 reactor-core 存储库中。
此外,通过引入三个注解,改进了对 null 安全分析的支持。这些注解构建于 JSR 305 之上,尽管 JSR 305 处于休眠状态,但包括 IntelliJ IDE 在内的多个静态分析工具都使用了它。以下注解在 reactor.util.annotation 包中提供
@NonNull 表示特定的参数、返回值或字段不能为 null。(在应用 @NonNullApi 的参数和返回值上不需要)。
@Nullable 表示参数、返回值或字段可以为 null。
@NonNullApi 是一个包级别注解,表示非 null 是参数和返回值的默认行为。
我们利用这些注解在所有公共 Reactor Core API 上表达显式且可操作的 null 安全约定。
文档也进行了一些改进:参考指南终于完成了,并且 javadoc 已经过审查和改写,以便更清楚地描述一些方法。
由于对 Kotlin 的支持已直接集成到 reactor-core 中,因此已在参考指南中添加了新部分,并且 KotlinDocs 已发布。
Context(上下文)上下文数据现在可以作为类似 Map 的 Context 对象附加到每个订阅的 Flux 或 Mono!
这是一个高级功能,主要对库开发人员感兴趣,但我们知道它将在迁移以前依赖于命令式代码中的 ThreadLocal 的功能方面证明非常宝贵。
在 Spring 产品组合中,我们预计 spring-security 和 spring-cloud-sleuth 都将从中受益匪浅。
为了向 Context 添加信息,请使用 subscriberContext(Context) 运算符,如下例所示
Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
.subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
.filter(t -> t.getT1() < 300)
.map(Tuple2::getT2);
提示
Context 是不可变的,并且在订阅阶段传播,该阶段从链的末尾(subscribe() 调用)运行到链的开头。
因此,您通过 subscriberContext 放入 Context 的内容会导致 Context 的丰富副本,仅对它上面的运算符可见。
为了检索和使用放入 Context 中的信息,运算符的上游链可以利用 Mono.subscriberContext(),它会物化可见的 Context(例如,在 flatMap 内部)。
它可能看起来像以下代码段
Mono<Tuple2<String, Optional<Object>>> dataAndContext =
data.zipWith(Mono.subscriberContext()
.map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))
);
请访问参考指南以了解有关 Context 的更多信息。
expand 运算符此运算符可用于递归地将源元素扩展为嵌套的 Publishers,从而生成 Publisher 的深度优先或广度优先的图。
null 提供者映射到空的 Mono当将返回 null 的源 Callable 或 Supplier 转换为 Mono 时,fromCallable 和 fromSupplier 过去会发出错误。现在它们可以容纳 null 结果,并生成一个空的 Mono。
name 和 tag 反应式序列的能力可以为 Flux 提供一个单独的 name(String)。可以使用 Scannable.name() 检索它,该方法向上游遍历运算符链,直到找到第一个声明的名称。
类似地,可以将多个 tag(String, String) 键值对与 Flux 或 Mono 关联。这些可以通过 Scannable#tags() 方法检索为 Stream<Tuple2<String, String>>,该方法向上游遍历整个运算符链。
BiPredicate 的 distinctUntilChanged 变体BiPredicate 应用于当前源元素和最后发出的元素,而不是使用 Set 语义来评估是否存在更改。这使得可以跳过*太接近*最后发出元素 (例如,差异小于 1 的 Doubles) 的元素。
Mono.cache(Duration)这允许轻松缓存一个难以计算的单个值(或错误)一段有限的时间。TTL 周期后第一个进入的订阅者将重新触发对源的订阅。
checkpoint(String) 的成本改进checkpoint(String) 变体现在默认情况下是轻量级的,这意味着在实例化时没有填充堆栈跟踪(使得操作员的使用成本更低)。我们现在假设 String 标识符足够唯一,足以找到以错误终止的序列的实例化点。
refCount 添加一个宽限期使用 refCount 时,现在可以提供 Duration。当引用计数序列的订阅者数量低于阈值时,操作员会等待该持续时间,而不是立即从源取消订阅。如果在此宽限期内有足够的订阅者返回,则不会发生取消。
delayUntil 替换 untilOtherdelayUntil 操作符延迟 Mono 的发射,直到从源值生成的伴随 Publisher 完成。
请注意,有一个 untilOther 操作符已被删除。它过去也延迟,但在伴随对象的第一个 onNext 上触发。delayUntil 更灵活,因为可以通过在伴随对象上附加 take(1) 来实现相同的行为。
reactor-test 工件也有一些新特性
围绕使用断言的错误验证的新期望:expectErrorSatisfies 和 verifyErrorSatisfies。
为 StepVerifier#verify() 添加了一个可选的可配置默认超时。通过使用静态 StepVerifier#setDefaultTimeout 方法进行设置。
添加了一个 PublisherProbe,通过包装任何 Flux 或 Mono,可以轻松检查具有条件切换(例如 switchIfEmpty)的复杂操作符链是否通过逻辑分支,同时仍然为测试发出有意义的数据。
如果您是 Reactor 的新手,现在是使用 Spring Framework 5.0 开始您的反应式旅程的激动人心的时刻。如果您不是,我们希望您会更喜欢使用 Reactor,因为所有这些更改都已到位。
如果您遇到任何迁移困难,请查看发行说明或在我们的 Gitter 上寻求帮助。
祝您反应式编码愉快!
铋晶体照片 CC-By-SA David Abercrombie 通过 Flickr