Spring Cloud Data Flow 的 1 个流、2 个应用程序和 3 个依赖项

工程 | Josh Long | 2016 年 4 月 5 日 | ...

我今天想在这里记录下昨天让我感到欣慰的一次体验:让快速改进的 Spring Cloud Data Flow 在几分钟内从(Spring Boot)启动(初学者)到服务运行起来!

唯一的先决条件是您有一个正在运行的 Redis 实例。我的 Redis 实例在 127.0.0.1 上运行,并且不需要任何进一步的配置即可让 Spring Boot 找到并使用它。

我们将使用极好的 Spring Initializr 来快速生成我们的应用程序。还记得那些愚蠢的苹果广告,“为此有一个应用程序?” 不管它了,为此有一个复选框让我们看看您是否像我一样喜欢这种体验!

本地数据流服务器

访问 Spring Initializr 并选择 Local Data Flow Server,并将工件命名为 df-server。这将用于启动一个本地数据流服务 - 一个 REST API 和一些持久性逻辑 - 来协调和存储有关流和任务的信息。在旧的 Spring XD 世界中,这称为 Spring XD Admin Server

在您选择的 IDE 中打开项目,并将 @EnableDataFlowServer 添加到 DfServerApplication 类中。

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;

@EnableDataFlowServer
@SpringBootApplication
public class DfServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(DfServerApplication.class, args);
	}
}

df-server 项目的根目录中运行 mvn spring-boot:run,应用程序将在端口 9393 上启动。

提示:当您看到欢迎的 ASCII 艺术作品时,您就知道(可能)已经成功了!

关于提示的提示:所以,这可能不完全正确。它可能会因为各种原因而失败(例如服务或嵌入式 H2 数据库的端口冲突),但是高质量的 ASCII 艺术作品在我的研究中被证明是有疗效的……(我的……我)。

数据流 Shell

访问 Spring Initializr 并选择 Data Flow Shell,并将工件命名为 df-shell。这将用于启动一个由 Spring Shell 提供支持的数据流 Shell。

数据流 Shell 在任何操作系统上运行。它是我们刚刚启动的数据流服务的客户端。它允许我们使用熟悉的管道和过滤器 DSL 和命令来操作服务。我和其他开发人员一样喜欢好的横幅 ASCII 艺术作品,但确实存在(吞咽!)好事过头的情况。默认情况下,Spring ShellSpring Boot 都会尝试发出 ASCII 横幅,因此我们将让 Spring Boot 这次(这次!)保持沉默。在您选择的 IDE 中打开项目,并将 @EnableDataFlowShell 添加到 DfShellApplication 类中,然后配置 SpringApplication 的创建方式以隐藏 Spring Boot 横幅。

package com.example;

import org.springframework.boot.Banner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.dataflow.shell.EnableDataFlowShell;

@EnableDataFlowShell
@SpringBootApplication
public class DfShellApplication {

	public static void main(String[] args) {
		new SpringApplicationBuilder(DfShellApplication.class)
				.bannerMode(Banner.Mode.OFF)
				.run(args);
	}
}

df-shell 项目的根目录中运行 mvn spring-boot:run。默认情况下,您应该能够与本地运行的数据流服务器交互。尝试发出 module list 命令。您应该会看到 Spring Cloud Data Flow 已知的内置组件的表格。

日志接收器模块

访问 Spring Initializr 并选择 Stream Redis,并将工件命名为 logging-sink。我们将使用 Spring Cloud Stream,它构建在 Spring 的 MessageChannel 抽象和 Spring Integration 中的组件模型之上,以简化描述和集成基于消息的微服务的工作,来构建一个自定义模块来记录传入的消息。然后,我们将使用 Spring Cloud Data Flow 部署和编排此模块。

Spring Cloud Data Flow 是一种强大的方法,可以根据小型 Spring Boot 支持的模块来描述复杂的集成、批处理和流处理工作负载。有几种类型的 modulesource 生成数据,通常以固定的时间表,下游组件可以消费和处理这些数据。processor 接收数据,对其进行处理,然后输出数据。sink 只接收数据,但不产生任何要发送出去的数据。这些组件很好地组合在一起,可以描述任何类型的潜在的持续工作负载(物联网传感器数据、全天候事件处理、在线交易数据摄取和集成场景等)。最终,source 通常是 Spring Integration 入站适配器。processor 通常是任何类型的 Spring Integration 组件(如转换器),它接收数据并输出数据。sink 通常是 Spring Integration 出站适配器。

task 描述任何最终会停止的工作负载。它可能是一个简单的 Spring Boot Command Line Runner 或一个 Spring Batch Job

但是,Spring Cloud Data Flow 对 Spring Integration 没有特别的了解。它只了解 Spring Cloud Stream 和众所周知的 Spring MessageChannels(如 inputoutput)。它不关心这些通道的终点是什么。Spring Cloud Data Flow 也没有特别的了解 Spring Batch。它只了解 Spring Cloud Task。

