Reactor Californium-M1,今夏的里程碑版本列车

工程 | Simon Baslé | 2018 年 8 月 7 日 | ...

我代表 Reactor 团队高兴地宣布最新的 Reactor 里程碑版本 Californium-M1 ? ?

团队一直忙于开发 Reactor 3 的第三个主要版本 Californium。我们现在准备就一些选定的问题收集您的反馈,并且我们还准备了许多增强功能和错误修复供您使用。

Californium-M1 BOM

作为第三个发布列车,我们继续采用元素周期表中按字母顺序递增的名称主题。锎 (Californium) 是一种首次在加利福尼亚合成的元素。

该里程碑的 BOM 包含

  • reactor-core 3.2.0.M3
  • reactor-extra 3.2.0.M1(包含一些 API 对齐更改)
  • reactor-netty 0.8.0.M1

今年早些时候(M1),reactor-core 发布了一个早期预览版,主要聚焦于“错误模式继续”功能,并且核心库还在 6 月份发布了一个脱离发布列车的里程碑版本(M2)。这篇博客文章涵盖了后者的更改以及全新的 M3 中的更改。

Reactor Netty 0.8.0.M1

这里的重点是 reactor-netty。预计会有一篇更全面的博客文章详细介绍 API 更改和新功能的理由,其中包括:

API 重构

团队对 API 进行了大量重构,使其在构建客户端和服务器时更具指导性,避免了在 0.7.x 版本中容易发生的难以容忍的配置错误。

新的 API 也更清晰地阐明了生命周期。

HTTP2 支持

是的,HTTP2 支持 ? 目前主要以透明方式升级到 HTTP2,但我们正在努力在不久的将来将 HTTP2 独立流作为一等公民加入。

Reactor Core 3.2.0

总的来说,与之前的 Bismuth 迭代相比,M2 和 M3 带来了 70 多项更改。

reactor-core 中的 API 更改比 reactor-netty 少,更新注意事项主要涉及里程碑版本本身的差异。有关更多详细信息,请参阅 M2M3 的“更新注意事项”部分。

我们最需要您对以下功能的反馈

指标

