快人一步
VMware 提供培训和认证,助您加速进步。
了解更多本文是一个博客系列的一部分,该系列探讨了基于 Java 函数的重新设计的 Spring Cloud Stream 应用程序。在本章中,我们将探讨如何使用 Spring Cloud Stream Applications 和 Spring Cloud Data Flow 实现一个非常常见的 ETL 用例:从远程服务摄取文件。具体来说,我们将介绍如何从 S3、SFTP 和 FTP 摄取文件。
以下是该博客系列迄今为止包含的内容
从宏观角度来看,Spring Cloud Data Flow 多年来一直支持使用 SFTP 进行远程文件摄取。自我撰写 此文章 以来,基本架构没有改变,但正如我们将看到的那样,新的流应用程序提供了更简单、更灵活的解决方案。
文件摄取架构始于一个 远程文件源,它轮询远程目录并为检测到的每个文件发布一条消息。术语 远程文件源 指代任何提供此功能的源应用程序。目前,这包括 Amazon S3 源、SFTP 源 和 FTP 源。
每个源都可以配置为将远程目录中的文件同步到本地目录。在这种情况下,底层 Supplier
函数生成的消息负载是本地文件路径。Supplier 的输出在此过程中被转换为任务启动请求。我们稍后会解释如何完成。该请求由一个 Task Launcher
sink 接收,然后通过其 REST API 将其发送到 Data Flow Server,以启动一个批处理作业来摄取文件内容。在下面所示的示例中,该作业将 CSV 文件中的每一行插入到数据库表中。
如果我们在云平台(例如 Kubernetes 或 Cloud Foundry)上运行,我们需要配置一个共享卷,例如使用 NFS,以便任务应用程序可以访问源下载的文件。
这是使用 Spring Cloud Data Flow 推荐的文件摄取架构。以下特点使其具有很高的弹性
文件摄取作业是使用 Spring Batch 实现的。它非常适合处理大型文件,其中临时故障可能需要作业从上次中断的地方重新启动——Spring Batch 专门设计用于处理这种情况。
Task Launcher sink 使用 PollableMessageSource,以便在从输入队列中拉取任务启动请求之前,它可以首先确认 Data Flow 可以接受任务请求。Data Flow 配置了允许的最大并发任务执行数量。该 sink 使用 Data Flow API 在接受下一个请求之前检查是否未达到此限制。这种类似于背压的流量控制,防止了当例如 100 个文件被放入远程目录时容易发生的平台资源饱和。
如果需要,共享卷是必要的,以便批处理作业可以从上次提交的事务继续处理。
可以在不使用 Spring Cloud Data Flow 或 Spring Batch 的情况下实现这种类型的工作负载。我们将此留作读者的练习。
远程文件源是什么意思?Amazon S3、SFTP 和 FTP 源应用程序在 Spring Integration 中具有共同的起源,因此行为基本相同。例如,扩展 AbstractInboundFileSynchronizer 的类用于将本地目录与远程目录同步。基类包括配置一个 FileListFilter
来指定要包含的文件。通常,这用于模式匹配文件名。此外,此组件使用一个 MetadataStore 来跟踪本地目录中已有的文件以及上次修改时间,以便只同步新的或更改的文件。默认情况下,元数据存储是内存实现。这意味着当源重新启动时,我们可能会收到已处理过的文件的事件。为了解决这个问题,每个源都可以轻松定制以使用几种可用的 持久化实现 之一。AbstractFileSynchronizer
还支持使用 SpEL 表达式创建本地文件名、自动删除远程文件等。
除了文件同步,每个源还包含 file.consumer.mode
属性,其值可以是以下之一
contents
- 负载是文件内容的字节数组
ref
- 负载是本地文件路径
lines
- 每个负载是文件中的一行
此外,每个源都提供一个 list-only
选项,其中负载包含有关远程文件的元数据,并且不执行同步。
SFTP Source 从 SFTP 服务器消费文件。由于 SFTP 是最常用的远程文件服务,因此该组件具有最先进的功能。事实上,在上一代流应用程序中,SFTP 是我们支持文件摄取架构的唯一源。随着它发展到支持任务启动请求,我们最终专门为文件摄取用例实现了一个特殊变体。sftp-datalow
源被设计为与 tasklauncher-dataflow
sink 一起工作,嵌入了将负载转换为任务启动请求的代码。在当前版本中,我们已弃用此变体,转而支持函数组合。此外,SFTP 源可以配置为轮询多个远程目录,在它们之间轮换。在此配置中,轮换算法可以是 fair
(每个远程目录轮询一次)或非公平(每个远程目录持续轮询直到没有新文件)。它还支持 sftp.supplier.stream=true
,这将直接流式传输内容而无需同步到本地目录。
FTP Source 与 SFTP 源非常相似,不同之处在于它使用 FTP 且在传输过程中不加密数据,因此安全性较低。它提供相同核心功能,但目前不支持多个远程目录、list-only
或 stream
模式。
Amazon S3 Source 仿照其他源构建,支持相同的文件消费模式以及 list-only
模式。在这种情况下,s3.supplier.remote-dir
指的是一个 S3 存储桶。使用 list-only
时,负载包含一个 S3ObjectSummary
,它提供有关 S3 对象的元数据。S3 本身提供了比 FTP/SFTP 更丰富的功能集。
除了 AWS S3,该源现在还可以用于兼容 S3 的实现,例如 Minio。
在之前的版本中,这被称为 tasklauncher-dataflow
sink。最初,我们还有独立的任务启动器,每个支持的平台一个。出于易用性和弹性考虑,如上所述,这些已经弃用,转而使用 Data Flow 支持的实现。因此,我们从名称中去掉了 "Data Flow"。现在它就是简单的 tasklauncher-sink。
该 sink 构建在一个相应的 tasklauncher-function 上,该函数可在任何独立应用程序中使用,向 Data Flow 发送任务启动请求。这实现为 Function<LaunchRequest, Optional<Long>>
。[LaunchRequest](https://github.com/spring-cloud/stream-applications/blob/master/functions/function/tasklauncher-function/src/main/java/org/springframework/cloud/fn/tasklauncher/LaunchRequest.java) 是一个简单的值对象,至少包含要启动的任务名称。此任务必须在 Data Flow 中使用相同的名称定义。可选地,启动请求包括命令行参数和部署属性。如果请求已提交,该函数将返回唯一的任务 ID 作为 long 值。如果 Data Flow server 指示任务平台已达到最大运行任务数、无法访问 Data Flow server 或请求无效,则请求不会提交。
Task Launcher sink 从一个定时任务中调用其基础函数,该任务由一个 DynamicPeriodicTrigger 触发,该触发器允许在运行时更新其周期。在这种情况下,我们使用它来实现指数退避。从最初的一秒周期开始,如果满足以下条件,触发器将退避,最终达到每 30 秒一次的周期:
没有排队的启动请求
平台已运行最大任务数
如果这些条件中的任何一个发生变化,源会将周期重置为其初始值。当然,初始和最大触发周期是可配置的。
被触发的任务检查服务器是否可以接受新的启动请求,如果可以,它使用 PollableMessageSource 轮询输入队列。如果存在请求,它将通过其 REST API 将请求发送到 Data Flow。
新的基于函数的架构为 函数组合 提供了一流的支持。作为此策略的一部分,某些通用函数可以与任何源组合。值得注意的是,这包括 task-launch-request-function。这意味着现在可以将任何远程文件源配置为生成任务启动请求。任务启动请求函数可以评估 SpEL 表达式。例如,每个任务启动请求可以提供不同的文件路径作为命令行参数。
让我们深入研究一个示例,看看它是如何工作的。我们将使用 S3 Source、Task Launcher Sink、Spring Cloud Data Flow、一个兼容 S3 的服务以及一个简单的 Spring Batch 应用程序来处理文件。
为了简单起见,我们将使用 Docker Compose 在本地运行所有内容。
为此示例创建一个项目目录,打开一个终端会话,并切换到项目目录。下载 SCDF 的 docker-compose 文件。
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.1/spring-cloud-dataflow-server/docker-compose.yml
设置 Data Flow 和 Skipper 版本,以及导入最新流应用程序的 URI。然后运行 docker-compose
export DATAFLOW_VERSION=2.6.1
export SKIPPER_VERSION=2.5.1
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven
docker-compose up
SCDF 的 docker-compose.yml
挂载当前目录,因此这些文件可以在挂载路径 /root/scdf
下供容器访问。
我们将使用 Minio 进行 S3 存储,并将其运行在 Docker 容器中,绑定到 minio
目录。我们将向 minio/mybucket
添加一个数据文件。这将充当我们的远程目录。
我们还将创建一个 download
目录作为我们的共享本地目录。download
目录位于一个共享卷上,任何需要它的应用程序容器都可以访问。在本例中,包括从 S3 下载文件的 S3 source,以及将数据摄取并写入数据库表的批处理应用程序。在生产环境中,这将是一个外部持久卷,例如专用服务器上挂载的 NFS 目录。
mkdir -p minio/mybucket
mkdir download
在 S3 存储桶位置创建一个数据文件,name-list.csv
。我们恰好有一个您可以下载的
wget -o minio/mybucket/name-list.csv https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/batch/file-ingest/data/name-list.csv
它包含 firstname
,lastname
的行。批处理作业将为文件中的每一行向 people
表插入一条记录。
我们的项目目录现在应该看起来像这样
.
├── docker-compose.yml
├── download
└── minio
└── mybucket
└── name-list.csv
我们将运行 Minio,创建一个卷挂载将其容器的 /data
路径绑定到 minio
目录。这将创建一个 S3 存储桶 mybucket
,其中包含 name-list.csv
。
docker run --mount type=bind,source="$(pwd)"/minio,target=/data -p 9000:9000 -e "MINIO_ACCESS_KEY=minio" -e "MINIO_SECRET_KEY=minio123" minio/minio server /data
此时,如果愿意,您可以打开浏览器访问 http://localhost:9000,使用上述凭据登录并查看存储桶。
现在我们已经搭建好了本地环境,我们可以编排我们的文件摄取管道了。
我们恰好拥有所需的应用程序,已发布到 repo.spring.io
Maven 仓库。源代码在这里。
要注册此应用程序,请在浏览器中打开 http://localhost:9393/dashboard 并导航到 Apps 页面。点击 Add Application(s)
并使用 URI 注册一个名为 fileingest
的 Task
应用程序。
maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT
然后点击 Register the application(s)
应用程序注册完成后,我们需要创建一个任务定义,该定义将在任务启动请求中引用。我们将任务命名为 fileingest
,与应用程序同名。
现在我们将创建一个流,为 S3 存储桶中的每个新文件启动 fileingest
任务。由于 S3 存储桶中已经有一个文件,我们期望它会被下载到我们的共享 download
目录。发生这种情况时,将向 Task Launcher sink 发送任务启动请求,该 sink 将启动 fileingest
任务来处理它。
在左侧菜单栏选择 Streams
并点击 Create stream(s)
。将下面的流定义剪切并粘贴到文本区域。
注意
在 S3 endpoint URL 中替换您主机的局域网 IP 地址。由于 localhost
解析为容器自身的 IP,我们需要使用局域网 IP。获取此值的方法有很多种。
ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'
在 OS/X 上对我有用。
另外
dig +short $(hostname)
以前有效,直到我的雇主将我的机器加入他们的域。
这是流定义
s3 --spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction --file.consumer.mode=ref --s3.common.endpoint-url=http:<lan-ip-address>:9000 --s3.common.path-style-access=true --s3.supplier.remote-dir=mybucket --s3.supplier.local-dir=/root/scdf/download --cloud.aws.credentials.accessKey=minio --cloud.aws.credentials.secretKey=minio123 --cloud.aws.region.static=us-east-1 --cloud.aws.stack.auto=false --task.launch.request.taskName=fileingest --task.launch.request.argExpressions='localFilePath=payload' | tasklauncher --spring.cloud.dataflow.client.server-uri=http://dataflow-server:9393
流定义基本上是 s3|tasklauncher
,但 S3 source 需要一些配置。分解如下:
spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction
- 函数组合的秘诀。在这里,我们使用逗号作为组合分隔符,而不是标准的 |
。如果以这种方式使用 |
,DSL 解析器会感到困惑。我们将 S3 source 的主要函数 s3supplier
与 taskLaunchRequestFunction
(应用程序上下文中的一个 Function Bean,如果愿意,可用于任何标准源)进行组合。
file.consumer.mode=ref
- 负载是下载文件的路径。
s3.common.endpoint-url
- 用于我们的 Minio 实例的 S3 服务 endpoint。如果您使用 AWS S3,则不需要此项。
s3.common.oath-style-access=true
- Minio 需要此项。
s3.supplier.remote-dir=mybucket
- 我们的源将监控的 S3 存储桶。
3.supplier.local-dir=/root/scdf/download
- 从容器角度来看的本地目录路径。
cloud.aws.credentials.accessKey=minio
cloud.aws.credentials.secretKey=minio123
- 凭证直接使用 spring-cloud-aws
属性名称。
cloud.aws.region.static=us-east-1
- AWS s3 SDK 需要一个区域,Minio 忽略此项。
cloud.aws.stack.auto=false
- 不为 AWS 做任何特殊操作。
task.launch.request.taskName=fileingest
- 要启动的任务名称。这是必需的,但可以通过 SpEL 表达式动态设置。
task.launch.request.argExpressions='localFilePath=payload'
- 每次启动任务时,我们都希望将文件位置作为命令行参数传递。在本例中,我们的摄取任务正在寻找名为 localFilePath
的参数,其值是针对每条消息评估的消息负载。此路径位于配置的本地目录 /root/scdf/download/<filename>
中。因此批处理应用程序可以看到它。
在这种情况下,task launcher sink 只需要 Data Flow Server 的 URI。对于在 skipper 容器中运行的 sink,主机名是 dataflow-server
。
创建流并给它命名。
使用 play
按钮部署流。这将打开一个页面,允许您查看配置并进行任何更改。点击页面底部的 Deploy stream
。
流部署完成后,转到 Tasks
页面,最终(30 秒内)您应该看到 fileingest
任务已完成。
您还可以看到,文件已复制到 download
目录
.
├── docker-compose.yml
├── download
│ └── name-list.csv
└── minio
└── mybucket
└── name-list.csv
从 Executions
选项卡可以获取更多任务执行详情。
由于这也是一个 Spring Batch 应用程序,您可以转到 Jobs
页面,找到 ingestJob
,然后点击 info
图标显示 Job 执行详情。
作业详情报告执行了 5494 次写入。Data Flow Server 配置所有任务应用程序的 DataSource 使用其数据库来记录任务和作业执行状态。对于本演示,我们使用相同的 DataSource 写入应用程序数据。我们可以连接到 dataflow-mysql
容器来查询表。
docker exec -it dataflow-mysql mysql -u root -p
使用密码 rootpw
登录并查询表。
如果您读到这里,感谢您的时间和关注。如果您运行了演示,祝贺您!这是您应得的!
即使在最简单的形式下,这也是 Data Flow 的一个相当高级的用例。在这里,我们提供了 Spring Batch 应用程序。通常,您需要自己编写(尽管有传言称 Spring Cloud Task 的下一个版本将包含一个可配置的批处理应用程序)。除此之外,我们无需编写任何代码即可拥有一个功能齐全、云原生、事件驱动的 ETL 管道,将数据从 S3 摄取到关系数据库。事件驱动意味着新数据一旦到达即可被摄取并供用户使用,而不是按夜间计划运行,次日才提供数据。文件可以并发处理,根据需要运行多个作业实例。由于 Data Flow 限制了平台上并发运行的任务数量,因此该架构可以处理非常高的负载而不会耗尽平台资源。
从 S3 摄取文件的一个可行替代方案是无需将文件复制到共享文件系统。在这种情况下,可以将 S3 source 配置为 list only=true
,以便它提供远程 S3 路径。然后批处理作业连接到 S3 并直接处理远程文件。此 Stack Overflow 文章 提供了一些关于如何执行此操作的提示。
然而,在使用 S/FTP 时,这种方法不太理想,因为它们是文件传输协议,直接流式传输受到限制。如果您不使用持久卷,并且作业由于某种原因失败,您必须从头开始,很可能需要从部分完成的状态手动回退。使用 S/FTP 实现此管道与我们在此展示的非常相似。
本文是关于新的基于函数的 Spring Cloud Stream 应用程序相关主题系列文章的一部分。未来几周敬请期待更多内容。