领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多我很高兴地宣布 Reactor 的第二个里程碑版本发布,这是我们迈向 1.0 的道路!Reactor 1.0.0.M2 的 Maven 工件可在通常的里程碑存储库中获得。
Reactor 是一个基础框架,用于在 JVM 上构建高吞吐量、异步、反应式应用程序。它为事件路由提供选择器风格的主题匹配、动态消费者分配、超快的任务处理器以及用于异步处理数据和协调异步任务的反应式流和 Promise API。它通过提供语言扩展来支持 Groovy 语言,使用 Groovy 编写 Reactor 应用程序变得非常简单!它还具有易于使用的 Spring 支持,可以自动将带注释的 POJO 连接到 Reactor。
第二个里程碑版本包含许多错误修复和一些非常令人兴奋的新功能。Reactor 现在包含一个 Processor
抽象,它是一个基于 LMAX Disruptor RingBuffer 的高度优化的任务处理器。它使用 Reactor 中的通用抽象来配置 RingBuffer,并允许您使用 Reactor 的通用 API 而不是 Disruptor 特定的 API。它还通过设计跳过 Reactor 提供的选择器匹配和动态消费者分配,以榨取其所能达到的每一滴吞吐量。在 MacBook Pro 上的轶事基准测试表明,处理器每秒可以将大约 1 亿个事件泵送到管道中。是的,您没看错:每秒 100 **百万**!
1.0.0.M2 还包含 Reactor API 中的一个小但重要的新功能,该功能优化了 Reactor 中的事件发布,以获得大约 30-50% 的更高吞吐量。它不适用于所有情况,因为它从 Reactor 中准备了一个优化的消费者列表,但为了额外获得每秒 1000 万个事件的吞吐量,这是一个很棒的新功能。
Reactor 的强大功能之一是选择器匹配主题(ish)发布/订阅。它允许您使用主题、匿名对象、可分配类型层次结构、URI 路径匹配或正则表达式(或如果您实现自己的特定于域的选择器,则可以使用任何其他类型的选择器匹配)轻松地将处理程序分配给事件。但是,许多应用程序可以在启动时分配其处理程序,这意味着可以针对高效事件发布优化这些消费者的路径。新的 Reactor 方法 prepare(Object)
允许您预先选择某个键的消费者。它返回一个消费者本身,事件发布者可以使用它来有效地通知新事件。
// Create Environment in which Reactors operate
Environment env = new Environment();
Reactor reactor = Reactors.reactor().env(env).get();
reactor.on($("say.hello"), new Consumer<Event<String>>() {
public void accept(Event<String> ev) {
System.out.println("Hello " + ev.getData() + "!");
}
});
Consumer<Event<String>> sayHello = reactor.prepare("say.hello");
for(String name : listOfNames) {
sayHello.accept(name);
}
Reactor 1.0.0.M2 包含 Processor
抽象。它是一个由 LMAX Disruptor RingBuffer 支持的简单任务处理器,旨在将其无缝集成到 Reactor 中使用的反应式 API 中,因此它使用诸如 Supplier 和 Consumer 之类的通用抽象。一个完全配置的处理器可以在单个表达式中创建,使用 Java 8 lambda 表达式更简洁
Processor<Message> proc = new ProcessorSpec<Message>()
.dataSupplier({ return new Message(); })
.consume({ msg -> // handle the updated Message object })
.get();
Processor
提供两种与底层 RingBuffer 交互的方式。单操作模式通过调用 prepare()
方法从 Processor
请求一个 Operation
对象来工作。Operation
上有一个 get()
方法,用于访问 RingBuffer 创建时填充的预分配事件对象。可以使用新数据更新此对象成员。准备好发布操作并触发事件处理程序时,只需调用 Operation 的 commit()
方法。
public class Message {
int type;
Buffer buffer;
}
@Autowired
Processor<Message> proc;
public void handle(Buffer buff) {
Operation<Message> op = proc.prepare();
op.get().type = buff.readInt();
op.get().buffer = buff;
op.commit();
}
如果您可以在数据批次上进行操作,那么 Processor
提供一个 batch(int, Consumer
方法,该方法允许您指定批次大小并传递一个以 Consumer
形式存在的修改器,其工作是更新每个事件的数据。如果批次大小大于底层 RingBuffer 的大小,则批次将被隐式刷新,否则发布步骤将延迟到达到批次大小为止。这通常会提高吞吐量和效率。
public class Message {
int type;
Buffer buffer;
}
@Autowired
Processor<Message> proc;
public void handle(List<Buffer> buffs) {
proc.batch(buffs.size(), new Consumer<Message>() {
ListIterator<Buffer> it = buffs.listIterator();
public void accept(Message msg) {
Buffer next = it.next();
msg.type = next.readInt();
msg.buffer = next;
}
});
}
Reactor 将在今年的 SpringOne2GX 大会 上占据突出位置,距离现在不到两周的时间。将会有 由 Stephane Maldini 和 Jon Brisbin 主持的关于它的完整会议,以及关于这项技术如何改变您构建应用程序方式的几乎不间断的非正式讨论。您仍然可以 注册 并预订房间。但要尽快!
GitHub:(源代码、问题跟踪器) https://github.com/reactor/reactor/
Wiki: https://github.com/reactor/reactor/wiki
API 文档: http://reactor.github.io/docs/api/
ext {
reactorVersion = '1.0.0.M2'
}
repositories {
mavenCentral()
maven { url 'http://repo.springsource.org/libs-release' }
maven { url 'http://repo.springsource.org/libs-milestone' }
}
dependencies {
// Reactor core
compile "org.projectreactor:reactor-core:$reactorVersion"
// Reactor Groovy support
compile "org.projectreactor:reactor-groovy:$reactorVersion"
// Reactor TCP client/server
compile "org.projectreactor:reactor-tcp:$reactorVersion"
// Reactor Spring support
compile "org.projectreactor:reactor-spring:$reactorVersion"
}