Spring、协程和 Kotlin Flow 的响应式开发

工程 | Sébastien Deleuze | 2019年4月12日 | ...

自2017年1月宣布 Spring Framework 正式支持 Kotlin 以来,已经发生了许多事情。Kotlin 在 Google I/O 2017 上被宣布为官方 Android 开发语言,我们继续改进 Spring 产品组合中的 Kotlin 支持,Kotlin 本身也随着 协程 等关键新功能不断发展。

我想借 Spring Framework 5.2 的第一个里程碑 的机会,概述一下我们在 Spring 和 Kotlin 方面的现状。我会尽力专注于具体的改进,因为我相信 Spring 和 Kotlin 拥有相同的务实心态。

我认为这归结为选择。我们(Spring 团队)提供的选择,以及你们作为应用程序开发者在启动新的 Spring Boot 应用程序时必须做出的选择。例如

  • 应该使用什么语言?

  • 注解的 @Controller 还是函数式风格?

  • Spring MVC 还是 WebFlux?

这些问题显然是非常主观的,并且通常取决于项目上下文,但我将分享我个人观点。

Java 还是 Kotlin?

Java 是显而易见的默认选择,但 Kotlin 是一个越来越受欢迎的替代方案。有什么理由可以让开发人员从 Java 转向 Kotlin?当人们问我时,我通常会说 Kotlin 允许 Java 开发人员利用他们现有的技能编写更简洁、更安全、更具表现力的代码。但要做出明智的选择,我们应该确定更具体的要点。

我最喜欢的 Kotlin 功能是它将 `null`,即所谓的(多重)“十亿美元的错误”,变成了一个安全功能。Java 的错误不是 `null` 本身,而是没有在其类型系统中显式管理 `null`,导致出现接近动态语言中可观察到的问题。Kotlin 通过在其类型系统中利用 `null` 来 处理值的缺失。在 Kotlin 中,像 `String` 这样的类型不是可空的,因此可以安全地使用而无需小心;而像 `String?` 这样的类型是可空的,应该谨慎使用。好消息是 Kotlin 编译器会在编译时报告潜在错误,您可以通过 安全调用Elvis 操作符非空执行 块来优雅地处理它们。与 Java 的 `Optional` 不同,Kotlin 的空安全也适用于输入参数,并且不会强制您使用影响性能和可读性的包装器。

DSL 也是 Kotlin 闪耀的另一个领域。 Gradle Kotlin DSL(在 start.spring.io 上的支持 即将推出)是一个很好的例子,它允许使用非常丰富且灵活的 API,并且由于 Kotlin 的静态类型特性,具有出色的可发现性和信心。Spring Framework 为 Bean 定义函数式路由 甚至 MockMvc 提供了 Kotlin DSL。

我可以详细说明许多其他切换的好处,例如 带默认值的可选参数与 Java API(如 Spring)的良好互操作性扩展函数用于避免类型擦除的 reified 类型参数数据类 或默认鼓励的不可变性,但我认为您最终应该通过 边学边练 Kotlin,并在 参考文档 的帮助下做出自己的判断。您还可以遵循这个一步一步的 Spring Boot Kotlin 教程

所以,我将在我的下一个 Spring Boot 项目中选择 Kotlin ;-)

注解的 @Controller 还是函数式风格?

正如我在引言中所说,选择取决于上下文,并且是品味问题。我非常喜欢 使用 Kotlin 进行函数式路由,考虑到该语言非常好的 DSL 和函数式编程能力。我甚至正在探索如何通过实验性的 Kofu Spring Boot DSL 以函数式方式定义 Spring Boot 应用程序配置,该 DSL 正在 Spring Fu 存储库中孵化。

但今天,假设我的团队由多年来习惯于 `@Controller` 编程模型的开发人员组成,并且我不想一次性改变所有东西,所以我们保留 `@Controller`。

Spring MVC 还是 WebFlux?

我们在 Web 框架方面提供的选择如下。

您可以继续使用 Spring MVC 和所有相关的知名技术,我们将继续改进它们:Tomcat、JPA 等。您甚至可以通过使用现代的 `WebClient` API 而不是 `RestTemplate` 来利用一些响应式部分。

