案例研究:关系数据库源和文件接收器

工程 | Soby Chacko | 2020年9月10日 | ...

本文是探讨基于Java函数的全新设计的Spring Cloud Stream应用程序的系列博客文章之一。在本期中,我们将探讨JDBC Supplier以及基于Spring Cloud Stream的源。我们将了解如何从关系数据库中导出数据,并使用文件消费者和相应的Spring Cloud Stream文件接收器将其转储到文件中。我们将介绍几种不同的运行JDBC源并将数据发送到文件的方式。

以下是本博客系列的所有先前部分。

从RDBMS中获取数据

查询数据库并处理结果是一个非常基本的企业用例。追溯到信息技术革命的主机时代,我们就可以看到这种模式被广泛使用。在过去的几十年里,SQL已经成为与数据库通信的典型语言。Java从其最初的版本开始,就通过一个名为Java数据库连接(JDBC)的库,为基于数据库的企业级应用提供了支持,JDBC广为人知。在Java的早期,许多应用程序都是使用原生的JDBC库编写的。Spring Framework从一开始就通过提供一个基于JDK中JDBC库的模板模式——JdbcTemplate——来支持处理数据库的核心用例。Spring Data项目围绕这个模板添加了许多额外的功能。当Spring Integration出现时,它在Spring中吸收了这种支持,并提供了许多额外的组件,以便数据可以通过通道适配器、网关等方式获取。在其最新版本中,也是本博客的主题,我们意识到我们可以将这些Spring Integration组件作为简单的Java Supplier来查询数据库。我们将详细了解如何访问这个Supplier,在自定义应用程序中重用它,以及将其用作Spring Cloud Stream源。

JDBC Supplier

JDBC Supplier是一个实现为java.util.function.Supplier bean的组件。当被调用时,它将从数据库表中提供数据。JDBC Supplier具有以下签名。

Supplier<Flux<Message<?>>> jdbcSupplier()

默认情况下,JDBC Supplier根据数据库表的行来拆分数据,其中数据库的每一行都表示为一个java.util.Map数据结构。例如,这里有一个填充了一些数据的数据库表。

ID

姓名

1

Bob

2

Jane

3

John

当我们针对此表调用JDBC supplier时,我们会得到一个包含Map作为有效负载的Flux Message对象。第一个消息将包含一个键为IDNAME,值分别为1Bob的映射。第二个消息将包含具有相同键但值为2Jane的映射,依此类推。我们还可以要求在不将数据拆分为单独消息的情况下提供数据。为此,我们可以使用属性jdbc.supplier.split并将其设置为false(默认值为true)。当我们禁用拆分时,与上述JDBC supplier签名有一个重要区别。它的签名变为Supplier<Message<?>>,并且我们得到一个单独的Message<List<Map>,而不是单独的Message<Map>。如果我们将此应用于上面给出的示例,我们将得到一个包含3个元素的列表,每个元素都将包含一个包含数据库表中每行的Map

JDBC Supplier需要在每次调用时执行SQL查询。此查询是强制性的,必须通过属性jdbc.supplier.query提供。我们还可以使用属性jdbc.supplier.update强制Supplier忽略已读取的行。我们将在本文后面看到如何实现此操作的示例。

在自定义应用程序中重用JDBC Supplier

上面提到的两种Supplier bean——一种是我们看到的默认数据拆分方式,另一种是我们禁用数据拆分的方式——都命名为jdbcSupplier。我们可以用这个名称在自定义应用程序中注入它们。我们需要根据是否正在拆分数据来确保使用正确的类型。例如,如果我们按照默认值(即拆分数据),那么我们可以如下自动装配JDBC supplier。

@Autowired
Supplier<Flux<Message<?>>> jdbcSupplier;

另一方面,如果我们使用属性jdbc.supplier.split禁用拆分,那么我们需要以Supplier<Message<?>类型注入它。

一旦注入,我们就可以调用Supplierget方法,然后开始接收数据。

文件消费者

与我们在上一篇博客中看到的文件Supplier类似,文件消费者也是一个可重用的bean,我们可以将其注入到自定义应用程序中,并用它在目录中创建新文件。该bean被实现为java.util.function.Consumer。对于新手读者来说,这可能会导致一些困惑,为什么它被称为消费者,但实际上并没有从文件中消费任何东西。尽管文件消费者被命名并实现为消费者,但它不是轮询或读取文件的文件消费者,而是一个接受数据然后写入文件的消费者。“消费”文件的用例由文件Supplier处理。

这是文件消费者的类型签名。

Consumer<Message<?>> fileConsumer()

由于它是一个消费者,因此只有在数据处理管道的末端使用此组件才有意义。消费者接受传入数据并使用它写入文件。当我们将文件消费者通过绑定器实现与Spring Cloud Stream结合时,它就成为一个接收器应用程序,它从中间件目标(例如Kafka主题或RabbitMQ交换)消费数据。

