Apache Hadoop 版 Spring 简介

工程 | Costin Leau | 2012年2月29日 | ...

我很高兴地宣布,Apache Hadoop 版 Spring 项目的第一个里程碑版本 (1.0.0.M1) 已发布,并讨论一下我们在过去几个月里所做的一些工作。作为Spring Data大家族的一部分,Apache Hadoop 版 Spring 通过利用 Spring 生态系统的功能,为基于Apache Hadoop技术的应用程序开发提供支持。无论编写的是独立的、普通的 MapReduce 应用程序,还是与企业中多个数据存储中的数据进行交互,或者协调 HDFS、Pig 或 Hive 作业的复杂工作流,或者介于两者之间的任何内容,Apache Hadoop 版 Spring 都忠于 Spring 的理念,提供简化的编程模型并解决由基础设施引起的“意外复杂性”。Apache Hadoop 版 Spring 为开发人员处理海量数据提供了强大的工具。

MapReduce 作业

Apache Hadoop 的“Hello world”程序是单词计数示例——一个简单的用例,它展示了基本的 Apache Hadoop 功能。使用 Apache Hadoop 版 Spring 时,单词计数示例如下所示
<!-- configure Apache Hadoop FS/job tracker using defaults -->
<hdp:configuration />

<!-- define the job -->
<hdp:job id="word-count" 
  input-path="/input/" output-path="/ouput/"
  mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
  reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>

<!-- execute the job -->
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner" 
                  p:jobs-ref="word-count"/>

请注意,作业配置的创建和提交是由 IoC 容器处理的。无论是否需要调整 Apache Hadoop 配置或为 Reducer 提供额外的参数,所有配置选项都可供您配置。这允许您从小处着手,并随着应用程序的增长而增长配置。配置可以像开发人员想要/需要的那么简单或高级,利用 Spring 容器的功能,例如属性占位符环境支持

<hdp:configuration resources="classpath:/my-cluster-site.xml">
    fs.default.name=${hd.fs}
    hadoop.tmp.dir=file://${java.io.tmpdir}
    electric=sea
</hdp:configuration>

<context:property-placeholder location="classpath:hadoop.properties" />

<!-- populate Apache Hadoop distributed cache -->
<hdp:cache create-symlink="true">
  <hdp:classpath value="/cp/some-library.jar#library.jar" />
  <hdp:cache value="/cache/some-archive.tgz#main-archive" />
  <hdp:cache value="/cache/some-resource.res" />
</hdp:cache>

(单词计数示例是 Apache Hadoop 版 Spring 分发版的一部分——请随时下载并尝试)。

Apache Hadoop 版 Spring 并不需要您用 Java 重写 MapReduce 作业,您可以无缝地使用非 Java 流式作业:它们只是由框架以一致连贯的方式创建、配置、连接和管理的对象(或 Spring 称之为 Bean)。开发人员可以根据自己的喜好和需求进行混合匹配,而无需担心集成问题。

<hdp:streaming id="streaming-env" 
  input-path="/input/" output-path="/ouput/"
  mapper="${path.cat}" reducer="${path.wc}">
  <hdp:cmd-env>
    EXAMPLE_DIR=/home/example/dictionaries/
  </hdp:cmd-env>
</hdp:streaming>

现有的 Apache Hadoop Tool 实现也受支持;实际上,无需通过命令行指定自定义 Apache Hadoop 属性,只需将其注入即可

<!-- the tool automatically is injected with 'hadoop-configuration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
   <hdp:arg value="tutorial/Tutorial1"/>
   <hdp:arg value="--local"/>
</hdp:tool-runner>

上面的配置执行 Twitter 的 Scalding(Cascading 之上的 Scala DSL,见下文)库的 Tutorial1。请注意,Apache Hadoop 版 Spring 或 Scalding 中没有专用支持代码——只使用了标准的 Apache Hadoop API。

使用 HBase/Hive/Pig

说到 DSL,在与 Apache Hadoop 交互时使用更高级别的抽象非常常见——流行的选择包括HBaseHivePig。Apache Hadoop 版 Spring 为所有这些提供了集成,允许在 Spring 应用中轻松配置和使用这些数据源
<!-- HBase configuration with nested properties -->
<hdp:hbase-configuration stop-proxy="false" delete-connection="true">
    foo=bar
