案例研究:使用 Spring Cloud Data Flow 远程文件摄取

工程 | David Turanski | 2020 年 9 月 29 日 | ...

本文是探讨基于 Java 函数重新设计的 Spring Cloud Stream 应用程序的系列博客文章的一部分。在本章中,我们将探讨如何使用 Spring Cloud Stream 应用程序Spring Cloud Data Flow 来实现一个非常常见的 ETL 用例:从远程服务摄取文件。具体来说,我们将研究如何从 S3、SFTP 和 FTP 摄取文件。

到目前为止,博客系列中包含的内容如下:

远程文件摄取架构

要全面了解情况,Spring Cloud Data Flow 已经支持使用 SFTP 进行远程文件摄取几年了。自撰写这篇文章以来,基本架构没有改变,但正如我们将看到的,新的流应用程序允许更简单、更灵活的解决方案。

fileingest

文件摄取架构始于一个远程文件源,它轮询远程目录并为检测到的每个文件发布一条消息。术语远程文件源指的是提供此功能的任何源应用程序。迄今为止,这包括Amazon S3 源SFTP 源FTP 源

每个源都可以配置为将远程目录中的文件同步到本地目录。在这种情况下,底层Supplier函数产生的消息负载是本地文件路径。供应商的输出会在此过程中转换为任务启动请求。我们稍后会解释这是如何完成的。请求由Task Launcher sink 接收,它通过其 REST API 将请求发送到 Data Flow Server,以启动批处理作业来摄取文件内容。在下面显示的示例中,该作业将 CSV 文件的每一行插入到数据库表中。

如果我们在 Kubernetes 或 Cloud Foundry 等云平台上运行,我们需要使用 NFS 等方式配置共享卷,以便任务应用程序可以访问源下载的文件。

这是使用 Spring Cloud Data Flow 推荐的文件摄取架构。以下特点使其具有很强的弹性:

  • 文件摄取作业使用 Spring Batch 实现。它非常适合大型文件处理,其中临时故障可能需要作业从中断处重新启动——Spring Batch 专门设计用于处理这种情况。

  • Task Launcher sink 使用 PollableMessageSource,以便它可以在从输入队列中拉取任务启动请求之前,首先确认 Data Flow 可以接受任务请求。Data Flow 配置了最大允许并发任务执行数量。该 sink 使用 Data Flow API 检查此限制是否已达到,然后才接受下一个请求。这种流控制(类似于背压)可以防止当(例如)100 个文件被放入远程目录时,轻易发生的平台资源饱和。

  • 共享卷是必需的,以便批处理作业在必要时可以从上次提交的事务继续处理。

如果没有 Spring Cloud Data Flow 或 Spring Batch,也可以实现这种类型的工作负载。我们将此留给读者自行练习。

远程文件源

远程文件源是什么意思?Amazon S3、SFTP 和 FTP 源应用程序在 Spring Integration 中共享一个共同的血统,因此行为基本相同。例如,扩展AbstractInboundFileSynchronizer的类用于将本地目录与远程目录同步。基类包括配置FileListFilter以指定要包含哪些文件。通常,这用于模式匹配文件名。此外,此组件使用MetadataStore来跟踪本地目录中已有的文件以及上次修改时间,以便只同步新文件或已更改的文件。默认情况下,元数据存储是内存实现。这意味着当源重新启动时,我们可能会收到已处理文件的事件。为了解决这个问题,这些源中的每一个都可以轻松自定义以使用可用的几种持久化实现之一。AbstractFileSynchronizer还支持使用 SpEL 表达式创建本地文件名、自动远程文件删除等。

除了文件同步,这些源中的每一个都包含file.consumer.mode属性,其值可以是:

  • contents - 负载是文件内容(字节数组)

  • ref - 负载是本地文件路径

  • lines - 每个负载是文件中的一行

此外,每个源都提供了一个list-only选项,在这种情况下,负载包含远程文件的元数据,并且不执行同步。

SFTP 源

