Reactor 2.5:JVM 的第二代反应式基础

版本 | Stephane Maldini | 2016年2月16日 | ...

进入反应式流时代

Reactor 2.0 的开发始于 2014 年底,大约与 Reactive Streams 同时期。我们渴望加入这项工作,并尽早采用 背压协议 来缓解我们主要的信令传递限制:容量边界。我们在 Reactor 2.0 中首次尝试使 基于环形缓冲区 的调度器的 Reactive Streams 实现,并由此衍生出一个越来越受欢迎的反应式模式:Reactive Extensions

与此同时,Reactive Streams 开始获得关注,整个库生态系统都在讨论这种转变。普遍的担忧是什么?实现 Reactive Streams 语义绝非易事。我们观察到,越来越需要一个反应式基础来解决信令传递并实现常见的流操作符。因此,我们为 Reactor Core 创建了一个专门的项目空间,并与 Spring Framework 团队 展开了一项重点工作。

从 2.5 开始,Reactor 现在 被组织成多个项目,例如 2.0.x 这样的维护分支保持不变。这反映在版本管理中,例如 Reactor Core 2.5 M1 是唯一可用的里程碑,其他项目将遵循其专属版本控制。

为了支持这种新的项目模型,我们在 http://projectreactor.io 上部署了一个新的、希望更友好的网站。

对 Reactive Streams 的协作式新视角

这种新的组织方式降低了参与项目活动的成本。该项目受益于 Spring API 设计协作以及来自 Sébastien DeleuzeBrian Clozel 等人的直接贡献。Reactor 也欢迎新的外部贡献者和审阅者

Reactor 2.5 中的依赖项和协作
Reactor 2.5

Reactive Streams Commons

Reactive Streams Commons 存储库 是一项开放的研究工作,专注于使用 Reactive Extensions 等技术提高 Reactive Streams 规范 的效率。它完全由 Reactor Core 和 Stream 内联,后者充当许多革命性工作的契约网关。

“RSC”因此是一个自由形式的项目,类似于 JCTools 对并发队列的处理方式。其最大的进步之一是“融合”协议的形式,以减少反应式处理链中大多数同步和某些异步阶段的开销。最后,这项工作帮助修复了 100 多个流错误,我们的测试过程现在包括 RSC 单元/集成测试和 JMH 基准测试,并结合了 Reactor 自身的集成测试和 基准测试

Reactor Core 2.5.0.M1

今天的 Reactor 博客系列以一个令人愉快的事件开始,Reactor Core 2.5.0.M1 发布!在新的范围内以及与 Reactive Streams Commons 的紧密联系下,Reactor Core 提供了构建反应式应用或库所需的 Rx 覆盖范围,例如 Spring Reactive Web 支持。对于心急的读者,可以查看 GitHub 上已经 提供的快速入门指南

快速浏览散布-收集场景

Mono.from(userRequestPublisher)
    .then(userRepository::findUserProfile, 
          userRepository::findUserPaymentMethod)
    .log("user.requests")
    .or(Mono.delay(5)
            .then(n -> Mono.error(new TimeoutException()))
    .mergeWith(userRepository::findSimilarUserDetails)
    .map(userDetailsTuple -> userDetailsTuple.t1.username)
    .publishOn(SchedulerGroup.io())
    .subscribe(responseSubscriber);

详细信息

  • Flux,一个 Publisher,用于发布0 到 N个数据信号,并具有轻量级的 Rx 范围。操作符包括create()interval()merge()zip()concat()switchOnError()switchOnEmpty()

Flux in action

  • Mono,一个 Publisher,用于发布0 或 1个数据信号,并具有轻量级的 Rx 范围,该范围适用于此特定数据量的强类型。操作符包括delay()then()any()and()or()otherwise()otherwiseIfEmpty()where() 和阻塞式get()

Mono in action

  • 基于普通 Java 接口(Runnable、Callable)的新型简单调度约定。

-- 包括 SchedulerGroupTopicProcessorWorkQueueProcessor。 -- 取代以前的Enviroment/Dispatcher对,同时满足相同需求,并且很快就会提供简单的迁移路径文档。不再存在保存调度器引用的静态状态。 -- 关联操作符:publishOn()dispatchOn()

  • 使用 TestSubscriberPublisher源进行测试支持。
  • CallableRunnableIterable、Java 8 CompletableFuture、Java 9 Flow.Publisher、RxJava 1 ObservableSingle 转换为 Reactive Streams 就绪的FluxMono,无需额外的桥接依赖项。
  • 完全改进和集成的 Javadoc,包括略微调整的大理石图。
  • 一组微型工具集,用于实用程序和基本 Subscriber,可随意重用以实现您自己的反应式组件。

-- 成本高效的 Timer API 和实现(哈希轮计时器)。 -- 新的 Fusion API,可在虚拟上合并反应式链中的两个或多个阶段 -- 一个经过调整的QueueSupplier,它将为正确的容量提供正确的队列

  • 基于 状态 表示的新型内省 API。

-- Publisher 日志记录,如果可用,则回退到 java.util.logging 或 SLF4J。可以直接在 FluxMono 上使用 log() 运算符。-- 与任何其他契约正交,包括 Reactive Streams,所有内容都可以是 BackpressurableCompletableReceiver,生成到泛型 Object(可能是 Subscriber),这反过来允许我们跟踪流的完整图并用状态指示器对其进行增强:

接下来是什么?

我们希望收集您的宝贵反馈,您可以访问相应的 issue 仓库或加入我们最近创建的 Gitter 频道。敬请关注关于 Reactor Stream 2.5.0.M1 的下一篇文章,这是 Reactive Streams 之上的完整 Rx 实现。

获取 Spring 电子邮件

关注 Spring 电子邮件

订阅

领先一步

VMware 提供培训和认证,助您快速提升技能。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部