案例研究:使用 HTTP Source 和 JDBC Sink 构建并运行流式应用程序

工程 | David Turanski | 2020 年 8 月 10 日 | ...

引言

到目前为止,本系列已经介绍了基于 Java 函数的流式应用程序函数组合。我们还提供了详细的示例,说明如何从 supplier 构建 source 以及如何从 consumer 构建 sink。在这里,我们将继续这一旅程,展示接下来的几个案例研究中的第一个。每个案例研究都演示了如何在各种场景下使用一个或多个现有的预打包 Spring Boot 流式应用程序来构建数据流管道。

今天我们将展示两个最常用的应用程序:HTTP sourceJDBC sink。我们将使用它们构建一个简单的服务,该服务接受 HTTP POST 请求并将内容保存到数据库表中。我们将首先将它们作为独立的 Spring Cloud Stream 应用程序运行,然后展示如何使用 Spring Cloud Data Flow 编排相同的管道。本文将以分步教程的形式呈现,我们鼓励您在阅读时按照步骤进行操作。

准备环境

这个简单的流式应用程序由两个通过消息代理通信的远程进程组成。预打包的流式应用程序开箱即用,可与 Apache Kafka 或 RabbitMQ 一起工作。在这里,我们将使用 Apache Kafka。JDBC sink 将数据插入到数据库中。我们将使用 MySQL 作为此示例的数据库。

application schematic

假设我们从头开始,开发环境中没有 Kafka 或 MySQL。为了运行这个示例,我们将使用 Docker 来玩一下。所以我们需要在本地机器上运行 Docker。稍后我们将使用 Spring Cloud Data Flow,因此我们将利用 Data Flow 的 docker-compose 安装。这是开始使用 Data Flow 最简单的方法。它会启动几个容器,包括 MySQL 和 Kafka。为了使这些后端服务可供独立应用程序使用,我们需要调整标准安装以发布端口,并更改 Kafka 的 advertised host name。

注意

我已经在 Mac OS 上运行过这种设置,预计类似的设置也可以在 Windows 上工作。如果您遇到问题或有一些有用的技巧要分享,请在评论区留言。

首先,让我们创建一个名为 http-jdbc-demo 的目录,然后将 docker-compose.yml 从 github 下载到该目录中。

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml

或者

curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml -o docker-compose.yml

为了启用从本地主机到 Kafka 和 MySQL 的连接,我们将下载另一部分 YAML 文件来覆盖或自定义配置。

wget -O shared-kafka-mysql.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/shared-kafka-mysql.yml

接下来,我们需要获取本地机器的局域网 IP 地址。在 Mac 上,您可以通过几种方式之一执行此操作,例如

dig +short $(hostname)

或者

ping $(hostname)

局域网 IP 地址对于 docker 容器也是可访问的,而容器内的 localhost127.0.0.1 指的是自身。我们需要将环境变量 KAFKA_ADVERTISED_HOST_NAME 设置为此值。我们还需要设置其他一些环境变量

export KAFKA_ADVERTISED_HOST_NAME=$(dig +short $(hostname))
export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0

并将最新的流应用程序注册到 Data Flow 中

export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven

现在,从我们的项目目录中,我们可以启动 Data Flow 集群

docker-compose -f docker-compose.yml -f shared-kafka-mysql.yml up

这将显示大量的日志消息,并将持续运行直到您终止它(例如,Ctrl-C),这将停止所有容器。保持此终端打开。

打开一个新终端并输入

docker ps

这将列出 Data Flow 集群中正在运行的容器。稍后我们将查看 Data Flow。此时,请确保 dataflow-kafka 容器在 PORTS 下显示 0.0.0.0:9092→9092/tcp,并且 dataflow-mysql 类似地显示 0.0.0.0:3306→3306/tcp

docker ps

创建数据库表

我们可以配置 JDBC sink 应用程序自动初始化数据库,但为了简单起见,我们将手动创建它。我们可以使用任何 JDBC 数据库工具或在 dataflow-mysql 容器内运行 mysql 来完成此操作。

docker exec -it dataflow-mysql mysql -uroot -p

系统将提示您输入密码。数据库凭据配置在 docker-compose.yml 中。如果您不想查看那里,用户名是 root,密码是 rootpw

