Spring Tips: Spring Integration Kotlin DSL

工程 | Josh Long | 2020年4月7日 | ...

主讲人: Josh Long (@starbuxman)

大家好,Spring 爱好者们!在本期文章中,我们将介绍 Spring Integration 新的 Kotlin DSL。我之前在其他视频中已经介绍过 Spring Integration 和 Kotlin。我**非常**确定我也曾在基于 Kotlin 的 Spring 应用中使用过 Spring Integration,但这是我第一次能够专门介绍 Spring Integration 的 Kotlin DSL。

Spring Integration 已经存在了很长时间——至少 13 年——它服务于一个永恒的用例:集成不同的系统和服务。它借鉴了 Gregor Hohpe 和 Bobby Woolf 合著的开创性著作 《企业集成模式》。这是一本极好的著作,我强烈推荐它,因为它在某种程度上是理解 Spring Integration 所需的文档。Spring Integration 将书中的模式编码化;API 元素的命名与书中的相关模式一致。

集成必然是高层级的工作。它关注不同管道系统的输入和输出。你不想在最低层级花太多时间处理对象图来完成这项工作。根据它们支持的输入和输出来解耦这些系统和服务要容易得多。Spring Integration 提供了实现这一目标的方法。

多年来,Spring Integration 的 DSL 发生了变化。我们最初的项目使用基于 XML 的 DSL,后来引入了 Java 配置组件模型,再后来又引入了 Java DSL。甚至有过短暂的 Scala DSL 尝试。现在我们有了 Kotlin DSL。Kotlin DSL 构建在 Spring Integration Java DSL 多年前奠定的基础上。它扩展了 DSL,使其更具 Kotlin 原生特性。

在这个应用中,我们将构建一个监控文件系统的应用。我展示这些例子是因为它们不需要您(读者)在本地机器上安装任何东西,除了一个文件系统,而这个文件系统您大概已经有了。Spring Integration 提供了丰富的集成工具箱。您可以与文件系统(远程和本地)、数据库、消息队列以及无数其他协议和集成进行通信。

让我们构建一个新应用。访问 Spring Initializr 并确保在语言选择中选择 Kotlin。另外,请确保使用 Spring Boot 2.3.x 或更高版本。

然后在依赖下拉框中选择 Spring Integration。我们还需要添加一个特定于 Spring Integration 的依赖。虽然这个依赖在 Spring Initializr 上不可发现,但它**受** Spring Boot 管理,所以我们可以很容易地手动添加到我们的构建中。

点击 Generate 然后在您喜欢的 IDE 中打开项目。

前往 pom.xml 文件并添加以下依赖

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>${spring-integration.version}</version>
</dependency>

现在,让我们看看应用本身。实现将相当简单。我们来看一下伪代码。

  • 当文件到达 input 目录 ($HOME/Desktop/in) 时,Spring Integration 入站适配器会注意到它的到来,然后将其转发到...。
  • ...一个过滤器,该过滤器将判断该条目是否是文件(而不是目录),然后将其发送到...
  • ...一个路由器,该路由器将根据文件的扩展名决定是将其发送到处理 .csv 文件的处理器、处理 .xml 文件的处理器,还是其他所有文件的处理器。
  • .csv.txt 处理器最终会将文件移动到相应的目录。

我们不会为文件类型不是其他类型的情况指定处理器,但这里的可能性是无限的。您可以将错误的文件转发到发送电子邮件、写入数据库或发布消息到 Apache Kafka 代理的处理器等。这里是无限的!

我们将有**三个**流。我知道你刚刚读了那些伪代码要点,可能认为只有一个流,但我已经将处理 .csv 文件的处理器与处理 .txt 文件的处理器解耦了。解耦很有用,因为它意味着其他流可以生成文件,然后这些文件作为下游处理器被路由到 csvtxt 流。解耦支持良好的清晰架构。它允许我通过保持流的小巧和单一焦点来重用它们。这与编写函数的建议相同。

我们通过明智地使用 MessageChannels 来解耦流。通道就像命名的管道——消息通过它们流动。

这是我们基本的 Spring Boot 应用,只包含导入、我们的 main() 函数以及其他少量内容。

package com.example.kotlinspringintegration

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.dsl.MessageChannels
import org.springframework.integration.dsl.integrationFlow
import org.springframework.integration.file.dsl.Files
import java.io.File

@SpringBootApplication
class KotlinSpringIntegrationApplication

fun main(args: Array<String>) {
    runApplication<KotlinSpringIntegrationApplication>(*args)
}

