`
coderplay
  • 浏览: 578073 次
  • 性别: Icon_minigender_1
  • 来自: 广州杭州
社区版块
存档分类
最新评论

怎么在hadoop作map/reduce时输出N种不同类型的value

阅读更多

BTW:再次感叹下没有机器, 3.4G的语料,单机处理了10来个小时, 真是郁闷~~ 要是有N台机器多好啊.

 

在很多时候,特别是处理大数据的时候,我们希望一道MapReduce过程就可以解决几个问题。这样可以避免再次读取数据。比如:在做文本聚类/分类的时候,mapper读取语料,进行分词后,要同时算出每个词条(term)的term frequency以及它的document frequency. 前者对于每个词条来说其实是个向量, 它代表此词条在N篇文档各中的词频;而后者就是一个非负整数。 这时候就可以借助一种特殊的Writable类:GenericWritable.

 

用法是:继承这个类,然后把你要输出value的Writable类型加进它的CLASSES静态变量里,在后面的TermMapper和TermReducer中我的value使用了三种ArrayWritable,IntWritable和我自已定义的TFWritable,所以要把三者全加入TermWritable的CLASSES中。

package redpoll.examples;

import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.Writable;

/**
 * Generic Writable class for terms.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class TermWritable extends GenericWritable {
  private static Class<? extends Writable>[] CLASSES = null;

  static {
    CLASSES = (Class<? extends Writable>[]) new Class[] {
        org.apache.hadoop.io.ArrayWritable.class,
        org.apache.hadoop.io.IntWritable.class,
        redpoll.examples.TFWritable.class
        };
  }

  public TermWritable() {
  }

  public TermWritable(Writable instance) {
    set(instance);
  }

  @Override
  protected Class<? extends Writable>[] getTypes() {
    return CLASSES;
  }
}

 Mapper在collect数据时,用刚才定义的TermWritable来包装(wrap)要使用的Writable类。

package redpoll.examples;

import java.io.IOException;
import java.io.StringReader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;

/**
 * A class provides for doing words segmenation and counting term TFs and DFs.<p>
 * in: key is document id, value is a document instance. <br>
 * output:
 * <li>key is term, value is a <documentId, tf> pair</li>
 * <li>key is term, value is document frequency corresponsing to the key</li>
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class TermMapper extends MapReduceBase implements
    Mapper<LongWritable, Document, Text, TermWritable> {
  private static final Log log = LogFactory.getLog(TermMapper.class
      .getName());
  
  /* analyzer for words segmentation */
  private Analyzer analyzer = null;
   
  /* frequency weight for document title */
  private IntWritable titleWeight = new IntWritable(2);
  /* frequency weight for document content */
  private IntWritable contentWeight = new IntWritable(1);

  
  public void map(LongWritable key, Document value,
      OutputCollector<Text, TermWritable> output, Reporter reporter)
      throws IOException {
    doMap(key, value.getTitle(), titleWeight, output, reporter);
    doMap(key, value.getContent(), contentWeight, output, reporter);
  }
  
  private void doMap(LongWritable key, String value, IntWritable weight,
      OutputCollector<Text, TermWritable> output, Reporter reporter)
      throws IOException {
    // do words segmentation
    TokenStream ts = analyzer.tokenStream("dummy", new StringReader(value));
    Token token = new Token();
    while ((token = ts.next(token)) != null) {
      String termString = new String(token.termBuffer(), 0, token.termLength());
      Text term = new Text(termString);
      // <term, <documentId,tf>>
      TFWritable tf = new TFWritable(key, weight);
      output.collect(term, new TermWritable(tf)); // wrap then collect
      // <term, weight>
      output.collect(term, new TermWritable(weight)); // wrap then collect
    }
  }
    
  @Override
  public void configure(JobConf job) {
    String analyzerName = job.get("redpoll.text.analyzer");
    try {
      if (analyzerName != null)
        analyzer = (Analyzer) Class.forName(analyzerName).newInstance();
    } catch (Exception excp) {
      excp.printStackTrace();
    }
    if (analyzer == null)
      analyzer = new StandardAnalyzer();
  }

}
 

Reduce如果想获取数据,则可以解包(unwrap)它:

package redpoll.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Form a tf vector and caculate the df for terms.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class TermReducer extends MapReduceBase implements Reducer<Text, TermWritable, Text, Writable> {
  
  private static final Log log = LogFactory.getLog(TermReducer.class.getName());
  
  public void reduce(Text key, Iterator<TermWritable> values,
      OutputCollector<Text, Writable> output, Reporter reporter)
      throws IOException {
    ArrayList<TFWritable> tfs = new ArrayList<TFWritable>();
    int sum = 0;
//    log.info("term:" + key.toString());
    while (values.hasNext()) {
      Writable value = values.next().get(); // unwrap
      if (value  instanceof TFWritable) {
        tfs.add((TFWritable) value ); 
      }else {
        sum += ((IntWritable) value).get();
      }
    }
    
    TFWritable writables[] = new TFWritable[tfs.size()];
    ArrayWritable aw = new ArrayWritable(TFWritable.class, tfs.toArray(writables));
    // wrap again
    output.collect(key, new TermWritable(aw)); 
    output.collect(key, new TermWritable(new IntWritable(sum)));
  }

}

 这儿collect的时候可以不再用TermWritable,只不过我在重新定义了OutputFormat,让它输出到两个不同的文件,而且输出的类型也是不一样的。

 

分享到:
评论
6 楼 qingzew 2014-05-22  
请问如果是map的输出中一个key有多个value值该怎么办
5 楼 javalive20120108 2012-06-20  
回答3楼的问题:
   在map里
    String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
可以得到所有的输入文件的全路径,可以在这里判断哪些作为输入文件
4 楼 riddle_chen 2009-05-27  
you_laner 写道

确切的说,这个不算是输出多个不同类型的value,只是把不同类型的value封装成同一class而已。我想通过不同的key来区分value,从而将value保存在多个文件中,而且在后续job中将前一job中的某些文件作为输入,只是不知道如何处理。


MultipleOutputFormat可以让你根据不同的key把汇总好的value保存不同的文件中,至于在后续任务中加入输入文件只要使用FileInputFormat.setInputPaths即可。
3 楼 you_laner 2009-05-20  
确切的说,这个不算是输出多个不同类型的value,只是把不同类型的value封装成同一class而已。

我想通过不同的key来区分value,从而将value保存在多个文件中,而且在后续job中将前一job中的某些文件作为输入,只是不知道如何处理。
2 楼 shuchaoo 2009-05-05  
不错,GenericWritable的应用!没看出来你这个词频是怎么计算的,有combiner?
1 楼 yawl 2008-10-30  
看来EC2这种scale好的平台还是最适合了,反正一台机器跑10个小时和10台机器跑1个小时都是$1 (small instance)

相关推荐

    hadoop之map/reduce

    在Hadoop生态系统中,MapReduce是一种分布式编程模型,主要用于处理和生成大数据集。它通过将大规模数据分割成小块,然后在多台机器上并行处理这些数据块,最后将结果汇总,从而实现高效的批量数据处理。MapReduce的...

    Hadoop Map-Reduce教程

    在 Hadoop Map-Reduce 中,数据处理过程主要分为两个阶段:**Map 阶段** 和 **Reduce 阶段**。 ##### Map 阶段 Map 函数接收输入数据块,并将其转换为一系列键值对。这一阶段的主要任务是对输入数据进行预处理,...

    Hadoop教程.pdf

    13. Map/Reduce框架运转在, value&gt;键值对上,也就是说,框架把作业的输入看为是一组, value&gt;键值对,同样也产出一组, value&gt;键值对做为作业的输出,这两组键值对的类型可能不同。 14. 框架需要对key和value的类...

    hadoop map reduce 中文教程

    例如,在示例代码中出现了对文件路径的处理,这涉及到如何正确指定输入输出路径,以及如何处理不同类型的文件。 #### 四、Hadoop MapReduce环境配置 在使用 Hadoop MapReduce 之前,需要先搭建好相应的环境。主要...

    hadoop Join代码(map join 和reduce join)

    本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...

    论文研究-Hadoop平台中一种Reduce负载均衡贪心算法 .pdf

    Hadoop平台中一种Reduce负载均衡贪心算法,刘朵,曾锋,MapReduce是目前广泛应用的并行计算框架,是Hadoop平台的重要组成部分。主要包括Map函数和Reduce函数。Map函数输出key-value键值对作为Reduce的

    现有student.txt和student-score.txt 将两个文件上传到hdfs上 使用Map/Reduce框架完成下面

    - **Map输出**: `, SCC&gt;`形式的数据,其中`Text`为学号,`SCC`是一个自定义的类,包含`id`、`name`、`course`、`score`和`table`等属性。 - **Shuffle结果**: `, Iterable&lt;SCC&gt;&gt;`,即按照学号进行分组。 #### Map端...

    Hadoop下的分布式搜索引擎

    本文通过对Hadoop系统结构的深入分析,并结合Map/Reduce编程模型,设计了一种基于Hadoop的高性能、高可靠性和可扩展性强的分布式搜索引擎。 #### 二、Hadoop系统结构分析 ##### 2.1 Map/Reduce 编程模型 Map/...

    基于Hadoop的MapReduce框架研究报告.ppt

    Map/Reduce计算模型的核心是map和reduce两个函数,这两个函数由用户负责实现,功能是按一定的映射规则将输入的, value&gt;对转换成另一个或一批, value&gt;对输出。 四、Map和Reduce函数计算模型 Map和Reduce函数是Map/...

    史上最强Hadoop-1.2.1安装文档

    - 通过`Window -&gt; Preferences -&gt; Hadoop Map/Reduce`配置Hadoop路径。 - 选择`Hadoop installation directory`,点击`Browse`,选中Hadoop安装目录。 - 在Eclipse中显示MapReduce工具栏。 - **创建Map/Reduce...

    hadoop 命令大全

    在部署新的Hadoop集群或重置现有集群时,需要格式化名称节点(NameNode),这可以通过命令`$bin/hadoop namenode -format`完成。 **6. 启动HDFS** 在分配的NameNode上启动Hadoop分布式文件系统(HDFS),可以使用...

    Hadoop开发环境配置

    在Eclipse中,选择Window -&gt; Preferences -&gt; Hadoop Map/Reduce,设置Hadoop安装目录为Hadoop的本地路径,例如:E:\Hadoop\hadoop-0.19.1。 创建一个新的Map/Reduce项目后,配置Map/Reduce Locations。这样,在...

    hadoop-map-reduce-demo

    《Hadoop MapReduce实战指南——基于&lt;hadoop-map-reduce-demo&gt;项目解析》 在大数据处理领域,Hadoop MapReduce作为核心组件,承担着数据分布式计算的任务。本篇将通过一个名为“hadoop-map-reduce-demo”的示例项目...

    hadoop_map_reduce:Hadoop Map reduce 示例

    在压缩包文件`hadoop_map_reduce-master`中,可能包含了完整的MapReduce示例代码,包括Mapper、Reducer的实现,以及主程序。你可以通过阅读和运行这些代码来学习如何在实际项目中应用Hadoop MapReduce解决大数据问题...

Global site tag (gtag.js) - Google Analytics