SFTP Source从 SFTP 服务器消费文件。由于 SFTP 是最常用的远程文件服务,此组件具有最先进的功能。事实上,在上一代流应用程序中,SFTP 是我们为文件摄取架构支持的唯一源。随着它发展以支持任务启动请求,我们最终专门为文件摄取用例实现了一个特殊变体。旨在与tasklauncher-dataflow sink 配合使用的sftp-datalow源,嵌入了将有效负载转换为任务启动请求的代码。在当前版本中,我们已弃用此变体,转而使用函数组合。此外,sftp 源可以设置为轮询多个远程目录,在每个目录之间轮流。在此配置中,轮询算法可以是fair(每个远程目录轮询一次),也可以不是(每个远程目录持续轮询直到没有新文件)。它还支持sftp.supplier.stream=true,这将直接流式传输内容而无需同步到本地目录。

FTP 源

FTP 源与 SFTP 源非常相似,只是它使用 FTP 且在传输过程中不加密数据,因此安全性较低。它提供相同的核心功能,但目前不支持多个远程目录、list-onlystream模式。

Amazon S3 源

Amazon S3 Source是根据其他源建模的,并支持相同的文件消费者模式以及list-only模式。在这种情况下,s3.supplier.remote-dir指的是一个 S3 存储桶。当使用list-only时,有效负载包含一个S3ObjectSummary,它提供有关 S3 对象的元数据。S3 本身提供了比 FTP/SFTP 更丰富的功能集。

除了 AWS S3,此源现在还可以与兼容 S3 的实现(例如Minio)一起使用。

任务启动器接收器

在之前的版本中,这被称为tasklauncher-dataflow sink。最初,我们还有独立的任务启动器,每个受支持的平台一个。为了易用性和弹性(如上所述),这些任务启动器已被弃用,转而使用 Data Flow 支持的实现。因此,我们从名称中删除了“Data Flow”。它现在简单地是tasklauncher-sink

该接收器是基于相应的tasklauncher-function构建的,该函数可在任何独立应用程序中使用,以向 Data Flow 发送任务启动请求。这实现为Function<LaunchRequest, Optional<Long>>LaunchRequest是一个简单的值对象,至少包含要启动的任务名称。此任务必须在 Data Flow 中使用相同的名称定义。可选地,启动请求包括命令行参数和部署属性。如果请求已提交,该函数返回唯一的任务 ID 作为长整型。如果 Data Flow 服务器指示任务平台已达到其最大运行任务数、无法联系到 Data Flow 服务器或请求无效,则不会提交请求。

任务启动器接收器从计划任务中调用其基本函数,该任务由DynamicPeriodicTrigger触发,该触发器允许在运行时更新其周期。在这种情况下,我们使用它来实现指数退避。从初始的一秒周期开始,如果满足以下条件,触发器将退避,最终达到每 30 秒一次:

  • 没有排队的启动请求

  • 平台已运行最大任务数

如果这些条件中的任何一个发生变化,源会将周期重置为其初始值。当然,初始和最大触发周期都是可配置的。

触发的任务会检查服务器是否可以接受新的启动请求,如果可以,它会使用PollableMessageSource轮询输入队列。如果有请求,它会通过其 REST API 将请求发布到 Data Flow。

创建任务启动请求

新的基于函数架构为函数组合提供了第一类支持。作为此策略的一部分,某些常用函数可以与任何源组合。值得注意的是,这包括一个任务启动请求函数。这意味着现在可以配置任何远程文件源来生成任务启动请求。任务启动请求函数可以评估 SpEL 表达式。例如,每个任务启动请求都可以将不同的文件路径作为命令行参数提供。

整合所有功能

让我们深入研究一个示例,看看它是如何工作的。我们将使用 S3 源、任务启动器接收器、Spring Cloud Data Flow、S3 兼容服务和一个简单的 Spring Batch 应用程序来处理文件。

为了简单起见,我们将使用 Docker Compose 在本地运行所有内容。

安装 Spring Cloud Data Flow

为本示例创建一个项目目录,打开终端会话,并切换到项目目录。下载 SCDF docker-compose 文件。

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.1/spring-cloud-dataflow-server/docker-compose.yml

启动 Spring Cloud Data Flow

设置 Data Flow 和 Skipper 版本,以及导入最新流应用程序的 URI。然后运行docker-compose

export DATAFLOW_VERSION=2.6.1
export SKIPPER_VERSION=2.5.1
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven
docker-compose up

创建一些数据

SCDF docker-compose.yml 挂载当前目录,因此这些文件可在挂载路径 /root/scdf 下供容器访问。

