Reactor 2.0.0.M1 发布,集成了 Reactive Streams!

工程 | Jon Brisbin | 2014年10月21日 | ...

Reactor 团队非常兴奋,终于能够宣布 Reactor 2.0 的一个初步里程碑版本!此更新在完全重写的 StreamPromise API 中包含了完全兼容 Reactive Streams 的实现!这对于 Reactor 用户来说是巨大的一步。它打开了与 Akka Streams, Ratpack, RxJava 等其他 Reactive Streams 实现集成的可能性。Reactor 提供了一个坚实的基础,可以在此基础上构建具有高吞吐量和低延迟要求的现代 #uberfastdata 应用。

Stream 和 Promise

Reactor 2.0 的主要变化是 Stream API。事实上,代码库的大部分其他部分在 1.1 和 2.0 之间只是进行了轻微改进或保持不变。但 StreamPromise 则不同。这些组件已从头完全重写,以利用 Reactive Streams 规范,在功能性响应式流管道中提供完全非阻塞的背压。

响应式系统中的背压是什么?

Akka 的著名人物 Dr. Roland Kuhn 在这个问题上发表过非常雄辩的演讲,如果您有兴趣探索非阻塞背压背后的原因,我们鼓励您观看他关于该主题的会议演示,其中大部分可在 YouTube 上找到

长话短说 (TL;DR)

背压是发布者和订阅者关系的一种反转,其中 SubscriberPublisher 说“给我接下来的 N 个可用项”,而不是 PublisherSubscriber 说“无论你是否能够处理,都把这些项全部拿走”。由于 Publisher 是被动地向 Subscriber 提供数据元素,而不是反过来,因此(在完全 Reactive Streams 管道中)没有必要缓冲数据,因为流入的数据永远不会超过你能够处理的量。实际上,一些缓冲或排队是必要的,但像 Reactor 这样的库消除了你担心如何实现这一点的需要,这样你就可以编写完全响应式的代码,随着数据变得可用而做出响应,而不是试图找出必须采用哪种神奇的 BlockingQueue 或其他低效方案的组合,以确保异步组件彼此正确隔离。

Reactor 实现 Reactive Streams

Reactor 团队投入了数量惊人的时间来全面实现 Reactive Streams 规范。Reactor 的 Stream 组件为您提供了有用且易于理解的钩子,您可以将业务逻辑挂在上面,这样您只需关注编写适当范围的功能组件,该组件将响应单个数据元素,而无需用大量样板式的逻辑来处理数据从一个线程传递到另一个线程、执行有界队列和缓冲以及其他在处理响应式异步组件时通常必要的各种任务。

关于这在您的代码中如何体现的示例,可以在 Reactive Geocoder Demo 中找到,Reactor 团队在今年德克萨斯州达拉斯举行的 SpringOne 会议上讨论了该示例(回放可在 InfoQ 上供 SpringOne2GX 2014 参会者观看,稍后将对公众开放)。

以下是一个小片段,展示了如何创建新的 Stream,为其附加业务逻辑,然后向其发布数据。

// by default Streams use the Disruptor RingBufferDispatcher
HotStream<String> helloStream = Streams.defer(env);

helloStream.map(s -> "Hello " + s + "!")
           .consume(log::info);

helloStream.broadcastNext("World");

当您运行此代码时,您将看到日志中打印出“Hello World!”文本。您还应该注意到日志是从 RingBuffer 线程进行的,而不是从您的主线程。换句话说,您刚刚提交了一个任务到另一个线程进行异步执行,将其结果转换为其他内容,然后使用 Reactive Streams 非阻塞、基于需求的背压响应了结果,没有任何基于 Future 的嘈杂阻塞代码!

您也可以创建“冷”流,这与使用 RxJava 的 Observable 非常相似。

// stream contains the single value "Hello World!"
Stream<String> helloStream = Streams.just("World");

helloStream.map(s -> "Hello " + s + "!")
           .consume(log::info);

当您运行此代码时,您将看到日志中打印出“Hello World!”文本,与上一个示例类似。这里的区别在于我们无需调用 broadcastNext(String) 方法,因为当我们附加 Consumer<String> 时,这一切都已为我们处理。您可以从任何值或值的集合创建流,就像创建 RxJava Observable 一样。这使您可以将标准的 Java Collection API 与响应式流 API 混合使用。

Stream 是新潮流(黑就是新潮)

像 Spark、Storm 和其他大数据库这样的流 API 证明,在资源有限的系统(基本上就是我们在云中运行的任何东西)上运行时,以更具功能性和响应式的方式处理数据更有效,并且由于用于构建处理管道的 DSL 具有声明性和自文档化的特性,在许多情况下也更容易理解。当你将业务逻辑提炼到其本质时,你会注意到很少有事物不能表达为转换或消费函数。你要么接受输入并产生输出,要么只接受输入。Reactor 的 Stream API 根植于这种范式,因此提供了丰富(谁能告诉我这是哪部电影台词:“你会说我有很多皮纳塔吗?”)的选择,用于处理流经管道的数据。除了像 map(Function<T,V>)filter(Predicate<T>) 这样的简单函数外,还有更复杂的选项,如 buffer(int)buffer(int, long, TimeUnit)。后者提供了极其有用的基于长度和时间的“微批处理”。例如,对于通过 WAN 连接发送开销较大的数据库更新集,您可能希望缓冲它们,直到收集到设定的数量或经过一定的超时时间。

