领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多Reactor 团队很高兴地宣布,Reactor 框架的一些重大更新现已在 Reactor 灵活、异步、快速数据框架的 1.1.0.RELEASE 版本中可用。此版本包含许多错误修复和关键组件的重写,以使其更快,也许更重要的是,在内存使用方面更有效率。Reactor 1.1 现在包含来自高盛的出色 gs-collections
库 [1],该库为处理各种地图和集合提供了非常流畅的 API。
以下是 Reactor 1.0 和 1.1 之间更改的非详尽列表
Stream API 相比 1.0 版本的一些更有用的新增功能包括 Stream.window
和 Stream.timeout
方法。这允许您在给定时间段内收集值并将它们传递到处理链中。例如,要每 500 毫秒处理已收集的任何值,请使用 window
Deferred<Pojo, Stream<Pojo>> in = Streams.defer(env);
// add all collected values every half-second
in.compose()
.window(500)
.consume(values -> service.addAll(values));
// another service emits data into the `Deferred`
Pojo p;
while(null != (p = input.next())) {
in.accept(p);
}
RingBuffer
的健壮 HashWheelTimer 实现gs-collections
5.0 [1] 的新的消费者注册表实现如果您需要以更可预测的方式控制内存使用,Reactor 包含一个分配 API,该 API 可以由您需要的任何特定池化实现支持。Reactor 1.1 附带两个实现:基于 RingBuffer 的 Allocator
和引用计数 Allocator
。
基于 RingBuffer 的 Allocator
可以配置为非常类似于标准 Disruptor RingBuffer
以及事件处理程序。但是,如果您只需要阻塞生产者并使用基于插槽的分配策略,那么使用 RingBuffer 进行分配非常简单。
Allocator<Event<Buffer>> pool = new RingBufferAllocatorSpec<Event<Buffer>>()
.ringSize(16 * 1024)
.allocator(() -> new Event<Buffer>(null))
.waitStrategy(new BusySpinWaitStrategy())
.get();
// in your code, maintain a `Reference` you can release
Reference<Event<Buffer>> ref = pool.allocate();
// pass your data POJO to other services
Event<Buffer> ev = ref.get().setData(buffer);
service.invoke(ev);
// when you're done, release the reference
ref.release();
日志记录可能对异步应用程序的性能非常不利,特别是使用 RingBuffer 等技术的应用程序,后者使用单个线程来支持许多任务。如果该线程被一个执行 IO 写入日志条目的任务阻塞,那么这可能会级联到应用程序中并导致其停止运行。
Reactor 包含一个高效的异步 Appender
实现,用于 Logback [2],它将实际的追加移到一个专用的日志记录线程上。这应该有助于缓解大多数应用程序中由日志记录引起的线程压力。但有时即使这样也不够,需要更高吞吐量的解决方案。这就是 Reactor 基于 Java Chronicle 的 Appender
派上用场的地方。
Java Chronicle [3] 是一个高速消息传递库,它使用内存映射文件来实现快速高效的数据持久化。Reactor 通过提供一个 Appender
将其与 Logback 集成,该 Appender
记录来自应用程序的原始事件数据,但不必调用下游追加器。这意味着您的日志记录事件存储在 Chronicle
中,但处于其原始状态。需要一个额外的实用程序来后处理“持久”日志文件,并将这些事件发送到“真实”追加器(例如发送到文件或数据库)或查看 Chronicle
并查找与给定模式匹配的条目。这在生产环境中非常有用,在生产环境中,您不关心应用程序正常运行时的日志记录,但如果出现问题,您可以轻松地将数据从 Chronicle
提取到标准日志文件中进行取证分析。
要配置 Reactor DurableAsyncAppender
以进行高速日志记录,只需在您的 Logback 配置中声明它即可。这是一个在 logback.xml
配置中使用它的示例
<appender name="chronicle" class="reactor.logback.DurableAsyncAppender">
<!-- Uncomment to have log events also sent to a "normal" file appender -->
<!--appender-ref ref="logfile"/-->
<basePath>log/</basePath>
<backlog>2097152</backlog>
</appender>
如果出现问题,您可以使用包含的实用程序分析纪事,方法是将从纪事中提取的事件定向到给定的“真实” Appender
。此示例调用日志实用程序(reactor-logback.jar
工件必须位于类路径上)并读取 log/
目录中的持久日志文件,从 logback.xml
读取 Logback 配置,然后将所有 ERROR 消息输出到 logfile
追加器,该追加器在 logback.xml
配置文件中定义。
java reactor.logback.DurableLogUtility --path log/ --config logback.xml --output logfile --level ERROR
Groovy 2.3.0 刚刚发布,并且包括 大量新功能和性能改进 以及 lambda 闭包支持和其他很酷的 JDK 8 功能。Reactor 的 Groovy 支持已准备好用于 Groovy 2.3,同时仍然与 JDK 7 上的 Groovy 2.2 兼容。
reactor-tcp
重命名为 reactor-net
jeromq
添加 ZeroMQ 支持reconnect
支持对 TCP 模块进行了改进,其中包含对 UDP 的支持以及基于 ZeroMQ 的新实现。[4]
Reactor 中的 ZeroMQ 支持具有 tcp
和 inproc
支持,并提供了一个简洁的流畅 API,用于使用 Reactor 的高效编解码器工具非常快速地创建客户端和服务器。
ZeroMQ<JsonData> zmq = new ZeroMQ<>(reactorEnv)
.codec(new JacksonJsonCodec());
zmq.router("inproc://queue")
.consume(channel -> channel.consume(service::invoke));
zmq.dealer("inproc://queue")
.consume(channel -> {
JsonData data;
while(null != (data = in.next())) {
channel.sendAndForget(data);
}
});
reactor-benchmark
项目工件在 Maven Central 和 repo.spring.io/libs-release
中可用。请注意,Spring 支持的坐标已在 1.1 版本中更改为 org.projectreactor.spring:reactor-spring-*
[6]。
参考文档可在 GitHub wiki 中找到。
更新的 API 文档位于 GitHub 页面站点 上。
[1] - https://github.com/goldmansachs/gs-collections
[2] - http://logback.qos.ch/
[3] - https://github.com/OpenHFT/Java-Chronicle
[4] - http://zeromq.org/
[5] - http://openjdk.java.net/projects/code-tools/jmh/
[6] - http://repo.spring.io/libs-release/org/projectreactor/spring/