- 浏览: 578073 次
- 性别:
- 来自: 广州杭州
文章分类
最新评论
-
bohc:
谢谢,搞了两天了,现在才算是找到问题所在,解决了。
文件在使用FileChannel.map后不能被删除(Windows上) -
zhang0000jun:
在jdk1.8中执行正好和楼主的结果相反,请指教
从Java视角理解CPU缓存(CPU Cache) -
在世界的中心呼喚愛:
forenroll 写道请问楼主的那个分析工具cachemis ...
从Java视角理解CPU缓存(CPU Cache) -
xgj1988:
我这里打出的结果是: 0 L1-dcache-load-mis ...
从Java视角理解CPU缓存(CPU Cache) -
thebye85:
请教下大神,为什么频繁的park会导致大量context sw ...
从Java视角理解CPU上下文切换(Context Switch)
BTW:再次感叹下没有机器, 3.4G的语料,单机处理了10来个小时, 真是郁闷~~ 要是有N台机器多好啊.
在很多时候,特别是处理大数据的时候,我们希望一道MapReduce过程就可以解决几个问题。这样可以避免再次读取数据。比如:在做文本聚类/分类的时候,mapper读取语料,进行分词后,要同时算出每个词条(term)的term frequency以及它的document frequency. 前者对于每个词条来说其实是个向量, 它代表此词条在N篇文档各中的词频;而后者就是一个非负整数。 这时候就可以借助一种特殊的Writable类:GenericWritable.
用法是:继承这个类,然后把你要输出value的Writable类型加进它的CLASSES静态变量里,在后面的TermMapper和TermReducer中我的value使用了三种ArrayWritable,IntWritable和我自已定义的TFWritable,所以要把三者全加入TermWritable的CLASSES中。
package redpoll.examples; import org.apache.hadoop.io.GenericWritable; import org.apache.hadoop.io.Writable; /** * Generic Writable class for terms. * @author Jeremy Chow(coderplay@gmail.com) */ public class TermWritable extends GenericWritable { private static Class<? extends Writable>[] CLASSES = null; static { CLASSES = (Class<? extends Writable>[]) new Class[] { org.apache.hadoop.io.ArrayWritable.class, org.apache.hadoop.io.IntWritable.class, redpoll.examples.TFWritable.class }; } public TermWritable() { } public TermWritable(Writable instance) { set(instance); } @Override protected Class<? extends Writable>[] getTypes() { return CLASSES; } }
Mapper在collect数据时,用刚才定义的TermWritable来包装(wrap)要使用的Writable类。
package redpoll.examples; import java.io.IOException; import java.io.StringReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Token; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.standard.StandardAnalyzer; /** * A class provides for doing words segmenation and counting term TFs and DFs.<p> * in: key is document id, value is a document instance. <br> * output: * <li>key is term, value is a <documentId, tf> pair</li> * <li>key is term, value is document frequency corresponsing to the key</li> * @author Jeremy Chow(coderplay@gmail.com) */ public class TermMapper extends MapReduceBase implements Mapper<LongWritable, Document, Text, TermWritable> { private static final Log log = LogFactory.getLog(TermMapper.class .getName()); /* analyzer for words segmentation */ private Analyzer analyzer = null; /* frequency weight for document title */ private IntWritable titleWeight = new IntWritable(2); /* frequency weight for document content */ private IntWritable contentWeight = new IntWritable(1); public void map(LongWritable key, Document value, OutputCollector<Text, TermWritable> output, Reporter reporter) throws IOException { doMap(key, value.getTitle(), titleWeight, output, reporter); doMap(key, value.getContent(), contentWeight, output, reporter); } private void doMap(LongWritable key, String value, IntWritable weight, OutputCollector<Text, TermWritable> output, Reporter reporter) throws IOException { // do words segmentation TokenStream ts = analyzer.tokenStream("dummy", new StringReader(value)); Token token = new Token(); while ((token = ts.next(token)) != null) { String termString = new String(token.termBuffer(), 0, token.termLength()); Text term = new Text(termString); // <term, <documentId,tf>> TFWritable tf = new TFWritable(key, weight); output.collect(term, new TermWritable(tf)); // wrap then collect // <term, weight> output.collect(term, new TermWritable(weight)); // wrap then collect } } @Override public void configure(JobConf job) { String analyzerName = job.get("redpoll.text.analyzer"); try { if (analyzerName != null) analyzer = (Analyzer) Class.forName(analyzerName).newInstance(); } catch (Exception excp) { excp.printStackTrace(); } if (analyzer == null) analyzer = new StandardAnalyzer(); } }
Reduce如果想获取数据,则可以解包(unwrap)它:
package redpoll.examples; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; /** * Form a tf vector and caculate the df for terms. * @author Jeremy Chow(coderplay@gmail.com) */ public class TermReducer extends MapReduceBase implements Reducer<Text, TermWritable, Text, Writable> { private static final Log log = LogFactory.getLog(TermReducer.class.getName()); public void reduce(Text key, Iterator<TermWritable> values, OutputCollector<Text, Writable> output, Reporter reporter) throws IOException { ArrayList<TFWritable> tfs = new ArrayList<TFWritable>(); int sum = 0; // log.info("term:" + key.toString()); while (values.hasNext()) { Writable value = values.next().get(); // unwrap if (value instanceof TFWritable) { tfs.add((TFWritable) value ); }else { sum += ((IntWritable) value).get(); } } TFWritable writables[] = new TFWritable[tfs.size()]; ArrayWritable aw = new ArrayWritable(TFWritable.class, tfs.toArray(writables)); // wrap again output.collect(key, new TermWritable(aw)); output.collect(key, new TermWritable(new IntWritable(sum))); } }
这儿collect的时候可以不再用TermWritable,只不过我在重新定义了OutputFormat,让它输出到两个不同的文件,而且输出的类型也是不一样的。
评论
6 楼
qingzew
2014-05-22
请问如果是map的输出中一个key有多个value值该怎么办
5 楼
javalive20120108
2012-06-20
回答3楼的问题:
在map里
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
可以得到所有的输入文件的全路径,可以在这里判断哪些作为输入文件
在map里
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
可以得到所有的输入文件的全路径,可以在这里判断哪些作为输入文件
4 楼
riddle_chen
2009-05-27
you_laner 写道
确切的说,这个不算是输出多个不同类型的value,只是把不同类型的value封装成同一class而已。我想通过不同的key来区分value,从而将value保存在多个文件中,而且在后续job中将前一job中的某些文件作为输入,只是不知道如何处理。
MultipleOutputFormat可以让你根据不同的key把汇总好的value保存不同的文件中,至于在后续任务中加入输入文件只要使用FileInputFormat.setInputPaths即可。
3 楼
you_laner
2009-05-20
确切的说,这个不算是输出多个不同类型的value,只是把不同类型的value封装成同一class而已。
我想通过不同的key来区分value,从而将value保存在多个文件中,而且在后续job中将前一job中的某些文件作为输入,只是不知道如何处理。
我想通过不同的key来区分value,从而将value保存在多个文件中,而且在后续job中将前一job中的某些文件作为输入,只是不知道如何处理。
2 楼
shuchaoo
2009-05-05
不错,GenericWritable的应用!没看出来你这个词频是怎么计算的,有combiner?
1 楼
yawl
2008-10-30
看来EC2这种scale好的平台还是最适合了,反正一台机器跑10个小时和10台机器跑1个小时都是$1 (small instance)
发表评论
-
抛砖引玉, 淘宝统一离线数据分析平台设计
2011-11-03 22:58 8206把这个拿出来的目的, 是想得到更多的反馈意见, 请邮件至zho ... -
NameNode优化笔记 (二)
2011-01-13 15:03 0事情发生在11月初至11月中旬, 云梯用户不断反映作业运行得慢 ... -
NameNode优化笔记 (一)
2011-01-12 10:32 6199很久没有发博客了, 最 ... -
我在Hadoop云计算会议的演讲
2010-10-26 14:59 9116点击下载演讲稿 由中科院计算所主办的“Hadoop ... -
分布式online与offline设计 slides
2010-08-25 00:24 4224花了两个小时简单了做了一个ppt,给兄弟公司相关人员讲解off ... -
演讲: Hadoop与数据分析
2010-05-29 20:35 7459前些天受金山软件公司西山居朋友的邀请, 去了趟珠海与金山的朋友 ... -
Hadoop的Mapper是怎么从HDFS上读取TextInputFormat数据的
2010-05-29 11:46 7979LineRecordReader.next(LongWri ... -
Anthill: 一种基于MapReduce的分布式DBMS
2010-05-11 22:47 3777MapReduce is a parallel computi ... -
HDFS的追加/刷新/读设计
2010-01-26 00:26 3281hdfs将在0.21版(尚未发布),把DFSOutputStr ... -
TFile, SequenceFile与gz,lzo压缩的测试
2010-01-07 22:47 5030先记一记,以后解释 :) $hadoop jar tf ... -
hive权限控制
2009-09-07 14:35 5126对hive的元数据表结构要作以下调整: hive用户不与表 ... -
avro编译
2009-07-04 00:36 3775avro是doug cutting主持的rpc ... -
Hive的一些问题
2009-06-01 16:51 3842偏激了一点. 总体来说H ... -
hive的编译模块设计
2009-05-22 15:39 3748很少在博客里写翻译的东西, 这次例外. 原文在这儿 . 译文 ... -
HIVE问答, 某天的hadoop群聊天记录
2009-05-07 17:10 10903某天晚上在hadoop群里一时兴起, 回答了一些hive相关的 ... -
暨南大学并行计算实验室MapReduce研究现状
2009-05-04 21:20 51424月份在学校花了半小时做的一个ppt, 内容是我们在应用ha ... -
hadoop上最多到底能放多少个文件?
2009-02-11 18:25 4347这主要取决于NameNode的内存。因为DFS集群运行时,文件 ... -
hadoop改进方面的胡思乱想
2009-02-04 10:57 44401. 我做数据挖掘的时候, 经常需要只对key分组,不必排序。 ... -
hadoop源码分析之MapReduce(二)
2009-01-18 22:14 8606任务的申请、派发与执行 TaskTracker.run() ... -
hadoop源码分析
2008-12-26 15:37 4951blog挺难贴图的, 我已经建了一个开源的项目, 用来存放文 ...
相关推荐
在Hadoop生态系统中,MapReduce是一种分布式编程模型,主要用于处理和生成大数据集。它通过将大规模数据分割成小块,然后在多台机器上并行处理这些数据块,最后将结果汇总,从而实现高效的批量数据处理。MapReduce的...
在 Hadoop Map-Reduce 中,数据处理过程主要分为两个阶段:**Map 阶段** 和 **Reduce 阶段**。 ##### Map 阶段 Map 函数接收输入数据块,并将其转换为一系列键值对。这一阶段的主要任务是对输入数据进行预处理,...
13. Map/Reduce框架运转在, value>键值对上,也就是说,框架把作业的输入看为是一组, value>键值对,同样也产出一组, value>键值对做为作业的输出,这两组键值对的类型可能不同。 14. 框架需要对key和value的类...
例如,在示例代码中出现了对文件路径的处理,这涉及到如何正确指定输入输出路径,以及如何处理不同类型的文件。 #### 四、Hadoop MapReduce环境配置 在使用 Hadoop MapReduce 之前,需要先搭建好相应的环境。主要...
本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...
Hadoop平台中一种Reduce负载均衡贪心算法,刘朵,曾锋,MapReduce是目前广泛应用的并行计算框架,是Hadoop平台的重要组成部分。主要包括Map函数和Reduce函数。Map函数输出key-value键值对作为Reduce的
- **Map输出**: `, SCC>`形式的数据,其中`Text`为学号,`SCC`是一个自定义的类,包含`id`、`name`、`course`、`score`和`table`等属性。 - **Shuffle结果**: `, Iterable<SCC>>`,即按照学号进行分组。 #### Map端...
本文通过对Hadoop系统结构的深入分析,并结合Map/Reduce编程模型,设计了一种基于Hadoop的高性能、高可靠性和可扩展性强的分布式搜索引擎。 #### 二、Hadoop系统结构分析 ##### 2.1 Map/Reduce 编程模型 Map/...
Map/Reduce计算模型的核心是map和reduce两个函数,这两个函数由用户负责实现,功能是按一定的映射规则将输入的, value>对转换成另一个或一批, value>对输出。 四、Map和Reduce函数计算模型 Map和Reduce函数是Map/...
- 通过`Window -> Preferences -> Hadoop Map/Reduce`配置Hadoop路径。 - 选择`Hadoop installation directory`,点击`Browse`,选中Hadoop安装目录。 - 在Eclipse中显示MapReduce工具栏。 - **创建Map/Reduce...
在部署新的Hadoop集群或重置现有集群时,需要格式化名称节点(NameNode),这可以通过命令`$bin/hadoop namenode -format`完成。 **6. 启动HDFS** 在分配的NameNode上启动Hadoop分布式文件系统(HDFS),可以使用...
在Eclipse中,选择Window -> Preferences -> Hadoop Map/Reduce,设置Hadoop安装目录为Hadoop的本地路径,例如:E:\Hadoop\hadoop-0.19.1。 创建一个新的Map/Reduce项目后,配置Map/Reduce Locations。这样,在...
《Hadoop MapReduce实战指南——基于<hadoop-map-reduce-demo>项目解析》 在大数据处理领域,Hadoop MapReduce作为核心组件,承担着数据分布式计算的任务。本篇将通过一个名为“hadoop-map-reduce-demo”的示例项目...
在压缩包文件`hadoop_map_reduce-master`中,可能包含了完整的MapReduce示例代码,包括Mapper、Reducer的实现,以及主程序。你可以通过阅读和运行这些代码来学习如何在实际项目中应用Hadoop MapReduce解决大数据问题...