Reactor Bismuth 已发布

发布 | Simon Baslé | 2017年9月28日 | ...

我非常高兴地宣布 **Reactor Bismuth** 的 GA 版本,其中包含了 reactor-core **3.1.0.RELEASE** 和 reactor-netty **0.7.0.RELEASE** ?

随着 Spring Framework 5.0 的发布,您可以想象这对 Project Reactor 来说是一个巨大的进步 :)

bismuth crystal

正如铋晶体一样,这个版本是错综复杂的。

 
本次发布包含大量更改和 API 优化。有关 reactor-core 的详尽列表,请参阅发布说明

重要

reactor-corereactor-test源码 jar 包 存在一个已知(小)问题:它们包含重复的 Java 源码条目。更多细节请参阅 issue #887

让我们来概览一下此版本中的新增内容和亮点。

API 最终润色

在 3.0.x 系列的最后一个版本和 3.1.0 之间进行了一些 API 更改。我们希望与 Spring Framework 5 长期共存的最佳 API,因此这些破坏性更改是必要的。发布说明包含此类更改的更详尽列表,但让我们来看其中的一些。

错误处理操作符

FluxMono 中的错误处理操作符已更加一致。所有操作符都使用 onError 前缀,并且 API 在两个类中都已对齐。

  • onErrorReturn 用于切换到备用值(以前在 Mono 中是 otherwiseReturn)。

  • onErrorResume 用于切换到备用 Publisher(以前在 Flux 中是 onErrorResumeWith,在 Mono 中是 otherwise)。

  • onErrorMap 用于将一个 Exception 转换为另一个(以前在 Flux 和 Mono 中都是 mapError)。

  • switchIfEmpty 用于在源为空时切换到备用序列(以前在 Mono 中是 otherwiseIfEmpty)。

  • Flux#switchOnError 已被移除,可以通过 onErrorResume 使用忽略其参数的 lambda 来实现相同的功能。

FluxMono 概念与操作符对齐

Mono API 过去有时会偏离 Flux API,尽管概念相似。在有意义的地方,这些分离已被修复。

when, and, zip, 和 zipWith 对齐

例如,Mono 有一个 and 操作符和 when 静态方法,它们曾经用于组合元素并产生 Mono<Tuple2>。本质上,它们基本上是 FluxzipWithzip 的特例,但名称不同,导致概念难以关联。这就是为什么这些方法在 Mono 中被重命名了。

另一方面,您会注意到 Mono API 中仍然存在 whenand 的另一种形式。与我们之前看到的相反,这些返回 Mono<Void>。处理任务完成是 Mono 的一个基本用例,而这些方法现在专门为此类应用进行了调整:它们将组合并执行多个任务(表示为源 Monos),忽略它们潜在的 onNext 信号,并且只传播 onComplete 信号的组合,从而产生一个在 N 个任务完成时完成的 Mono<Void>

then 前缀的语义一致性

同样,then 前缀现在一致地表示源的 onNext 将被丢弃,而是建立在终止信号之上。这一点在 FluxMono 中都已实现。

  • then() 返回一个 Mono<Void>,它仅传播源的 onCompleteonError 信号。

  • then(Mono<V>) 返回一个 Mono<V>:它等待原始 onComplete 信号,然后切换到提供的另一个 Mono,只发出那个其他 Mono 的元素。

  • thenMany(Publisher<V>) 类似,除了它继续到一个 Flux<V>

  • thenEmpty(Publisher<Void>) 返回一个 Mono<Void>,它在原始 Mono *然后* Publisher 完成后完成。也就是说,它代表顺序完成,不像 and 会立即订阅两个序列。

请注意,接受 Supplier 参数的变体已被完全移除(它们的惰性语义可以通过 Mono.defer 替代)。此外,上面描述的 Mono#thenEmpty 是从 then(Publisher<Void>) 重命名的。

一个 then 曾经是 flatMap

Mono 曾经有另一个有趣的 then 变体。

Mono<V> then(Function<T, Mono<V>> thenFunction);

仔细观察,这与 then* 关注终止信号的新语义不符。相反,它会根据源的 onNext 将源转换为另一个 Mono。听起来很熟悉?这确实与 Flux#flatMap 的作用一致!

不过,有一个问题:Mono 已经有一个 flatMap 操作符。

Flux<V> flatMap(Function<T, Publisher<V>> mapper);

