领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多我很高兴地宣布,Reactor,一个功能强大的基础库,用于在 JVM 上构建响应式、快数据应用程序,现已正式发布!
Reactor 提供了必要的抽象,用于构建高吞吐量、低延迟(我们现在称之为“快数据”)的应用程序,这些应用程序绝对**必须**能够处理每秒数千、数万甚至数百万个并发请求。
您应该关心 Reactor,因为具有非人类使用者(例如手机及其上运行的应用程序)的现代应用程序会生成比传统的每连接线程服务器能够支持的数据更多,因此 Reactor 为您提供了构建这些类型的高规模应用程序所需的工具和抽象,而不会陷入管理状态和在异步应用程序中传递事件的复杂逻辑。现代 JVM 应用程序必须建立在异步和响应式组件的坚实基础之上,这些组件有效地管理在极少数系统线程上执行大量任务。Reactor 专门设计用于帮助您构建这些类型的应用程序,而不会妨碍您或强迫您在特定的模式下工作。
Reactor 本身深受同名著名设计模式的影响,但它不仅从该模式中汲取灵感。它还包含Actor 模型和传统基于事件的回调编程的元素。
虽然它是Spring IO 平台基础的一部分,但**Reactor 核心库不依赖于 Spring**。Reactor 核心是一个自包含的库,其唯一外部依赖项是SLF4J和出色的LMAX Disruptor RingBuffer 库。
在 Reactor 的核心之上构建了其他可选组件,以方便针对常见模式开发应用程序。Reactor 的一些内置的一流支持包括
@CompileStatic
,并提供了一个全面的环境构建和事件连接 DSL。Reactor 从一开始就被设计成在其执行操作方面灵活高效,以便它能够避免妨碍您,并帮助您尽可能快地处理应用程序中的数据。在其最快的配置中,标准的基于 RingBuffer 的 Reactor 可以在标准的开发人员笔记本电脑上每秒发布超过 1000-1500 万个事件。高性能的Processor
抽象可以每秒将超过 1 亿个事件泵入您的应用程序。您的应用程序如何处理数据以减慢 Reactor 的速度可能因任务而异。但在最佳的无操作模式下,吞吐量如此之高,应用程序将不会闲置等待 Reactor 完成其工作!
Reactor 核心包含一些受(在某些情况下直接基于)JDK 8 的新函数式抽象启发的基本抽象,例如Function<T,V>
、Consumer<T>
、Supplier<T>
和Predicate<T>
。Reactor 本身不仅建立在这些抽象的基础之上,而且您的应用程序也可以利用它们。将来某个时候,JDK 8 的采用将足够普遍,Reactor 可以简单地从 Reactor 中删除这些抽象,并依赖于 JDK 8 中的那些抽象。在此之前,您的 JDK 6 和 7 应用程序现在可以从这些函数式抽象中受益。
受Reactive Extensions for .NET、Netflix 的 RxJava、JDK 8 的Stream抽象以及许多其他库(更不用说 20 年的基于事件的计算机科学)的启发,Reactor 提供了一种“响应式”编程模型,使协调异步任务变得更加容易。像Stream<T>
和Promise<T>
这样的抽象使链接非阻塞操作变得简单易懂,并且没有回调意大利面!
@Inject
AsyncDataLoader loader;
Promise<Buffer> p = loader.get("U-U-I-D")
.map(new Function<Buffer, Data>() {
public Data apply(Buffer buff) {
// transform data
Data data = parser.parse(buff);
return data;
}
})
.filter(new Predicate<Data>() {
public boolean test(Data data) {
// check Data for certain conditions being true
return null != data.getName();
}
})
.consume(new Consumer<Data>() {
public void accept(Data data) {
// only Data that passes the Predicate test makes it here...
}
});
// Promises can also block like a Future
Buffer buff = p.await();
这些操作(map
、filter
、consume
)中的每一个都是(可能)异步执行的不同操作。在传统的多线程环境中,必须添加关于阻塞 Future 和等待完成的嘈杂代码片段。但是,使用 Reactor,您只需以响应式的方式将操作链接在一起,以便操作在先前操作完成时“对数据做出反应”。
Reactor 包括对 Groovy 语言的一流支持。它支持使用闭包作为回调,具有用于配置 Reactor 环境的功能强大的 DSL,并提供了一些非常酷的操作符重载以编写简洁的代码。
Clojurewerkz 有一个名为Meltdown的库,它基于 Reactor。其他 JVM 语言支持可以毫不费力地添加。Reactor 的 API 设计为可扩展的,以便非 Java 语言可以受益于 Reactor 中的工具。
Reactor 已经准备好用于 Java 8,因此让我们首先使用 JDK 8 的强大 Lambda 功能查看一些 Reactor 代码
import static reactor.event.selector.Selectors.*;
// Only create one of these per JVM
static Environment env = new Environment();
// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
.<String>on($("topic"), ev -> System.out.prinltn("greeting: " + ev.getData()));
r.notify("topic", Event.wrap("Hello World!"));
Reactor 希望实现的一件事是减少您必须编写的代码量;以上代码非常简洁。但即使在 Java 6 和 7 中,它也非常简洁
import static reactor.event.selector.Selectors.*;
// Only create one of these per JVM
static Environment env = new Environment();
// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
.on($("topic"), new Consumer<Event<String>>() {
public void accept(Event<String> ev) {
System.out.prinltn("greeting: " + ev.getData());
}
});
r.notify("topic", Event.wrap("Hello World!"));
在 Groovy 中,它甚至更加简洁(正如您所料),因为语言支持负责将某些对象转换为正确的类型,并允许使用闭包
def env = new Environment()
def r = Reactors.reactor(env).on("topic") { String greeting ->
println "greeting: $greeting"
}
r.notify "topic", "Hello World!"
Dispatcher
负责在给定的 Thread
上执行任务。Dispatcher
有多种内置实现,可以执行调用线程中的任务、线程池中的线程、使用单线程事件循环样式调度或最快的调度程序:RingBufferDispatcher
,它使用 LMAX Disruptor RingBuffer 分派任务。
在 Reactor 中创建组件时,通常需要指定在调度事件时使用的 Dispatcher
。与使用线程池相比,线程池在高容量应用程序中会消耗大量的 CPU 和 GC 资源,而将事件分派到 RingBuffer 中则非常高效。使用 RingBufferDispatcher
每秒可以分派数千万个事件。
Selector
是操作到事件键的动态映射。将操作分配给 Reactor
时,通过注册 Selector
来告诉它响应哪些事件键。有几种内置实现可以匹配诸如 Object.equals()
之类的内容,进行基于字符串的正则表达式匹配,URI 模板匹配,以便您可以使用熟悉的带括号的分隔符占位符表示法来匹配 URI,Class.isAssignableFrom()
匹配以仅选择那些从公共抽象派生的键,Predicate
匹配以允许您基于作用域谓词创建任意 Predicate<T>
选择器,甚至还有一个可选的 JsonPathSelector
,它使用 JsonPath 使用 JsonPath 表达式查询键中的数据。
您可能在示例中注意到了一些内容,作为 Java 开发人员,您可能会有点困惑:用于创建 Selector
的 $
快捷方法 [1]。如果您使用过 jQuery 进行 Web 开发,那么您会感觉很熟悉,因为 $
方法只是一个创建 Selector
的快捷方式,就像 jQuery 在编写 $(".css-class")
之类的内容时创建 CSS 查询一样。如果美元符号对您来说过于不寻常,Reactor 始终尝试提供多种完成任务的方法;您可以使用 Selectors.object(T)
或 ObjectSelector.objectSelector()
静态创建方法(或仅使用构造函数新建 ObjectSelector
实例)。
[1]: 除了 $(T)
之外,还有其他用于创建 Selector 的快捷帮助器方法。有 R(String)
用于创建 RegexSelectors,T(Class<?>)
用于创建 ClassSelectors,以及 U(String)
用于创建 UriTemplateSelectors。
Reactor 的 Promise
和 Stream
提供了一种反应式、组合式的方式来协调多个异步任务,而无需过多的回调意大利面。Promise
是一个有状态的组件,可以在应用程序中传递,并表示一个将从另一个线程填充的值。与传统的 Future
类似,Promise
可以阻塞调用线程。但更重要的是,Promise
使转换值和执行整个处理链变得容易。
Stream
类似于 Promise
,因为它提供了一个组合 API 来对未来值做出反应。但 Stream
与 Promise
的区别在于,它旨在处理多个值传递。
要填充 Promise
或 Stream
中的值,请创建一个 Deferred
,它是一个 Consumer<T>
。您可以将此 Deferred
传递到服务层,以将最终值传达回调用方。
// Only create one of these per JVM
static Environment env = new Environment();
public class DataLoader {
public Promise<Buffer> load(String key) {
Deferred<Buffer, Promise<Buffer>> deferred = Promises.defer(env);
// submit work to be done in another thread
// like reading data from a datastore
datastore.load(key, deferred);
return deferred.compose();
}
}
// Your service layer uses this API
@Inject
DataLoader loader;
loader.load("obj-key")
.onSuccess(new Consumer<Buffer>() {
public void accept(Buffer b) {
// handle eventual data
}
})
.onError(new Consumer<Throwable>() {
public void accept(Throwable t) {
// handle errors
}
});
Scala 的 Tuple 类是一种类型安全的方式,用于传递封装其他值的单个对象,而无需创建特定于应用程序的、一次性使用的“持有者” Bean。Reactor 将此功能整合到其对 Tuple
类的解释中。
元组非常易于使用。您可以使用 Tuple.from(T1, T2, …)
方法创建一个元组,并且可以使用 Tuple.getT1()
到 Tuple.getTN()
方法获取其中的值。
reactor.on($("topic"), new Consumer<Event<Tuple2<URI, Buffer>>>() {
public void accept(Event<Tuple2<URI, Buffer>> ev) {
URI uri = tup.getT1();
Buffer buff = tup.getT2();
// deal with request from uri.getPath()
}
});
// notify consumers of new request
reactor.notify("topic", Event.wrap(Tuple.from(requestUri, request)));
查看 Tuple API 文档 以了解所有可能性。
Reactor 带有功能齐全的 TCP 客户端和服务器抽象。它们提供了一种简单的方法来构建可以支持大量客户端的基于 TCP 的应用程序。Reactor TCP 支持中的基本抽象是通用的,可以创建多个实现来利用不同的 TCP 技术。但是,内置实现利用了出色的 Netty 库 来执行异步 IO。
Reactor 是开源的,并采用 Apache 许可证。开发人员和用户社区只是一群想要共同努力为在 JVM 上构建反应式、FastData 应用程序创建出色基础的普通人。加入我们的社区,了解更多关于 Reactor 的信息,或通过您希望看到的任何改进回馈社区。
要快速开始使用 Reactor 并查看各种上下文中的一些代码,请查看快速入门
https://github.com/reactor/reactor-quickstart
或示例
https://github.com/reactor/reactor-samples
要分叉源代码、阅读 Wiki 或提交问题,请访问我们在 GitHub 上的页面
https://github.com/reactor/reactor
您可以加入一个 Google 论坛,提出问题或以其他方式参与围绕 Reactor 的讨论
https://groups.google.com/forum/#!forum/reactor-framework
访问 Maven 工件以包含在您的项目中
<dependencies>
<!-- core components -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- groovy support -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-groovy</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- tcp client/server -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-tcp</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- spring support -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-spring</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
</dependencies>