使用Spring、协程和Kotlin Flow实现响应式编程

工程 | Sébastien Deleuze | 2019年4月12日 | ...

自从我们在2017年1月宣布Spring Framework官方支持Kotlin以来,发生了很多事情。Kotlin在2017年Google I/O大会上被宣布为官方的Android开发语言,我们继续改进Spring产品组合中的Kotlin支持,而Kotlin本身也随着协程等关键新功能的出现而不断发展。

我想借Spring Framework 5.2的第一个里程碑的机会,概述一下我们在Spring和Kotlin方面取得的进展。我将尽最大努力关注具体的改进,因为我相信Spring和Kotlin拥有相同的务实思维。

我认为这完全取决于选择。这些选择是由我们(Spring团队)提供的,但也是您作为应用程序开发人员在启动新的Spring Boot应用程序时必须做出的选择。例如

  • 我应该使用哪种语言?

  • 使用注解的@Controller还是函数式风格?

  • Spring MVC还是WebFlux?

这些问题显然非常主观,通常取决于项目上下文,但我将分享我个人的观点。

Java还是Kotlin?

Java是显而易见的默认选择,但Kotlin是一个越来越受欢迎的替代方案。是什么原因导致开发人员从Java转向Kotlin?当人们问我这个问题时,我通常会说Kotlin允许Java开发人员利用他们现有的技能编写更短、更安全、更具表现力的代码。但为了做出明智的选择,我们应该找出更具体的方面。

我最喜欢的Kotlin特性是它将null(所谓的(多个)“十亿美元的错误”)变成了一个安全特性。Java的错误不是null本身,而是没有在其类型系统中显式地管理null,从而导致的问题与我们在动态语言中观察到的问题类似。Kotlin通过在其类型系统中使用null处理值的缺失。在Kotlin中,像String这样的类型是非空的,因此可以安全地使用而无需预防措施,而像String?这样的类型是可空的,应该谨慎使用。好消息是Kotlin编译器会在编译时引发潜在的错误,您可以使用安全调用Elvis运算符如果不是null则执行块优雅地处理它们。与Java Optional不同,Kotlin的空安全性也适用于输入参数,并且不会强制您使用影响代码性能和可读性的包装器。

DSL也是Kotlin闪光点的另一个领域。Gradle Kotlin DSL(对start.spring.io的支持即将到来)就是一个很好的例子,它允许使用非常丰富和灵活的API,并由于Kotlin静态类型的特性而具有极高的可发现性和可靠性。Spring Framework为bean定义函数式路由甚至MockMvc提供了Kotlin DSL。

我还可以详细说明很多其他切换的好理由,例如具有默认值的可选项参数与Java API(如Spring)的良好互操作性扩展函数具现类型参数以避免类型擦除、数据类或默认情况下鼓励的不变性,但我认为您应该通过示例学习Kotlin,最终借助参考文档并做出自己的判断。您还可以按照此分步进行使用Kotlin的Spring Boot教程

所以假设我将为我的下一个Spring Boot项目选择Kotlin ;-)

使用注解的@Controller还是函数式风格?

正如我在引言中所说,选择取决于上下文,也是个人的喜好问题。鉴于这种语言非常好的DSL和函数式编程能力,我是使用Kotlin进行函数式路由的忠实粉丝。我甚至正在探索如何通过实验性的Spring Boot的Kofu DSL(正在Spring Fu存储库中孵化)以函数式方式定义Spring Boot应用程序配置。

但是今天,假设我的团队由多年来习惯使用@Controller编程模型的开发人员组成,而且我不想同时更改所有内容,因此让我们保留@Controller

Spring MVC还是WebFlux?

我们在Web框架方面提供的选择如下。

您可以继续使用Spring MVC以及我们不断改进的所有相关知名技术:Tomcat、JPA等。您甚至可以通过使用WebClient现代API代替RestTemplate来利用一些反应式组件。

