Reactor 2.0.0.M1 发布,集成 Reactive Streams!

工程 | Jon Brisbin | 2014 年 10 月 21 日 | ...

坦率地说,Reactor 团队对终于能够宣布 Reactor 2.0 的第一个里程碑版本感到非常兴奋!此更新包括在完全重写的 StreamPromise API 中实现了一个完全兼容的 Reactive Streams!这对 Reactor 用户来说是一个巨大的进步。它开启了与其他 Reactive Streams 实现(如 Akka StreamsRatpackRxJava 等)的集成。Reactor 提供了一个坚实的基础,可以构建现代 #uberfastdata 应用程序,这些应用程序具有苛刻的高吞吐量和低延迟要求。

Stream 和 Promise

Reactor 2.0 中最主要的更改是 Stream API。事实上,在 1.1 和 2.0 之间,代码库的大多数其他部分要么只是稍微改进,要么保持不变。StreamPromise 并非如此。这些组件已从头开始完全重写,以利用 Reactive Streams 规范在功能性反应式流管道中提供完全非阻塞的反压。

什么是反应式系统中的反压?

Akka 知名人士 Roland Kuhn 博士对这一话题进行了非常雄辩的阐述,如果您有兴趣探索非阻塞反压背后的原因,我们建议您观看他关于此主题的会议演讲,其中大部分都可以在 YouTube 上找到

太长不看版

反压是发布者和订阅者关系的反转,其中 Subscriber 会告诉 Publisher “给我接下来的 N 个可用项”,而不是 Publisher 告诉 Subscriber “无论你是否能处理,都接收我所有的这些项”。由于 Publisher 正在被动地向 Subscriber 提供数据元素,而不是相反,因此(在完全的 Reactive Streams 管道中)无需缓冲数据,因为您永远不会拥有超出处理能力的数据量。实际上,需要进行一些缓冲或排队,但像 Reactor 这样的库可以消除您对如何实现此操作的担忧,因此您可以编写完全响应式的代码,以响应数据变得可用时的状态,而不是试图找出必须使用什么神奇组合的 BlockingQueue 或其他类型的低效方案来确保异步组件彼此正确隔离。

Reactor 实现 Reactive Streams

Reactor 团队投入了大量时间来实现 Reactive Streams 规范的全面实现。Reactor 的 Stream 组件为您提供了有用且易于理解的挂钩,您可以将您的业务逻辑挂在上面,以便您只需要关注编写适当范围的功能组件来响应单个数据元素,而不必用大量样板逻辑来处理将数据从一个线程传递到另一个线程、执行有界排队和缓冲以及在处理反应式异步组件时通常需要的其他杂项任务。

您可以在 Reactive Geocoder 演示 中找到代码示例,Reactor 团队在今年德克萨斯州达拉斯举行的 SpringOne 上讨论了此演示(重播 可供 SpringOne2GX 2014 参会者使用,并将在稍后时间公开发布)。

下面是一个小片段,它展示了如何创建一个新的 Stream,将业务逻辑附加到它,然后将数据发布到它。

// by default Streams use the Disruptor RingBufferDispatcher
HotStream<String> helloStream = Streams.defer(env);

helloStream.map(s -> "Hello " + s + "!")
           .consume(log::info);

helloStream.broadcastNext("World");

当您运行此代码时,您将看到已记录文本“Hello World!”。您还应该注意到日志记录已来自 RingBuffer 线程,而不是来自您的主线程。换句话说,您刚刚向另一个线程提交了一个任务以异步执行,将结果转换为其他内容,然后使用 Reactive Streams 非阻塞、按需反压来响应结果,而无需任何类型的基于 Future 的嘈杂阻塞代码!

您还可以创建“冷”流,这与使用 RxJava 的 Observable 非常相似。

// stream contains the single value "Hello World!"
Stream<String> helloStream = Streams.just("World");

helloStream.map(s -> "Hello " + s + "!")
           .consume(log::info);

当您运行此代码时,您将看到已记录文本“Hello World!”,与前面的示例类似。这里的区别在于我们从未调用过 broadcastNext(String) 方法,因为当我们附加我们的 Consumer<String> 时,它已为我们处理了。您可以像创建 RxJava Observable 一样,从任何值或值的集合创建流。这使您可以将标准 Java 集合 API 与反应式流 API 混合使用。

Stream 是新的潮流

Spark、Storm 和其他大数据库等流式 API 证明,在没有无限资源的系统(基本上是我们云中运行的任何东西)上运行时,以更具功能性和反应性的方式处理数据效率更高,并且(在许多情况下)更易于理解,这归因于用于构建处理管道的 DSL 的声明性、自文档化特性。当您将业务逻辑提炼到其本质时,您确实会注意到,没有多少事物无法表示为转换或使用者函数。您要么接收输入并产生输出,要么只是接收输入。Reactor 的 Stream API 深植于这种范式,因此为您提供了大量(谁能告诉我这个电影参考:“你会说我有很多彩罐吗?”)处理数据通过管道传递时的选项。除了像 map(Function<T,V>)filter(Predicate<T>) 这样的简单函数之外,还有更复杂的选项,如 buffer(int)buffer(int, long, TimeUnit)。后者提供了极其有用的基于长度和时间的“微批处理”。例如,要微批处理一组昂贵的数据库更新以通过 WAN 连接发送,您可能希望将其缓冲,直到您拥有设定的数量或达到某个超时时间为止。