</hdp:hbase-configuration>

<!-- create a Pig instance using custom properties
    and execute a script (using given arguments) at startup -->
     
<hdp:pig properties-location="pig-dev.properties" />
   <script location="org/company/pig/script.pig">
     <arguments>electric=tears</arguments>
   </script>
</hdp:pig>

通过 Apache Hadoop 版 Spring,您不仅可以获得强大的 IoC 容器,还可以访问 Spring 的可移植服务抽象。以流行的JdbcTemplate为例,您可以在 Hive 的Jdbc 客户端之上使用它

<!-- basic Hive driver bean -->
<bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>

<!-- wrapping a basic datasource around the driver -->
<bean id="hive-ds"
    class="org.springframework.jdbc.datasource.SimpleDriverDataSource"
    c:driver-ref="hive-driver" c:url="${hive.url}"/>

<!-- standard JdbcTemplate declaration -->
<bean id="template" class="org.springframework.jdbc.core.JdbcTemplate"
    c:data-source-ref="hive-ds"/>

Cascading

Spring 还支持基于 Java 的、类型安全的配置模型。您可以将其用作声明性 XML 配置的替代方案或补充——例如使用Cascading
@Configuration
public class CascadingConfig {
    @Value("${cascade.sec}") private String sec;
    
    @Bean public Pipe tsPipe() {
        DateParser dateParser = new DateParser(new Fields("ts"), 
                 "dd/MMM/yyyy:HH:mm:ss Z");
        return new Each("arrival rate", new Fields("time"), dateParser);
    }

    @Bean public Pipe tsCountPipe() {
        Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
        tsCountPipe = new GroupBy(tsCountPipe, new Fields("ts"));
    }
}
<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingConfig "/>

<bean id="cascade"
    class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean" 
    p:configuration-ref="hadoop-configuration" p:tail-ref="tsCountPipe" />

上面的示例混合使用了程序化和声明性配置:前者用于创建单个 Cascading 管道,后者用于将它们连接成流。

使用 Spring 的可移植服务抽象

或者使用 Spring 出色的任务/调度支持在特定时间提交作业
<task:scheduler id="myScheduler" pool-size="10"/>

<task:scheduled-tasks scheduler="myScheduler">
 <!-- run once a day, at midnight -->
 <task:scheduled ref="word-count-job" method="submit" cron="0 0 * * * "/>
</task:scheduled-tasks>

上面的配置使用了一个简单的 JDK Executor 实例——非常适合<a href="http://en.wikipedia.org/wiki/Proof_of_Concept'>POC开发。您可以轻松替换它(一行代码)在生产环境中使用更全面的解决方案,例如专用的调度程序WorkManager实现——这是 Spring 强大服务抽象的另一个示例。

HDFS/脚本

HDFS交互时的常见任务是准备文件系统,例如清理输出目录以避免覆盖数据或将所有输入文件移动到相同的名称方案或文件夹下。Apache Hadoop 版 Spring 通过完全采用 Apache Hadoop 的 fs 命令(例如FS ShellDistCp)并公开它们作为合适的 Java API 来解决这个问题。将其与 JVM 脚本(无论是 Groovy、JRuby 还是 Rhino/JavaScript)混合使用,可以形成强大的组合

<hdp:script language="groovy">
  inputPath = "/user/gutenberg/input/word/"
  outputPath = "/user/gutenberg/output/word/"

  if (fsh.test(inputPath)) {
    fsh.rmr(inputPath)
  }

  if (fsh.test(outputPath)) {
    fsh.rmr(outputPath)
  }

  fs.copyFromLocalFile("data/input.txt", inputPath)
</hdp:script>

总结

这篇文章只介绍了 Apache Hadoop 版 Spring 中一些功能的表面;我没有提到提供用于各种Apache Hadoop 交互的 Spring Batch 集成或用于事件触发的 Spring Integration 的使用——更多内容将在以后的文章中介绍。请告诉我们您的想法、您的需求并给我们反馈:下载代码,fork源代码,报告问题,在论坛上发帖或给我们发送推文


获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部