在许多企业用例中,文件消费者都很有用。对于任何在有新数据可用时创建或追加文件的业务案例,文件消费者都很有用。

使用文件消费者时,我们可以分别通过属性file.consumer.directoryfile.consumer.name提供要写入的目录和文件名。如果我们不设置它们,它将使用消费者设置的默认值。默认情况下,数据将附加到文件中,这可以通过属性file.consumer.mode更改。有关更多详细信息,请参阅FileConsumerProperties的配置选项。

运行应用程序

当上述功能组件与Spring Cloud Stream结合时,它们变得更加强大。这些功能可以在消息传递应用程序中使用,Spring Cloud Stream使其更容易以与中间件无关的方式使用它们。JDBC Supplier用于构建可以与许多不同中间件系统一起工作的JDBC源。同样,文件消费者被用作文件接收器应用程序的骨干,它也可以与不同的消息传递系统一起工作。

在以下部分中,我们将独立运行这些应用程序并验证它们是否按预期工作。

设置Apache Kafka和MySQL数据库

首先,创建一个新目录jdbc-file-demo

mkdir jdbc-file-demo && cd jdbc-file-demo

我们将运行jdbc-souce和file-sink应用程序的Kafka变体。我们将使用Apache Kafka作为中间件来运行这些应用程序。对于JDBC源,我们将使用MySQL作为数据库。我们为Kafka和MySQL提供了方便的docker-compose脚本。

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/kafka-mysql-only.yml

启动docker容器

docker-compose up

执行docker ps,确保您看到所有三个组件(Kafka、Zookeeper和MySQL)都已启动并正在运行。

现在我们已经准备好了必要的基础设施,让我们在运行应用程序之前设置好MySQL数据库。

docker exec -it jdbc-file-blog-mysql mysql -uroot -p

使用rootpw作为密码。在终端中,输入以下命令来设置数据库和表。

CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
	 id INT NOT NULL AUTO_INCREMENT,
	 name VARCHAR(255) NOT NULL,
	 street VARCHAR(255) NOT NULL,
	 city VARCHAR(255) NOT NULL,
	 tag CHAR(1),
	 PRIMARY KEY (id));

上面的模式很直观,但tag列需要一些解释。它用于避免从已读取的表中读取重复数据。其思想是,我们更新查询返回的每一行的tag列,以便它不会包含在后续查询中。我们将在下面看到详细信息。

保持MySQL终端会话打开,我们稍后会再回来。

独立运行文件接收器

让我们独立运行开箱即用的文件接收器。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/file-sink-kafka/3.0.0-SNAPSHOT/file-sink-kafka-3.0.0-SNAPSHOT.jar

然后如下运行。

java -jar file-sink-kafka-3.0.0-SNAPSHOT.jar --file.consumer.directory=/tmp/processed-file --file.consumer.name=output.txt --spring.cloud.stream.bindings.input.destination=jdbc-file-demo

让我们详细了解一下我们要做的。我们要求文件接收器应用程序从kafka主题jdbc-file-demo消费数据,然后在文件系统上的/tmp/processed-file目录中生成一个名为output.txt的文件。默认情况下,每个传入的Kafka主题记录都作为新行附加到文件中。如果将file.consumer.binary值设置为true,则文件将以二进制形式写入。您可以在此处找到所有可用配置。

运行JDBC源

与我们运行文件接收器的方式类似,我们现在将获取并运行基于Kafka的jdbc源应用程序。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/jdbc-source-kafka/3.0.0-SNAPSHOT/jdbc-source-kafka-3.0.0-SNAPSHOT.jar

然后运行,

java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0

我们正在向JDBC Source提供以下配置信息。

  • 数据源URL - 在此情况下为我们正在运行的MySQL变体的JDBC URL。

  • 数据源用户凭据

  • 源应用程序要运行的SQL查询

  • 发送数据的Kafka主题(这与文件接收器正在消费数据的主题相同)

  • 用于标记记录的更新SQL语句

请注意,当我们之前创建表时,我们添加了一个名为tag的列,以避免读取我们已经读取的重复记录。我们的主要SQL查询(通过属性jdbc.supplier.query)将只读取tag值不为空的记录。然后,每次源读取一条记录时,tag都会更新为值1,这样下次执行查询时,该记录就会被忽略。如果不像我们上面那样通过jdbc.supplier.update提供更新语句,每次查询都将提供表中的所有记录。如果我们不提供此功能并且仍然想避免重复,那么我们需要使用一些复杂的策略,即使用一些元数据存储来跟踪我们迄今为止消费了什么。提供一个支持标志(如我们示例中的tag)的模式,然后在每次读取时更新它,是一种更简单的避免重复的策略。

轮询JDBC Source