我们将使用 Minio 作为 S3 存储,并将其运行在 Docker 容器中,以将其容器的/data路径绑定到minio目录。我们将在minio/mybucket中添加一个数据文件。这将作为我们的远程目录。

我们再创建一个download目录作为共享本地目录。download目录位于一个共享卷上,任何需要它的应用程序容器都可以访问。在这种情况下,S3 源会从 S3 下载文件,批处理应用程序将摄取数据并将其写入数据库表。在生产环境中,这将是一个外部持久卷,例如专用服务器上挂载的 NFS 目录。

mkdir -p minio/mybucket
mkdir download

在 S3 存储桶位置创建一个数据文件 name-list.csv。我们正好有一个您可以下载的。

wget -o minio/mybucket/name-list.csv https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/batch/file-ingest/data/name-list.csv

这包含firstname,lastname行。批处理作业将为文件中的每一行在people表中插入一行。

我们的项目目录现在应该如下所示:

.
├── docker-compose.yml
├── download
└── minio
    └── mybucket
        └── name-list.csv

启动 Minio

我们将运行 Minio,创建一个卷挂载以将其容器的/data路径绑定到minio目录。这将创建一个 S3 存储桶mybucket,其中包含name-list.csv

docker run --mount type=bind,source="$(pwd)"/minio,target=/data -p 9000:9000 -e "MINIO_ACCESS_KEY=minio" -e "MINIO_SECRET_KEY=minio123" minio/minio server /data

此时,如果您愿意,可以打开浏览器访问https://:9000,使用上述凭据登录并查看存储桶。

使用 Data Flow 创建任务和流

现在我们已经设置好本地环境,我们可以编排文件摄取管道。

注册批处理应用程序

我们恰好有需要的应用程序,发布到repo.spring.ioMaven 仓库。源代码位于此处

要注册此应用程序,请打开浏览器到https://:9393/dashboard并导航到“应用程序”页面。单击“添加应用程序”,然后使用 URI 注册名为fileingestTask应用程序。

maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT

然后点击“注册应用程序”。

RegisterTaskApplication

创建任务定义

应用程序注册后,我们需要创建一个任务定义,该定义将在任务启动请求中引用。我们将任务命名为fileingest,与应用程序名称相同。

CreateFileIngestTask

创建流

现在我们将创建一个流,以便在 S3 存储桶中出现每个新文件时启动fileingest任务。由于 S3 存储桶中已经有一个文件,我们预计它会被下载到我们的共享download目录。当这种情况发生时,任务启动请求将被发送到任务启动器接收器,该接收器将启动fileingest任务来处理它。

选择左侧菜单栏上的Streams,然后单击Create stream(s)。将下面的流定义剪切并粘贴到文本区域中。

注意

用您主机的局域网 IP 地址替换 S3 端点 URL。由于localhost解析为容器自身的 IP,我们需要使用局域网 IP。有多种方法可以获取此值。

ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'

在 OS/X 上对我有效。

另外

dig +short $(hostname)

在我公司将我的机器加入其域之前,它曾经有效。

这是流定义

s3 --spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction --file.consumer.mode=ref --s3.common.endpoint-url=http:<lan-ip-address>:9000 --s3.common.path-style-access=true --s3.supplier.remote-dir=mybucket --s3.supplier.local-dir=/root/scdf/download --cloud.aws.credentials.accessKey=minio --cloud.aws.credentials.secretKey=minio123 --cloud.aws.region.static=us-east-1 --cloud.aws.stack.auto=false --task.launch.request.taskName=fileingest --task.launch.request.argExpressions='localFilePath=payload' | tasklauncher --spring.cloud.dataflow.client.server-uri=http://dataflow-server:9393

