响应式编程笔记 第二部分:编写一些代码

工程 | Dave Syer | 2016年6月13日 | ...

在这篇文章中,我们将继续关于响应式编程的系列文章,并重点通过实际代码示例解释一些概念。最终结果应该使您更好地理解是什么让响应式编程与众不同,以及是什么让它具有函数式特性。这里的示例非常抽象,但它们为您提供了一种思考 API 和编程风格的方式,并开始了解它与众不同的之处。我们将看到响应式的元素,并学习如何控制数据流,并在必要时在后台线程中进行处理。

设置项目

我们将使用 Reactor 库来说明我们需要说明的要点。代码也可以用其他工具轻松编写。如果您想使用代码并查看其工作情况,而无需复制粘贴任何内容,则在Github中有一些带有测试的工作示例。

要开始,请从https://start.spring.io获取一个空白项目并添加 Reactor Core 依赖项。使用 Maven

		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.0.0.RC2</version>
		</dependency>

使用 Gradle 非常相似

    compile 'io.projectreactor:reactor-core:3.0.0.RC2'

现在让我们编写一些代码。

是什么让它具有函数式特性?

响应式的基本构建块是一系列事件,以及两个主角,发布者和这些事件的订阅者。将序列称为“流”也可以,因为这就是它的本质。如果需要,我们将使用小写字母“s”来表示“流”,但 Java 8 有一个java.util.Stream,它与之不同,所以尽量不要混淆。无论如何,我们将尝试将叙述集中在发布者和订阅者上(这就是 Reactive Streams 所做的)。

Reactor 是我们将在示例中使用的库,因此我们将坚持使用那里的表示法,并将发布者称为Flux(它实现了来自 Reactive Streams 的Publisher接口)。RxJava 库非常相似,并且有很多并行特性,因此在这种情况下,我们将讨论Observable,但代码将非常相似。(Reactor 2.0 将其称为Stream,如果我们也需要讨论 Java 8 的Streams,就会令人困惑,因此我们只会在 Reactor 3.0 中使用新代码。)

生成器

Flux 是特定 POJO 类型事件序列的发布者,因此它是泛型的,即Flux<T>T的发布者。Flux 有一些静态便利方法,可以从各种来源创建其自身的实例。例如,要从数组创建Flux

Flux<String> flux = Flux.just("red", "white", "blue");

我们刚刚生成了一个Flux,现在我们可以用它做一些事情了。实际上,您只能用它做两件事:对其进行操作(转换它或将其与其他序列组合),订阅它(它是一个发布者)。

单值序列

通常,您会遇到一个您知道只包含一个或零个元素的序列,例如查找实体及其 ID 的存储库方法。Reactor 有一个Mono类型,表示单值或空FluxMono 具有与Flux非常相似的 API,但更专注,因为并非所有运算符对单值序列都有意义。RxJava 也在(1.x 版中)有一个称为Single的附加组件,以及用于空序列的Completable。Reactor 中的空序列是Mono<Void>

运算符

Flux 上有很多方法,几乎所有方法都是运算符。我们这里不会介绍所有这些方法,因为有更好的地方可以查找(例如 Javadoc)。我们只需要了解运算符是什么,以及它可以为您做什么。

例如,要请求将Flux内部的事件记录到标准输出,您可以调用.log()方法。或者您可以使用map()对其进行转换

Flux<String> flux = Flux.just("red", "white", "blue");

Flux<String> upper = flux
  .log()
  .map(String::toUpperCase);

在此代码中,我们通过将输入中的字符串转换为大写来转换它们。到目前为止,这很简单。

这个小示例的有趣之处(如果您不习惯它,甚至令人震惊)在于,尚未处理任何数据。甚至没有任何内容被记录,因为实际上什么也没有发生(尝试一下,您就会看到)。在Flux上调用运算符相当于为以后构建执行计划。它是完全声明式的,这就是人们称之为“函数式”的原因。运算符中实现的逻辑仅在数据开始流动时才执行,并且只有在有人订阅Flux(或等效地订阅Publisher)时才会发生。

所有响应式库以及 Java 8 的Streams中都存在相同处理数据序列的声明式、函数式方法。考虑一下这段类似的代码,使用与Flux内容相同的Stream

Stream<String> stream = Streams.of("red", "white", "blue");
Stream<String> upper = stream.map(value -> {
    System.out.println(value);
    return value.toUpperCase();
});

我们对Flux的观察也适用于此:没有处理任何数据,它只是一个执行计划。但是,FluxStream之间存在一些重要的区别,这使得Stream不适合响应式用例。Flux 有更多的运算符,其中大部分只是便利性,但真正的区别在于您想要使用数据时,因此我们需要接下来看看。

