Spring技巧:使用Spring Integration集成远程文件系统(FTP)

工程 | Josh Long | 2020年3月18日 | ...

Spring技巧:FTP集成

演讲者:Josh Long (@starbuxman)

嗨,Spring 粉丝们!在本期Spring技巧中,我们将探讨一个与我息息相关的话题:集成!是的,您可能还记得,第一期《Spring技巧》就介绍了Spring Integration。如果您还没有观看,您应该看看。因此,虽然我们不会重新讨论Spring Integration的基础知识,但我们将深入探讨Spring Integration的一个支持领域:FTP。FTP主要用于文件同步。广义上讲,在企业应用集成(EAI)领域,我们有四种类型的集成:文件同步、RPC、数据库同步和消息传递。

文件同步绝对不是大多数人在想到云原生应用程序时首先想到的,但您会惊讶地发现,金融领域有多少是通过文件同步(FTP、SFTP、AS2、FTPS、NFS、SMB等)集成来运行的。当然,大多数人使用的是更安全的变体,但要点仍然有效。在本视频中,我们将了解如何使用Spring Integration的FTP支持,一旦您理解了这一点,将其应用于其他变体就足够容易了。

请允许我在这里稍微自夸一下:我认为自从十多年前我主要负责完善Iwein Fuld的原始原型代码,并且自从我贡献了原始的FTPS和SFTP适配器以来,我一直认为我已经了解了Spring Integration的FTP支持所需的一切知识。在过去的十年中,不出所料,Spring Integration团队添加了大量的新的功能,并修复了我原始代码中的所有错误!我喜欢引入的新功能。

所以,首先:我们需要设置一个FTP服务器。Spring Integration的大部分支持作为客户端连接到已安装的FTP服务器。因此,您使用什么FTP服务器并不重要。但是,我建议您使用Apache FTPServer项目。这是一个Apache Mina项目的子项目,顺便说一下,它是Netty项目的前身。Apache FTP Server是一个超级可扩展的、轻量级的、全Java实现的FTP协议。而且,您可以轻松地将其嵌入到Spring应用程序中。我已经在本视频的Github仓库中这样做了。我定义了一个自定义的UserManager类来管理FTP用户帐户。自定义的UserManager与本地PostgreSQL数据库中的一个简单的表ftp_user进行交互,其模式定义为具有以下列的表

我在里面有两个用户,jlonggrussell,它们都有密码pw。我已经将两个记录的enabledadmin都设置为true。稍后我们将使用这两个帐户,因此请确保您将它们插入到表中,如下所示。

insert into ftp_user(username, password, enabled, admin) values ('jlong', 'pw', true, true);
insert into ftp_user(username, password, enabled, admin) values ('grussell', 'pw', true, true);

我不会在这里完整地重印FTP服务器的代码。如果您想仔细阅读,我建议您查看FtpServerConfigurationFtpUserManager

在大多数情况下,我们无法更改FTP服务器。如果我们想收到远程文件系统中任何更改的通知,我们的客户端需要连接、扫描目录并将其与之前的已知状态进行比较。基本上,客户端计算增量并发布事件。但是,如果FTP服务器可以在发生某些事情时广播事件,那不是很好吗?这样,就不会对发生了什么事情有任何疑问。而且毫无疑问,我们观察到了每一次变化。如果我们使用的是任何其他FTP服务器,这将更像是一个愿望而不是可能性。但由于我们使用的是Apache FTP Server,Spring Integration为我们提供了一些有趣的选择。我们可以安装一个FTPlet(有点像过滤器),它将FTP服务器上的任何重要事件广播为ApplicationContext事件。然后,我们可以使用Spring Integration将有趣的事件发布为消息,我们可以在Spring Integration中处理这些消息。此功能是Spring Integration中的新功能。

package ftp;

import lombok.extern.log4j.Log4j2;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.ftp.server.ApacheMinaFtpEvent;
import org.springframework.integration.ftp.server.ApacheMinaFtplet;
import org.springframework.integration.handler.GenericHandler;
import org.springframework.messaging.MessageChannel;

@Log4j2
@Configuration
class IntegrationConfiguration {