经过进一步思考,我们认识到 flatMap 的经典语义是返回与应用 flatMap 的类型相同的值。因此,让 Mono#flatMap 返回一个 Mono 更为正确……

结果,我们重命名了这个 then 变体为 flatMap,并将 Many 后缀用于返回 Flux 的变体。

Mono<V> flatMap(Function<T, Mono<V>> mapper);
Flux<V> flatMapMany(Function<T, Publisher<V>> mapper);

提示

为了便于迁移,我们建议您搜索所有旧的 Mono.flatMap 的用法,并首先将它们替换为 flatMapMany。然后搜索 then(Function) 的用法,并将它们替换为 flatMap

各种 API 简化

时间:在此版本中,所有涉及时间的运算符都仅通过 Duration 类型进行处理。大多数以前有一个带 *Millis 后缀的变体,它使用 longTimeUnit 来表示时长。这些变体在 3.1.0 中已被移除。

延迟错误:另一个处理后缀的横切性更改:一些运算符具有可选配置,可以将多个错误合并并在最后发出 onError 信号,从而允许一些值仍然进入结果序列。例如,flatMap 可以在内部序列发出 onError 时立即停止,也可以在传播该错误之前继续合并其他内部序列的元素。

在某些情况下,这种可选行为在 API 中表示为 boolean 标志参数。在其他情况下,它是一个以 DelayError 作为后缀的单独变体。在 3.1.0 中,所有这些变体都已对齐,以一致地使用 *DelayError 后缀,而不是布尔标志。

简化接口:一些专用接口也已被移除,以支持更简单或更通用的替代方案。

  • Cancellation 接口已被移除,以支持更通用的 Disposable 接口。

  • TimedScheduler 接口已被移除。少数非时间功能的 Scheduler 在尝试对其使用 schedulePeriodically 时将抛出 RejectedExecutionException 来指示这一点。此外,Scheduler#shutdown 已被移除,以支持 Disposabledispose()

  • 用于自省的几个接口已被简化为一个 Scannable 接口,该接口通过其 scan(Scannable.Attr) 方法公开运算符当前状态的信息(尽最大努力)。

  • QueueSupplier 已重命名为 Queues,现在纯粹是一个实用类(不再是 Supplier)。

Reactor-Test 和 Kotlin 扩展已迁移

Kotlin 扩展和 Reactor Test 工件已直接集成到 reactor-core 的主仓库中。

  • Kotlin 扩展是 reactor-core 工件的一部分。不再需要依赖 io.projectreactor:reactor-kotlin-extensions

  • reactor-test 现在与 reactor-core 属于同一 GroupId。

    • io.projectreactor.addons:reactor-test 替换为 io.projectreactor:reactor-test

Kotlin 和 Null-Safety 支持

在此发行周期中,我们致力于更好地与 Kotlin 等语言集成。

这尤其体现在一些 API 重构上,以避免与 lambda 产生歧义签名。当一个方法有两个重载,仅仅是它们接受的函数式接口类型不同时,我们将其中一个变体提取为操作符的新后缀。

例如,考虑 buffer(Publisher, Supplier)buffer(Publisher, Function)。第二个变体已重命名为 bufferWhen,因为它会在 Function 产生的伴随 Publisher 发射时创建一个缓冲区。

如上所述,Kotlin 扩展也已直接集成到 reactor-core 仓库中。

此外,通过引入三个注解,改进了对 null-safety 分析的支持。这些注解基于 JSR 305,尽管 JSR 305 已休眠,但包括 IntelliJ IDE 在内的多个静态分析工具都依赖它。以下注解在 reactor.util.annotation 包中提供:

  • @NonNull 表示特定的参数、返回值或字段不能为 null。(在应用了 @NonNullApi 的参数和返回值上不需要)。

  • @Nullable 表示参数、返回值或字段可以为 null。

  • @NonNullApi 是一个包级别注解,表示非 null 是参数和返回值的默认行为。

我们利用这些注解在所有公共 Reactor Core API 上表达明确且可操作的 null-safety 合约。

文档润色

文档也得到了一些关注:参考指南终于完成,Javadoc 已被审查和改写,以便更清晰地描述某些方法。

随着 Kotlin 支持的直接集成到 reactor-core 中,参考指南中已添加了一个新部分,并且 KotlinDocs 也已发布

Context

现在可以将上下文数据附加到每个订阅的 FluxMono,作为一个类 Map 的 Context 对象!