但我们也提供了一个响应式堆栈,包括 WebFlux,这是一个基于 Reactive Streams 的 Web 框架,适用于那些想要更高可伸缩性、不受延迟影响(适用于面向微服务的架构)以及更好流处理能力的人。生态系统的其他部分,如 Spring Data 和 Spring Security,也提供响应式支持。

Java 中的 Reactor API WebFlux

到目前为止,使用 Spring 响应式堆栈的 WebFlux 需要一个相当大的转变,通过使用 Reactor MonoFlux 或 RxJava 类似类型的 API,将 IO 相关功能(Web、持久性)从命令式编程风格切换到声明式/函数式编程风格。这种颠覆性的方法比命令式编程具有真正的优势,但它也非常不同,并且需要相当大的学习曲线。

让我们通过具体的代码来看看这意味着什么,并借此机会向您展示如何使用 R2DBC(基于 Reactive Streams 的 JDBC 替代方案)和 Spring Data R2DBC 以响应式方式访问 SQL 数据库。

如果我们选择 Java,我们将编写以下 `UserRepository` 类,该类公开了一个响应式 API,使用 Spring Data R2DBC 提供的 `DatabaseClient` API 来访问 SQL 数据库。

class UserRepository {

	private final DatabaseClient client;

	public UserRepository(DatabaseClient client) {
		this.client = client;
	}

	public Mono<Long> count() {
		return client.execute().sql("SELECT COUNT(*) FROM users")
			.as(Long.class).fetch().one();
	}

	public Flux<User> findAll() {
		return client.select().from("users").as(User.class).fetch().all();
	}

	public Mono<User> findOne(String id) {
		return client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).as(User.class).fetch().one();
	}

	public Mono<Void> deleteAll() {
		return client.execute().sql("DELETE FROM users").then();
	}

	public Mono<Void> save(User user) {
		return client.insert().into(User.class).table("users")
			.using(user).then();
	}

	public Mono<Void> init() {
		return client.execute().sql("CREATE TABLE ...").then()
			.then(deleteAll())
			.then(save(new User("smaldini", "Stéphane", "Maldini")))
			.then(save(new User("sdeleuze", "Sébastien", "Deleuze")))
			.then(save(new User("bclozel", "Brian", "Clozel")));
	}
}

注意

保存用户可以通过 fork-join 的方式完成,因为这些操作彼此不依赖,但为了比较起见,我使用了通过 `then()` 链接的顺序操作。

您可以看到,在这种 API 中,`void` 变成了 `Mono`,`User` 变成了 `Mono`。这使得它们可以以非阻塞的方式使用,并提供对丰富运算符集的访问。但它也强制使用 `Mono` 包装器,并显著改变了您使用这些 API 的方式。例如,如果某些操作需要顺序执行,就像在 `init()` 方法中一样,这在命令式代码中很容易实现,而在这里我们必须使用 `then` 运算符构建声明式管道。

`Flux` 提供了更多的附加价值,因为它允许在处理传入用户时将它们作为流进行处理,而阻塞堆栈中通常使用的 `List` 暗示在处理之前将所有数据加载到内存中。请注意,我们也可以在这里使用 `Mono>`。

在控制器方面,您可以看到 Spring WebFlux 原生支持这些响应式类型,您还可以看到基于 Reactive Streams API 的另一个特性,即异常主要被用作由响应式类型携带的错误信号,而不是像常规命令式代码那样被抛出。

@RestController
public class UserController {

	private final UserRepository userRepository;

	public UserController(UserRepository userRepository) {
		this.userRepository = userRepository;
	}

	@GetMapping("/")
	public Flux<User> findAll() {
		return userRepository.findAll();
	}