	@Bean
	ApacheMinaFtplet apacheMinaFtplet() {
		return new ApacheMinaFtplet();
	}

	@Bean
	MessageChannel eventsChannel() {
		return MessageChannels.direct().get();
	}

	@Bean
	IntegrationFlow integrationFlow() {
		return IntegrationFlows.from(this.eventsChannel())
			.handle((GenericHandler<ApacheMinaFtpEvent>) (apacheMinaFtpEvent, messageHeaders) -> {
                log.info("new event: " + apacheMinaFtpEvent.getClass().getName() + 
                     ':' + apacheMinaFtpEvent.getSession());
				return null;
			})
			.get();
	}

	@Bean
	ApplicationEventListeningMessageProducer applicationEventListeningMessageProducer() {
		var producer = new ApplicationEventListeningMessageProducer();
		producer.setEventTypes(ApacheMinaFtpEvent.class);
		producer.setOutputChannel(eventsChannel());
		return producer;
	}
}

此示例设置了一个Spring Integration消息流,该流侦听相关事件并将其记录出来。显然,我们对这个新信息做了太多处理,但需要注意的是…… *我们完全可以这样做*!这里有很多机会。我们可以通过Apache Kafka、RabbitMQ或JMS发布事件,让其他节点响应。我们可以发送一封电子邮件,邀请某人参与某个工作流。无限可能!

现在,我们有一个在端口7777上运行的服务器,我们可以使用客户端进行连接。我使用的是Filezilla。无论您使用哪个客户端,请尝试登录到主机为localhost、端口为7777、用户为jlong、密码为pw的正在运行的FTP服务器。上传文件、重命名文件等,然后检查应用程序的控制台,您将看到事件中反映的活动。

FTP客户端

我们有一个可运行的服务器。让我们看看Spring Integration如何充当您服务的客户端。我们将使用最简单的抽象,逐步提升到更复杂的功能。在Spring Initializr上创建一个新项目,添加LombokSpring Integration,并选择最新版本的Java。然后单击“生成”,并在您的IDE中打开项目。

我们将使用前面定义的两个帐户。让我们在application.properties中配置它们。

##
## Josh
ftp1.username=jlong
ftp1.password=pw
ftp1.port=7777
ftp1.host=localhost
## 
## Gary
ftp2.username=grussell
ftp2.password=pw
ftp2.port=7777
ftp2.host=localhost

FtpRemoteFileTemplate

我们可以与FTP服务器交互的最简单方法是使用Spring Integration附带的非常方便的FtpRemoteFileTemplate。这是一个示例。此第一个示例定义了一个DefaultFtpSessionFactory,它建立与其中一个FTP帐户的连接。然后我们使用该DefaultFtpSessionFactory定义一个FtpRemoteFileTemplate。然后,我们定义一个初始化程序,它使用该FtpRemoteFileTemplate将远程文件系统上的文件hello.txt读取到本地文件$HOME/Desktop/hello-local.txt中。它再简单不过了!

package com.example.integration;

import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.ftp.session.FtpRemoteFileTemplate;

import java.io.File;
import java.io.FileOutputStream;

@Log4j2
@Configuration
class FtpTemplateConfiguration {

	@Bean
	InitializingBean initializingBean(FtpRemoteFileTemplate template) {
		return () -> template
			.execute(session -> {
				var file = new File(new File(System.getProperty("user.home"), "Desktop"), "hello-local.txt");
				try (var fout = new FileOutputStream(file)) {
					session.read("hello.txt", fout);
				}
				log.info("read " + file.getAbsolutePath());
				return null;
			});
	}

	@Bean
	DefaultFtpSessionFactory defaultFtpSessionFactory(
		@Value("${ftp1.username}") String username,
		@Value("${ftp1.password}") String pw,
		@Value("${ftp1.host}") String host,
		@Value("${ftp1.port}") int port) {
		DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
		defaultFtpSessionFactory.setPassword(pw);
		defaultFtpSessionFactory.setUsername(username);
		defaultFtpSessionFactory.setHost(host);
		defaultFtpSessionFactory.setPort(port);
		return defaultFtpSessionFactory;
	}

