领先一步
VMware 提供培训和认证来加速您的进步。
了解更多嗨,Spring 粉丝们!在本期节目中,我们将了解 Spring Integration 的新 Kotlin DSL。我之前在其他视频中介绍过 Spring Integration 和 Kotlin。我非常确定我也曾在基于 Kotlin 的 Spring 应用程序中使用过 Spring Integration,但这是我第一次能够专门介绍 Spring Integration 的 Kotlin DSL。
Spring Integration 已经存在很长时间了——至少 13 年——它满足了一个永恒的用例:异构系统和服务的集成。它以 Gregor Hohpe 和 Bobby Woolf 的开创性著作 企业集成模式 为蓝本。这是一本很棒的著作,我强烈推荐它,因为它在某种程度上充当了理解 Spring Integration 所需的文档。Spring Integration 将书中的模式编纂成代码;API 元素以书中相关模式命名。
集成本质上是高级工作。它关乎不同管道系统在输入和输出方面的运作。您不希望花费太多时间在最低级别处理对象图来完成这项工作。从它们支持的输入和输出方面解耦这些系统和服务要容易得多。Spring Integration 为您提供了一种执行此操作的方法。
多年来,Spring Integration 的 DSL 发生了变化。我们最初使用基于 XML 的 DSL 启动了该项目,后来引入了 Java 配置组件模型,再后来引入了 Java DSL。甚至还短暂地尝试过 Scala DSL。现在我们有了 Kotlin DSL。Kotlin DSL 建立在几年前在 Spring Integration Java DSL 中奠定的基础之上。它扩展了 DSL,使其更适合 Kotlin 本身。
在此应用程序中,我们将构建一个应用程序来监视文件系统。我展示这些示例是因为它们不需要您(观众)在本地机器上安装任何东西,除了一个文件系统,您可能已经拥有了。Spring Integration 提供了丰富的集成工具箱。您可以与文件系统(远程和本地)、数据库、消息队列以及无数其他协议和集成进行交互。
让我们构建一个新的应用程序。转到 Spring Initializr 并确保选择 Kotlin
作为语言选择。另外,请确保使用 Spring Boot 2.3.x 或更高版本。
然后在依赖项组合框中,选择 Spring Integration
。我们还需要添加一个 Spring Integration 特定的依赖项。虽然此依赖项在 Spring Initializr 上不可发现,但它由 Spring Boot 管理,因此我们可以轻松地将其手动添加到我们的构建中。
点击 生成
,然后在您喜欢的 IDE 中打开项目。
转到 pom.xml
文件并添加以下依赖项
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>${spring-integration.version}</version>
</dependency>
现在,让我们转向应用程序本身。实现将非常简单。让我们看看伪代码。
input
目录($HOME/Desktop/in
)时,Spring Integration 入站适配器将注意到它的到达,然后将其转发到….csv
文件、.xml
文件还是其他所有文件的处理程序。.csv
和 .txt
处理程序最终会将文件移动到相应的目录。我们不会为文件属于其他类型的情况指定处理程序,但这里可能性是无限的。您可以将错误文件转发到发送电子邮件、写入数据库或将消息发布到 Apache Kafka 代理等的处理程序。这里的天空是无限的!
我们将有三个流程。我知道您刚刚阅读了这些伪代码要点,并且可能认为只有一个流程,但我已将 .csv
文件的处理程序与 .txt
文件的处理程序解耦。解耦很有用,因为这意味着其他流程可能会生成文件,然后将其作为下游处理程序路由到 csv
或 txt
流程。解耦支持良好的干净架构。它允许我通过保持流程小巧且专注于单一目标来重新利用我的流程。这与编写函数时适用的建议相同。
我们通过明智地使用 MessageChannels
来解耦流程。通道就像消息流经的命名管道。
这是我们的基本 Spring Boot 应用程序,只有导入、我们的 main()
函数,以及其他不多。
package com.example.kotlinspringintegration
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.dsl.MessageChannels
import org.springframework.integration.dsl.integrationFlow
import org.springframework.integration.file.dsl.Files
import java.io.File
@SpringBootApplication
class KotlinSpringIntegrationApplication
fun main(args: Array<String>) {
runApplication<KotlinSpringIntegrationApplication>(*args)
}
从这里,让我们定义消息通道。
@Configuration
class ChannelsConfiguration {
@Bean
fun txt() = MessageChannels.direct().get()
@Bean
fun csv() = MessageChannels.direct().get()
@Bean
fun errors() = MessageChannels.direct().get()
}
我们可以注入配置类,然后调用方法来取消引用各个通道。
让我们看看主要的集成流程。Spring Integration Java DSL 一直以来都非常方便,但在 Kotli 语言中它更容易访问。在此配置类中,我使用 Kotlin 的 integrationFlow
工厂函数定义了一个 IntegrationFlow
。它反过来接受一个 lambda 作为上下文,我可以在其上挂载集成流程中的各个步骤。我通过将其指向新消息(启动流程)到达的位置来构建 IntegrationFlow
。在本例中,消息在使用监视目录($HOME/Desktop/in
)的入站文件适配器使用消息后到达。配置的轮询器确定轮询新消息的时间表。这里同样,Spring Integration Kotlin DSL 使事情变得更容易。这比原始的 Java 配置 DSL 清晰得多(至少在我看来)。
文件到达后,它们将包装在 Message<File>
中并转发到 filter<File>
扩展函数。请注意,这里我无需指定 File.class
——Kotlin 具有伪具现化泛型——参数的类型在函数本身的泛型调用中捕获。无需类型标记。filter
函数期望一个 lambda,它检查当前消息(通过隐式参数 it
可用)并确认它是一个文件(而不是目录或其他内容)。如果是,则流程继续到路由器。
然后,路由器检查消息并确定应将结果消息转发到哪个出站 MessageChannel
。此路由器使用 Kotlin 的巧妙 when
表达式——有点像 Java 中的增强型 switch
语句。(注意:Java 中有一个很有前途的 switch 表达式,但现在有多少人在使用它呢?)。when
表达式生成一个值。在 Kotlin 中,函数的最后一个表达式是返回值(您很少需要指定 return
)。在本例中,函数的最后一个表达式是 when
表达式的结果:一个 MessageChannel
。
@Configuration
class FileConfiguration(private val channels: ChannelsConfiguration) {
private val input = File("${System.getenv("HOME")}/Desktop/in")
private val output = File("${System.getenv("HOME")}/Desktop/out")
private val csv = File(output, "csv")
private val txt = File(output, "txt")
@Bean
fun filesFlow() = integrationFlow(
Files.inboundAdapter(this.input).autoCreateDirectory(true),
{ poller { it.fixedDelay(500).maxMessagesPerPoll(1) } }
) {
filter<File> { it.isFile }
route<File> {
when (it.extension.toLowerCase()) {
"csv" -> channels.csv()
"txt" -> channels.txt()
else -> channels.errors()
}
}
}
}
此时,此流程结束。消息没有去处。我们的火车已经跑完了轨道!我们需要铺设另外两条轨道。一条用于以 csv
结尾的文件,另一条用于以 txt
结尾的文件。让我们看看。
在同一个配置类中,再添加两个集成流程 Bean 定义。
@Bean
fun csvFlow() = integrationFlow(channels.csv()) {
handle(Files.outboundAdapter(csv).autoCreateDirectory(true))
}
@Bean
fun txtFlow() = integrationFlow(channels.txt()) {
handle(Files.outboundAdapter(txt).autoCreateDirectory(true))
}
最后一个示例看起来应该与您迄今为止看到的非常相似,除了启动流程的内容不是入站适配器,而是来自消息通道的任何消息。第一个流程 csvFlow
在消息从 csv
消息通道到达时启动。第二个流程 txtFlow
也是如此。这两个流程都相当突然地终止,只做一件事。它们将消息转发到出站适配器,该适配器反过来将文件写入其他目录。出站适配器是入站适配器的镜像;它从 Spring Integration 流程中获取消息并将其发送到现实世界中的某个接收器(文件系统)。入站适配器从现实世界中获取值并将它们转换为 Spring Integration 消息。
至此,我们得到了一个可工作的处理流程。我有点回避了如果消息不是 txt
或 csv
文件并最终进入 errors
通道会发生什么的问题?正如我之前提到的,这里可能性是无限的。