案例研究:关系型数据库源和文件汇

工程 | Soby Chacko | 2020 年 9 月 10 日 | ...

本文是探讨基于 Java 函数的全新设计的 Spring Cloud Stream 应用程序的博客系列的一部分。在本篇中,我们将探索 JDBC supplier 和基于 Spring Cloud Stream 的 source。我们将看到如何从关系型数据库导出数据,并使用 File Consumer 和相应的 Spring Cloud Stream File sink 将其转储到文件中。我们将介绍几种不同的运行 JDBC Source 并将数据发送到文件的方式。

这是本博客系列的所有先前部分。

从 RDBMS 获取数据

查询数据库并处理结果是一个非常基本的企业用例。即使追溯到信息技术革命的主机时代,我们也能看到这种模式被广泛使用。在过去的几十年里,SQL 已经成为与数据库通信的典型语言。Java 从其最初版本开始,就通过一个名为 Java Database Connectivity(俗称 JDBC)的库,为基于数据库的企业级应用程序添加了支持。在 Java 的早期,许多应用程序都是使用原生的 JDBC 库编写的。Spring Framework 从一开始就通过提供基于 JDK 中 JDBC 库的模板模式 - JdbcTemplate - 来支持处理数据库的核心用例。Spring Data 项目在此模板的基础上添加了许多额外功能。当 Spring Integration 出现时,它利用了 Spring 中的这种支持,并提供了许多额外的组件,以便数据可以通过通道适配器、网关等方式获取。在其最新版本中(这也是本文的主题),我们意识到可以将这些 Spring Integration 组件作为简单的 Java supplier 来查询数据库。我们将详细介绍如何访问此 supplier,在自定义应用程序中重用它,以及将其用作 Spring Cloud Stream source。

JDBC Supplier

JDBC Supplier 是一个实现为 java.util.function.supplier bean 的组件。当调用时,它会从数据库表中提供数据。JDBC supplier 具有以下签名。

Supplier<Flux<Message<?>>> jdbcSupplier()

默认情况下,JDBC supplier 根据数据库表的行分割数据,其中数据库的每一行表示为一个 java.util.Map 数据结构。例如,这里有一个填充了一些数据的数据库表。

ID

Name

1

Bob

2

Jane

3

John

当我们对这个表调用 JDBC supplier 时,我们会得到一个 Flux 类型的 Message 对象,其中每个 Message 对象的 payload 是一个 Map。第一个消息将包含一个 Map,其键为 IDNAME,值分别为 1Bob。第二个消息将包含具有相同键但值分别为 2Jane 的 Map,依此类推。我们也可以要求提供数据时不将它们分割成单独的消息。为此,我们可以使用属性 jdbc.supplier.split 并将其设置为 false(默认值为 true)。当我们禁用分割时,与上面的 JDBC supplier 签名有一个重要区别。其签名变为 Supplier<Message<?>>,并且不再是单独的 Message<Map>,而是得到一个单一的 Message<List<Map>。如果将此应用于上面的示例,我们将得到一个包含 3 个元素的列表,每个元素都包含一个表示数据库表每一行的 Map

JDBC Supplier 需要在每次调用时执行一个 SQL 查询。此查询是强制性的,必须通过属性 jdbc.supplier.query 提供。我们还可以使用属性 jdbc.supplier.update 强制 supplier 忽略已经读取的行。本文后面将看到如何实现此操作的示例。

在自定义应用程序中重用 JDBC Supplier

上面提到的两种 Supplier bean——默认启用数据分割的和我们禁用数据分割的——都被命名为 jdbcSupplier。我们可以在自定义应用程序中以该名称限定注入它们。我们需要根据是否分割数据来确保使用正确的类型。例如,如果我们采用默认设置(即分割数据),那么可以按如下方式自动注入 JDBC supplier。

@Autowired
Supplier<Flux<Message<?>>> jdbcSupplier;

