领先一步
VMware提供培训和认证,以加速您的进步。
了解更多本文是博客系列的一部分,该系列探讨了基于Java函数的最新重新设计的Spring Cloud Stream应用程序。在本篇中,我们将探讨基于Spring Cloud Stream的JDBC供应器和源。我们将了解如何从关系数据库导出数据并使用文件使用者和相应的Spring Cloud Stream文件接收器将其转储到文件中。我们将研究一些不同的方法来运行JDBC源并将数据发送到文件。
以下是此博客系列的所有先前部分。
查询数据库并处理结果是一个非常基本的企业用例。即使追溯到信息技术革命的大型机时代,我们也可以看到这种模式被广泛使用。在过去的几十年里,SQL已经成为与数据库通信的典型语言。Java从最初的版本开始,就使用名为Java Database Connectivity(通常称为JDBC)的库为基于数据库的应用程序添加了企业级支持。在Java的早期,许多应用程序都是使用普通的JDBC库编写的。Spring Framework从一开始就通过提供一个基于JDK中JDBC库的模板模式——JdbcTemplate——来支持这个与数据库交互的核心用例。Spring Data项目在此模板的基础上增加了许多附加功能。当Spring Integration出现时,它利用Spring中的这种支持并提供了许多附加组件,以便可以通过通道适配器、网关等获得数据。在其最新的版本中(也是这篇博客的主题),我们意识到我们可以采用这些Spring Integration组件,然后将它们作为简单的Java供应器来查询数据库。我们将详细了解如何访问此供应器、在自定义应用程序中重用它以及将其用作Spring Cloud Stream源。
JDBC供应器是一个作为java.util.function.supplier
bean实现的组件。调用时,它将提供来自数据库表的数据。JDBC供应器具有以下签名。
Supplier<Flux<Message<?>>> jdbcSupplier()
默认情况下,JDBC供应器根据数据库表的行拆分数据,其中数据库的每一行都表示为java.util.Map
数据结构。例如,这是一个包含一些已填充数据的数据库表。
ID
姓名
1
Bob
2
Jane
3
John
当我们针对此表调用JDBC供应器时,我们将获得一个Flux的Message对象,这些对象包含一个Map
作为有效负载。第一条消息将包含一个键为ID
和NAME
,值分别为1
和Bob
的映射。第二条消息将包含键相同,但值为2
和Jane
的映射,依此类推。我们也可以要求提供数据而不将其拆分为单独的消息。为此,我们可以使用属性jdbc.supplier.split
并将其设置为false
(默认为true
)。当我们禁用拆分时,与上述JDBC供应器签名相比,存在一个重要的区别。它的签名变为Supplier<Message<?>>
,并且不再具有单独的Message<Map>
,而是获得单个Message<List<Map>
。如果我们将其应用于上述示例,我们将获得一个包含3个元素的单个列表,并且每个元素将保存一个包含数据库表每一行的Map
。
JDBC供应器需要一个在每次调用时执行的SQL查询。此查询是必需的,必须通过属性jdbc.supplier.query
提供。我们还可以使用属性jdbc.supplier.update
强制供应器忽略已经读取的行。我们将在本文后面看到如何做到这一点的示例。
上面提到的两种类型的Supplier
bean——我们在默认情况下看到的数据拆分和禁用数据拆分的另一个bean——都命名为jdbcSupplier
。我们可以使用该名称限定它们注入到我们的自定义应用程序中。我们需要确保根据我们是否正在拆分数据来使用正确的类型。例如,如果我们使用默认设置,即拆分数据,那么我们可以像下面这样自动装配JDBC供应器。
@Autowired
Supplier<Flux<Message<?>>> jdbcSupplier;
另一方面,如果我们使用属性jdbc.supplier.split
禁用拆分,那么我们需要使用类型Supplier<Message<?>
注入它。
注入后,我们可以调用Supplier
的get
方法,然后开始接收数据。
与我们在之前的博客中看到的 文件供应器类似,文件使用者也是一个可重用的bean,我们可以将其注入到我们的自定义应用程序中,并使用它在目录中创建新文件。该bean作为java.util.function.Consumer
实现。对于新手读者来说,为什么它被称为使用者,而实际上并没有从文件中消费任何东西,这可能会导致一些混淆。尽管命名和实现为使用者,但文件使用者不是轮询或读取文件的使用者,而是一个接受数据然后写入文件的使用者。“使用”文件的用例由文件供应器处理。
以下是文件使用者的类型签名。
Consumer<Message<?>> fileConsumer()
由于它是一个使用者,因此只有在数据处理管道的末尾使用此组件才有意义。使用者接受传入数据并使用它写入文件。当我们通过绑定程序实现将文件使用者与Spring Cloud Stream结合使用时,它将成为一个接收器应用程序,它从中间件目标(例如Kafka主题或RabbitMQ交换机)中使用数据。
文件使用者在许多企业用例中很有用。对于任何在可用新数据时创建或追加到文件的业务案例,文件使用者都很有用。
使用文件使用者时,我们可以使用属性file.consumer.directory
和file.consumer.name
分别提供要写入的目录和文件名。如果我们不设置它们,它将使用使用者设置的默认值。默认情况下,数据将追加到文件中,这可以通过属性file.consumer.mode
更改。有关更多详细信息,请参阅FileConsumerProperties的配置选项。
将上述功能组件与Spring Cloud Stream结合使用后,功能会变得更加强大。这些函数可用于消息传递应用程序,Spring Cloud Stream 使它们更易于以与中间件无关的方式使用。JDBC供应器用于构建JDBC源,该源可以与许多不同的中间件系统一起工作。类似地,文件使用者用作文件接收器应用程序的骨干,该应用程序也可以与不同的消息传递系统一起工作。
在以下部分中,我们将独立运行这些应用程序并验证它们是否按预期工作。
首先,创建一个新目录jdbc-file-demo
。
mkdir jdbc-file-demo && cd jdbc-file-demo
我们将运行 jdbc-source 和 file-sink 应用程序的 Kafka 版本。我们将使用 Apache Kafka 作为中间件运行这些应用程序。对于 JDBC 源,我们将使用 MySQL 作为数据库。我们为 Kafka 和 MySQL 提供了一个方便的 docker-compose 脚本。
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/kafka-mysql-only.yml
启动 Docker 容器
docker-compose up
执行 docker ps
并确保看到所有三个组件都已启动并运行(Kafka、Zookeeper 和 MySQL)。
现在我们已经准备好必要的基础设施,让我们在运行应用程序之前设置我们的 MySQL 数据库。
docker exec -it jdbc-file-blog-mysql mysql -uroot -p
使用 rootpw
作为密码。在终端上,输入以下命令来设置数据库和表。
CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));
上述模式非常容易理解,但是 tag
列需要一些解释。它用于避免读取我们已经读取过的表中的重复数据。其思想是我们更新每个由查询返回的行中的 tag
列,以便它不会包含在后续查询中。我们将在下面看到它的细节。
保持 MySQL 终端会话处于打开状态,因为我们稍后会回到它。
让我们运行开箱即用的独立文件接收器。
wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/file-sink-kafka/3.0.0-SNAPSHOT/file-sink-kafka-3.0.0-SNAPSHOT.jar
然后按如下方式运行它。
java -jar file-sink-kafka-3.0.0-SNAPSHOT.jar --file.consumer.directory=/tmp/processed-file --file.consumer.name=output.txt --spring.cloud.stream.bindings.input.destination=jdbc-file-demo
让我们来看一下我们试图做什么的细节。我们要求文件接收器应用程序从 Kafka 主题 jdbc-file-demo
消费数据,然后在文件系统上的 /tmp/processed-file
目录中生成一个名为 output.txt
的文件。默认情况下,每个传入的 Kafka 主题记录都作为新行附加到文件中。如果启用 file.consumer.binary
值为 true
,则文件将以二进制形式写入。您可以在这里找到所有可用的配置 这里。
与运行文件接收器的方式类似,我们现在将获取并运行基于 Kafka 的 jdbc 数据源应用程序。
wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/jdbc-source-kafka/3.0.0-SNAPSHOT/jdbc-source-kafka-3.0.0-SNAPSHOT.jar
然后运行:
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://127.0.0.1:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0
我们为 JDBC 数据源提供以下配置信息。
数据源 URL - 在这种情况下,是我们正在运行的 MySQL 版本的 JDBC URL。
数据源用户凭据
数据源应用程序要运行的 SQL 查询
发送数据的 Kafka 主题(此主题设置为文件接收器从中消费数据的同一主题)
更新 SQL 语句以标记记录
请注意,当我们之前创建表时,我们添加了一个名为 tag
的列以避免读取我们已经读取过的重复记录。我们的主要 SQL 查询(通过属性 jdbc.supplier.query
)只会读取 tag
值不为空的那些记录。然后每次数据源读取一条记录时,都会将标记更新为 1
,以便下次执行查询时,该记录将被省略。如果没有通过 jdbc.supplier.update
提供更新语句,就像我们上面所做的那样,每个查询都将提供表中的所有记录。如果我们不提供这个并且仍然想要避免重复,那么我们需要使用一些使用某些 元数据存储 来跟踪我们到目前为止已消费内容的复杂策略。提供支持标志(例如我们示例中的 tag
)的模式,然后在每次读取时更新它,这是一种避免重复的更简单的策略。
JDBC 数据源使用轮询器调用。这与在自定义非 Spring Cloud Stream 应用程序中直接使用 JDBC 提供程序不同,在这种情况下,必须手动调用提供程序。默认情况下,Spring Cloud Stream 为 JDBC 数据源提供一个轮询器,它每秒轮询一次。可以使用属性 spring.cloud.stream.poller.fixedDelay
更改此计划。可以在 这里 找到有关轮询的更多控件。
现在我们正在运行这两个应用程序,让我们将数据插入表中。
转到您的 MySQL 终端会话并输入以下插入语句。
mysql> insert into People values (1, 'Bob', 'First Street', 'First City', NULL);
现在转到文件接收器正在写入文件的 /tmp/processed-file
目录,并查找名为 output.txt
的文件。打开文件并验证其内容。它应该包含以下内容。
{"id":1,"name":"Bob","street":"First Street","city":"First City"}
将更多数据填充到表中。
mysql> insert into People values (2, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (3, 'Mary', 'First Street', 'First City', NULL);
验证我们是否在 output.txt
文件中看到了新数据。
停止运行 JDBC 数据源应用程序并按如下方式重新运行。
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:mariadb://127.0.0.1:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"
正如我们在 之前的博文中 所见,并如 此处 所述,所有开箱即用的数据源都使用许多有用的函数自动配置,以便您可以通过属性激活它们。在使用上面新的配置选项运行时,我们将 filterFunction 与 jdbcSupplier
组合,从而为 JDBC 提供程序生成的数据添加过滤功能。我们使用属性 spring.cloud.stream.function.definition
组合它,并为其赋予值 jdbcSupplier|filterFunction
。然后通过属性 filter.function.expression
,我们提供一个 JSONPath 表达式来过滤掉所有偶数 ID。
现在,如果您将更多数据插入表中,您将看到只有具有奇数 ID 的记录被写入文件。
尝试将这些记录输入表中。
mysql> insert into People values (200, 'John', 'First Street', 'First City', NULL);
mysql> insert into People values (201, 'Mary', 'First Street', 'First City', NULL);
mysql> insert into People values (202, 'Alice', 'First Street', 'First City', NULL);
mysql> insert into People values (203, 'Bob', 'First Street', 'First City', NULL);
mysql> insert into People values (204, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (205, 'Doe', 'First Street', 'First City', NULL);
我们将看到该文件不包含 ID 为 200、202 和 204 的记录,因为它们已被过滤掉。
JDBC 提供程序附带流行的开源 JDBC 驱动程序。目前,它包含用于 MySQL、PostgreSQL 和 Microsoft SQL Server 数据库的驱动程序。这使我们能够快速切换针对特定数据库运行的同一 JDBC 数据源应用程序,例如 MySQL,以针对 PostgreSQL 运行,而无需进行任何代码更改,而只需在部署时进行配置更改。让我们以我们针对 MySQL 运行的 JDBC 数据源为例,这次针对 PostgreSQL 运行。
首先,我们将在 Docker 容器中运行 PostgreSQL。
docker run --rm --name pg-docker -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=demo -d -p 5432:5432 postgres
登录到 psql
会话(或使用 PGAdmin 等 UI 工具)。
docker run -it --rm --network host postgres psql -h localhost -d demo -U test
使用 test
作为密码。
然后创建此表
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));
停止当前的 JDBC 数据源并使用以下配置选项重新运行它
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/demo --spring.datasource.username=test --spring.datasource.password=test --jdbc.supplier.query="select id, name, street, city from people where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update people set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"
这与我们第二次针对 MySQL 运行 JDBC 数据源时使用的配置选项大致相同,但这次数据源属性已更改为针对 PostgreSQL 数据库运行。
在 psql 提示符下插入与 MySQL 相同的数据。您会注意到只有 ID 为奇数的记录被附加到文件中。
如果要添加商用数据库的JDBC驱动程序,则需要手动进行这些更改。操作步骤很简单,如下所示。
克隆stream-application 仓库
在maven配置中添加所需的驱动程序(例如Oracle JDBC驱动程序)作为依赖项。将其作用域设置为runtime
。
从仓库根目录执行:./mvnw clean install -pl :jdbc-supplier
使用供应商更改生成应用程序:./mvnw clean install -pl :jdbc-source
cd applications/source/jdbc-source/apps
- 在这里,您可以找到基于Kafka和RabbitMQ的jdbc-source应用程序
构建所需的应用程序变体。
这篇博文详细介绍了JDBC Supplier以及它在Spring Cloud Stream JDBC Source中的使用方法。我们还了解了Spring Cloud Stream中的文件消费者及其sink对应部分。然后,我们深入研究了使用一些变体独立运行这些应用程序,并在此过程中探索了各种功能。最后,我们了解了如何轻松地在不同的数据库之间切换JDBC Source,以及如何添加新的数据库驱动程序。
本系列将继续。在接下来的几周内,我们将研究更多功能和应用程序。