1 个流、2 个应用程序和 3 个 Spring Cloud Data Flow 的依赖项

工程 | Josh Long | 2016 年 4 月 5 日 | ...

我只是想在这里记录一下昨天让我感到高兴的一段经历:在短短几分钟内,让快速改进的 Spring Cloud Data Flow 从(Spring Boot)启动器转变为服务!

唯一的先决条件是运行一个 Redis 实例。我的 Redis 实例运行在 127.0.0.1 上,Spring Boot 不需要进一步的配置就可以找到并与其协同工作。

我们将使用史诗级的 Spring Initializr 来快速生成我们的应用程序。还记得那些愚蠢的苹果广告,“那里有一个应用程序可以解决它?” 忘了那回事吧,那里有一个复选框可以解决它! 看看你是否喜欢这种体验,就像我一样!

本地数据流服务器

转到 Spring Initializr,选择 Local Data Flow Server,并将工件命名为 df-server。这将用于搭建本地数据流服务 - 一个 REST API 和一些持久化逻辑 - 以协调和存储有关流和任务的信息。在旧的 Spring XD 世界里,这被称为 Spring XDAdmin Server

在您选择的 IDE 中打开项目,并将 @EnableDataFlowServer 添加到 DfServerApplication 类中。

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;

@EnableDataFlowServer
@SpringBootApplication
public class DfServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(DfServerApplication.class, args);
	}
}

df-server 项目的根目录下运行 mvn spring-boot:run,应用程序将在端口 9393 上启动。

提示:当您看到欢迎使用的 ASCII 艺术作品时,就(很可能)成功了!

关于提示的提示:好吧,这可能不完全正确。它可能因为各种原因失败(例如服务端口或嵌入式 H2 数据库端口冲突),但根据(我的)研究...(我的研究对象是我自己)... 的研究表明,高质量的 ASCII 艺术作品具有治疗作用。

数据流 Shell

转到 Spring Initializr,选择 Data Flow Shell,并将工件命名为 df-shell。这将用于搭建一个由 Spring Shell 驱动的数据流 Shell。

数据流 Shell 可以在任何操作系统上运行。它是我们刚刚搭建的数据流服务的客户端。它允许我们使用熟悉的管道和过滤器 DSL 和命令来操作该服务。我喜欢好的横幅 ASCII 艺术,就像下一个开发者一样,但也有(天哪!)过犹不及。默认情况下,Spring Shell 和 Spring Boot 都会尝试发出 ASCII 横幅,所以我们将让 Spring Boot 这次不发出横幅!在您选择的 IDE 中打开项目,将 @EnableDataFlowShell 添加到 DfShellApplication 类中,然后配置 SpringApplication 的创建方式以隐藏 Spring Boot 横幅。

package com.example;

import org.springframework.boot.Banner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.dataflow.shell.EnableDataFlowShell;

@EnableDataFlowShell
@SpringBootApplication
public class DfShellApplication {

	public static void main(String[] args) {
		new SpringApplicationBuilder(DfShellApplication.class)
				.bannerMode(Banner.Mode.OFF)
				.run(args);
	}
}

df-shell 项目的根目录下运行 mvn spring-boot:run。默认情况下,您应该能够与本地运行的数据流服务器进行交互。尝试发出 module list 命令。您应该会看到一个表格,列出 Spring Cloud Data Flow 已知的 auto-built 组件。

日志接收器模块

转到 Spring Initializr,选择 Stream Redis,并将工件命名为 logging-sink。我们将使用 Spring Cloud Stream,它构建在 Spring 的 MessageChannel 抽象和 Spring Integration 的组件模型之上,以简洁地描述和集成基于消息的微服务,来构建一个记录传入消息的自定义模块。然后,我们将使用 Spring Cloud Data Flow 来部署和协调这个模块。

Spring Cloud Data Flow 是一种强大的方式,可以用小型 Spring Boot 驱动的模块来描述复杂的集成、批处理和流处理工作负载。有几种类型的 modulesource 产生数据,通常是按固定时间间隔产生的,下游组件可以消费和处理这些数据。processor 接收数据,对其进行处理,然后写出数据。sink 仅接收数据,但不产生任何要发送出去的内容。这些组件可以很好地组合在一起,以描述任何可能不间断的工作负载(物联网传感器数据、24/7 事件处理、在线事务数据摄取和集成场景等)。最终,source 通常是 Spring Integration 的入站适配器。Processor 通常是任何接受数据并输出数据的 Spring Integration 组件(例如转换器)。Sink 通常是 Spring Integration 的出站适配器。

task 描述了任何最终会停止的工作负载。它可能是一个简单的 Spring Boot Command Line Runner 或一个 Spring Batch Job

Spring Cloud Data Flow 本身并不特别了解 Spring Integration。它只了解 Spring Cloud Stream 和众所周知的 Spring MessageChannels,如 inputoutput。它不关心这些通道的终结点是什么。Spring Cloud Data Flow 也不特别了解 Spring Batch。它只了解 Spring Cloud Task。

