领先一步
VMware 提供培训和认证,助您加速进步。
了解更多Reactor 团队很高兴地宣布,Reactor 灵活、异步、快速数据框架的 1.1.0.RELEASE 版本现已推出了一些重要更新。此版本包含了大量的 bug 修复和关键组件的重写,以使其更快,也许更重要的是,在内存使用方面更高效。Reactor 1.1 现在包含了高盛 [1] 出色的 gs-collections 库,它为处理各种映射和集合提供了非常流畅的 API。
以下是 Reactor 1.0 和 1.1 之间变更的非详尽列表
与 1.0 版本相比,Stream API 最有用的新增功能包括 Stream.window 和 Stream.timeout 方法。这允许您在给定时间内收集值并将其传递到处理链中。例如,要每 500 毫秒处理一次收集到的值,请使用 window
Deferred<Pojo, Stream<Pojo>> in = Streams.defer(env);
// add all collected values every half-second
in.compose()
.window(500)
.consume(values -> service.addAll(values));
// another service emits data into the `Deferred`
Pojo p;
while(null != (p = input.next())) {
in.accept(p);
}
RingBuffer 的健壮 HashWheelTimer 实现gs-collections 5.0 [1] 的新 Consumer Registry 实现如果您需要以更可预测的方式控制内存使用,Reactor 包含一个分配 API,该 API 可以由您需要的任何特定池实现支持。Reactor 1.1 提供了两种实现:基于 RingBuffer 的 Allocator 和基于引用计数的 Allocator。
基于 RingBuffer 的 Allocator 可以配置为非常类似于带有事件处理程序的标准 Disruptor RingBuffer。但是,如果您只需要阻塞生产者并使用基于槽的分配策略,那么使用 RingBuffer 进行分配就非常简单了
Allocator<Event<Buffer>> pool = new RingBufferAllocatorSpec<Event<Buffer>>()
.ringSize(16 * 1024)
.allocator(() -> new Event<Buffer>(null))
.waitStrategy(new BusySpinWaitStrategy())
.get();
// in your code, maintain a `Reference` you can release
Reference<Event<Buffer>> ref = pool.allocate();
// pass your data POJO to other services
Event<Buffer> ev = ref.get().setData(buffer);
service.invoke(ev);
// when you're done, release the reference
ref.release();
日志记录对异步应用程序的性能非常有害——特别是对于使用 RingBuffer 等技术(使用单个线程支持许多任务)的应用程序。如果该线程被某个任务在写入日志条目时进行 IO 阻塞,那么这可能会反向级联到应用程序中,导致应用程序停滞。
Reactor 包含一个高效的 Logback [2] 异步 Appender 实现,它将实际的附加操作转移到专用的日志线程上。这应该有助于缓解大多数应用程序中由日志记录引起的线程压力。但有时这还不够,需要更高吞吐量的解决方案。这就是 Reactor 基于 Java Chronicle 的 Appender 发挥作用的地方。
Java Chronicle [3] 是一个高速消息库,它使用内存映射文件实现快速高效的数据持久化。Reactor 通过提供一个 Appender 将其与 Logback 集成,该 Appender 记录应用程序的原始事件数据,但无需调用下游 appender。这意味着您的日志事件存储在 Chronicle 中,但处于原始状态。需要一个额外的实用程序来后处理“持久”日志文件,然后将这些事件发送到“真实”的 appender(例如文件或数据库),或者查看 Chronicle 并查找与给定模式匹配的条目。这在生产环境中非常有用,在应用程序正常运行时您不关心日志记录,但如果出现问题,您可以轻松地将数据从 Chronicle 提取到标准日志文件中进行取证分析。
要配置 Reactor DurableAsyncAppender 进行高速日志记录,只需在 Logback 配置中声明它。以下是 logback.xml 配置中使用的示例
<appender name="chronicle" class="reactor.logback.DurableAsyncAppender">
<!-- Uncomment to have log events also sent to a "normal" file appender -->
<!--appender-ref ref="logfile"/-->
<basePath>log/</basePath>
<backlog>2097152</backlog>
</appender>
如果出现问题,您可以使用随附的实用程序分析 chronicle,方法是将从 chronicle 中提取的事件定向到给定的“真实” Appender。此示例调用日志实用程序(reactor-logback.jar 工件必须在类路径上),并从 log/ 目录读取持久日志文件,从 logback.xml 读取 Logback 配置,然后将所有 ERROR 消息输出到 logfile appender,该 appender 在 logback.xml 配置文件中定义。
java reactor.logback.DurableLogUtility --path log/ --config logback.xml --output logfile --level ERROR
Groovy 2.3.0 刚刚发布,它包含大量新功能和性能改进,以及 lambda 闭包支持和其他酷炫的 JDK 8 功能。Reactor 的 Groovy 支持已准备好在 Groovy 2.3 中使用,同时仍兼容 JDK 7 上的 Groovy 2.2。
reactor-tcp 重命名为 reactor-netjeromq 添加了 ZeroMQ 支持reconnect 支持TCP 模块已进行改进,其中包括对 UDP 的支持以及基于 ZeroMQ [4] 的新实现。
Reactor 中的 ZeroMQ 支持具有 tcp 和 inproc 支持,并提供简洁流畅的 API,可以使用 Reactor 高效的编解码器设施快速创建客户端和服务器。
ZeroMQ<JsonData> zmq = new ZeroMQ<>(reactorEnv)
.codec(new JacksonJsonCodec());
zmq.router("inproc://queue")
.consume(channel -> channel.consume(service::invoke));
zmq.dealer("inproc://queue")
.consume(channel -> {
JsonData data;
while(null != (data = in.next())) {
channel.sendAndForget(data);
}
});
reactor-benchmark 项目Artifacts 可在 Maven Central 和 repo.spring.io/libs-release 中找到。请注意,Spring 支持的坐标在 1.1 版本中已更改为 org.projectreactor.spring:reactor-spring-* [6]。
参考文档可在 GitHub wiki 中找到。
更新的 API 文档可在 GitHub pages 网站上找到。
[1] - https://github.com/goldmansachs/gs-collections
[2] - http://logback.qos.ch/
[3] - https://github.com/OpenHFT/Java-Chronicle
[4] - https://zeromq.cn/
[5] - http://openjdk.java.net/projects/code-tools/jmh/
[6] - http://repo.spring.io/libs-release/org/projectreactor/spring/