Reactor - JVM 上异步应用的基础框架

工程 | Jon Brisbin | 2013 年 5 月 13 日 | ...

我们很高兴地宣布,经过一段时间的内部孵化,我们发布了一个名为 Reactor 的 JVM 异步应用程序基础框架。它为 Java、Groovy 和其他 JVM 语言提供了抽象,使构建事件和数据驱动的应用程序更加容易。它也非常快。在普通的硬件上,使用最快的非阻塞 Dispatcher 每秒可以处理超过 15,000,000 个事件。其他调度程序可供开发人员选择,从线程池样式、长时间运行的任务执行到非阻塞、高容量的任务调度。GitHub 仓库地址为 https://github.com/reactor/reactor

顾名思义,Reactor 受到 著名的 Reactor 设计模式 的很大影响。但它也受到其他事件驱动设计实践以及多年来开发的几个很棒的基于 JVM 的解决方案的影响。Reactor 的目标是将这些想法和模式浓缩成一个简单且可重用的基础,使事件驱动编程更加容易。

Reactor 的抽象为开发人员提供了一套工具,不仅可以开发,还可以以更有效地利用系统资源的方式 组合 应用程序——这在云端运行时尤其重要——并减少或消除嵌套回调的混乱(恰如其分地命名为 “回调地狱”),这些回调一直困扰着大多数异步应用程序。

Reactor 适用于什么?

由于其固有的灵活性,您可以让 Reactor 做很多事情,但它实际上是为需要在执行合理小的无状态异步处理块时具有高吞吐量的应用程序设计的。现代应用程序中大量非人工生成的数据很容易超过传统的单线程阻塞设计模型。从生成位置信息流的移动应用程序到发送大量几何数据的计算机化制造机器,再到挖掘实时日志以生成业务指标的大数据应用程序:现代数据驱动的应用程序需要比传统的命令式阻塞应用程序通常所能提供的更好的资源利用率和更高的吞吐量。

这就是 Spring XD 项目(以及 Spring 生态系统中的其他几个项目,如 Spring Integration 和 Spring Batch)打算利用 Reactor 的原因。将 Reactor 的异步调度与来自 Spring Integration 的基于 NIO 的 TCP 适配器相结合,以提供高吞吐量的 syslog 和 MQTT 摄取,这只是一个例子。

选择器、消费者和事件

Reactor 的 reactor-core 模块中三个最基本组件是 SelectorConsumerEvent。可以通过使用 SelectorConsumer 分配给 ReactorSelector 是一个简单的抽象,用于在查找要为 Event 调用的 Consumers 时提供灵活性。提供了一系列默认选择器。从纯字符串到正则表达式到 Spring MVC 风格的 URL 模板。

以下是一些示例代码,展示了使用 Reactor 创建事件驱动应用程序是多么容易。


// This helper method is like jQuery’s.
// It creates a Selector instance so you don’t have 
// to construct one using 'new Selector("parse")'
import static reactor.Fn.$;

Reactor reactor = R.create();

// Register interest in events published to key "parse"
reactor.on($("parse"), new Consumer<Event<String>>() {
  public void call(Event<String> ev) {
    service.handleEvent(ev);
  }
});

// Send an event to this Reactor and trigger all actions 
// that match the given Selector
reactor.notify("parse", Fn.event("Hello World!"));
 

献给 Groovy 的爱

Reactor 发行版中包含一个名为 reactor-groovy 的模块。它包含一个 Groovy 绑定,该绑定提供了表达性语法、使用 @CompileStatic 进行编译时检查、将 Closure 隐式转换为 Consumer 以及其他 Groovy 特定的省时功能。


// Assign a Closure as a Consumer
reactor.on($('hello')) { Event<String> ev ->
  if(ev.headers['specialHeader']) { // Events can have metadata
    doSomethingWith(ev.data)
  }
}

// Use Groovy helpers for notify
reactor.notify for: 'hello', data: 'Hello World!', specialHeader: 'specialValue'
 

最棒的是:我们不必为了实现这一点而牺牲性能。相同的 JVM 优化适用于 Groovy 代码和 Java 代码。我们不断(有人会说“痴迷地”)对调度代码进行微基准测试,使其尽可能快,并为 Java 和 Groovy 用户提供尽可能高的吞吐量。

随时准备迎接 Java 8

Reactor 旨在与 Java SE 8 的 lambda 表达式 兼容,Reactor 中的许多组件都可以用 lambda 替换,以使您的 Java 代码更简洁。我们还发现使用 Java 8 lambda(和方法引用)可以提高吞吐量。当 Java 8 正式发布时,您无需等待 Reactor 支持它。它将“即用”(tm)。


// Use a POJO as an event handler
class Service {
  public <T> void handleEvent(Event<T> ev) {
    // handle the event data
  }
}