就像 UNIX sh shell 环境通过将数据传递给 stdinstdout,从而允许我们通过简单聚焦的命令行工具组合任意数量和任意复杂度的解决方案一样,Spring Cloud Data Flow 也允许我们通过简单聚焦的消息组件来组合任意数量和任意复杂度的解决方案。

Spring Cloud Data Flow 已经包含了很多“开箱即用”的功能。我们将开发并安装一个简单的模块来记录信息——在这种情况下,是时间。值得注意的是,我们这样做是为了我们自己的教育目的,但我们不必这样做;Spring Cloud Data Flow 已经提供了一个 log 模块!(还有一个 time 模块!)

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

import java.util.Map;

@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {

	@MessageEndpoint
	public static class LoggingMessageEndpoint {

		@ServiceActivator(inputChannel = Sink.INPUT)
		public void logIncomingMessages(
				@Payload String msg,
				@Headers Map<String, Object> headers) {

			System.out.println(msg);
			headers.entrySet().forEach(e ->
					System.out.println(e.getKey() + '=' + e.getValue()));

		}
	}

	public static void main(String[] args) {
		SpringApplication.run(LoggingSinkApplication.class, args);
	}
}

这是一个简单的 Spring Cloud Stream 绑定。Sink.class 是一个接口,它定义了一个 MessageChannel input()。Spring Cloud Stream 将其转换为一个活动的、命名的消息代理(在本例中是 Redis,尽管 Spring Cloud Data Flow 的默认配置在未来几个月内可能会更改为 RabbitMQ),我们的任何消息代码都可以使用它。该示例使用 Spring Integration 在接收消息时将传入消息数据打印出来。让我们首先向 Data Flow 注册我们的自定义模块,然后组合一个流,该流从 time 组件接收包含时间的传入消息,然后记录结果。

首先,在 logging-sink 项目的根目录下执行 mvn clean install,以便在本地 Maven 存储库中解析它。Spring Cloud Data Flow 使用一个可插拔的策略来解析自定义模块的实例。在我们的示例中,它将尝试在我们的系统本地 Maven 存储库中解析它们。

回到 Data Flow Shell,输入以下内容:

dataflow:>module register --name custom-log --type sink --uri maven://com.example:logging-sink:jar:0.0.1-SNAPSHOT
Successfully registered module 'sink:custom-log'

dataflow:>module list
╔══════════════╤════════════════╤═══════════════════╤═════════╗
║    source    │   processor    │       sink        │  task   ║
╠══════════════╪════════════════╪═══════════════════╪═════════╣
║file          │bridge          │aggregate-counter  │timestamp║
║ftp           │filter          │cassandra          │         ║
║http          │groovy-filter   │counter            │         ║
║jdbc          │groovy-transform│custom-log         │         ║
║jms           │httpclient      │field-value-counter│         ║
║load-generator│pmml            │file               │         ║
║rabbit        │splitter        │ftp                │         ║
║sftp          │transform       │gemfire            │         ║
║tcp           │                │gpfdist            │         ║
║time          │                │hdfs               │         ║
║trigger       │                │jdbc               │         ║
║twitterstream │                │log                │         ║
║              │                │rabbit             │         ║
║              │                │redis              │         ║
║              │                │router             │         ║
║              │                │tcp                │         ║
║              │                │throughput         │         ║
║              │                │websocket          │         ║
╚══════════════╧════════════════╧═══════════════════╧═════════╝

dataflow:>stream create --name time-to-log --definition 'time | custom-log'
Created new stream 'time-to-log'

dataflow:>stream list
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│  Status  ║
╠═══════════╪═════════════════╪══════════╣
║time-to-log│time | custom-log│undeployed║
╚═══════════╧═════════════════╧══════════╝

dataflow:>stream deploy --name time-to-log
Deployed stream 'time-to-log'

您将在 Data Flow 服务日志中看到模块已被启动并缝合在一起。在我的特定日志中,我观察到

2016-04-05 09:09:18.067  INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.custom-log instance 0
   Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log
2016-04-05 09:09:30.838  INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time instance 0
   Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.time

尾随日志以确认您内心深处已经知道的事情:我们的自定义 logging-sink 正在工作!

tail -f /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log/std*

下一步

走向云端!我们正在使用本地数据流服务器。对于像 Cloud Foundry 这样的处理环境,还有其他实现可供选择。Cloud Foundry 数据流服务器会启动应用程序实例,而不是本地 Java 进程。现在,构建一个可扩展的数据摄取和处理流就像 cf push ..cf scale -i $MOAR 一样简单!

我们只使用了 Spring Cloud Data Flow 的一些功能!使用 Spring Cloud Data Flow 来协调任何数量的基于消息的微服务,这些微服务由 Spring Cloud Stream 提供支持。我建议您查看一些 内置的 Spring Cloud Stream 模块 以获取灵感。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有