引入 Spring for Apache Hadoop

工程 | Costin Leau | 2012 年 2 月 29 日 | ...

我很高兴宣布 Spring for Apache Hadoop 项目的第一个里程碑版本(1.0.0.M1)现已发布,并谈谈我们在过去几个月里所做的一些工作。作为 Spring Data 伞形项目的一部分,Spring for Apache Hadoop 通过利用 Spring 生态系统的能力,为基于 Apache Hadoop 技术开发应用提供了支持。无论是编写独立的、原生的 MapReduce 应用,还是与企业中多个数据存储的数据交互,抑或是协调复杂的 HDFS、Pig 或 Hive 作业工作流,以及介于两者之间的任何事情,Spring for Apache Hadoop 都秉承 Spring 的理念,提供了简化的编程模型,并解决了基础设施造成的"偶然复杂性"。Spring for Apache Hadoop 为开发者处理大数据量提供了一个强大的工具。

MapReduce 作业

Apache Hadoop 的Hello world 示例是单词计数示例——一个展示 Apache Hadoop 基本功能的简单用例。使用 Spring for Apache Hadoop 时,单词计数示例如下所示
<!-- configure Apache Hadoop FS/job tracker using defaults -->
<hdp:configuration />

<!-- define the job -->
<hdp:job id="word-count" 
  input-path="/input/" output-path="/ouput/"
  mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
  reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>

<!-- execute the job -->
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner" 
                  p:jobs-ref="word-count"/>

请注意,作业配置的创建和提交是如何由 IoC 容器处理的。无论是需要调整 Apache Hadoop 配置,还是 Reducer 需要额外的参数,所有配置选项都可供您配置。这使您可以从小处着手,并让配置与应用一起成长。配置可以像开发者希望/需要的那样简单或高级,充分利用 Spring 容器的功能,例如属性占位符环境支持

<hdp:configuration resources="classpath:/my-cluster-site.xml">
    fs.default.name=${hd.fs}
    hadoop.tmp.dir=file://${java.io.tmpdir}
    electric=sea
</hdp:configuration>

<context:property-placeholder location="classpath:hadoop.properties" />

<!-- populate Apache Hadoop distributed cache -->
<hdp:cache create-symlink="true">
  <hdp:classpath value="/cp/some-library.jar#library.jar" />
  <hdp:cache value="/cache/some-archive.tgz#main-archive" />
  <hdp:cache value="/cache/some-resource.res" />
</hdp:cache>

(单词计数示例是 Spring for Apache Hadoop 分发版的一部分——欢迎下载并进行实验)。

Spring for Apache Hadoop 不需要您用 Java 重写 MapReduce 作业,您可以无缝使用非 Java 流式作业:它们只是框架以一致、连贯的方式创建、配置、连接和管理的普通对象(或者 Spring 称之为 Bean),就像其他任何对象一样。开发者可以根据自己的偏好和需求混合搭配,而无需担心集成问题。

<hdp:streaming id="streaming-env" 
  input-path="/input/" output-path="/ouput/"
  mapper="${path.cat}" reducer="${path.wc}">
  <hdp:cmd-env>
    EXAMPLE_DIR=/home/example/dictionaries/
  </hdp:cmd-env>
</hdp:streaming>

还支持现有的 Apache Hadoop Tool 实现;实际上,与其通过命令行指定自定义 Apache Hadoop 属性,不如直接注入它们

<!-- the tool automatically is injected with 'hadoop-configuration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
   <hdp:arg value="tutorial/Tutorial1"/>
   <hdp:arg value="--local"/>
</hdp:tool-runner>

上述配置执行 Twitter 的 Scalding(基于 Cascading 的 Scala DSL,见下文)的 Tutorial1。注意,无论是 Spring for Apache Hadoop 还是 Scalding,都没有专用的支持代码——仅使用了标准的 Apache Hadoop API。

使用 HBase/Hive/Pig

说到 DSL,在使用 Apache Hadoop 时使用更高层次的抽象非常常见——流行的选择包括 HBaseHivePig。Spring for Apache Hadoop 为所有这些提供了集成,允许在 Spring 应用中轻松配置和使用这些数据源
<!-- HBase configuration with nested properties -->
<hdp:hbase-configuration stop-proxy="false" delete-connection="true">
    foo=bar