提示

Sebastien Deleuze 在响应式类型中有一篇有用的博客文章,他在其中通过查看它们定义的类型以及如何使用它们来描述各种流和响应式 API 之间的差异。FluxStream之间的差异在其中有更详细的说明。

订阅者

要使数据流动,您必须使用subscribe()方法之一订阅Flux。只有这些方法才能使数据流动。它们会回溯到您在序列上声明的运算符链(如果有),并请求发布者开始创建数据。在我们一直在使用的示例中,这意味着底层字符串集合正在被迭代。在更复杂的用例中,它可能会触发从文件系统读取文件,或从数据库拉取数据,或调用 HTTP 服务。

以下是subscribe()调用的实际示例

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe();

输出是

09:17:59.665 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

因此,我们可以从这里看出,没有参数的subscribe()的效果是请求发布者发送所有数据——只有一个request()被记录,它是“无界”的。我们还可以看到每个发布的项目(onNext())、序列的结束(onComplete())和原始订阅(onSubscribe())的回调。如果需要,您可以使用Flux中的doOn*()方法自己侦听这些事件,这些方法本身是运算符,而不是订阅者,因此它们本身不会导致任何数据流动。

subscribe() 方法是重载的,其他变体提供了不同的选项来控制发生的事情。一个重要且方便的形式是使用回调作为参数的 subscribe()。第一个参数是 Consumer,它为每个项提供一个回调,并且您还可以选择添加一个用于错误的 Consumer,以及在序列完成后执行的普通 Runnable。例如,仅使用每个项的回调

Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
.subscribe(System.out::println);

以下是输出结果

09:56:12.680 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
RED
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
WHITE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
BLUE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

我们可以通过多种方式控制数据流,并使其“受限”。控制的原始 API 是从 Subscriber 获取的 Subscription。上面对 subscribe() 的简短调用的等效长格式为

.subscribe(new Subscriber<String>() {

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(String t) {
        System.out.println(t);
    }
    @Override
    public void onError(Throwable t) {
    }
    @Override
    public void onComplete() {
    }

});

要控制流,例如一次最多使用 2 个项,您可以更智能地使用 Subscription

.subscribe(new Subscriber<String>() {

    private long count = 0;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(2);
    }

    @Override
    public void onNext(String t) {
        count++;
        if (count>=2) {
            count = 0;
            subscription.request(2);
        }
     }
...

Subscriber 正在一次“批处理”2 个项。这是一个常见的用例,因此您可能会考虑将实现提取到一个便利类中,这样也会使代码更具可读性。输出结果如下所示

09:47:13.562 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

事实上,批处理订阅者是一个如此常见的用例,以至于 Flux 中已经提供了便利方法。上面的批处理示例可以这样实现

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe(null, 2);

(注意使用请求限制的 subscribe() 调用)。以下是输出结果

10:25:43.739 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

提示

像 Spring Reactive Web 这样的为您处理序列的库可以处理订阅。能够将这些问题推送到堆栈底部是一件好事,因为它可以避免您使用非业务逻辑来使代码混乱,从而使代码更具可读性,并更容易测试和维护。因此,作为一项规则,如果您能够**避免订阅**序列,或者至少将该代码推入处理层并将其移出业务逻辑,那将是一件好事。

线程、调度器和后台处理

上面所有日志的一个有趣特征是它们都在“主”线程上,即调用 subscribe() 的调用者的线程。这突出了一个重点:Reactor 对线程非常节俭,因为这为您提供了获得最佳性能的最大机会。如果您在过去 5 年里一直在处理线程和线程池以及异步执行,试图从您的服务中榨取更多资源,那么这可能是一个令人惊讶的声明。但这是真的:在没有任何切换线程的必要的情况下,即使 JVM 针对非常有效地处理线程进行了优化,在单个线程上执行计算也始终更快。Reactor 已将控制所有异步处理的密钥交给了您,并假设您知道自己在做什么。

Flux 提供了一些配置方法来控制线程边界。例如,您可以使用 Flux.subscribeOn() 将订阅配置为在后台线程中处理

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.parallel())
.subscribe(null, 2);

结果可以在输出中看到

13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()

提示

如果您自己编写此代码或复制粘贴它,请记住在 JVM 退出之前等待处理停止。