// create a stream backed by a load-balanced, round-robin assigned Dispatcher
Stream<Update> updateStream = Streams.defer(env, env.getDefaultDispatcherFactory().get());

updateStream.buffer(1024, 350, TimeUnit.MILLISECONDS)
            .consume(driver::batchUpdate);

这将收集流式更新,直到收集到 1024 个或者经过 350 毫秒,以先到者为准。然后,它将通过传递一个包含 1024 个元素或在 350 毫秒内收集到的元素数量的 List<Update> 来触发下游处理。这使您可以编写非常高效的系统,以批处理方式处理大量数据,以最大程度地减少网络带宽使用并最大化吞吐量(同时仍保持可预测的延迟)。

除了微批处理,Stream 还提供了诸如 filter, flatMap, movingBuffer, join, merge, sample, sort 等许多其他操作,这些操作大多不言自明。很像 Scala 的集合 API 或 RxJava 的 Observable,Reactor 的 Stream 提供功能性和响应式的方法,以快速、高效、高吞吐量地处理数据,同时保持可预测的低延迟。毫不夸张地说,您可以使用 Stream 作为基础组件来编写整个应用程序,该组件用于提交异步任务执行,并以响应式方式处理传统数据集合——然后通过结合实时数据和历史数据来混合这两种方法。

并行处理

有时需要将数据流分割成并行管道进行并发处理。Reactor 的 Stream 通过 parallel(int) 操作提供了极其便捷的方式来实现这一点。您只需将业务逻辑附加到 parallel 调用后提供的 Stream 上,数据就会在下游管道之间进行轮询,以进行并发处理。

HotStream<String> stream = Streams.defer(env);

// by default uses number of CPUs as thread count
stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
                                      .consume(log::info));

以下是一个有趣的 Reactive Streams 实现示例,展示了它在您的代码中如何体现:当您运行此代码时,您不会得到任何输出。.parallel() 操作不会在管道上创建“需求”。在 Reactive Streams 系统中,是由管道的末端将数据拉入操作,而不是由生产者推送。由于此管道末端没有终端操作,数据无法被拉出。在实际应用中,这通常不是问题,因为您实际上希望处理数据。在此示例中,我们只需在 .parallel() 之后添加一个 .drain() 调用即可产生需求并将数据拉出。我们在生产系统中不太可能这样做,但在测试和演示中,我们可以通过 drain 轻松解决问题。

stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
                                      .consume(log::info))
      .drain();

发生了什么?为什么我没有看到预期的结果?

对于响应式系统,有时很难理解为什么事情不像预期的那样工作。虽然库在改善 IDE 内异步流的实时调试过程方面能做的事情不多,但总有屡试不爽的方法,那就是大量的日志记录。Reactor 添加了两个有点隐藏的方法,名为 .debug().log(),它们应该能帮助您弄清楚流是如何构建的以及它们正在做什么。.debug() 方法将输出流的连接方式。它会显示哪些操作连接到哪些操作,以及每个操作当前可用的容量。.log() 方法会为您的流附加一个日志操作,并输出订阅和发布事件。

如果在上面的示例中,我们在 .parallel() 调用之前添加一个 .log() 调用,我们将获得额外的日志信息,告诉我们正在发生什么。

stream.log()
      .parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
                                      .consume(log::info))
      .drain();

将产生

[ringBuffer-1] INFO  r.r.a.LoggerAction - onSubscribe: {capacity=0/8188 [0%], current=0, pending=0, waiting=0}
[main] INFO  r.r.a.LoggerAction - subscribe: ConcurrentAction-{dispatcher=RingBuffer:8192, max-capacity=8188}
[ringBuffer-1] INFO  r.r.a.LoggerAction - request: 9223372036854775807
[ringBuffer-1] INFO  r.r.a.LoggerAction - onNext: World
[ringBufferGroup-2] INFO  r.r.StreamTests - Hello World!

构件 (Artifacts)

要将现有应用程序升级到 Reactor 2.0,您可能只需要调整几处。如果您在 Reactor 1.1 中使用了 stream,您会发现 Reactor 2.0 的 stream 在值的发布方式上有所不同。.broadcastNext() 方法定义在 Action 子类和 HotStream 上,而不是其他一些操作上。Reactor 1.1 使用 Deferred 发布值,因此您的代码需要进行调整,将发布者类型更改为可以访问 .broadcastNext() 方法的类型。如果您使用普通的 Reactor 或基于 Spring 和注解的事件处理,您几乎不需要更改任何东西。

要访问里程碑构件,请在您选择的构建系统中使用 http://repo.spring.io/libs-milestone 仓库。例如,如果使用 Gradle(当然您在使用 Gradle,对吧?),只需像这样配置您的 repositories

repositories {
  maven { url 'http://repo.spring.io/libs-milestone' }
  mavenCentral()
}

要报告 bug、关注 Reactor 2.0 的开发、阅读 wiki 或以其他方式参与 Reactor 社区,请访问 Reactor 的 GitHub 主页:https://github.com/reactor/reactor。您还可以在线阅读 JavaDoc:http://reactor.github.io/docs/api/2.0.0.M1/index.html

获取 Spring 新闻简报

订阅 Spring 新闻简报,保持联系

订阅

抢先一步

VMware 提供培训和认证,助您快速进步。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一个简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部