流定义基本上是s3|tasklauncher,但 S3 源需要一些配置。分解如下:

  • spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction - 函数组合的关键。这里,我们使用逗号作为组合分隔符,而不是标准的|。如果以这种方式使用|,DSL 解析器会混淆。我们将s3supplier(S3 源的主要函数)与taskLaunchRequestFunction(应用程序上下文中的函数 Bean,可供任何标准源使用,如果我们愿意)进行组合。

  • file.consumer.mode=ref - 负载是下载文件的路径。

  • s3.common.endpoint-url - 我们的 Minio 实例的 S3 服务端点。如果您使用 AWS S3,则不需要此项。

  • s3.common.oath-style-access=true - Minio 需要此设置

  • s3.supplier.remote-dir=mybucket - 我们的源将监控的 S3 存储桶

  • 3.supplier.local-dir=/root/scdf/download - 从容器角度来看的本地目录路径

  • cloud.aws.credentials.accessKey=minio

  • cloud.aws.credentials.secretKey=minio123 - 凭据直接使用spring-cloud-aws属性名称

  • cloud.aws.region.static=us-east-1 - AWS s3 SDK 需要一个区域,Minio 忽略此项

  • cloud.aws.stack.auto=false - 对 AWS 不执行任何特殊操作。

  • task.launch.request.taskName=fileingest - 要启动的任务名称。此项为必填项,但可以通过 SpEL 表达式动态设置。

  • task.launch.request.argExpressions='localFilePath=payload' - 每次我们启动任务时,我们都希望将文件位置作为命令行参数传递。在这种情况下,我们的摄取任务正在寻找名为localFilePath的参数,其值是针对每条消息评估的消息有效负载。此路径位于配置的本地目录/root/scdf/download/<filename>中,因此批处理应用程序可以看到它。

在这种情况下,任务启动器接收器只需要 Data Flow Server URI。对于在 skipper 容器中运行的接收器,主机名是dataflow-server

CreateFileIngestStream

创建流并为其命名。

部署流

使用play按钮部署流。这将打开一个页面,让您可以查看配置并进行任何更改。点击页面底部的Deploy stream

验证任务是否已启动

流部署后,转到Tasks页面,最终(30 秒内)您应该会看到fileingest任务已完成。

TaskView

您还可以看到,文件已复制到download目录

.
├── docker-compose.yml
├── download
│   └── name-list.csv
└── minio
    └── mybucket
        └── name-list.csv

Executions选项卡中,您可以获取任务执行的更多详细信息。

由于这也是一个 Spring Batch 应用程序,您可以转到“作业”页面,转到ingestJob,然后单击“信息”图标以显示作业执行详细信息。

JobDetails

验证数据库中的数据

作业详细报告显示它执行了 5494 次写入。Data Flow Server 配置所有任务应用程序的 DataSource 使用其数据库来记录任务和作业执行状态。对于此演示,我们使用相同的 DataSource 来写入应用程序数据。我们可以连接到dataflow-mysql容器来查询表

docker exec -it dataflow-mysql mysql -u root -p

使用密码rootpw登录并查询表格

VerifyData

结论

如果您读到这里,感谢您的时间和关注。如果您运行了演示,恭喜您!

即使以最简单的形式,这也是 Data Flow 的一个相当高级的用例。在这里,我们提供了 Spring Batch 应用程序。通常,您会编写自己的(尽管 Spring Cloud Task 的下一个版本据说将包含一个可配置的批处理应用程序)。除此之外,我们不需要编写任何代码就可以拥有一个功能齐全、云原生、事件驱动的 ETL 管道,用于将数据从 S3 摄取到关系数据库。事件驱动意味着新数据一旦到达就会被摄取并供用户使用,而不是在夜间计划运行,数据在第二天才能使用。文件可以并发处理,根据需要运行多个作业实例。由于 Data Flow 限制了平台上运行的并发任务数量,因此此架构可以处理非常高的负载而不会耗尽平台资源。

从 S3 摄取文件的一个可行替代方案消除了将文件复制到共享文件系统的需要。在这种情况下,S3 源可以配置为list only=true,以便它将提供远程 S3 路径。然后批处理作业连接到 S3 并直接处理远程文件。这篇Stack Overflow 帖子提供了一些关于如何实现此目的的提示。

然而,当使用 S/FTP 时,这种方法不太理想,因为这些是文件传输协议,直接流式传输受到限制。如果您不使用持久卷,并且作业因某种原因失败,您必须从头开始,可能需要手动从部分完成的状态倒回。使用 S/FTP 实现此管道与我们在此处所示的非常相似。

敬请关注

这篇文章是关于新的基于函数 Spring Cloud Stream 应用程序系列文章的一部分。未来几周将有更多内容发布。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有