但是我们还提供了一个反应式堆栈,其中包括WebFlux,这是一个基于Reactive Streams的Web框架,适用于那些想要更高可扩展性、对延迟免疫(适用于面向微服务的架构)和更好的流处理能力的开发人员。生态系统的其他部分,如Spring Data和Spring Security,也提供了反应式支持。

在Java中使用Reactor API的WebFlux

到目前为止,使用基于WebFlux的Spring反应式堆栈需要通过使用Reactor MonoFlux或RxJava类似类型将IO相关功能(web、持久性)从命令式切换到声明式/函数式风格来实现相当大的转变。这种颠覆性的方法与命令式编程相比具有真正的优势,但也非常不同,需要一个非平凡的学习曲线。

让我们看看这意味着什么具体的代码,让我们借此机会向您展示如何使用R2DBC(基于Reactive Streams的JDBC替代方案)和Spring Data R2DBC以反应式方式访问SQL数据库。

如果我们选择Java,我们将编写以下UserRepository类,该类公开了一个反应式API,使用Spring Data R2DBC提供的DatabaseClient API访问SQL数据库。

class UserRepository {

	private final DatabaseClient client;

	public UserRepository(DatabaseClient client) {
		this.client = client;
	}

	public Mono<Long> count() {
		return client.execute().sql("SELECT COUNT(*) FROM users")
			.as(Long.class).fetch().one();
	}

	public Flux<User> findAll() {
		return client.select().from("users").as(User.class).fetch().all();
	}

	public Mono<User> findOne(String id) {
		return client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).as(User.class).fetch().one();
	}

	public Mono<Void> deleteAll() {
		return client.execute().sql("DELETE FROM users").then();
	}

	public Mono<Void> save(User user) {
		return client.insert().into(User.class).table("users")
			.using(user).then();
	}

	public Mono<Void> init() {
		return client.execute().sql("CREATE TABLE ...").then()
			.then(deleteAll())
			.then(save(new User("smaldini", "Stéphane", "Maldini")))
			.then(save(new User("sdeleuze", "Sébastien", "Deleuze")))
			.then(save(new User("bclozel", "Brian", "Clozel")));
	}
}

注意

保存用户本可以使用fork-join的方式,因为这些操作彼此之间没有依赖关系,但为了进行比较,我在这里使用用then()链式连接的顺序操作。

你可以看到,在这个API中,void变成了Mono<Void>User变成了Mono<User>。这允许以非阻塞的方式使用它们,并提供对丰富的运算符集的访问。但它也使得必须使用Mono包装器,并显著改变你使用这些API的方式。例如,如果某些操作需要像init()方法中那样顺序执行,这在命令式代码中是很直接的,在这里我们必须用then运算符构建一个声明式管道。

Flux<User>提供了更多附加值,因为它允许将传入的用户作为流进行处理,而像在阻塞式堆栈中通常使用的List<User>则意味着在处理数据之前将所有数据加载到内存中。请注意,我们也可以在这里使用Mono<List<User>>

在控制器端,你可以看到Spring WebFlux原生支持这些响应式类型,你还可以看到基于Reactive Streams的API的另一个特点,即异常主要用作由响应式类型携带的错误信号,而不是像在常规命令式代码中那样被抛出。

@RestController
public class UserController {

	private final UserRepository userRepository;

	public UserController(UserRepository userRepository) {
		this.userRepository = userRepository;
	}

	@GetMapping("/")
	public Flux<User> findAll() {
		return userRepository.findAll();
	}