// create a stream backed by a load-balanced, round-robin assigned Dispatcher
Stream<Update> updateStream = Streams.defer(env, env.getDefaultDispatcherFactory().get());

updateStream.buffer(1024, 350, TimeUnit.MILLISECONDS)
            .consume(driver::batchUpdate);

这将收集流式更新,直到收集到 1024 个更新或 350 毫秒过期,以先发生者为准。然后,它将通过传递 List<Update>(包含 1024 个元素或在 350 毫秒内收集到的任何元素)来触发下游处理。这使您可以编写非常高效的系统,这些系统以批处理方式处理大量数据,以最大程度地减少网络带宽使用并最大化吞吐量(同时仍然保持可预测的延迟)。

除了微批处理之外,Stream 还提供了 filterflatMapmovingBufferjoinmergesamplesort 和许多其他操作,这些操作大多不言自明。与 Scala 的集合 API 或 RxJava 的 Observable 类似,Reactor 的 Stream 提供了功能性和反应式的方式来快速、高效且以极高的容量处理数据,同时保持可预测的低延迟。毫不夸张地说,您可以使用 Stream 作为基础组件来编写整个应用程序,该组件用于提交异步任务以供执行,以及以反应式方式处理传统数据集合——然后通过将实时数据与历史数据结合来混合这两种方法。

并行处理

有时需要将数据流拆分为并行管道以进行并发处理。Reactor 的 Stream 提供了一种非常方便的方法来使用 parallel(int) 操作来实现这一点。您只需将您的业务逻辑附加到 parallel 调用后提供的 Stream,数据将在下游管道之间进行循环分配以进行并发处理。

HotStream<String> stream = Streams.defer(env);

// by default uses number of CPUs as thread count
stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
                                      .consume(log::info));

这里有一个关于Reactive Streams实现如何在代码中体现的有趣示例:当你运行它时,你不会得到任何输出。.parallel()操作不会在管道上创建“需求”。在Reactive Streams系统中,是管道的末端拉取数据进入操作,而不是生产者推送数据。由于此管道末端没有终端操作,因此数据无法被拉取通过。实际上,这通常不是问题,因为在实际应用中你确实希望处理数据。在这个例子中,我们可以在.parallel()之后添加一个.drain()调用来产生需求并拉取数据通过。我们可能不会在生产系统中这样做,但对于测试和演示,我们可以通过一个drain轻松解决。

stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
                                      .consume(log::info))
      .drain();

发生了什么,为什么我没有看到我期望的结果?

在响应式系统中,有时很难理解为什么事情没有按预期工作。虽然库在改进IDE内部异步流的实时调试方面做不了太多,但始终存在久经考验且行之有效的方法:大量日志记录。Reactor添加了几个隐藏的方法,称为.debug().log(),它们应该可以帮助你弄清楚流是如何构建的以及它们在做什么。.debug()方法将提供流连接方式的输出。它将显示哪些操作连接到哪些操作,以及每个操作当前可用的容量。.log()方法将日志记录操作附加到你的流,并输出订阅和发布事件。

如果我们在上面示例中的.parallel()之前添加一个.log()调用,我们将获得额外的日志记录来告诉我们发生了什么。

stream.log()
      .parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
                                      .consume(log::info))
      .drain();

将产生

[ringBuffer-1] INFO  r.r.a.LoggerAction - onSubscribe: {capacity=0/8188 [0%], current=0, pending=0, waiting=0}
[main] INFO  r.r.a.LoggerAction - subscribe: ConcurrentAction-{dispatcher=RingBuffer:8192, max-capacity=8188}
[ringBuffer-1] INFO  r.r.a.LoggerAction - request: 9223372036854775807
[ringBuffer-1] INFO  r.r.a.LoggerAction - onNext: World
[ringBufferGroup-2] INFO  r.r.StreamTests - Hello World!

工件

要将现有应用程序升级到Reactor 2.0,你可能只需要调整一些内容。如果你在Reactor 1.1中使用流,你会发现Reactor 2.0流在发布值方面有所不同。.broadcastNext()方法定义在Action子类和HotStream上,但不在某些其他操作上。Reactor 1.1使用Deferred发布值,因此你的代码需要进行调整,将发布者类型更改为可以访问.broadcastNext()方法的内容。如果你使用的是普通的Reactor或基于Spring和注解的事件处理,则几乎无需更改任何内容。

要访问里程碑工件,请在您选择的构建系统中使用http://repo.spring.io/libs-milestone存储库。例如,如果使用Gradle(当然你正在使用Gradle,对吧?),只需像这样配置你的repositories

repositories {
  maven { url 'http://repo.spring.io/libs-milestone' }
  mavenCentral()
}

要报告错误、跟踪Reactor 2.0的开发、阅读Wiki或以其他方式参与Reactor社区,请访问Reactor的GitHub主页:https://github.com/reactor/reactor。你也可以在这里在线阅读JavaDoc:http://reactor.github.io/docs/api/2.0.0.M1/index.html

获取Spring新闻

与Spring新闻保持联系

订阅

领先一步

VMware提供培训和认证来加速你的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部