
MapReduce

WordCount
Count the number of words in a text file.
Source file contents:
words.txt
hello world
hello hadoop
hello 1606
hello world

I. Map Processing

1. Create a new path on HDFS -- /wordCount -- and upload words.txt

Select the Hadoop connection --> right-click -->
To create a new path: "Create new directory"
To upload a file: "Upload file"
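
As an equivalent alternative (a sketch, not one of the original steps; the class name UploadWords is hypothetical, and the NameNode URI is the one used by the driver later in this post), the directory can be created and the file uploaded programmatically through the HDFS FileSystem API:

package com.study.wordcount.day01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadWords {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Assumed NameNode URI; adjust to your cluster.
		conf.set("fs.defaultFS", "hdfs://192.168.76.131:9000");
		FileSystem fs = FileSystem.get(conf);
		fs.mkdirs(new Path("/wordCount"));              // create the HDFS directory
		fs.copyFromLocalFile(new Path("words.txt"),     // local source file
				new Path("/wordCount/words.txt"));      // HDFS destination
		fs.close();
	}
}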

2. Create a new MapReduce project



3. Add the Mapper: WordCountMapper.java

package com.study.wordcount.day01;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
			throws IOException, InterruptedException {
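		// Pass-through: emit the byte offset as the key and the whole line as the value, unchanged.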
		context.write(key, value);
	}
}



4. Add the driver: WordCountDriver.java

package com.study.wordcount.day01;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCountDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		// Set the class used to locate this job's jar
		job.setJarByClass(WordCountDriver.class);
		
		job.setMapperClass(WordCountMapper.class);
		// Set the output key/value types of the Mapper
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		
		
		FileInputFormat.setInputPaths(job, new Path("/wordCount/words.txt"));
		FileOutputFormat.setOutputPath(job, new Path("/wordCount/result"));
	
		job.waitForCompletion(true);
	}

}




5. Export the jar

Export --> JAR file -->

Leave the configuration files unchecked.

Select the run entry point (the main class).

6. Upload the jar to the Linux machine

7. Run

hadoop jar wordCount.jar
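
If the main class was not written into the jar's manifest when exporting, it can be passed explicitly on the command line (the class name below is the driver defined above):

hadoop jar wordCount.jar com.study.wordcount.day01.WordCountDriver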

8. Run results



9. Exception handling




Fix:
The data paths must be configured in both hdfs-site.xml and core-site.xml, and the two settings must be consistent.

http://blog.sina.com.cn/s/blog_61d8d9640102whof.html
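
As an illustration only (the paths below are assumptions, not taken from the original post), a consistent pairing keeps the storage directories in hdfs-site.xml under the same base directory that core-site.xml sets as hadoop.tmp.dir:

<!-- core-site.xml -->
<property>
	<name>hadoop.tmp.dir</name>
	<value>/home/hadoop/data</value>
</property>

<!-- hdfs-site.xml: storage directories under the same base path -->
<property>
	<name>dfs.namenode.name.dir</name>
	<value>/home/hadoop/data/dfs/name</value>
</property>
<property>
	<name>dfs.datanode.data.dir</name>
	<value>/home/hadoop/data/dfs/data</value>
</property>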

10. Result analysis

Files generated under /wordCount/result:

0 hello world
13 hello hadoop
27 hello 1606
39 hello world

Compared with the source file, each line has gained a key: the byte offset of that line within the file (the offsets 0, 13, 27, 39 imply \r\n line endings: "hello world" is 11 bytes plus 2 for \r\n, giving 13, and so on).
This matches the Mapper's output format: the key is a LongWritable (the offset) and the value is a Text (the line). Since no Reducer was set, Hadoop ran its default identity Reducer and wrote the map output through unchanged.

II. Reducer

1. The output does not count the words yet. We want to change it to:

hello world 1
hello hadoop 1
hello 1606 1
hello world 1

That is, make the original value the key: each word becomes the key.

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {

		// Split the line on spaces and emit each word with a count of 1.
		String line = value.toString();
		String[] words = line.split(" ");
		for (String word : words) {
			context.write(new Text(word), new IntWritable(1));
		}
	}
}


		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		// Set the class used to locate this job's jar
		job.setJarByClass(WordCountDriver.class);
		
		job.setMapperClass(WordCountMapper.class);
		// Set the output key/value types of the Mapper
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		
		FileInputFormat.setInputPaths(job, new Path("/wordCount/words.txt"));
		FileOutputFormat.setOutputPath(job, new Path("/wordCount/result1"));
	
		job.waitForCompletion(true);



Change the key and value types accordingly, and change the output path; otherwise the job fails with an "output path already exists" exception.
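
Alternatively (a sketch, not from the original post), the driver can delete a stale output path before submitting, so reruns do not hit this exception:

		// Inside main(), replacing the FileOutputFormat.setOutputPath(...) line.
		// Requires an extra import: org.apache.hadoop.fs.FileSystem
		FileSystem fs = FileSystem.get(conf);
		Path out = new Path("/wordCount/result1");
		if (fs.exists(out)) {
			fs.delete(out, true); // true = delete the directory recursively
		}
		FileOutputFormat.setOutputPath(job, out);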

2. Export the jar, upload it, and run hadoop jar wordCount.jar

The output after this change is:

1606 1
hadoop 1
hello 1
hello 1
hello 1
hello 1
world 1
world 1




3. Add a Reducer

package com.study.wordcount.day01;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 *
 * The Map component can run on its own; the Reducer component runs on top of Map's output.
 *
 * Running Map alone yields the Mapper's output.
 * Once a Reducer is added, the final result is the Reducer's output.
 *
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, Text> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, Text>.Context context)
			throws IOException, InterruptedException {
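		// "values" holds every count emitted for this key; join them to make the grouping visible.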
		StringBuilder sb = new StringBuilder();
		for(IntWritable intw : values){
			sb.append(",").append(intw.get());
		}
		context.write(key, new Text(sb.toString()));
		
	}
}




public class WordCountDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		// Set the class used to locate this job's jar
		job.setJarByClass(WordCountDriver.class);
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		
		// Set the output key/value types of the Mapper
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// Set the output key/value types of the Reducer
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.setInputPaths(job, new Path("/wordCount/words.txt"));
		FileOutputFormat.setOutputPath(job, new Path("/wordCount/result2"));
	
		job.waitForCompletion(true);
	}

}


Export the jar, upload it, and run it.

The Reducer output is:

1606 ,1
hadoop ,1
hello ,1,1,1,1
world ,1,1

That is, the Reducer merged the values of all Mapper records that share the same key: hello, for example, received the grouped values [1, 1, 1, 1].

4. Count the words

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

		int sum = 0; // total number of occurrences of this word
		for (IntWritable intw : values) {
			sum += intw.get();
		}
		context.write(key, new IntWritable(sum));
	}
	
}


public class WordCountDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		// Set the class used to locate this job's jar
		job.setJarByClass(WordCountDriver.class);
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		
		// Set the output key/value types of the Mapper
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// Set the output key/value types of the Reducer
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.76.131:9000/wordCount/words.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.76.131:9000/wordCount/result3"));
	
		job.waitForCompletion(true);
	}

}
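
An optional optimization, not used in the original code: because this summing Reducer is both associative and commutative, it can also be registered as a combiner, pre-aggregating counts on the map side before the shuffle:

		// Add in the driver, next to setReducerClass. Safe here because addition
		// can be applied partially to each map's output without changing the total.
		job.setCombinerClass(WordCountReducer.class);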



Configuration file: log4j.properties

### root logger ###
log4j.rootLogger = info,stdout
### print log output to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n



Right-click WordCountDriver and choose Run As --> Run on Hadoop.
No jar upload is needed; the job runs from the local client.

Output:

1606 1
hadoop 1
hello 4
world 2


III. Summary

Map:
Reads the source text file and emits it as key/value pairs.

Reduce:
Takes the data passed along by Map and, for records sharing the same key, aggregates their values.

The value type emitted by the Mapper (Mapper.java) must match
the value type consumed by the Reducer (Reducer.java).
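
Spelled out against Hadoop's generic parameters (these are the standard signatures, shown for reference):

	// Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>  -> Mapper<LongWritable, Text, Text, IntWritable>
	// Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> -> Reducer<Text, IntWritable, Text, IntWritable>
	// The Mapper's (KEYOUT, VALUEOUT) pair must equal the Reducer's (KEYIN, VALUEIN) pair:
	// here Text matches Text and IntWritable matches IntWritable.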