`

Hadoop in-mapper combining 实例

    博客分类:
  • JAVA
 
阅读更多

 

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;




public class wordcount1 extends Configured implements Tool{

	public static class mapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{

		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter report)
				throws IOException {
			Map<String, Integer> map = new HashMap<String,Integer>();
			String[] ss = value.toString().split(":");
			
			FileSplit fs = (FileSplit)report.getInputSplit();
			
			System.out.println(fs.getPath().toUri().toString());
			
			for(int i=0;i<ss.length;i++){
				if(!map.containsKey(ss[i])){
					map.put(ss[i], 1);
				}else{
					int tmp = map.get(ss[i])+1;
					map.put(ss[i], tmp);
				}
			}
			
			for(Map.Entry<String, Integer> m : map.entrySet()){
				System.out.println(m.getKey()+"\t"+m.getValue());
				output.collect(new Text(m.getKey()), new IntWritable(m.getValue()));
			}
		}
		
	}
	
	public static class reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text,IntWritable>{

		@Override
		public void reduce(Text key, Iterator<IntWritable> value,
				OutputCollector<Text, IntWritable> output, Reporter report)
				throws IOException {
			int sum = 0;
			while(value.hasNext()){
				sum += value.next().get();
			}
			output.collect(key, new IntWritable(sum));
		}
		
	}

	@Override
	public int run(String[] arg0) throws Exception {

		Configuration conf = new Configuration();
		
		JobConf job = new JobConf(conf, wordcount1.class);
		
		FileInputFormat.addInputPath(job, new Path(arg0[0]));
		FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
		
		job.setJobName("test citation");
		job.setMapperClass(mapper.class);
		job.setReducerClass(reducer.class);
		/*12/04/08 13:56:09 INFO mapred.JobClient:     Reduce input groups=4
		12/04/08 13:56:09 INFO mapred.JobClient:     Combine output records=4
		12/04/08 13:56:09 INFO mapred.JobClient:     Map input records=4
		12/04/08 13:56:09 INFO mapred.JobClient:     Reduce shuffle bytes=0
		12/04/08 13:56:09 INFO mapred.JobClient:     Reduce output records=4
		12/04/08 13:56:09 INFO mapred.JobClient:     Spilled Records=8
		12/04/08 13:56:09 INFO mapred.JobClient:     Map output bytes=42
		12/04/08 13:56:09 INFO mapred.JobClient:     Map input bytes=33
		12/04/08 13:56:09 INFO mapred.JobClient:     Combine input records=5
		12/04/08 13:56:09 INFO mapred.JobClient:     Map output records=5
		12/04/08 13:56:09 INFO mapred.JobClient:     Reduce input records=4
		 * */
		job.setCombinerClass(reducer.class);
		//job.setNumReduceTasks(2);
		
		job.setInputFormat(TextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		JobClient.runJob(job);
		
		return 0;
	}
	
	public static void main(String[] args) {
		try {
			System.exit(ToolRunner.run(new Configuration(), new wordcount1(), args));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

此例只能在单个map输入key/value对上进行聚集,
比如 value为 huhu xie xie    map输出 huhu 1  xie 2
而如果不采用聚集则输出是 huhu 1 xie 1 xie 1


public class wordcount2 {

	public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable>{

		private  Map<String,Integer> map ;
		
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			map = new HashMap<String,Integer>();
		}	

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] ss = value.toString().split(":");
			//相当于combiner的工作
			for(int i=0;i<ss.length;i++){
				if(!map.containsKey(ss[i])){
					map.put(ss[i], 1);
				}else{
					int tmp = map.get(ss[i])+1;
					map.put(ss[i], tmp);
				}
			}
		}
		
		@Override
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			for(Map.Entry<String, Integer> m : map.entrySet()){
				context.write(new Text(m.getKey()), new IntWritable(m.getValue()));
			}
		}
	}
	
	public static class reducer extends Reducer<Text, IntWritable, Text, IntWritable>{

		@Override
		protected void reduce(Text key, Iterable<IntWritable> value,
				Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			while(value.iterator().hasNext()){
				sum += value.iterator().next().get();
			}
			context.write(key, new IntWritable(sum));
		}
	}


	
	public static void main(String[] args) {
		
		try {
			Job job = new Job();
			job.setJarByClass(wordcount2.class);
			job.setJobName("wordcount2");
			
			FileInputFormat.addInputPath(job, new Path("input"));
			FileOutputFormat.setOutputPath(job, new Path("output"));
			
			job.setMapperClass(mapper.class);
			job.setReducerClass(reducer.class);
			
			job.setInputFormatClass(TextInputFormat.class);
			job.setOutputFormatClass(TextOutputFormat.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
			System.exit( job.waitForCompletion(true) ? 0 : 1 );
			
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
}

此例可以在多个key/value,也可以是不同文件的key/value 进行聚集,起作用相当于Combiner,但是后者只是hadoop的一种优化策略,并不保证其正确性,前者相对后者更灵活控制执行过程

存在一个问题:内存问题,由于这种方法是在处理完所有的文件后才产生map输出,故可能存在内存不足的问题,对于这一个很有效的方法是设定阈值N,达到N就输出,而不是要等到全部处理完成才输出

public class wordcount3 {

	public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable>{

		private  Map<String,Integer> map ;
		private int N ;
		
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			map = new HashMap<String,Integer>();
			N = 0;
		}	

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] ss = value.toString().split(":");
			N++;
			//相当于combiner的工作
			for(int i=0;i<ss.length;i++){
				if(!map.containsKey(ss[i])){
					map.put(ss[i], 1);
				}else{
					int tmp = map.get(ss[i])+1;
					map.put(ss[i], tmp);
				}
			}
			
			if(N == 2){
				for(Map.Entry<String, Integer> m : map.entrySet()){
					context.write(new Text(m.getKey()), new IntWritable(m.getValue()));
				}
				N = 0;
				map.clear();
				System.out.println("write two key/value");
			}
		}
		
		@Override
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			//写入最后<=N的 key/value
			if(map.size()>0){
				for(Map.Entry<String, Integer> m : map.entrySet()){
					context.write(new Text(m.getKey()), new IntWritable(m.getValue()));
				}
				System.out.println("writable last "+ map.size()+ " key/value");
			}
		}
	}
	
	public static class reducer extends Reducer<Text, IntWritable, Text, IntWritable>{

		@Override
		protected void reduce(Text key, Iterable<IntWritable> value,
				Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			while(value.iterator().hasNext()){
				sum += value.iterator().next().get();
			}
			context.write(key, new IntWritable(sum));
		}
	}


	
	public static void main(String[] args) {
		
		try {
			Job job = new Job();
			job.setJarByClass(wordcount3.class);
			job.setJobName("wordcount2");
			
			FileInputFormat.addInputPath(job, new Path(args[0]));
			FileOutputFormat.setOutputPath(job, new Path(args[1]));
			
			job.setMapperClass(mapper.class);
			job.setReducerClass(reducer.class);
			//job.setCombinerClass(reducer.class);
			
			job.setInputFormatClass(TextInputFormat.class);
			job.setOutputFormatClass(TextOutputFormat.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
			System.exit( job.waitForCompletion(true) ? 0 : 1 );
			
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
}
N太大,内存溢出 N太小,聚集性能下降    N的选择很重要

分享到:
评论

相关推荐

    hadoop最新版本3.1.1全量jar包

    hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...

    hadoop-eclipse-plugin1.2.1 and hadoop-eclipse-plugin2.8.0

    为了方便开发者在Eclipse或MyEclipse这样的集成开发环境中高效地进行Hadoop应用开发,Hadoop-Eclipse-Plugin应运而生。这个插件允许开发者直接在IDE中对Hadoop集群进行操作,如创建、编辑和运行MapReduce任务,极大...

    hadoop-eclipse-plugin-3.1.1.tar.gz

    使用Hadoop-Eclipse-Plugin时,建议遵循良好的编程习惯,如合理划分Mapper和Reducer的功能,优化数据处理流程,以及充分利用Hadoop的并行计算能力。同时,及时更新插件至最新版本,以获取最新的功能和修复。 通过...

    hadoop插件apache-hadoop-3.1.0-winutils-master.zip

    标题中的"apache-hadoop-3.1.0-winutils-master.zip"是一个针对Windows用户的Hadoop工具包,它包含了运行Hadoop所需的特定于Windows的工具和配置。`winutils.exe`是这个工具包的关键组件,它是Hadoop在Windows上的一...

    hadoop-yarn-client-2.6.5-API文档-中文版.zip

    赠送jar包:hadoop-yarn-client-2.6.5.jar; 赠送原API文档:hadoop-yarn-client-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.6.5.pom;...

    hadoop2.6-common-bin.zip

    标题 "hadoop2.6-common-bin.zip" 指示这是一个包含Hadoop 2.6版本通用二进制文件的压缩包。这个压缩包主要针对Windows用户,旨在解决在该操作系统上运行Hadoop时可能遇到的"Could not locate executable"错误。这个...

    hadoop-mapreduce-client-jobclient-2.6.5-API文档-中文版.zip

    赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...

    hadoop-auth-2.5.1-API文档-中文版.zip

    赠送jar包:hadoop-auth-2.5.1.jar; 赠送原API文档:hadoop-auth-2.5.1-javadoc.jar; 赠送源代码:hadoop-auth-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-auth-2.5.1.pom; 包含翻译后的API文档:hadoop...

    hadoop-eclipse-plugin-2.7.3和2.7.7

    hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包

    hadoop3.3.0-winutils所有bin文件

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在大规模集群上处理海量数据。Hadoop 3.3.0是该框架的一个版本,它带来了许多改进和新特性,旨在提升性能、稳定性和可扩展性。WinUtils是Hadoop在...

    hadoop-eclipse-plugin-2.10.0.jar

    Eclipse集成Hadoop2.10.0的插件,使用`ant`对hadoop的jar包进行打包并适应Eclipse加载,所以参数里有hadoop和eclipse的目录. 必须注意对于不同的hadoop版本,` HADDOP_INSTALL_PATH/share/hadoop/common/lib`下的jar包...

    apache-hadoop-3.1.3-winutils-master.zip

    在这个"apache-hadoop-3.1.3-winutils-master.zip"压缩包中,包含了在Windows环境下配置Hadoop HDFS客户端所需的组件,特别是`hadoop-winutils`和`hadoop.dll`这两个关键文件,它们对于在Windows系统上运行Hadoop...

    hadoop-eclipse-plugin三个版本的插件都在这里了。

    hadoop-eclipse-plugin-2.7.4.jar和hadoop-eclipse-plugin-2.7.3.jar还有hadoop-eclipse-plugin-2.6.0.jar的插件都在这打包了,都可以用。

    flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar.tar.gz

    在这个特定的兼容包中,我们可以看到两个文件:flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(实际的兼容库)和._flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(可能是Mac OS的元数据文件,通常...

    hadoop-common-2.6.0-bin-master.zip

    `hadoop-common-2.6.0-bin-master.zip` 是一个针对Hadoop 2.6.0版本的压缩包,特别适用于在Windows环境下进行本地开发和测试。这个版本的Hadoop包含了对Windows系统的优化,比如提供了`winutils.exe`,这是在Windows...

    hadoop-yarn-common-2.6.5-API文档-中文版.zip

    赠送jar包:hadoop-yarn-common-2.6.5.jar 赠送原API文档:hadoop-yarn-common-2.6.5-javadoc.jar 赠送源代码:hadoop-yarn-common-2.6.5-sources.jar 包含翻译后的API文档:hadoop-yarn-common-2.6.5-javadoc-...

    好用hadoop-eclipse-plugin-1.2.1

    hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1

    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip

    Apache Flink 是一个流行的开源大数据处理框架,而 `flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip` 文件是针对 Flink 优化的一个特殊版本的 Hadoop 库。这个压缩包中的 `flink-shaded-hadoop-2-uber-2.7.5-10.0....

    Hadoop-eclipse-plugin-2.7.2

    《Hadoop-eclipse-plugin-2.7.2:在Eclipse中轻松开发Hadoop应用》 在大数据处理领域,Hadoop作为一个开源的分布式计算框架,因其高效、可扩展的特性而备受青睐。然而,对于开发者而言,有效地集成开发环境至关重要...

    hadoop-eclipse-plugin-3.1.2.jar

    在eclipse中搭建hadoop环境,需要安装hadoop-eclipse-pulgin的插件,根据hadoop的版本对应jar包的版本,此为hadoop3.1.2版本的插件。

Global site tag (gtag.js) - Google Analytics