另一方面,如果使用属性 jdbc.supplier.split 禁用分割,那么需要以 Supplier<Message<?> 类型注入它。

注入后,我们可以调用 Supplierget 方法,然后开始接收数据。

File Consumer

与我们在上一篇博客中看到的 File Supplier 类似,File Consumer 也是一个可重用 bean,我们可以将其注入到自定义应用程序中,并用它在目录中创建新文件。该 bean 实现为 java.util.function.Consumer。对于初学者来说,这可能会引起一些困惑,为什么它被称为 consumer,但实际上并没有从文件中消费任何东西。虽然名称和实现是 consumer,但文件 consumer 并非是轮询或读取文件的 consumer,而是接收数据并写入文件的 consumer。“消费”文件的用例由文件 supplier 处理。

这是文件 consumer 的类型签名。

Consumer<Message<?>> fileConsumer()

由于它是一个 consumer,因此只适合在数据处理管道的末端使用此组件。consumer 接受输入数据并将其写入文件。当我们将文件 consumer 与 Spring Cloud Stream 通过 binder 实现结合时,它就变成了一个 sink 应用程序,从诸如 Kafka topic 或 RabbitMQ exchange 等中间件目标消费数据。

文件 consumer 在几种企业用例中非常有用。对于任何有新数据可用时创建或附加到文件的业务场景,文件 consumer 都很有用。

使用文件 consumer 时,我们可以使用属性 file.consumer.directoryfile.consumer.name 分别提供要写入的文件目录和文件名。如果不设置这些属性,它将使用 consumer 设置的默认值。默认情况下,数据将附加到文件中,这可以通过属性 file.consumer.mode 进行更改。有关更多详细信息,请参阅 FileConsumerProperties 的配置选项。

运行应用程序

当与 Spring Cloud Stream 结合时,上述功能组件变得更加强大。这些函数可以在消息传递应用程序中使用,而 Spring Cloud Stream 使其更容易以与中间件无关的方式使用。JDBC Supplier 用于构建 JDBC source,该 source 可以与许多不同的中间件系统配合工作。同样,File Consumer 被用作 File Sink 应用程序的基础,该应用程序也可以与不同的消息系统配合工作。

在下面的章节中,我们将独立运行这些应用程序并验证它们是否按预期工作。

设置 Apache Kafka 和 MySQL 数据库

首先,创建一个新目录 jdbc-file-demo

mkdir jdbc-file-demo && cd jdbc-file-demo

我们将运行 jdbc-souce 和 file-sink 应用程序的 Kafka 版本。我们将使用 Apache Kafka 作为中间件来运行这些应用程序。对于 JDBC source,我们将使用 MySQL 作为数据库。我们为 Kafka 和 MySQL 提供了一个方便的 docker-compose 脚本。

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/kafka-mysql-only.yml

启动 docker 容器

docker-compose up

执行 docker ps 并确保看到所有三个组件都已启动并运行(Kafka、Zookeeper 和 MySQL)。

现在我们已经准备好必要的基础设施,接下来让我们在运行应用程序之前设置好 MySQL 数据库。

docker exec -it jdbc-file-blog-mysql mysql -uroot -p

使用 rootpw 作为密码。在终端中输入以下命令来设置数据库和表。

CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
	 id INT NOT NULL AUTO_INCREMENT,
	 name VARCHAR(255) NOT NULL,
	 street VARCHAR(255) NOT NULL,
	 city VARCHAR(255) NOT NULL,
	 tag CHAR(1),
	 PRIMARY KEY (id));

上边的 schema 很容易理解,但 tag 列需要一些解释。它用于避免重复读取已经读取过的表数据。其思想是,我们更新查询返回的每一行的 tag 列,这样它就不会包含在后续查询中。我们将在下面看到详细信息。

将进入 MySQL 的终端会话保持打开状态,因为稍后我们会回到那里。

作为独立应用程序运行 file sink

