取得领先
VMware 提供培训和认证,以加速您的进步。
了解更多我很高兴地宣布 Reactor Bismuth 的 GA 版本发布,它特别包含 reactor-core
3.1.0.RELEASE 和 reactor-netty
0.7.0.RELEASE ?
随着 Spring Framework 5.0 的发布,你可以想象这对于 Project Reactor 来说是一个巨大的进步 :)
就像铋晶体一样,这个版本非常复杂
此版本包含大量更改和 API 改进。 对于 reactor-core,您可以在 发行说明 中找到详尽的列表。
重要提示
reactor-core
和 reactor-test
源代码 jar 存在一个已知(小)问题:它们包含重复的 java 源代码条目。 有关更多详细信息,请参见问题 #887。
让我们概述一下此版本中的新增功能和值得注意的地方
在 3.0.x 周期中的最后一个版本和 3.1.0 之间,已进行了许多 API 更改。 我们希望为 Spring Framework 5 推出最佳 API,因此这些重大更改是必要的。 发行说明包含此类更改的更详尽列表,但让我们看一下其中的一些更改。
错误处理运算符在 Flux
和 Mono
中都更加一致。 所有都使用 onError
前缀,并且 API 在两个类中对齐
onErrorReturn 切换到备用值(以前在 Mono 中为 otherwiseReturn
)
onErrorResume 切换到备用 Publisher
(以前在 Flux 中为 onErrorResumeWith
,在 Mono 中为 otherwise
)
onErrorMap 将 Exception
转换为另一个异常(以前在 Flux 和 Mono 中为 mapError
)
switchIfEmpty 如果源为空,则切换到备用序列(以前在 Mono
中为 otherwiseIfEmpty
)
Flux#switchOnError
已删除,可以使用一个忽略其参数的 lambda 的 onErrorResume
来实现相同的目的。
Flux
和 Mono
概念与运算符之间的对齐尽管概念相似,但 Mono
API 有时会偏离 Flux
API。 在有意义的地方,这些分离已被修复。
when
、and
、zip
和 zipWith
的对齐例如,Mono
有一个 and
运算符和 when
静态方法用于组合元素并生成 Mono<Tuple2>
。 本质上,它们是 Flux
中 zipWith
和 zip
的一种专门化,但是具有不同的名称使得难以附加概念。 这就是为什么这些方法已在 Mono
中重命名。
另一方面,您会注意到 Mono
API 中仍然存在不同风格的 when
和 and
。 与我们之前看到的相反,这些返回 Mono<Void>
。 处理任务的完成是 Mono
的一个重要用例,这些方法现在专门为此类应用程序量身定制:它们将组合和执行多个任务(表示为源 Mono),忽略其潜在的 onNext
信号,并且仅传播 onComplete
信号的组合,从而导致一个 Mono<Void>
,该信号在 N 个任务完成时完成。
then
前缀的一致语义同样,then
前缀现在始终表示要丢弃源的 onNext
,而是建立在终端信号上。 这在 Flux
和 Mono
中都保持一致
then()
返回一个 Mono<Void>
,该信号仅传播来自源的 onComplete
或 onError
信号。
then(Mono<V>)
返回一个 Mono<V>
:它等待原始的 onComplete
信号,然后切换到另一个提供的 Mono
,仅发出来自其他 Mono
的元素。
thenMany(Publisher<V>)
与此类似,不同之处在于它继续进入 Flux<V>
。
thenEmpty(Publisher<Void>)
返回一个 Mono<Void>
,该信号在原始 Mono
然后 Publisher
完成后完成。 也就是说,它表示顺序完成,与立即订阅两个序列的 and
不同。
请注意,采用 Supplier
参数的变体已完全删除(它们的惰性语义可以由 Mono.defer
替换)。 此外,上面描述的 Mono#thenEmpty
已从 then(Publisher<Void>)
重命名。
then
实际上是如何成为 flatMap
的Mono
过去还有另一个有趣的 then
变体
Mono<V> then(Function<T, Mono<V>> thenFunction);
仔细观察,这与 then*
关注终端信号的新语义不符。 相反,它会根据源的 onNext
将源转换为另一个 Mono
。 听起来是不是很熟悉? 这确实与 Flux#flatMap
的功能一致!
不过,有一个问题:Mono
已经有了一个 flatMap
运算符
Flux<V> flatMap(Function<T, Publisher<V>> mapper);
在进一步思考之后,我们认识到 flatMap
的经典语义是返回与应用 flatMap
的类型相同的类型的值。 因此,更正的做法是使 Mono#flatMap
返回一个 Mono
…
因此,我们将这个 then 变体重命名为 flatMap
,并在返回 Flux
的变体上使用了 Many
后缀。
Mono<V> flatMap(Function<T, Mono<V>> mapper);
Flux<V> flatMapMany(Function<T, Publisher<V>> mapper);
提示
为了方便迁移,我们建议您首先搜索所有旧的 Mono.flatMap
的用法,并将其替换为 flatMapMany
。然后搜索 then(Function)
的用法,并将其替换为 flatMap
。
时间:在此版本中,所有处理时间的运算符都专门使用 Duration
类型。大多数曾经有一个带有 *Millis
后缀的变体,该变体使用 long
和 TimeUnit
来表示持续时间。这些变体已在 3.1.0 中删除。
延迟错误:另一个涉及后缀的交叉性更改:一些运算符具有可选配置,它们可以组合多个错误,并在最后发出 onError
信号,从而允许某些值仍然出现在结果序列中。例如,flatMap
可以立即停止,如果内部序列发出 onError
,或者继续合并来自其他内部序列的元素,然后再传播该错误。
在某些情况下,该可选行为在 API 中表示为 boolean
标志参数。在其他情况下,它是一个单独的变体,带有 DelayError
后缀。在 3.1.0 中,所有这些变体都已对齐,以始终使用 *DelayError
后缀,而不是布尔标志。
简化的接口:为了支持更简单或更通用的替代方案,一些专门的接口也被删除。
Cancellation
接口已被删除,取而代之的是更通用的 Disposable
接口。
TimedScheduler
接口已被删除。如果尝试在不具备时间功能的 Scheduler
上使用 schedulePeriodically
,少数几个 Scheduler
将会抛出 RejectedExecutionException
,指示这一点。此外,Scheduler#shutdown
已被删除,取而代之的是 Disposable 中的 dispose()
。
用于内省的几个接口已被简化为单个 Scannable
接口,该接口通过其 scan(Scannable.Attr)
方法公开关于运算符当前状态的信息(尽最大努力)。
QueueSupplier
已重命名为 Queues
,现在纯粹是一个实用程序类(不再是 Supplier
)。
Kotlin 扩展和 Reactor Test 工件都已直接集成到 reactor-core
的主存储库中。
Kotlin 扩展是 reactor-core
工件的一部分。不再需要依赖 io.projectreactor:reactor-kotlin-extensions
。
reactor-test
现在与 reactor-core
位于同一 groupId 下。
将 io.projectreactor.addons:reactor-test
替换为 io.projectreactor:reactor-test
。
在此版本周期中,已经做出持续的努力,以更好地与 Kotlin 等语言集成。
这主要转化为一些 API 重构,以避免使用 lambda 的模糊签名。每当一个方法有两个仅因其采用的函数式接口类型而不同的重载时,我们都会将其中一个变体提取为运算符的新后缀。
例如,考虑 buffer(Publisher, Supplier)
和 buffer(Publisher, Function)
。第二个变体已重命名为 bufferWhen
,因为它在来自该 Function
的配套 Publisher
发出时创建缓冲区。
正如我们上面看到的,Kotlin 扩展也已直接集成到 reactor-core 存储库中。
此外,通过引入三个注解,改进了对 null 安全分析的支持。这些注解构建于 JSR 305 之上,尽管 JSR 305 处于休眠状态,但包括 IntelliJ IDE 在内的多个静态分析工具都使用了它。以下注解在 reactor.util.annotation
包中提供
@NonNull
表示特定的参数、返回值或字段不能为 null。(在应用 @NonNullApi
的参数和返回值上不需要)。
@Nullable
表示参数、返回值或字段可以为 null。
@NonNullApi
是一个包级别注解,表示非 null 是参数和返回值的默认行为。
我们利用这些注解在所有公共 Reactor Core API 上表达显式且可操作的 null 安全约定。
文档也进行了一些改进:参考指南终于完成了,并且 javadoc 已经过审查和改写,以便更清楚地描述一些方法。
由于对 Kotlin 的支持已直接集成到 reactor-core 中,因此已在参考指南中添加了新部分,并且 KotlinDocs 已发布。
Context
(上下文)上下文数据现在可以作为类似 Map 的 Context
对象附加到每个订阅的 Flux
或 Mono
!
这是一个高级功能,主要对库开发人员感兴趣,但我们知道它将在迁移以前依赖于命令式代码中的 ThreadLocal
的功能方面证明非常宝贵。
在 Spring 产品组合中,我们预计 spring-security
和 spring-cloud-sleuth
都将从中受益匪浅。
为了向 Context
添加信息,请使用 subscriberContext(Context)
运算符,如下例所示
Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
.subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
.filter(t -> t.getT1() < 300)
.map(Tuple2::getT2);
提示
Context
是不可变的,并且在订阅阶段传播,该阶段从链的末尾(subscribe()
调用)运行到链的开头。
因此,您通过 subscriberContext
放入 Context
的内容会导致 Context
的丰富副本,仅对它上面的运算符可见。
为了检索和使用放入 Context
中的信息,运算符的上游链可以利用 Mono.subscriberContext()
,它会物化可见的 Context
(例如,在 flatMap
内部)。
它可能看起来像以下代码段
Mono<Tuple2<String, Optional<Object>>> dataAndContext =
data.zipWith(Mono.subscriberContext()
.map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))
);
请访问参考指南以了解有关 Context
的更多信息。
expand
运算符此运算符可用于递归地将源元素扩展为嵌套的 Publishers
,从而生成 Publisher
的深度优先或广度优先的图。
null
提供者映射到空的 Mono
当将返回 null
的源 Callable
或 Supplier
转换为 Mono
时,fromCallable
和 fromSupplier
过去会发出错误。现在它们可以容纳 null
结果,并生成一个空的 Mono
。
name
和 tag
反应式序列的能力可以为 Flux
提供一个单独的 name(String)
。可以使用 Scannable.name()
检索它,该方法向上游遍历运算符链,直到找到第一个声明的名称。
类似地,可以将多个 tag(String, String)
键值对与 Flux
或 Mono
关联。这些可以通过 Scannable#tags()
方法检索为 Stream<Tuple2<String, String>>
,该方法向上游遍历整个运算符链。
BiPredicate
的 distinctUntilChanged
变体BiPredicate
应用于当前源元素和最后发出的元素,而不是使用 Set
语义来评估是否存在更改。这使得可以跳过*太接近*最后发出元素 (例如,差异小于 1 的 Doubles
) 的元素。
Mono.cache(Duration)
这允许轻松缓存一个难以计算的单个值(或错误)一段有限的时间。TTL 周期后第一个进入的订阅者将重新触发对源的订阅。
checkpoint(String)
的成本改进checkpoint(String)
变体现在默认情况下是轻量级的,这意味着在实例化时没有填充堆栈跟踪(使得操作员的使用成本更低)。我们现在假设 String
标识符足够唯一,足以找到以错误终止的序列的实例化点。
refCount
添加一个宽限期使用 refCount
时,现在可以提供 Duration
。当引用计数序列的订阅者数量低于阈值时,操作员会等待该持续时间,而不是立即从源取消订阅。如果在此宽限期内有足够的订阅者返回,则不会发生取消。
delayUntil
替换 untilOther
delayUntil
操作符延迟 Mono
的发射,直到从源值生成的伴随 Publisher
完成。
请注意,有一个 untilOther
操作符已被删除。它过去也延迟,但在伴随对象的第一个 onNext
上触发。delayUntil
更灵活,因为可以通过在伴随对象上附加 take(1)
来实现相同的行为。
reactor-test
工件也有一些新特性
围绕使用断言的错误验证的新期望:expectErrorSatisfies
和 verifyErrorSatisfies
。
为 StepVerifier#verify()
添加了一个可选的可配置默认超时。通过使用静态 StepVerifier#setDefaultTimeout
方法进行设置。
添加了一个 PublisherProbe
,通过包装任何 Flux
或 Mono
,可以轻松检查具有条件切换(例如 switchIfEmpty
)的复杂操作符链是否通过逻辑分支,同时仍然为测试发出有意义的数据。
如果您是 Reactor 的新手,现在是使用 Spring Framework 5.0 开始您的反应式旅程的激动人心的时刻。如果您不是,我们希望您会更喜欢使用 Reactor,因为所有这些更改都已到位。
如果您遇到任何迁移困难,请查看发行说明或在我们的 Gitter 上寻求帮助。
祝您反应式编码愉快!
铋晶体照片 CC-By-SA David Abercrombie 通过 Flickr