已添加 Micrometer 和 .metrics() 支持(#1183, #1123)。新的 .metrics() 操作符仅在 classpath 中存在 Micrometer 时才会执行某些操作。

它记录 onNext 时间、订阅到完成时间、信号计数等指标——所有这些都从紧邻上游操作符产生信号的角度进行衡量。

它是在 M2 中引入的,但在 M3 中得到了改进,并且某些标签名称发生了(破坏性)更改(#1245)。

请注意,一个重要目标是避免在 Reactor 公共 API 中暴露 Micrometer 的内容。我们不希望对 Micrometer 产生强制依赖,并且努力将其使用限制在仅当我们在 classpath 中检测到它时才加载的内部类中。

下一步:在 GA 版本之前,还应该为 Schedulers(或者更确切地说,支持某些 SchedulersExecutorServices)提供基本的测量支持。我们还在寻找一种方法,可以在全局范围内为 Reactor 选择特定的 MeterRegistry,同样不暴露引用 MeterRegistry 接口的公共 API。

高级重试

我们添加了一个预配置的 retryWhen 替代方案,具有指数回退和抖动 (retryBackoff())。详见 #1122

此版本的重试反映了我们认为是业界在重试方面的最佳实践。它是过于简单的 retry(n)、复杂的 retryWhen(Function)reactor-addons 中更可配置的 RetryFunction 之间的一个很好的折中方案。

基于资源的响应式闭包

为了帮助您构建响应式事务块,我们添加了 usingWhen。与 using 类似,它包装一个资源,从中生成一个 Flux,并确保当 Flux 终止时资源得到妥善清理。

主要区别在于

  • 资源通过 Publisher 异步提供。
  • 清理也是异步的 (Function<Resource, Publisher>),并且只延迟终止信号的传播,不延迟 onNext 信号。
  • 该操作符可以针对完成、错误和取消终止分别进行异步“清理”。

这在 M2 中引入,但在 M3 中略有更改,以修复 Context 传播并支持取消 Publisher<Resource>。通过在 Resource 甚至发出之前取消此操作符返回的主 Flux<T>,您的取消指令将传播到 Publisher<Resource>

onDiscard 钩子

这个全局钩子以 Consumer<Object> 的形式出现,旨在为处理需要特殊清理的堆外对象的高级用户提供最后一块缺失的功能。

通常,Netty 的 ByteBuf 或 Spring 5 的 DataBuffer 属于这一类:它们是池化的堆外对象,在使用完毕后需要调用 release(),否则可能导致内存泄漏。

这些元素可能在响应式序列的缝隙中丢失,并在以下三种主要情况下永远不会到达用户代码:

  1. 操作符的源格式错误且不遵守 RS 规范(例如,在发出 onComplete 信号之后发出 ByteBuf)。
  2. 操作符作为其语义的一部分过滤某些元素(例如 filter)。
  3. 操作符为了背压优化目的进行预取,并且被取消,从而丢弃其预取队列。

情况 (1) 已经被 onNextDropped 钩子覆盖,但情况 (3) 绝对没有。情况 (2)(过滤语义)有点居中,例如可以在过滤 Predicate 内部进行清理。但这很麻烦且容易被遗忘。

因此,我们将 onDiscard 添加到我们的 Hooks 库中,以涵盖 (2) 和 (3)。请注意,与“错误时继续”功能不同,目前还没有公共 API 可以在特定的 Flux 实例上设置该钩子。存在一个通过 Context 实现的未支持的变通方法,并且官方 API 很可能在 GA 版本或以后的维护版本中出现。

onDiscard 钩子具有以下特性和要求

  • 它是累加的,这意味着两次调用 Hooks.onDiscard(Consumer) 将使用 Consumer#andThen 组合这两个消费者。
  • 它不是按键的,这意味着多次调用是累加的,但只能完全重置(而不是按每个 Consumer 进行重置)。
  • Consumer 在类型转换之前必须执行 instanceof 检查,因为它将与不同类型的对象一起使用。
  • Consumer 不得抛出异常,并且应根据需要包含 try catch 块。
  • Consumer 必须是幂等的,因为它可能在同一个实例上被多次调用(例如,在缓冲区重叠的情况下)。

另外值得一提的是,errorStrategyContinue() 在 M3 中已重命名为 onErrorContinue()

Reactor Extra 3.2.0.M1

最后,reactor-extra 在 retry/repeat 工具方面进行了一些次要的 API 更改。它与 core 操作符对齐,使用相同的默认值,并且使用 Long 而不是 Integer 作为索引。

下一步

reactor-core 的下一步是对 Processor 对象的暴露方式进行重构。当前的 FluxProcessor<IN, OUT> 有点臃肿,因为它扩展并暴露了整个 Flux API。

此外,FluxProcessor#sink() 及其相关的 FluxSink 太容易被误用,尤其是在既希望将 Processor 订阅到 Publisher 源,又希望通过 sink() 手动向其推送数据时,这目前实际上是不支持的。另外,sink() 应该只调用一次,并且返回的 FluxSink<T> 实例应该被重用的事实也不够清晰。

因此,我们正在考虑在 Processor<T, T> 上构建一个外观模式,它直接实现 FluxSink(而不是 Flux),可以在同时用作订阅者和 sink 时工作,并且包含一个 asFlux() 视图方法,以便选择性地在其之上构建一个 Flux 操作符链。

MonoProcessor 很可能会沿着这些步骤发展,成为一个(更简单的)接口,具体实现将被重命名为 MonoNextProcessor。我们还在考虑提供一个独立的 MonoSink 实现,用户可以直接操作它,而无需使用 Mono.create()

结论

酷人从不等待 GA 版本发布!赶快去试试那个闪亮的里程碑版本吧,一如既往地欢迎您的反馈。 :)

祝您响应式编程愉快!

订阅 Spring 邮件列表

通过 Spring 邮件列表保持联系

订阅

领先一步

VMware 提供培训和认证,为您的进步加速助力。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部