在hadoop里面处理的数据,默认按输入内容的key进行排序的,大部分情况下,都可以满足的我们的业务需求,但有时候,可能出现类似以下的需求,输入内容:
- 秦东亮;72
- 秦东亮;34
- 秦东亮;100
- 三劫;899
- 三劫;32
- 三劫;1
- a;45
- b;567
- b;12
秦东亮;72 秦东亮;34 秦东亮;100 三劫;899 三劫;32 三劫;1 a;45 b;567 b;12
要求输出1:
- a 45
- b 12,567
- 三劫 1,32,899
- 秦东亮 34,72,100
a 45 b 12,567 三劫 1,32,899 秦东亮 34,72,100
要求输出2:
- a 45
- b 12
- b 567
- 三劫 1
- 三劫 32
- 三劫 899
- 秦东亮 34
- 秦东亮 72
- 秦东亮 100
a 45 b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100
注意上面的输出1,和输出2,其实都是一样的逻辑,只不过,输出的形式稍微改了下,那么今天散仙,就来分析下,怎么在hadoop里面,实现这样的需求。
其实这样的需求,就类似数据库的标准SQL分组
SELECT A,B FROM TABLE GROUP BY A,B ORDER BY A,B
当然也不一定,是2个字段分组,可能有2个或2个以上的多个字段分组。
下面,我们先来看下MapReduce内部执行2次排序的流程图,这图是散仙收集的,画的很不错。
由上图可知,Map在处理数据时,先由InputFormat组件提供输入格式,然后Split一行数据,默认的是TextInputFormat,Key为字节偏移量,Value为内容,然后把这行数据,传给Map,Map根据某种约定的分隔符,进行拆分数据,进行业务处理,如果是计数的直接在Value上输出1,在Map输出前,如果有Combine组件,则会执行Combine阶段,进行本地Reduce,一般是用来优化程序用的,Combine执行完后,会执行Partition组件,进行数据分区,默认的是HashPartition,按照输出的Key的哈希值与上Integer的最大值,然后对reduce的个数进行取余得到的值,经过Partition后,数据就会被按桶输出到本地磁盘上,在输出的时候,会按照Key进行排序,然后等所有的Map执行完毕后,就会进入Reduce阶段,这个阶段会进行一个大的混洗阶段,术语叫shuffle,每个reduce都会去每个map输出的分区里面,拉取对应的一部分数据,这个时候,是最耗网络IO,以及磁盘IO的,是影响性能的一个重要瓶颈,当Reduce把所有的数据拉取完毕后,就会进行分组并按照Key进行排序,每处理好一个分组,都会调用一次Reduce函数,进行累加,或其他的业务处理,处理完毕后,就会通过OutputFormat进行输出到HDFS上,至此,整个流程就执行完毕。
代码如下:
- package com.qin.groupsort;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
- import org.apache.hadoop.io.WritableComparator;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Partitioner;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
- import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import com.qin.operadb.PersonRecoder;
- import com.qin.operadb.ReadMapDB;
- /**
- * @author qindongliang
- *
- * 大数据交流群:376932160
- *
- *
- * **/
- public class GroupSort {
- /**
- * map任务
- *
- * */
- public static class GMapper extends Mapper<LongWritable, Text, DescSort, IntWritable>{
- private DescSort tx=new DescSort();
- private IntWritable second=new IntWritable();
- @Override
- protected void map(LongWritable key, Text value,Context context)
- throws IOException, InterruptedException {
- System.out.println("执行map");
- // System.out.println("进map了");
- //mos.write(namedOutput, key, value);
- String ss[]=value.toString().split(";");
- String mkey=ss[0];
- int mvalue=Integer.parseInt(ss[1]);
- tx.setFirstKey(mkey);
- tx.setSecondKey(mvalue);
- second.set(mvalue);
- context.write(tx, second);
- }
- }
- /***
- * Reduce任务
- *
- * **/
- public static class GReduce extends Reducer<DescSort, IntWritable, Text, Text>{
- @Override
- protected void reduce(DescSort arg0, Iterable<IntWritable> arg1, Context ctx)
- throws IOException, InterruptedException {
- System.out.println("执行reduce");
- StringBuffer sb=new StringBuffer();
- for(IntWritable t:arg1){
- // sb.append(t).append(",");
- //con
- ctx.write(new Text(arg0.getFirstKey()), new Text(t.toString()));
- /**这种写法,是这种输出
- *a 45
- *b 12
- b 567
- 三劫 1
- 三劫 32
- 三劫 899
- 秦东亮 34
- 秦东亮 72
- 秦东亮 100
- */
- }
- if(sb.length()>0){
- sb.deleteCharAt(sb.length()-1);//删除最后一位的逗号
- }
- // 在循环里拼接,在循环外输出是这种格式
- // b 12,567
- // 三劫 1,32,899
- // 秦东亮 34,72,100
- // ctx.write(new Text(arg0.getFirstKey()), new Text(sb.toString()));
- }
- }
- /***
- *
- * 自定义组合键
- * **/
- public static class DescSort implements WritableComparable{
- public DescSort() {
- // TODO Auto-generated constructor stub
- }
- private String firstKey;
- private int secondKey;
- public String getFirstKey() {
- return firstKey;
- }
- public void setFirstKey(String firstKey) {
- this.firstKey = firstKey;
- }
- public int getSecondKey() {
- return secondKey;
- }
- public void setSecondKey(int secondKey) {
- this.secondKey = secondKey;
- }
- // @Override
- // public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
- // int arg4, int arg5) {
- // return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序
- // }
- //
- // @Override
- // public int compare(Object a, Object b) {
- //
- // return -super.compare(a, b);//注意使用负号来完成降序
- // }
- @Override
- public void readFields(DataInput in) throws IOException {
- // TODO Auto-generated method stub
- firstKey=in.readUTF();
- secondKey=in.readInt();
- }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(firstKey);
- out.writeInt(secondKey);
- }
- @Override
- public int compareTo(Object o) {
- // TODO Auto-generated method stub
- DescSort d=(DescSort)o;
- //this在前代表升序
- return this.getFirstKey().compareTo(d.getFirstKey());
- }
- }
- /**
- * 主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组
- *
- * **/
- public static class TextComparator extends WritableComparator{
- public TextComparator() {
- // TODO Auto-generated constructor stub
- super(DescSort.class,true);//注册Comparator
- }
- @Override
- public int compare(WritableComparable a, WritableComparable b) {
- System.out.println("执行TextComparator分组排序");
- DescSort d1=(DescSort)a;
- DescSort d2=(DescSort)b;
- return d1.getFirstKey().compareTo(d2.getFirstKey());
- }
- }
- /**
- * 组内排序的策略
- * 按照第二个字段排序
- *
- * */
- public static class TextIntCompartator extends WritableComparator{
- public TextIntCompartator() {
- super(DescSort.class,true);
- }
- @Override
- public int compare(WritableComparable a, WritableComparable b) {
- DescSort d1=(DescSort)a;
- DescSort d2=(DescSort)b;
- System.out.println("执行组内排序TextIntCompartator");
- if(!d1.getFirstKey().equals(d2.getFirstKey())){
- return d1.getFirstKey().compareTo(d2.getFirstKey());
- }else{
- return d1.getSecondKey()-d2.getSecondKey();//0,-1,1
- }
- }
- }
- /**
- * 分区策略
- *
- * */
- public static class KeyPartition extends Partitioner<DescSort, IntWritable>{
- @Override
- public int getPartition(DescSort key, IntWritable arg1, int arg2) {
- // TODO Auto-generated method stub
- System.out.println("执行自定义分区KeyPartition");
- return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%arg2;
- }
- }
- public static void main(String[] args) throws Exception{
- JobConf conf=new JobConf(ReadMapDB.class);
- //Configuration conf=new Configuration();
- conf.set("mapred.job.tracker","192.168.75.130:9001");
- //读取person中的数据字段
- conf.setJar("tt.jar");
- //注意这行代码放在最前面,进行初始化,否则会报
- /**Job任务**/
- Job job=new Job(conf, "testpartion");
- job.setJarByClass(GroupSort.class);
- System.out.println("模式: "+conf.get("mapred.job.tracker"));;
- // job.setCombinerClass(PCombine.class);
- // job.setNumReduceTasks(3);//设置为3
- job.setMapperClass(GMapper.class);
- job.setReducerClass(GReduce.class);
- /**设置分区函数*/
- job.setPartitionerClass(KeyPartition.class);
- //分组函数,Reduce前的一次排序
- job.setGroupingComparatorClass(TextComparator.class);
- //组内排序Map输出完毕后,对key进行的一次排序
- job.setSortComparatorClass(TextIntCompartator.class);
- //TextComparator.class
- //TextIntCompartator.class
- // job.setGroupingComparatorClass(TextIntCompartator.class);
- //组内排序Map输出完毕后,对key进行的一次排序
- // job.setSortComparatorClass(TextComparator.class);
- job.setMapOutputKeyClass(DescSort.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- String path="hdfs://192.168.75.130:9000/root/outputdb";
- FileSystem fs=FileSystem.get(conf);
- Path p=new Path(path);
- if(fs.exists(p)){
- fs.delete(p, true);
- System.out.println("输出路径存在,已删除!");
- }
- FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
- FileOutputFormat.setOutputPath(job,p );
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
package com.qin.groupsort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import com.qin.operadb.PersonRecoder; import com.qin.operadb.ReadMapDB; /** * @author qindongliang * * 大数据交流群:376932160 * * * **/ public class GroupSort { /** * map任务 * * */ public static class GMapper extends Mapper<LongWritable, Text, DescSort, IntWritable>{ private DescSort tx=new DescSort(); private IntWritable second=new IntWritable(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { System.out.println("执行map"); // System.out.println("进map了"); //mos.write(namedOutput, key, value); String ss[]=value.toString().split(";"); String mkey=ss[0]; int mvalue=Integer.parseInt(ss[1]); tx.setFirstKey(mkey); tx.setSecondKey(mvalue); second.set(mvalue); context.write(tx, second); } } /*** * Reduce任务 * * **/ public static class GReduce extends Reducer<DescSort, IntWritable, Text, Text>{ @Override protected void reduce(DescSort arg0, Iterable<IntWritable> arg1, Context ctx) throws IOException, InterruptedException { System.out.println("执行reduce"); StringBuffer sb=new StringBuffer(); for(IntWritable t:arg1){ // sb.append(t).append(","); //con ctx.write(new Text(arg0.getFirstKey()), new Text(t.toString())); /**这种写法,是这种输出 *a 45 *b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100 */ } if(sb.length()>0){ sb.deleteCharAt(sb.length()-1);//删除最后一位的逗号 } // 在循环里拼接,在循环外输出是这种格式 // b 12,567 // 三劫 1,32,899 // 秦东亮 34,72,100 // ctx.write(new Text(arg0.getFirstKey()), new Text(sb.toString())); } } /*** * * 自定义组合键 * **/ public static class DescSort implements WritableComparable{ public DescSort() { // TODO Auto-generated constructor stub } private String firstKey; private int secondKey; public String getFirstKey() { return firstKey; } public void setFirstKey(String firstKey) { this.firstKey = firstKey; } public int getSecondKey() { return secondKey; } public void setSecondKey(int secondKey) { this.secondKey = secondKey; } // @Override // public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, // int arg4, int arg5) { // return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序 // } // // @Override // public int compare(Object a, Object b) { // // return -super.compare(a, b);//注意使用负号来完成降序 // } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub firstKey=in.readUTF(); secondKey=in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(firstKey); out.writeInt(secondKey); } @Override public int compareTo(Object o) { // TODO Auto-generated method stub DescSort d=(DescSort)o; //this在前代表升序 return this.getFirstKey().compareTo(d.getFirstKey()); } } /** * 主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组 * * **/ public static class TextComparator extends WritableComparator{ public TextComparator() { // TODO Auto-generated constructor stub super(DescSort.class,true);//注册Comparator } @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("执行TextComparator分组排序"); DescSort d1=(DescSort)a; DescSort d2=(DescSort)b; return d1.getFirstKey().compareTo(d2.getFirstKey()); } } /** * 组内排序的策略 * 按照第二个字段排序 * * */ public static class TextIntCompartator extends WritableComparator{ public TextIntCompartator() { super(DescSort.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { DescSort d1=(DescSort)a; DescSort d2=(DescSort)b; System.out.println("执行组内排序TextIntCompartator"); if(!d1.getFirstKey().equals(d2.getFirstKey())){ return d1.getFirstKey().compareTo(d2.getFirstKey()); }else{ return d1.getSecondKey()-d2.getSecondKey();//0,-1,1 } } } /** * 分区策略 * * */ public static class KeyPartition extends Partitioner<DescSort, IntWritable>{ @Override public int getPartition(DescSort key, IntWritable arg1, int arg2) { // TODO Auto-generated method stub System.out.println("执行自定义分区KeyPartition"); return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%arg2; } } public static void main(String[] args) throws Exception{ JobConf conf=new JobConf(ReadMapDB.class); //Configuration conf=new Configuration(); conf.set("mapred.job.tracker","192.168.75.130:9001"); //读取person中的数据字段 conf.setJar("tt.jar"); //注意这行代码放在最前面,进行初始化,否则会报 /**Job任务**/ Job job=new Job(conf, "testpartion"); job.setJarByClass(GroupSort.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; // job.setCombinerClass(PCombine.class); // job.setNumReduceTasks(3);//设置为3 job.setMapperClass(GMapper.class); job.setReducerClass(GReduce.class); /**设置分区函数*/ job.setPartitionerClass(KeyPartition.class); //分组函数,Reduce前的一次排序 job.setGroupingComparatorClass(TextComparator.class); //组内排序Map输出完毕后,对key进行的一次排序 job.setSortComparatorClass(TextIntCompartator.class); //TextComparator.class //TextIntCompartator.class // job.setGroupingComparatorClass(TextIntCompartator.class); //组内排序Map输出完毕后,对key进行的一次排序 // job.setSortComparatorClass(TextComparator.class); job.setMapOutputKeyClass(DescSort.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String path="hdfs://192.168.75.130:9000/root/outputdb"; FileSystem fs=FileSystem.get(conf); Path p=new Path(path); if(fs.exists(p)){ fs.delete(p, true); System.out.println("输出路径存在,已删除!"); } FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input"); FileOutputFormat.setOutputPath(job,p ); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
在eclipse下,执行,打印日志内容如下:
- 模式: 192.168.75.130:9001
- 输出路径存在,已删除!
- WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
- INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
- WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
- INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404152114_0003
- INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100%
- INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404152114_0003
- INFO - Counters.log(585) | Counters: 29
- INFO - Counters.log(587) | Job Counters
- INFO - Counters.log(589) | Launched reduce tasks=1
- INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=7040
- INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0
- INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0
- INFO - Counters.log(589) | Launched map tasks=1
- INFO - Counters.log(589) | Data-local map tasks=1
- INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9807
- INFO - Counters.log(587) | File Output Format Counters
- INFO - Counters.log(589) | Bytes Written=86
- INFO - Counters.log(587) | FileSystemCounters
- INFO - Counters.log(589) | FILE_BYTES_READ=162
- INFO - Counters.log(589) | HDFS_BYTES_READ=205
- INFO - Counters.log(589) | FILE_BYTES_WRITTEN=111232
- INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=86
- INFO - Counters.log(587) | File Input Format Counters
- INFO - Counters.log(589) | Bytes Read=93
- INFO - Counters.log(587) | Map-Reduce Framework
- INFO - Counters.log(589) | Map output materialized bytes=162
- INFO - Counters.log(589) | Map input records=9
- INFO - Counters.log(589) | Reduce shuffle bytes=162
- INFO - Counters.log(589) | Spilled Records=18
- INFO - Counters.log(589) | Map output bytes=138
- INFO - Counters.log(589) | Total committed heap usage (bytes)=176033792
- INFO - Counters.log(589) | CPU time spent (ms)=970
- INFO - Counters.log(589) | Combine input records=0
- INFO - Counters.log(589) | SPLIT_RAW_BYTES=112
- INFO - Counters.log(589) | Reduce input records=9
- INFO - Counters.log(589) | Reduce input groups=4
- INFO - Counters.log(589) | Combine output records=0
- INFO - Counters.log(589) | Physical memory (bytes) snapshot=258830336
- INFO - Counters.log(589) | Reduce output records=9
- INFO - Counters.log(589) | Virtual memory (bytes) snapshot=1461055488
- INFO - Counters.log(589) | Map output records=9
模式: 192.168.75.130:9001 输出路径存在,已删除! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404152114_0003 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404152114_0003 INFO - Counters.log(585) | Counters: 29 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=7040 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9807 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=86 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=162 INFO - Counters.log(589) | HDFS_BYTES_READ=205 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=111232 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=86 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=93 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=162 INFO - Counters.log(589) | Map input records=9 INFO - Counters.log(589) | Reduce shuffle bytes=162 INFO - Counters.log(589) | Spilled Records=18 INFO - Counters.log(589) | Map output bytes=138 INFO - Counters.log(589) | Total committed heap usage (bytes)=176033792 INFO - Counters.log(589) | CPU time spent (ms)=970 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=112 INFO - Counters.log(589) | Reduce input records=9 INFO - Counters.log(589) | Reduce input groups=4 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=258830336 INFO - Counters.log(589) | Reduce output records=9 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=1461055488 INFO - Counters.log(589) | Map output records=9
执行完,我们在输出目录里里面查看
执行完,内容如下:
- a 45
- b 12
- b 567
- 三劫 1
- 三劫 32
- 三劫 899
- 秦东亮 34
- 秦东亮 72
- 秦东亮 100
a 45 b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100
我们发现,跟我们预期的结果一致,熟悉MapReduce的执行原理,可以帮助我们更好的使用Hive,因为Hive本身就是一个或多个MapReduce作业构成的,Hive语句的优化,对MapReduce作业的影响的性能也是不容忽视的,所以我们一定要多熟悉熟悉MapReduce编程的模型,以便于我们对它有一个更清晰的认识和了解。
相关推荐
这个示例展示了Hadoop MapReduce在处理复杂排序需求时的灵活性,以及如何通过自定义Partitioner和Comparator实现特定的业务逻辑。对于大型的基站数据或者其他具有类似排序需求的场景,这样的解决方案非常有价值。...
- 全排序:所有数据最终合并成一个有序文件,通常通过设置一个ReduceTask来实现,但这可能影响效率,尤其在处理大量数据时。 - 辅助排序(GroupingComparator分组):允许在Reduce端按特定字段对key进行分组,适用...
在Hadoop MapReduce的默认过程中,键值对在Map阶段是局部排序的,然后在Reduce阶段进行全局排序。然而,这种排序可能不满足某些特定的应用场景,如按地理位置排序或者按时间戳排序等。这时,二次排序就派上了用场。...
- **Reduce阶段**:经过Map阶段处理后的数据会被排序并分组,然后发送到Reduce任务中进行进一步的处理。Reduce任务会执行Reduce函数,对相同键的所有值进行聚合操作,最终产生汇总的结果。 **3.2 MapReduce示例:...
1. **输入**:输入数据通常是以块(Block)的形式存储在HDFS(Hadoop Distributed File System)上,每个块大小默认为128MB。 2. **Mapper**:Map函数将输入块中的每一行文本作为键值对(, Text>,行号与整行内容)...
但是,如果需要在键相同的记录内部按值进行排序(例如,先处理最大或最小的值),则可以自定义GroupingComparator。这样,相同键的数据将首先被分组,然后在每个组内部进行排序。 ### 示例代码 在自定义这些组件时...
4. **中间结果的分区与排序**:Map阶段产生的中间结果会根据key进行分区和排序,为reduce阶段的处理做准备。 在Hadoop1.1.2的压缩包子文件“hadoop-common-release-1.1.2”中,"common"部分通常包含Hadoop的核心库...
- Map阶段:每个Mapper处理输入的Key-Value对(在默认情况下,输入文件的每一行就是Value,Key是行号),执行用户定义的Map函数,并将中间结果以Key-Value形式输出。 - Shuffle阶段:系统自动进行排序和合并(Sort/...
在IT行业中,Hadoop是一个广泛使用的开源框架,用于处理和存储海量数据。它主要由两个核心组件构成:HDFS(Hadoop Distributed File System)和MapReduce。Hadoop的设计目标是提供高容错性和高可扩展性,使得普通...
- 数据分片:HDFS将大文件切分为多个数据块,并在集群的不同节点上保存副本,通常默认为3个副本,以提高容错性。 - NameNode与DataNode:NameNode是HDFS的元数据管理节点,负责存储文件系统的目录结构和文件数据块...
这种设计使得MapReduce非常适合大规模数据集的并行处理,尤其是在处理PB级别的数据时更为显著。 ##### 2.2 MapReduce流程分析 **2.2.1 Map过程** 1. **输入分片**:每个输入分片会被一个map任务处理,默认情况下...
在map阶段,数据被转换为IntWritable类型,然后作为key输出,reduce阶段则直接将排序后的键值对输出。 除此之外,课程还覆盖了Hadoop的其他关键组件,如HDFS(分布式文件系统)、YARN(资源管理系统)、Hive(数据...
对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。
1. **数据输入**:InputFormat类负责解析输入数据,如默认的TextInputFormat将文件逻辑地切割成多个split,每个split对应一个MapTask。split通常基于数据块(block)进行,以优化I/O操作。 2. **数据读取**:...
在处理文本文件进行全局排序时,Hadoop的核心组件是InputSampler类和TotalOrderPartitioner类。 InputSampler类用于在Hadoop MapReduce作业中生成样本。它主要用于为TotalOrderPartitioner提供依据,以便对记录进行...
它将大型任务分解为可并行处理的小任务,通过“Map”阶段进行数据预处理,然后在“Reduce”阶段进行结果聚合。在本实例中,我们将深入探讨如何使用MapReduce进行数据排序。 ### Map阶段 Map阶段是MapReduce工作流程...
在分布式计算框架Hadoop中,排序是一个至关重要的过程,尤其在处理大数据时。默认的排序方式只能对MapReduce作业的键(Key)进行排序,但有时我们可能需要对键值对中的值(Value)也进行排序,这就是所谓的“二次...
3. **Shuffling**:Map 任务完成后,框架将所有中间键值对根据键进行排序和分区,并将相同键的值组合在一起,准备传递给 Reduce 任务。 4. **Reducing**:每个 Reduce 任务接收一组中间键值对,并将其聚合为较少数量...
排序分区是在全局范围内对Key进行某种划分,划分依据可以是Key中某一个或几个字段的内容,而这个划分过程并不一定是对数据值进行排序。排序分区的意义在于,它能够在全局范围内控制数据如何分布到不同的reduce任务中...
在进行这些测试时,使用“hadoop-common-2.6.0-bin-master.zip”中的工具和库可以方便地构建测试环境,模拟数据输入,触发各种测试用例,并验证结果。同时,了解Hadoop的源代码和API文档将对深入理解Driver的工作...