领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多本文是博客系列的一部分,该系列探讨了基于 Java 函数的全新设计的 Spring Cloud Stream 应用程序。在本节中,我们将探讨如何使用Spring Cloud Stream 应用程序和Spring Cloud Data Flow来实现一个非常常见的 ETL 用例:从远程服务导入文件。具体来说,我们将了解如何从 S3、SFTP 和 FTP 导入文件。
以下是迄今为止博客系列中包含的内容
为了更好地理解,Spring Cloud Data Flow 几年来一直支持使用 SFTP 进行远程文件导入。自从我撰写这篇博文以来,基本架构没有改变,但是正如我们将看到的,新的流应用程序允许一个更简单、更灵活的解决方案。
文件导入架构始于一个远程文件源,它轮询远程目录并为检测到的每个文件发布一条消息。术语远程文件源是指提供此功能的任何源应用程序。迄今为止,这包括Amazon S3 源、SFTP 源和FTP 源。
这些源中的每一个都可以配置为将远程目录中的文件同步到本地目录。在此情况下,底层Supplier
函数生成的的消息有效负载是本地文件路径。供应商的输出在此过程中转换为任务启动请求。我们稍后将解释如何做到这一点。请求由一个Task Launcher
接收器接收,该接收器通过其 REST API 将其发布到 Data Flow Server,以启动一个批处理作业来导入文件内容。在下面显示的示例中,该作业将 CSV 文件的每一行插入数据库表中。
如果我们在云平台(例如 Kubernetes 或 Cloud Foundry)上运行,我们需要使用 NFS 等配置共享卷,以便任务应用程序可以访问源下载的文件。
这是使用 Spring Cloud Data Flow 的推荐文件导入架构。以下特性使其非常有弹性
文件导入作业使用 Spring Batch 实现。它非常适合处理大型文件,其中中间故障可能需要作业从中断处重新启动——Spring Batch 特别设计用于处理这种情况。
该Task Launcher 接收器使用PollableMessageSource,以便它可以首先确认 Data Flow 可以在从输入队列提取任务启动请求之前接受任务请求。Data Flow 配置了允许的最大并发任务执行数。接收器使用 Data Flow API 检查此限制是否未达到,然后再接受下一个请求。这种类似于反压的流量控制可以防止平台资源饱和,而平台资源饱和很容易发生在例如 100 个文件被放入远程目录时。
需要共享卷才能使批处理作业能够在必要时从上次提交的事务继续处理。
有可能在没有 Spring Cloud Data Flow 或 Spring Batch 的情况下实现这种工作负载。我们将把这留给读者作为练习。
我们所说的远程文件源是什么意思?Amazon S3、SFTP 和 FTP 源应用程序在 Spring Integration 中具有共同的传承,因此行为基本相同。例如,扩展AbstractInboundFileSynchronizer的类用于将本地目录与远程目录同步。基类包括配置FileListFilter
以指定要包含的文件。通常,这用于模式匹配文件名。此外,此组件使用MetadataStore来跟踪本地目录中已存在的文件以及上次修改时间,以便仅同步新的或已更改的文件。默认情况下,元数据存储是内存中的实现。这意味着当源重新启动时,我们可能会收到我们已经处理过的文件的事件。为了解决这个问题,这些源中的每一个都可以很容易地自定义为使用几种持久性实现之一。AbstractFileSynchronizer
还支持使用 SpEL 表达式创建本地文件名、自动删除远程文件等等。
除了文件同步之外,这些源中的每一个都包含file.consumer.mode
属性,它可以是:
contents
- 有效负载是文件内容(字节数组)
ref
- 有效负载是本地文件路径
lines
- 每个有效负载都是文件中的每一行
此外,每个源都提供一个list-only
选项,其中有效负载包含有关远程文件的元数据,并且不执行同步。
该SFTP 源从 SFTP 服务器消费文件。由于 SFTP 是最常用的远程文件服务,因此此组件具有最先进的功能。事实上,在上一代流应用程序中,SFTP 是我们为文件导入架构支持的唯一源。随着它的发展以支持任务启动请求,我们最终实现了一个专门用于文件导入用例的特殊变体。sftp-datalow
源旨在与tasklauncher-dataflow
接收器一起工作,它嵌入代码以将有效负载转换为任务启动请求。在当前版本中,我们已经放弃了这种变体,转而支持函数组合。此外,sftp 源可以设置为轮询多个远程目录,并在每个目录之间轮换。在此配置中,轮换算法可以是fair
——每个远程目录获得一次轮询——或者不是——每个远程目录都会持续轮询,直到没有新文件为止。它还支持sftp.supplier.stream=true
,它将直接流式传输内容而无需同步到本地目录。
该FTP 源与 SFTP 源非常相似,只是它使用 FTP 并且不会加密传输中的数据,因此安全性较低。它提供相同的核心功能,但目前不支持多个远程目录、list-only
或stream
模式。
该Amazon S3 源以其他源为模型,并支持与list-only
模式相同的文件使用者模式。在这种情况下,s3.supplier.remote-dir
指的是 S3 存储桶。使用list-only
时,有效负载包含一个S3ObjectSummary
,该对象提供有关 S3 对象的元数据。S3 本身提供的功能比 FTP/SFTP 更丰富。
除了 AWS S3 之外,此源现在可以与 S3 兼容的实现一起使用,例如Minio。
在之前的版本中,这被称为tasklauncher-dataflow
接收器。最初,我们还为每个支持的平台都提供了独立的任务启动器。由于易用性和弹性(如上所述),这些启动器已被弃用,取而代之的是基于数据流的实现。因此,我们从名称中去掉了“数据流”。现在简称为tasklauncher-sink。
该接收器基于相应的tasklauncher-function构建,该函数可在任何独立应用程序中用于向数据流发送任务启动请求。它实现为Function<LaunchRequest, Optional<Long>>
。LaunchRequest是一个简单的值对象,至少包含要启动的任务名称。此任务必须在数据流中使用相同的名称定义。可选地,启动请求包括命令行参数和部署属性。如果提交了请求,则该函数返回唯一的任务 ID(长整型)。如果数据流服务器指示任务平台已达到其最大运行任务数、无法访问数据流服务器或请求无效,则不会提交请求。
任务启动器接收器在其调度任务内调用其基函数,该任务由DynamicPeriodicTrigger触发,该触发器允许在运行时更新其周期。在这种情况下,我们使用它来实现指数退避。从最初的一秒钟周期开始,触发器将退避,如果:
没有排队的启动请求
平台已经在运行最大数量的任务
如果这两个条件中的任何一个发生变化,源将周期重置为其初始值。当然,初始和最大触发周期是可配置的。
触发的任务检查服务器是否可以接受新的启动请求,如果可以,它将使用PollableMessageSource轮询输入队列。如果有请求,它将通过其 REST API 将请求发布到数据流。
新的基于函数的架构为函数组合提供了第一类支持。作为此策略的一部分,某些常用函数可以与任何源组合。值得注意的是,这包括一个task-launch-request-function。这意味着现在可以将任何远程文件源配置为生成任务启动请求。任务启动请求函数可以评估 SpEL 表达式。例如,每个任务启动请求都可以提供不同的文件路径作为命令行参数。
让我们深入研究一个示例,看看它是如何工作的。我们将使用 S3 源、任务启动器接收器、Spring Cloud Data Flow、兼容 S3 的服务和一个简单的 Spring Batch 应用程序来处理文件。
为简单起见,我们将使用 Docker Compose 在本地运行所有内容。
为本示例创建一个项目目录,打开终端会话,然后 cd 到项目目录。下载 SCDF docker-compose 文件。
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.1/spring-cloud-dataflow-server/docker-compose.yml
设置 Data Flow 和 Skipper 版本,以及导入最新流应用程序的 URI。然后运行docker-compose
export DATAFLOW_VERSION=2.6.1
export SKIPPER_VERSION=2.5.1
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven
docker-compose up
SCDF 的docker-compose.yml
挂载当前目录,因此这些文件可在挂载路径/root/scdf
下被容器访问。
我们将使用 Minio 进行 S3 存储,并将其在 Docker 容器中运行以绑定到minio
目录。我们将向minio/mybucket
添加一个数据文件。这将充当我们的远程目录。
让我们还创建一个download
目录作为我们的共享本地目录。download
目录位于共享卷上,任何需要它的应用程序容器都可以访问它。在本例中,S3 源从 S3 下载文件,批处理应用程序将摄取数据并将其写入数据库表。在生产环境中,这将是一个外部持久卷,例如专用服务器上挂载的 NFS 目录。
mkdir -p minio/mybucket
mkdir download
在 S3 bucket 位置创建一个数据文件name-list.csv
。我们恰好有一个你可以下载的文件
wget -o minio/mybucket/name-list.csv https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/batch/file-ingest/data/name-list.csv
它包含firstname
、lastname
的行。批处理作业将为文件中的每一行向people
表插入一行。
我们的项目目录现在应该如下所示
.
├── docker-compose.yml
├── download
└── minio
└── mybucket
└── name-list.csv
我们将运行 Minio,创建一个卷挂载以将其容器的/data
路径绑定到minio
目录。这将创建一个 S3 bucket,mybucket
包含name-list.csv
。
docker run --mount type=bind,source="$(pwd)"/minio,target=/data -p 9000:9000 -e "MINIO_ACCESS_KEY=minio" -e "MINIO_SECRET_KEY=minio123" minio/minio server /data
此时,如果你愿意,可以打开浏览器访问https://127.0.0.1:9000,使用上述凭据登录并查看 bucket。
现在我们已经设置了本地环境,我们可以协调我们的文件摄取管道。
我们恰好有我们需要的应用程序,发布到repo.spring.io
Maven 仓库。源代码在这里。
要注册此应用程序,请打开浏览器访问https://127.0.0.1:9393/dashboard并导航到“应用程序”页面。单击添加应用程序
并使用 URI 注册名为fileingest
的任务
应用程序。
maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT
然后单击注册应用程序
注册应用程序后,我们需要创建一个任务定义,该定义将在任务启动请求中引用。我们将任务命名为fileingest
,与应用程序名称相同。
现在,我们将创建一个流,为 S3 bucket 中的每个新文件启动fileingest
任务。由于 S3 bucket 中已经有一个文件,我们预计它将被下载到我们的共享download
目录。发生这种情况时,将向任务启动器接收器发送任务启动请求,该接收器将启动fileingest
任务来处理它。
在左侧菜单栏中选择流
,然后单击创建流
。将下面的流定义复制粘贴到文本区域。
注意
在 S3 端点 URL 中替换主机的 LAN IP 地址。由于localhost
解析为容器自身的 IP,我们需要使用 LAN IP。获取此值的方法有很多种。
ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'
在我的 OS/X 系统上有效。
此外
dig +short $(hostname)
在我雇主将我的机器加入他们的域之后,它就不再工作了。
这是流定义
s3 --spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction --file.consumer.mode=ref --s3.common.endpoint-url=http:<lan-ip-address>:9000 --s3.common.path-style-access=true --s3.supplier.remote-dir=mybucket --s3.supplier.local-dir=/root/scdf/download --cloud.aws.credentials.accessKey=minio --cloud.aws.credentials.secretKey=minio123 --cloud.aws.region.static=us-east-1 --cloud.aws.stack.auto=false --task.launch.request.taskName=fileingest --task.launch.request.argExpressions='localFilePath=payload' | tasklauncher --spring.cloud.dataflow.client.server-uri=http://dataflow-server:9393
流定义基本上是 s3|tasklauncher
,但是S3源需要一些配置。让我们分解一下
spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction
- 函数组合的秘诀。在这里,我们使用逗号作为组合分隔符,而不是标准的 |
。如果以这种方式使用 |
,DSL 解析器会感到困惑。我们将 s3supplier
(S3 源的主要函数)与 taskLaunchRequestFunction
(应用程序上下文中的函数 Bean,如果需要,任何标准源都可以使用它)组合在一起。
file.consumer.mode=ref
- 有效负载是已下载文件的路径。
s3.common.endpoint-url
- 我们 Minio 实例的 S3 服务端点。如果您使用的是 AWS S3,则不需要此项。
s3.common.oath-style-access=true
- Minio 需要此项
s3.supplier.remote-dir=mybucket
- 我们源将监控的 S3 桶
3.supplier.local-dir=/root/scdf/download
- 从容器角度来看的本地目录路径
cloud.aws.credentials.accessKey=minio
cloud.aws.credentials.secretKey=minio123
- 凭据直接使用 spring-cloud-aws
属性名称
cloud.aws.region.static=us-east-1
- AWS s3 SDK 需要一个区域,Minio 会忽略此项
cloud.aws.stack.auto=false
- 不要对 AWS 做任何特殊操作。
task.launch.request.taskName=fileingest
- 要启动的任务的名称。这是必需的,但可以通过 SpEL 表达式动态设置。
task.launch.request.argExpressions='localFilePath=payload'
- 每次启动任务时,我们都希望将文件位置作为命令行参数传递,在本例中,我们的摄取任务正在查找名为 localFilePath
的参数,其值是为每条消息计算的消息有效负载。此路径位于配置的本地目录中,/root/scdf/download/<filename>
。这样批处理应用程序就可以看到它。
在这种情况下,任务启动器接收器只需要 Data Flow Server URI。对于在 skipper 容器中运行的接收器,主机名为 dataflow-server
。
创建流并为其命名。
使用“播放”按钮部署流。这将打开一个页面,让您查看配置并进行任何更改。点击页面底部的“部署流”。
流部署后,转到“任务”页面,最终(30 秒内)您应该会看到 fileingest
任务已完成。
您还可以看到,文件已复制到 download
目录
.
├── docker-compose.yml
├── download
│ └── name-list.csv
└── minio
└── mybucket
└── name-list.csv
您可以在“执行”选项卡中获取有关任务执行的更多详细信息。
由于这也是一个 Spring Batch 应用程序,您可以转到“作业”页面,转到 ingestJob
,然后单击“信息”图标以显示作业执行详细信息
作业详细信息报告它执行了 5494 次写入。Data Flow Server 将所有任务应用程序的 DataSource 配置为使用其数据库来记录任务和作业执行状态。在此演示中,我们使用相同的 DataSource 来写入应用程序数据。我们可以连接到 dataflow-mysql
容器来查询表
docker exec -it dataflow-mysql mysql -u root -p
使用密码 rootpw
登录并查询表
如果您读到这里,感谢您的时间和关注。如果您运行了演示,那么您值得祝贺!
即使是最简单的形式,这也是 Data Flow 的一个相当高级的用例。在这里,我们提供了 Spring Batch 应用程序。通常,您需要自己编写(尽管据传 Spring Cloud Task 的下一个版本将包含一个可配置的批处理应用程序)。除此之外,我们不需要编写任何代码就可以拥有一个完全功能的、云原生的、事件驱动的 ETL 管道来将数据从 S3 摄取到关系数据库。事件驱动意味着新数据一到达就会被摄取并提供给用户,而不是按每日计划运行,并在第二天提供数据。可以并发处理文件,根据需要运行多个作业实例。由于 Data Flow 限制了平台上运行的并发任务数量,因此此架构可以在不耗尽平台资源的情况下处理非常高的负载。
从 S3 摄取文件的另一种可行的方案消除了将文件复制到共享文件系统的需要。在这种情况下,可以将 S3 源配置为 list only=true
,以便它将提供远程 S3 路径。然后,批处理作业连接到 S3 并直接处理远程文件。这 Stack Overflow 帖子 提供了一些关于如何执行此操作的提示。
但是,当使用 S/FTP 时,这种方法不太理想,因为这些是文件传输协议,直接流式传输受到限制。如果您不使用持久卷,并且作业由于某种原因失败,则必须从头开始,可能必须手动从部分完成的状态回退。使用 S/FTP 实现此管道的过程与我们在此处显示的过程非常相似。
这篇文章是关于与基于新函数的 Spring Cloud Stream 应用程序相关的主题的一系列文章的一部分。敬请关注未来几周的更多内容。