领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多在本系列文章中,我们已经介绍了新的基于 Java 函数的流应用程序,以及函数组合。我们还提供了详细的示例,说明如何从供应商创建源以及从消费者创建接收器。在这里,我们将继续这段旅程,并进行第一个案例研究,后续还将进行更多案例研究。每个案例研究都演示了如何在各种场景中使用一个或多个可用的预打包 Spring Boot 流应用程序来构建数据流管道。
今天,我们将展示两个最常用的应用程序:HTTP 源和JDBC 接收器。我们将使用它们来构建一个简单的服务,该服务接受 HTTP POST 请求并将内容保存到数据库表中。我们首先将它们作为独立的Spring Cloud Stream应用程序运行,然后演示如何使用Spring Cloud Data Flow来编排相同的管道。这是一个分步教程,我们鼓励您在阅读时按照步骤操作。
这个简单的流式应用程序由两个通过消息代理通信的远程进程组成。预打包的流应用程序可以开箱即用地与 Apache Kafka 或 RabbitMQ 配合使用。在这里,我们将使用 Apache Kafka。JDBC 接收器将数据插入数据库。在本例中,我们将使用 MySQL。
让我们假设我们从头开始,并且在我们的开发环境中没有可用的 Kafka 或 MySQL。要运行此示例,我们将使用 Docker 进行一些有趣的操作。因此,我们需要在本地计算机上运行Docker。稍后我们将使用 Spring Cloud Data Flow,因此我们将利用 Data Flow 的docker-compose 安装。这是开始使用 Data Flow 的最简单方法。它启动了几个容器,包括 MySQL 和 Kafka。为了使这些后端服务可供独立应用程序使用,我们需要调整标准安装以发布端口,并更改 Kafka 的公告主机名。
注意
我在 Mac OS 上运行了此设置,并预计类似的设置将在 Windows 上运行。如果您遇到问题或有一些有用的提示要分享,请在评论部分留下笔记。
首先,让我们创建一个名为 http-jdbc-demo
的目录,并将 docker-compose.yml
从 github 下载到该目录
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml
或者
curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml -o docker-compose.yml
为了启用从本地主机到 Kafka 和 MySQL 的连接,我们将下载另一个 YAML 片段以覆盖或自定义配置。
wget -O shared-kafka-mysql.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/shared-kafka-mysql.yml
接下来,我们需要获取本地计算机的 LAN IP 地址。在 Mac 上,您可以通过多种方式执行此操作,例如
dig +short $(hostname)
或者
ping $(hostname)
LAN IP 地址也可供 docker 容器访问,而容器内部的 localhost
或 127.0.0.1
指的是自身。我们需要将环境变量 KAFKA_ADVERTISED_HOST_NAME
设置为此值。我们还需要设置一些其他环境变量
export KAFKA_ADVERTISED_HOST_NAME=$(dig +short $(hostname))
export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0
并在 Data Flow 中注册最新的流应用程序
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven
现在,从我们的项目目录中,我们可以启动 Data Flow 集群
docker-compose -f docker-compose.yml -f shared-kafka-mysql.yml up
这将显示许多日志消息,并继续运行直到您终止它(例如,Ctrl-C),这将停止所有容器。请保持此终端打开。
打开一个新的终端并键入
docker ps
这将列出 Data Flow 集群的正在运行的容器。我们稍后会查看 Data Flow。此时,请确保 dataflow-kafka
容器在 PORTS
下显示 0.0.0.0:9092→9092/tcp
,并且 dataflow-mysql
同样显示 0.0.0.0:3306→3306/tcp
。
我们可以将 JDBC 接收器应用程序配置为自动初始化数据库,但为了简单起见,我们将手动创建它。我们可以使用任何 JDBC 数据库工具或通过从 dataflow-mysql
容器内运行 mysql
来执行此操作
docker exec -it dataflow-mysql mysql -uroot -p
系统将提示您输入密码。数据库凭据在docker-compose.yml中配置。如果您不想查看那里,则用户名为 root
,密码为 rootpw
。
输入以下命令 - 您应该能够复制粘贴整个内容 - 以创建数据库和表。
CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
PRIMARY KEY (id));
键入 exit;
退出。
此时,我们已准备好运行 HTTP 源和 JDBC 接收器。Spring Boot 可执行 jar 文件发布到 Spring Maven 存储库。我们需要使用 Kafka 绑定器构建的 jar 文件
wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/http-source-kafka/3.0.0-SNAPSHOT/http-source-kafka-3.0.0-SNAPSHOT.jar
wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/jdbc-sink-kafka/3.0.0-SNAPSHOT/jdbc-sink-kafka-3.0.0-SNAPSHOT.jar
我们将在单独的终端会话中运行这些文件。我们需要将这些应用程序配置为使用相同的 Kafka 主题,我们将其称为 jdbc-demo-topic
。Spring Cloud Stream Kafka 绑定器将自动创建此主题。我们还需要将 JDBC 接收器配置为连接到我们的数据库并将数据映射到我们创建的表。我们将发布如下所示的 JSON
{
“name”:”My Name”,
“address”: {
“street”:”My Street”,
“city”: “My City”
}
}
我们希望将这些值插入 Demo
数据库中的 People
表中,插入到 name
、street
和 city
列中。
打开一个新的终端会话,我们在其中下载了 jar 文件并运行
java -jar jdbc-sink-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://127.0.0.1:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.consumer.table-name=People --jdbc.consumer.columns=name,city:address.city,street:address.street --spring.cloud.stream.bindings.input.destination=jdbc-demo-topic
请注意用于将字段映射到列的 jdbc.consumer.columns
语法。
打开一个新的终端会话,我们在其中下载了 jar 文件并运行
java -jar http-source-kafka-3.0.0-SNAPSHOT.jar --server.port=9000 --spring.cloud.stream.bindings.output.destination=jdbc-demo-topic
在这里,我们将 HTTP 端口设置为 9000 用于源(默认值为 8080)。此外,源的输出目标与接收器的输入目标匹配非常重要。
接下来,我们需要将一些数据发布到 https://127.0.0.1:9000。
curl https://127.0.0.1:9000 -H'Content-Type:application/json' -d '{"name":"My Name","address":{"street":"My Street","city":"My City"}}}
再次找到一个打开的终端会话,并
docker exec -it dataflow-mysql mysql -uroot -p
使用 rootpw
登录并查询表
如果您看到此信息,恭喜!独立的 Spring Cloud Stream 应用程序按预期工作。我们现在可以终止我们的独立应用程序(Ctrl-C)。保持 docker-compose 进程运行,以便我们可以查看 Data Flow。
正如我们所看到的,在“裸机”上运行这些应用程序需要许多手动步骤,即使我们不必编写任何代码。这些包括
自定义 docker-compose 配置,或者在本地机器上安装 kafka 和 mysql
使用 Maven URL 下载所需的流应用程序版本(我们碰巧知道要使用哪些版本)
确保 Spring Cloud Stream 目标绑定配置正确,以便应用程序可以通信
查找并阅读 文档 以获取配置属性(我们已经这样做了以准备此示例)并正确设置它们。
管理多个终端会话
在以下部分中,我们将看到使用 Spring Cloud Data Flow 可以消除所有这些步骤,并提供更丰富的整体开发体验。
首先,在 https://127.0.0.1:9393/dashboard 打开 Data Flow 仪表板。这将带您到应用程序视图,我们可以在其中看到已注册的预打包应用程序。我们之前运行的 docker-compose 命令执行了此步骤,使用我们提供的 URL 获取流应用程序的最新快照版本,包括我们刚刚运行的相同 jar 文件。
在仪表板中,从左侧菜单中选择Streams
,然后单击Create Streams
以打开图形流编辑器。
将 http 源和 jdbc 接收器拖放到编辑器窗格中,并使用鼠标连接这两个句柄。或者,您可以将 Data Flow 流定义 DSL 直接键入顶部的文本框中:http | jdbc
。
接下来我们需要配置应用程序。如果单击任一应用程序,您将看到一个Options
链接。打开 JDBC 接收器的选项窗口。您将看到所有可用的配置属性以及简短的描述。以下屏幕截图显示了部分视图;我们需要滚动才能看到其余部分。
和以前一样,我们需要提供 url、用户名、密码、表和列。在这里,我们需要将 JDBC URL 更改为jdbc:mariadb://mysql:3306/Demo
,因为主机名mysql
对应于docker-compose.yml
中定义的 mysql 服务的名称。此外,我们将 http 端口设置为20000
,因为它在已配置的发布端口范围内。有关更多详细信息,请参阅 skipper-server 配置。
让我们看一下自动生成的流定义 DSL
http --port=20000 | jdbc --password=rootpw --username=root --url=jdbc:mariadb://mysql:3306/Demo --columns=name,city:address.city,street:address.street --table-name=People
此 DSL 可用于脚本或 Data Flow 客户端应用程序来自动化流创建。我们的配置已完成,但是 Spring Cloud Stream 目标绑定在哪里?我们不需要它们,因为 Data Flow 会为我们处理连接。
选择Create Stream
按钮并将流命名为http-jdbc
。
要部署流,请单击播放按钮
接受默认部署属性,然后单击页面底部的Deploy stream
。
根据需要单击Refresh
按钮。大约一分钟后,您应该会看到我们的流已部署。
在这里,我们将一些不同的值发布到端口 20000
curl https://127.0.0.1:20000 -H'Content-Type:application/json' -d '{"name":"Your Name","address":{"street":"Your Street","city":"Your City"}}}'
当我们再次运行查询时,应该会看到表中添加了一行新数据。
干得好!
敏锐的读者会注意到,即使平台本身在容器中运行,也没有为已部署的应用程序创建 Docker 容器。在 Data Flow 架构 中,Skipper 服务器负责部署流应用程序。在本地配置中,Skipper 使用 Local Deployer 在其localhost
上运行 jar 文件,就像我们独立运行应用程序时一样。要查看情况是否如此,我们可以在 skipper 容器中运行ps
docker exec -it skipper ps -ef
要查看控制台日志,请使用stdout
路径
docker exec -it skipper more /tmp/1596916545104/http-jdbc.jdbc-v4/stdout_0.log
tail -f
命令也可以使用。
如果部署成功,也可以从 UI 查看应用程序日志。
但是,如果部署失败,我们可能需要深入了解以对其进行故障排除。
注意
本地 Data Flow 安装适用于本地开发和探索,但我们不建议将其用于生产环境。生产级 Spring Cloud Data Flow OSS 以及商业许可产品可用于 Kubernetes 和 Cloud Foundry。
我们刚刚仔细研究了如何从预打包的 Spring Cloud Stream 应用程序构建一个简单的消息流管道,以将通过 HTTP 发布的 JSON 内容保存到关系数据库中。我们使用 Docker 和 docker-compose 安装了本地环境,然后我们部署了源和接收器应用程序,首先是在“裸机”上,然后使用 Spring Cloud Data Flow。希望我们学习了一些关于使用 Spring Cloud Stream、Data Flow、Docker 容器、HTTP 源和 JDBC 接收器的有趣知识。
在接下来的几周里,我们将提供更多 Spring Cloud Stream 和 Spring Cloud Data Flow 的案例研究,每个案例研究都将探索不同的流应用程序和功能。