领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多我很高兴地宣布 **Reactor Bismuth** 的 GA 版本发布,其中尤其包含了 reactor-core
**3.1.0.RELEASE** 和 reactor-netty
**0.7.0.RELEASE** ?
随着 Spring Framework 5.0 现在刚刚发布,您可以想象这对 Project Reactor 来说是一个巨大的进步 :)
就像铋晶体一样,这个版本非常复杂
此版本包含许多更改和 API 优化。对于 reactor-core,您可以在 发行说明 中找到详尽的列表。
重要
reactor-core
和 reactor-test
的 **源代码** jar 文件存在一个 **已知(轻微)问题**:它们包含重复的 Java 源代码条目。有关更多详细信息,请参阅问题 #887。
让我们概述一下此版本中的新功能和值得注意的功能
在 3.0.x 周期的最后一个版本和 3.1.0 之间进行了一些 API 更改。我们希望为 Spring Framework 5 的长期运行提供最佳的 API,因此这些重大更改是必要的。发行说明包含更详尽的此类更改列表,但让我们看一下其中的一些。
错误处理运算符在 Flux
和 Mono
中都变得更加一致。所有运算符都使用 onError
前缀,并且 API 在两个类中都保持一致
**onErrorReturn** 切换到备用值(以前在 Mono 中为 otherwiseReturn
)
**onErrorResume** 切换到备用 Publisher
(以前在 Flux 中为 onErrorResumeWith
,在 Mono 中为 otherwise
)
**onErrorMap** 将 Exception
转换为另一个异常(以前在 Flux 和 Mono 中都为 mapError
)
**switchIfEmpty** 如果源为空,则切换到备用序列(以前在 Mono
中为 otherwiseIfEmpty
)
Flux#switchOnError
已被移除,可以使用 onErrorResume
并使用忽略其参数的 lambda 表达式来实现相同的功能。
Flux
和 Mono
概念和运算符之间的对齐Mono
API 倾向于在某些情况下偏离 Flux
API,尽管概念相似。在有意义的地方,这些差异已经得到修复。
when
、and
、zip
和 zipWith
的对齐例如,Mono
具有 and
运算符和 when
静态方法 **用于组合元素并生成 Mono<Tuple2>
**。从本质上讲,它们基本上是 Flux
中 zipWith
和 zip
的专门化,但具有不同的名称使得难以关联这些概念。这就是为什么这些方法在 Mono
中被重命名的原因。
另一方面,您会注意到 Mono
API 中仍然存在不同类型的 when
和 and
。与我们之前看到的相反,这些返回 Mono<Void>
。处理任务的完成是 Mono
的一个基本用例,这些方法现在专门针对此类应用程序量身定制:它们将组合并执行多个任务(表示为源 Monos),忽略其潜在的 onNext
信号,并且只传播 onComplete
信号的组合,从而产生一个 Mono<Void>
,该信号在 N 个任务完成后完成。
then
前缀的一致语义类似地,then
前缀现在始终表示要丢弃源的 onNext
,而是构建在终端信号之上。这在 Flux
和 Mono
中都保持一致
then()
返回一个 Mono<Void>
,它只传播源的 onComplete
或 onError
信号。
then(Mono<V>)
返回一个 Mono<V>
:它在切换到另一个提供的 Mono
之前等待原始的 onComplete
信号,只发出来自该另一个 Mono
的元素。
thenMany(Publisher<V>)
类似,只是它会继续到 Flux<V>
中。
thenEmpty(Publisher<Void>)
返回一个 Mono<Void>
,该信号在原始 Mono
然后 Publisher
完成后完成。也就是说,它表示顺序完成,与 and
立即订阅两个序列不同。
请注意,采用 Supplier
参数的变体已被完全移除(它们的延迟语义可以用 Mono.defer
替换)。此外,上面描述的 Mono#thenEmpty
已从 then(Publisher<Void>)
重命名。
then
如何实际上是 flatMap
Mono
曾经有另一个有趣的 then
变体
Mono<V> then(Function<T, Mono<V>> thenFunction);
仔细观察,这与 then*
专注于终端信号的新语义不符。相反,它会将源转换为另一个 Mono
,具体取决于源的 onNext
。听起来熟悉吗?这确实与 Flux#flatMap
的作用一致!
不过,存在一个问题:Mono
已经有一个 flatMap
运算符
Flux<V> flatMap(Function<T, Publisher<V>> mapper);
经过更多思考,我们认识到 flatMap
的经典语义是返回与应用 flatMap
的类型相同的类型的值。因此,让 Mono#flatMap
返回 Mono
更正确……
因此,我们将此 then 变体重命名为 flatMap
,并在将返回 Flux
的变体上使用 Many
后缀
Mono<V> flatMap(Function<T, Mono<V>> mapper);
Flux<V> flatMapMany(Function<T, Publisher<V>> mapper);
提示
为了简化迁移,我们建议您首先搜索所有旧 Mono.flatMap
的用法,并将这些用法替换为 flatMapMany
。然后搜索 then(Function)
的用法,并将这些用法替换为 flatMap
。
**时间**:在此版本中,所有处理时间的运算符都专门通过 Duration
类型进行。大多数运算符以前都具有一个带有 *Millis
后缀的变体,该变体使用 long
和 TimeUnit
来表示持续时间。这些变体已在 3.1.0 中删除。
**延迟错误**:另一个涉及后缀的跨领域更改:某些运算符具有可选配置,它们可以组合多个错误并在最后发出 onError
信号,从而允许某些值仍然进入生成的序列中。例如,flatMap
可以立即停止(如果内部序列发出 onError
),或者在传播该错误之前继续合并来自其他内部序列的元素。
在某些情况下,该可选行为在 API 中表示为 boolean
标志参数。在其他情况下,它是一个以 DelayError
为后缀的单独变体。在 3.1.0 中,所有这些变体都已对齐,以始终使用 *DelayError
后缀而不是布尔标志。
**简化的接口**:一些专门的接口也已删除,取而代之的是更简单或更通用的替代方案
Cancellation
接口已被移除,取而代之的是更通用的 Disposable
接口。
TimedScheduler
接口已被移除。少数几个不具备时间功能的 Scheduler
,当尝试在其上使用 schedulePeriodically
时,将抛出 RejectedExecutionException
异常来指示这一点。此外,Scheduler#shutdown
方法已被移除,取而代之的是 Disposable
接口中的 dispose()
方法。
几个用于内省的接口已被简化为单个 Scannable
接口,该接口通过其 scan(Scannable.Attr)
方法(以尽力而为的方式)公开有关操作符当前状态的信息。
QueueSupplier
已重命名为 Queues
,现在只是一个工具类(不再是 Supplier
)。
Kotlin 扩展和 Reactor 测试构件已直接集成到 reactor-core
的主存储库中。
Kotlin 扩展是 reactor-core
构件的一部分。不再需要依赖 io.projectreactor:reactor-kotlin-extensions
。
reactor-test
现在与 reactor-core
具有相同的 groupId。
将 io.projectreactor.addons:reactor-test
替换为 io.projectreactor:reactor-test
。
在此发布周期中,我们一直致力于更好地与 Kotlin 等语言集成。
这尤其体现在一些 API 的重构上,以避免 lambda 表达式出现歧义签名。当一个方法有两个重载,而它们仅仅是所接受的功能接口类型不同时,我们将其中一个变体提取出来作为操作符的新后缀。
例如,考虑 buffer(Publisher, Supplier)
和 buffer(Publisher, Function)
。第二个变体已重命名为 bufferWhen
,因为它在来自该 Function
的配套 Publisher
发射时创建缓冲区。
正如我们上面所看到的,**Kotlin 扩展**也已直接集成到 reactor-core 存储库中。
此外,通过引入三个注解,改进了对空安全分析的支持。这些注解基于 JSR 305,尽管该标准处于休眠状态,但仍被包括 IntelliJ IDE 在内的多个静态分析工具所使用。以下注解在 reactor.util.annotation
包中提供。
@NonNull
指示特定参数、返回值或字段不能为 null。(在 @NonNullApi
适用的参数和返回值上不需要它)。
@Nullable
指示参数、返回值或字段可以为 null。
@NonNullApi
是一个包级注解,指示非 null 是参数和返回值的默认行为。
我们利用这些注解在所有公共 Reactor Core API 上表达明确且可操作的空安全契约。
文档也得到了改进:参考指南终于完成,并且已经审查和改写了 javadoc,以便更清楚地描述某些方法。
随着 Kotlin 支持直接集成到 reactor-core 中,参考指南中添加了一个新章节,并且 Kotlin 文档已发布。
Context
现在可以将上下文数据附加到 Flux
或 Mono
,每个订阅作为一个类似 Map 的 Context
对象!
这是一个高级功能,主要会让库开发者感兴趣,但我们知道它将被证明在迁移以前依赖于命令式代码中 ThreadLocal
的功能方面非常有价值。
在 Spring 产品组合中,我们预计 spring-security
和 spring-cloud-sleuth
将从中受益匪浅。
为了向 Context
添加信息,请使用 subscriberContext(Context)
操作符,如下例所示。
Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
.subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
.filter(t -> t.getT1() < 300)
.map(Tuple2::getT2);
提示
Context
是不可变的,并在订阅阶段传播,该阶段从链的末端(subscribe()
调用)向链的开头运行。
因此,通过 subscriberContext
放入 Context
中的内容会导致 Context
的一个富化副本,**只有上游的操作符才能看到**。
为了检索和使用放入 Context
中的信息,操作符的上游链可以使用 Mono.subscriberContext()
,它将可见的 Context
物化(例如,在 flatMap
中)。
它可能看起来像以下代码片段。
Mono<Tuple2<String, Optional<Object>>> dataAndContext =
data.zipWith(Mono.subscriberContext()
.map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))
);
访问参考指南以了解更多关于 Context
的信息。
expand
操作符此操作符可用于递归地将源元素扩展为嵌套的 Publishers
,生成一个深度优先或广度优先的 Publisher
图。
null
提供程序映射到空 Mono
当将返回 null
的源 Callable
或 Supplier
转换为 Mono
时,fromCallable
和 fromSupplier
过去会发出错误。现在它们可以容纳 null
结果,并改为生成一个空的 Mono
。
可以为 Flux
提供单个 name(String)
。可以使用 Scannable.name()
检索它,该方法向上游遍历操作符链,直到找到第一个声明的名称。
类似地,可以将多个 tag(String, String)
键值对与 Flux
或 Mono
关联。可以通过 Scannable#tags()
方法将其检索为 Stream<Tuple2<String, String>>
,该方法向上游遍历整个操作符链。
BiPredicate
的 distinctUntilChanged
变体BiPredicate
不使用 Set
语义来评估是否存在更改,而是将其应用于当前源元素和最后发出的元素。这使得如果元素太接近于最后发出的元素(例如,差值 < 1 的 Doubles
),则可以跳过这些元素。
Mono.cache(Duration)
这使得可以轻松地缓存一个难以计算的单一值(或错误)一段时间。在 TTL 周期结束后,第一个进入的订阅者将重新触发对源的订阅。
checkpoint(String)
的性能checkpoint(String)
变体现在默认情况下是轻量级的,这意味着在实例化时不会填充堆栈跟踪(使操作符的使用成本更低)。我们现在假设 String
标识符足够唯一,足以找到以错误终止的序列的实例化点。
refCount
添加宽限期使用 refCount
时,现在可以提供一个 Duration
。当引用计数序列的订阅者数量低于阈值时,操作符会等待该持续时间,而不是立即取消订阅源。如果在此宽限期内有足够的订阅者返回,则不会发生取消。
delayUntil
替换 untilOther
delayUntil
操作符延迟 Mono
的发射,直到伴随的 Publisher
(从源值生成)完成之后。
请注意,之前有一个 untilOther
操作符已被移除。它过去也用于延迟,但会在伴随的 Publisher
的第一个 onNext
时触发。delayUntil
更加灵活,因为可以通过在伴随的 Publisher
上附加一个 take(1)
来实现相同的行为。
reactor-test
工件也有一些新功能
围绕使用断言兼容的错误验证的新期望:expectErrorSatisfies
和 verifyErrorSatisfies
。
为 StepVerifier#verify()
添加了一个可选的可配置默认超时。通过使用静态 StepVerifier#setDefaultTimeout
方法进行设置。
添加了一个 PublisherProbe
,以便轻松检查具有条件切换(例如 switchIfEmpty
)的复杂操作符链是否通过逻辑分支,同时仍通过包装任何 Flux
或 Mono
为测试发出有意义的数据。
如果您是 Reactor 的新手,现在是开始使用 Spring Framework 5.0 进行响应式之旅的激动人心的时刻。如果您不是新手,我们希望您现在能够更加享受使用 Reactor,因为所有这些更改都已到位。
如果您遇到任何迁移困难,请查看 发行说明 或在我们的 Gitter 上寻求帮助。
快乐的响应式编码!
铋晶体照片 CC-By-SA David Abercrombie 通过 Flickr