YMNNALFT:使用 Project Reactor 的响应式数据流

工程 | Josh Long | 2021 年 1 月 6 日 | ...

欢迎来到另一期你可能不需要另一个库 (YMNNALFT)!自 2016 年以来,我花了很多时间在 我的 Spring Tips 视频中阐明(或者说是尝试阐明!)Spring 生态系统中一些更巨大的机遇。然而,今天,我以不同的精神来到这里,想要关注那些做着非常棒的事情的小巧、有时是隐藏的瑰宝,它们可能会让你免于额外的第三方依赖及其隐含的复杂性。

而且我们涵盖这些减少复杂性的瑰宝是一件好事,你看,因为世界是一个令人困惑、复杂和眼花缭乱的地方,而且在响应式数据流组合的世界中,这一点比其他任何地方都更加明显。 生活来得很快,数据更是如此。 数据来自一切(网络服务、数据库、内存计算、线程代码等)。 数据有许多不同的尺寸和形状(一条记录、十条记录、无限记录等)。 数据以不同的量到来,使消费者饥饿或不知所措。 数据以不同的节奏和时间到达:现在、稍后、一次性、周期性等。谈论已经存在于内存中、掌握在手中的数据,比谈论尚未实现的数据要自然得多。而且在单线程的情况下谈论数据,比在并发的情况下处理数据更自然。

考虑起来令人困惑! 从历史上看,在代码中解决不同的数据维度甚至更加乏味。 也就是说,直到响应式编程出现。

响应式编程提供了一个统一的世界观,使我们能够以易于使用的带有运算符的 DSL 来思考(可能是异步和潜在的并发)集成的混乱世界。 这些运算符支持响应式流数据流管道的定义和组合。 响应式编程提供了一种结构化的并发模式,大大简化了编写安全、可扩展、资源高效的代码。

那里有一些很棒的库(比如 RxJavaAkka Streams),它们以非常相似的方式工作。 如果你还没有特别记住一个,但想要一个世界级的选项,并且已经在使用 Spring,那么你最好使用 Project Reactor。 它包含在盒子中!

Spring 团队开发了 Project Reactor,以支持 Spring 生态系统中的响应式工作。 你不需要 Spring 即可使用 Project Reactor,但 Spring 生态系统中的所有响应式 API 都是基于 Project Reactor 构建的数据流选项。 Microsoft 强制要求使用 Project Reactor 创建他们所有的 SDK 客户端和 API。 Facebook 使用 Project Reactor 为他们的 RSocket 协议开发了 Java 客户端。 Project Reactor 已经成熟——它从 2010 年就开始存在了!——但一直在增长新的功能。 如果你仍然没有得到你需要的,它可以通过可互操作的 Reactive Streams 类型与其他响应式数据流库完美地工作。

让我们看一个示例,了解 Project Reactor 如何简化不同数据流源和接收器的组合,并几乎消除了任何手动线程代码。 这是一个巨大的胜利。 请记住:只有一个人真正了解如何编写安全、有用、多线程的 Java 代码……而且不是你! 我不知道是谁。 这并不重要。 不要考验命运; 让 Project Reactor 帮助你。

你需要以下依赖项。

  • Spring Initializr 上的响应式 Web - org.springframework.boot : spring-boot-starter-webflux

现在,让我们看一个示例。 此示例演示了在给定不同类型的数据的情况下,规范化处理是多么容易。 在此示例中,我们查看 Java 8 java.util.Stream<T?>CompletableFuture<T>,但可能性是无限的。 在大多数响应式应用程序中,你不一定需要将非响应式类型转换为响应式类型(例如 Flux<T>Mono<T>)。 那些示例会更加简单。 此示例假设你有两个数据源并且需要组合它们。

package bootiful.rx;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

@SpringBootApplication
public class BootifulApplication {

	CompletableFuture<String> returnCompletableFuture(int counter) {
		return CompletableFuture.supplyAsync(() -> {
			var start = System.currentTimeMillis();
			try {
				Thread.sleep((long) (Math.max((Math.random() * 10), 5) * 1000));
			}
			catch (InterruptedException e) {
				// threads smdh
			}
			var stop = System.currentTimeMillis();
			var delta = stop - start;
			return "(" + Thread.currentThread().getName() + ") Hello, #" + counter + "! (after " + delta + " ms.)";
		});
	}

	Stream<Integer> returnStream() {
		return Stream.iterate(0, integer -> integer + 1);
	}

	@Bean
	ApplicationListener<ApplicationReadyEvent> begin() {
		return event -> {

			Flux<String> count = Flux//
					.fromStream(this.returnStream()) //
					.take(10) //
					.flatMap(c -> Flux.zip(Mono.just(c), Mono.fromCompletionStage(this.returnCompletableFuture(c)))) //
					.map(tuple -> tuple.getT2() + " #" + tuple.getT1()); //

			count.subscribe(System.out::println);
		};
	}

	public static void main(String[] args) {
		SpringApplication.run(BootifulApplication.class, args);
	}

}

你知道使用 Reactor 还会得到什么吗? 支持重试、错误处理、超时等的运算符,如果它们未包含在 Project Reactor 中,你也会将它们外包给另一个第三方库。 双赢。

我可以继续谈论像 Project Reactor 这样的东西所带来的机会。 事实上,我做到了。 查看我的书 响应式 Spring 了解更多(很多)信息。

好吧? 你玩得开心,也许学到了一些东西吗? 与往常一样,我很想收到你的来信,所以 请在 Twitter 上发声 (@starbuxman) ! 我将在本周晚些时候带着另一期 YMNNALFT 回来,所以一定要错过。 我有很多关于 Easy RPC、*Utils 对象花园、使用 Micrometer 的维度指标等等主题的文章。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

取得领先

VMware 提供培训和认证,以加速你的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部