package com.bjsxt.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* 单词统计
* @author tingyu
* @date 2016-02-29 00:44
*/
/**
* KEYIN:一句话或单词的下标
* VALUEIN:输入的VALUE为文本
* KEYOUT: 输出的KEY为文本
* VALUEOUT: 输出为数字
*/
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/*
* 每次调用map方法时会传入split(分片)中一行数据,key为该行数据在分片中的下标位置
*
*/
protected void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException {
String line=value.toString();
StringTokenizer st=new StringTokenizer(line); //默认按空格进行切分
while(st.hasMoreTokens()){
String world=st.nextToken();
context.write(new Text(world), new IntWritable(1)); //map输出
}
};
}
package com.bjsxt.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* 单词统计
* @author tingyu
* @date 2016-02-29 00:44
*/
/*
* KEYIN: 即map的输出key
* VALUEIN: 即map输出的value
* KEYOUT: 文本
* VALUEOUT: 数值
*/
public class WcReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
protected void reduce(Text key, java.lang.Iterable<IntWritable> iterable, Context context)
throws java.io.IOException ,InterruptedException {
int sum=0;
for(IntWritable val:iterable){
sum+=val.get();
}
context.write(key, new IntWritable(sum));
};
}
package com.bjsxt.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 单词统计
* @author tingyu
* @date 2016-02-29 00:44
*/
public class JobRun {
public static void main(String[] args) {
Configuration config=new Configuration();
config.set("mapred.job.tracker", "192.168.0.200:9001"); //即hadoop-1.2/conf/mapred-site.xml中的配置
config.set("fs.default.name", "hdfs://192.168.0.200:9000");
//如果本地Eclipse不行,就需要设置jar文件的位置
//config.set("mapred.jar", "C:\\Users\\tingyu\\Desktop\\hadoop\\wordCount.jar");
try {
Job job=new Job(config,"world count");
job.setJarByClass(JobRun.class);
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置reduceTask的任务数
//job.setNumReduceTasks(2);
FileInputFormat.addInputPath(job, new Path("/opt/input/wc"));
FileOutputFormat.setOutputPath(job, new Path("/opt/output/wc"));
System.exit(job.waitForCompletion(true)?0:1);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
分享到:
相关推荐
标题 "Hadoop-1.2.1 QQ推荐好友例子" 提到的是一个关于Hadoop的实战案例,专注于在Hadoop框架下实现QQ推荐好友的功能。Hadoop是一个开源的大数据处理框架,由Apache软件基金会开发,它允许分布式存储和处理海量数据...
例如,WordCount是一个经典的MapReduce程序,它统计文本文件中单词出现的次数。这个例子可以帮助初学者理解MapReduce的工作原理,如何定义Map函数来分割和处理数据,以及如何编写Reduce函数来聚合结果。 通过对...
1.2运行WordCount程序是MapReduce编程中一个经典的入门示例,用来统计文本中单词出现的频率。1.2.1准备工作包括在本地创建示例文件并上传至Hadoop分布式文件系统(HDFS)。1.2.2运行例子步骤是在集群上运行WordCount...
- WordCount是入门级别的例子,统计文本文件中单词的频率。 - PiEstimator估算圆周率,展示了分布式计算的能力。 - Sort和TeraSort则是关于大规模数据排序的挑战。 通过深入研究这些示例代码,开发者可以更好地...