抢占先机
VMware 提供培训和认证,助您加速进步。
了解更多我很高兴宣布 Spring for Apache Hadoop 项目的第一个里程碑版本(1.0.0.M1)现已发布,并谈谈我们在过去几个月里所做的一些工作。作为 Spring Data 伞形项目的一部分,Spring for Apache Hadoop 通过利用 Spring 生态系统的能力,为基于 Apache Hadoop 技术开发应用提供了支持。无论是编写独立的、原生的 MapReduce 应用,还是与企业中多个数据存储的数据交互,抑或是协调复杂的 HDFS、Pig 或 Hive 作业工作流,以及介于两者之间的任何事情,Spring for Apache Hadoop 都秉承 Spring 的理念,提供了简化的编程模型,并解决了基础设施造成的"偶然复杂性"。Spring for Apache Hadoop 为开发者处理大数据量提供了一个强大的工具。
<!-- configure Apache Hadoop FS/job tracker using defaults -->
<hdp:configuration />
<!-- define the job -->
<hdp:job id="word-count"
input-path="/input/" output-path="/ouput/"
mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>
<!-- execute the job -->
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner"
p:jobs-ref="word-count"/>
请注意,作业配置的创建和提交是如何由 IoC 容器处理的。无论是需要调整 Apache Hadoop 配置,还是 Reducer 需要额外的参数,所有配置选项都可供您配置。这使您可以从小处着手,并让配置与应用一起成长。配置可以像开发者希望/需要的那样简单或高级,充分利用 Spring 容器的功能,例如属性占位符和环境支持。
<hdp:configuration resources="classpath:/my-cluster-site.xml">
fs.default.name=${hd.fs}
hadoop.tmp.dir=file://${java.io.tmpdir}
electric=sea
</hdp:configuration>
<context:property-placeholder location="classpath:hadoop.properties" />
<!-- populate Apache Hadoop distributed cache -->
<hdp:cache create-symlink="true">
<hdp:classpath value="/cp/some-library.jar#library.jar" />
<hdp:cache value="/cache/some-archive.tgz#main-archive" />
<hdp:cache value="/cache/some-resource.res" />
</hdp:cache>
(单词计数示例是 Spring for Apache Hadoop 分发版的一部分——欢迎下载并进行实验)。
Spring for Apache Hadoop 不需要您用 Java 重写 MapReduce 作业,您可以无缝使用非 Java 流式作业:它们只是框架以一致、连贯的方式创建、配置、连接和管理的普通对象(或者 Spring 称之为 Bean),就像其他任何对象一样。开发者可以根据自己的偏好和需求混合搭配,而无需担心集成问题。
<hdp:streaming id="streaming-env"
input-path="/input/" output-path="/ouput/"
mapper="${path.cat}" reducer="${path.wc}">
<hdp:cmd-env>
EXAMPLE_DIR=/home/example/dictionaries/
</hdp:cmd-env>
</hdp:streaming>
还支持现有的 Apache Hadoop Tool 实现;实际上,与其通过命令行指定自定义 Apache Hadoop 属性,不如直接注入它们
<!-- the tool automatically is injected with 'hadoop-configuration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
<hdp:arg value="tutorial/Tutorial1"/>
<hdp:arg value="--local"/>
</hdp:tool-runner>
上述配置执行 Twitter 的 Scalding(基于 Cascading 的 Scala DSL,见下文)的 Tutorial1。注意,无论是 Spring for Apache Hadoop 还是 Scalding,都没有专用的支持代码——仅使用了标准的 Apache Hadoop API。
<!-- HBase configuration with nested properties -->
<hdp:hbase-configuration stop-proxy="false" delete-connection="true">
foo=bar
</hdp:hbase-configuration>
<!-- create a Pig instance using custom properties
and execute a script (using given arguments) at startup -->
<hdp:pig properties-location="pig-dev.properties" />
<script location="org/company/pig/script.pig">
<arguments>electric=tears</arguments>
</script>
</hdp:pig>
通过 Spring for Apache Hadoop,您不仅可以获得一个强大的 IoC 容器,还可以访问 Spring 的可移植服务抽象。以流行的 JdbcTemplate 为例,您可以在 Hive 的 Jdbc 客户端之上使用它
<!-- basic Hive driver bean -->
<bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>
<!-- wrapping a basic datasource around the driver -->
<bean id="hive-ds"
class="org.springframework.jdbc.datasource.SimpleDriverDataSource"
c:driver-ref="hive-driver" c:url="${hive.url}"/>
<!-- standard JdbcTemplate declaration -->
<bean id="template" class="org.springframework.jdbc.core.JdbcTemplate"
c:data-source-ref="hive-ds"/>
@Configuration
public class CascadingConfig {
@Value("${cascade.sec}") private String sec;
@Bean public Pipe tsPipe() {
DateParser dateParser = new DateParser(new Fields("ts"),
"dd/MMM/yyyy:HH:mm:ss Z");
return new Each("arrival rate", new Fields("time"), dateParser);
}
@Bean public Pipe tsCountPipe() {
Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
tsCountPipe = new GroupBy(tsCountPipe, new Fields("ts"));
}
}
<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingConfig "/>
<bean id="cascade"
class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean"
p:configuration-ref="hadoop-configuration" p:tail-ref="tsCountPipe" />
上面的示例混合使用了编程配置和声明式配置:前者用于创建独立的 Cascading 管道,后者用于将它们连接成一个流。
<task:scheduler id="myScheduler" pool-size="10"/>
<task:scheduled-tasks scheduler="myScheduler">
<!-- run once a day, at midnight -->
<task:scheduled ref="word-count-job" method="submit" cron="0 0 * * * "/>
</task:scheduled-tasks>
上述配置使用了一个简单的 JDK Executor 实例——非常适合概念验证 (POC) 开发。在生产环境中,您可以轻松地(只需一行代码)将其替换为更全面的解决方案,例如专用的调度器或 WorkManager 实现——这是 Spring 强大服务抽象的另一个例子。
与 HDFS 交互时的一个常见任务是准备文件系统,例如清理输出目录以避免覆盖数据,或者将所有输入文件移动到相同的命名方案或文件夹下。Spring for Apache Hadoop 通过完全支持 Apache Hadoop 的 fs 命令来解决这个问题,例如 FS Shell 和 DistCp,并将其公开为正式的 Java API。将其与 JVM 脚本(无论是 Groovy、JRuby 还是 Rhino/JavaScript)结合使用,可以形成强大的组合
<hdp:script language="groovy">
inputPath = "/user/gutenberg/input/word/"
outputPath = "/user/gutenberg/output/word/"
if (fsh.test(inputPath)) {
fsh.rmr(inputPath)
}
if (fsh.test(outputPath)) {
fsh.rmr(outputPath)
}
fs.copyFromLocalFile("data/input.txt", inputPath)
</hdp:script>
这篇文章只是触及了 Spring for Apache Hadoop 中可用的一些功能的表面;我还没有提及 Spring Batch 集成,它为各种 Apache Hadoop 交互提供了 Tasklet,也没有提及使用 Spring Integration 进行事件触发——更多内容将在未来的一篇文章中介绍。请告诉我们您的想法和需求,并给我们反馈:下载代码、fork 源代码、报告问题、在论坛上发帖或给我们发推特。