让我们独立运行现成的 file sink。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/file-sink-kafka/3.0.0-SNAPSHOT/file-sink-kafka-3.0.0-SNAPSHOT.jar

然后按如下方式运行。

java -jar file-sink-kafka-3.0.0-SNAPSHOT.jar --file.consumer.directory=/tmp/processed-file --file.consumer.name=output.txt --spring.cloud.stream.bindings.input.destination=jdbc-file-demo

让我们详细了解一下我们要做的事情。我们要求 file sink 应用程序从 Kafka topic jdbc-file-demo 中消费数据,然后在文件系统上的目录 /tmp/processed-file 中生成一个名为 output.txt 的文件。默认情况下,每个传入的 Kafka topic 记录都会作为新行附加到文件中。如果将 file.consumer.binary 的值设置为 true,则文件将以二进制形式写入。您可以在此处找到所有可用的配置。

运行 JDBC Source

与运行 file sink 的方式类似,现在我们将获取并运行基于 Kafka 的 jdbc source 应用程序。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/jdbc-source-kafka/3.0.0-SNAPSHOT/jdbc-source-kafka-3.0.0-SNAPSHOT.jar

然后运行,

java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://localhost:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0

我们为 JDBC Source 提供了以下配置信息。

  • 数据源 URL - 在本例中是用于我们运行的 MySQL 版本的 JDBC URL。

  • 数据源用户凭据

  • source 应用程序要运行的 SQL 查询

  • 发送数据的 Kafka topic(这设置为与 file-sink 消费数据相同的 topic)

  • 用于标记记录的 update SQL 语句

请注意,当我们之前创建表时,我们添加了一个名为 tag 的列,以避免读取我们已经读取过的重复记录。我们的主要 SQL 查询(通过属性 jdbc.supplier.query)将仅读取 tag 值不为空的记录。然后每次 source 读取一条记录时,tag 都会更新为值 1,以便下次执行查询时跳过该记录。如果不通过 jdbc.supplier.update 提供 update 语句,就像我们上面做的那样,每次查询都会提供表中的所有记录。如果我们不提供此语句并且仍然想避免重复,那么我们需要使用一些复杂的策略,例如使用元数据存储来跟踪我们到目前为止已经消费了哪些记录。提供一个支持标志(像我们示例中的 tag)的 schema,然后在每次读取时更新它,是避免重复的一种更容易的策略。

轮询 JDBC Source

JDBC Source 使用 poller 进行调用。这与在自定义的非 Spring Cloud Stream 应用程序中直接使用 JDBC Supplier 不同,在后者的情况下,必须手动调用 supplier。默认情况下,Spring Cloud Stream 为 JDBC Source 提供一个 poller,它每秒轮询一次。此调度可以通过使用属性 spring.cloud.stream.poller.fixedDelay 进行更改。有关轮询的更多控制,可以在此处找到。

验证流程

现在我们正在运行这两个应用程序,让我们向表中插入数据。

转到您的 MySQL 终端会话并输入以下 insert 语句。

mysql> insert into People values (1, 'Bob', 'First Street', 'First City', NULL);

现在转到 file-sink 写入文件的目录 /tmp/processed-file,查找名为 output.txt 的文件。打开文件并验证其内容。它应该包含以下内容。

 {"id":1,"name":"Bob","street":"First Street","city":"First City"}

向表中填充更多数据。

