领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多本月早些时候,我们发布了 Reactor 2020.0 的第一个里程碑版本。这个周期,代号为 Europium
,紧随 Dysprosium 周期(其中包含 reactor-core 3.3.x 和 reactor-netty 0.9.x)。
它包含 reactor-core 3.4.0
和 reactor-netty 1.0.0
。
在这篇博文中,我们将介绍 reactor-core 里程碑版本的一些亮点,并简要提及 M2 中的内容。
对于 reactor-netty,一旦发布,我们将在此处链接到单独的博文。
另请注意,现已实施新的版本控制方案,该方案已在 Spring 产品组合中采用:请参阅参考指南和这篇博文。
Processor
的更改核心中的主要更改是围绕 Reactor 中 Processor
实现及其公开方式的长期努力。
这是 reactor-core 3.4.0-M1
的主要重点,目标是逐步淘汰 FluxProcessor
的具体类型(以及在某种程度上淘汰 MonoProcessor
)。
Processor
是 Reactive Streams 中的一个接口,最初旨在作为表示反应式管道中“步骤”的一种方式,该步骤可以在库之间共享。但如今,运算符在很大程度上直接实现为 Publisher/Subscriber
对,因此在 Reactor 中,处理器最终涵盖了不同的用例(最常见的是,从一个 Publisher
到多个 Subscriber
的多播)。
因此,大多数情况下,用户将处理器视为“手动创建 Flux
”的一种方式:而不是将 Processor
连接到父发布者(即将其用作 Subscriber
),而是直接调用其 onNext
/onComplete
/onError
方法。这不幸的是一种有问题的做法,因为此类调用必须以符合 Reactive Streams 规范的方式进行,这意味着它们需要进行外部同步。
从历史上看,这已通过在 FluxProcessor
上引入 sink()
方法得到缓解。其想法是,如果您想以这种手动方式使用 FluxProcessor
,则需要实例化所需的处理器类型,然后调用其 sink()
方法一次,并从那里开始使用生成的 FluxSink
来触发信号到订阅者。在下游,FluxProcessor
本身会公开(作为可以对其进行运算符组合的 Flux
)。
从可发现性的角度来看,这仍然存在问题,因为满足最常见用例的“正确方法”是最难想出来的。
在 3.4.0 中,我们打算扭转这种局面,并将 Sink
使用模式置于首位,并使 Processor
使用模式更难以意外发现或误用。
此第一个里程碑版本朝着这个方向迈出了第一步,通过
FluxProcessor
的所有具体实现,这些实现现在计划在 3.5.0
中删除Sinks
实用程序类,该类包含用于旨在手动触发的接收器的工厂方法在 M1 中,处理器类型仍然存在,但工厂方法已复制到 Processors
类中,但 M2 中已对其进行了重新设计。我们打算在 M2 中将类型选择转移到 Sinks
上。然后,将有一种方法可以从此处将 Sink
转换为 FluxProcessor
,从而在 M2 中无需 Processors
。
在 M1
中,所有在具体 xxxProcessor
(例如 UnicastProcessor.create()
)上的工厂方法都已移动到 Processors
(对于基本情况)或 Processors.more()
(对于允许更精细调整的重载)中。这些方法通过前缀区分类型
UnicastProcessor
-> Processors.unicast()
和 Processors.more().unicast(...)
EmitterProcessor
-> Processors.multicast()
和 Processors.more().multicast(...)
DirectProcessor
-> Processors.more().multicastNoBackpressure()
ReplayProcessor
-> Processors.replayAll()
/replay(int)
/replayTimeout(Duration)
/replaySizeAndTimeout(int, Duration)
以及 Processors.more()
上的类似方法所有这些处理器在概念上都具有相同的输入和输出类型 <T>
,因此它们是 FluxProcessor<T,T>
。一个便利接口 FluxIdentityProcessor<T>
已在 M1 中引入,但除了减少泛型的数量之外,它没有带来太多其他内容,因此它可能会在 M2 中删除。
但是,我们说应该优先使用 Sinks
,而不是从 Processors
使用 FluxProcessor
。在这种情况下,首先会获得一个接收器,然后将其转换为 Flux
或 Mono
,以便应用程序的其余部分对其进行组合,如下例所示
//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();
//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();
//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();
请注意,该类目前提供的变体少于 Processors
,但这正在考虑在 M2 中进行改进。
已删除在 3.3.0 中弃用的几个类
TopicProcessor
WorkQueueProcessor
Schedulers.boundedElastic()
从 3.3.0 开始就已存在,我们认为现在可以弃用其祖先 elastic()
,而不是仅仅建议在 elastic 上使用 boundedElastic。
在未来的 3.5.0 中,将删除 elastic
Scheduler
。
这里有很多内容需要介绍,我们将在单独的博文中进行介绍。
请尝试使用M1!
我们已经在 M2 中对接收器和处理器进行了进一步的更改,以及其他主题,例如 Context
运算符、避免在 subscribe
中抛出异常以及改进指标周围的故事。
与往常一样,非常欢迎您对 M1 和当前 M2 快照提供反馈。
同时,祝您反应式编码愉快!Reactor 团队。