就像 UNIX sh shell 环境允许我们通过分别将数据传递到和来自 stdinstdout,从单个专注的命令行实用程序中组合任意数量和任意复杂的解决方案一样,Spring Cloud Data Flow 也允许我们从单个专注的消息传递组件中组合任意数量和任意复杂的解决方案。

Spring Cloud Data Flow 已经包含了许多内置功能。我们将开发并安装一个简单的模块来记录内容 - 在我们的例子中,是时间。值得注意的是,我们这样做是为了自己的学习,但我们不需要这样做;Spring Cloud Data Flow 已经提供了一个 log 模块!(以及一个 time 模块!)

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

import java.util.Map;

@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {

	@MessageEndpoint
	public static class LoggingMessageEndpoint {

		@ServiceActivator(inputChannel = Sink.INPUT)
		public void logIncomingMessages(
				@Payload String msg,
				@Headers Map<String, Object> headers) {

			System.out.println(msg);
			headers.entrySet().forEach(e ->
					System.out.println(e.getKey() + '=' + e.getValue()));

		}
	}

	public static void main(String[] args) {
		SpringApplication.run(LoggingSinkApplication.class, args);
	}
}

这是一个简单的 Spring Cloud Stream 绑定。Sink.class 是一个定义 MessageChannel input() 的接口。Spring Cloud Stream 将将其转换为一个实时的、命名的管道到消息代理(在本例中为 Redis,尽管 Spring Cloud Data Flow 的默认值可能会在未来几个月内更改为 RabbitMQ),我们的任何消息传递代码都可以使用它。此示例使用 Spring Integration 在传入消息数据到达时打印出来。让我们首先在 Data Flow 中注册我们的自定义模块,然后组合一个流,该流从 time 组件接收包含时间的信息,然后记录结果。

首先,mvn clean install logging-sink 项目,以便它在本地 Maven 存储库中可解析。Spring Cloud Data Flow 使用可插拔策略来解析自定义模块的实例。在我们的示例中,它将尝试在我们的系统本地 Maven 存储库中解析它们。

返回到数据流 Shell 并输入以下内容

dataflow:>module register --name custom-log --type sink --uri maven://com.example:logging-sink:jar:0.0.1-SNAPSHOT
Successfully registered module 'sink:custom-log'

dataflow:>module list
╔══════════════╤════════════════╤═══════════════════╤═════════╗
║    source    │   processor    │       sink        │  task   ║
╠══════════════╪════════════════╪═══════════════════╪═════════╣
║file          │bridge          │aggregate-counter  │timestamp║
║ftp           │filter          │cassandra          │         ║
║http          │groovy-filter   │counter            │         ║
║jdbc          │groovy-transform│custom-log         │         ║
║jms           │httpclient      │field-value-counter│         ║
║load-generator│pmml            │file               │         ║
║rabbit        │splitter        │ftp                │         ║
║sftp          │transform       │gemfire            │         ║
║tcp           │                │gpfdist            │         ║
║time          │                │hdfs               │         ║
║trigger       │                │jdbc               │         ║
║twitterstream │                │log                │         ║
║              │                │rabbit             │         ║
║              │                │redis              │         ║
║              │                │router             │         ║
║              │                │tcp                │         ║
║              │                │throughput         │         ║
║              │                │websocket          │         ║
╚══════════════╧════════════════╧═══════════════════╧═════════╝

dataflow:>stream create --name time-to-log --definition 'time | custom-log'
Created new stream 'time-to-log'

dataflow:>stream list
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│  Status  ║
╠═══════════╪═════════════════╪══════════╣
║time-to-log│time | custom-log│undeployed║
╚═══════════╧═════════════════╧══════════╝

dataflow:>stream deploy --name time-to-log
Deployed stream 'time-to-log'

您将在数据流服务日志中看到模块已启动并连接在一起。在我的特定日志中,我观察到

2016-04-05 09:09:18.067  INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.custom-log instance 0
   Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log
2016-04-05 09:09:30.838  INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time instance 0
   Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.time

跟踪日志以确认您心中已知的内容:我们的自定义 logging-sink 正在工作!

tail -f /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log/std*

后续步骤

进入云端!我们正在使用本地数据流服务器。还有其他可用于处理结构(如 Cloud Foundry)的实现。Cloud Foundry 数据流服务器会启动应用程序实例,而不是本地 Java 进程。现在,构建可扩展的数据摄取和处理流就像 cf push ..cf scale -i $MOAR 一样简单!

我们只使用了 Spring Cloud Data Flow 的一小部分功能!使用 Spring Cloud Data Flow 编排任意数量的基于消息的微服务,这些微服务由 Spring Cloud Stream 提供支持。我建议您查看一些内置的 Spring Cloud Stream 模块以获取灵感。

获取 Spring Newsletter

关注 Spring Newsletter

订阅

领先一步

VMware 提供培训和认证,助您快速提升技能。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部