领先一步
VMware 提供培训和认证,助您加速进步。
了解更多今天,我们很高兴地宣布 Spring XD (下载) 的 1.0 M2 版本。Spring XD 是一个统一的、分布式的、可扩展的系统,用于数据摄取、实时分析、批处理和数据导出。该项目的目标是简化大数据应用程序的开发。
Spring XD 的第二个里程碑版本引入了多项新特性,使您能够更轻松地摄取和处理实时数据流以及编排基于 Hadoop 的批处理作业。在这篇博文中,我们将涵盖:
以单节点模式($XD_HOME/bin/xd-singlenode)启动 Spring XD,然后在另一个窗口中启动 shell。下面的示例演示了如何创建一个简单的流,该流将通过 HTTP 发布的数据写入文件。请注意,shell 提供了命令的 Tab 自动补全提示。
$bin>./xd-shell
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create --name httpStream --definition "http | file"
xd:>tap create --name httpTap --definition "tap httpStream | counter"
xd:>http post --target https://:9000 --data "helloworld"
您可以列出所有流和 Tap 来验证它们是否已创建。
xd:>stream list
Stream Name Stream Definition
----------- -----------------
httpStream http | file
xd:>tap list
Tap Name Stream Name Tap Definition
-------- ----------- ------------------------
httpTap httpStream tap httpStream | counter
如果检查位于目录 /tmp/xd/output/httpStream.out 中的文件,您将看到“hello world”消息。
xd:>! cat /tmp/xd/output/httpStream.out
The httpTap is simply counting messages. To see the name of the counter created and its value, use the counter shell command
xd:>counter list
Counter name
------------
httpTap
xd:>counter display --name httpTap
1
在单节点模式下,计数器是内存中的,但它也支持 Redis,当不使用单节点模式时,Redis 是默认选项。您可以使用 --analytics redis 命令行参数启用 Redis 支持。
要创建一个将数据存储在 Hadoop 中并实时统计推文中标签频率的 Twitter 流,请运行以下命令。请注意,要获取consumerKey还是consumerSecret您需要注册一个 Twitter 应用程序。如果您还没有设置好,可以在 Twitter Developers 站点上创建一个应用程序来获取这些凭据。
xd:> stream create bieberStream --definition "twittersearch --consumerKey=<your-key> --consumerSecret=<your-secret> --query=bieber | hdfs"
xd:> tap create --name bieberHashTap --definition "tap bieberStream | field-value-counter --fieldName=entities.hashTags.text --counterName=bieberHashCount"
xd:> hadoop config fs --namenode hdfs://:8020
xd:> hadoop fs cat /xd/bieberStream/bieberStream-0.log
... see fun tweets here ...
xd:> fieldvaluecounter display --name bieberHashCount
FieldName=bieberHashCount
------------------------- - -----
VALUE - COUNT
mtvhottest | 57
MTVHottest | 31
MTVhottest | 10
mtvhottets | 3
MtvHottest | 2
MTVHott | 2
JustinBieber | 2
MTVH | 2
MTVHOTTEST | 2
KCAMEXICO | 1
BeliebersAreProudOfJustin | 1
MyBeliebers | 1
在谈到计数器时,引入了一种新的 聚合计数器 类型,该类型将消息中某个字段的计数聚合到每年、每月、每天、每小时和每分钟的时间桶中。
仅用几行 shell 命令,您就完成了这么多工作!有关所有 shell 命令的详细信息,请参阅 用户指南。
到目前为止显示的流处理管道是线性的,但通常需要支持更复杂的流。为了开始解决这种情况,M2 中引入了命名通道。您可以选择命名通道而不是源或宿模块。为了保持 Unix 的主题,向特定通道的输入/输出数据使用 `>` 字符,通道名称前缀为 `:`。
这是一个示例,展示了如何使用命名通道来共享由不同输入源驱动的数据管道。
xd:>stream create out --definition ":foo > file --name=demo"
xd:>stream create in1 --definition "http > :foo"
xd:>stream create in2 --definition "time > :foo"
xd:>http post --target https://:9000 --data "hello"
查看输出文件
xd:>! cat /tmp/xd/output/demo.out
您将看到单词“hello”与时间戳值交错出现。将消息广播到多个流以及支持根据消息内容将消息路由到不同流的功能计划在未来的里程碑版本中实现。
值得注意的是,我们还增加了对 4 个 Hadoop 版本的支持。
您可以在启动 XDContainer 时通过传递命令行选项 --hadoopDistribution 来选择要使用的特定发行版 jar。您也可以使用其他 Hadoop 发行版,例如 Hadoop 1.2.x。我们将在以后的版本中为其他发行版添加明确的选项。值得注意的是,在 samples repository 中有一个示例,演示了如何使用 Spring XD 和 Pivotal HD 的 HAWQ 功能。
M1 版本提供了本地和 Redis 队列后备传输,用于模块之间的通信,由 DSL 中的管道符号表示。M2 版本支持基于 Rabbit 的传输,让您可以利用功能齐全的消息代理进行流摄入。
可以使用 Spring XD 执行批处理作业,并设置触发器来启动这些作业。例如,我们可以重用 Hadoop 中经典的 wordcount 示例来提供一个简单的包含两个步骤的工作流编排。第一步是将文件复制到 HDFS,第二步是运行 wordcount MapReduce 作业。
要运行该示例,请克隆 spring-xd-samples repository 并构建 sample batch-wordcount。然后,像下面一样复制 jar、配置文件和数据文件。
$ cd batch-wordcount
$ mvn clean assembly:assembly
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/modules/job/* $XD_HOME/modules/job
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/lib/* $XD_HOME/lib
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/nietzsche-chapter-1.txt /tmp
现在停止并以单节点模式($XD_HOME/bin/xd-singlenode)重新启动 Spring XD。然后在 shell 中执行以下命令。
xd:> job create --name wordCountJob --definition "wordcount"
或者,您也可以指定一个 cron 表达式来 调度作业 的执行。您可以通过查看 map reduce 作业的输出来验证结果。
xd:> hadoop config fs --namenode hdfs://:8020
xd:> hadoop fs cat /count/out/part-r-00000
工作流中具有其他步骤,例如执行 Hive 或 Pig 脚本,也得到支持。要编写这些类型的工作流,请查阅 Spring for Apache Hadoop 参考指南。也支持非 Hadoop 基于的步骤。
下一个版本的一个主要主题是,通过整合 Spring Batch Admin 项目的组件,公开更多的批处理作业管理功能。您将能够通过向命名通道发送消息来触发批处理作业,以及通过命名通道接收作业状态通知。这将使您能够轻松地根据数据可用性设置批处理作业的触发,例如
file --dir "/data/inbound" | jobParameterCreator > :wordCountJob
当文件出现在目录 /data/inbound 中时,通过向命名通道 :wordCountJob. 发送消息来启动 wordcount 批处理作业。当批处理作业执行时,将有一系列数据可供您使用有关 JobExecution、StepExecution 等的信息的消息。
:wordCountJob.notifications > filter --expression "payload.status.equals('COMPLETED')" | email --address "[email protected]"
在流和作业之间使用通道交换数据是 Spring XD 正在采取步骤统一流和批处理这两个领域的一个方面。敬请期待!