案例研究:从文件读取并写入 MongoDB

工程 | Soby Chacko | 2020年8月25日 | ...

本文是博客系列的一部分,该系列探讨了基于 Java 函数的全新设计的 Spring Cloud Stream 应用程序。在本期中,我们将深入了解文件提供程序及其 Spring Cloud Stream 文件源对应项。我们还将看到 MongoDB 消费者及其对应的 Spring Cloud Stream 接收器。最后,我们将演示如何将文件源和 MongoDB 接收器一起编排在 Spring Cloud Data Flow 上作为管道。

以下是本博客系列的所有先前部分。

文件读取和处理用例

文件摄取和从这些文件中读取数据是经典的企业用例。几十年来,许多企业一直在依靠不同级别的文件设施来执行关键任务系统。数 TB 的数据以文件的形式通过互联网和企业内联网传输。例如,想象一个银行数据中心,它每秒都会收到来自其所有分支机构、ATM 和 POS 交易的数据文件,然后需要对其进行处理并放置到其他系统中。这只是一个领域,但还有数十万个例子表明文件处理是许多企业的关键路径。许多遗留系统中编写了许多自定义应用程序,每个应用程序都采用了处理这些用例的自身方式。Spring Integration 多年来一直提供 文件支持 作为通道适配器。这些组件可以实现为函数,在从文件读取的情况下,我们可以提供一个可重用的通用 Supplier 函数并在最终用户应用程序中注入它。在以下部分中,我们将详细了解这种功能抽象及其各种使用场景。

文件提供程序

文件提供程序 是一个作为 java.util.function.Supplier bean 实现的组件,当调用时,它将提供给定目录中文件的內容。文件提供程序具有以下签名。

Supplier<Flux<Message<?>>>

提供程序的用户可以订阅返回的 Flux<Message<?>,它是一个消息流或目录本身中的文件对象。

为了调用文件提供程序,我们需要指定一个目录来轮询文件。目录信息是必需的,必须通过配置属性 file.supplier.directory 提供。默认情况下,提供程序将数据作为 byte[] 生成,但它还通过配置属性 file.consumer.mode 支持两种其他文件使用模式。支持的其他值为 linesref。文件使用模式 lines 将一次读取文件內容一行。这对于读取文本文件(如 CSV 文件和其他结构化文本数据)很有用。ref 模式将提供实际的 File 对象。默认情况下,文件提供程序还会阻止读取它之前已读取过的相同文件。这通过属性 file.supplier.preventDuplicates 控制。

在自定义应用程序中重用文件提供程序

文件提供程序是一个可重用的 Spring bean,我们可以在最终用户自定义应用程序中注入它。注入后,可以直接调用它并结合数据的自定义处理。这是一个示例。

@Autowired
Supplier<Flux<Message<?>>> fileSupplier;

public void consumeDataAndSendEmail() {
  Flux<Message<?> data = fileSupplier.get();
  messageFlux.subscribe(t -> {
      if (t == something)
         //send the email here.
      }
  }
}

在上面的伪代码中,我们正在注入文件提供程序 bean,然后使用它来调用其 get 方法以获取 Flux。然后我们订阅该 Flux,并且每次通过 Flux 接收任何数据时,应用一些过滤,并根据该数据采取措施。这只是一个简单的说明,说明了如何重用文件提供程序。当您在实际应用程序中尝试此操作时,可能需要在您的实现中进行更多调整,例如在进行条件检查之前将接收到的数据的默认数据类型从 byte[] 转换为其他内容,或将默认文件读取模式从 content 更改为 lines 等。

运行使用文件提供程序的独立文件源

当文件提供程序与 Spring Cloud Stream 结合使用以使其成为 文件源 时,它变得更加强大。正如我们在之前的博客中看到的,此提供程序已预先打包了 KafkaRabbitMQ 绑定程序在 Spring Cloud Stream 中,使它们成为可作为 Spring Boot 应用程序运行的 uber jar。让我们看看如何获取此 uber jar 并将其作为独立程序运行。

第一步,继续获取此具有 Apache Kafa 变体的文件源。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/file-source-kafka/3.0.0-SNAPSHOT/file-source-kafka-3.0.0-SNAPSHOT.jar

确保您在默认端口上运行 Kafka。

现在是时候独立运行文件源了。