@Inject
Service service;

// Use a method reference to create a Consumer<Event<T>>
reactor.on($("parse"), service::handleEvent);

// Notify consumers of the 'parse' topic that data is ready
// by passing a Supplier<Event<T>> in the form of a lambda
reactor.notify("parse", () -> {
  slurpNextEvent()
});
 

函数式、命令式、回调或 Promise:您来选择

Executor、事件循环、Actor、分布式——事件驱动编程中最重要用例之一(任务调度)有很多形式。Reactor 支持几种样式的事件驱动编程。除了传统的基于回调的 Consumer 接口外,Reactor 还对 Promises/A+ 规范 进行了解释,这使得使用延迟值和消费者变得非常容易。

嵌套回调虽然在 Java 等命令式语言中简单易用,但在应用程序复杂性增加时变得难以维护。Reactor 的 ComposablePromise 都与轻松组合操作有关。您可以将 Composable 连接到一系列操作中,这些操作可以转换值、将内容保存到数据存储、聚合值等。并且由于它们是可链接的,因此您可以在纯 Java 中以类型安全的方式执行所有这些操作。以下是如何使用 Composable 轻松链接一系列异步执行的任务的快速示例,这些任务在数据流通过 Composable 时对其执行转换和过滤。


Composable<Integer> c = new Composable<>()
  .map(new Function<Integer, Integer>() {
    public Integer apply(Integer i) {
      return i % 2;
    }
  })
  .filter(new Function<Integer, Boolean>() {
    public Boolean apply(Integer i) {
      return i == 0;
    }
  })
  .consume(new Consumer<Integer>() {
    public void accept(Integer eveni) {
      // work with only even numbers here
    }
  });
 

Composable 的每个步骤都是一个潜在的异步任务。对 mapfilterconsume 的调用分配任务,以便在先前步骤的值可用时执行——无需回调地狱。

调度

对于任何调度问题都没有万能的解决方案。Reactor 提供不同样式的 Dispatcher,因为每个异步应用程序在应用程序的不同部分都有不同的调度需求。例如,在摄取海量数据时,Reactor 将希望使用基于 久经考验的 Disruptor 环形缓冲区 的高速非阻塞 Dispatcher。但是,如果 Reactor 正在向数据库服务器发出阻塞调用或将数据块存储在 S3 中,它将希望利用低吞吐量的 worker pool Dispatcher。Reactor 提供了几个选项,以便您可以为工作选择合适的工具。

如果内置的 Dispatcher 实现不满足您的需求,那么 Reactor 提供了一个坚实的基础,您可以在其上构建自己的 Dispatcher,以满足您的问题域。

Grails,遇见事件,事件,遇见 Grails

Grails 是一个用于 JVM 的全栈 Web 应用程序框架。尽管 Grails 拥有成熟的代码库和蓬勃发展的社区支持,但它仍然面临着 新的架构挑战。通过 platform-core 插件 将事件引入 Grails。但事件功能如此强大,以至于此功能确实属于核心功能;因此,从 2.3 版开始,Grails 应用程序将内置一个极其强大但易于使用的基于约定的事件 API,它看起来非常类似于 platform-core 插件中的当前实现。此事件 API 将建立在 Reactor 基础之上。

将事件集成到 Grails 中的目标是针对新型开发——尤其是“实时 Web”和高规模的非阻塞应用程序开发。结合异步 GORM 功能,事件 API 将成为一个强大的盟友。访问大数据存储的复杂查询(因此需要很长时间才能处理)可以在其结果准备就绪时做出反应,将其直接推送到浏览器。

充满活力的社区至关重要

在接下来的几个月里,我们将努力准备 SpringOne,在该会议上,我们的许多大型、快速和可扩展的数据解决方案将成为主角。如果您还没有计划参加,您绝对应该参加!我们将举办 关于 Reactor 的研讨会,以及如何使用它创建高规模、高吞吐量的事件驱动应用程序。

但是,没有您的帮助我们无法做到这一点!只有您帮助我们为 JVM 上的大型、快速、事件驱动的应用程序开发创建一个充满活力和活跃的社区,这项工作才能取得成功。如果您有兴趣,请查看 GitHub 上的源代码,在 reactor-quickstart 中查看一些示例代码,报告您发现的任何问题,在 StackOverflow 上使用 标签 #reactor 提问关于 Reactor 的问题,加入 reactor-framework Google Groups 邮件列表 中的讨论,或 fork 仓库 以帮助添加功能、调整以提高吞吐量并贡献新的想法。

我们很乐意在那里见到你!

获取 Spring 新闻通讯

关注 Spring 电子简讯

订阅

抢先一步

VMware 提供培训和认证,助您快速提升技能。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部