输入以下命令(您应该能够复制并粘贴整个内容)来创建数据库和表。

CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
	 id INT NOT NULL AUTO_INCREMENT,
	 name VARCHAR(255) NOT NULL,
	 street VARCHAR(255) NOT NULL,
	 city VARCHAR(255) NOT NULL,
	 PRIMARY KEY (id));

输入 exit; 退出。

create database

运行应用程序

此时,我们准备好运行 HTTP source 和 JDBC sink 了。Spring Boot 可执行 jar 已发布到 Spring Maven 仓库。我们需要使用 Kafka binder 构建的 jar 包。

wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/http-source-kafka/3.0.0-SNAPSHOT/http-source-kafka-3.0.0-SNAPSHOT.jar

wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/jdbc-sink-kafka/3.0.0-SNAPSHOT/jdbc-sink-kafka-3.0.0-SNAPSHOT.jar

我们将在不同的终端会话中运行这些应用程序。我们需要配置这些应用程序使用同一个 Kafka topic,我们将其命名为 jdbc-demo-topic。Spring Cloud Stream Kafka binder 将自动创建此 topic。我们还需要配置 JDBC sink 连接到我们的数据库,并将数据映射到我们创建的表中。我们将发布如下所示的 JSON 数据:

{
 “name”:”My Name”,
 “address”: {
      “street”:”My Street”,
       “city”: “My City”
    }
}

我们希望将这些值插入到 Demo 数据库的 People 表中的 namestreetcity 列。

启动 JDBC Sink

打开一个下载了 jar 包的新终端会话并运行

java -jar jdbc-sink-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://localhost:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.consumer.table-name=People --jdbc.consumer.columns=name,city:address.city,street:address.street --spring.cloud.stream.bindings.input.destination=jdbc-demo-topic

注意用于将字段映射到列的 jdbc.consumer.columns 语法。

启动 HTTP Source

打开一个下载了 jar 包的新终端会话并运行

java -jar http-source-kafka-3.0.0-SNAPSHOT.jar --server.port=9000 --spring.cloud.stream.bindings.output.destination=jdbc-demo-topic

这里我们将 source 的 HTTP 端口设置为 9000(默认是 8080)。另外,source 的输出目的地与 sink 的输入目的地匹配非常重要。

发送一些数据

接下来,我们需要向 http://localhost:9000 发送一些数据。

curl http://localhost:9000 -H'Content-Type:application/json' -d '{"name":"My Name","address":{"street":"My Street","city":"My City"}}}

验证数据是否已保存

再次,找到一个打开的终端会话并

docker exec -it dataflow-mysql mysql -uroot -p

使用 rootpw 登录并查询表

query standalone

如果您看到此结果,恭喜!独立的 Spring Cloud Stream 应用程序按预期工作。我们现在可以终止独立应用程序(Ctrl-C)了。让 docker-compose 进程保持运行,以便我们可以查看 Data Flow。

使用 Spring Cloud Data Flow

正如我们所见,即使我们不必编写任何代码,在“裸机”上运行这些应用程序也需要许多手动步骤。这些步骤包括:

  • 自定义 docker-compose 配置,或另外在本地机器上安装 kafka 和 mysql

  • 使用 Maven URL 下载所需版本的流应用程序(我们只是恰好知道在这里应该使用哪些版本)

  • 确保 Spring Cloud Stream destination binding 配置正确,以便应用程序能够通信

  • 查找并阅读 文档 以获取配置属性(我们已经为准备此示例做了此工作)并正确设置它们。

  • 管理多个终端会话

在接下来的章节中,我们将看到使用 Spring Cloud Data Flow 完成此操作可以消除所有这些步骤,并提供更丰富的整体开发体验。

打开 Data Flow 控制台

要开始使用,请在 http://localhost:9393/dashboard 打开 Data Flow 控制台。这将带您进入应用程序视图,在那里我们可以看到已注册的预打包应用程序。我们之前运行的 docker-compose 命令执行了此步骤,它使用了我们提供的 URL 来获取流应用程序的最新快照版本,包括我们刚刚运行的相同 jar 文件。

datflow ui applications

创建并部署流

在控制台中,从左侧菜单中选择 Streams,然后单击 Create Streams 打开图形化流编辑器。

dataflow create stream