java -jar file-source-kafka-3.0.0-SNAPSHOT.jar --file.supplier.directory=/tmp/data-files --file.consumer.mode=lines --spring.cloud.stream.bindings.output.destination=file-data

默认情况下,Spring Cloud Stream 期望输出绑定为 fileSupplier-out-0(因为 fileSupplier 是提供程序 bean 名称)。但是,当生成这些应用程序时,此输出绑定将被覆盖为 output。这样做是为了适应在 Spring Cloud Data Flow 上运行此源应用程序时的一些要求。

我们还要求应用程序读取位于 /tmp/data-files 目录中的文件,并一次读取一行(使用模式 lines)。

观察 kafka 主题 file-data。使用 kafkacat 工具,您可以执行此操作

kafkacat -b localhost:9092 -t file-data

现在,在 /tmp/data-files 目录中放置一些文件。您将看到数据到达 file-data Kafka 主题,其中每个文件中的每一行都表示一个 Kafka 记录。

如果要将文件限制为某些模式,可以使用属性 file.supplier.filenamePattern 使用简单的命名模式,或使用属性 file.supplier.filenameRegex 使用更复杂的基于正则表达式的模式。

使用文件源自动配置的处理器

正如我们在本博客系列的第二部分中看到的,所有开箱即用的 Spring Cloud Stream 源应用程序都已经自动配置了几个开箱即用的通用处理器。您可以将这些处理器作为文件源的一部分激活。以下是一个示例,其中我们运行文件源并接收数据,然后在将其发送到中间件上的目标之前转换已使用的数据。

java -jar file-source-kafka-3.0.0-SNAPSHOT.jar --file.supplier.directory=/tmp/data-files --file.consumer.mode=lines --spring.cloud.stream.bindings.output.destination=file-data --spring.cloud.function.definition=fileSupplier|spelFunction --spel.function.expression=payload.toUpperCase()

通过为spring.cloud.function.definition属性提供值fileSupplier|spelFunction,我们正在激活与文件提供程序组合的 spel 函数。然后,我们提供一个 SpEL 表达式,我们希望使用它来使用spel.function.expression转换数据。

还有其他几个函数可以以这种方式组合。查看此处以获取更多详细信息。

MongoDB 消费者

MongoDB 消费者提供了一个函数,允许用户从外部系统接收数据,然后将这些数据写入 MongoDB。我们可以在自定义应用程序中直接使用消费者 bean 将数据插入到 MongoDB 集合中。以下是 MongoDB 消费者 bean 的类型签名。

Consumer<Message<?>> mongodbConsumer

注入到自定义应用程序后,用户可以直接调用消费者的accept方法,并提供一个Message<?>对象以将其有效负载发送到 MongoDB 集合。

使用 MongoDB 消费者时,集合是一个必需的属性,必须通过mongodb.consumer.collection进行配置。

使用消费者的独立 Spring Cloud Stream MongoDB 接收器

与文件源一样,Spring Cloud Stream 开箱即用的应用程序已经使用 MongoDB 消费者提供了 MongoDB 接收器。接收器适用于KafkaRabbitMQ绑定器变体。当用作 Spring Cloud Stream 接收器时,MongoDB 消费者会自动配置为接受来自相应中间件系统的数据,例如,来自 Kafka 主题或 RabbitMQ 交换机的数据。

让我们花几分钟时间验证一下我们是否可以独立运行 MongoDB 接收器。

使用 Docker 设置 MongoDB 以测试接收器

在终端窗口上执行以下命令。

docker run -d --name my-mongo \
    -e MONGO_INITDB_ROOT_USERNAME=mongoadmin \
    -e MONGO_INITDB_ROOT_PASSWORD=secret \
    -p 27017:27017 \
    mongo

docker exec -it my-mongo /bin/sh

这将使我们进入正在运行的 Docker 容器的 shell 会话。在 shell 中调用以下命令。

# mongo

> use admin

> db.auth('mongoadmin','secret')
1
> db.createCollection('test_collection’')
{ "ok" : 1 }>

现在我们已经设置了 MongoDB,让我们独立运行接收器。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/mongodb-sink-kafka/3.0.0-SNAPSHOT/mongodb-sink-kafka-3.0.0-SNAPSHOT.jar

java -jar mongodb-sink-kafka-3.0.0-SNAPSHOT.jar --mongodb.consumer.collection=test_collection --spring.data.mongodb.username=mongoadmin --spring.data.mongodb.password=secret --spring.data.mongodb.database=admin --spring.cloud.stream.bindings.input.destination=test-data-mongo