mysql> insert into People values (2, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (3, 'Mary', 'First Street', 'First City', NULL);

验证我们是否在 output.txt 文件中看到了新数据。

运行带过滤器的 JDBC Source

停止运行 JDBC Source 应用程序并按如下方式重新运行。

java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:mariadb://localhost:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"

正如我们在之前的博客中以及此处所解释的,所有现成的 source 都自动配置了许多有用的函数,因此您可以通过属性激活它们。在使用上述新配置选项运行时,我们将 filterFunctionjdbcSupplier 进行组合,从而为 JDBC Supplier 生成的数据添加了过滤功能。我们使用属性 spring.cloud.stream.function.definition 并将其值设置为 jdbcSupplier|filterFunction 来实现组合。然后通过属性 filter.function.expression 提供一个 JSONPath 表达式来过滤掉所有偶数 ID。

现在,如果您向表中插入更多数据,您将看到只有 ID 为奇数的记录被写入文件。

尝试将这些记录输入到表中。

mysql> insert into People values (200, 'John', 'First Street', 'First City', NULL);
mysql> insert into People values (201, 'Mary', 'First Street', 'First City', NULL);
mysql> insert into People values (202, 'Alice', 'First Street', 'First City', NULL);
mysql> insert into People values (203, 'Bob', 'First Street', 'First City', NULL);
mysql> insert into People values (204, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (205, 'Doe', 'First Street', 'First City', NULL);

我们将看到文件中不包含 ID 为 200、202 和 204 的记录,因为它们被过滤掉了。

运行带其他数据库的 JDBC Source

JDBC supplier 随附了流行的开源 JDBC 驱动程序。目前,它包含用于 MySQL、PostgreSQL 和 Microsoft SQL Server 数据库的驱动程序。这使我们能够快速切换针对特定数据库(例如 MySQL)运行的同一个 JDBC Source 应用程序,使其针对 PostgreSQL 运行,而无需进行任何代码更改,只需在部署时更改配置即可。让我们以我们之前针对 MySQL 运行的 JDBC Source 为例,这次针对 PostgreSQL 运行。

首先,我们将在 docker 容器中运行 PostgreSQL。

docker run --rm   --name pg-docker -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=demo  -d -p 5432:5432  postgres

登录到 psql 会话(或使用 PGAdmin 等 UI 工具)。

docker run -it --rm --network host postgres psql -h localhost -d demo -U test

使用 test 作为密码。

然后创建此表

CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));

停止当前的 JDBC Source 并使用以下配置选项重新运行它

java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:postgresql://localhost:5432/demo --spring.datasource.username=test --spring.datasource.password=test --jdbc.supplier.query="select id, name, street, city from people where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update people set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"

这与我们第二次针对 MySQL 运行 JDBC source 时使用的配置选项大致相同,但这次数据源属性已更改为针对 PostgreSQL 数据库运行。

在 psql 提示符下插入与之前使用 MySQL 时相同的数据。您将注意到只有 ID 为奇数的数据被附加到文件中。

向 JDBC Supplier 添加商业数据库驱动程序

如果我们想添加商业数据库的 JDBC 驱动程序,则需要手动进行这些更改。操作步骤很简单,如下所示。

  • 克隆 stream-application 仓库

  • maven 配置中添加我们想要的驱动程序(例如 Oracle JDBC 驱动程序)作为依赖项。将其 scope 设置为 runtime

  • 从仓库根目录执行:./mvnw clean install -pl :jdbc-supplier

  • 生成包含 supplier 更改的应用程序:./mvnw clean install -pl :jdbc-source

  • cd applications/source/jdbc-source/apps - 在此处,我们可以找到基于 Kafka 和 RabbitMQ 的 jdbc-source 应用程序

  • 构建我们想要的应用程序变体。

总结

本文详细介绍了 JDBC Supplier 以及它如何在 Spring Cloud Stream JDBC Source 中使用。我们还了解了 file consumer 及其在 Spring Cloud Stream 中的 sink 对应部分。然后,我们深入探讨了使用几种变体独立运行这些应用程序,并在此过程中探索了各种功能。最后,我们看到了如何在各种数据库之间轻松切换 JDBC Source,以及如何添加新的数据库驱动程序。

敬请关注

本系列文章将继续更新。在接下来的几周,我们将介绍更多的函数和应用程序。

订阅 Spring 新闻稿

通过 Spring 新闻稿保持联系

订阅

抢先一步

VMware 提供培训和认证,助您快速发展。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,一个简单订阅即可获得。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部