领先一步
VMware 提供培训和认证,助您加速进步。
了解更多Reactor 团队很高兴地宣布发布 2.0.0.RC1 版本,现已在 spring.io Maven 仓库以及 Maven Central 中提供。2.0 版本是 Reactor 1.1 的 #uberupdate,包含了几个新组件以及对 Stream 等重要类的完全重写,现在它实现了 Reactive Streams 标准。
请注意,Reactor 2.0 的 Maven 坐标已从 Reactor 1.x 的坐标更改。新的坐标都属于组 ID io.projectreactor,而不是之前的 org.projectreactor。Gradle 项目的示例依赖块可能如下所示
ext {
reactorVersion = '2.0.0.RC1'
}
repositories {
maven { url 'http://repo.spring.io/libs-milestone' }
mavenCentral()
}
compile "io.projectreactor:reactor-core:$reactorVersion",
"io.projectreactor:reactor-net:$reactorVersion",
"io.projectreactor.spring:reactor-spring-context:$reactorVersion"
如果您是 Reactor 的完全新手,您可能需要先跳转到炫酷的新网站 http://projectreactor.io 了解一下,这样才能理解其中的一些内容。
除了 2.0.0.M1 版本中宣布的更改,以下是相对于 1.1 版本的一些重要更改的简要列表
Stream 已重写以实现 Reactive Streams 标准,速度提高了 5-10 倍,并且开销比以前的版本少得多。Stream 的新签名和 Reactor 重命名为 EventBus。有关此转换的文档正在进行中。Stream API 源自 Reactive Extensions,并借鉴了其许多命名约定。通过利用通用词汇和行为,可以轻松地将 Rx.NET 和 RxJava 示例转换为 Reactor。Stream 集成、DSL 辅助方法。如果必须将讨论限制在一个更改上,那么就是对 Reactive Streams 的原生和基础支持。很难夸大 Reactive Streams 对 Reactor 的重要性。流处理是 新 黑科技,Reactor 从项目开始就接受了这一点。然而,Reactive Streams 的加入及其对背压传播的支持,使您的云应用程序更容易在实时或接近实时处理大量数据。现在,诸如负载下的 stop-read、batch flush 或 adaptive batch 等模式已开箱即用。
Reactor Stream 中的每个步骤都是一个 Reactive Streams 组件,它根据当前资源限制下的处理速率正确传播需求和背压。使用 Reactive Streams,Reactor 2.0 可以创建一个自动调整其资源使用的处理流。由于 Reactive Streams 背压向上游通信的方式,您可以影响将新项目拉入系统的速率。这意味着缓慢的下游组件可以将压力一直推回源头,以降低摄取速率,如果当前处理正在使用所有可用资源。
Pool<Connection> pool;
Stream<Message> input;
input.capacity(1)
.batchConsume(msg -> {
pool.getConnection().merge(msg);
}, requestMore -> Math.max(pool.getSize() - pool.getActive(), 1));
在上面的代码片段中,我们根据池中可用连接的数量调整要处理的项目数量。作为 batchConsume 方法的第一个参数传递的 Consumer 将根据作为第二个参数传递的 Function 返回的 requestMore 值调用。在这种情况下,我们将预取与池中空闲连接数量相等的邮件数量,如果所有连接都处于活动状态,则只预取单个邮件(在这种情况下,我们将依赖于连接池的背压)。
如果我们要确保我们的流不会占用资源,我们还可以将容量算法更改为返回小于可用连接数量的数字,这将为应用程序中的其他组件留下一些可用连接。
从 Reactor 2.0.0.RC1 开始,可以通过简单地排除 gs-collections 库将 Reactor 包含在您的 Android 应用程序中,否则由于其大小,您将不得不经历一些麻烦。我们为 EventBus 实现了一个不使用 gs-collections 的 SimpleCachingRegistry。未来的改进可能包括一个专用的 UI 事件循环 Dispatcher,以确保您的事件处理程序在正确的线程上运行。
我们非常感兴趣地看到 Reactor 如何促进 Android 设备上的响应式应用程序,以及这如何与 Reactor 在服务器端极高的吞吐量、低延迟能力相关联。如果您正在 Android 上使用 Reactor,并且我们可以做些什么来改善这种体验,请告诉我们。
RC1 引入了基于 Reactor 使用 Netty 4 的 HTTP 新支持。它尚未全面,但它提供了一些辅助方法和一些有用的抽象,用于构建(和访问)非阻塞的基于 REST 的微服务和纳服务。我目前不会尝试用它来构建生产服务,因为在正式发布之前还需要进行一些完善。您可以使用 Reactor 嵌入微服务,而无需直接使用 Netty API。
以下内容创建一个基于 Netty 的嵌入式 HTTP 服务器,该服务器具有路径参数,可将任务分派到共享的 RingBufferDispatcher。
HttpServer<String, String> server = NetStreams.httpServer(
spec -> spec.listen(3000)
.codec(StandardCodecs.STRING_CODEC)
.dispatcher(Environment.sharedDispatcher())
);
server.get("/echo/{greeting}", ch -> {
String greeting = ch.param("greeting") + " World!";
ch.transfer(Transfer.NON_CHUNKED)
.responseHeader("Content-Length", "" + greeting.length())
.log("server");
return Streams.just(greeting);
});
server.start();
我们还更新了 TCP 和 ZeroMQ 支持,以更好地利用我们对 Stream 所做的重要更改。最重要的是,TCP 服务器和客户端利用 Reactive Streams 背压支持来实现“停止读取”等模式,以防止服务器在有可用资源处理之前从客户端读取过多数据而导致下游处理溢出。
在发布 Reactor 2.0 GA 之前,我们将至少再发布一个 RC 版本。在确保其可预测性令人满意之前,我们还需要对复杂的 fork/join 调度进行一些调整。由于这只是第一版,可能会对 HTTP 支持进行一些补充。我们可能还会发现一些边缘情况下的错误。
我们对这个候选版本感觉很好,我们鼓励您试用它。如果您正在进行新开发,那么我们强烈建议您在 Reactor 2.0 的 Reactive Streams 基础上构建,而不是使用功能较弱的、Reactive Streams 之前的 1.1 版本。如果您正在升级现有的 Reactor 代码,升级路径实际上非常简单。在几乎所有情况下,您的代码都将大大简化。
如果您在升级代码时遇到问题,或者对如何使用 Reactor 解决您的快速数据问题有一般性疑问,请随时在 Reactor Framework Google Group 上提问。
我们也欢迎通过 GitHub 上的拉取请求进行社区贡献。
您可能还会感兴趣的是,Reactive Streams 项目正在考虑以新的 java.util.concurrent.Flow 类和适当的内部类的形式包含在 JDK 9 中。关于此主题的讨论正在 JSR-166 concurrency-interest 邮件列表中进行,该列表由纽约州立大学奥斯威戈分校的 Doug Lea 教授管理。
Reactor 采用 Apache 2.0 许可证,项目通过 GitHub 管理