抢先一步
VMware 提供培训和认证,以加速您的进步。
了解更多我很高兴地宣布 Reactor 的第二个里程碑版本发布,这是我们迈向 1.0 的一步! Reactor 1.0.0.M2 的 Maven 工件可在常用的里程碑仓库中找到。
Reactor 是一个基础框架,用于在 JVM 上构建高吞吐量、异步、响应式应用程序。 它为事件路由提供选择器风格的主题匹配、动态消费者分配、超快的任务处理器以及用于异步处理数据和协调异步任务的响应式 Stream 和 Promise API。 它通过提供语言扩展来全面支持 Groovy 语言,从而使在 Groovy 中编写 Reactor 应用程序非常 Groovy! 它还具有易于使用的 Spring 支持,可以自动将带注释的 POJO 连接到 Reactor。
第二个里程碑包括许多错误修复和一些非常令人兴奋的新功能。 Reactor 现在包含一个 Processor
抽象,它是一个基于 LMAX Disruptor RingBuffer 的高度优化的任务处理器。 它使用 Reactor 中的通用抽象来配置 RingBuffer,并允许您使用 Reactor 的通用 API 而不是 Disruptor 特定的 API。 通过设计,它还跳过了 Reactor 提供的选择器匹配和动态消费者分配,以便榨取它可以榨取的最后一滴吞吐量。 MacBook Pro 上的轶事基准测试表明,处理器每秒可以通过管道泵送大约 100,000,000 个事件。 是的,你没看错:每秒 1 亿个!
1.0.0.M2 还包括 Reactor API 中的一个小但重要的新功能,该功能优化了 Reactor 中的事件发布,从而实现了大约 30-50% 的更高吞吐量。 由于它从 Reactor 准备了一个优化的消费者列表,因此它并不适合所有情况,但对于每秒 1000 万个事件的吞吐量来说,这是一个很棒的新功能。
Reactor 的强大之处之一是选择器匹配主题(ish)发布/订阅。 它允许您使用主题、匿名对象、可分配类型层次结构、URI 路径匹配或正则表达式(或者如果您实现自己的特定于域的选择器,则可以使用任何其他类型的选择器匹配)轻松地将处理程序分配给事件。 但是许多应用程序可以在启动时分配其处理程序,这意味着可以优化到这些消费者的路径,以实现高效的事件发布。 新的 Reactor 方法 prepare(Object)
允许您预先选择密钥的消费者。 它返回一个消费者本身,事件发布者可以使用该消费者来有效地通知新事件。
// Create Environment in which Reactors operate
Environment env = new Environment();
Reactor reactor = Reactors.reactor().env(env).get();
reactor.on($("say.hello"), new Consumer<Event<String>>() {
public void accept(Event<String> ev) {
System.out.println("Hello " + ev.getData() + "!");
}
});
Consumer<Event<String>> sayHello = reactor.prepare("say.hello");
for(String name : listOfNames) {
sayHello.accept(name);
}
Reactor 1.0.0.M2 包含 Processor
抽象。 这是一个由 LMAX Disruptor RingBuffer 支持的简单任务处理器,旨在将其无缝集成到 Reactor 中使用的响应式 API 中,因此它使用常见的抽象,例如 Supplier 和 Consumer。 可以在单个表达式中创建一个完全配置的处理器,并且使用 Java 8 lambda 表达式更为简洁
Processor<Message> proc = new ProcessorSpec<Message>()
.dataSupplier({ return new Message(); })
.consume({ msg -> // handle the updated Message object })
.get();
Processor
提供了两种与底层 RingBuffer 交互的方式。 单操作模式通过调用 prepare()
方法从 Processor
请求一个 Operation
对象来工作。 Operation
上有一个 get()
方法,用于访问 RingBuffer 创建时填充的预先分配的事件对象。 可以使用新数据更新此对象的成员。 准备好发布操作并触发事件处理程序后,只需调用 Operation 的 commit()
方法即可。
public class Message {
int type;
Buffer buffer;
}
@Autowired
Processor<Message> proc;
public void handle(Buffer buff) {
Operation<Message> op = proc.prepare();
op.get().type = buff.readInt();
op.get().buffer = buff;
op.commit();
}
如果您可以处理批量数据,则 Processor
提供一个 batch(int, Consumer
方法,该方法允许您指定批量大小并传递一个突变器,其形式为 Consumer
,其工作是更新每个事件的数据。 如果批处理大小大于底层 RingBuffer 的大小,则批处理将隐式刷新,否则发布步骤将延迟到达到批处理大小为止。 这通常会提高吞吐量和效率。
public class Message {
int type;
Buffer buffer;
}
@Autowired
Processor<Message> proc;
public void handle(List<Buffer> buffs) {
proc.batch(buffs.size(), new Consumer<Message>() {
ListIterator<Buffer> it = buffs.listIterator();
public void accept(Message msg) {
Buffer next = it.next();
msg.type = next.readInt();
msg.buffer = next;
}
});
}
Reactor 将在今年的 SpringOne2GX 会议上得到突出展示,距离现在不到两周的时间。 将有 一个由 Stephane Maldini 和 Jon Brisbin 主导的完整会议,以及几乎不停歇的关于这项技术如何改变您构建应用程序方式的茶歇讨论。 还有时间 注册 并预订房间。 但是快点!
GitHub:(源代码、问题跟踪器)https://github.com/reactor/reactor/
Wiki:https://github.com/reactor/reactor/wiki
API 文档:http://reactor.github.io/docs/api/
ext {
reactorVersion = '1.0.0.M2'
}
repositories {
mavenCentral()
maven { url 'http://repo.springsource.org/libs-release' }
maven { url 'http://repo.springsource.org/libs-milestone' }
}
dependencies {
// Reactor core
compile "org.projectreactor:reactor-core:$reactorVersion"
// Reactor Groovy support
compile "org.projectreactor:reactor-groovy:$reactorVersion"
// Reactor TCP client/server
compile "org.projectreactor:reactor-tcp:$reactorVersion"
// Reactor Spring support
compile "org.projectreactor:reactor-spring:$reactorVersion"
}