抢先一步
VMware 提供培训和认证,助您加速进步。
了解更多大家好,Spring 爱好者们!在本期文章中,我们将介绍 Spring Integration 新的 Kotlin DSL。我之前在其他视频中已经介绍过 Spring Integration 和 Kotlin。我**非常**确定我也曾在基于 Kotlin 的 Spring 应用中使用过 Spring Integration,但这是我第一次能够专门介绍 Spring Integration 的 Kotlin DSL。
Spring Integration 已经存在了很长时间——至少 13 年——它服务于一个永恒的用例:集成不同的系统和服务。它借鉴了 Gregor Hohpe 和 Bobby Woolf 合著的开创性著作 《企业集成模式》。这是一本极好的著作,我强烈推荐它,因为它在某种程度上是理解 Spring Integration 所需的文档。Spring Integration 将书中的模式编码化;API 元素的命名与书中的相关模式一致。
集成必然是高层级的工作。它关注不同管道系统的输入和输出。你不想在最低层级花太多时间处理对象图来完成这项工作。根据它们支持的输入和输出来解耦这些系统和服务要容易得多。Spring Integration 提供了实现这一目标的方法。
多年来,Spring Integration 的 DSL 发生了变化。我们最初的项目使用基于 XML 的 DSL,后来引入了 Java 配置组件模型,再后来又引入了 Java DSL。甚至有过短暂的 Scala DSL 尝试。现在我们有了 Kotlin DSL。Kotlin DSL 构建在 Spring Integration Java DSL 多年前奠定的基础上。它扩展了 DSL,使其更具 Kotlin 原生特性。
在这个应用中,我们将构建一个监控文件系统的应用。我展示这些例子是因为它们不需要您(读者)在本地机器上安装任何东西,除了一个文件系统,而这个文件系统您大概已经有了。Spring Integration 提供了丰富的集成工具箱。您可以与文件系统(远程和本地)、数据库、消息队列以及无数其他协议和集成进行通信。
让我们构建一个新应用。访问 Spring Initializr 并确保在语言选择中选择 Kotlin
。另外,请确保使用 Spring Boot 2.3.x 或更高版本。
然后在依赖下拉框中选择 Spring Integration
。我们还需要添加一个特定于 Spring Integration 的依赖。虽然这个依赖在 Spring Initializr 上不可发现,但它**受** Spring Boot 管理,所以我们可以很容易地手动添加到我们的构建中。
点击 Generate
然后在您喜欢的 IDE 中打开项目。
前往 pom.xml
文件并添加以下依赖
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>${spring-integration.version}</version>
</dependency>
现在,让我们看看应用本身。实现将相当简单。我们来看一下伪代码。
input
目录 ($HOME/Desktop/in
) 时,Spring Integration 入站适配器会注意到它的到来,然后将其转发到...。.csv
文件的处理器、处理 .xml
文件的处理器,还是其他所有文件的处理器。.csv
和 .txt
处理器最终会将文件移动到相应的目录。我们不会为文件类型不是其他类型的情况指定处理器,但这里的可能性是无限的。您可以将错误的文件转发到发送电子邮件、写入数据库或发布消息到 Apache Kafka 代理的处理器等。这里是无限的!
我们将有**三个**流。我知道你刚刚读了那些伪代码要点,可能认为只有一个流,但我已经将处理 .csv
文件的处理器与处理 .txt
文件的处理器解耦了。解耦很有用,因为它意味着其他流可以生成文件,然后这些文件作为下游处理器被路由到 csv
或 txt
流。解耦支持良好的清晰架构。它允许我通过保持流的小巧和单一焦点来重用它们。这与编写函数的建议相同。
我们通过明智地使用 MessageChannels
来解耦流。通道就像命名的管道——消息通过它们流动。
这是我们基本的 Spring Boot 应用,只包含导入、我们的 main()
函数以及其他少量内容。
package com.example.kotlinspringintegration
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.dsl.MessageChannels
import org.springframework.integration.dsl.integrationFlow
import org.springframework.integration.file.dsl.Files
import java.io.File
@SpringBootApplication
class KotlinSpringIntegrationApplication
fun main(args: Array<String>) {
runApplication<KotlinSpringIntegrationApplication>(*args)
}
在此基础上,我们来定义消息通道。
@Configuration
class ChannelsConfiguration {
@Bean
fun txt() = MessageChannels.direct().get()
@Bean
fun csv() = MessageChannels.direct().get()
@Bean
fun errors() = MessageChannels.direct().get()
}
我们可以注入配置类,然后调用方法来解除引用单个通道。
让我们看一下主要的集成流。Spring Integration Java DSL 一直很方便,但从 Kotlin 语言访问它更容易。在这个配置类中,我使用 Kotlin 的 integrationFlow
工厂函数定义了一个 IntegrationFlow
。它接着接受一个 Lambda,该 Lambda 作为我在其上挂载集成流中各种步骤的上下文。我通过指向触发流的新消息将到达的位置来构建 IntegrationFlow
。在这种情况下,消息在消费来自监控目录 ($HOME/Desktop/in
) 的入站文件适配器的消息(每 500 毫秒)之后到达。配置的轮询器确定轮询新消息的时间表。在这里,Spring Integration Kotlin DSL 使事情变得更容易。这比原始的 Java 配置 DSL 更清晰(至少在我看来)。
文件一到达,它们就会被封装到 Message<File>
中,并转发到 filter<File>
扩展函数。请注意,这里我不需要指定 File.class
—— Kotlin 具有伪泛型具化(pseudo reified generics)—— 参数的类型在函数本身的泛型调用中被捕获。不需要类型令牌。filter
函数期望一个 Lambda,该 Lambda 检查当前消息(通过隐式参数 it
获取)并确认它是一个文件(而不是目录或其他)。如果是,流将继续到路由器。
然后路由器检查消息,并确定结果消息应转发到哪个出站 MessageChannel
。这个路由器使用了 Kotlin 巧妙的 when
表达式——有点像 Java 中增强版的 switch
语句。(注意:Java 中有一个非常有前景的 switch 表达式,但是现在有多少人正在使用它呢?)。when
表达式会产生一个值。在 Kotlin 中,函数的最后一个表达式就是返回值(您很少需要指定 return
)。在这种情况下,函数的最后一个表达式是 when
表达式的结果:一个 MessageChannel
。
@Configuration
class FileConfiguration(private val channels: ChannelsConfiguration) {
private val input = File("${System.getenv("HOME")}/Desktop/in")
private val output = File("${System.getenv("HOME")}/Desktop/out")
private val csv = File(output, "csv")
private val txt = File(output, "txt")
@Bean
fun filesFlow() = integrationFlow(
Files.inboundAdapter(this.input).autoCreateDirectory(true),
{ poller { it.fixedDelay(500).maxMessagesPerPoll(1) } }
) {
filter<File> { it.isFile }
route<File> {
when (it.extension.toLowerCase()) {
"csv" -> channels.csv()
"txt" -> channels.txt()
else -> channels.errors()
}
}
}
}
此时,这个流程结束了。消息无处可去。我们的列车已经跑完了轨道!我们需要再铺设两个流程。一个用于处理以 csv
结尾的文件,另一个用于处理以 txt
结尾的文件。我们来看一下。
在同一个配置类中,添加另外两个集成流程 bean 定义。
@Bean
fun csvFlow() = integrationFlow(channels.csv()) {
handle(Files.outboundAdapter(csv).autoCreateDirectory(true))
}
@Bean
fun txtFlow() = integrationFlow(channels.txt()) {
handle(Files.outboundAdapter(txt).autoCreateDirectory(true))
}
最后一个例子应该看起来与您目前所看到的合理相似,只是启动流程的不是入站适配器,而是任何从消息通道出来的消息。第一个流程 csvFlow
在消息从 csv
消息通道到达时启动。第二个流程 txtFlow
也是如此。两个流程都相当突然地终止,只做一件事。它们将消息转发到一个出站适配器,该适配器又将文件写入另一个目录。出站适配器是入站适配器的镜像;它从 Spring Integration 流程中获取消息并将其发送到现实世界中的某个接收器(文件系统)。入站适配器从现实世界中获取值并将其转换为 Spring Integration 消息。
此时,我们已经有了一个工作中的处理流程。我有点含糊地略过了这样一个问题:如果消息不是 txt
或 csv
文件并最终进入 errors
通道,会发生什么?正如我前面提到的,这里的可能性是无限的。