抢先一步
VMware 提供培训和认证,助您快速提升技能。
了解更多今天,我们很高兴地宣布 Spring XD 的 1.0 M1 版本发布 (下载)。Spring XD 是一个统一的、分布式的、可扩展的系统,用于数据摄取、实时分析、批处理和数据导出。该项目的目的是简化大数据应用程序的开发。
从一万英尺的高度来看,大数据应用程序与企业集成和批处理应用程序具有许多共同特征。Spring 通过 Spring Integration 和 Spring Batch 项目已经为构建集成和批处理应用程序提供了超过 6 年的成熟解决方案。Spring XD 在此基础上构建,并提供了一个轻量级的运行时环境,可以通过简单的 DSL轻松配置和组装。
在本博客中,我们将介绍 Spring XD 的关键组件,即流、作业、分支、分析和用于声明它们的 DSL,以及运行时架构。更多详细信息可以在 XD 指南中找到。http | file
您可以通过向默认在端口 8080 上运行的 XD 管理服务器发出 HTTP 请求来告诉 Spring XD 创建一个流。在 M2 版本中,我们将提供一个交互式 shell 来与 XD 通信,但是对于 M1,与 XD 交互最简单的方法是使用“curl”。
curl -d "http | file" https://127.0.0.1:8080/streams/httptest
流的名称为httptest
,默认侦听的 HTTP 端口为9000
,默认文件位置为/tmp/xd/output/${streamname}
。
curl -d "hello world" https://127.0.0.1:9000
您将在文件/tmp/xd/output/httptest
中看到字符串 hello world。
要更改默认值,您可以传入选项参数
http --port=9090 | file --dir=/var/streams --name=data.txt
M1 中支持的源有文件、时间、HTTP、Tail、Twitter 搜索、Gemfire(连续查询)、Gemfire(缓存事件)、Syslog 和 TCP。支持的汇有日志、文件、HDFS、Gemfire 分布式数据网格和 TCP。要将 syslog 数据捕获到 HDFS,DSL 非常简单:
syslog | hdfs --namenode="http://192.168.1.100:9000"
您还可以添加您自己的自定义源和汇。可以按照简单的说明添加 Spring Integration 中现有的入站和出站通道适配器。未来的版本将添加对 MQTT、RabbitMQ、JMS 和 Kafka 的支持。我们非常欢迎您提交 pull request来贡献您喜欢的源和汇模块。
流的编程模型基于 Spring Integration。输入源将外部数据转换为消息,该消息由包含键值对的标头和可以是任何 Java 类型的有效负载组成。消息通过消息通道在流中流动。对于具有输入源、处理步骤和输出汇的流,如下图所示。
在 DSL 中,管道符号对应于将数据从每个处理步骤传递到下一个的通道。Spring XD 中的通道可以是内存中的,也可以由 Redis、JMS、RabbitMQ 等中间件支持。这允许一个简单的分布式处理模型,这将在稍后讨论。
表示包含处理步骤的流的 DSL 表达式形式为
source | filter | transform | sink
M1 中支持的处理器有过滤器、转换器、json-field-extractor、json-field-value-filter 和脚本。过滤器和转换器处理器支持使用 Spring 表达式语言 (SpEL) 以及 Groovy。要在前面的示例中使用 SpEL 将 HTTP 请求的有效负载转换为大写,
http | transform --expression=payload.toUpperCase() | file
脚本处理器还允许您执行自定义 Groovy 代码。
分支允许您“监听”另一个流中的数据并在单独的流中处理数据。原始流不受分支的影响,也不了解它的存在,类似于电话窃听。分支是 EAI 模式标准目录的一部分,并且是 Spring XD 使用的 Spring Integration 框架的一部分。
分支可以从目标流处理管道的任何点消费数据。例如,如果您有一个名为 mystream 的流,定义如下:source | filter | transform | sink
您可以使用 DSL 创建一个分支:
tap mystream.filter | sink2
这将拦截过滤器应用后但转换器之前的流数据。因此,未转换的数据将被发送到 sink2。
例如,如果您使用以下命令创建一个名为httpstream
的流:
curl -d "http --port=9898 | filter --expression='payload.length() > 5'
| transform --expression=payload.toUpperCase()
| file" https://127.0.0.1:8080/streams/httpstream
然后,要创建一个名为httptap
的分支,该分支将过滤后的数据流写入单独的文件,请使用以下命令:
curl -d "tap httpstream.filter | file --dir=/tmp --name=filtered.txt" https://127.0.0.1:8080/streams/httptap
发布如下数据:
curl -d "hello world" https://127.0.0.1:9898
curl -d "he" https://127.0.0.1:9898
curl -d "hello world 2" https://127.0.0.1:9898
将导致 HELLO WORLD 和 HELLO WORLD 2 出现在文件/tmp/xd/output/httpstream
中,以及/tmp/filtered.txt
中的小写等效项。“he”文本将不会出现在任何文件中。
一个主要用例是在通过其主流摄取数据的同时执行实时分析。例如,考虑一个消耗 Twitter 搜索结果并将它们写入 HDFS 的数据流。可以在将数据写入 HDFS 之前创建一个分支,并将数据从分支传输到计数器,该计数器对应于在推文中提及特定主题标签的次数。
问问 10 个开发者什么是“实时分析”,你会得到 20 个答案。答案范围从非常简单的(但非常有用的)计数器,到移动平均值,到聚合计数器,到直方图,到时间序列,到机器学习算法,再到嵌入式 CEP 引擎。Spring XD 旨在支持这些指标和分析数据结构的广泛范围,作为一个通用的类库,它可以与几种后端存储技术一起使用。它们也作为一种汇公开给 XD,用于 DSL 表达式。
在 M1 版本中,支持计数器、字段值计数器、量规和丰富量规。这些指标可以存储在内存中或 Redis 中。有关更多详细信息以及未来版本中将实现的内容列表,请参阅JavaDoc和用户指南的分析部分。
例如,考虑收集推文流中主题标签频率的实时计数的情况。要使用 SpringXD 执行此操作,请创建一个使用 twitter 搜索源模块的新流定义,并将其命名为“spring”
curl -d "twittersearch --query='spring' --consumerKey=<consumer-key> --consumerSecret=<consumer-secret>
| file" https://127.0.0.1:8080/streams/spring
这将推文存储在本地文件系统中。请注意,要获取consumerKey和consumerSecret您需要注册一个 Twitter 应用程序。如果您还没有设置一个,您可以在Twitter 开发者网站上创建一个应用程序以获取这些凭据。
接下来,在 twittersearch 源的输出上创建一个名为“springtap”的分支,以计算推文中主题标签的频率。
curl -d "tap spring.twittersearch | field-value-counter
--fieldName=entities.hashTags.text
--counterName=hashTagFrequency" https://127.0.0.1:8080/streams/springtap
字段entities.hashTags.text
是底层实现中使用的Spring Social Tweet对象的 JSON 表示中主题标签的路径。要查看前 5 个主题标签,请使用 redis-cli 查看名为fieldvaluecounters.hashTagFrequency
的有序集合的内容。请注意,通常需要几分钟才能收集到足够包含主题标签实体的推文。
> redis-cli
redis 127.0.0.1:6379>ZREVRANGEBYSCORE fieldvaluecounters.hashTagFrequency +inf -inf WITHSCORES LIMIT 0 5
1] "spring"
2] "6"
3] "Turkey"
4] "6"
5] "Arab"
6] "6"
7] "summer"
8] "3"
9] "fashion"
10] "3"
Spring XD 有两种操作模式:单节点模式和分布式模式。第一种模式是一个处理所有处理和管理的单一进程。此模式可帮助您轻松入门,并简化应用程序的开发和测试。分布式模式允许将处理任务分布在机器集群中,并且管理服务器发送命令来控制在集群上执行的处理任务。
M1 版本中的分布式架构很简单。流的每个部分(称为模块)都可以在其自己的容器实例中执行。数据使用 Redis 队列在模块之间传递。有关更多详细信息,请参阅架构部分。此版本的重点是正确地抽象出概念,例如使 DSL 中的管道符号可以在各种传输中使用。其他传输和性能改进将在未来的版本中推出,以及在 Hadoop 集群内执行。本文未涵盖的其他主题包括Tuple 数据结构的介绍以及如何创建自定义处理器。下一个版本的重点将是支持 XDContainer 运行 Spring Batch 作业。这些作业可用于帮助将数据从 HDFS 导出到关系数据库,以及在集群上协调执行 Hadoop 作业(MapReduce、Pig、Hive 或 Cascading 作业)。我们还将提供用于度量的附加库,例如聚合计数器、基于 HTTP/JMX 的管理,以及一些基于Reactor项目的高性能数据源,敬请期待!
在我们努力完成最终的 Spring XD 1.0.0 版本的过程中,我们非常乐意听到您的反馈。如有任何问题,请使用Stackoverflow(标签:springxd),如有任何错误或改进建议,请使用Jira 问题跟踪器或提交GitHub issue。