</hdp:hbase-configuration>

<!-- create a Pig instance using custom properties
    and execute a script (using given arguments) at startup -->
     
<hdp:pig properties-location="pig-dev.properties" />
   <script location="org/company/pig/script.pig">
     <arguments>electric=tears</arguments>
   </script>
</hdp:pig>

通过 Spring for Apache Hadoop,您不仅可以获得一个强大的 IoC 容器,还可以访问 Spring 的可移植服务抽象。以流行的 JdbcTemplate 为例,您可以在 Hive 的 Jdbc 客户端之上使用它

<!-- basic Hive driver bean -->
<bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>

<!-- wrapping a basic datasource around the driver -->
<bean id="hive-ds"
    class="org.springframework.jdbc.datasource.SimpleDriverDataSource"
    c:driver-ref="hive-driver" c:url="${hive.url}"/>

<!-- standard JdbcTemplate declaration -->
<bean id="template" class="org.springframework.jdbc.core.JdbcTemplate"
    c:data-source-ref="hive-ds"/>

Cascading

Spring 还支持基于 Java 的类型安全的配置模型。您可以将其作为声明式 XML 配置的替代或补充——例如与 Cascading 一起使用
@Configuration
public class CascadingConfig {
    @Value("${cascade.sec}") private String sec;
    
    @Bean public Pipe tsPipe() {
        DateParser dateParser = new DateParser(new Fields("ts"), 
                 "dd/MMM/yyyy:HH:mm:ss Z");
        return new Each("arrival rate", new Fields("time"), dateParser);
    }

    @Bean public Pipe tsCountPipe() {
        Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
        tsCountPipe = new GroupBy(tsCountPipe, new Fields("ts"));
    }
}
<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingConfig "/>

<bean id="cascade"
    class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean" 
    p:configuration-ref="hadoop-configuration" p:tail-ref="tsCountPipe" />

上面的示例混合使用了编程配置和声明式配置:前者用于创建独立的 Cascading 管道,后者用于将它们连接成一个流。

使用 Spring 的可移植服务抽象

或者使用 Spring 出色的任务/调度支持在特定时间提交作业
<task:scheduler id="myScheduler" pool-size="10"/>

<task:scheduled-tasks scheduler="myScheduler">
 <!-- run once a day, at midnight -->
 <task:scheduled ref="word-count-job" method="submit" cron="0 0 * * * "/>
</task:scheduled-tasks>

上述配置使用了一个简单的 JDK Executor 实例——非常适合概念验证 (POC) 开发。在生产环境中,您可以轻松地(只需一行代码)将其替换为更全面的解决方案,例如专用的调度器WorkManager 实现——这是 Spring 强大服务抽象的另一个例子。

HDFS/脚本

HDFS 交互时的一个常见任务是准备文件系统,例如清理输出目录以避免覆盖数据,或者将所有输入文件移动到相同的命名方案或文件夹下。Spring for Apache Hadoop 通过完全支持 Apache Hadoop 的 fs 命令来解决这个问题,例如 FS ShellDistCp,并将其公开为正式的 Java API。将其与 JVM 脚本(无论是 Groovy、JRuby 还是 Rhino/JavaScript)结合使用,可以形成强大的组合

<hdp:script language="groovy">
  inputPath = "/user/gutenberg/input/word/"
  outputPath = "/user/gutenberg/output/word/"

  if (fsh.test(inputPath)) {
    fsh.rmr(inputPath)
  }

  if (fsh.test(outputPath)) {
    fsh.rmr(outputPath)
  }

  fs.copyFromLocalFile("data/input.txt", inputPath)
</hdp:script>

总结

这篇文章只是触及了 Spring for Apache Hadoop 中可用的一些功能的表面;我还没有提及 Spring Batch 集成,它为各种 Apache Hadoop 交互提供了 Tasklet,也没有提及使用 Spring Integration 进行事件触发——更多内容将在未来的一篇文章中介绍。请告诉我们您的想法和需求,并给我们反馈:下载代码、fork 源代码、报告问题、在论坛上发帖或给我们发推特


获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

抢占先机

VMware 提供培训和认证,助您加速进步。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅,即可获得对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部