When processing data with Hadoop, we sometimes want the Reduce phase to write each output key to its own file or directory, which makes later analysis easier — for example, bucketing log records by time period. The MultipleOutputs class handles exactly this.
First, let's look at the test data:
- 中国;我们
- 美国;他们
- 中国;123
- 中国人;善良
- 美国;USA
- 美国;在北美洲
The expected result is three output groups: one for 中国, one for 美国, and one for 中国人.
The core code is as follows:
- package com.partition.test;
- import java.io.IOException;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- /**
-  * Writes reduce output to multiple named files with MultipleOutputs (new API).
-  *
-  * @author qindongliang
-  */
- public class TestMultiOutput {
-     /**
-      * Map task: splits each input line on ';' and emits (key, value).
-      */
-     public static class PMapper extends Mapper<LongWritable, Text, Text, Text> {
-         @Override
-         protected void map(LongWritable key, Text value, Context context)
-                 throws IOException, InterruptedException {
-             String[] ss = value.toString().split(";");
-             context.write(new Text(ss[0]), new Text(ss[1]));
-         }
-     }
-     public static class PReduce extends Reducer<Text, Text, Text, Text> {
-         /**
-          * Handles the multiple file outputs.
-          */
-         private MultipleOutputs<Text, Text> mos;
-         @Override
-         protected void setup(Context context)
-                 throws IOException, InterruptedException {
-             mos = new MultipleOutputs<Text, Text>(context); // initialize mos
-         }
-         @Override
-         protected void reduce(Text key, Iterable<Text> values, Context context)
-                 throws IOException, InterruptedException {
-             String k = key.toString();
-             for (Text t : values) {
-                 if (k.equals("中国")) {
-                     // route this record to the named output "china"
-                     mos.write("china", key, t);
-                 } else if (k.equals("美国")) {
-                     mos.write("USA", key, t);
-                 } else if (k.equals("中国人")) {
-                     mos.write("cperson", key, t);
-                 }
-             }
-         }
-         @Override
-         protected void cleanup(Context context)
-                 throws IOException, InterruptedException {
-             mos.close(); // release resources
-         }
-     }
-     public static void main(String[] args) throws Exception {
-         JobConf conf = new JobConf(TestMultiOutput.class);
-         // To run against a remote cluster, set the job tracker here, before the
-         // Job is created, otherwise the setting is not picked up:
-         // conf.set("mapred.job.tracker", "192.168.75.130:9001");
-         // conf.setJar("tt.jar");
-         Job job = new Job(conf, "testpartion");
-         job.setJarByClass(TestMultiOutput.class);
-         System.out.println("Mode: " + conf.get("mapred.job.tracker"));
-         job.setMapperClass(PMapper.class);
-         /*
-          * Every named output must be registered when the job is configured.
-          * Names may only contain letters and digits; Chinese characters are
-          * not allowed.
-          */
-         MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
-         MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class);
-         MultipleOutputs.addNamedOutput(job, "cperson", TextOutputFormat.class, Text.class, Text.class);
-         job.setReducerClass(PReduce.class);
-         job.setOutputKeyClass(Text.class);
-         job.setOutputValueClass(Text.class);
-         String path = "hdfs://192.168.75.130:9000/root/outputdb";
-         FileSystem fs = FileSystem.get(conf);
-         Path p = new Path(path);
-         if (fs.exists(p)) {
-             fs.delete(p, true);
-             System.out.println("Output path exists, deleted!");
-         }
-         FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
-         FileOutputFormat.setOutputPath(job, p);
-         System.exit(job.waitForCompletion(true) ? 0 : 1);
-     }
- }
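The three-argument mos.write(...) calls above produce files such as china-r-00000 directly in the job output directory. When each key should instead get its own subdirectory, as mentioned at the beginning, MultipleOutputs also offers a four-argument overload, write(namedOutput, key, value, baseOutputPath), where baseOutputPath is resolved relative to the job output directory. Below is a minimal sketch of the reduce loop using that overload; the subdirectory names are only an illustration, and every named output still has to be registered with addNamedOutput in the driver:
- // Inside PReduce.reduce(): route each key into its own subdirectory under the
- // job output path, e.g. <output>/china/part-r-00000.
- String k = key.toString();
- for (Text t : values) {
-     if (k.equals("中国")) {
-         mos.write("china", key, t, "china/part");
-     } else if (k.equals("美国")) {
-         mos.write("USA", key, t, "USA/part");
-     } else if (k.equals("中国人")) {
-         mos.write("cperson", key, t, "cperson/part");
-     }
- }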
If a Chinese named-output name is used instead, an exception like the following is thrown:
- Mode: local
- Output path exists, deleted!
- WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
- WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
- INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
- WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
- INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1533332464_0001
- INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks
- INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1533332464_0001_m_000000_0
- INFO - Task.initialize(534) | Using ResourceCalculatorPlugin : null
- INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/input/group.txt:0+91
- INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100
- INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720
- INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680
- INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output
- INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0
- INFO - Task.done(858) | Task:attempt_local1533332464_0001_m_000000_0 is done. And is in the process of commiting
- INFO - LocalJobRunner$Job.statusUpdate(466) |
- INFO - Task.sendDone(970) | Task 'attempt_local1533332464_0001_m_000000_0' done.
- INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1533332464_0001_m_000000_0
- INFO - LocalJobRunner$Job.run(348) | Map task executor complete.
- INFO - Task.initialize(534) | Using ResourceCalculatorPlugin : null
- INFO - LocalJobRunner$Job.statusUpdate(466) |
- INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments
- INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 101 bytes
- INFO - LocalJobRunner$Job.statusUpdate(466) |
- WARN - LocalJobRunner$Job.run(435) | job_local1533332464_0001
- java.lang.IllegalArgumentException: Name cannot be have a '一' char
- at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:160)
- at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:186)
- at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:363)
- at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:348)
- at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:74)
- at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:1)
- at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
- at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
- at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
- at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1533332464_0001
- INFO - Counters.log(585) | Counters: 17
- INFO - Counters.log(587) | File Input Format Counters
- INFO - Counters.log(589) | Bytes Read=91
- INFO - Counters.log(587) | FileSystemCounters
- INFO - Counters.log(589) | FILE_BYTES_READ=177
- INFO - Counters.log(589) | HDFS_BYTES_READ=91
- INFO - Counters.log(589) | FILE_BYTES_WRITTEN=71111
- INFO - Counters.log(587) | Map-Reduce Framework
- INFO - Counters.log(589) | Map output materialized bytes=105
- INFO - Counters.log(589) | Map input records=6
- INFO - Counters.log(589) | Reduce shuffle bytes=0
- INFO - Counters.log(589) | Spilled Records=6
- INFO - Counters.log(589) | Map output bytes=87
- INFO - Counters.log(589) | Total committed heap usage (bytes)=227737600
- INFO - Counters.log(589) | Combine input records=0
- INFO - Counters.log(589) | SPLIT_RAW_BYTES=112
- INFO - Counters.log(589) | Reduce input records=0
- INFO - Counters.log(589) | Reduce input groups=0
- INFO - Counters.log(589) | Combine output records=0
- INFO - Counters.log(589) | Reduce output records=0
- INFO - Counters.log(589) | Map output records=6
The name validation in the Hadoop source is as follows:
- /**
- * Checks if a named output name is valid token.
- *
- * @param namedOutput named output Name
- * @throws IllegalArgumentException if the output name is not valid.
- */
- private static void checkTokenName(String namedOutput) {
- if (namedOutput == null || namedOutput.length() == 0) {
- throw new IllegalArgumentException(
- "Name cannot be NULL or emtpy");
- }
- for (char ch : namedOutput.toCharArray()) {
- if ((ch >= 'A') && (ch <= 'Z')) {
- continue;
- }
- if ((ch >= 'a') && (ch <= 'z')) {
- continue;
- }
- if ((ch >= '0') && (ch <= '9')) {
- continue;
- }
- throw new IllegalArgumentException(
- "Name cannot be have a '" + ch + "' char");
- }
- }
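Since checkTokenName only accepts A-Z, a-z and 0-9, a practical workaround when the keys themselves are Chinese is to map each key to an ASCII alias before calling mos.write. The helper below is only a sketch and is not part of the original program; the class name OutputNames, the alias table and the fallback name "other" are all assumptions:
- import java.util.HashMap;
- import java.util.Map;
- /** Hypothetical helper: maps reduce keys to legal named-output names. */
- public final class OutputNames {
-     private static final Map<String, String> ALIASES = new HashMap<String, String>();
-     static {
-         ALIASES.put("中国", "china");
-         ALIASES.put("美国", "USA");
-         ALIASES.put("中国人", "cperson");
-     }
-     private OutputNames() {}
-     /** Returns the alias if one exists, otherwise strips every character checkTokenName would reject. */
-     public static String toOutputName(String key) {
-         String alias = ALIASES.get(key);
-         if (alias != null) {
-             return alias;
-         }
-         String cleaned = key.replaceAll("[^A-Za-z0-9]", "");
-         return cleaned.isEmpty() ? "other" : cleaned;
-     }
- }
With such a helper the if/else chain in the reducer collapses to mos.write(OutputNames.toOutputName(key.toString()), key, t), as long as every alias that can occur was also registered with addNamedOutput in the driver.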
Output of a successful run:
- Mode: 192.168.75.130:9001
- Output path exists, deleted!
- WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
- INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
- WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
- INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0006
- INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100%
- INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0006
- INFO - Counters.log(585) | Counters: 29
- INFO - Counters.log(587) | Job Counters
- INFO - Counters.log(589) | Launched reduce tasks=1
- INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=9289
- INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0
- INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0
- INFO - Counters.log(589) | Launched map tasks=1
- INFO - Counters.log(589) | Data-local map tasks=1
- INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=13645
- INFO - Counters.log(587) | File Output Format Counters
- INFO - Counters.log(589) | Bytes Written=0
- INFO - Counters.log(587) | FileSystemCounters
- INFO - Counters.log(589) | FILE_BYTES_READ=105
- INFO - Counters.log(589) | HDFS_BYTES_READ=203
- INFO - Counters.log(589) | FILE_BYTES_WRITTEN=113616
- INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=87
- INFO - Counters.log(587) | File Input Format Counters
- INFO - Counters.log(589) | Bytes Read=91
- INFO - Counters.log(587) | Map-Reduce Framework
- INFO - Counters.log(589) | Map output materialized bytes=105
- INFO - Counters.log(589) | Map input records=6
- INFO - Counters.log(589) | Reduce shuffle bytes=105
- INFO - Counters.log(589) | Spilled Records=12
- INFO - Counters.log(589) | Map output bytes=87
- INFO - Counters.log(589) | Total committed heap usage (bytes)=176033792
- INFO - Counters.log(589) | CPU time spent (ms)=1880
- INFO - Counters.log(589) | Combine input records=0
- INFO - Counters.log(589) | SPLIT_RAW_BYTES=112
- INFO - Counters.log(589) | Reduce input records=6
- INFO - Counters.log(589) | Reduce input groups=3
- INFO - Counters.log(589) | Combine output records=0
- INFO - Counters.log(589) | Physical memory (bytes) snapshot=278876160
- INFO - Counters.log(589) | Reduce output records=0
- INFO - Counters.log(589) | Virtual memory (bytes) snapshot=1460908032
- INFO - Counters.log(589) | Map output records=6
After the job completes successfully, the following files are generated:
china-r-00000 contains:
- 中国 我们
- 中国 123
USA-r-00000 contains:
- 美国 他们
- 美国 USA
- 美国 在北美洲
cperson-r-00000 contains:
- 中国人 善良
The default reduce output file (part-r-00000) is still created in the output directory, but it contains no data. With that, multiple-file output using the new API on Hadoop 1.2.0 has been verified.
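If the empty default part-r-00000 file is unwanted, one common option — assuming your Hadoop build ships org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat, which current releases do — is to register the job's output format lazily, so the base output file is only created when something is actually written to it; the named outputs produced by MultipleOutputs are unaffected:
- // Add to the driver (main method). Requires:
- // import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
- LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);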