	@GetMapping("/{id}")
	public Mono<User> findOne(@PathVariable String id) {
		return userRepository
			.findOne(id)
			.switchIfEmpty(Mono.error(
				new CustomException("This user does not exist");
	}

	@PostMapping("/")
	public Mono<Void> save(User user) {
		return userRepository.save(user);
	}
}

Kotlin 协程 API WebFlux

重要的是要理解 Spring 响应式支持是建立在 Reactive Streams 之上的,并考虑到互操作性,Reactor 被用于两个不同的目的:

  • 它是我们用于 Spring 响应式基础设施的 Reactive Streams 实现。

  • 它也是默认暴露的响应式公共 API。

但 Spring 响应式支持从一开始就被设计为可以轻松适应其他异步或响应式 API,如 `CompletableFuture`、RxJava 2、以及现在的协程。在这种情况下,我们仍然在内部利用 Reactor,在公共 API 层面适应不同的最终用户响应式 API。

当然,如果您更喜欢这种方法,在 Kotlin 中继续使用 `Flux` 和 `Mono` 是完全可以的,但 Spring Framework 5.2 引入了一个新的主要功能:我们现在可以使用 Kotlin 协程 以更命令式的方式利用 Spring 响应式堆栈。

协程是 Kotlin 的轻量级线程,允许以命令式的方式编写非阻塞代码。在语言层面,用 `suspend` 关键字标识的挂起函数提供了异步操作的抽象,而在库层面,kotlinx.coroutines 提供了像 `async {}` 这样的函数以及像 `Flow` 这样的类型,它是协程世界中的 `Flux` 等价物。

当 `kotlinx-coroutines-core` 和 `kotlinx-coroutines-reactor` 依赖项在类路径中时,将启用协程支持。

build.gradle.kts

dependencies {
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

那么,用 Kotlin 而不是 Java 编写的 `UserRepository` 和 `UserController`,并使用协程和 `Flow` 而不是 `Mono` 和 `Flux` 看起来是什么样的?

class UserRepository(private val client: DatabaseClient) {

	suspend fun count(): Long =
		client.execute().sql("SELECT COUNT(*) FROM users")
			.asType<Long>().fetch().awaitOne()

	fun findAll(): Flow<User> =
		client.select().from("users").asType<User>().fetch().flow()

	suspend fun findOne(id: String): User? =
		client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).asType<User>()
			.fetch()
			.awaitOneOrNull()

	suspend fun deleteAll() =
		client.execute().sql("DELETE FROM users").await()

	suspend fun save(user: User) =
		client.insert().into<User>().table("users").using(user).await()

	suspend fun init() {
		client.execute().sql("CREATE TABLE IF NOT EXISTS users (login varchar PRIMARY KEY, firstname varchar, lastname varchar);").await()
		deleteAll()
		save(User("smaldini", "Stéphane", "Maldini"))
		save(User("sdeleuze", "Sébastien", "Deleuze"))
		save(User("bclozel", "Brian", "Clozel"))
	}
}

您可以看到,这里,例如,我们返回 `User`(或者更确切地说,它的可空变体 `User?`)而不是返回 `Mono`,在可以以命令式方式使用的挂起函数中。`init()` 方法实现中的差异很好地说明了这一点,因为我们现在使用的是常规命令式代码,而不是链式 `then` 调用。

但是,等等,我如何直接在基于 `Mono` 和 `Flux` 的响应式 API `DatabaseClient` 上使用协程?这是可能的,因为 Spring Data R2DBC 还提供了 Kotlin 扩展(请参阅 这个),一旦导入,您就可以将基于协程的方法添加到 `DatabaseClient`。按照约定,挂起方法以前缀 `await` 或后缀 `AndAwait` 命名,并且与它们的 `Mono` 对等方法名称相似。

现在让我们深入了解一下 `Flow` 返回类型。首先,请注意我们指的是 `kotlinx.coroutines.flow.Flow`,而不是 `java.util.concurrent.Flow`,后者是 Java 9+ 提供的 Reactive Streams 容器类型。

您将像使用 Java 8+ `Stream` 或其 Kotlin 等价物 `Sequence` 一样使用 `Flow` API,但巨大的区别在于它适用于异步操作并管理背压。因此,它是协程世界中的 `Flux` 等价物,适用于热流或冷流、有限流或无限流,主要区别如下:

  • `Flow` 是推模式的,而 `Flux` 是推拉混合模式的。

  • 背压通过挂起函数实现。

  • `Flow` 只有一个 挂起的 `collect` 方法,并且运算符作为 扩展 实现。

  • 由于协程,运算符易于实现

  • 扩展允许向 `Flow` 添加自定义运算符。

  • 收集操作是挂起函数。

  • `map` 运算符 支持异步操作(无需 `flatMap`),因为它接受一个挂起函数参数。

现在让我们看看控制器的协程版本。

@RestController
class UserController(private val userRepository: UserRepository) {

	@GetMapping("/")
	fun findAll(): Flow<User> =
		userRepository.findAll()

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): User? =
		userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")

	@PostMapping("/")
	suspend fun save(user: User) =
		userRepository.save(user)
}

您再次可以看到,代码非常接近我们用 Spring MVC 使用的常规命令式代码。

除了为 `WebClient`、`ServerRequest` 或 `ServerResponse` 等基于 `Flux` 和 `Mono` 的 API 提供协程扩展外,Spring WebFlux 现在还为注解的 `@Controller` 类原生支持挂起函数和 `Flow` 返回类型。

命令式代码的异步操作

让我们利用 `WebClient` 协程扩展来查看如何链接异步调用。我们将请求远程 HTTP 端点以获取额外的 `UserDetail1` 和 `UserDetail2`。

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails {
		val userDetail1 = client.get().uri("/userdetail1/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail1>()
		val userDetail2 = client.get().uri("/userdetail2/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail2>()
		return UserWithDetails(user, userDetail1, userDetail2)
	}
}

在这里,我们使用 `WebClient` 协程扩展,如 `awaitExchange()` 和 `awaitBody()`,以纯命令式方式执行异步和非阻塞操作。并且由于 `Flow` `map` 运算符 接受一个挂起函数参数,我们可以在其中执行此类操作,而无需像使用 Java 中的响应式 API 那样使用 `flatMap`。

并行分解

如前所述,协程默认是顺序执行的,但也可以用于并行执行操作。让我们重构之前的示例,以并发执行两个远程调用。

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails = coroutineScope {
		val asyncDetail1 = async {
			client.get().uri("/userdetail1/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail1>()
		}
		val asyncDetail2 = async {
			client.get().uri("/userdetail2/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail2>()
		}
		UserWithDetails(user, asyncDetail1.await(), asyncDetail2.await())
	}
}

在这里,我们利用 结构化并发 来触发两个用户详情的并行检索,通过创建 `Deferred` 和 `Deferred` 实例(通过 `async {}` 构建器),然后我们通过调用两个 `await()` 方法来等待它们完成,当可用时,它们将返回 `UserDetail1` 和 `UserDetail2` 实例。

结论

我认为将 Spring 响应式堆栈与此类协程和 Kotlin `Flow` API 结合使用,在命令式和声明式方法之间提供了有趣的权衡。它以一种非常易于理解的方式利用了 WebFlux 和 Spring Data 的响应式可伸缩性和功能。

Spring WebFlux 和 Spring Data 中的协程支持将在即将发布的 Spring Boot 2.2 版本中提供。您可以阅读 参考文档,并可以期待进一步的改进,例如对 RSocket `@MessageMapping` 端点和 `RSocketRequester` 扩展的协程支持。Spring Data Moore 还将提供 Spring Data MongoDB、Cassandra 和 Redis 的类似协程扩展。Spring Data 可能会在某个时候提供对 协程存储库 的支持。我们还将使 Reactor 和协程上下文可互操作,以支持安全和响应式事务。

我想最后感谢许多才华横溢的工程师,没有他们,这一切都将不可能实现。

  • 来自 Kotlin 团队的 Roman Elizarov 和 Vsevolod Tolstopyatov,感谢他们为协程和 `Flow` 所做的不可思议的工作。

  • Konrad Kaminski 感谢社区驱动的 Spring 协程的初步支持。

  • Jake Wharton 感谢他早期在统一 Rx 和协程方面的原型工作。

  • Stéphane Maldini 和 David Karnok 感谢他们的启发性工作。

  • Juergen Hoeller、Rossen Stoyanchev 和 Brian Dussault 感谢他们的信任。

  • Mark Paluch 和 Oliver Drotbohm 感谢他们在持久化方面的支持。

一如既往,我期待反馈,也期待 Kotlin 团队对 `Flow` API 的反馈,因为它仍处于预览阶段。请来参加我在 Devoxx FranceJAXSpring I/OSunny Tech 的即将举行的演讲,了解更多信息。

干杯!

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有