JDBC Source是使用轮询器调用的。这与在自定义非Spring Cloud Stream应用程序中直接使用JDBC Supplier不同,在这种情况下,必须手动调用Supplier。默认情况下,Spring Cloud Stream为JDBC Source提供了一个轮询器,它每秒轮询一次。此调度可以通过使用属性spring.cloud.stream.poller.fixedDelay进行更改。有关轮询的更多控制可以在此处找到。

验证流程

现在我们正在运行这两个应用程序,让我们将数据插入到表中。

转到您的MySQL终端会话并输入以下插入语句。

mysql> insert into People values (1, 'Bob', 'First Street', 'First City', NULL);

现在转到文件接收器正在写入文件的目录/tmp/processed-file,查找名为output.txt的文件。打开该文件并验证其内容。它应该包含以下内容。

 {"id":1,"name":"Bob","street":"First Street","city":"First City"}

向表中填充更多数据。

mysql> insert into People values (2, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (3, 'Mary', 'First Street', 'First City', NULL);

验证我们是否在output.txt文件中看到了新数据。

运行带有过滤器的JDBC Source

停止运行JDBC Source应用程序,并按如下重新运行。

java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:mariadb://:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"

正如我们在之前的博客中看到的,以及此处所解释的,所有开箱即用的源都配置了许多有用的函数,您可以通过属性激活它们。在上述新配置选项下运行时,我们将filterFunctionjdbcSupplier组合,从而为JDBC Supplier生成的数据添加过滤功能。我们使用属性spring.cloud.stream.function.definition将其组合,并将其值设置为jdbcSupplier|filterFunction。然后通过属性filter.function.expression,我们提供一个JSONPath表达式来过滤掉所有偶数ID。

现在,如果您向表中插入更多数据,您将看到只有具有奇数ID的记录被写入文件。

尝试将这些记录输入到表中。

mysql> insert into People values (200, 'John', 'First Street', 'First City', NULL);
mysql> insert into People values (201, 'Mary', 'First Street', 'First City', NULL);
mysql> insert into People values (202, 'Alice', 'First Street', 'First City', NULL);
mysql> insert into People values (203, 'Bob', 'First Street', 'First City', NULL);
mysql> insert into People values (204, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (205, 'Doe', 'First Street', 'First City', NULL);

我们将看到文件不包含ID为200、202和204的记录,因为它们被过滤掉了。

使用其他数据库运行JDBC Source。

JDBC supplier随流行的开源JDBC驱动程序一起提供。目前,它包含MySQL、PostgreSQL和Microsoft SQL Server数据库的驱动程序。这使我们能够快速切换同一个针对特定数据库(例如MySQL)运行的JDBC Source应用程序,使其在不进行任何代码更改的情况下针对PostgreSQL运行,而只需在部署时进行配置更改。让我们以我们针对MySQL运行的JDBC Source为例,这次针对PostgreSQL运行。

首先,我们将在docker容器中运行PostgreSQL。

docker run --rm   --name pg-docker -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=demo  -d -p 5432:5432  postgres

登录到psql会话(或使用PGAdmin等UI工具)。

docker run -it --rm --network host postgres psql -h localhost -d demo -U test

使用test作为密码。

然后创建此表

CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));

停止当前的JDBC Source,并使用以下配置选项重新运行它。

java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:postgresql://:5432/demo --spring.datasource.username=test --spring.datasource.password=test --jdbc.supplier.query="select id, name, street, city from people where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update people set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"

这与我们第二次针对MySQL运行JDBC源时的配置选项基本相同,但这次数据源属性已更改为针对PostgreSQL数据库运行。

在psql提示符下插入与之前MySQL相同的数据。您会注意到只有ID为奇数的记录被追加到文件中。

向JDBC Supplier添加商业数据库驱动程序

如果我们要添加商业数据库的JDBC驱动程序,那么我们需要手动进行这些更改。具体步骤如下所示。

  • 克隆stream-application仓库

  • maven配置中添加所需的驱动程序(例如Oracle JDBC驱动程序)作为依赖项。将其作用域设置为runtime

  • 从仓库根目录执行:./mvnw clean install -pl :jdbc-supplier

  • 通过Supplier更改生成应用程序:./mvnw clean install -pl :jdbc-source

  • cd applications/source/jdbc-source/apps - 在这里,我们可以找到基于Kafka和RabbitMQ的jdbc-source应用程序。

  • 构建我们想要的应用程序变体。

结论

这篇博客详细介绍了JDBC Supplier以及该Supplier在Spring Cloud Stream JDBC Source中的用法。我们还看到了文件消费者及其在Spring Cloud Stream中的接收器对应物。然后,我们深入探讨了使用几种变体独立运行这些应用程序,并在此过程中探索了各种功能。最后,我们了解了如何轻松地在各种数据库之间切换JDBC Source,以及如何添加新的数据库驱动程序。

敬请期待

本系列将继续。在接下来的几周,我们将研究更多的函数和应用程序。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有