- 浏览: 194973 次
文章分类
最新评论
import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class wordcount1 extends Configured implements Tool{ public static class mapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{ @Override public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter report) throws IOException { Map<String, Integer> map = new HashMap<String,Integer>(); String[] ss = value.toString().split(":"); FileSplit fs = (FileSplit)report.getInputSplit(); System.out.println(fs.getPath().toUri().toString()); for(int i=0;i<ss.length;i++){ if(!map.containsKey(ss[i])){ map.put(ss[i], 1); }else{ int tmp = map.get(ss[i])+1; map.put(ss[i], tmp); } } for(Map.Entry<String, Integer> m : map.entrySet()){ System.out.println(m.getKey()+"\t"+m.getValue()); output.collect(new Text(m.getKey()), new IntWritable(m.getValue())); } } } public static class reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text,IntWritable>{ @Override public void reduce(Text key, Iterator<IntWritable> value, OutputCollector<Text, IntWritable> output, Reporter report) throws IOException { int sum = 0; while(value.hasNext()){ sum += value.next().get(); } output.collect(key, new IntWritable(sum)); } } @Override public int run(String[] arg0) throws Exception { Configuration conf = new Configuration(); JobConf job = new JobConf(conf, wordcount1.class); FileInputFormat.addInputPath(job, new Path(arg0[0])); FileOutputFormat.setOutputPath(job, new Path(arg0[1])); job.setJobName("test citation"); job.setMapperClass(mapper.class); job.setReducerClass(reducer.class); /*12/04/08 13:56:09 INFO mapred.JobClient: Reduce input groups=4 12/04/08 13:56:09 INFO mapred.JobClient: Combine output records=4 12/04/08 13:56:09 INFO mapred.JobClient: Map input records=4 12/04/08 13:56:09 INFO mapred.JobClient: Reduce shuffle bytes=0 12/04/08 13:56:09 INFO mapred.JobClient: Reduce output records=4 12/04/08 13:56:09 INFO mapred.JobClient: Spilled Records=8 12/04/08 13:56:09 INFO mapred.JobClient: Map output bytes=42 12/04/08 13:56:09 INFO mapred.JobClient: Map input bytes=33 12/04/08 13:56:09 INFO mapred.JobClient: Combine input records=5 12/04/08 13:56:09 INFO mapred.JobClient: Map output records=5 12/04/08 13:56:09 INFO mapred.JobClient: Reduce input records=4 * */ job.setCombinerClass(reducer.class); //job.setNumReduceTasks(2); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); JobClient.runJob(job); return 0; } public static void main(String[] args) { try { System.exit(ToolRunner.run(new Configuration(), new wordcount1(), args)); } catch (Exception e) { e.printStackTrace(); } } } 此例只能在单个map输入key/value对上进行聚集, 比如 value为 huhu xie xie map输出 huhu 1 xie 2 而如果不采用聚集则输出是 huhu 1 xie 1 xie 1 public class wordcount2 { public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private Map<String,Integer> map ; @Override protected void setup(Context context) throws IOException, InterruptedException { map = new HashMap<String,Integer>(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] ss = value.toString().split(":"); //相当于combiner的工作 for(int i=0;i<ss.length;i++){ if(!map.containsKey(ss[i])){ map.put(ss[i], 1); }else{ int tmp = map.get(ss[i])+1; map.put(ss[i], tmp); } } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for(Map.Entry<String, Integer> m : map.entrySet()){ context.write(new Text(m.getKey()), new IntWritable(m.getValue())); } } } public static class reducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException { int sum = 0; while(value.iterator().hasNext()){ sum += value.iterator().next().get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) { try { Job job = new Job(); job.setJarByClass(wordcount2.class); job.setJobName("wordcount2"); FileInputFormat.addInputPath(job, new Path("input")); FileOutputFormat.setOutputPath(job, new Path("output")); job.setMapperClass(mapper.class); job.setReducerClass(reducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit( job.waitForCompletion(true) ? 0 : 1 ); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } 此例可以在多个key/value,也可以是不同文件的key/value 进行聚集,起作用相当于Combiner,但是后者只是hadoop的一种优化策略,并不保证其正确性,前者相对后者更灵活控制执行过程 存在一个问题:内存问题,由于这种方法是在处理完所有的文件后才产生map输出,故可能存在内存不足的问题,对于这一个很有效的方法是设定阈值N,达到N就输出,而不是要等到全部处理完成才输出
public class wordcount3 { public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private Map<String,Integer> map ; private int N ; @Override protected void setup(Context context) throws IOException, InterruptedException { map = new HashMap<String,Integer>(); N = 0; } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] ss = value.toString().split(":"); N++; //相当于combiner的工作 for(int i=0;i<ss.length;i++){ if(!map.containsKey(ss[i])){ map.put(ss[i], 1); }else{ int tmp = map.get(ss[i])+1; map.put(ss[i], tmp); } } if(N == 2){ for(Map.Entry<String, Integer> m : map.entrySet()){ context.write(new Text(m.getKey()), new IntWritable(m.getValue())); } N = 0; map.clear(); System.out.println("write two key/value"); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { //写入最后<=N的 key/value if(map.size()>0){ for(Map.Entry<String, Integer> m : map.entrySet()){ context.write(new Text(m.getKey()), new IntWritable(m.getValue())); } System.out.println("writable last "+ map.size()+ " key/value"); } } } public static class reducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException { int sum = 0; while(value.iterator().hasNext()){ sum += value.iterator().next().get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) { try { Job job = new Job(); job.setJarByClass(wordcount3.class); job.setJobName("wordcount2"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(mapper.class); job.setReducerClass(reducer.class); //job.setCombinerClass(reducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit( job.waitForCompletion(true) ? 0 : 1 ); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
N太大,内存溢出 N太小,聚集性能下降 N的选择很重要
发表评论
-
JDK配置
2012-09-19 14:13 697JAVA_HOME C:\Program Files\ ... -
Java abstract class interface 区别
2012-09-09 23:35 9931.abstract class 表示的是一种继承关系,并 ... -
全组合
2012-08-28 23:10 885package cn.xmu.microsoft; ... -
全排列
2012-08-28 11:35 845private static String[] perm ... -
Java 方法引用
2012-08-12 11:23 788一个对象的引用本质上是一个很强的完整和安全方面约束能力的对象指 ... -
Java 动态绑定 静态绑定
2012-08-12 11:07 823静态绑定:使用编译类型的对象引用 动态绑定:使用运行类型所指 ... -
A Java Runtime Environment (JRE) or Java Development Kit (JDK) must be available
2012-05-26 15:34 958Eclipse 在 Unbutu下双击图标启动会出现上述问题, ... -
Java BitSet
2012-04-13 09:29 930//说明 bs 至少能够装下65bits 大小由系统 ... -
Java 中文编码问题
2012-04-10 22:09 783import java.io.UnsupportedEn ... -
Java 正则表达式
2012-04-08 09:50 892Pattern p = Pattern.compile( ... -
Java 加载属性配置文件
2012-04-07 16:49 681import java.util.MissingReso ... -
Java notify wait
2012-04-06 09:42 1077wait 和 notify 是Object类而非Thread类 ... -
Dangling meta character '*' near index 0
2012-04-05 14:12 1241这个由于是在java中使用split(“”);造成的,在 ... -
Logger
2012-04-05 08:54 792/LogTest/src/log4j.properties ... -
分割字符串时存在多个分隔符
2012-03-20 17:14 915public static void main(Str ... -
Error: could not open `C:Program FilesJavajre6libamd64jvm.cfg'
2012-02-26 18:55 1081重新配置环境变量后出现可以编译,不可以运行... 删除 ... -
JAVA 运行包中的类
2012-02-23 13:41 1089Java中的包是以目录的形式组织的,在java文件中如果指定了 ... -
Data Access Object
2012-02-18 21:24 770DAO:Data Access Object 把项目开发分为 ... -
文件读写问题
2012-02-16 13:36 750'赵','1234' '钱','2345''孙','3456 ... -
JUnit hamcrest匹配器
2012-02-16 12:04 951Hamcrest带有一个有用的匹配器库.以下是一些最重要的. ...
相关推荐
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
为了方便开发者在Eclipse或MyEclipse这样的集成开发环境中高效地进行Hadoop应用开发,Hadoop-Eclipse-Plugin应运而生。这个插件允许开发者直接在IDE中对Hadoop集群进行操作,如创建、编辑和运行MapReduce任务,极大...
赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
标题中的"apache-hadoop-3.1.0-winutils-master.zip"是一个针对Windows用户的Hadoop工具包,它包含了运行Hadoop所需的特定于Windows的工具和配置。`winutils.exe`是这个工具包的关键组件,它是Hadoop在Windows上的一...
使用Hadoop-Eclipse-Plugin时,建议遵循良好的编程习惯,如合理划分Mapper和Reducer的功能,优化数据处理流程,以及充分利用Hadoop的并行计算能力。同时,及时更新插件至最新版本,以获取最新的功能和修复。 通过...
赠送jar包:hadoop-yarn-client-2.6.5.jar; 赠送原API文档:hadoop-yarn-client-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.6.5.pom;...
Ubuntu虚拟机HADOOP集群搭建eclipse环境 hadoop-eclipse-plugin-3.3.1.jar
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
赠送jar包:hadoop-auth-2.5.1.jar; 赠送原API文档:hadoop-auth-2.5.1-javadoc.jar; 赠送源代码:hadoop-auth-2.5.1-sources.jar; 赠送Maven依赖信息文件:hadoop-auth-2.5.1.pom; 包含翻译后的API文档:hadoop...
Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在大规模集群上处理海量数据。Hadoop 3.3.0是该框架的一个版本,它带来了许多改进和新特性,旨在提升性能、稳定性和可扩展性。WinUtils是Hadoop在...
hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包
在这个"apache-hadoop-3.1.3-winutils-master.zip"压缩包中,包含了在Windows环境下配置Hadoop HDFS客户端所需的组件,特别是`hadoop-winutils`和`hadoop.dll`这两个关键文件,它们对于在Windows系统上运行Hadoop...
hadoop-eclipse-plugin-2.7.4.jar和hadoop-eclipse-plugin-2.7.3.jar还有hadoop-eclipse-plugin-2.6.0.jar的插件都在这打包了,都可以用。
在这个特定的兼容包中,我们可以看到两个文件:flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(实际的兼容库)和._flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(可能是Mac OS的元数据文件,通常...
`hadoop-common-2.6.0-bin-master.zip` 是一个针对Hadoop 2.6.0版本的压缩包,特别适用于在Windows环境下进行本地开发和测试。这个版本的Hadoop包含了对Windows系统的优化,比如提供了`winutils.exe`,这是在Windows...
Eclipse集成Hadoop2.10.0的插件,使用`ant`对hadoop的jar包进行打包并适应Eclipse加载,所以参数里有hadoop和eclipse的目录. 必须注意对于不同的hadoop版本,` HADDOP_INSTALL_PATH/share/hadoop/common/lib`下的jar包...
Apache Flink 是一个流行的开源大数据处理框架,而 `flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip` 文件是针对 Flink 优化的一个特殊版本的 Hadoop 库。这个压缩包中的 `flink-shaded-hadoop-2-uber-2.7.5-10.0....
hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1
hadoop-eclipse-plugin-3.1.3,eclipse版本为eclipse-jee-2020-03
《Hadoop-eclipse-plugin-2.7.2:在Eclipse中轻松开发Hadoop应用》 在大数据处理领域,Hadoop作为一个开源的分布式计算框架,因其高效、可扩展的特性而备受青睐。然而,对于开发者而言,有效地集成开发环境至关重要...