	@Bean
	FtpRemoteFileTemplate ftpRemoteFileTemplate(DefaultFtpSessionFactory dsf) {
		return new FtpRemoteFileTemplate(dsf);
	}
}

FTP入站适配器

下一个示例介绍如何使用FTP入站适配器,以便每当远程文件系统上有新文件时接收新的Message<File>。入站或出站适配器是单向消息组件。入站适配器将来自远程系统的事件转换为新的消息,这些消息被传递到Spring Integration流中。出站适配器将Spring Integration Message<T>转换为远程系统中的事件。在这种情况下,每当远程文件系统上出现新文件时,FTP入站适配器都会将Message<T>发布到Spring Integration代码中。

和之前一样,我们配置一个DefaultFtpSessionFactory。然后,我们配置一个FTP入站适配器,只要匹配掩码.txt的任何文件到达服务器,它就会自动同步远程文件系统。入站适配器获取远程文件,将其移动到本地目录,然后发布一个Message<File>,我们可以对其做任何我们想做的事情。在这里,我只是记录消息。试试看!将文件foo.txt上传到FTP服务器,然后观察 - 不超过一秒钟后 - 它就会被下载并存储在本地文件系统下的$HOME/Desktop/local目录中。


package com.example.integration;

import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ftp.dsl.Ftp;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;

import java.io.File;
import java.util.concurrent.TimeUnit;

@Log4j2
@Configuration
class InboundConfiguration {

	@Bean
	DefaultFtpSessionFactory defaultFtpSessionFactory(
		@Value("${ftp1.username}") String username,
		@Value("${ftp1.password}") String pw,
		@Value("${ftp1.host}") String host,
		@Value("${ftp1.port}") int port) {
		DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
		defaultFtpSessionFactory.setPassword(pw);
		defaultFtpSessionFactory.setUsername(username);
		defaultFtpSessionFactory.setHost(host);
		defaultFtpSessionFactory.setPort(port);
		return defaultFtpSessionFactory;
	}

	@Bean
	IntegrationFlow inbound(DefaultFtpSessionFactory ftpSf) {
		var localDirectory = new File(new File(System.getProperty("user.home"), "Desktop"), "local");
		var spec = Ftp
			.inboundAdapter(ftpSf)
			.autoCreateLocalDirectory(true)
			.patternFilter("*.txt")
			.localDirectory(localDirectory);
		return IntegrationFlows
			.from(spec, pc -> pc.poller(pm -> pm.fixedRate(1000, TimeUnit.MILLISECONDS)))
			.handle((file, messageHeaders) -> {
				log.info("new file: " + file + ".");
				messageHeaders.forEach((k, v) -> log.info(k + ':' + v));
				return null;
			})
			.get();
	}
}

FTP 网关

现在,让我们来看一下Spring Integration FTP网关,这是我们最后一站。在Spring Integration中,网关是一个发送数据(到远程服务)然后接收响应并将其带回Spring Integration流程的组件。或者,网关也可以接收来自远程系统的传入请求,将其带入Spring Integration流程,然后再次发送响应。无论哪种方式,网关都是一个双向消息组件。在这种情况下,FTP网关接收Spring Integration Message<T>,将其发送到FTP服务器并上传它们,一旦上传完成,就将响应(至少是确认)发送回Spring Integration代码。

如果我们只做这些,本身就很有用。但是,对于最后一个示例,我想根据某些条件有条件地将文件上传到两个FTP服务器帐户之一。您可以想象一下这种情况。一个HTTP请求到来,它被转换成一个进入Spring Integration流程的Message<T>,然后它到达网关。唯一的问题是:应该将数据上传到哪个帐户?如果打算给John的文件上传到了Jane的帐户,Jane可能不会高兴。

我们将使用DelegatingSessionFactory<FTPFile>DelegatingSessionFactory<FTPFile>有两个构造函数。一个接受SessionFactoryLocator,您可以使用它在运行时决定使用哪个FTP帐户。另一个接受一个Map<String, SessionFactory>,这反过来会产生一个SessionFactoryLocator,它查看传入消息的某些属性(由您决定哪个属性),并将其用作地图中查找的键。

