Spring 提示:Spring Integration Kotlin DSL

工程 | Josh Long | 2020年4月7日 | ...

演讲者:Josh Long (@starbuxman)

嗨,Spring 粉丝们!在本期节目中,我们将了解 Spring Integration 的新 Kotlin DSL。我之前在其他视频中介绍过 Spring Integration 和 Kotlin。我非常确定我也曾在基于 Kotlin 的 Spring 应用程序中使用过 Spring Integration,但这是我第一次能够专门介绍 Spring Integration 的 Kotlin DSL。

Spring Integration 已经存在很长时间了——至少 13 年——它满足了一个永恒的用例:异构系统和服务的集成。它以 Gregor Hohpe 和 Bobby Woolf 的开创性著作 企业集成模式 为蓝本。这是一本很棒的著作,我强烈推荐它,因为它在某种程度上充当了理解 Spring Integration 所需的文档。Spring Integration 将书中的模式编纂成代码;API 元素以书中相关模式命名。

集成本质上是高级工作。它关乎不同管道系统在输入和输出方面的运作。您不希望花费太多时间在最低级别处理对象图来完成这项工作。从它们支持的输入和输出方面解耦这些系统和服务要容易得多。Spring Integration 为您提供了一种执行此操作的方法。

多年来,Spring Integration 的 DSL 发生了变化。我们最初使用基于 XML 的 DSL 启动了该项目,后来引入了 Java 配置组件模型,再后来引入了 Java DSL。甚至还短暂地尝试过 Scala DSL。现在我们有了 Kotlin DSL。Kotlin DSL 建立在几年前在 Spring Integration Java DSL 中奠定的基础之上。它扩展了 DSL,使其更适合 Kotlin 本身。

在此应用程序中,我们将构建一个应用程序来监视文件系统。我展示这些示例是因为它们不需要您(观众)在本地机器上安装任何东西,除了一个文件系统,您可能已经拥有了。Spring Integration 提供了丰富的集成工具箱。您可以与文件系统(远程和本地)、数据库、消息队列以及无数其他协议和集成进行交互。

让我们构建一个新的应用程序。转到 Spring Initializr 并确保选择 Kotlin 作为语言选择。另外,请确保使用 Spring Boot 2.3.x 或更高版本。

然后在依赖项组合框中,选择 Spring Integration。我们还需要添加一个 Spring Integration 特定的依赖项。虽然此依赖项在 Spring Initializr 上不可发现,但它 Spring Boot 管理,因此我们可以轻松地将其手动添加到我们的构建中。

点击 生成,然后在您喜欢的 IDE 中打开项目。

转到 pom.xml 文件并添加以下依赖项

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>${spring-integration.version}</version>
</dependency>

现在,让我们转向应用程序本身。实现将非常简单。让我们看看伪代码。

  • 当文件到达 input 目录($HOME/Desktop/in)时,Spring Integration 入站适配器将注意到它的到达,然后将其转发到…
  • …过滤器,它将确定条目是否为文件(而不是目录),然后将其发送到…
  • …路由器,它将根据文件扩展名确定是将其发送到 .csv 文件、.xml 文件还是其他所有文件的处理程序。
  • .csv.txt 处理程序最终会将文件移动到相应的目录。

我们不会为文件属于其他类型的情况指定处理程序,但这里可能性是无限的。您可以将错误文件转发到发送电子邮件、写入数据库或将消息发布到 Apache Kafka 代理等的处理程序。这里的天空是无限的!

我们将有三个流程。我知道您刚刚阅读了这些伪代码要点,并且可能认为只有一个流程,但我已将 .csv 文件的处理程序与 .txt 文件的处理程序解耦。解耦很有用,因为这意味着其他流程可能会生成文件,然后将其作为下游处理程序路由到 csvtxt 流程。解耦支持良好的干净架构。它允许我通过保持流程小巧且专注于单一目标来重新利用我的流程。这与编写函数时适用的建议相同。

我们通过明智地使用 MessageChannels 来解耦流程。通道就像消息流经的命名管道。

这是我们的基本 Spring Boot 应用程序,只有导入、我们的 main() 函数,以及其他不多。

package com.example.kotlinspringintegration

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.dsl.MessageChannels
import org.springframework.integration.dsl.integrationFlow
import org.springframework.integration.file.dsl.Files
import java.io.File

@SpringBootApplication
class KotlinSpringIntegrationApplication

