领先一步
VMware 提供培训和认证,助您加速进步。
了解更多Reactor 团队非常兴奋地宣布 Reactor 2.0 的初始里程碑版本!此更新包括在完全重写的 Stream 和 Promise API 中实现完全兼容的 Reactive Streams!这对 Reactor 用户来说是一个巨大的进步。它开启了与其他 Reactive Streams 实现(如 Akka Streams、Ratpack、RxJava 等)的集成。Reactor 为构建具有高吞吐量和低延迟要求的现代 #uberfastdata 应用程序提供了坚实的基础。
Reactor 2.0 的主要变化是 Stream API。事实上,1.1 和 2.0 之间,代码库的其他大部分要么只是略有改进,要么保持不变。但 Stream 和 Promise 则不同。这些组件已经完全重写,从头开始利用 Reactive Streams 规范,在功能性响应式流管道中提供完全非阻塞的背压。
Akka 创始人 Roland Kuhn 博士对此话题有过非常精彩的阐述,如果您有兴趣了解非阻塞背压背后的原因,我们鼓励您观看他在会议上的相关演讲,其中大部分都可以在 YouTube 上找到。
TL;DR
背压是发布者和订阅者关系的一种反转,其中 Subscriber 对 Publisher 说“给我接下来的 N 个可用项”,而不是 Publisher 对 Subscriber 说“无论你是否能处理,都拿走我所有的这些项”。由于 Publisher 是被动地向 Subscriber 提供数据元素,而不是反过来,因此(在完全 Reactive Streams 管道中)不需要缓冲数据,因为你永远不会有超过你所能处理的数据。实际上,一些缓冲或排队是必要的,但像 Reactor 这样的库让你无需担心如何实现这一点,这样你就可以编写完全响应式的代码,在数据可用时对其进行响应,而无需费心找出 BlockingQueue 或其他低效方案的神秘组合,以确保异步组件彼此正确隔离。
Reactor 团队投入了大量令人沮丧的时间,实现了 Reactive Streams 规范的全面实现。Reactor 的 Stream 组件为您提供了有用且易于理解的挂钩,您可以在其上挂载您的业务逻辑,这样您只需关注编写适当作用域的功能组件,该组件将响应单个数据元素,而无需用大量样板式的物流代码来处理数据在不同线程之间的传递、执行有界队列和缓冲以及处理响应式异步组件通常所需的各种其他任务。
您可以在响应式地理编码器演示中找到一个代码示例,Reactor 团队在今年的达拉斯 SpringOne 会议上讨论了该演示(回放可在 InfoQ 上供 SpringOne2GX 2014 与会者观看,稍后将公开)。
以下是一个小片段,展示了如何创建一个新的 Stream,将业务逻辑附加到它,然后将数据发布到其中。
// by default Streams use the Disruptor RingBufferDispatcher
HotStream<String> helloStream = Streams.defer(env);
helloStream.map(s -> "Hello " + s + "!")
.consume(log::info);
helloStream.broadcastNext("World");
当你运行这段代码时,你将看到“Hello World!”的文本被记录。你还应该注意到日志是从 RingBuffer 线程而不是你的主线程进行的。换句话说,你刚刚提交了一个任务到另一个线程异步执行,将结果转换为其他内容,然后使用 Reactive Streams 非阻塞、基于需求的背压响应结果,而没有任何嘈杂的基于 Future 的阻塞代码!
您也可以创建“冷”流,这与使用 RxJava 的 Observable 非常相似。
// stream contains the single value "Hello World!"
Stream<String> helloStream = Streams.just("World");
helloStream.map(s -> "Hello " + s + "!")
.consume(log::info);
运行此代码时,您会看到“Hello World!”文本被记录,与前面的示例类似。这里的区别在于我们从未调用 broadcastNext(String) 方法,因为当我们附加 Consumer<String> 时,它已经为我们处理了。您可以像创建 RxJava Observable 一样,从任何值或值集合创建流。这使您可以将标准 Java Collection API 与响应式流 API 结合使用。
像 Spark、Storm 和其他大数据库这样的流式 API 证明,在资源有限的系统(基本上是我们所有在云中运行的系统)上,以更函数式和响应式的方式处理数据效率更高,而且由于用于构建处理管道的 DSL 具有声明式、自文档的特性,因此(在许多情况下)也更容易理解。当你将业务逻辑提炼到其本质时,你确实会注意到没有多少事情不能表达为转换或消费者函数。你要么接收输入并产生输出,要么只是接收输入。Reactor 的 Stream API 沉浸在这种范式中,因此为您提供了大量的选项(谁能告诉我这个电影引用:“你认为我有大量的皮纳塔吗?”)来处理数据,因为它通过您的管道。除了像 map(Function<T,V>) 和 filter(Predicate<T>) 这样的简单函数之外,还有更复杂的选项,如 buffer(int) 或 buffer(int, long, TimeUnit)。后者提供了极其有用的基于长度和时间的“微批处理”。例如,为了微批处理一组发送到广域网连接代价高昂的数据库更新,您可能希望将它们缓冲起来,直到达到设定数量或经过一定超时。
// create a stream backed by a load-balanced, round-robin assigned Dispatcher
Stream<Update> updateStream = Streams.defer(env, env.getDefaultDispatcherFactory().get());
updateStream.buffer(1024, 350, TimeUnit.MILLISECONDS)
.consume(driver::batchUpdate);
这将收集流式更新,直到收集到 1024 个更新或 350 毫秒已过去,以先发生者为准。然后,它将通过传递一个包含 1024 个元素或 350 毫秒内收集到的元素数量的 List<Update> 来触发下游处理。这使您能够编写非常高效的系统,以批处理方式处理大量数据,以最小化网络带宽使用并最大化吞吐量(同时仍保持可预测的延迟)。
除了微批处理,Stream 还提供了诸如 filter、flatMap、movingBuffer、join、merge、sample、sort 等许多自解释的操作。与 Scala 的集合 API 或 RxJava 的 Observable 非常相似,Reactor 的 Stream 提供了功能性和响应式的方式来快速、高效、以极高的吞吐量处理数据,同时保持可预测的低延迟。毫不夸张地说,您可以使用 Stream 作为基础组件来编写整个应用程序,它既可以用于提交异步任务执行,也可以以响应式方式处理传统数据集合——然后通过结合实时数据和历史数据来混合这两种方法。
有时需要将数据流拆分为并行管道以进行并发处理。Reactor 的 Stream 通过 parallel(int) 操作提供了极其便捷的方式。您只需将业务逻辑附加到 parallel 调用后提供的 Stream,数据将在下游管道之间进行轮询,以进行并发处理。
HotStream<String> stream = Streams.defer(env);
// by default uses number of CPUs as thread count
stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info));
这里有一个有趣的 Reactive Streams 实现示例,它在您的代码中展现出来:当您运行这段代码时,您将不会得到任何输出。.parallel() 操作不会在管道上创建“需求”。在 Reactive Streams 系统中,是管道的末端将数据拉入操作,而不是生产者将其推出。由于此管道的末端没有终端操作,因此数据无法被拉过。实际上,这通常不是问题,因为在实际应用程序中您确实想要处理数据。在此示例中,我们可以在 .parallel() 之后添加 .drain() 调用来产生需求并拉取数据。我们可能不会在生产系统中使用这种方法,但对于测试和演示,我们可以通过 drain 轻松实现。
stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info))
.drain();
使用响应式系统时,有时会很沮丧,不明白为什么事情没有按预期工作。虽然库无法在 IDE 中更好地调试异步流方面做太多事情,但总有经过验证的“大量日志记录”方法。Reactor 添加了几个有点隐藏的方法,称为 .debug() 和 .log(),它们应该可以帮助您弄清楚您的流是如何构建以及它们正在做什么。.debug() 方法将为您提供流如何连接的输出。它将显示哪些操作连接到哪些操作,以及每个操作中当前可用的容量。.log() 方法将日志记录操作附加到您的流并输出订阅和发布事件。
如果我们在上面的示例中,在 .parallel() 之前添加一个 .log() 调用,我们将获得额外的日志,告诉我们发生了什么。
stream.log()
.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info))
.drain();
将产生
[ringBuffer-1] INFO r.r.a.LoggerAction - onSubscribe: {capacity=0/8188 [0%], current=0, pending=0, waiting=0}
[main] INFO r.r.a.LoggerAction - subscribe: ConcurrentAction-{dispatcher=RingBuffer:8192, max-capacity=8188}
[ringBuffer-1] INFO r.r.a.LoggerAction - request: 9223372036854775807
[ringBuffer-1] INFO r.r.a.LoggerAction - onNext: World
[ringBufferGroup-2] INFO r.r.StreamTests - Hello World!
要将现有应用程序升级到 Reactor 2.0,您可能只需调整几处。如果您在 Reactor 1.1 中使用流,您会发现 Reactor 2.0 流在值发布方面有所不同。.broadcastNext() 方法在 Action 子类和 HotStream 上定义,但不在其他一些操作上。Reactor 1.1 使用 Deferred 来发布值,因此您的代码需要调整,将发布者类型更改为可以访问 .broadcastNext() 方法的类型。如果您使用的是普通的 Reactor 或基于 Spring 和注解的事件处理,则几乎不需要更改任何内容。
要访问里程碑工件,请在您选择的构建系统中使用 http://repo.spring.io/libs-milestone 存储库。例如,如果您正在使用 Gradle(当然您正在使用 Gradle,对吧?),只需像这样配置您的 repositories 块:
repositories {
maven { url 'http://repo.spring.io/libs-milestone' }
mavenCentral()
}
要报告错误、关注 Reactor 2.0 的开发、阅读 wiki 或以其他方式参与 Reactor 社区,请访问 Reactor 的 GitHub 主页:https://github.com/reactor/reactor。您还可以在线阅读 JavaDoc:http://reactor.github.io/docs/api/2.0.0.M1/index.html