	@GetMapping("/{id}")
	public Mono<User> findOne(@PathVariable String id) {
		return userRepository
			.findOne(id)
			.switchIfEmpty(Mono.error(
				new CustomException("This user does not exist");
	}

	@PostMapping("/")
	public Mono<Void> save(User user) {
		return userRepository.save(user);
	}
}

Kotlin 中使用协程 API 的 WebFlux

重要的是要理解,Spring 响应式支持是在 Reactive Streams 之上构建的,并且考虑了互操作性,Reactor 用于两种不同的目的

  • 它是我们在 Spring 响应式基础设施中无处不在使用的 Reactive Streams 实现

  • 它也是公开的默认响应式公共 API

但是 Spring 响应式支持从一开始就被设计成可以轻松地适应其他异步或响应式 API,例如CompletableFuture、RxJava 2,以及现在的协程。在这种情况下,我们仍然在内部利用 Reactor,在公共 API 级别适应不同的最终用户响应式 API。

如果您更喜欢这种方法,当然可以继续在 Kotlin 中使用FluxMono,但是 Spring Framework 5.2 引入了一项新的主要功能:我们现在可以使用 Kotlin 协程 以更命令式的方式利用 Spring 响应式堆栈。

协程是 Kotlin 轻量级线程,允许以命令式的方式编写非阻塞代码。在语言方面,由suspend关键字标识的挂起函数为异步操作提供了一种抽象,而在库方面,kotlinx.coroutines 提供了诸如 async { } 之类的函数和诸如 Flow 之类的类型,它是协程世界中Flux的等价物。

当类路径中存在kotlinx-coroutines-corekotlinx-coroutines-reactor依赖项时,将启用协程支持。

build.gradle.kts

dependencies {
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

那么用 Kotlin 而不是 Java 编写的UserRepositoryUserController,以及使用协程和Flow而不是MonoFlux会是什么样子呢?

class UserRepository(private val client: DatabaseClient) {

	suspend fun count(): Long =
		client.execute().sql("SELECT COUNT(*) FROM users")
			.asType<Long>().fetch().awaitOne()

	fun findAll(): Flow<User> =
		client.select().from("users").asType<User>().fetch().flow()

	suspend fun findOne(id: String): User? =
		client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).asType<User>()
			.fetch()
			.awaitOneOrNull()

	suspend fun deleteAll() =
		client.execute().sql("DELETE FROM users").await()

	suspend fun save(user: User) =
		client.insert().into<User>().table("users").using(user).await()

	suspend fun init() {
		client.execute().sql("CREATE TABLE IF NOT EXISTS users (login varchar PRIMARY KEY, firstname varchar, lastname varchar);").await()
		deleteAll()
		save(User("smaldini", "Stéphane", "Maldini"))
		save(User("sdeleuze", "Sébastien", "Deleuze"))
		save(User("bclozel", "Brian", "Clozel"))
	}
}

你可以在这里看到,我们不是返回例如Mono<User>,而是在一个挂起函数中返回User(更准确地说,是它的可空变体User?),该函数可以以命令式的方式使用。init()方法实现中的差异很好地说明了这一点,因为我们现在使用常规的命令式代码而不是链式then调用。

但是等等,我如何在基于MonoFlux的响应式 API 的DatabaseClient类型上直接使用协程?这是可能的,因为 Spring Data R2DBC 还提供了 Kotlin 扩展(例如,参见 这个),一旦导入,就可以在DatabaseClient上添加基于协程的方法。按照约定,挂起方法以await为前缀或以AndAwait为后缀,并具有与其基于Mono的对应方法相似的名称。

现在让我们更深入地了解一下这个Flow<User>返回类型。首先,请注意我们指的是 kotlinx.coroutines.flow.Flow,而不是 java.util.concurrent.Flow,后者是 Java 9+ 提供的 Reactive Streams 容器类型。

你将像使用 Java 8+ 的Stream或其 Kotlin 等价物Sequence一样使用Flow API,但巨大的区别在于它适合异步操作并管理背压。所以它是协程世界中Flux的等价物,适用于热流或冷流、有限流或无限流,主要区别如下:

  • Flow是基于推送的,而Flux是推拉混合的。

  • 背压是通过挂起函数实现的。

  • Flow只有一个 单个挂起collect方法,运算符作为 扩展 实现。

  • 运算符很容易实现,这要归功于协程。

  • 扩展允许向Flow添加自定义运算符。

  • 收集操作是挂起函数。

  • map运算符支持异步操作(不需要flatMap),因为它接受一个挂起函数参数。

现在让我们看一下控制器的协程版本。

@RestController
class UserController(private val userRepository: UserRepository) {

	@GetMapping("/")
	fun findAll(): Flow<User> =
		userRepository.findAll()

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): User? =
		userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")

	@PostMapping("/")
	suspend fun save(user: User) =
		userRepository.save(user)
}

你同样可以看到,这段代码非常接近我们使用 Spring MVC 时将使用的常规命令式代码。

除了为基于FluxMono的API(如WebClientServerRequestServerResponse)提供协程扩展之外,Spring WebFlux现在还原生支持为带注释的@Controller类挂起函数和Flow返回类型。

使用命令式代码的异步操作

让我们利用WebClient协程扩展来了解如何链接异步调用。我们将请求一个远程 HTTP 端点以获取附加的UserDetail1UserDetail2

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails {
		val userDetail1 = client.get().uri("/userdetail1/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail1>()
		val userDetail2 = client.get().uri("/userdetail2/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail2>()
		return UserWithDetails(user, userDetail1, userDetail2)
	}
}

在这里,我们使用WebClient协程扩展(如awaitExchange()awaitBody())以完全命令式的方式执行异步和非阻塞操作。并且由于Flow map运算符接受一个挂起函数参数,我们可以在其中执行此类操作,这里不需要像在 Java 中使用响应式 API 时那样使用flatMap

并行分解

如前所述,协程默认情况下是顺序的,但它们也可以用于并行执行操作。让我们重构之前的示例以并发执行这两个远程调用。

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails = coroutineScope {
		val asyncDetail1 = async {
			client.get().uri("/userdetail1/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail1>()
		}
		val asyncDetail2 = async {
			client.get().uri("/userdetail2/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail2>()
		}
		UserWithDetails(user, asyncDetail1.await(), asyncDetail2.await())
	}
}

在这里,我们利用 结构化并发 通过async {}构建器创建Deferred<UserDetail1>Deferred<UserDetail2>实例来触发两个用户详细信息的并行检索,然后我们通过调用这两个await()方法来等待它们的完成,这两个方法将在可用时返回UserDetail1UserDetail2实例。

结论

我认为使用 Spring 响应式堆栈以及这样的协程和 Kotlin Flow API 在命令式和声明式方法之间提供了一种有趣的权衡。它允许以非常易于理解的方式利用 WebFlux 和 Spring Data 的响应式可扩展性和功能。

Spring WebFlux 和 Spring Data 中的协程支持将作为即将发布的 Spring Boot 2.2 版本的一部分提供。你可以阅读 参考文档,并期待进一步改进,例如对 RSocket @MessageMapping 端点和RSocketRequester扩展的协程支持。Spring Data Moore 还将在 Spring Data MongoDB、Cassandra 和 Redis 上提供类似的协程扩展。Spring Data 也可能在某个时候为 协程存储库 提供支持。我们还将使 Reactor 和协程上下文互操作 以支持安全性和反应式事务。

最后,我要感谢许多有才华的工程师,如果没有他们,这一切都是不可能实现的。

  • Kotlin 团队的 Roman Elizarov 和 Vsevolod Tolstopyatov,感谢他们对协程和Flow的辛勤工作。

  • Konrad Kaminski,感谢他最初的社区驱动的 Spring 协程支持。

  • Jake Wharton,感谢他早期围绕统一 Rx 和协程的原型设计。

  • Stéphane Maldini 和 David Karnok,感谢他们的鼓舞人心的工作。

  • Juergen Hoeller、Rossen Stoyanchev 和 Brian Dussault,感谢他们的信任。

  • Mark Paluch 和 Oliver Drotbohm,感谢他们在持久化方面的支持。

像往常一样,我期待着您的反馈,以及Kotlin团队对Flow API的反馈,因为它仍在预览模式中。欢迎来到我的即将举行的演讲:Devoxx FranceJAXSpring I/OSunny Tech了解更多信息。

欢呼!

获取Spring通讯

通过Spring通讯保持联系

订阅

领先一步

VMware提供培训和认证,以快速提升您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部