抢占先机
VMware 提供培训和认证,助您加速发展。
了解更多本月早些时候,我们发布了 Reactor 2020.0 的第一个里程碑版本。这个代号为 Europium 的周期接续了 Dysprosium 周期(其中包括 reactor-core 3.3.x 和 reactor-netty 0.9.x)。
它包括 reactor-core 3.4.0 和 reactor-netty 1.0.0。
在这篇博文中,我们将介绍 reactor-core 里程碑版本的一些重点内容,并简要提及 M2 版本的规划。
关于 reactor-netty,我们会在单独的博文发布后立即在此处提供链接。
另请注意,新的版本控制方案已经到位,并已被 Spring 项目组合采纳:请参阅参考指南和这篇博文。
Processor 的变更core 中的主要变化是针对 Reactor 中的 Processor 实现及其暴露方式进行了姗姗来迟的改进。
这是 reactor-core 3.4.0-M1 的主要关注点,目标是逐步淘汰具体 FluxProcessor 变体(以及某种程度上的 MonoProcessor)的使用。
Processor 是 Reactive Streams 的一个接口,最初旨在表示反应式管道中可在库之间共享的“步骤”。但如今,操作符大多直接实现为 Publisher/Subscriber 对,因此在 Reactor 中,处理器最终涵盖了不同的用例(通常是将一个 Publisher 多播到多个 Subscriber)。
因此,用户最常将处理器视为一种“手动创建 Flux”的方式:他们不是将 Processor 连接到父发布者(即将其用作 Subscriber),而是直接调用其 onNext/onComplete/onError 方法。不幸的是,这是一种有问题的做法,因为这些调用**必须**以符合 Reactive Streams 规范的方式进行,这意味着它们需要外部同步。
从历史上看,通过在 FluxProcessor 上引入 sink() 方法缓解了这个问题。其思路是,如果您想以这种手动方式使用 FluxProcessor,您需要实例化所需的处理器变体,然后**仅调用一次**其 sink() 方法,并从此以后使用生成的 FluxSink 来触发信号给订阅者。在下游,FluxProcessor 本身被暴露出来(作为可在其上组合操作符的 Flux)。
从可发现性角度来看,这仍然存在问题,因为满足最常见用例的“正确方法”是最难想到的。
借助 3.4.0,我们打算扭转局面,将 Sink 的使用模式作为一流公民放在首位,并使 Processor 的使用模式更难被意外发现或误用。
第一个里程碑通过以下方式迈出了第一步:
FluxProcessor 实现,这些实现计划在 3.5.0 中移除。Sinks 工具类,其中包含用于手动触发 sink 的工厂方法。在 M1 中,处理器的变体仍然存在,但工厂方法已被复制到 Processors 类中,但这已经在 M2 中进行重构。我们打算在 M2 中将变体的选择移到 Sinks 上。届时,将有一种方法可以将 Sink 转换为 FluxProcessor,从而消除 M2 中对 Processors 的需求。
在 M1 中,所有具体 xxxProcessor 上的工厂方法(例如 UnicastProcessor.create())已移至 Processors 用于基本情况,或移至 Processors.more() 用于允许更精细调整的重载方法。这些方法通过前缀区分变体:
UnicastProcessor -> Processors.unicast() 和 Processors.more().unicast(...)EmitterProcessor -> Processors.multicast() 和 Processors.more().multicast(...)DirectProcessor -> Processors.more().multicastNoBackpressure()ReplayProcessor -> Processors.replayAll()/replay(int)/replayTimeout(Duration)/replaySizeAndTimeout(int, Duration) 以及 Processors.more() 上的类似方法从概念上讲,所有这些处理器都具有相同的输入和输出类型 <T>,因此它们是 FluxProcessor<T,T>。M1 中引入了一个便利接口 FluxIdentityProcessor<T>,但除了减少泛型数量之外并没有带来太多好处,因此它可能会在 M2 中移除。
但我们说过,相对于使用 Processors 中的 FluxProcessor,人们应该优先使用 Sinks。在这种情况下,首先会获得一个 sink,并将其转换为 Flux 或 Mono,供应用程序的其余部分进行组合,如下面的示例所示:
//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();
//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();
//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();
请注意,该类目前提供的变体比 Processors 少,但这正在 M2 中重新考虑。
一些早在 3.3.0 中已被弃用的类已被**移除**:
TopicProcessorWorkQueueProcessorSchedulers.boundedElastic() 自 3.3.0 起已可用,我们认为现在可以**弃用**其祖先方法 elastic(),而不仅仅是建议使用 boundedElastic 而非 elastic。
再往后,在 3.5.0 中,elastic Scheduler 将被移除。
这里有很多内容需要涵盖,我们将在另一篇博文中进行详细介绍。
请试用 M1 版本!
我们已经在 M2 中对 sink 和 processor 进行了进一步更改,同时还处理了其他主题,如 Context 操作符、避免在 subscribe 中抛出异常以及改进指标方面的内容。
一如既往,非常欢迎对 M1 和当前的 M2 快照版本提出反馈意见。
同时,祝您编码愉快! Reactor 团队。