领先一步
VMware 提供培训和认证,为您的进步加速助力。
了解更多我代表 Reactor 团队高兴地宣布最新的 Reactor 里程碑版本 Californium-M1
? ?
团队一直忙于开发 Reactor 3 的第三个主要版本 Californium
。我们现在准备就一些选定的问题收集您的反馈,并且我们还准备了许多增强功能和错误修复供您使用。
作为第三个发布列车,我们继续采用元素周期表中按字母顺序递增的名称主题。锎 (Californium) 是一种首次在加利福尼亚合成的元素。
该里程碑的 BOM 包含
reactor-core
3.2.0.M3
reactor-extra
3.2.0.M1
(包含一些 API 对齐更改)reactor-netty
0.8.0.M1
今年早些时候(M1),reactor-core
发布了一个早期预览版,主要聚焦于“错误模式继续”功能,并且核心库还在 6 月份发布了一个脱离发布列车的里程碑版本(M2)。这篇博客文章涵盖了后者的更改以及全新的 M3 中的更改。
这里的重点是 reactor-netty
。预计会有一篇更全面的博客文章详细介绍 API 更改和新功能的理由,其中包括:
团队对 API 进行了大量重构,使其在构建客户端和服务器时更具指导性,避免了在 0.7.x 版本中容易发生的难以容忍的配置错误。
新的 API 也更清晰地阐明了生命周期。
是的,HTTP2 支持 ? 目前主要以透明方式升级到 HTTP2,但我们正在努力在不久的将来将 HTTP2 独立流作为一等公民加入。
总的来说,与之前的 Bismuth 迭代相比,M2 和 M3 带来了 70 多项更改。
reactor-core 中的 API 更改比 reactor-netty 少,更新注意事项主要涉及里程碑版本本身的差异。有关更多详细信息,请参阅 M2 和 M3 的“更新注意事项”部分。
我们最需要您对以下功能的反馈
已添加 Micrometer 和 .metrics()
支持(#1183, #1123)。新的 .metrics()
操作符仅在 classpath 中存在 Micrometer
时才会执行某些操作。
它记录 onNext
时间、订阅到完成时间、信号计数等指标——所有这些都从紧邻上游操作符产生信号的角度进行衡量。
它是在 M2 中引入的,但在 M3 中得到了改进,并且某些标签名称发生了(破坏性)更改(#1245)。
请注意,一个重要目标是避免在 Reactor 公共 API 中暴露 Micrometer 的内容。我们不希望对 Micrometer 产生强制依赖,并且努力将其使用限制在仅当我们在 classpath 中检测到它时才加载的内部类中。
下一步:在 GA 版本之前,还应该为
Schedulers
(或者更确切地说,支持某些Schedulers
的ExecutorServices
)提供基本的测量支持。我们还在寻找一种方法,可以在全局范围内为 Reactor 选择特定的MeterRegistry
,同样不暴露引用MeterRegistry
接口的公共 API。
我们添加了一个预配置的 retryWhen
替代方案,具有指数回退和抖动 (retryBackoff()
)。详见 #1122。
此版本的重试反映了我们认为是业界在重试方面的最佳实践。它是过于简单的 retry(n)
、复杂的 retryWhen(Function)
和 reactor-addons
中更可配置的 RetryFunction
之间的一个很好的折中方案。
为了帮助您构建响应式事务块,我们添加了 usingWhen
。与 using
类似,它包装一个资源,从中生成一个 Flux
,并确保当 Flux 终止时资源得到妥善清理。
主要区别在于
Publisher
异步提供。Function<Resource, Publisher>
),并且只延迟终止信号的传播,不延迟 onNext 信号。这在 M2 中引入,但在 M3 中略有更改,以修复 Context
传播并支持取消 Publisher<Resource>
。通过在 Resource
甚至发出之前取消此操作符返回的主 Flux<T>
,您的取消指令将传播到 Publisher<Resource>
。
onDiscard
钩子这个全局钩子以 Consumer<Object>
的形式出现,旨在为处理需要特殊清理的堆外对象的高级用户提供最后一块缺失的功能。
通常,Netty 的 ByteBuf
或 Spring 5 的 DataBuffer
属于这一类:它们是池化的堆外对象,在使用完毕后需要调用 release()
,否则可能导致内存泄漏。
这些元素可能在响应式序列的缝隙中丢失,并在以下三种主要情况下永远不会到达用户代码:
onComplete
信号之后发出 ByteBuf
)。filter
)。情况 (1) 已经被 onNextDropped
钩子覆盖,但情况 (3) 绝对没有。情况 (2)(过滤语义)有点居中,例如可以在过滤 Predicate
内部进行清理。但这很麻烦且容易被遗忘。
因此,我们将 onDiscard
添加到我们的 Hooks
库中,以涵盖 (2) 和 (3)。请注意,与“错误时继续”功能不同,目前还没有公共 API 可以在特定的 Flux
实例上设置该钩子。存在一个通过 Context
实现的未支持的变通方法,并且官方 API 很可能在 GA 版本或以后的维护版本中出现。
onDiscard
钩子具有以下特性和要求
Hooks.onDiscard(Consumer)
将使用 Consumer#andThen
组合这两个消费者。Consumer
进行重置)。Consumer
在类型转换之前必须执行 instanceof
检查,因为它将与不同类型的对象一起使用。Consumer
不得抛出异常,并且应根据需要包含 try catch
块。Consumer
必须是幂等的,因为它可能在同一个实例上被多次调用(例如,在缓冲区重叠的情况下)。另外值得一提的是,errorStrategyContinue()
在 M3 中已重命名为 onErrorContinue()
。
最后,reactor-extra
在 retry/repeat 工具方面进行了一些次要的 API 更改。它与 core
操作符对齐,使用相同的默认值,并且使用 Long
而不是 Integer
作为索引。
reactor-core
的下一步是对 Processor
对象的暴露方式进行重构。当前的 FluxProcessor<IN, OUT>
有点臃肿,因为它扩展并暴露了整个 Flux
API。
此外,FluxProcessor#sink()
及其相关的 FluxSink
太容易被误用,尤其是在既希望将 Processor
订阅到 Publisher
源,又希望通过 sink()
手动向其推送数据时,这目前实际上是不支持的。另外,sink()
应该只调用一次,并且返回的 FluxSink<T>
实例应该被重用的事实也不够清晰。
因此,我们正在考虑在 Processor<T, T>
上构建一个外观模式,它直接实现 FluxSink
(而不是 Flux
),可以在同时用作订阅者和 sink 时工作,并且包含一个 asFlux()
视图方法,以便选择性地在其之上构建一个 Flux
操作符链。
MonoProcessor
很可能会沿着这些步骤发展,成为一个(更简单的)接口,具体实现将被重命名为 MonoNextProcessor
。我们还在考虑提供一个独立的 MonoSink
实现,用户可以直接操作它,而无需使用 Mono.create()
。
酷人从不等待 GA 版本发布!赶快去试试那个闪亮的里程碑版本吧,一如既往地欢迎您的反馈。 :)
祝您响应式编程愉快!