领先一步
VMware 提供培训和认证,助力您快速提升。
了解更多Reactor 团队欣喜地宣布发布 2.0.0.RC1,该版本现已在spring.io Maven 仓库以及Maven 中央仓库提供。2.0 版本是对 Reactor 1.1 版本的重大升级(#uberupdate
),包含多个新组件,以及对重要类(例如现在实现Reactive Streams 标准的Stream
类)的完整重写。
请注意,Reactor 2.0 的 Maven 坐标已更改,与 Reactor 1.x 版本不同。新的坐标均属于组 ID io.projectreactor
,而不是之前的org.projectreactor
。Gradle 项目的示例依赖项块如下所示:
ext {
reactorVersion = '2.0.0.RC1'
}
repositories {
maven { url 'http://repo.spring.io/libs-milestone' }
mavenCentral()
}
compile "io.projectreactor:reactor-core:$reactorVersion",
"io.projectreactor:reactor-net:$reactorVersion",
"io.projectreactor.spring:reactor-spring-context:$reactorVersion"
如果您是 Reactor 的新手,建议您先访问新的网站http://projectreactor.io,了解相关信息,然后再阅读本文。
除了在 2.0.0.M1 版本中宣布的更改之外,以下是 1.1 版本的一些重要更改:
Stream
已重写为实现 Reactive Streams 标准,速度提高了 5-10 倍,并且开销比以前版本小得多。Stream
的新签名以及将Reactor
重命名为EventBus
。相关的文档正在完善中。Stream
API 源自Reactive Extensions,并镜像其许多命名约定。通过利用共同的词汇和行为,可以轻松地将 Rx.NET 和 RxJava 示例转换为 Reactor。Stream
集成、DSL 辅助方法。如果我们必须只谈论一项更改,那么那就是对 Reactive Streams 的原生和基础支持。Reactive Streams 对 Reactor 的重要性难以言过其实。流处理是新的潮流,Reactor 从项目开始就采用了这种方式。然而,Reactive Streams 的加入及其背压支持的传播,使得实时或近实时处理大量数据更容易应用于您的云应用程序。现在,诸如负载下的stop-read
、batch flush
或adaptive batch
之类的模式都可直接使用。
Reactor Stream
中的每个步骤都是一个 Reactive Streams 组件,它会根据当前资源限制下的处理速率正确地传播需求和背压。使用 Reactive Streams,Reactor 2.0 使创建能够自动调整其资源使用的处理流成为可能。由于 Reactive Streams 背压是向上游传播的,您可以影响新项目被拉入系统速率。这意味着如果当前处理正在使用所有可用资源,则缓慢的下游组件可以一直向上传播压力,以减慢摄取速率。
Pool<Connection> pool;
Stream<Message> input;
input.capacity(1)
.batchConsume(msg -> {
pool.getConnection().merge(msg);
}, requestMore -> Math.max(pool.getSize() - pool.getActive(), 1));
在上例代码片段中,我们根据池中可用连接的数量调整要处理的项目数量。作为第一个参数传递给batchConsume
方法的Consumer
将由作为第二个参数传递的Function
返回的requestMore
值控制。在本例中,我们将预取与池中空闲连接数量相等的消息数量,如果所有连接都处于活动状态(在这种情况下,我们将依赖于连接池的背压),则只预取一条消息。
如果我们想要确保我们的流不会占用过多资源,我们还可以更改容量算法以返回小于可用连接数的数字,这将为应用程序中的其他组件留下一些可用连接。
从 Reactor 2.0.0.RC1 开始,可以通过简单地排除gs-collections
库来将 Reactor 包含在您的 Android 应用程序中,否则由于其大小,您将不得不绕过一些障碍。我们已经为EventBus
实现了一个不使用gs-collections
的SimpleCachingRegistry
。未来的改进可能包括一个专用的 UI 事件循环Dispatcher
,以确保您的事件处理程序在正确的线程上运行。
我们非常有兴趣了解 Reactor 如何促进 Android 设备上的响应式应用程序,以及这如何与 Reactor 在服务器端极高的吞吐量和低延迟能力相结合。如果您正在 Android 上使用 Reactor,并且有一些可以改善体验的事情,请告诉我们。
RC1 引入了基于 Reactor 使用 Netty 4 的 HTTP 支持。它还不全面,但它提供了一些辅助方法和一些有用的抽象,用于构建(和访问)基于非阻塞 REST 的微服务和纳米服务。我目前不会尝试用它来构建生产服务,因为在 GA 之前还需要进行一些改进。您可以嵌入使用 Reactor 的微服务,而无需直接使用 Netty API。
以下内容创建一个基于 Netty 的嵌入式 HTTP 服务器,该服务器具有路径参数,并将任务分派到共享的RingBufferDispatcher
。
HttpServer<String, String> server = NetStreams.httpServer(
spec -> spec.listen(3000)
.codec(StandardCodecs.STRING_CODEC)
.dispatcher(Environment.sharedDispatcher())
);
server.get("/echo/{greeting}", ch -> {
String greeting = ch.param("greeting") + " World!";
ch.transfer(Transfer.NON_CHUNKED)
.responseHeader("Content-Length", "" + greeting.length())
.log("server");
return Streams.just(greeting);
});
server.start();
我们还更新了 TCP 和 ZeroMQ 支持,以更好地利用我们对Stream
所做的重要更改。最重要的是,TCP 服务器和客户端利用 Reactive Streams 背压支持来实现诸如“stop-read”之类的模式,以防止服务器通过在有资源可用处理之前从客户端读取过多数据来压垮下游处理。
在发布 Reactor 2.0 GA 之前,我们将至少再发布一个 RC 版本。在我们对它的可预测性感到满意之前,我们还需要对复杂的 fork/join 调度进行一些调整。我们可能还希望对 HTTP 支持进行一些补充,因为第一个版本只是一组相当简单的功能。我们也可能会在极端情况下发现一些错误。
我们对这个候选版本感觉很好,我们鼓励您尝试一下。如果您正在进行新的开发,我们绝对鼓励您基于 Reactor 2.0 的 Reactive Streams 基础进行构建,而不是使用 1.1 中功能较弱的、Reactive Streams 之前的版本。如果您正在升级现有的 Reactor 代码,路径实际上非常简单。在几乎所有情况下,您的代码都将得到极大的简化。
如果您在升级代码时遇到问题,或者只是对如何使用 Reactor 来解决您的快速数据问题有任何疑问,请随时在Reactor Framework Google Group上提问。
我们也欢迎通过GitHub 上的 pull requests进行社区贡献。
您可能还想知道,Reactive Streams 项目正在考虑以新的java.util.concurrent.Flow
类和相应的内部类形式包含在 JDK 9 中。关于此主题的讨论正在由纽约州奥斯威戈州立大学的Doug Lea教授管理的JSR-166 concurrency-interest邮件列表中进行。
Reactor 采用 Apache 2.0 许可证,项目通过 GitHub 管理。