The example in the book extracts the highest temperature recorded in each year, so the year and the temperature are combined into one composite key.
1. A custom Partitioner partitions by year, so all records of the same year go to the same reducer.
2. A custom key comparator sorts by year ascending and temperature descending, so the key-value pairs emitted by the maps arrive in exactly that order.
3. A custom grouping comparator treats all records of the same year as one group, so the reducer only needs to take the first key of each group to output that year's maximum temperature (a small worked example follows).
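As a quick illustration (the numbers below are made up, not taken from the book's dataset), suppose the input contains these "year temperature" records:

    1900 35
    1900 34
    1901 36
    1901 40

Within a reducer the sort comparator orders the keys (1900,35), (1900,34), (1901,40), (1901,36), and the grouping comparator merges each year into a single reduce() call, so emitting only the first key of each group gives:

    1900 35
    1901 40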
Code:
package temperature;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Temperature {

    // A custom key class must implement the WritableComparable interface.
    public static class IntPair implements WritableComparable<IntPair> {
        int first;   // year
        int second;  // temperature

        /**
         * Set the left and right values.
         */
        public void set(int left, int right) {
            first = left;
            second = right;
        }

        public int getFirst() {
            return first;
        }

        public int getSecond() {
            return second;
        }

        // Deserialization: read the fields back in the order they were written.
        @Override
        public void readFields(DataInput in) throws IOException {
            first = in.readInt();
            second = in.readInt();
        }

        // Serialization: write the fields to the binary stream.
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(first);
            out.writeInt(second);
        }

        // Natural key ordering: first ascending, then second ascending.
        @Override
        public int compareTo(IntPair o) {
            if (first != o.first) {
                return first < o.first ? -1 : 1;
            } else if (second != o.second) {
                return second < o.second ? -1 : 1;
            } else {
                return 0;
            }
        }

        // A new key class should also override hashCode() and equals().
        // 157 is simply an arbitrary small prime used to mix the two fields.
        @Override
        public int hashCode() {
            return first * 157 + second;
        }

        @Override
        public boolean equals(Object right) {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair) {
                IntPair r = (IntPair) right;
                return r.first == first && r.second == second;
            } else {
                return false;
            }
        }
    }

    /**
     * Partitioner class: the partition is decided by first (the year) alone,
     * so all records of the same year land in the same reducer. The multiplier
     * 127 is an arbitrary prime used to spread keys across partitions.
     */
    public static class FirstPartitioner extends
            Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value,
                int numPartitions) {
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }

    /**
     * Sort comparator class: first (year) ascending, second (temperature)
     * descending.
     */
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            int cmp = (l == r ? 0 : (l < r ? -1 : 1));
            if (cmp != 0) {
                return cmp;
            }
            l = ip1.getSecond();
            r = ip2.getSecond();
            return l == r ? 0 : (l < r ? 1 : -1); // reverse
        }
    }

    /**
     * Grouping comparator class: values whose keys compare as equal here are
     * delivered to the same reduce() call through one iterator. Comparing only
     * first makes all records of the same year one group.
     */
    // Second approach: extend WritableComparator.
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(IntPair.class, true);
        }

        // Compare two WritableComparables by the year field only.
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }

    // Custom mapper: parses "year temperature" lines into an IntPair key.
    public static class Map extends
            Mapper<LongWritable, Text, IntPair, NullWritable> {
        private final IntPair intkey = new IntPair();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            int left = 0;
            int right = 0;
            if (tokenizer.hasMoreTokens()) {
                left = Integer.parseInt(tokenizer.nextToken());
                if (tokenizer.hasMoreTokens())
                    right = Integer.parseInt(tokenizer.nextToken());
                intkey.set(left, right);
                context.write(intkey, NullWritable.get());
            }
        }
    }

    // Custom reducer: because of the sort order, the first key of each group
    // carries that year's maximum temperature.
    public static class Reduce extends
            Reducer<IntPair, NullWritable, IntWritable, IntWritable> {
        private final IntWritable left = new IntWritable();
        private final IntWritable right = new IntWritable();

        public void reduce(IntPair key, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
            left.set(key.getFirst());
            right.set(key.getSecond());
            context.write(left, right);
        }
    }

    /**
     * @param args HDFS input path and output path
     */
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        // Load the Hadoop configuration.
        Configuration conf = new Configuration();
        // Instantiate a job.
        Job job = new Job(conf, "temperature");
        job.setJarByClass(Temperature.class);
        // Mapper class.
        job.setMapperClass(Map.class);
        // No Combiner is set: Reduce emits <IntWritable, IntWritable>, which
        // does not match the map output type <IntPair, NullWritable>, so
        // Reduce cannot be reused as a combiner here.
        // job.setCombinerClass(Reduce.class);
        // Reducer class.
        job.setReducerClass(Reduce.class);
        // Partitioner.
        job.setPartitionerClass(FirstPartitioner.class);
        // Sort comparator for keys.
        job.setSortComparatorClass(KeyComparator.class);
        // Grouping comparator.
        job.setGroupingComparatorClass(GroupingComparator.class);
        // Map output key type.
        job.setMapOutputKeyClass(IntPair.class);
        // Map output value type.
        job.setMapOutputValueClass(NullWritable.class);
        // Reduce output key type; TextOutputFormat renders it as text.
        job.setOutputKeyClass(IntWritable.class);
        // Reduce output value type.
        job.setOutputValueClass(IntWritable.class);
        // Splits the input into blocks and provides a RecordReader implementation.
        job.setInputFormatClass(TextInputFormat.class);
        // Provides a RecordWriter implementation that writes the output.
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input path on HDFS.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Output path on HDFS.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for completion.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
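To try the job end to end, a minimal sketch of compiling and submitting it follows; the jar name, the paths, and the hadoop-core jar version are assumptions for illustration, not from the original post:

    # Compile against the Hadoop jar and package the classes.
    javac -classpath hadoop-core-0.20.2.jar -d classes Temperature.java
    jar cvf temperature.jar -C classes .

    # args[0] is the HDFS input path, args[1] the output path (must not exist yet).
    hadoop jar temperature.jar temperature.Temperature /user/hadoop/input /user/hadoop/output

One note on the job setup: the Job(Configuration, String) constructor used above was deprecated in Hadoop 2.x; on newer clusters Job.getInstance(conf, "temperature") is the preferred replacement.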
Comments
#2  xingwang.ye  2014-06-18
Number 2 now, and I asked this in the wrong place.
#1  xingwang.ye  2014-06-18
public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
    @Override
    public int getPartition(IntPair key, IntWritable value,
            int numPartitions) {
        return Math.abs(key.getFirst() * 127) % numPartitions;
    }
}
---------------
May I ask what the 127 here means?
------------------------
public int hashCode() {
    return first * 157 + second;
}
----------
And what is the meaning of the 157?