fun main(args: Array<String>) {
    runApplication<KotlinSpringIntegrationApplication>(*args)
}

从这里,让我们定义消息通道。

@Configuration
class ChannelsConfiguration {

    @Bean
    fun txt() = MessageChannels.direct().get()

    @Bean
    fun csv() = MessageChannels.direct().get()

    @Bean
    fun errors() = MessageChannels.direct().get()
}

我们可以注入配置类,然后调用方法来取消引用各个通道。

让我们看看主要的集成流程。Spring Integration Java DSL 一直以来都非常方便,但在 Kotli 语言中它更容易访问。在此配置类中,我使用 Kotlin 的 integrationFlow 工厂函数定义了一个 IntegrationFlow。它反过来接受一个 lambda 作为上下文,我可以在其上挂载集成流程中的各个步骤。我通过将其指向新消息(启动流程)到达的位置来构建 IntegrationFlow。在本例中,消息在使用监视目录($HOME/Desktop/in)的入站文件适配器使用消息后到达。配置的轮询器确定轮询新消息的时间表。这里同样,Spring Integration Kotlin DSL 使事情变得更容易。这比原始的 Java 配置 DSL 清晰得多(至少在我看来)。

文件到达后,它们将包装在 Message<File> 中并转发到 filter<File> 扩展函数。请注意,这里我无需指定 File.class——Kotlin 具有伪具现化泛型——参数的类型在函数本身的泛型调用中捕获。无需类型标记。filter 函数期望一个 lambda,它检查当前消息(通过隐式参数 it 可用)并确认它是一个文件(而不是目录或其他内容)。如果是,则流程继续到路由器。

然后,路由器检查消息并确定应将结果消息转发到哪个出站 MessageChannel。此路由器使用 Kotlin 的巧妙 when 表达式——有点像 Java 中的增强型 switch 语句。(注意:Java 中有一个很有前途的 switch 表达式,但现在有多少人在使用它呢?)。when 表达式生成一个值。在 Kotlin 中,函数的最后一个表达式是返回值(您很少需要指定 return)。在本例中,函数的最后一个表达式是 when 表达式的结果:一个 MessageChannel

@Configuration
class FileConfiguration(private val channels: ChannelsConfiguration) {

    private val input = File("${System.getenv("HOME")}/Desktop/in")
    private val output = File("${System.getenv("HOME")}/Desktop/out")
    private val csv = File(output, "csv")
    private val txt = File(output, "txt")

    @Bean
    fun filesFlow() = integrationFlow(
            Files.inboundAdapter(this.input).autoCreateDirectory(true),
            { poller { it.fixedDelay(500).maxMessagesPerPoll(1) } }
    ) {

        filter<File> { it.isFile }
        route<File> {
            when (it.extension.toLowerCase()) {
                "csv" -> channels.csv()
                "txt" -> channels.txt()
                else -> channels.errors()
            }
        }
    }
}

此时,此流程结束。消息没有去处。我们的火车已经跑完了轨道!我们需要铺设另外两条轨道。一条用于以 csv 结尾的文件,另一条用于以 txt 结尾的文件。让我们看看。

在同一个配置类中,再添加两个集成流程 Bean 定义。


    @Bean
    fun csvFlow() = integrationFlow(channels.csv()) {
        handle(Files.outboundAdapter(csv).autoCreateDirectory(true))
    }

    @Bean
    fun txtFlow() = integrationFlow(channels.txt()) {
        handle(Files.outboundAdapter(txt).autoCreateDirectory(true))
    }

最后一个示例看起来应该与您迄今为止看到的非常相似,除了启动流程的内容不是入站适配器,而是来自消息通道的任何消息。第一个流程 csvFlow 在消息从 csv 消息通道到达时启动。第二个流程 txtFlow 也是如此。这两个流程都相当突然地终止,只做一件事。它们将消息转发到出站适配器,该适配器反过来将文件写入其他目录。出站适配器是入站适配器的镜像;它从 Spring Integration 流程中获取消息并将其发送到现实世界中的某个接收器(文件系统)。入站适配器从现实世界中获取值并将它们转换为 Spring Integration 消息。

至此,我们得到了一个可工作的处理流程。我有点回避了如果消息不是 txtcsv 文件并最终进入 errors 通道会发生什么的问题?正如我之前提到的,这里可能性是无限的。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证来加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部