反应堆铋已发布

发布 | Simon Baslé | 2017 年 9 月 28 日 | ...

我很高兴地宣布 **Reactor Bismuth** 的 GA 版本发布,其中尤其包含了 reactor-core **3.1.0.RELEASE** 和 reactor-netty **0.7.0.RELEASE** ?

随着 Spring Framework 5.0 现在刚刚发布,您可以想象这对 Project Reactor 来说是一个巨大的进步 :)

bismuth crystal

就像铋晶体一样,这个版本非常复杂

 
此版本包含许多更改和 API 优化。对于 reactor-core,您可以在 发行说明 中找到详尽的列表。

重要

reactor-corereactor-test 的 **源代码** jar 文件存在一个 **已知(轻微)问题**:它们包含重复的 Java 源代码条目。有关更多详细信息,请参阅问题 #887

让我们概述一下此版本中的新功能和值得注意的功能

API 最终优化

在 3.0.x 周期的最后一个版本和 3.1.0 之间进行了一些 API 更改。我们希望为 Spring Framework 5 的长期运行提供最佳的 API,因此这些重大更改是必要的。发行说明包含更详尽的此类更改列表,但让我们看一下其中的一些。

错误处理运算符

错误处理运算符在 FluxMono 中都变得更加一致。所有运算符都使用 onError 前缀,并且 API 在两个类中都保持一致

  • **onErrorReturn** 切换到备用值(以前在 Mono 中为 otherwiseReturn

  • **onErrorResume** 切换到备用 Publisher(以前在 Flux 中为 onErrorResumeWith,在 Mono 中为 otherwise

  • **onErrorMap** 将 Exception 转换为另一个异常(以前在 Flux 和 Mono 中都为 mapError

  • **switchIfEmpty** 如果源为空,则切换到备用序列(以前在 Mono 中为 otherwiseIfEmpty

  • Flux#switchOnError 已被移除,可以使用 onErrorResume 并使用忽略其参数的 lambda 表达式来实现相同的功能。

FluxMono 概念和运算符之间的对齐

Mono API 倾向于在某些情况下偏离 Flux API,尽管概念相似。在有意义的地方,这些差异已经得到修复。

whenandzipzipWith 的对齐

例如,Mono 具有 and 运算符和 when 静态方法 **用于组合元素并生成 Mono<Tuple2>**。从本质上讲,它们基本上是 FluxzipWithzip 的专门化,但具有不同的名称使得难以关联这些概念。这就是为什么这些方法在 Mono 中被重命名的原因。

另一方面,您会注意到 Mono API 中仍然存在不同类型的 whenand。与我们之前看到的相反,这些返回 Mono<Void>。处理任务的完成是 Mono 的一个基本用例,这些方法现在专门针对此类应用程序量身定制:它们将组合并执行多个任务(表示为源 Monos),忽略其潜在的 onNext 信号,并且只传播 onComplete 信号的组合,从而产生一个 Mono<Void>,该信号在 N 个任务完成后完成。

then 前缀的一致语义

类似地,then 前缀现在始终表示要丢弃源的 onNext,而是构建在终端信号之上。这在 FluxMono 中都保持一致

  • then() 返回一个 Mono<Void>,它只传播源的 onCompleteonError 信号。

  • then(Mono<V>) 返回一个 Mono<V>:它在切换到另一个提供的 Mono 之前等待原始的 onComplete 信号,只发出来自该另一个 Mono 的元素。

  • thenMany(Publisher<V>) 类似,只是它会继续到 Flux<V> 中。

  • thenEmpty(Publisher<Void>) 返回一个 Mono<Void>,该信号在原始 Mono 然后 Publisher 完成后完成。也就是说,它表示顺序完成,与 and 立即订阅两个序列不同。

请注意,采用 Supplier 参数的变体已被完全移除(它们的延迟语义可以用 Mono.defer 替换)。此外,上面描述的 Mono#thenEmpty 已从 then(Publisher<Void>) 重命名。

一个 then 如何实际上是 flatMap

Mono 曾经有另一个有趣的 then 变体

Mono<V> then(Function<T, Mono<V>> thenFunction);

仔细观察,这与 then* 专注于终端信号的新语义不符。相反,它会将源转换为另一个 Mono具体取决于源的 onNext。听起来熟悉吗?这确实与 Flux#flatMap 的作用一致!

不过,存在一个问题:Mono 已经有一个 flatMap 运算符

Flux<V> flatMap(Function<T, Publisher<V>> mapper);

经过更多思考,我们认识到 flatMap 的经典语义是返回与应用 flatMap 的类型相同的类型的值。因此,让 Mono#flatMap 返回 Mono 更正确……

因此,我们将此 then 变体重命名为 flatMap,并在将返回 Flux 的变体上使用 Many 后缀

Mono<V> flatMap(Function<T, Mono<V>> mapper);
Flux<V> flatMapMany(Function<T, Publisher<V>> mapper);

提示

为了简化迁移,我们建议您首先搜索所有旧 Mono.flatMap 的用法,并将这些用法替换为 flatMapMany。然后搜索 then(Function) 的用法,并将这些用法替换为 flatMap

各种 API 简化

**时间**:在此版本中,所有处理时间的运算符都专门通过 Duration 类型进行。大多数运算符以前都具有一个带有 *Millis 后缀的变体,该变体使用 longTimeUnit 来表示持续时间。这些变体已在 3.1.0 中删除。

**延迟错误**:另一个涉及后缀的跨领域更改:某些运算符具有可选配置,它们可以组合多个错误并在最后发出 onError 信号,从而允许某些值仍然进入生成的序列中。例如,flatMap 可以立即停止(如果内部序列发出 onError),或者在传播该错误之前继续合并来自其他内部序列的元素。

在某些情况下,该可选行为在 API 中表示为 boolean 标志参数。在其他情况下,它是一个以 DelayError 为后缀的单独变体。在 3.1.0 中,所有这些变体都已对齐,以始终使用 *DelayError 后缀而不是布尔标志。

**简化的接口**:一些专门的接口也已删除,取而代之的是更简单或更通用的替代方案

  • Cancellation 接口已被移除,取而代之的是更通用的 Disposable 接口。

  • TimedScheduler 接口已被移除。少数几个不具备时间功能的 Scheduler,当尝试在其上使用 schedulePeriodically 时,将抛出 RejectedExecutionException 异常来指示这一点。此外,Scheduler#shutdown 方法已被移除,取而代之的是 Disposable 接口中的 dispose() 方法。

  • 几个用于内省的接口已被简化为单个 Scannable 接口,该接口通过其 scan(Scannable.Attr) 方法(以尽力而为的方式)公开有关操作符当前状态的信息。

  • QueueSupplier 已重命名为 Queues,现在只是一个工具类(不再是 Supplier)。

Reactor 测试和 Kotlin 扩展已迁移

Kotlin 扩展和 Reactor 测试构件已直接集成到 reactor-core 的主存储库中。

  • Kotlin 扩展是 reactor-core 构件的一部分。不再需要依赖 io.projectreactor:reactor-kotlin-extensions

  • reactor-test 现在与 reactor-core 具有相同的 groupId。

    • io.projectreactor.addons:reactor-test 替换为 io.projectreactor:reactor-test

Kotlin 和空安全支持

在此发布周期中,我们一直致力于更好地与 Kotlin 等语言集成。

这尤其体现在一些 API 的重构上,以避免 lambda 表达式出现歧义签名。当一个方法有两个重载,而它们仅仅是所接受的功能接口类型不同时,我们将其中一个变体提取出来作为操作符的新后缀。

例如,考虑 buffer(Publisher, Supplier)buffer(Publisher, Function)。第二个变体已重命名为 bufferWhen,因为它在来自该 Function 的配套 Publisher 发射时创建缓冲区。

正如我们上面所看到的,**Kotlin 扩展**也已直接集成到 reactor-core 存储库中。

此外,通过引入三个注解,改进了对空安全分析的支持。这些注解基于 JSR 305,尽管该标准处于休眠状态,但仍被包括 IntelliJ IDE 在内的多个静态分析工具所使用。以下注解在 reactor.util.annotation 包中提供。

  • @NonNull 指示特定参数、返回值或字段不能为 null。(在 @NonNullApi 适用的参数和返回值上不需要它)。

  • @Nullable 指示参数、返回值或字段可以为 null。

  • @NonNullApi 是一个包级注解,指示非 null 是参数和返回值的默认行为。

我们利用这些注解在所有公共 Reactor Core API 上表达明确且可操作的空安全契约。

文档完善

文档也得到了改进:参考指南终于完成,并且已经审查和改写了 javadoc,以便更清楚地描述某些方法。

随着 Kotlin 支持直接集成到 reactor-core 中,参考指南中添加了一个新章节,并且 Kotlin 文档已发布

Context

现在可以将上下文数据附加到 FluxMono,每个订阅作为一个类似 Map 的 Context 对象!

这是一个高级功能,主要会让库开发者感兴趣,但我们知道它将被证明在迁移以前依赖于命令式代码中 ThreadLocal 的功能方面非常有价值。

在 Spring 产品组合中,我们预计 spring-securityspring-cloud-sleuth 将从中受益匪浅。

为了向 Context 添加信息,请使用 subscriberContext(Context) 操作符,如下例所示。

Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
    .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
    .filter(t -> t.getT1() < 300)
    .map(Tuple2::getT2);

提示

Context 是不可变的,并在订阅阶段传播,该阶段从链的末端(subscribe() 调用)向链的开头运行。
因此,通过 subscriberContext 放入 Context 中的内容会导致 Context 的一个富化副本,**只有上游的操作符才能看到**。

为了检索和使用放入 Context 中的信息,操作符的上游链可以使用 Mono.subscriberContext(),它将可见的 Context 物化(例如,在 flatMap 中)。

它可能看起来像以下代码片段。

Mono<Tuple2<String, Optional<Object>>> dataAndContext =
  data.zipWith(Mono.subscriberContext()
                   .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))
  );

访问参考指南以了解更多关于 Context 的信息。

其他有趣的新功能

添加了 expand 操作符

此操作符可用于递归地将源元素扩展为嵌套的 Publishers,生成一个深度优先或广度优先的 Publisher 图。

null 提供程序映射到空 Mono

当将返回 null 的源 CallableSupplier 转换为 Mono 时,fromCallablefromSupplier 过去会发出错误。现在它们可以容纳 null 结果,并改为生成一个空的 Mono

添加了为反应式序列命名和标记的功能

可以为 Flux 提供单个 name(String)。可以使用 Scannable.name() 检索它,该方法向上游遍历操作符链,直到找到第一个声明的名称。

类似地,可以将多个 tag(String, String) 键值对与 FluxMono 关联。可以通过 Scannable#tags() 方法将其检索为 Stream<Tuple2<String, String>>,该方法向上游遍历整个操作符链。

添加了带 BiPredicatedistinctUntilChanged 变体

BiPredicate 不使用 Set 语义来评估是否存在更改,而是将其应用于当前源元素和最后发出的元素。这使得如果元素太接近于最后发出的元素(例如,差值 < 1 的 Doubles),则可以跳过这些元素。

添加了 Mono.cache(Duration)

这使得可以轻松地缓存一个难以计算的单一值(或错误)一段时间。在 TTL 周期结束后,第一个进入的订阅者将重新触发对源的订阅。

改进了 checkpoint(String) 的性能

checkpoint(String) 变体现在默认情况下是轻量级的,这意味着在实例化时不会填充堆栈跟踪(使操作符的使用成本更低)。我们现在假设 String 标识符足够唯一,足以找到以错误终止的序列的实例化点。

refCount 添加宽限期

使用 refCount 时,现在可以提供一个 Duration。当引用计数序列的订阅者数量低于阈值时,操作符会等待该持续时间,而不是立即取消订阅源。如果在此宽限期内有足够的订阅者返回,则不会发生取消。

Mono delayUntil 替换 untilOther

delayUntil 操作符延迟 Mono 的发射,直到伴随的 Publisher(从源值生成)完成之后。

请注意,之前有一个 untilOther 操作符已被移除。它过去也用于延迟,但会在伴随的 Publisher 的第一个 onNext 时触发。delayUntil 更加灵活,因为可以通过在伴随的 Publisher 上附加一个 take(1) 来实现相同的行为。

测试功能

reactor-test 工件也有一些新功能

  • 围绕使用断言兼容的错误验证的新期望:expectErrorSatisfiesverifyErrorSatisfies

  • StepVerifier#verify() 添加了一个可选的可配置默认超时。通过使用静态 StepVerifier#setDefaultTimeout 方法进行设置。

  • 添加了一个 PublisherProbe,以便轻松检查具有条件切换(例如 switchIfEmpty)的复杂操作符链是否通过逻辑分支,同时仍通过包装任何 FluxMono 为测试发出有意义的数据。

结论

如果您是 Reactor 的新手,现在是开始使用 Spring Framework 5.0 进行响应式之旅的激动人心的时刻。如果您不是新手,我们希望您现在能够更加享受使用 Reactor,因为所有这些更改都已到位。

如果您遇到任何迁移困难,请查看 发行说明 或在我们的 Gitter 上寻求帮助。

快乐的响应式编码!

铋晶体照片 CC-By-SA David Abercrombie 通过 Flickr

获取 Spring 新闻

与 Spring 新闻保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部