在此基础上,我们来定义消息通道。

@Configuration
class ChannelsConfiguration {

    @Bean
    fun txt() = MessageChannels.direct().get()

    @Bean
    fun csv() = MessageChannels.direct().get()

    @Bean
    fun errors() = MessageChannels.direct().get()
}

我们可以注入配置类,然后调用方法来解除引用单个通道。

让我们看一下主要的集成流。Spring Integration Java DSL 一直很方便,但从 Kotlin 语言访问它更容易。在这个配置类中,我使用 Kotlin 的 integrationFlow 工厂函数定义了一个 IntegrationFlow。它接着接受一个 Lambda,该 Lambda 作为我在其上挂载集成流中各种步骤的上下文。我通过指向触发流的新消息将到达的位置来构建 IntegrationFlow。在这种情况下,消息在消费来自监控目录 ($HOME/Desktop/in) 的入站文件适配器的消息(每 500 毫秒)之后到达。配置的轮询器确定轮询新消息的时间表。在这里,Spring Integration Kotlin DSL 使事情变得更容易。这比原始的 Java 配置 DSL 更清晰(至少在我看来)。

文件一到达,它们就会被封装到 Message<File> 中,并转发到 filter<File> 扩展函数。请注意,这里我不需要指定 File.class —— Kotlin 具有伪泛型具化(pseudo reified generics)—— 参数的类型在函数本身的泛型调用中被捕获。不需要类型令牌。filter 函数期望一个 Lambda,该 Lambda 检查当前消息(通过隐式参数 it 获取)并确认它是一个文件(而不是目录或其他)。如果是,流将继续到路由器。

然后路由器检查消息,并确定结果消息应转发到哪个出站 MessageChannel。这个路由器使用了 Kotlin 巧妙的 when 表达式——有点像 Java 中增强版的 switch 语句。(注意:Java 中有一个非常有前景的 switch 表达式,但是现在有多少人正在使用它呢?)。when 表达式会产生一个值。在 Kotlin 中,函数的最后一个表达式就是返回值(您很少需要指定 return)。在这种情况下,函数的最后一个表达式是 when 表达式的结果:一个 MessageChannel

@Configuration
class FileConfiguration(private val channels: ChannelsConfiguration) {

    private val input = File("${System.getenv("HOME")}/Desktop/in")
    private val output = File("${System.getenv("HOME")}/Desktop/out")
    private val csv = File(output, "csv")
    private val txt = File(output, "txt")

    @Bean
    fun filesFlow() = integrationFlow(
            Files.inboundAdapter(this.input).autoCreateDirectory(true),
            { poller { it.fixedDelay(500).maxMessagesPerPoll(1) } }
    ) {

        filter<File> { it.isFile }
        route<File> {
            when (it.extension.toLowerCase()) {
                "csv" -> channels.csv()
                "txt" -> channels.txt()
                else -> channels.errors()
            }
        }
    }
}

此时,这个流程结束了。消息无处可去。我们的列车已经跑完了轨道!我们需要再铺设两个流程。一个用于处理以 csv 结尾的文件,另一个用于处理以 txt 结尾的文件。我们来看一下。

在同一个配置类中,添加另外两个集成流程 bean 定义。


    @Bean
    fun csvFlow() = integrationFlow(channels.csv()) {
        handle(Files.outboundAdapter(csv).autoCreateDirectory(true))
    }

    @Bean
    fun txtFlow() = integrationFlow(channels.txt()) {
        handle(Files.outboundAdapter(txt).autoCreateDirectory(true))
    }

最后一个例子应该看起来与您目前所看到的合理相似,只是启动流程的不是入站适配器,而是任何从消息通道出来的消息。第一个流程 csvFlow 在消息从 csv 消息通道到达时启动。第二个流程 txtFlow 也是如此。两个流程都相当突然地终止,只做一件事。它们将消息转发到一个出站适配器,该适配器又将文件写入另一个目录。出站适配器是入站适配器的镜像;它从 Spring Integration 流程中获取消息并将其发送到现实世界中的某个接收器(文件系统)。入站适配器从现实世界中获取值并将其转换为 Spring Integration 消息。

此时,我们已经有了一个工作中的处理流程。我有点含糊地略过了这样一个问题:如果消息不是 txtcsv 文件并最终进入 errors 通道,会发生什么?正如我前面提到的,这里的可能性是无限的。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

抢先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将发生的活动

查看 Spring 社区所有即将发生的活动。

查看全部