`
wbj0110
  • 浏览: 1598378 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Mahout之SparseVectorsFromSequenceFiles源码分析

阅读更多

一系列添加选项的操作:包括minSupport,analyzerName,chunkSize,weight,minDF等等。

1
2
3
4
5
6
7
8
9
10
11
12
    Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
      "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create();
       //term weight,TF或TFIDF
    Option weightOpt = obuilder.withLongName("weight").withRequired(false).withArgument(
     abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription(
      "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create();
    //最小文档频率minDF
    Option minDFOpt = obuilder.withLongName("minDF").withRequired(false).withArgument(
     abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription(
      "The minimum document frequency.  Default is 1").withShortName("md").create();
……

一系列获取用户输入的选项的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
      Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt));
      Path outputDir = new Path((String) cmdLine.getValue(outputDirOpt));
   
      int chunkSize = 100;
      if (cmdLine.hasOption(chunkSizeOpt)) {
        chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
      }
      int minSupport = 2;
      if (cmdLine.hasOption(minSupportOpt)) {
        String minSupportString = (String) cmdLine.getValue(minSupportOpt);
        minSupport = Integer.parseInt(minSupportString);
      }
   ……

在SparseVectorsFromSequenceFiles的输入目录为经过SequenceFilesFromDirectory加工过的SequenceFile。SequenceFile是hadoop专有的文件格式,保存的是key/value对。SparseVectorsFromSequenceFiles中首先是将输入目录的SequenceFile通过DocumentProcessor的处理,保存在输出目录的tokenized-documents目录中。
而DocumentProcessor也就是只有一个map,没有reduce的一个job。将原来的key按原样输出,value提取后tokenize一下,转化成List,也就是将value中的文本去掉标点符号,以空格分开后的单词。

SparseVectorsFromSequenceFiles有如下两行:

1
2
3
4
5
      Configuration conf = getConf();
      Path tokenizedPath =
new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
      //TODO: move this into DictionaryVectorizer , and then fold SparseVectorsFrom with EncodedVectorsFrom to have one framework for all of this.
      DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf);

再看看处理文本的job,DocumentProcessor.tokenizeDocuments(),只有一个mapper SequenceFileTokenizerMapper。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
  public static void tokenizeDocuments(Path input,
                                       Class< ? extends Analyzer > analyzerClass,
                                       Path output,
                                       Configuration baseConf)
    throws IOExceptionInterruptedExceptionClassNotFoundException {
    Configuration conf = new Configuration(baseConf);
    // this conf parameter needs to be set enable serialisation of conf values
    conf.set("io.serializations""org.apache.hadoop.io.serializer.JavaSerialization,"
                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
    conf.set(ANALYZER_CLASS, analyzerClass.getName());
    Job job = new Job(conf);
    job.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: " + input);
    job.setJarByClass(DocumentProcessor.class);
     
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(StringTuple.class);
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);
   
    job.setMapperClass(SequenceFileTokenizerMapper.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    HadoopUtil.delete(conf, output);
    
    boolean succeeded = job.waitForCompletion(true);
    if (!succeeded)
      throw new IllegalStateException("Job failed!");

tokenizer之后,便进行TFIDF计算。

进行TFIDF计算
如果用户输入的maxDFSigma大于0,则输出目录为tf-vectors-toprune,否则为tf-vectors。
由DictionaryVectorizer类的createTermFrequencyVectors()静态方法来完成。
进行TFIDF计算的第一步是WordCount,
if n-gram为1,则直接由startWordCounting()方法来完成

         outputKey=Text
         outputValue=LongWritable
         Mapper= TermCountMapper(org.apache.mahout.vectorizer.term)
	Combiner=TermCountCombiner
	Reducer=TermCountReducer
	输出类型=SequenceFileOutputFormat
	输出目录=wordcount

说白了就是hadoop入门的第一个程序:wordCount
else 由CollocDriver.generateAllGrams()来完成(两个job):

         generateCollocations
	computeNGramsPruneByLLR

第二步,给每个单词编号(assign ids to feature List)。

由createDictionaryChunks处理,输入目录为wordcount,输出文件为dictionary.file-*,每个chunk一个块号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
      int i = 0;
      for (Pair< Writable,Writable > record
           : new SequenceFileDirIterable< Writable,Writable >(filesPattern, PathType.GLOBnullnulltrue, conf)) {
        if (currentChunkSize > chunkSizeLimit) {
          Closeables.closeQuietly(dictWriter);
          chunkIndex++;
          chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
          chunkPaths.add(chunkPath);
          dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
          currentChunkSize = 0;
        }
        Writable key = record.getFirst();
        int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
        currentChunkSize += fieldSize;
        dictWriter.append(key, new IntWritable(i++));//编号!
      }
      maxTermDimension[0] = i;

从0开始编号,最后的词的数量i保存在maxTermDimension[0]中。

第三步,构造PartialVector
最开始的tokenizer之后,文章以key/value的sequenceFile保存,其中key为相对路径,value为整篇文章的单词组。
上一步得到的dictionary是每个单词对应一个id,也写入sequenceFile里面。
mapper将tokenizer后的文章原样输出,reducer一部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  protected void reduce(Text key, Iterable< StringTuple > values, Context context)
          throws IOExceptionInterruptedException {
    Iterator< StringTuple > it = values.iterator();
    if (!it.hasNext()) {
      return;
    }
    StringTuple value = it.next();
    Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial size
      for (String term : value.getEntries()) {
        if (!term.isEmpty() && dictionary.containsKey(term)) { // unigram
          int termId = dictionary.get(term);
          vector.setQuick(termId, vector.getQuick(termId) + 1);
        }
      }
    if (vector.getNumNondefaultElements() > 0) {
      VectorWritable vectorWritable = new VectorWritable(vector);
      context.write(key, vectorWritable);
    }
  }

此时以tokenizer之后的文章和dictionary作为输入,每篇文章得到一个vector(类型为RandomAccessSparseVector,其实是一个hashMap),vector保存的是每篇文章的id号和频率。
然后以key/vector写入。
由于上一步产生的dictionary可能很大,分过块,每次reduce只从一个dictionary的chunk中提取id,分多次处理,最后再合并。合并采用PartialVectorMerger.mergePartialVectors()方法设置一个job来完成。
默认是不计算IDF的,在参数中指明后会在上一步计算partialVector(TF)后计算IDF,输入为TF目录。

1
2
3
4
    if (shouldPrune || processIdf) {
      docFrequenciesFeatures = TFIDFConverter.calculateDF(new Path(outputDir, tfDirName),
          outputDir, conf, chunkSize);
    }

计算IDF过程比较清晰:
看此过程的Mapper:

1
2
3
4
5
6
7
8
9
10
  protected void map(WritableComparable< ? > key, VectorWritable value, Context context)
    throws IOExceptionInterruptedException {
    Vector vector = value.get();
    Iterator< Vector.Element > it = vector.iterateNonZero();
    while (it.hasNext()) {
      Vector.Element e = it.next();
      context.write(new IntWritable(e.index()), ONE);
    }
    context.write(TOTAL_COUNT, ONE);
  }

输入为key/vector,提取出vector内容,对每一个词,得到他在词典中的id,然后加1.现在key变为这个词的id。
Reducer:

1
2
3
4
5
6
7
8
  protected void reduce(IntWritable key, Iterable< LongWritable > values, Context context)
    throws IOExceptionInterruptedException {
    long sum = 0;
    for (LongWritable value : values) {
      sum += value.get();
    }
    context.write(key, new LongWritable(sum));
  }

相同key的value相加,又是一个wordcount程序。这样每个词key在多少个文档中出现过DF(不是在文档中出现的次数)就得到了。
输出目录为df-count,同计算tf一样,分为几个chunk写入HDFS。
根据要求有一个计算标准差的过程:

1
2
3
   double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
   long vectorCount = docFrequenciesFeatures.getFirst()[1];
   maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount);

以及一个pruneVector的过程:

1
2
3
4
5
6
7
8
9
HighDFWordsPruner.pruneVectors(tfDir,
                                          prunedTFDir,
                                          prunedPartialTFDir,
                                          maxDF,
                                          conf,
                                          docFrequenciesFeatures,
                                          -1.0f,
                                          false,
                                          reduceTasks);

最后计算TFIDF:

1
2
3
4
5
6
7
8
9
10
11
  public static void processTfIdf(Path input,
                                  Path output,
                                  Configuration baseConf,
                                  Pair< Long[], List< Path > > datasetFeatures,
                                  int minDf,
                                  long maxDF,
                                  float normPower,
                                  boolean logNormalize,
                                  boolean sequentialAccessOutput,
                                  boolean namedVector,
                                  int numReducers)

Mapper照原样输出,Reducer一部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  protected void reduce(WritableComparable< ? > key, Iterable< VectorWritable > values, Context context)
    throws IOExceptionInterruptedException {
    Iterator< VectorWritable > it = values.iterator();
    if (!it.hasNext()) {
      return;
    }
    Vector value = it.next().get();
    Iterator< Vector.Element > it1 = value.iterateNonZero();
    Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());
    while (it1.hasNext()) {
      Vector.Element e = it1.next();
      if (!dictionary.containsKey(e.index())) {
        continue;
      }
      long df = dictionary.get(e.index());
      if (maxDf > -1 && (100.0 * df) / vectorCount > maxDf) {
        continue;
      }
      if (df < minDf) {
        df = minDf;
      }
      vector.setQuick(e.index(), tfidf.calculate((int) e.get()(int) df, (int) featureCount, (int) vectorCount));
    }

    if (sequentialAccess) {
      vector = new SequentialAccessSparseVector(vector);
    }
    if (namedVector) {
      vector = new NamedVector(vector, key.toString());
    }
    VectorWritable vectorWritable = new VectorWritable(vector);
    context.write(key, vectorWritable);
  }

注意到

vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));

首先得到单词的id,然后计算tf*idf,再写回到这个vector。
最后的context.write(key, vectorWritable)得到了key为此文本的相对路径,value为ft*idf的词的vector。
至此,计算完成。
整个过程产生的目录如下:

整个过程的所有job信息如下:

http://hnote.org/big-data/mahout/sparsevectorsfromsequencefiles-2

 

 

http://soledede.com/

 

大家可以加我个人微信号:scccdgf

 

 

或者关注soledede的微信公众号:soledede
微信公众号:

 

分享到:
评论

相关推荐

    mahout Algorithms源码分析

    樊哲是Mahout的积极学习者和实践者,他在CSDN上分享了关于Mahout算法的解析与案例实战的博客,获得了“CSDN2013博客之星”的荣誉。樊哲的经验表明,虽然Hadoop平台上算法开发一般需要耗费很长时间,但Mahout已经实现...

    Mahout源码

    Mahout 构建在Hadoop之上,利用MapReduce进行分布式计算。这意味着,对于处理大量数据,Mahout 可以在多台机器上并行运行,大大提高了计算效率。Hadoop的输入/输出机制与Mahout相结合,使得大数据处理变得简单易行。...

    mahout源码

    在大数据时代,Mahout已经成为数据科学家和工程师们的重要工具,尤其在文本分析、推荐系统和分类任务中扮演着关键角色。本篇将深入探讨Mahout中的朴素贝叶斯分类以及中文分词这两个核心功能。 一、Mahout与朴素...

    Mahout教程内含源码以及说明书可以自己运行复现.zip

    安装Mahout首先需要准备Hadoop环境,因为Mahout是构建在Hadoop之上的。你需要下载并安装Hadoop,配置Hadoop环境变量,并确保集群运行正常。接着,从Apache官方网站获取Mahout的最新版本,解压后将其添加到你的系统...

    mahout0.9源码(支持hadoop2)

    mahout0.9的源码,支持hadoop2,需要自行使用mvn编译。mvn编译使用命令: mvn clean install -Dhadoop2 -Dhadoop.2.version=2.2.0 -DskipTests

    mahout in action中的源码

    《Mahout in Action》是一本深入探讨Apache Mahout机器学习框架的专业书籍,其源码提供了丰富的实践示例和深入理解Mahout算法的机会。在GitHub上,你可以找到这些源码的完整版本,链接为。下面,我们将详细探讨...

    mahout0.9 源码

    以上就是关于Mahout 0.9源码及其在Eclipse中的使用介绍。通过学习和实践,开发者可以利用Mahout构建强大的机器学习应用,处理各种数据挖掘任务。在实际应用中,可以根据项目需求选择合适的算法,结合Hadoop分布式...

    人工智能-推荐系统-新闻推荐-基于Mahout的新闻推荐系统

    Mahout:整体框架,实现了协同过滤 Deeplearning4j,构建VSM Jieba:分词,关键词提取 HanLP:分词,关键词提取 Spring Boot:提供API、ORM 关键实现 基于用户的协同过滤 直接调用Mahout相关接口即可 选择不同...

    Mahout之Item-based应用使用

    《Mahout之Item-based应用使用》 Apache Mahout是一个开源的机器学习库,主要专注于大规模数据集上的推荐系统、分类和聚类算法。在这个主题中,我们将深入探讨Mahout中的Item-based协同过滤(Item-based ...

    mahout-distribution-0.5-src.zip mahout 源码包

    mahout-distribution-0.5-src.zip mahout 源码包

    mahout-core-0.9.jar+mahout-core-0.8.jar+mahout-core-0.1.jar

    Mahout是建立在Hadoop之上的,利用其分布式计算能力处理大规模数据集。这使得Mahout能够处理超出单台机器内存和计算能力的数据。 3. **版本差异**: - mahout-core-0.1.jar:这是早期版本,可能包含的基本功能,...

    mahout所需jar包

    Mahout的目标是帮助开发人员构建智能应用程序,如推荐系统、分类和聚类算法,这些在大数据分析领域中极为重要。 **K-Means聚类算法** K-Means是一种无监督学习的聚类算法,用于将数据集分成不同的群组或类别。在...

    [Mahout] Windows下Mahout单机安装

    打开命令行,进入解压后的Mahout源码目录,执行以下Maven命令来构建Mahout: ``` mvn clean install -DskipTests ``` 这个过程可能会比较耗时,因为Maven会自动下载所有依赖。等待编译完成后,Mahout的可执行jar文件...

    【甘道夫】通过Mahout构建贝叶斯文本分类器案例详解 -- 配套源码

    Apache Mahout是一个基于Hadoop的机器学习库,它提供了一系列的算法,包括聚类、分类和协同过滤,用于大数据分析。贝叶斯分类器是其中一种常用的文本分类方法,因其简单高效而在实际应用中广泛使用。 首先,我们要...

    apache-mahout-distribution-0.11.0-src.zip

    在源码中,您可以探索Mahout实现的各种算法,如协同过滤(Collaborative Filtering)、频繁项集挖掘(Frequent Itemset Mining)、近邻搜索(Nearest Neighbor Search)等。这些算法是通过Java编程语言实现的,因此...

    mahout-distribution-0.7-src.zip

    《Apache Mahout 0.7源码解析与应用探索》 Apache Mahout 是一个开源机器学习库,专注于大规模数据集的算法实现。该库由Java编写,并采用Maven作为构建工具,提供了一系列用于构建智能应用的高效算法。本文将深入...

    maven_mahout_template-mahout-0.8

    《Apache Maven与Mahout实战:基于maven_mahout_template-mahout-0.8的探索》 Apache Maven是一款强大的项目管理和依赖管理工具,广泛应用于Java开发领域。它通过一个项目对象模型(Project Object Model,POM)来...

Global site tag (gtag.js) - Google Analytics