- 浏览: 293500 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
onlyamoment:
请问为什么要限制不合并文件呢?事实上,用动态分区写表时候容易出 ...
HIVE动态分区参数配置 -
alexss1988:
请问楼主,RCFILE由于列式存储方式,数据加载时性能消耗较大 ...
HIVE文件存储格式的测试比较 -
空谷悠悠:
jersey文档中提到:Client instances ar ...
自整理手册Jersey Client API -
bottle1:
我也遇到FileNotFoundException这个问题,发 ...
Hadoop 中使用DistributedCache遇到的问题 -
yongqi:
hi hugh.wangp: 请教您一个问题,我现在也在被 ...
Hadoop 中使用DistributedCache遇到的问题
by hugh.wangp
我们的数据绝大多数都是在HIVE上,对HIVE的SEQUENCEFILE和RCFILE的存储格式都有利用,为了满足HIVE的数据开放,hive client的方式就比较单一,直接访问HIVE生成的HDFS数据也是一种必要途径,所以本文整理测试了如何编写基于TEXTFILE、SEQUENCEFILE、RCFILE的数据的map reduce的代码。以wordcount的逻辑展示3种MR的代码。
其实只要知道MAP的输入格式是什么,就知道如何在MAP中处理数据;只要知道REDUCE(也可能只有MAP)的输出格式,就知道如何把处理结果转成输出格式。
表1:
如下代码片段是运行一个MR的最简单的配置:定义job、配置job、运行job
//map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作 JobConf conf = new JobConf(WordCountRC.class); //设置一个用户定义的job名称 conf.setJobName("WordCountRC"); //为job的输出数据设置Key类 conf.setOutputKeyClass(Text.class); //为job输出设置value类 conf.setOutputValueClass(IntWritable.class); //为job设置Mapper类 conf.setMapperClass(MapClass.class); //为job设置Combiner类 conf.setCombinerClass(Reduce.class); //为job设置Reduce类 conf.setReducerClass(Reduce.class); //为map-reduce任务设置InputFormat实现类 conf.setInputFormat(RCFileInputFormat.class); //为map-reduce任务设置OutputFormat实现类 conf.setOutputFormat(TextOutputFormat.class); //为map-reduce job设置路径数组作为输入列表 FileInputFormat.setInputPaths(conf, new Path(args[0])); //为map-reduce job设置路径数组作为输出列表 FileOutputFormat.setOutputPath(conf, new Path(args[1])); //运行一个job JobClient.runJob(conf);
而此刻,我们更多的是关注配置InputFormat和OutputFormat的setInputFormat和setOutputFormat。根据我们不同的输入输出做相应的配置,可以选择表1的任何格式。
当我们确定了输入输出格式,接下来就是来在实现map和reduce函数时首选对输入格式做相应的处理,然后处理具体的业务逻辑,最后把处理后的数据转成既定的输出格式。
如下是处理textfile、sequencefile、rcfile输入文件的wordcount代码,大家可以比较一下具体区别,应该就能处理更多其它输入文件或者输出文件格式的数据。
代码1:textfile版wordcount
import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; public class WordCountTxt{ public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCountTxt.class); conf.setJobName("wordcounttxt"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
代码2:sequencefile版wordcount
import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class WordCountSeq { public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(Text key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { // TODO Auto-generated method stub JobConf conf = new JobConf(WordCountSeq.class); conf.setJobName("wordcountseq"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(SequenceFileAsTextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
代码3:rcfile版wordcount
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; public class WordCountRC { public static class MapClass extends MapReduceBase implements Mapper<LongWritable, BytesRefArrayWritable, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word =new Text(); @Override public void map(LongWritable key, BytesRefArrayWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { Text txt = new Text(); txt.set(value.get(0).getData(), value.get(0).getStart(), value.get(0).getLength()); String[] result = txt.toString().split("\\s"); for(int i=0; i < result.length; i++){ word.set(result[i]); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @Override public void reduce(Text key, Iterator<IntWritable> value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (value.hasNext()) { sum += value.next().get(); } result.set(sum); output.collect(key, result); } } /** * @param args */ public static void main(String[] args) throws IOException{ JobConf conf = new JobConf(WordCountRC.class); conf.setJobName("WordCountRC"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(RCFileInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
原始数据:
hadoop fs -text /group/alidw-dev/seq_input/attempt_201201101606_2339628_m_000000_0 12/02/13 17:07:57 INFO util.NativeCodeLoader: Loaded the native-hadoop library 12/02/13 17:07:57 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor hello, i am ok. are you? i am fine too!
编译打包完成后执行:
hadoop jarWordCountSeq.jar WordCountSeq /group/alidw-dev/seq_input/ /group/alidw-dev/rc_output
执行完毕就能看到最终结果:
hadoop fs -cat /group/alidw-dev/seq_output/part-00000 am 2 are 1 fine 1 hello, 1 i 2 ok. 1 too! 1 you? 1
发表评论
-
【转】Hadoop 中的两表join
2012-08-09 10:35 1698原文见:http://www.gemini5201314 ... -
HIVE动态分区参数配置
2012-07-30 15:33 12717设置如下参数开启动态分区: hive.exec ... -
配置HIVE执行的本地模式
2012-07-21 09:20 4431自0.7版本后Hive开始支持任务执行选择本地模式(l ... -
HIVE表数据量和数据记录数的矛与盾
2012-07-06 09:45 12322HIVE作为在Hadoop ... -
HIVE如何使用自定义函数
2012-06-28 19:44 2724HIVE提供了很多函数 ... -
[陷阱]HIVE外部分区表一定要增加分区
2012-06-27 16:43 13188刚开始玩HIVE外部表可能会遇到的小陷阱。 只要 ... -
HIVE元数据
2012-06-20 12:52 13397HIVE元数据表数据字典: 表名 ... -
LINUX下单机安装HADOOP+HIVE手册
2012-05-31 15:59 2211HADOOP篇 HADOOP安装 1.tar - ... -
小文件合并
2012-05-03 13:07 3213文件数目过多,增加namenode的压力,hdfs的 ... -
HIVE UDF/UDAF/UDTF的Map Reduce代码框架模板
2012-04-01 10:09 5948自己写代码时候的利用到的模板 UDF步骤: ... -
HIVE文件存储格式的测试比较
2012-02-13 17:26 3734by hugh.wangp 根据自身涉及到的数据分布和 ...
相关推荐
然而,Map-Reduce编程模型本身较为底层,开发者需要编写大量的自定义程序来实现特定的功能,这不仅增加了开发的难度,还降低了代码的可维护性和复用性。 针对这些问题,Facebook的数据基础设施团队提出了一种名为...
Hive是由Facebook开源的一个基于Hadoop的数据仓库工具,它可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,将SQL语句转化为MapReduce任务运行在Hadoop上。Hive的设计目标是将复杂的数据处理工作变得...
MapReduce的工作机制分为两个主要阶段:Map阶段和Reduce阶段,这两个阶段之间通过中间结果 Shuffle 和 Sort 阶段连接。 1. Map阶段:在这个阶段,原始输入数据被分割成多个小块(Split),每个Split会被分配到集群...
虽然Hadoop MapReduce可以直接处理JOIN操作,但使用Hive或Pig等高级数据处理工具可以简化编写代码的过程。Hive和Pig都支持多种JOIN类型,如LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN等,并且能够自动优化JOIN策略。 ...
针对HQL的优化,课程将深入探讨具体的策略和方法,包括Map和Reduce的优化、数据倾斜问题的解决,以及执行计划的分析。此外,课程还涉及在Amazon Web Services (AWS)上的Hive应用,讲解如何在EMR集群上管理Hive,配置...
- 几种Join方式(ReduceJoin、MapJoin、SMBJoin)的工作原理与适用场景; - PredicatePushdown(PPD)的作用与实现; - 数据倾斜现象的诊断与解决策略; - 分区使用的优化方法。 2. **实战案例** - 通过具体的...
而在处理配置文件和词典时,可以利用DistributedCache来加载这些文件到TaskTracker节点上,从而避免在map和reduce阶段重复读取外部文件。 在MapReduce的调优过程中,除了编写高效代码之外,还需要根据具体应用场景...
- **定制化 MapReduce 任务**:对于复杂的分析需求,Hive 允许用户编写自定义的 Map 和 Reduce 函数。 #### 三、Hive 的数据格式与存储 - **数据格式灵活**:Hive 不强制要求数据必须采用某种特定格式,用户可以...
"map"阶段将数据分割并处理,"reduce"阶段整合结果。`mapreduce`目录下的文件可能包括了MapReduce的示例程序或者库文件,帮助用户理解并编写自己的MapReduce作业。 使用这些jar包,开发者可以编写应用程序,通过...
- 需要按照固定的格式编写代码,通常分为 Map 和 Reduce 两个部分。 - 缺乏明确的数据模式定义和 SQL 类型的语言支持。 - 高昂的学习和开发成本。 - **Hive 解决方案**: - **提供 SQL 查询接口**:通过 HQL ...
下面解释了涉及的每个文件: 注意:PlayByPlay Map-Reduce 作业是由 Jesse Anderson ( ) 编写的代码的分支PLAYBYPLAYDRIVER.java:运行 Map Reduce 作业的 Java 文件。 调用 PLAYBYPLAYMAPPPER.java 和 ...
6. **Hive**:Hive是基于Hadoop的数据仓库工具,提供SQL-like接口进行数据查询和分析。学习Hive的基本操作,如创建表、加载数据、执行查询等,可以简化大数据分析过程。案例可能包含创建Hive表、导入数据及执行SQL...
【标题】基于Hadoop的分布式文件系统通过Java实现的本地文件管理系统,将文件存储在HDFS集群中,展示了Hadoop的高效数据处理能力和Java在大数据领域的应用。 【内容详解】 Hadoop是Apache软件基金会开发的一个开源...
Map阶段将数据拆分成可处理的小块,Reduce阶段则对Map阶段的结果进行聚合,从而得出最终的分析结果。在这个过程中,我们可以编写Java程序,利用Hadoop API来实现数据处理逻辑,例如计算平均分、统计评分分布和提取...