`
weitao1026
  • 浏览: 1064887 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

使用Hadoop处理数据时,在Reduce阶段

阅读更多

有时候,我们使用Hadoop处理数据时,在Reduce阶段,我们可能想对每一个输出的key进行单独输出一个目录或文件,这样方便数据分析,比如根据某个时间段对日志文件进行时间段归类等等。这时候我们就可以使用MultipleOutputs类,来搞定这件事,

下面,先来看下散仙的测试数据:

Java代码 复制代码 收藏代码
  1. 中国;我们  
  2. 美国;他们  
  3. 中国;123  
  4. 中国人;善良  
  5. 美国;USA  
  6. 美国;在北美洲  
中国;我们
美国;他们
中国;123
中国人;善良
美国;USA
美国;在北美洲


输出结果:预期输出结果是:
中国一组,美国一组,中国人一组
核心代码如下:

Java代码 复制代码 收藏代码
  1. package com.partition.test;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.fs.FileSystem;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.LongWritable;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapred.JobConf;  
  10. import org.apache.hadoop.mapreduce.Job;  
  11. import org.apache.hadoop.mapreduce.Mapper;  
  12. import org.apache.hadoop.mapreduce.Partitioner;  
  13. import org.apache.hadoop.mapreduce.Reducer;  
  14. import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;  
  15. import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;  
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  18. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;  
  19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  20.   
  21. import com.qin.operadb.PersonRecoder;  
  22. import com.qin.operadb.ReadMapDB;  
  23.    
  24. /*** 
  25.  * @author qindongliang 
  26.  *  
  27.  * 大数据技术交流群:324714439 
  28.  * **/  
  29. public class TestMultiOutput {  
  30.       
  31.       
  32.     /** 
  33.      * map任务 
  34.      *  
  35.      * **/  
  36.     public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{  
  37.           
  38.         @Override  
  39.         protected void map(LongWritable key, Text value,Context context)  
  40.                 throws IOException, InterruptedException {  
  41.              String ss[]=value.toString().split(";");  
  42.             context.write(new Text(ss[0]), new Text(ss[1]));      
  43.         }  
  44.           
  45.           
  46.     }  
  47.       
  48.    
  49.      public static class PReduce extends Reducer<Text, Text, Text, Text>{  
  50.          /** 
  51.           * 设置多个文件输出 
  52.           * */  
  53.          private MultipleOutputs mos;  
  54.            
  55.          @Override  
  56.         protected void setup(Context context)  
  57.                 throws IOException, InterruptedException {  
  58.               mos=new MultipleOutputs(context);//初始化mos  
  59.         }  
  60.          @Override  
  61.         protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)  
  62.                 throws IOException, InterruptedException {  
  63.                
  64.               String key=arg0.toString();  
  65.              for(Text t:arg1){  
  66.                    if(key.equals("中国")){   
  67.                        /** 
  68.                         * 一个参数 
  69.                         * **/  
  70.                        mos.write("china", arg0,t);   
  71.                    } else if(key.equals("美国")){  
  72.                        mos.write("USA", arg0,t);      
  73.                    } else if(key.equals("中国人")){  
  74.                        mos.write("cperson", arg0,t);   
  75.                          
  76.                    }  
  77.            
  78.                  //System.out.println("Reduce:  "+arg0.toString()+"   "+t.toString());  
  79.              }  
  80.                  
  81.                
  82.         }  
  83.            
  84.          @Override  
  85.         protected void cleanup(  
  86.                  Context context)  
  87.                 throws IOException, InterruptedException {  
  88.              mos.close();//释放资源  
  89.         }  
  90.            
  91.      }  
  92.        
  93.        
  94.      public static void main(String[] args) throws Exception{  
  95.          JobConf conf=new JobConf(ReadMapDB.class);  
  96.          //Configuration conf=new Configuration();  
  97.         // conf.set("mapred.job.tracker","192.168.75.130:9001");  
  98.         //读取person中的数据字段  
  99.         // conf.setJar("tt.jar");  
  100.         //注意这行代码放在最前面,进行初始化,否则会报  
  101.        
  102.        
  103.         /**Job任务**/  
  104.         Job job=new Job(conf, "testpartion");  
  105.         job.setJarByClass(TestMultiOutput.class);  
  106.         System.out.println("模式:  "+conf.get("mapred.job.tracker"));;  
  107.         // job.setCombinerClass(PCombine.class);  
  108.         //job.setPartitionerClass(PPartition.class);  
  109.         //job.setNumReduceTasks(5);  
  110.          job.setMapperClass(PMapper.class);  
  111.            
  112.          /** 
  113.           * 注意在初始化时需要设置输出文件的名 
  114.           * 另外名称,不支持中文名,仅支持英文字符 
  115.           *  
  116.           * **/  
  117.          MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);  
  118.          MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class);  
  119.          MultipleOutputs.addNamedOutput(job, "cperson", TextOutputFormat.class, Text.class, Text.class);  
  120.          job.setReducerClass(PReduce.class);  
  121.          job.setOutputKeyClass(Text.class);  
  122.          job.setOutputValueClass(Text.class);  
  123.           
  124.         String path="hdfs://192.168.75.130:9000/root/outputdb";  
  125.         FileSystem fs=FileSystem.get(conf);  
  126.         Path p=new Path(path);  
  127.         if(fs.exists(p)){  
  128.             fs.delete(p, true);  
  129.             System.out.println("输出路径存在,已删除!");  
  130.         }  
  131.         FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");  
  132.         FileOutputFormat.setOutputPath(job,p );  
  133.         System.exit(job.waitForCompletion(true) ? 0 : 1);    
  134.            
  135.            
  136.     }  
  137.       
  138.       
  139.   
  140. }  
package com.partition.test;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.qin.operadb.PersonRecoder;
import com.qin.operadb.ReadMapDB;
 
/***
 * @author qindongliang
 * 
 * 大数据技术交流群:324714439
 * **/
public class TestMultiOutput {
	
	
	/**
	 * map任务
	 * 
	 * **/
	public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			 String ss[]=value.toString().split(";");
			context.write(new Text(ss[0]), new Text(ss[1]));	
		}
		
		
	}
	
 
	 public static class PReduce extends Reducer<Text, Text, Text, Text>{
		 /**
		  * 设置多个文件输出
		  * */
		 private MultipleOutputs mos;
		 
		 @Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			  mos=new MultipleOutputs(context);//初始化mos
		}
		 @Override
		protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)
				throws IOException, InterruptedException {
			 
			  String key=arg0.toString();
			 for(Text t:arg1){
				   if(key.equals("中国")){ 
					   /**
					    * 一个参数
					    * **/
					   mos.write("china", arg0,t); 
				   } else if(key.equals("美国")){
					   mos.write("USA", arg0,t);    
				   } else if(key.equals("中国人")){
					   mos.write("cperson", arg0,t); 
					   
				   }
	     
				 //System.out.println("Reduce:  "+arg0.toString()+"   "+t.toString());
			 }
			   
			 
		}
		 
		 @Override
		protected void cleanup(
				 Context context)
				throws IOException, InterruptedException {
			 mos.close();//释放资源
		}
		 
	 }
	 
	 
	 public static void main(String[] args) throws Exception{
		 JobConf conf=new JobConf(ReadMapDB.class);
		 //Configuration conf=new Configuration();
	  	// conf.set("mapred.job.tracker","192.168.75.130:9001");
		//读取person中的数据字段
	  	// conf.setJar("tt.jar");
		//注意这行代码放在最前面,进行初始化,否则会报
	 
	 
		/**Job任务**/
		Job job=new Job(conf, "testpartion");
		job.setJarByClass(TestMultiOutput.class);
		System.out.println("模式:  "+conf.get("mapred.job.tracker"));;
		// job.setCombinerClass(PCombine.class);
		//job.setPartitionerClass(PPartition.class);
		//job.setNumReduceTasks(5);
		 job.setMapperClass(PMapper.class);
		 
		 /**
		  * 注意在初始化时需要设置输出文件的名
		  * 另外名称,不支持中文名,仅支持英文字符
		  * 
		  * **/
		 MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
		 MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class);
		 MultipleOutputs.addNamedOutput(job, "cperson", TextOutputFormat.class, Text.class, Text.class);
		 job.setReducerClass(PReduce.class);
		 job.setOutputKeyClass(Text.class);
		 job.setOutputValueClass(Text.class);
	    
		String path="hdfs://192.168.75.130:9000/root/outputdb";
		FileSystem fs=FileSystem.get(conf);
		Path p=new Path(path);
		if(fs.exists(p)){
			fs.delete(p, true);
			System.out.println("输出路径存在,已删除!");
		}
		FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
		FileOutputFormat.setOutputPath(job,p );
		System.exit(job.waitForCompletion(true) ? 0 : 1);  
		 
		 
	}
	
	

}


如果是中文的路径名,则会报如下的一个异常:

Java代码 复制代码 收藏代码
  1. 模式:  local  
  2. 输出路径存在,已删除!  
  3. WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  
  4. WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  
  5. WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).  
  6. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1  
  7. WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded  
  8. INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1533332464_0001  
  9. INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks  
  10. INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1533332464_0001_m_000000_0  
  11. INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null  
  12. INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/input/group.txt:0+91  
  13. INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100  
  14. INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720  
  15. INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680  
  16. INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output  
  17. INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0  
  18. INFO - Task.done(858) | Task:attempt_local1533332464_0001_m_000000_0 is done. And is in the process of commiting  
  19. INFO - LocalJobRunner$Job.statusUpdate(466) |   
  20. INFO - Task.sendDone(970) | Task 'attempt_local1533332464_0001_m_000000_0' done.  
  21. INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1533332464_0001_m_000000_0  
  22. INFO - LocalJobRunner$Job.run(348) | Map task executor complete.  
  23. INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null  
  24. INFO - LocalJobRunner$Job.statusUpdate(466) |   
  25. INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments  
  26. INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 101 bytes  
  27. INFO - LocalJobRunner$Job.statusUpdate(466) |   
  28. WARN - LocalJobRunner$Job.run(435) | job_local1533332464_0001  
  29. java.lang.IllegalArgumentException: Name cannot be have a '一' char  
  30.     at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:160)  
  31.     at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:186)  
  32.     at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:363)  
  33.     at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:348)  
  34.     at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:74)  
  35.     at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:1)  
  36.     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)  
  37.     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)  
  38.     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)  
  39.     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)  
  40. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%  
  41. INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1533332464_0001  
  42. INFO - Counters.log(585) | Counters: 17  
  43. INFO - Counters.log(587) |   File Input Format Counters   
  44. INFO - Counters.log(589) |     Bytes Read=91  
  45. INFO - Counters.log(587) |   FileSystemCounters  
  46. INFO - Counters.log(589) |     FILE_BYTES_READ=177  
  47. INFO - Counters.log(589) |     HDFS_BYTES_READ=91  
  48. INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=71111  
  49. INFO - Counters.log(587) |   Map-Reduce Framework  
  50. INFO - Counters.log(589) |     Map output materialized bytes=105  
  51. INFO - Counters.log(589) |     Map input records=6  
  52. INFO - Counters.log(589) |     Reduce shuffle bytes=0  
  53. INFO - Counters.log(589) |     Spilled Records=6  
  54. INFO - Counters.log(589) |     Map output bytes=87  
  55. INFO - Counters.log(589) |     Total committed heap usage (bytes)=227737600  
  56. INFO - Counters.log(589) |     Combine input records=0  
  57. INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112  
  58. INFO - Counters.log(589) |     Reduce input records=0  
  59. INFO - Counters.log(589) |     Reduce input groups=0  
  60. INFO - Counters.log(589) |     Combine output records=0  
  61. INFO - Counters.log(589) |     Reduce output records=0  
  62. INFO - Counters.log(589) |     Map output records=6  
模式:  local
输出路径存在,已删除!
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1533332464_0001
INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks
INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1533332464_0001_m_000000_0
INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null
INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/input/group.txt:0+91
INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100
INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720
INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680
INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output
INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0
INFO - Task.done(858) | Task:attempt_local1533332464_0001_m_000000_0 is done. And is in the process of commiting
INFO - LocalJobRunner$Job.statusUpdate(466) | 
INFO - Task.sendDone(970) | Task 'attempt_local1533332464_0001_m_000000_0' done.
INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1533332464_0001_m_000000_0
INFO - LocalJobRunner$Job.run(348) | Map task executor complete.
INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null
INFO - LocalJobRunner$Job.statusUpdate(466) | 
INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments
INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 101 bytes
INFO - LocalJobRunner$Job.statusUpdate(466) | 
WARN - LocalJobRunner$Job.run(435) | job_local1533332464_0001
java.lang.IllegalArgumentException: Name cannot be have a '一' char
	at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:160)
	at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:186)
	at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:363)
	at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:348)
	at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:74)
	at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:1)
	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1533332464_0001
INFO - Counters.log(585) | Counters: 17
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=91
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=177
INFO - Counters.log(589) |     HDFS_BYTES_READ=91
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=71111
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=105
INFO - Counters.log(589) |     Map input records=6
INFO - Counters.log(589) |     Reduce shuffle bytes=0
INFO - Counters.log(589) |     Spilled Records=6
INFO - Counters.log(589) |     Map output bytes=87
INFO - Counters.log(589) |     Total committed heap usage (bytes)=227737600
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=0
INFO - Counters.log(589) |     Reduce input groups=0
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Reduce output records=0
INFO - Counters.log(589) |     Map output records=6



源码中关于名称的校验如下:

Java代码 复制代码 收藏代码
  1. /** 
  2.   * Checks if a named output name is valid token. 
  3.   * 
  4.   * @param namedOutput named output Name 
  5.   * @throws IllegalArgumentException if the output name is not valid. 
  6.   */  
  7.  private static void checkTokenName(String namedOutput) {  
  8.    if (namedOutput == null || namedOutput.length() == 0) {  
  9.      throw new IllegalArgumentException(  
  10.        "Name cannot be NULL or emtpy");  
  11.    }  
  12.    for (char ch : namedOutput.toCharArray()) {  
  13.      if ((ch >= 'A') && (ch <= 'Z')) {  
  14.        continue;  
  15.      }  
  16.      if ((ch >= 'a') && (ch <= 'z')) {  
  17.        continue;  
  18.      }  
  19.      if ((ch >= '0') && (ch <= '9')) {  
  20.        continue;  
  21.      }  
  22.      throw new IllegalArgumentException(  
  23.        "Name cannot be have a '" + ch + "' char");  
  24.    }  
  25.  }  
 /**
   * Checks if a named output name is valid token.
   *
   * @param namedOutput named output Name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
        "Name cannot be NULL or emtpy");
    }
    for (char ch : namedOutput.toCharArray()) {
      if ((ch >= 'A') && (ch <= 'Z')) {
        continue;
      }
      if ((ch >= 'a') && (ch <= 'z')) {
        continue;
      }
      if ((ch >= '0') && (ch <= '9')) {
        continue;
      }
      throw new IllegalArgumentException(
        "Name cannot be have a '" + ch + "' char");
    }
  }


程序运行成功输出:

Java代码 复制代码 收藏代码
  1. 模式:  192.168.75.130:9001  
  2. 输出路径存在,已删除!  
  3. WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  
  4. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1  
  5. WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  
  6. WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded  
  7. INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0006  
  8. INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%  
  9. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%  
  10. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%  
  11. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%  
  12. INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0006  
  13. INFO - Counters.log(585) | Counters: 29  
  14. INFO - Counters.log(587) |   Job Counters   
  15. INFO - Counters.log(589) |     Launched reduce tasks=1  
  16. INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=9289  
  17. INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0  
  18. INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0  
  19. INFO - Counters.log(589) |     Launched map tasks=1  
  20. INFO - Counters.log(589) |     Data-local map tasks=1  
  21. INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=13645  
  22. INFO - Counters.log(587) |   File Output Format Counters   
  23. INFO - Counters.log(589) |     Bytes Written=0  
  24. INFO - Counters.log(587) |   FileSystemCounters  
  25. INFO - Counters.log(589) |     FILE_BYTES_READ=105  
  26. INFO - Counters.log(589) |     HDFS_BYTES_READ=203  
  27. INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=113616  
  28. INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=87  
  29. INFO - Counters.log(587) |   File Input Format Counters   
  30. INFO - Counters.log(589) |     Bytes Read=91  
  31. INFO - Counters.log(587) |   Map-Reduce Framework  
  32. INFO - Counters.log(589) |     Map output materialized bytes=105  
  33. INFO - Counters.log(589) |     Map input records=6  
  34. INFO - Counters.log(589) |     Reduce shuffle bytes=105  
  35. INFO - Counters.log(589) |     Spilled Records=12  
  36. INFO - Counters.log(589) |     Map output bytes=87  
  37. INFO - Counters.log(589) |     Total committed heap usage (bytes)=176033792  
  38. INFO - Counters.log(589) |     CPU time spent (ms)=1880  
  39. INFO - Counters.log(589) |     Combine input records=0  
  40. INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112  
  41. INFO - Counters.log(589) |     Reduce input records=6  
  42. INFO - Counters.log(589) |     Reduce input groups=3  
  43. INFO - Counters.log(589) |     Combine output records=0  
  44. INFO - Counters.log(589) |     Physical memory (bytes) snapshot=278876160  
  45. INFO - Counters.log(589) |     Reduce output records=0  
  46. INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=1460908032  
  47. INFO - Counters.log(589) |     Map output records=6  
模式:  192.168.75.130:9001
输出路径存在,已删除!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0006
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0006
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters 
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=9289
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=1
INFO - Counters.log(589) |     Data-local map tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=13645
INFO - Counters.log(587) |   File Output Format Counters 
INFO - Counters.log(589) |     Bytes Written=0
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=105
INFO - Counters.log(589) |     HDFS_BYTES_READ=203
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=113616
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=87
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=91
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=105
INFO - Counters.log(589) |     Map input records=6
INFO - Counters.log(589) |     Reduce shuffle bytes=105
INFO - Counters.log(589) |     Spilled Records=12
INFO - Counters.log(589) |     Map output bytes=87
INFO - Counters.log(589) |     Total committed heap usage (bytes)=176033792
INFO - Counters.log(589) |     CPU time spent (ms)=1880
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=6
INFO - Counters.log(589) |     Reduce input groups=3
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=278876160
INFO - Counters.log(589) |     Reduce output records=0
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=1460908032
INFO - Counters.log(589) |     Map output records=6


运行成功后,生成的文件如下所示:


china-r-00000里面的数据如下:

Java代码 复制代码 收藏代码
  1. 中国  我们  
  2. 中国  123  
中国	我们
中国	123


USA-r-00000里面的数据如下:

Java代码 复制代码 收藏代码
  1. 美国  他们  
  2. 美国  USA  
  3. 美国  在北美洲  
美国	他们
美国	USA
美国	在北美洲


cperson-r-00000里面的数据如下:

Java代码 复制代码 收藏代码
  1. 中国人  善良  
中国人	 善良



在输出结果中,reduce自带的那个文件仍然会输出,但是里面没有任何数据,至此,我们已经在hadoop1.2.0的基于新的API里,测试多文件输出通过。

分享到:
评论

相关推荐

    4703031《Hadoop大数据处理实战》(康开锋)423-1资源包.rar

    《Hadoop大数据处理实战》是康开锋撰写的一本深入探讨Hadoop技术的书籍,旨在帮助读者理解并掌握Hadoop在大数据处理中的实际应用。这本书的内容涵盖了Hadoop生态系统的核心组件,以及如何利用这些组件解决实际问题。...

    Hadoop构建数据仓库实践1_hadoop_

    MapReduce则是一种编程模型,用于大规模数据集的并行计算,其工作原理包括Map阶段(数据分片和处理)和Reduce阶段(汇总结果)。 构建数据仓库在Hadoop上,通常涉及以下几个步骤: 1. 数据获取:这是数据仓库生命...

    使用hadoop进行数据分析

    使用Hadoop进行数据分析是一个涉及多个步骤的过程,特别是在处理大规模数据集时。以下是一个基本的步骤指南,帮助你使用Hadoop进行数据分析: 1. 环境搭建 安装Hadoop:在集群上安装Hadoop,并配置HDFS(Hadoop ...

    Hadoop大数据处理技术基础与实践(第2版)(微课版)PPT-课件.rar

    Map阶段将数据分解并并行处理,Reduce阶段则对结果进行聚合。 4. **YARN**:Yet Another Resource Negotiator(YARN)是Hadoop的资源管理系统,负责调度集群中的计算资源,确保高效运行MapReduce任务和其他计算框架...

    Hadoop数据迁移--从Oracle向Hadoop

    在执行Hadoop数据迁移时,Hadoop集群中的MapReduce作业会利用JDBC驱动与Oracle数据库建立连接,通过执行SQL查询来检索数据,并使用Hadoop的序列化机制将数据写入到HDFS。这样,Oracle中的数据就成功地迁移到了Hadoop...

    Hadoop数据分析_大数据_hadoop_数据分析_

    在进行Hadoop数据分析时,我们通常会经历以下步骤: 1. **数据摄入**:使用Flume或类似的工具将数据从各种来源引入Hadoop集群。 2. **数据存储**:利用HDFS存储原始数据。 3. **数据清洗**:使用MapReduce或Pig等...

    hadoop_join.jar.zip_hadoop_hadoop query_reduce

    在大数据处理领域,Hadoop和MapReduce是两个至关重要的概念,它们构成了大数据处理的基础框架。本文将深入探讨如何使用Hadoop和MapReduce进行高效的Join查询,并解析如何通过`hadoop_join.jar`这个工具来实现这一...

    使用Hadoop进行数据分析涉及大数据的处理和分析 一个简单的案例介绍如何使用Hadoop进行数据分析

    本案例旨在介绍如何使用Hadoop这一强大的大数据处理工具来进行数据分析,具体场景设定在一个在线零售商的销售数据上。该数据包含了订单信息、产品信息以及客户信息。目标是对这些数据进行深入分析,以便于了解哪些...

    大数据云计算技术 淘宝网Hadoop与数据分析 taobao数据团队(共30页).ppt

    Reduce阶段,将相同键值的数据聚合在一起,进行总结和汇总,最终输出结果。 四、Hive与数据分析 Hive是基于Hadoop的数据仓库工具,它允许用户使用SQL-like语法对大数据进行查询和分析。在淘宝网中,Hive用于构建...

    数据算法--HadoopSpark大数据处理技巧.pdf

    在大数据处理领域,Hadoop和Spark是两个关键的框架,它们提供了高效的数据处理能力。这份“数据算法--HadoopSpark大数据处理技巧”文档显然探讨了如何利用这两个工具进行复杂的数据操作,具体涉及到Scala编程实现的...

    基于Hadoop的电影影评数据分析

    Map阶段将数据拆分成可处理的小块,Reduce阶段则对Map阶段的结果进行聚合,从而得出最终的分析结果。在这个过程中,我们可以编写Java程序,利用Hadoop API来实现数据处理逻辑,例如计算平均分、统计评分分布和提取...

    hadoop map-reduce turorial

    Hadoop Map-Reduce框架是设计用于处理大规模数据集(多太字节级)的软件框架,它允许在大量廉价硬件集群上(可达数千节点)进行并行处理,确保了数据处理的可靠性与容错性。此教程全面介绍了Hadoop Map-Reduce框架的...

    用-Hadoop-进行分布式数据处理

    MapReduce 将数据处理过程分为两个阶段:Map 阶段和 Reduce 阶段。Map 阶段将输入数据分割成小块,并将其分配给多个节点处理。Reduce 阶段将处理结果合并成最终结果。 本文将指导读者如何安装和配置 Hadoop 集群,...

    基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据.zip

    在大数据处理领域,Hadoop 是一个至关重要的框架,它提供了分布式存储和计算的能力,使得海量数据的处理变得可能。在这个项目“基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据”中,我们将深入探讨...

    Hadoop分析气象数据完整版代码

    在IT行业中,大数据处理是一项至关重要的任务,而Hadoop作为开源的大数据处理框架,因其高效、可扩展的特性,被广泛应用于气象数据分析等场景。在这个项目中,我们重点关注的是一套完整的Hadoop分析气象数据的代码,...

    基于开源Hadoop的矢量空间数据分布式处理研究.pdf

    实验结果表明,采用Hadoop MapReduce模型的系统能够显著提高分析性能,尤其是在处理需要复杂空间分析任务时,优势更为明显。 本研究对于GIS、遥感数据分析、城市规划等领域的实际应用具有重要的实践意义。通过本...

    数据算法: Hadoop+Spark大数据处理技巧.pdf

    Map阶段将数据集分割为小块,应用函数进行映射,而Reduce阶段则负责对映射结果进行聚合,实现数据的整合。 Spark则是新一代的大数据处理框架,以其快速、通用和可扩展性而闻名。与Hadoop MapReduce相比,Spark采用...

    基于Hadoop的海量数据处理模型研究和应用.pdf

    文章中提到,现有的计算和存储技术在处理海量数据时面临诸多困难,如网络压力大、资源管理复杂等。而Hadoop的分布式特性可以有效缓解这些问题。通过Hadoop的分布式架构,数据可以在不同的节点间并行处理,大大提高了...

    Hadoop之外卖订单数据分析系统

    在这个外卖订单分析系统中,MapReduce负责将订单数据进行拆分、映射和排序,而Reduce阶段则对映射后的数据进行聚合,提取关键信息。 在Hadoop平台上,我们通常会使用Hive或Pig这样的数据仓库工具进行数据预处理和...

    基于Hadoop 的海量日志数据处理

    然而,传统的单机数据处理方法在处理如此大规模数据时,面临明显的存储和计算瓶颈。 为了解决这些问题,文章提出了一种基于开源框架Hadoop的海量日志数据处理方法。Hadoop是一个由Apache软件基金会支持的开源分布式...

Global site tag (gtag.js) - Google Analytics