领先一步
VMware 提供培训和认证,助您加速进步。
了解更多在本文中,我们将继续响应式编程系列,重点通过实际的代码示例来解释一些概念。最终结果应该能让你更好地理解响应式编程的不同之处,以及它的函数式特性。这里的例子相当抽象,但它们提供了一种思考 API 和编程风格的方式,让你开始感受它的不同。我们将看到响应式编程的元素,并学习如何控制数据流,以及必要时如何在后台线程中处理数据。
我们将使用 Reactor 库来阐明我们需要说明的要点。代码也可以很轻易地用其他工具编写。如果你想尝试代码并查看其运行情况,而无需复制粘贴任何内容,可以在 Github 上找到包含测试的工作示例。
首先,从 https://start.spring.io 获取一个空白项目,并添加 Reactor Core 依赖。使用 Maven 如下:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.0.RC2</version>
</dependency>
使用 Gradle 也非常相似:
compile 'io.projectreactor:reactor-core:3.0.0.RC2'
现在我们来编写一些代码。
响应式编程的基本构建块是事件序列,以及两个主角:发布者和这些事件的订阅者。将序列称为“流”也可以,因为它确实是流。如果需要,我们将使用小写字母“s”的“stream”,但 Java 8 有一个不同的 java.util.Stream,所以尽量不要混淆。无论如何,我们将尝试将叙述集中在发布者和订阅者上(这就是 Reactive Streams 所做的)。
Reactor 是我们将在示例中使用的库,所以我们将坚持其表示法,并称发布者为 Flux(它实现了 Reactive Streams 中的 Publisher 接口)。RxJava 库非常相似,并且具有许多并行功能,因此在这种情况下,我们将讨论 Observable,但代码会非常相似。(Reactor 2.0 称之为 Stream,如果我们需要同时讨论 Java 8 Streams,这会造成混淆,所以我们只使用 Reactor 3.0 中的新代码。)
Flux 是特定 POJO 类型事件序列的发布者,因此它是泛型的,即 Flux<T> 是 T 的发布者。Flux 有一些静态便利方法可以从各种来源创建自身实例。例如,从数组创建 Flux:
Flux<String> flux = Flux.just("red", "white", "blue");
我们刚刚生成了一个 Flux,现在我们可以用它做一些事情。实际上,你只能做两件事:操作它(转换它,或与其他序列组合),订阅它(它是一个发布者)。
你经常会遇到一个你知道只包含一个或零个元素的序列,例如按 ID 查找实体的存储库方法。Reactor 有一个 Mono 类型,表示单值或空的 Flux。Mono 具有与 Flux 非常相似的 API,但更专注于此,因为并非所有操作符都对单值序列有意义。RxJava 也有一个附加组件(在 1.x 版本中)称为 Single,以及一个用于空序列的 Completable。Reactor 中的空序列是 Mono<Void>。
Flux 有*很多*方法,几乎所有这些方法都是操作符。我们不会在这里全部查看它们,因为有更好的地方可以查找(例如 Javadoc)。我们只需要了解操作符是什么以及它能为你做什么。
例如,要将 Flux 内部事件记录到标准输出,你可以调用 .log() 方法。或者你可以使用 map() 来转换它:
Flux<String> flux = Flux.just("red", "white", "blue");
Flux<String> upper = flux
.log()
.map(String::toUpperCase);
在这段代码中,我们通过将输入中的字符串转换为大写来对其进行转换。到目前为止,一切都很简单。
关于这个小样本有趣的是——如果你不习惯的话,甚至令人震惊——还没有处理任何数据。甚至没有记录任何内容,因为实际上什么都没有发生(试试看你就知道了)。在 Flux 上调用操作符相当于为以后构建了一个执行计划。它是完全声明性的,这也是为什么人们称之为“函数式”的原因。操作符中实现的逻辑只有在数据开始流动时才执行,而这只有在有人订阅 Flux(或等效地订阅 Publisher)时才会发生。
所有响应式库以及 Java 8 Streams 中都存在这种处理数据序列的声明式函数式方法。考虑这段类似的代码,使用与 Flux 内容相同的 Stream:
Stream<String> stream = Streams.of("red", "white", "blue");
Stream<String> upper = stream.map(value -> {
System.out.println(value);
return value.toUpperCase();
});
我们对 Flux 的观察同样适用于这里:没有数据被处理,它只是一个执行计划。然而,Flux 和 Stream 之间存在一些重要的差异,这使得 Stream 不适用于响应式用例。Flux 有更多的操作符,其中大部分只是为了方便,但真正的区别在于你想消费数据时,所以接下来我们需要看看这一点。
提示
Sebastien Deleuze 有一篇关于响应式类型的有用博客,他在其中通过查看它们定义的类型以及如何使用它们来描述各种流式和响应式 API 之间的差异。Flux 和 Stream 之间的差异在那里得到了更详细的强调。
要让数据流动,你必须使用 subscribe() 方法之一订阅 Flux。只有这些方法才能让数据流动。它们会回溯到你在序列上声明的操作符链(如果有的话),并请求发布者开始创建数据。在我们一直在使用的示例中,这意味着遍历底层字符串集合。在更复杂的用例中,它可能会触发从文件系统读取文件、从数据库拉取数据或调用 HTTP 服务。
以下是 subscribe() 方法的实际调用:
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe();
输出为:
09:17:59.665 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onComplete()
由此我们可以看出,不带参数的 subscribe() 的效果是请求发布者发送*所有*数据——只记录了一个 request(),它是“无限制的”。我们还可以看到发布每个项目(onNext())、序列结束(onComplete())和原始订阅(onSubscribe())的回调。如果你需要,你可以使用 Flux 中的 doOn*() 方法自己监听这些事件,这些方法本身是操作符,而不是订阅者,所以它们不会自行导致任何数据流动。
subscribe() 方法被重载,其他变体为你提供了不同的选项来控制发生的事情。一种重要且方便的形式是带有回调作为参数的 subscribe()。第一个参数是一个 Consumer,它为你提供了每个项目的回调,你还可以选择添加一个 Consumer 用于错误(如果有的话),以及一个普通的 Runnable 以在序列完成时执行。例如,只使用每个项目的回调:
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe(System.out::println);
以下是输出:
09:56:12.680 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
RED
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
WHITE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
BLUE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onComplete()
我们可以通过多种方式控制数据流,使其“有界”。用于控制的原始 API 是从 Subscriber 获取的 Subscription。上述对 subscribe() 的短调用等效的完整形式是:
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String t) {
System.out.println(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
为了控制流,例如一次最多消费2个项目,你可以更智能地使用 Subscription:
.subscribe(new Subscriber<String>() {
private long count = 0;
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(2);
}
@Override
public void onNext(String t) {
count++;
if (count>=2) {
count = 0;
subscription.request(2);
}
}
...
这个 Subscriber 正在“批量”处理项目,每次2个。这是一个常见的用例,所以你可能会考虑将实现提取到一个便利类中,这样代码也会更具可读性。输出如下:
09:47:13.562 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - request(2)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - request(2)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onComplete()
事实上,批量订阅者是一个非常常见的用例,以至于 Flux 中已经提供了便利方法。上面的批量处理示例可以这样实现:
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe(null, 2);
(注意带有请求限制的 subscribe() 调用)。这是输出:
10:25:43.739 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - request(2)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - request(2)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onComplete()
提示
像 Spring Reactive Web 这样为你处理序列的库可以处理订阅。能够将这些关注点下推到堆栈中是件好事,因为这可以让你避免代码中充斥着非业务逻辑,使其更具可读性,更易于测试和维护。因此,通常来说,如果你能**避免订阅**序列,或者至少将该代码推入处理层,并从业务逻辑中移除,这是一件好事。
上述所有日志的一个有趣特点是它们都在“主”线程上,也就是调用 subscribe() 的线程。这突出了一点:Reactor 对线程非常节俭,因为这为你提供了获得最佳性能的最大机会。如果你在过去 5 年里一直在处理线程和线程池以及异步执行,试图从服务中榨取更多性能,这可能是一个令人惊讶的说法。但这是真的:在没有强制切换线程的情况下,即使 JVM 经过优化以非常高效地处理线程,在单个线程上进行计算也总是更快。Reactor 已经将控制所有异步处理的权限交给了你,它假定你知道自己在做什么。
Flux 提供了一些配置方法来控制线程边界。例如,你可以使用 Flux.subscribeOn() 配置在后台线程中处理订阅:
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.parallel())
.subscribe(null, 2);
结果可以在输出中看到:
13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
提示
如果你自己编写这段代码,或者复制粘贴它,请记住在 JVM 退出之前等待处理停止。
请注意,订阅和所有处理都在单个后台线程“parallel-1-1”上进行——这是因为我们要求主 Flux 的订阅者在后台运行。如果项目处理是 CPU 密集型的,这很好(但实际上在后台线程中没有意义,因为你支付了上下文切换的代价,但并没有更快地获得结果)。你可能还希望能够执行 I/O 密集型且可能阻塞的项目处理。在这种情况下,你希望尽可能快地完成它,而不会阻塞调用者。线程池仍然是你的朋友,这就是你从 Schedulers.parallel() 中获得的。要将单个项目的处理切换到单独的线程(最多达到池的限制),我们需要将它们分解为单独的发布者,并为每个发布者请求在后台线程中获取结果。实现此目的的一种方法是使用名为 flatMap() 的操作符,它将项目映射到 Publisher(可能类型不同),然后映射回新类型的序列:
Flux.just("red", "white", "blue")
.log()
.flatMap(value ->
Mono.just(value.toUpperCase())
.subscribeOn(Schedulers.parallel()),
2)
.subscribe(value -> {
log.info("Consumed: " + value);
})
这里要注意 flatMap() 的使用,它将项目下推到“子”发布者中,我们可以在其中控制每个项目的订阅,而不是整个序列的订阅。Reactor 内置的默认行为是尽可能长时间地挂在一个线程上,所以如果我们要它在后台线程中处理特定的项目或项目组,我们需要明确指定。实际上,这是少数几种公认的强制并行处理的技巧之一(有关更多详细信息,请参阅 Reactive Gems 问题)。
输出看起来像这样:
15:24:36.596 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - request(2)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
15:24:36.613 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(1)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
请注意,现在有多个线程正在消费项目,并且 flatMap() 中的并发提示确保在任何给定时间都有 2 个项目正在处理,只要它们可用。我们看到很多 request(1),因为系统试图在管道中保持 2 个项目,并且它们通常不会同时完成处理。事实上,Reactor 在这里试图非常智能,它会从上游 Publisher 预取项目,以尝试消除订阅者的等待时间(我们在这里没有看到这一点,因为数量很低——我们只处理了 3 个项目)。
提示
三个项目(“红色”、“白色”、“蓝色”)可能太少,无法令人信服地看到不止一个后台线程,因此最好生成更多数据。例如,你可以使用随机数生成器来做到这一点。
Flux 还有一个 publishOn() 方法,它的作用相同,但用于监听器(即 onNext() 或消费者回调),而不是订阅者本身:
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.newParallel("sub"))
.publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
log.info("Consumed: " + value);
});
输出看起来像这样:
15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
请注意,消费者回调(记录“Consumed: …”)位于发布者线程 pub-1-1 上。如果你删除 subscribeOn() 调用,你可能会看到第二批数据的所有处理也都在 pub-1-1 线程上。这再次是 Reactor 在线程使用上节俭的表现——如果R没有明确的线程切换请求,它将保持在同一个线程上进行下一次调用,无论那是什么。
注意
我们在此示例中将代码从 subscribe(null, 2) 更改为在 publishOn() 中添加 prefetch=2。在这种情况下,subscribe() 中的获取大小提示将被忽略。
还有另一种订阅序列的方法,即调用 Mono.block() 或 Mono.toFuture() 或 Flux.toStream()(这些是“提取器”方法——它们将你从响应式类型中拉出,进入一个不那么灵活的阻塞抽象)。Flux 也有转换器 collectList() 和 collectMap(),可以将 Flux 转换为 Mono。它们实际上不订阅序列,但它们会抛弃你可能对单个项目订阅的任何控制。
警告
一个好的经验法则是“**永远不要调用提取器**”。当然也有一些例外(否则这些方法就不会存在)。一个值得注意的例外是在测试中,因为能够阻塞以允许结果累积是很有用的。
这些方法作为一种逃生舱口存在,用于从响应式桥接到阻塞式;例如,如果你需要适应遗留 API,如 Spring MVC。当你调用 Mono.block() 时,你就会抛弃响应式流的所有优势。这是响应式流和 Java 8 Streams 之间的关键区别——原生 Java Stream 只有“全有或全无”的订阅模型,相当于 Mono.block()。当然,subscribe() 也可以阻塞调用线程,所以它和转换器方法一样危险,但你拥有更多的控制权——你可以通过使用 subscribeOn() 来防止它阻塞,并且你可以通过施加背压并定期决定是否继续来逐个处理项目。
在本文中,我们涵盖了 Reactive Streams 和 Reactor API 的基础知识。如果你需要了解更多,有很多地方可以查找,但没有什么能替代动手编码,所以请使用 GitHub 中的代码(本文在项目中名为“flux”的测试中),或者前往 Lite RX Hands On 工作坊。到目前为止,这真的只是开销,我们还没有学到太多用非响应式工具无法以更明显的方式完成的事情。本系列的下一篇文章将更深入地探讨响应式模型的阻塞、调度和异步方面,并向你展示如何获得真正的好处。