这是一个高级功能,主要会引起库开发者的兴趣,但我们知道它在迁移以前依赖于命令式代码中 ThreadLocal 的功能等方面将非常宝贵。

在 Spring 产品组合中,我们预计 spring-securityspring-cloud-sleuth 将从中受益匪浅。

为了将信息添加到 Context,请使用 subscriberContext(Context) 操作符,例如下面的示例。

Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
    .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
    .filter(t -> t.getT1() < 300)
    .map(Tuple2::getT2);

提示

Context 是不可变的,并且在 *订阅* 阶段传播,该阶段从链的末端(subscribe() 调用)向链的开始运行。
因此,您通过 subscriberContext 放入 Context 的内容会产生一个 Context 的丰富副本,仅对它上游的操作符可见

为了检索和使用 Context 中放入的信息,上游操作符链可以使用 Mono.subscriberContext(),它会具体化可见的 Context(例如,在 flatMap 中)。

它可能看起来像下面的片段。

Mono<Tuple2<String, Optional<Object>>> dataAndContext =
  data.zipWith(Mono.subscriberContext()
                   .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))
  );

前往参考指南以了解更多关于 Context 的信息。

其他有趣的特性

添加了 expand 操作符

此操作符可用于递归地将源元素*扩展*为嵌套的 Publishers,从而生成深度优先或广度优先的 Publisher 图。

null Provider 映射到空 Mono

当将返回 null 的源 CallableSupplier 转换为 Mono 时,fromCallablefromSupplier 过去会发出错误。它们现在会处理 null 结果,并生成一个空 Mono

添加了为响应式序列 nametag 的能力

可以为 Flux 指定一个 name(String)。可以使用 Scannable.name() 检索它,该方法会向上遍历操作符链,直到找到第一个声明的名称。

同样,可以将几个 tag(String, String) 键值对与 FluxMono 关联起来。这些可以通过 Scannable#tags() 方法作为 Stream<Tuple2<String, String>> 检索,该方法会遍历整个操作符链。

添加了带 BiPredicatedistinctUntilChanged 变体

BiPredicate 用于评估是否有变化,而不是使用 Set 语义,它被应用于当前源元素和最后一个发射的元素。这使得在元素*过于接近*最后一个发射的元素时可以跳过它们(例如,差值小于 1 的 Doubles)。

添加了 Mono.cache(Duration)

这允许轻松缓存一个难以计算的单个值(或错误)一段时间。在 TTL 期限后到达的第一个订阅者将重新触发对源的订阅。

checkpoint(String) 的成本优化

checkpoint(String) 变体现在默认是轻量级的,这意味着在实例化时没有填充堆栈跟踪(从而降低了操作符的使用成本)。我们现在假设 String 标识符足够独特,足以找到导致错误终止的序列的实例化点。

refCount 添加了宽限期

使用 refCount 时,现在可以提供一个 Duration。当引用计数序列的订阅者数量降至阈值以下时,操作符将等待该持续时间,而不是立即取消对源的订阅。如果在宽限期内有足够多的订阅者返回,则不会发生取消。

Mono delayUntil 替换 untilOther

delayUntil 操作符会将 Mono 的发射延迟,直到由源值生成的伴随 Publisher 完成。

请注意,有一个 untilOther 操作符已被移除。它曾经也进行延迟,但会在伴随的第一个 onNext 时触发。delayUntil 更灵活,因为可以通过附加 take(1) 到伴随来达到相同的行为。

测试特性

reactor-test 工件也有一些新特性:

  • 新的错误验证期望,与断言兼容:expectErrorSatisfiesverifyErrorSatisfies

  • StepVerifier#verify() 添加了可选的可配置默认超时。通过使用静态 StepVerifier#setDefaultTimeout 方法进行设置。

  • 添加了 PublisherProbe,用于轻松检查具有条件切换(例如 switchIfEmpty)的复杂操作符链是否会通过逻辑分支,同时通过包装任何 FluxMono 来为测试发出有意义的数据。

结论

如果您是 Reactor 新手,现在是与 Spring Framework 5.0 一起开始响应式旅程的激动人心之时。如果您不是,我们希望在所有这些更改到位后,您会更喜欢使用 Reactor。

如果您在迁移过程中遇到任何困难,请查看发布说明或在我们的Gitter上寻求帮助。

祝您编码愉快!

bismuth crystal photo CC-By-SA David Abercrombie via Flickr

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有