将 http source 和 jdbc sink 拖放到编辑器面板中,并使用鼠标连接这两个句柄。或者,您可以直接在顶部的文本框中键入 Data Flow 流定义 DSL:http | jdbc

接下来我们需要配置应用程序。如果您单击其中任何一个应用程序,您将看到一个 Options 链接。打开 JDBC sink 的选项窗口。您将看到所有可用的配置属性及其简要描述。以下截图显示了部分视图;我们需要滚动才能看到其余部分。

datflow ui jdbc options

和之前一样,我们需要提供 url、用户名、密码、表和列。在这里,我们需要将 JDBC URL 更改为 jdbc:mariadb://mysql:3306/Demo,因为主机名 mysql 对应于 docker-compose.yml 中定义的 mysql 服务名称。此外,我们将 http 端口设置为 20000,因为它在配置的已发布端口范围内。有关更多详细信息,请参阅 skipper-server 配置

![]](https://github.com/spring-cloud/stream-applications/blob/gh-pages/img/http-jdbc/datflow-ui-stream-configured.png?raw=true)

让我们看看自动生成的流定义 DSL

http --port=20000 | jdbc --password=rootpw --username=root --url=jdbc:mariadb://mysql:3306/Demo --columns=name,city:address.city,street:address.street --table-name=People

这个 DSL 可以用于脚本或 Data Flow 客户端应用程序,以自动化流创建。我们的配置已完成,但是 Spring Cloud Stream destination bindings 在哪里?我们不需要它们,因为 Data Flow 会为我们处理连接。

选择 Create Stream 按钮并将流命名为 http-jdbc

dataflow ui deploy 1

要部署流,请单击播放按钮

play button

接受默认部署属性,然后单击页面底部的 Deploy stream

如有必要,单击 Refresh 按钮。大约一分钟后,您应该会看到我们的流已部署。

dataflow ui deploy 2

发送一些数据并验证是否已保存

在这里,我们将发送一些不同的值到端口 20000

curl http://localhost:20000 -H'Content-Type:application/json' -d '{"name":"Your Name","address":{"street":"Your Street","city":"Your City"}}}'

当我们再次运行查询时,应该会看到表中添加了一条新记录。

query dataflow

干得漂亮!

Data Flow 应用程序部署

细心的读者会注意到,尽管平台本身运行在容器中,但已部署的应用程序并没有创建 Docker 容器。在 Data Flow 的 架构 中,Skipper 服务器负责部署流应用程序。在本地配置中,Skipper 使用 Local Deployer 在其 localhost 上运行 jar 文件,就像我们独立运行应用程序时一样。要验证这一点,我们可以在 skipper 容器中运行 ps 命令。

docker exec -it skipper ps -ef

docker exec ps

要查看控制台日志,请使用 stdout 路径

docker exec -it skipper more /tmp/1596916545104/http-jdbc.jdbc-v4/stdout_0.log

tail -f 命令也同样适用。

如果部署成功,应用程序日志也可以从 UI 中查看。

dataflow ui app log

但如果部署失败,我们可能需要深入查看以进行故障排除。

注意

本地 Data Flow 安装适用于本地开发和探索,但我们不推荐用于生产环境。生产级别的 Spring Cloud Data Flow OSS 以及商业许可产品,都适用于 KubernetesCloud Foundry

总结

我们刚刚详细介绍了如何使用预打包的 Spring Cloud Stream 应用程序构建一个简单的数据流管道,将通过 HTTP POST 的 JSON 内容保存到关系型数据库中。我们使用 Docker 和 docker-compose 安装了本地环境,然后首先在“裸机”上部署了 source 和 sink 应用程序,然后使用 Spring Cloud Data Flow 进行部署。希望我们通过与 Spring Cloud Stream、Data Flow、Docker 容器、HTTP source 和 JDBC sink 的交互学到了一些有趣的东西。

敬请期待…​

在接下来的几周里,我们将为大家带来更多 Spring Cloud Stream 和 Spring Cloud Data Flow 的案例研究,每个案例都将探讨不同的流应用程序和功能。

获取 Spring 电子报

通过 Spring 电子报保持联系

订阅

抢占先机

VMware 提供培训和认证,助力您快速进步。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区所有即将到来的活动。

查看全部