领先一步
VMware 提供培训和认证,以加快您的进步。
了解更多我非常高兴地宣布 Project Reactor 的第一个里程碑版本发布!Project Reactor 是一个用于构建 JVM 上异步 FastData 应用程序的基础框架。Reactor 1.0.0.M1 中的一些优点包括:响应式组合助手 Stream 和 Promise,TcpServer 和 TcpClient 以及 Groovy 和 Spring 支持。受 Reactive Extensions、RxJava、新的 JDK 8 Stream API(以及 Scala 等)的启发,这些可组合对象使协调异步任务变得非常简单。它们支持使用 Consumer 的传统回调式编程,但也提供了一个简洁的组合 API,其中包含 map(Function fn)
、filter(Predicate
、batch(int size)
等方法。
Reactor 从一开始就被设计为一个高性能、高扩展性的平台,用于构建下一代大数据应用程序。在将应用程序扩展到数百、数千甚至数百万用户时,异步架构明显优于每个请求一个线程的架构。Reactor 的异步基础为处理数十万甚至数百万个事件的大数据应用程序提供了坚实的基础。它提供了简单的工具来将异步任务链接在一起,并使执行这些任务变得像调用单个方法一样简单。
Stream 是一种简单的方法,可以在数据异步流经应用程序时对其进行处理。在 Reactor 中,Stream 实际上有两个部分:Deferred,它是发布者,以及实际的 Stream,它是消费者。您可以使用组合方法和简单的回调的组合在 Stream 上分配处理程序来处理数据。
在将数据放入队列以使用 Stream 和 JDK Lambda 进行进一步处理之前,转换和过滤进入应用程序的数据看起来像这样
// Create Environment in which Reactors operate
Environment env = new Environment();
// Create a Stream using the high-speed LMAX Disruptor RingBuffer
Deferred<Trade, Stream<Trade>> incoming = Streams.<Trade>defer()
.env(env)
.dispatcher(Environment.RING_BUFFER)
.get();
// Work with the incoming trades
Stream<Trade> trades = incoming.compose();
Stream<Order> orders = trades.map(trade -> tradeService.placeTrade(trade));
// Filter out large orders from small
Stream<Order> highPriority = orders.filter(order -> order.getSize() >= 1000);
Stream<Order> lowPriority = orders.filter(order -> order.getSize() < 1000);
// Consume the orders in different ways
highPriority.consume(order -> orderService.executeNow(order));
lowPriority.consume(order -> orderService.executeLater(order));
M1 还包括一个易于使用的 TCP 客户端和服务器。由功能强大的 Netty 网络库提供支持,Reactor 驱动的 syslog 服务器可以在服务器级硬件上每秒接收大约 100 万条消息。Reactor TCP 支持包括一个简单的 Codec 功能,该功能易于扩展到核心提供的默认编解码器集之外,并且旨在通过使用 Reactor 的 Buffer 类来实现轻量级,该类提供了诸如对数据的极其有效的视图之类的东西,以及大量用于处理标准 Java NIO ByteBuffers 的辅助方法——但无需直接处理 ByteBuffer。
Reactor 的 TCP 支持开箱即用地提供 JSON。创建使用 JSON 作为协议的基于 TCP 的 RPC 服务器就像这样简单
TcpServer<Pojo, Pojo> server = new TcpServerSpec<Pojo, Pojo>(NettyTcpServer.class)
.env(env)
.codec(new JsonCodec<>(Pojo.class))
.consume(conn -> {
conn.consume(data -> {
// handle incoming data
});
})
.get()
.start();
Reactor M1 还提供了很棒的 Groovy 支持。它提供了帮助程序,使使用 Closure 使用事件变得非常简洁。不用说,用 Groovy 编写 Reactor 事件处理非常容易。使用 Closure 处理 Reactor 使异步代码实际上可读!
def env = new Environment()
// Create Reactor using default RingBuffer Dispatcher
def reactor = Reactors.reactor().env(env).get()
reactor.on('topic') { String s ->
// handle data
}
// Publish an event to a topic
r1.notify 'topic', 'Hello World!'
Reactor M1 还包括 Spring 支持,使编写事件驱动的 POJO 变得像 MVC 控制器一样容易。通过使用 @On
注解方法,组件扫描拾取的 bean 可以自动连接到 Reactor 并收到事件通知。
一个简单的基于 JavaConfig 的 Spring 配置可能如下所示
public class HandlerBean {
@On(reactor = "@rootReactor", selector = '$("test")')
public void handleTest() {
// event 'test' was fired
}
}
@Configuration
public class AnnotatedHandlerConfig {
@Bean
public Environment env() {
return new Environment();
}
@Bean
public Reactor rootReactor() {
return env().getRootReactor();
}
}
只需将您的 Reactor 注入服务层,当事件准备就绪时,即可使用 notify()
方法在 Reactor 上发布它们。
Maven 工件可在 SpringSource Artifactory 存储库中获得。在 Gradle 项目中,您将像这样引入 Reactor
ext {
reactorVersion = '1.0.0.M1'
}
repositories {
maven { url 'http://repo.springsource.org/libs-milestone' }
mavenCentral()
}
dependencies {
// Reactor Core
compile 'org.projectreactor:reactor-core:$reactorVersion'
}
源代码可在 GitHub 上获得:https://github.com/reactor/reactor
加入 Reactor Google+ 社区 以了解 Reactor 的最新动态,或 在 Twitter 上关注我们 @ProjectReactor。
文档可在 GitHub Wiki 和 API Javadoc 上找到。
您也可以在 GitHub Issues 上提交问题并跟踪开发进度。
今年我们将 在 SpringOne 上举办关于 Reactor 的完整课程。如果您还没有计划参加,那么您真的应该参加!议程中挤满了关于 Spring 社区正在做的令人兴奋的事情的精彩课程。加入我们吧!
SpringOne2GX 2013,9 月 9 日至 12 日,加利福尼亚州圣克拉拉
我迫不及待地想深入下一个 sprint 以努力实现 1.0 GA。我们很乐意让您参与其中!