领先一步
VMware 提供培训和认证,助您加速进步。
了解更多我很高兴地宣布 Reactor 的第二个里程碑版本,它正在朝着 1.0 版本迈进!Reactor 1.0.0.M2 的 Maven 工件可在常规的里程碑存储库中获取。
Reactor 是一个基础框架,用于在 JVM 上构建高吞吐量、异步、响应式应用程序。它提供 Selector 风格的事件路由主题匹配、动态 Consumer 分配、一个超快的任务处理器以及响应式 Stream 和 Promise API,用于异步处理数据和协调异步任务。它提供全面的 Groovy 语言支持,通过提供语言扩展,使得用 Groovy 编写 Reactor 应用程序非常 Groovy!它还具有易于使用的 Spring 支持,可以自动将带注解的 POJO 连接到 Reactors。
这个第二个里程碑版本包括了一些错误修复和一些真正令人兴奋的新功能。Reactor 现在包含了一个 Processor 抽象,这是一个基于 LMAX Disruptor RingBuffer 的高度优化任务处理器。它使用 Reactor 的通用抽象来配置 RingBuffer,并允许您使用 Reactor 的通用 API,而不是 Disruptor 特定的 API。它还通过设计跳过了 Reactor 提供的 Selector 匹配和动态 Consumer 分配,以榨取其所能达到的每一个吞吐量。在 MacBook Pro 上的传闻基准测试显示,Processor 每秒可以通过管道处理大约 1 亿个事件。是的,您没有看错:每秒 1 亿个!
1.0.0.M2 还在 Reactor API 中包含了一个虽小但重要的新功能,它优化了 Reactor 中的事件发布,以获得大约 30-50% 的更高吞吐量。它不适用于所有情况,因为它从 Reactor 中准备了一个优化的 Consumer 列表,但对于每秒额外的 1000 万个事件的吞吐量来说,这是一个很棒的新功能。
Reactor 的强大之处之一是 Selector 匹配主题(ish)的发布/订阅。它允许您使用主题、匿名对象、可分配类型层次结构、URI 路径匹配或正则表达式(如果您实现自己的领域特定 Selector,则可以使用任何其他类型的 Selector 匹配)轻松地将处理程序分配给事件。但是许多应用程序可以在启动时分配其处理程序,这意味着到这些 Consumer 的路径可以针对高效的事件发布进行优化。新的 Reactor 方法 prepare(Object) 允许您预先选择一个键的 Consumer。它返回一个 Consumer 本身,事件发布者可以使用它来有效地通知新事件。
// Create Environment in which Reactors operate
Environment env = new Environment();
Reactor reactor = Reactors.reactor().env(env).get();
reactor.on($("say.hello"), new Consumer<Event<String>>() {
public void accept(Event<String> ev) {
System.out.println("Hello " + ev.getData() + "!");
}
});
Consumer<Event<String>> sayHello = reactor.prepare("say.hello");
for(String name : listOfNames) {
sayHello.accept(name);
}
Reactor 1.0.0.M2 包含了 Processor 抽象。它是一个由 LMAX Disruptor RingBuffer 支持的简单任务处理器,旨在将其无缝集成到 Reactor 中使用的响应式 API 中,因此它使用 Supplier 和 Consumer 等通用抽象。一个完全配置的 Processor 可以在单个表达式中创建,使用 Java 8 lambda 更简洁。
Processor<Message> proc = new ProcessorSpec<Message>()
.dataSupplier({ return new Message(); })
.consume({ msg -> // handle the updated Message object })
.get();
Processor 提供了两种与底层 RingBuffer 交互的方式。单操作模式通过调用 prepare() 方法从 Processor 请求一个 Operation 对象来工作。Operation 上有一个 get() 方法,用于访问 RingBuffer 在创建时填充的预分配事件对象。此对象的成员可以用新数据进行更新。准备发布操作并触发事件处理程序时,只需调用 Operation 的 commit() 方法。
public class Message {
int type;
Buffer buffer;
}
@Autowired
Processor<Message> proc;
public void handle(Buffer buff) {
Operation<Message> op = proc.prepare();
op.get().type = buff.readInt();
op.get().buffer = buff;
op.commit();
}
如果您可以批量操作数据,那么 Processor 提供了一个 batch(int, Consumer 方法,允许您指定批处理大小并传递一个 Consumer 形式的修改器,其工作是更新每个事件的数据。如果批处理大小大于底层 RingBuffer 的大小,批处理将隐式刷新,否则发布步骤将延迟,直到达到批处理大小。这通常会提高吞吐量和效率。
public class Message {
int type;
Buffer buffer;
}
@Autowired
Processor<Message> proc;
public void handle(List<Buffer> buffs) {
proc.batch(buffs.size(), new Consumer<Message>() {
ListIterator<Buffer> it = buffs.listIterator();
public void accept(Message msg) {
Buffer next = it.next();
msg.type = next.readInt();
msg.buffer = next;
}
});
}
Reactor 将在今年距离不到两周的 SpringOne2GX 大会上突出展示。将有 Stephane Maldini 和 Jon Brisbin 主导的完整专题讨论,以及关于这项技术如何改变您构建应用程序的方式的几乎不间断的茶水间讨论。仍然有时间 注册 并预订房间。但要快!
GitHub:(源代码,问题跟踪器)https://github.com/reactor/reactor/
Wiki:https://github.com/reactor/reactor/wiki
API 文档:http://reactor.github.io/docs/api/
ext {
reactorVersion = '1.0.0.M2'
}
repositories {
mavenCentral()
maven { url 'http://repo.springsource.org/libs-release' }
maven { url 'http://repo.springsource.org/libs-milestone' }
}
dependencies {
// Reactor core
compile "org.projectreactor:reactor-core:$reactorVersion"
// Reactor Groovy support
compile "org.projectreactor:reactor-groovy:$reactorVersion"
// Reactor TCP client/server
compile "org.projectreactor:reactor-tcp:$reactorVersion"
// Reactor Spring support
compile "org.projectreactor:reactor-spring:$reactorVersion"
}