Reactor 2020.0 的第一个里程碑版本(代号 Europium)

工程 | Simon Baslé | 2020年7月10日 | ...

本月早些时候,我们发布了 Reactor 2020.0 的第一个里程碑版本。这个周期,代号为 Europium,紧随 Dysprosium 周期(其中包含 reactor-core 3.3.x 和 reactor-netty 0.9.x)。

它包含 reactor-core 3.4.0 和 reactor-netty 1.0.0

在这篇博文中,我们将介绍 reactor-core 里程碑版本的一些亮点,并简要提及 M2 中的内容。

对于 reactor-netty,一旦发布,我们将在此处链接到单独的博文。

另请注意,现已实施新的版本控制方案,该方案已在 Spring 产品组合中采用:请参阅参考指南这篇博文

围绕 Processor 的更改

核心中的主要更改是围绕 Reactor 中 Processor 实现及其公开方式的长期努力。

这是 reactor-core 3.4.0-M1 的主要重点,目标是逐步淘汰 FluxProcessor 的具体类型(以及在某种程度上淘汰 MonoProcessor)。

Processor 是 Reactive Streams 中的一个接口,最初旨在作为表示反应式管道中“步骤”的一种方式,该步骤可以在库之间共享。但如今,运算符在很大程度上直接实现为 Publisher/Subscriber 对,因此在 Reactor 中,处理器最终涵盖了不同的用例(最常见的是,从一个 Publisher 到多个 Subscriber 的多播)。

因此,大多数情况下,用户将处理器视为“手动创建 Flux”的一种方式:而不是将 Processor 连接到父发布者(即将其用作 Subscriber),而是直接调用其 onNext/onComplete/onError 方法。这不幸的是一种有问题的做法,因为此类调用必须以符合 Reactive Streams 规范的方式进行,这意味着它们需要进行外部同步。

从历史上看,这已通过在 FluxProcessor 上引入 sink() 方法得到缓解。其想法是,如果您想以这种手动方式使用 FluxProcessor,则需要实例化所需的处理器类型,然后调用其 sink() 方法一次,并从那里开始使用生成的 FluxSink 来触发信号到订阅者。在下游,FluxProcessor 本身会公开(作为可以对其进行运算符组合的 Flux)。

从可发现性的角度来看,这仍然存在问题,因为满足最常见用例的“正确方法”是最难想出来的。

在 3.4.0 中,我们打算扭转这种局面,并将 Sink 使用模式置于首位,并使 Processor 使用模式更难以意外发现或误用。

此第一个里程碑版本朝着这个方向迈出了第一步,通过

  • 弃用 FluxProcessor 的所有具体实现,这些实现现在计划在 3.5.0 中删除
  • 公开一个 Sinks 实用程序类,该类包含用于旨在手动触发的接收器的工厂方法

在 M1 中,处理器类型仍然存在,但工厂方法已复制到 Processors 类中,但 M2 中已对其进行了重新设计。我们打算在 M2 中将类型选择转移到 Sinks 上。然后,将有一种方法可以从此处将 Sink 转换为 FluxProcessor,从而在 M2 中无需 Processors

在 M1 中迁移离具体处理器

M1 中,所有在具体 xxxProcessor(例如 UnicastProcessor.create())上的工厂方法都已移动到 Processors(对于基本情况)或 Processors.more()(对于允许更精细调整的重载)中。这些方法通过前缀区分类型

  • UnicastProcessor -> Processors.unicast()Processors.more().unicast(...)
  • EmitterProcessor -> Processors.multicast()Processors.more().multicast(...)
  • DirectProcessor -> Processors.more().multicastNoBackpressure()
  • ReplayProcessor -> Processors.replayAll()/replay(int)/replayTimeout(Duration)/replaySizeAndTimeout(int, Duration) 以及 Processors.more() 上的类似方法

所有这些处理器在概念上都具有相同的输入和输出类型 <T>,因此它们是 FluxProcessor<T,T>。一个便利接口 FluxIdentityProcessor<T> 已在 M1 中引入,但除了减少泛型的数量之外,它没有带来太多其他内容,因此它可能会在 M2 中删除。

但是,我们说应该优先使用 Sinks,而不是从 Processors 使用 FluxProcessor。在这种情况下,首先会获得一个接收器,然后将其转换为 FluxMono,以便应用程序的其余部分对其进行组合,如下例所示

//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();

//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();

//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();

请注意,该类目前提供的变体少于 Processors,但这正在考虑在 M2 中进行改进。

弃用和删除

删除在 3.3.0 中弃用的几个类

  • TopicProcessor
  • WorkQueueProcessor

Schedulers.boundedElastic() 从 3.3.0 开始就已存在,我们认为现在可以弃用其祖先 elastic(),而不是仅仅建议在 elastic 上使用 boundedElastic。

在未来的 3.5.0 中,将删除 elastic Scheduler

Reactor-Netty 即将发布 1.0 版本

这里有很多内容需要介绍,我们将在单独的博文中进行介绍。

总结

请尝试使用M1

我们已经在 M2 中对接收器和处理器进行了进一步的更改,以及其他主题,例如 Context 运算符、避免在 subscribe 中抛出异常以及改进指标周围的故事。

与往常一样,非常欢迎您对 M1 和当前 M2 快照提供反馈。

同时,祝您反应式编码愉快!Reactor 团队。

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部