领先一步
VMware提供培训和认证,以快速提升您的进步。
了解更多欢迎来到另一期《你可能不需要另一个库》(YMNNALFT)!自从2016年以来,我花了很多时间在我的Spring Tips视频中阐明(或者至少尝试阐明!)Spring生态系统中一些更大的机遇。然而,今天我怀着不同的精神来到这里,想关注那些功能强大的、有时隐藏的、不起眼的小工具,它们可以让你免于添加额外的第三方依赖项及其隐含的复杂性。
我们涵盖这些减少复杂性的工具是一件好事,因为世界是一个混乱、复杂和令人眼花缭乱的地方,在响应式数据流组合的世界中,这一点尤其明显。生活变化很快,数据更是如此。数据源自任何地方(网络服务、数据库、内存计算、线程代码等)。数据具有多种不同的大小和形状(一条记录、十条记录、无限条记录等)。数据的数量也各不相同,可能会导致消费者数据匮乏或不堪重负。数据的到达节奏和时间也不同:现在、稍后、一次性、定期等。谈论已经存在于内存中的数据,也就是手头的数据,比谈论尚未实现的数据更自然。谈论单线程情况下的数据也比处理并发情况下的数据更自然。
这令人困惑!从历史上看,在代码中处理不同的数据维度甚至更加繁琐。直到响应式编程出现。
响应式编程提供了一个统一的世界观,使我们能够使用易于使用的带有操作符的DSL来思考(可能是异步的,也可能是并发的)集成混乱的世界。这些操作符支持响应式流数据流管道的定义和组合。响应式编程提供了一种结构化的并发范式,极大地简化了编写安全、可扩展、资源高效的代码。
有一些很棒的库(例如RxJava和Akka Streams)以大致相同的方式工作。如果您没有特别的想法,但想要一个世界级的选择,并且已经在使用Spring,那么您不妨使用Project Reactor。它包含在其中!
Spring团队开发了Project Reactor来支持Spring生态系统中的响应式工作。您不需要Spring就可以使用Project Reactor,但是Spring生态系统中的所有响应式API都基于Project Reactor来提供数据流选项。微软强制要求所有其SDK客户端和API都使用Project Reactor创建。Facebook使用Project Reactor开发了其RSocket协议的Java客户端。Project Reactor已经成熟——它自2010年以来就存在!——但一直在不断添加新功能。如果您仍然无法获得所需的功能,它可以通过可互操作的Reactive Streams类型与其他响应式数据流库完美协作。
让我们来看一个Project Reactor如何简化不同数据流源和接收器的组合以及几乎消除任何手动线程代码的示例。这是一个巨大的胜利。记住:只有一位真正了解如何编写安全、实用、多线程Java代码的人……而这不是你!我不知道那是谁。这并不重要。不要试图挑战命运;让Project Reactor来帮忙。
您需要以下依赖项。
org.springframework.boot
: spring-boot-starter-webflux
现在,让我们来看一个示例。此示例演示了在给定不同类型的数据时,规范化处理是多么容易。在此示例中,我们查看Java 8 java.util.Stream<T?>
和CompletableFuture<T>
,但可能性是无限的。在大多数响应式应用程序中,您不一定会将非响应式类型转换为响应式类型(如Flux<T>
或Mono<T>
)。这些示例将更加简单。此示例假设您有两个数据源,并且需要将它们组合起来。
package bootiful.rx;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
@SpringBootApplication
public class BootifulApplication {
CompletableFuture<String> returnCompletableFuture(int counter) {
return CompletableFuture.supplyAsync(() -> {
var start = System.currentTimeMillis();
try {
Thread.sleep((long) (Math.max((Math.random() * 10), 5) * 1000));
}
catch (InterruptedException e) {
// threads smdh
}
var stop = System.currentTimeMillis();
var delta = stop - start;
return "(" + Thread.currentThread().getName() + ") Hello, #" + counter + "! (after " + delta + " ms.)";
});
}
Stream<Integer> returnStream() {
return Stream.iterate(0, integer -> integer + 1);
}
@Bean
ApplicationListener<ApplicationReadyEvent> begin() {
return event -> {
Flux<String> count = Flux//
.fromStream(this.returnStream()) //
.take(10) //
.flatMap(c -> Flux.zip(Mono.just(c), Mono.fromCompletionStage(this.returnCompletableFuture(c)))) //
.map(tuple -> tuple.getT2() + " #" + tuple.getT1()); //
count.subscribe(System.out::println);
};
}
public static void main(String[] args) {
SpringApplication.run(BootifulApplication.class, args);
}
}
您知道Reactor还提供了什么吗?支持重试、错误处理、超时等的运算符,如果您不将其包含在Project Reactor中,则还必须将其交给另一个第三方库。双赢。
我可以继续谈论Project Reactor带来的机遇。事实上,我已经谈论了。查看我的书《响应式Spring》以了解更多信息。
怎么样?您玩得开心吗?也许学到了一些东西?一如既往,我很想听听您的意见,所以请在Twitter (@starbuxman)上发表您的看法!我将在本周晚些时候发布另一期《YMNNALFT》,所以请务必不要错过。我还有关于以下内容的文章,包括简单的RPC、*Utils
对象的花园、使用Micrometer进行维度指标等等。