`
hugh.wangp
  • 浏览: 292997 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

基于HIVE文件格式的map reduce代码编写

    博客分类:
  • HIVE
阅读更多

by hugh.wangp

 

我们的数据绝大多数都是在HIVE上,对HIVE的SEQUENCEFILE和RCFILE的存储格式都有利用,为了满足HIVE的数据开放,hive client的方式就比较单一,直接访问HIVE生成的HDFS数据也是一种必要途径,所以本文整理测试了如何编写基于TEXTFILE、SEQUENCEFILE、RCFILE的数据的map reduce的代码。以wordcount的逻辑展示3种MR的代码。


其实只要知道MAP的输入格式是什么,就知道如何在MAP中处理数据;只要知道REDUCE(也可能只有MAP)的输出格式,就知道如何把处理结果转成输出格式。

表1:

 
如下代码片段是运行一个MR的最简单的配置:定义job、配置job、运行job

//map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作 
JobConf conf = new JobConf(WordCountRC.class);
//设置一个用户定义的job名称
conf.setJobName("WordCountRC");

//为job的输出数据设置Key类
conf.setOutputKeyClass(Text.class);
//为job输出设置value类 
conf.setOutputValueClass(IntWritable.class);

//为job设置Mapper类
conf.setMapperClass(MapClass.class);
//为job设置Combiner类
conf.setCombinerClass(Reduce.class);
//为job设置Reduce类
conf.setReducerClass(Reduce.class);

//为map-reduce任务设置InputFormat实现类
conf.setInputFormat(RCFileInputFormat.class);
//为map-reduce任务设置OutputFormat实现类
conf.setOutputFormat(TextOutputFormat.class);

//为map-reduce job设置路径数组作为输入列表
FileInputFormat.setInputPaths(conf, new Path(args[0]));
//为map-reduce job设置路径数组作为输出列表
FileOutputFormat.setOutputPath(conf, new Path(args[1]));

//运行一个job
JobClient.runJob(conf);



而此刻,我们更多的是关注配置InputFormat和OutputFormat的setInputFormat和setOutputFormat。根据我们不同的输入输出做相应的配置,可以选择表1的任何格式。
当我们确定了输入输出格式,接下来就是来在实现map和reduce函数时首选对输入格式做相应的处理,然后处理具体的业务逻辑,最后把处理后的数据转成既定的输出格式。

 

如下是处理textfile、sequencefile、rcfile输入文件的wordcount代码,大家可以比较一下具体区别,应该就能处理更多其它输入文件或者输出文件格式的数据。
代码1:textfile版wordcount

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;


public class WordCountTxt{
 
  public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
   
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
   
       @Override
       public void map(LongWritable key, Text value,
                     OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
              String line = value.toString();
              StringTokenizer itr = new StringTokenizer(line);
              while (itr.hasMoreTokens()) {
                     word.set(itr.nextToken());
                     output.collect(word, one);
              }
  }
  }

  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
   
       @Override
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
 
  public static void main(String[] args) throws Exception {
         JobConf conf = new JobConf(WordCountTxt.class);
         conf.setJobName("wordcounttxt");
        
         conf.setOutputKeyClass(Text.class);
         conf.setOutputValueClass(IntWritable.class);
        
         conf.setMapperClass(MapClass.class);
         conf.setCombinerClass(Reduce.class);
         conf.setReducerClass(Reduce.class);
        
         FileInputFormat.setInputPaths(conf, new Path(args[0]));
         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
              
         JobClient.runJob(conf);   
  }
  
}



代码2:sequencefile版wordcount

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;


public class WordCountSeq {

         public static class MapClass extends MapReduceBase
           implements Mapper<Text, Text, Text, IntWritable> {
          
           private final static IntWritable one = new IntWritable(1);
           private Text word = new Text();
          
              @Override
              public void map(Text key, Text value,
                           OutputCollector<Text, IntWritable> output,
                   Reporter reporter) throws IOException {
                     String line = value.toString();
                     StringTokenizer itr = new StringTokenizer(line);
                     while (itr.hasMoreTokens()) {
                           word.set(itr.nextToken());
                           output.collect(word, one);
                     }
         }
         }

         public static class Reduce extends MapReduceBase
           implements Reducer<Text, IntWritable, Text, IntWritable> {
          
              @Override
           public void reduce(Text key, Iterator<IntWritable> values,
                              OutputCollector<Text, IntWritable> output,
                              Reporter reporter) throws IOException {
             int sum = 0;
             while (values.hasNext()) {
               sum += values.next().get();
             }
             output.collect(key, new IntWritable(sum));
           }
         }
         /**
          * @param args
        * @throws IOException
          */
         public static void main(String[] args) throws IOException {
              // TODO Auto-generated method stub
                JobConf conf = new JobConf(WordCountSeq.class);
                conf.setJobName("wordcountseq");
               
                conf.setOutputKeyClass(Text.class);
                conf.setOutputValueClass(IntWritable.class);
               
                conf.setMapperClass(MapClass.class);
                conf.setCombinerClass(Reduce.class);
                conf.setReducerClass(Reduce.class);
               
                conf.setInputFormat(SequenceFileAsTextInputFormat.class);
                conf.setOutputFormat(TextOutputFormat.class);
               
                FileInputFormat.setInputPaths(conf, new Path(args[0]));
                FileOutputFormat.setOutputPath(conf, new Path(args[1]));
                     
                JobClient.runJob(conf);
         }

}



代码3:rcfile版wordcount

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCountRC {
    
     public static class MapClass
          extends MapReduceBase implements Mapper<LongWritable, BytesRefArrayWritable, Text, IntWritable> {
         
          private final static IntWritable one = new IntWritable(1);
          private Text word =new Text();
    
          @Override
          public void map(LongWritable key, BytesRefArrayWritable value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
                    throws IOException {
               Text txt = new Text();
               txt.set(value.get(0).getData(), value.get(0).getStart(), value.get(0).getLength());
               String[] result = txt.toString().split("\\s");
               for(int i=0; i < result.length; i++){
                    word.set(result[i]);
                    output.collect(word, one);    
               }
          }         
     }

     public static class Reduce
          extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    
          private IntWritable result = new IntWritable();
         
          @Override
          public void reduce(Text key, Iterator<IntWritable> value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
                    throws IOException {
               int sum = 0;
               while (value.hasNext()) {
                    sum += value.next().get();
               }
              
               result.set(sum);
               output.collect(key, result);              
          }
         
     }
     /**
     * @param args
     */
     public static void main(String[] args) throws IOException{
          JobConf conf = new JobConf(WordCountRC.class);
          conf.setJobName("WordCountRC");
         
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(IntWritable.class);
         
          conf.setMapperClass(MapClass.class);
          conf.setCombinerClass(Reduce.class);
          conf.setReducerClass(Reduce.class);
         
          conf.setInputFormat(RCFileInputFormat.class);
          conf.setOutputFormat(TextOutputFormat.class);
         
          FileInputFormat.setInputPaths(conf, new Path(args[0]));
          FileOutputFormat.setOutputPath(conf, new Path(args[1]));
         
          JobClient.runJob(conf);
     }
}



原始数据:

hadoop fs -text /group/alidw-dev/seq_input/attempt_201201101606_2339628_m_000000_0
12/02/13 17:07:57 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/02/13 17:07:57 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor
12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor
12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor
12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor
        hello, i am ok. are you?
        i am fine too!



编译打包完成后执行:

hadoop jarWordCountSeq.jar WordCountSeq /group/alidw-dev/seq_input/ /group/alidw-dev/rc_output



执行完毕就能看到最终结果:

hadoop fs -cat /group/alidw-dev/seq_output/part-00000
am      2
are     1
fine    1
hello,  1
i       2
ok.     1
too!    1
you?    1

 

 

  • 大小: 10.7 KB
1
0
分享到:
评论

相关推荐

    Hive - A Warehousing Solution Over a Map-Reduce.pdf

    然而,Map-Reduce编程模型本身较为底层,开发者需要编写大量的自定义程序来实现特定的功能,这不仅增加了开发的难度,还降低了代码的可维护性和复用性。 针对这些问题,Facebook的数据基础设施团队提出了一种名为...

    开发高效的hive程序

    Hive是由Facebook开源的一个基于Hadoop的数据仓库工具,它可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,将SQL语句转化为MapReduce任务运行在Hadoop上。Hive的设计目标是将复杂的数据处理工作变得...

    第02节:hadoop精讲之map reduce原理及代码.rar

    MapReduce的工作机制分为两个主要阶段:Map阶段和Reduce阶段,这两个阶段之间通过中间结果 Shuffle 和 Sort 阶段连接。 1. Map阶段:在这个阶段,原始输入数据被分割成多个小块(Split),每个Split会被分配到集群...

    hadoop Join代码(map join 和reduce join)

    虽然Hadoop MapReduce可以直接处理JOIN操作,但使用Hive或Pig等高级数据处理工具可以简化编写代码的过程。Hive和Pig都支持多种JOIN类型,如LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN等,并且能够自动优化JOIN策略。 ...

    王家林hive学习资料

    针对HQL的优化,课程将深入探讨具体的策略和方法,包括Map和Reduce的优化、数据倾斜问题的解决,以及执行计划的分析。此外,课程还涉及在Amazon Web Services (AWS)上的Hive应用,讲解如何在EMR集群上管理Hive,配置...

    《企业级Hive实战课程》大纲

    - 几种Join方式(ReduceJoin、MapJoin、SMBJoin)的工作原理与适用场景; - PredicatePushdown(PPD)的作用与实现; - 数据倾斜现象的诊断与解决策略; - 分区使用的优化方法。 2. **实战案例** - 通过具体的...

    开发和优化高效的Hadoop & Hive 程序

    而在处理配置文件和词典时,可以利用DistributedCache来加载这些文件到TaskTracker节点上,从而避免在map和reduce阶段重复读取外部文件。 在MapReduce的调优过程中,除了编写高效代码之外,还需要根据具体应用场景...

    Hive 学习资料

    - **定制化 MapReduce 任务**:对于复杂的分析需求,Hive 允许用户编写自定义的 Map 和 Reduce 函数。 #### 三、Hive 的数据格式与存储 - **数据格式灵活**:Hive 不强制要求数据必须采用某种特定格式,用户可以...

    hadoop1.1.2操作例子 包括hbase hive mapreduce相应的jar包

    "map"阶段将数据分割并处理,"reduce"阶段整合结果。`mapreduce`目录下的文件可能包括了MapReduce的示例程序或者库文件,帮助用户理解并编写自己的MapReduce作业。 使用这些jar包,开发者可以编写应用程序,通过...

    Hive初识入门参考的笔记

    - 需要按照固定的格式编写代码,通常分为 Map 和 Reduce 两个部分。 - 缺乏明确的数据模式定义和 SQL 类型的语言支持。 - 高昂的学习和开发成本。 - **Hive 解决方案**: - **提供 SQL 查询接口**:通过 HQL ...

    NFLplaybyplay:NFL 比赛数据的 Hive 数据库

    下面解释了涉及的每个文件: 注意:PlayByPlay Map-Reduce 作业是由 Jesse Anderson ( ) 编写的代码的分支PLAYBYPLAYDRIVER.java:运行 Map Reduce 作业的 Java 文件。 调用 PLAYBYPLAYMAPPPER.java 和 ...

    Hadoop简单应用案例,包括MapReduce、单词统计、HDFS基本操作、web日志分析、Zookeeper基本使用、Hive简单操作等

    6. **Hive**:Hive是基于Hadoop的数据仓库工具,提供SQL-like接口进行数据查询和分析。学习Hive的基本操作,如创建表、加载数据、执行查询等,可以简化大数据分析过程。案例可能包含创建Hive表、导入数据及执行SQL...

    基于Hadoop的分布式文件系统,使用Java语言开发实现了一个本地文件管理系统,其中文件存在于HDFS集群中.zip

    【标题】基于Hadoop的分布式文件系统通过Java实现的本地文件管理系统,将文件存储在HDFS集群中,展示了Hadoop的高效数据处理能力和Java在大数据领域的应用。 【内容详解】 Hadoop是Apache软件基金会开发的一个开源...

    基于Hadoop的电影影评数据分析

    Map阶段将数据拆分成可处理的小块,Reduce阶段则对Map阶段的结果进行聚合,从而得出最终的分析结果。在这个过程中,我们可以编写Java程序,利用Hadoop API来实现数据处理逻辑,例如计算平均分、统计评分分布和提取...

Global site tag (gtag.js) - Google Analytics