将一些 JSON 数据插入 Kafka 主题test-data-mongo中。例如,您可以使用 Kafka 附带的控制台生产者脚本,如下所示。

/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test-data-mongo

然后生成如下数据

{"hello":"mongo"}

转到我们在上面 Docker shell 上启动的终端上的 MongoDB CLI。

db.test_collection.find()

我们通过 Kafka 主题输入的数据应显示为输出。

在 Spring Cloud Data Flow 上运行

独立运行文件源和 MongoDB 都很好,但是 Spring Cloud Data Flow 使它们作为管道运行变得非常容易。基本上,我们希望编排一个等效于文件源 | 过滤器 | MongoDB的流。

本系列中的一个博客专门介绍了如何运行 Spring Cloud Data Flow 并将应用程序部署为流的所有详细信息。如果您不熟悉运行 Spring Cloud Data Flow,请查看该博客。下面,我们简要介绍了设置 Spring Cloud Data Flow 所涉及的步骤。

首先,我们需要获取用于运行 Spring Cloud Data Flow 的 docker-compose 文件。

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml

此外,获取此用于运行 MongoDB 的附加 docker-compose 文件。

wget -O mongodb.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/mongodb.yml

我们需要设置一些环境变量才能正确运行 Spring Cloud Data Flow。

export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven

现在我们已经准备就绪,是时候开始运行 Spring Cloud Data Flow 和所有其他辅助组件了。

docker-compose -f docker-compose.yml -f mongo.yml up

SCDF 运行后,转到https://127.0.0.1:9393/dashboard。然后转到左侧的“流”,然后选择“创建流”。从源应用程序中选择文件,从处理器中选择过滤器,从接收器应用程序中选择 MongoDB。单击选项并选择以下属性。以下是如何在选择所有属性后显示的屏幕截图。

File to MongoDB SCDF Blog Pipeline

将流命名为 file-source-filter-mongo,然后单击创建流。创建后,单击“部署流”。接受所有默认选项,然后单击屏幕底部的“部署流”。

流部署后,继续在调用 Spring Cloud Data Flow docker-compose 脚本的同一目录中创建一个名为source-files的目录。此目录已由运行 Spring Cloud Data Flow 组件之一(Skipper)的 Docker 容器挂载,并由容器看到。确保此source-files目录具有正确的访问级别,尤其是在 Docker 容器将以 root 身份运行应用程序,而您很可能在本地机器上以非 root 用户身份运行的情况下。在 UI 上监视文件源应用程序的日志以查找任何权限错误。如果您看到任何错误,请解决这些问题。

使用mongoCLI 工具准备一个新的终端会话。

docker exec -it dataflow-mongo /bin/sh

# mongo

> use admin

> db.auth(‘mongoadmin’,'secret')
1
> db.createCollection(‘mongo_data’')
{ "ok" : 1 }>

source-files目录中放置一些具有以下内容的文件。

{"non-sql":"mongo"}
{"sql":"mysql"}
{"document":"mongo"}
{"log":"kafka"}
{"sink":"mongo"}

转到mongocli 终端会话。

db.mongo_data.find()

您将看到我们在管道中添加的过滤组件过滤掉了文件中所有不包含单词mongo的条目。您应该看到类似于以下内容的输出。

{ "_id" : ObjectId("5f4551c470e0373080fcd0b8"), "non-sql" : "mongo" }
{ "_id" : ObjectId("5f4551c470e0373080fcd0b2"), "sink" : "mongo" }
{ "_id" : ObjectId("5f4551c470e0373080fcd0b5"), "document" : "mongo" }

总结

在本博客中,我们对文件提供程序及其 Spring Cloud Stream 源对应项进行了快速浏览。我们还看到了 MongoDB 消费者和相应的 Spring Cloud Stream 接收器应用程序。我们研究了如何将函数组件注入到自定义应用程序中。之后,我们了解了如何独立运行文件源和 MongoDB 接收器的 Spring Cloud Stream 应用程序。最后,我们深入研究了 Spring Cloud Data Flow 并编排了一个从文件源到 MongoDB 接收器的管道,在此过程中过滤掉了一些数据。

敬请期待

本博客系列将继续。在未来几周内,我们将研究更多类似于在本博客中描述的场景,但使用不同的函数和应用程序。

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部