请注意,订阅和所有处理都在单个后台线程“parallel-1-1”上进行——这是因为我们要求我们主要 Flux 的订阅者在后台。如果项处理是 CPU 密集型的,这很好(但实际上在后台线程中毫无意义,因为您需要为上下文切换付费,但不会更快地获得结果)。您可能还需要能够执行 I/O 密集型且可能阻塞的项处理。在这种情况下,您希望尽快完成它,而不会阻塞调用方。线程池仍然是您的朋友,这就是您从 Schedulers.parallel() 获取的内容。要将各个项的处理切换到单独的线程(最多达到池的限制),我们需要将它们分解成单独的发布者,并为每个发布者在后台线程中请求结果。一种方法是使用名为 flatMap() 的运算符,它将项映射到 Publisher(可能是不同类型),然后返回到新类型的序列

Flux.just("red", "white", "blue")
  .log()
  .flatMap(value ->
     Mono.just(value.toUpperCase())
       .subscribeOn(Schedulers.parallel()),
     2)
.subscribe(value -> {
  log.info("Consumed: " + value);
})

请注意这里使用 flatMap() 将项推送到“子”发布者中,在这里我们可以控制每个项的订阅,而不是整个序列。Reactor 具有内置的默认行为,即尽可能长时间地保留在单个线程上,因此如果我们希望它在后台线程中处理特定项或项组,则需要明确说明。实际上,这是少数强制并行处理的技巧之一(有关更多详细信息,请参阅Reactive Gems问题)。

输出结果如下所示

15:24:36.596 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  request(2)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:24:36.613 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(1)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE

请注意,现在有多个线程正在使用项,并且 flatMap() 中的并发提示确保在任何给定时间最多处理 2 个项,只要它们可用。我们看到很多 request(1),因为系统试图在管道中保留 2 个项,并且通常它们不会同时完成处理。实际上,Reactor 试图在这里变得非常聪明,它从上游 Publisher 预取项以尝试消除订阅者的等待时间(我们在这里没有看到,因为数字很低——我们只处理 3 个项)。

提示

三个项(“red”、“white”、“blue”)可能太少,无法令人信服地看到多个后台线程,因此最好生成更多数据。例如,您可以使用随机数生成器来做到这一点。

Flux 还有一个 publishOn() 方法,它与 subscribeOn() 相同,但用于侦听器(即 onNext() 或消费者回调),而不是订阅者本身

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.newParallel("sub"))
  .publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
    log.info("Consumed: " + value);
});

输出结果如下所示

15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE

请注意,消费者回调(记录“Consumed: …​”)位于发布者线程 pub-1-1 上。如果您删除 subscribeOn() 调用,您可能会看到所有第二批数据也在 pub-1-1 线程上处理。同样,这又是 Reactor 对线程的节俭——如果没有显式请求切换线程,它将在下一个调用中保留在同一个线程上,无论是什么。

注意

我们在本示例中将代码从 subscribe(null, 2) 更改为向 publishOn() 添加 prefetch=2。在这种情况下,subscribe() 中的获取大小提示将被忽略。

提取器:来自暗面的订阅者

还有另一种订阅序列的方法,即调用 Mono.block()Mono.toFuture()Flux.toStream()(这些是“提取器”方法——它们使您能够从 Reactive 类型退出到不太灵活的阻塞抽象中)。Flux 还有转换器 collectList()collectMap(),它们将 Flux 转换为 Mono。它们实际上并没有订阅序列,但它们确实会丢弃您可能在各个项级别对订阅进行的任何控制。

警告

一个好的经验法则是“**永远不要调用提取器**”。有一些例外情况(否则这些方法将不存在)。一个值得注意的例外是在测试中,因为能够阻塞以允许结果累积非常有用。

这些方法作为一种逃生舱存在,用于在 Reactive 和阻塞之间架起桥梁;例如,如果您需要适应旧版 API,例如 Spring MVC。当您调用 Mono.block() 时,您会丢弃 Reactive Streams 的所有优势。这是 Reactive Streams 和 Java 8 Streams 之间的主要区别——本机 Java Stream 只有“全有或全无”订阅模型,相当于 Mono.block()。当然,subscribe() 也可以阻塞调用线程,因此它与转换器方法一样危险,但您可以更好地控制——您可以通过使用 subscribeOn() 防止它阻塞,并且可以通过应用背压并定期决定是否继续来逐滴传递项。

结论

在本文中,我们介绍了 Reactive Streams 和 Reactor API 的基础知识。如果您需要了解更多信息,有很多地方可以查看,但没有比动手编码更好的方法了,因此请使用GitHub中的代码(对于本文中名为“flux”的项目中的测试),或前往Lite RX Hands On研讨会。到目前为止,这实际上只是开销,我们还没有学到很多我们无法使用非 Reactive 工具以更明显的方式完成的事情。本系列的下一篇文章将更深入地探讨 Reactive 模型的阻塞、调度和异步方面,并向您展示利用实际优势的机会。

获取 Spring 时事通讯

与 Spring 时事通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您快速提升。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅,提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部