Reactor Bismuth 发布了

发布 | Simon Baslé | 2017 年 9 月 28 日 | ...

我很高兴地宣布 Reactor Bismuth 的 GA 版本发布,它特别包含 reactor-core 3.1.0.RELEASEreactor-netty 0.7.0.RELEASE ?

随着 Spring Framework 5.0 的发布,你可以想象这对于 Project Reactor 来说是一个巨大的进步 :)

bismuth crystal

就像铋晶体一样,这个版本非常复杂

 
此版本包含大量更改和 API 改进。 对于 reactor-core,您可以在 发行说明 中找到详尽的列表。

重要提示

reactor-corereactor-test 源代码 jar 存在一个已知(小)问题:它们包含重复的 java 源代码条目。 有关更多详细信息,请参见问题 #887

让我们概述一下此版本中的新增功能和值得注意的地方

API 最终改进

在 3.0.x 周期中的最后一个版本和 3.1.0 之间,已进行了许多 API 更改。 我们希望为 Spring Framework 5 推出最佳 API,因此这些重大更改是必要的。 发行说明包含此类更改的更详尽列表,但让我们看一下其中的一些更改。

错误处理运算符

错误处理运算符在 FluxMono 中都更加一致。 所有都使用 onError 前缀,并且 API 在两个类中对齐

  • onErrorReturn 切换到备用值(以前在 Mono 中为 otherwiseReturn

  • onErrorResume 切换到备用 Publisher(以前在 Flux 中为 onErrorResumeWith,在 Mono 中为 otherwise

  • onErrorMapException 转换为另一个异常(以前在 Flux 和 Mono 中为 mapError

  • switchIfEmpty 如果源为空,则切换到备用序列(以前在 Mono 中为 otherwiseIfEmpty

  • Flux#switchOnError 已删除,可以使用一个忽略其参数的 lambda 的 onErrorResume 来实现相同的目的。

FluxMono 概念与运算符之间的对齐

尽管概念相似,但 Mono API 有时会偏离 Flux API。 在有意义的地方,这些分离已被修复。

whenandzipzipWith 的对齐

例如,Mono 有一个 and 运算符和 when 静态方法用于组合元素并生成 Mono<Tuple2>。 本质上,它们是 FluxzipWithzip 的一种专门化,但是具有不同的名称使得难以附加概念。 这就是为什么这些方法已在 Mono 中重命名。

另一方面,您会注意到 Mono API 中仍然存在不同风格的 whenand。 与我们之前看到的相反,这些返回 Mono<Void>。 处理任务的完成是 Mono 的一个重要用例,这些方法现在专门为此类应用程序量身定制:它们将组合和执行多个任务(表示为源 Mono),忽略其潜在的 onNext 信号,并且仅传播 onComplete 信号的组合,从而导致一个 Mono<Void>,该信号在 N 个任务完成时完成。

then 前缀的一致语义

同样,then 前缀现在始终表示要丢弃源的 onNext,而是建立在终端信号上。 这在 FluxMono 中都保持一致

  • then() 返回一个 Mono<Void>,该信号仅传播来自源的 onCompleteonError 信号。

  • then(Mono<V>) 返回一个 Mono<V>:它等待原始的 onComplete 信号,然后切换到另一个提供的 Mono,仅发出来自其他 Mono 的元素。

  • thenMany(Publisher<V>) 与此类似,不同之处在于它继续进入 Flux<V>

  • thenEmpty(Publisher<Void>) 返回一个 Mono<Void>,该信号在原始 Mono 然后 Publisher 完成后完成。 也就是说,它表示顺序完成,与立即订阅两个序列的 and 不同。

请注意,采用 Supplier 参数的变体已完全删除(它们的惰性语义可以由 Mono.defer 替换)。 此外,上面描述的 Mono#thenEmpty 已从 then(Publisher<Void>) 重命名。

一个 then 实际上是如何成为 flatMap

Mono 过去还有另一个有趣的 then 变体

Mono<V> then(Function<T, Mono<V>> thenFunction);

仔细观察,这与 then* 关注终端信号的新语义不符。 相反,它会根据源的 onNext 将源转换为另一个 Mono。 听起来是不是很熟悉? 这确实与 Flux#flatMap 的功能一致!

不过,有一个问题:Mono 已经有了一个 flatMap 运算符

Flux<V> flatMap(Function<T, Publisher<V>> mapper);

在进一步思考之后,我们认识到 flatMap 的经典语义是返回与应用 flatMap 的类型相同的类型的值。 因此,更正的做法是使 Mono#flatMap 返回一个 Mono…​

因此,我们将这个 then 变体重命名为 flatMap,并在返回 Flux 的变体上使用了 Many 后缀。

Mono<V> flatMap(Function<T, Mono<V>> mapper);
Flux<V> flatMapMany(Function<T, Publisher<V>> mapper);

提示

为了方便迁移,我们建议您首先搜索所有旧的 Mono.flatMap 的用法,并将其替换为 flatMapMany。然后搜索 then(Function) 的用法,并将其替换为 flatMap

各种 API 简化

时间:在此版本中,所有处理时间的运算符都专门使用 Duration 类型。大多数曾经有一个带有 *Millis 后缀的变体,该变体使用 longTimeUnit 来表示持续时间。这些变体已在 3.1.0 中删除。

延迟错误:另一个涉及后缀的交叉性更改:一些运算符具有可选配置,它们可以组合多个错误,并在最后发出 onError 信号,从而允许某些值仍然出现在结果序列中。例如,flatMap 可以立即停止,如果内部序列发出 onError,或者继续合并来自其他内部序列的元素,然后再传播该错误。

在某些情况下,该可选行为在 API 中表示为 boolean 标志参数。在其他情况下,它是一个单独的变体,带有 DelayError 后缀。在 3.1.0 中,所有这些变体都已对齐,以始终使用 *DelayError 后缀,而不是布尔标志。

简化的接口:为了支持更简单或更通用的替代方案,一些专门的接口也被删除。

  • Cancellation 接口已被删除,取而代之的是更通用的 Disposable 接口。

  • TimedScheduler 接口已被删除。如果尝试在不具备时间功能的 Scheduler 上使用 schedulePeriodically,少数几个 Scheduler 将会抛出 RejectedExecutionException,指示这一点。此外,Scheduler#shutdown 已被删除,取而代之的是 Disposable 中的 dispose()

  • 用于内省的几个接口已被简化为单个 Scannable 接口,该接口通过其 scan(Scannable.Attr) 方法公开关于运算符当前状态的信息(尽最大努力)。

  • QueueSupplier 已重命名为 Queues,现在纯粹是一个实用程序类(不再是 Supplier)。

Reactor-Test 和 Kotlin 扩展已移动

Kotlin 扩展和 Reactor Test 工件都已直接集成到 reactor-core 的主存储库中。

  • Kotlin 扩展是 reactor-core 工件的一部分。不再需要依赖 io.projectreactor:reactor-kotlin-extensions

  • reactor-test 现在与 reactor-core 位于同一 groupId 下。

    • io.projectreactor.addons:reactor-test 替换为 io.projectreactor:reactor-test

Kotlin 和 Null 安全支持

在此版本周期中,已经做出持续的努力,以更好地与 Kotlin 等语言集成。

这主要转化为一些 API 重构,以避免使用 lambda 的模糊签名。每当一个方法有两个仅因其采用的函数式接口类型而不同的重载时,我们都会将其中一个变体提取为运算符的新后缀。

例如,考虑 buffer(Publisher, Supplier)buffer(Publisher, Function)。第二个变体已重命名为 bufferWhen,因为它在来自该 Function 的配套 Publisher 发出时创建缓冲区。

正如我们上面看到的,Kotlin 扩展也已直接集成到 reactor-core 存储库中。

此外,通过引入三个注解,改进了对 null 安全分析的支持。这些注解构建于 JSR 305 之上,尽管 JSR 305 处于休眠状态,但包括 IntelliJ IDE 在内的多个静态分析工具都使用了它。以下注解在 reactor.util.annotation 包中提供

  • @NonNull 表示特定的参数、返回值或字段不能为 null。(在应用 @NonNullApi 的参数和返回值上不需要)。

  • @Nullable 表示参数、返回值或字段可以为 null。

  • @NonNullApi 是一个包级别注解,表示非 null 是参数和返回值的默认行为。

我们利用这些注解在所有公共 Reactor Core API 上表达显式且可操作的 null 安全约定。

文档优化

文档也进行了一些改进:参考指南终于完成了,并且 javadoc 已经过审查和改写,以便更清楚地描述一些方法。

由于对 Kotlin 的支持已直接集成到 reactor-core 中,因此已在参考指南中添加了新部分,并且 KotlinDocs 已发布

Context(上下文)

上下文数据现在可以作为类似 Map 的 Context 对象附加到每个订阅的 FluxMono

这是一个高级功能,主要对库开发人员感兴趣,但我们知道它将在迁移以前依赖于命令式代码中的 ThreadLocal 的功能方面证明非常宝贵。

在 Spring 产品组合中,我们预计 spring-securityspring-cloud-sleuth 都将从中受益匪浅。

为了向 Context 添加信息,请使用 subscriberContext(Context) 运算符,如下例所示

Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
    .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
    .filter(t -> t.getT1() < 300)
    .map(Tuple2::getT2);

提示

Context 是不可变的,并且在订阅阶段传播,该阶段从链的末尾(subscribe() 调用)运行到链的开头。
因此,您通过 subscriberContext 放入 Context 的内容会导致 Context 的丰富副本,仅对它上面的运算符可见

为了检索和使用放入 Context 中的信息,运算符的上游链可以利用 Mono.subscriberContext(),它会物化可见的 Context(例如,在 flatMap 内部)。

它可能看起来像以下代码段

Mono<Tuple2<String, Optional<Object>>> dataAndContext =
  data.zipWith(Mono.subscriberContext()
                   .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))
  );

请访问参考指南以了解有关 Context 的更多信息。

其他有趣的新特性

添加了 expand 运算符

此运算符可用于递归地将源元素扩展为嵌套的 Publishers,从而生成 Publisher 的深度优先或广度优先的图。

null 提供者映射到空的 Mono

当将返回 null 的源 CallableSupplier 转换为 Mono 时,fromCallablefromSupplier 过去会发出错误。现在它们可以容纳 null 结果,并生成一个空的 Mono

添加了 nametag 反应式序列的能力

可以为 Flux 提供一个单独的 name(String)。可以使用 Scannable.name() 检索它,该方法向上游遍历运算符链,直到找到第一个声明的名称。

类似地,可以将多个 tag(String, String) 键值对与 FluxMono 关联。这些可以通过 Scannable#tags() 方法检索为 Stream<Tuple2<String, String>>,该方法向上游遍历整个运算符链。

添加了一个带有 BiPredicatedistinctUntilChanged 变体

BiPredicate 应用于当前源元素和最后发出的元素,而不是使用 Set 语义来评估是否存在更改。这使得可以跳过*太接近*最后发出元素 (例如,差异小于 1 的 Doubles) 的元素。

添加了 Mono.cache(Duration)

这允许轻松缓存一个难以计算的单个值(或错误)一段有限的时间。TTL 周期后第一个进入的订阅者将重新触发对源的订阅。

checkpoint(String) 的成本改进

checkpoint(String) 变体现在默认情况下是轻量级的,这意味着在实例化时没有填充堆栈跟踪(使得操作员的使用成本更低)。我们现在假设 String 标识符足够唯一,足以找到以错误终止的序列的实例化点。

refCount 添加一个宽限期

使用 refCount 时,现在可以提供 Duration。当引用计数序列的订阅者数量低于阈值时,操作员会等待该持续时间,而不是立即从源取消订阅。如果在此宽限期内有足够的订阅者返回,则不会发生取消。

Mono delayUntil 替换 untilOther

delayUntil 操作符延迟 Mono 的发射,直到从源值生成的伴随 Publisher 完成。

请注意,有一个 untilOther 操作符已被删除。它过去也延迟,但在伴随对象的第一个 onNext 上触发。delayUntil 更灵活,因为可以通过在伴随对象上附加 take(1) 来实现相同的行为。

测试特性

reactor-test 工件也有一些新特性

  • 围绕使用断言的错误验证的新期望:expectErrorSatisfiesverifyErrorSatisfies

  • StepVerifier#verify() 添加了一个可选的可配置默认超时。通过使用静态 StepVerifier#setDefaultTimeout 方法进行设置。

  • 添加了一个 PublisherProbe,通过包装任何 FluxMono,可以轻松检查具有条件切换(例如 switchIfEmpty)的复杂操作符链是否通过逻辑分支,同时仍然为测试发出有意义的数据。

结论

如果您是 Reactor 的新手,现在是使用 Spring Framework 5.0 开始您的反应式旅程的激动人心的时刻。如果您不是,我们希望您会更喜欢使用 Reactor,因为所有这些更改都已到位。

如果您遇到任何迁移困难,请查看发行说明或在我们的 Gitter 上寻求帮助。

祝您反应式编码愉快!

铋晶体照片 CC-By-SA David Abercrombie 通过 Flickr

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

取得领先

VMware 提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部