一系列添加选项的操作:包括minSupport,analyzerName,chunkSize,weight,minDF等等。
1
2 3 4 5 6 7 8 9 10 11 12 |
Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription( "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create(); //term weight,TF或TFIDF Option weightOpt = obuilder.withLongName("weight").withRequired(false).withArgument( abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription( "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create(); //最小文档频率minDF Option minDFOpt = obuilder.withLongName("minDF").withRequired(false).withArgument( abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription( "The minimum document frequency. Default is 1").withShortName("md").create(); …… |
一系列获取用户输入的选项的操作:
1
2 3 4 5 6 7 8 9 10 11 12 13 |
Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt));
Path outputDir = new Path((String) cmdLine.getValue(outputDirOpt)); int chunkSize = 100; if (cmdLine.hasOption(chunkSizeOpt)) { chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt)); } int minSupport = 2; if (cmdLine.hasOption(minSupportOpt)) { String minSupportString = (String) cmdLine.getValue(minSupportOpt); minSupport = Integer.parseInt(minSupportString); } …… |
在SparseVectorsFromSequenceFiles的输入目录为经过SequenceFilesFromDirectory加工过的SequenceFile。SequenceFile是hadoop专有的文件格式,保存的是key/value对。SparseVectorsFromSequenceFiles中首先是将输入目录的SequenceFile通过DocumentProcessor的处理,保存在输出目录的tokenized-documents目录中。
而DocumentProcessor也就是只有一个map,没有reduce的一个job。将原来的key按原样输出,value提取后tokenize一下,转化成List,也就是将value中的文本去掉标点符号,以空格分开后的单词。
SparseVectorsFromSequenceFiles有如下两行:
1
2 3 4 5 |
Configuration conf = getConf();
Path tokenizedPath = new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER); //TODO: move this into DictionaryVectorizer , and then fold SparseVectorsFrom with EncodedVectorsFrom to have one framework for all of this. DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf); |
再看看处理文本的job,DocumentProcessor.tokenizeDocuments(),只有一个mapper SequenceFileTokenizerMapper。
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
public static void tokenizeDocuments(Path input,
Class< ? extends Analyzer > analyzerClass, Path output, Configuration baseConf) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(baseConf); // this conf parameter needs to be set enable serialisation of conf values conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization," + "org.apache.hadoop.io.serializer.WritableSerialization"); conf.set(ANALYZER_CLASS, analyzerClass.getName()); Job job = new Job(conf); job.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: " + input); job.setJarByClass(DocumentProcessor.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(StringTuple.class); FileInputFormat.setInputPaths(job, input); FileOutputFormat.setOutputPath(job, output); job.setMapperClass(SequenceFileTokenizerMapper.class); job.setInputFormatClass(SequenceFileInputFormat.class); job.setNumReduceTasks(0); job.setOutputFormatClass(SequenceFileOutputFormat.class); HadoopUtil.delete(conf, output); boolean succeeded = job.waitForCompletion(true); if (!succeeded) throw new IllegalStateException("Job failed!"); |
tokenizer之后,便进行TFIDF计算。
进行TFIDF计算
如果用户输入的maxDFSigma大于0,则输出目录为tf-vectors-toprune,否则为tf-vectors。
由DictionaryVectorizer类的createTermFrequencyVectors()静态方法来完成。
进行TFIDF计算的第一步是WordCount,
if n-gram为1,则直接由startWordCounting()方法来完成
outputKey=Text outputValue=LongWritable Mapper= TermCountMapper(org.apache.mahout.vectorizer.term) Combiner=TermCountCombiner Reducer=TermCountReducer 输出类型=SequenceFileOutputFormat 输出目录=wordcount
说白了就是hadoop入门的第一个程序:wordCount
else 由CollocDriver.generateAllGrams()来完成(两个job):
generateCollocations computeNGramsPruneByLLR
第二步,给每个单词编号(assign ids to feature List)。
由createDictionaryChunks处理,输入目录为wordcount,输出文件为dictionary.file-*,每个chunk一个块号。
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
int i = 0;
for (Pair< Writable,Writable > record : new SequenceFileDirIterable< Writable,Writable >(filesPattern, PathType.GLOB, null, null, true, conf)) { if (currentChunkSize > chunkSizeLimit) { Closeables.closeQuietly(dictWriter); chunkIndex++; chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex); chunkPaths.add(chunkPath); dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class); currentChunkSize = 0; } Writable key = record.getFirst(); int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8; currentChunkSize += fieldSize; dictWriter.append(key, new IntWritable(i++));//编号! } maxTermDimension[0] = i; |
从0开始编号,最后的词的数量i保存在maxTermDimension[0]中。
第三步,构造PartialVector
最开始的tokenizer之后,文章以key/value的sequenceFile保存,其中key为相对路径,value为整篇文章的单词组。
上一步得到的dictionary是每个单词对应一个id,也写入sequenceFile里面。
mapper将tokenizer后的文章原样输出,reducer一部分如下:
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
protected void reduce(Text key, Iterable< StringTuple > values, Context context)
throws IOException, InterruptedException { Iterator< StringTuple > it = values.iterator(); if (!it.hasNext()) { return; } StringTuple value = it.next(); Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial size for (String term : value.getEntries()) { if (!term.isEmpty() && dictionary.containsKey(term)) { // unigram int termId = dictionary.get(term); vector.setQuick(termId, vector.getQuick(termId) + 1); } } if (vector.getNumNondefaultElements() > 0) { VectorWritable vectorWritable = new VectorWritable(vector); context.write(key, vectorWritable); } } |
此时以tokenizer之后的文章和dictionary作为输入,每篇文章得到一个vector(类型为RandomAccessSparseVector,其实是一个hashMap),vector保存的是每篇文章的id号和频率。
然后以key/vector写入。
由于上一步产生的dictionary可能很大,分过块,每次reduce只从一个dictionary的chunk中提取id,分多次处理,最后再合并。合并采用PartialVectorMerger.mergePartialVectors()方法设置一个job来完成。
默认是不计算IDF的,在参数中指明后会在上一步计算partialVector(TF)后计算IDF,输入为TF目录。
1
2 3 4 |
if (shouldPrune || processIdf) {
docFrequenciesFeatures = TFIDFConverter.calculateDF(new Path(outputDir, tfDirName), outputDir, conf, chunkSize); } |
计算IDF过程比较清晰:
看此过程的Mapper:
1
2 3 4 5 6 7 8 9 10 |
protected void map(WritableComparable< ? > key, VectorWritable value, Context context)
throws IOException, InterruptedException { Vector vector = value.get(); Iterator< Vector.Element > it = vector.iterateNonZero(); while (it.hasNext()) { Vector.Element e = it.next(); context.write(new IntWritable(e.index()), ONE); } context.write(TOTAL_COUNT, ONE); } |
输入为key/vector,提取出vector内容,对每一个词,得到他在词典中的id,然后加1.现在key变为这个词的id。
Reducer:
1
2 3 4 5 6 7 8 |
protected void reduce(IntWritable key, Iterable< LongWritable > values, Context context)
throws IOException, InterruptedException { long sum = 0; for (LongWritable value : values) { sum += value.get(); } context.write(key, new LongWritable(sum)); } |
相同key的value相加,又是一个wordcount程序。这样每个词key在多少个文档中出现过DF(不是在文档中出现的次数)就得到了。
输出目录为df-count,同计算tf一样,分为几个chunk写入HDFS。
根据要求有一个计算标准差的过程:
1
2 3 |
double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
long vectorCount = docFrequenciesFeatures.getFirst()[1]; maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount); |
以及一个pruneVector的过程:
1
2 3 4 5 6 7 8 9 |
HighDFWordsPruner.pruneVectors(tfDir,
prunedTFDir, prunedPartialTFDir, maxDF, conf, docFrequenciesFeatures, -1.0f, false, reduceTasks); |
最后计算TFIDF:
1
2 3 4 5 6 7 8 9 10 11 |
public static void processTfIdf(Path input,
Path output, Configuration baseConf, Pair< Long[], List< Path > > datasetFeatures, int minDf, long maxDF, float normPower, boolean logNormalize, boolean sequentialAccessOutput, boolean namedVector, int numReducers) |
Mapper照原样输出,Reducer一部分如下:
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
protected void reduce(WritableComparable< ? > key, Iterable< VectorWritable > values, Context context)
throws IOException, InterruptedException { Iterator< VectorWritable > it = values.iterator(); if (!it.hasNext()) { return; } Vector value = it.next().get(); Iterator< Vector.Element > it1 = value.iterateNonZero(); Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements()); while (it1.hasNext()) { Vector.Element e = it1.next(); if (!dictionary.containsKey(e.index())) { continue; } long df = dictionary.get(e.index()); if (maxDf > -1 && (100.0 * df) / vectorCount > maxDf) { continue; } if (df < minDf) { df = minDf; } vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount)); } if (sequentialAccess) { vector = new SequentialAccessSparseVector(vector); } if (namedVector) { vector = new NamedVector(vector, key.toString()); } VectorWritable vectorWritable = new VectorWritable(vector); context.write(key, vectorWritable); } |
注意到
vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));
首先得到单词的id,然后计算tf*idf,再写回到这个vector。
最后的context.write(key, vectorWritable)得到了key为此文本的相对路径,value为ft*idf的词的vector。
至此,计算完成。
整个过程产生的目录如下:
整个过程的所有job信息如下:
http://hnote.org/big-data/mahout/sparsevectorsfromsequencefiles-2
相关推荐
樊哲是Mahout的积极学习者和实践者,他在CSDN上分享了关于Mahout算法的解析与案例实战的博客,获得了“CSDN2013博客之星”的荣誉。樊哲的经验表明,虽然Hadoop平台上算法开发一般需要耗费很长时间,但Mahout已经实现...
Mahout 构建在Hadoop之上,利用MapReduce进行分布式计算。这意味着,对于处理大量数据,Mahout 可以在多台机器上并行运行,大大提高了计算效率。Hadoop的输入/输出机制与Mahout相结合,使得大数据处理变得简单易行。...
在大数据时代,Mahout已经成为数据科学家和工程师们的重要工具,尤其在文本分析、推荐系统和分类任务中扮演着关键角色。本篇将深入探讨Mahout中的朴素贝叶斯分类以及中文分词这两个核心功能。 一、Mahout与朴素...
安装Mahout首先需要准备Hadoop环境,因为Mahout是构建在Hadoop之上的。你需要下载并安装Hadoop,配置Hadoop环境变量,并确保集群运行正常。接着,从Apache官方网站获取Mahout的最新版本,解压后将其添加到你的系统...
mahout0.9的源码,支持hadoop2,需要自行使用mvn编译。mvn编译使用命令: mvn clean install -Dhadoop2 -Dhadoop.2.version=2.2.0 -DskipTests
《Mahout in Action》是一本深入探讨Apache Mahout机器学习框架的专业书籍,其源码提供了丰富的实践示例和深入理解Mahout算法的机会。在GitHub上,你可以找到这些源码的完整版本,链接为。下面,我们将详细探讨...
以上就是关于Mahout 0.9源码及其在Eclipse中的使用介绍。通过学习和实践,开发者可以利用Mahout构建强大的机器学习应用,处理各种数据挖掘任务。在实际应用中,可以根据项目需求选择合适的算法,结合Hadoop分布式...
Mahout:整体框架,实现了协同过滤 Deeplearning4j,构建VSM Jieba:分词,关键词提取 HanLP:分词,关键词提取 Spring Boot:提供API、ORM 关键实现 基于用户的协同过滤 直接调用Mahout相关接口即可 选择不同...
《Mahout之Item-based应用使用》 Apache Mahout是一个开源的机器学习库,主要专注于大规模数据集上的推荐系统、分类和聚类算法。在这个主题中,我们将深入探讨Mahout中的Item-based协同过滤(Item-based ...
mahout-distribution-0.5-src.zip mahout 源码包
Mahout是建立在Hadoop之上的,利用其分布式计算能力处理大规模数据集。这使得Mahout能够处理超出单台机器内存和计算能力的数据。 3. **版本差异**: - mahout-core-0.1.jar:这是早期版本,可能包含的基本功能,...
Mahout的目标是帮助开发人员构建智能应用程序,如推荐系统、分类和聚类算法,这些在大数据分析领域中极为重要。 **K-Means聚类算法** K-Means是一种无监督学习的聚类算法,用于将数据集分成不同的群组或类别。在...
打开命令行,进入解压后的Mahout源码目录,执行以下Maven命令来构建Mahout: ``` mvn clean install -DskipTests ``` 这个过程可能会比较耗时,因为Maven会自动下载所有依赖。等待编译完成后,Mahout的可执行jar文件...
Apache Mahout是一个基于Hadoop的机器学习库,它提供了一系列的算法,包括聚类、分类和协同过滤,用于大数据分析。贝叶斯分类器是其中一种常用的文本分类方法,因其简单高效而在实际应用中广泛使用。 首先,我们要...
在源码中,您可以探索Mahout实现的各种算法,如协同过滤(Collaborative Filtering)、频繁项集挖掘(Frequent Itemset Mining)、近邻搜索(Nearest Neighbor Search)等。这些算法是通过Java编程语言实现的,因此...
《Apache Mahout 0.7源码解析与应用探索》 Apache Mahout 是一个开源机器学习库,专注于大规模数据集的算法实现。该库由Java编写,并采用Maven作为构建工具,提供了一系列用于构建智能应用的高效算法。本文将深入...
《Apache Maven与Mahout实战:基于maven_mahout_template-mahout-0.8的探索》 Apache Maven是一款强大的项目管理和依赖管理工具,广泛应用于Java开发领域。它通过一个项目对象模型(Project Object Model,POM)来...