我们需要某种方法来启动管道,所以我创建了一个简单的HTTP端点,它接受HTTP POST消息并使用路径变量来建立一个键,然后将其发送到集成流中。集成流有三个步骤。第一阶段查看传入的消息并配置DelegatingSessionFactory的线程局部键,然后它将消息转发到执行文件上传到远程文件系统的网关,然后它将上传的响应转发到另一个清除线程局部键的组件。

package com.example.integration;

import org.apache.commons.net.ftp.FTPFile;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.ftp.dsl.Ftp;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.ftp.session.FtpRemoteFileTemplate;
import org.springframework.integration.handler.GenericHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.servlet.function.RouterFunction;
import org.springframework.web.servlet.function.ServerResponse;

import java.util.Map;

import static org.springframework.web.servlet.function.RouterFunctions.route;

@Configuration
@Profile("gateway")
class GatewayConfiguration {

	@Bean
	MessageChannel incoming() {
		return MessageChannels.direct().get();
	}

	@Bean
	IntegrationFlow gateway(
		FtpRemoteFileTemplate template,
		DelegatingSessionFactory<FTPFile> dsf) {
		return f -> f
			.channel(incoming())
			.handle((GenericHandler<Object>) (key, messageHeaders) -> {
				dsf.setThreadKey(key);
				return key;
			})
			.handle(Ftp
				.outboundGateway(template, AbstractRemoteFileOutboundGateway.Command.PUT, "payload")
				.fileExistsMode(FileExistsMode.IGNORE)
				.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
			)
			.handle((GenericHandler<Object>) (key, messageHeaders) -> {
				dsf.clearThreadKey();
				return null;
			});
	}

	@Bean
	DelegatingSessionFactory<FTPFile> dsf(Map<String, DefaultFtpSessionFactory> ftpSessionFactories) {
		return new DelegatingSessionFactory<>(ftpSessionFactories::get);
	}

	@Bean
	DefaultFtpSessionFactory gary(@Value("${ftp2.username}") String username, @Value("${ftp2.password}") String pw, @Value("${ftp2.host}") String host, @Value("${ftp2.port}") int port) {
		return this.createSessionFactory(username, pw, host, port);
	}

	@Bean
	DefaultFtpSessionFactory josh(@Value("${ftp1.username}") String username, @Value("${ftp1.password}") String pw, @Value("${ftp1.host}") String host, @Value("${ftp1.port}") int port) {
		return this.createSessionFactory(username, pw, host, port);
    }

    @Bean
	FtpRemoteFileTemplate ftpRemoteFileTemplate(DelegatingSessionFactory<FTPFile> dsf) {
		var ftpRemoteFileTemplate = new FtpRemoteFileTemplate(dsf);
		ftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression(""));
		return ftpRemoteFileTemplate;
	}
    

	private DefaultFtpSessionFactory createSessionFactory(String username, String pw, String host, int port) {
		var defaultFtpSessionFactory = new DefaultFtpSessionFactory();
		defaultFtpSessionFactory.setPassword(pw);
		defaultFtpSessionFactory.setUsername(username);
		defaultFtpSessionFactory.setHost(host);
		defaultFtpSessionFactory.setPort(port);
		return defaultFtpSessionFactory;
    }

	@Bean
	RouterFunction<ServerResponse> routes() {
		var in = this.incoming();
		return route()
			.POST("/put/{sfn}", request -> {
				var name = request.pathVariable("sfn");
				var msg = MessageBuilder.withPayload(name).build();
				var sent = in.send(msg);
				return ServerResponse.ok().body(sent);
			})
			.build();
	}
}

您可以通过运行curl -XPOST https://127.0.0.1:8080/put/one自己尝试这个流程。这会将文件上传到bean名称为one的FTP帐户。尝试curl -XPOST https://127.0.0.1:8080/put/two将文件上传到bean名称为two的FTP帐户。

结论

在本期Spring Tips中,我们了解了如何处理各种FTP集成场景。您可以利用此处学到的知识来处理框架中对远程文件系统的其他支持。

获取Spring通讯

与Spring通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部