Mahout Naive Bayes Algorithm Development Notes (Extension, Part 2)
If you just want to call the packaged algorithm, you can download it directly from the accompanying "mahout Bayes algorithm extension" download page. The algorithm is invoked as follows:
$HADOOP_HOME/bin/hadoop jar mahout.jar mahout.fansy.bayes.BayerRunner -i hdfs_input_path -o hdfs_output_path -scl : -scv ,

The supported parameters are listed below:
usage: <command> [Generic Options] [Job-Specific Options]
Generic Options:
  -archives <paths>              comma separated archives to be unarchived on the compute machines.
  -conf <configuration file>     specify an application configuration file
  -D <property=value>            use value for given property
  -files <paths>                 comma separated files to be copied to the map reduce cluster
  -fs <local|namenode:port>      specify a namenode
  -jt <local|jobtracker:port>    specify a job tracker
  -libjars <paths>               comma separated jar files to include in the classpath.
  -tokenCacheFile <tokensFile>   name of the file with the tokens
Job-Specific Options:
  --input (-i) input                                    Path to job input directory.
  --output (-o) output                                  The directory pathname for output.
  --splitCharacterVector (-scv) splitCharacterVector    Vector split character, default is ','
  --splitCharacterLabel (-scl) splitCharacterLabel      Vector and Label split character, default is ':'
  --help (-h)                                           Print out help
  --tempDir tempDir                                     Intermediate output directory
  --startPhase startPhase                               First phase to run
  --endPhase endPhase                                   Last phase to run
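A quick note on the input format these two split options imply (a hedged illustration; the actual sample records come from Extension Part 1, and the values and label position below are assumptions): each input line carries a class label and a feature vector, with label and vector separated by the -scl character (':' above) and the vector components separated by the -scv character (',' above). A record for a three-feature instance with label 1 might therefore look like:

1:2.1,3.2,5.2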
Continuing from the previous article, the remaining steps are analyzed below.

4. Obtaining the Bayes model attribute values (2):
This step corresponds to the second prepareJob of TrainNaiveBayesJob; the mapper and reducer are taken from that job with essentially no code changes. The code is as follows:
package mahout.fansy.bayes;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.classifier.naivebayes.training.WeightsMapper;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.mapreduce.VectorSumReducer;
import org.apache.mahout.math.VectorWritable;

/**
 * The second job of the Bayes algorithm, equivalent to the second
 * prepareJob of TrainNaiveBayesJob; the original Mapper and Reducer
 * are reused.
 * @author Administrator
 */
public class BayesJob2 extends AbstractJob {

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new BayesJob2(), args);
  }

  @Override
  public int run(String[] args) throws Exception {
    addInputOption();
    addOutputOption();
    addOption("labelNumber", "ln", "The number of labels");
    if (parseArguments(args) == null) {
      return -1;
    }
    Path input = getInputPath();
    Path output = getOutputPath();
    String labelNumber = getOption("labelNumber");
    Configuration conf = getConf();
    conf.set(WeightsMapper.class.getName() + ".numLabels", labelNumber);
    HadoopUtil.delete(conf, output);
    Job job = new Job(conf);
    job.setJobName("job2 get weightsFeature and weightsLabel by job1's output:" + input.toString());
    job.setJarByClass(BayesJob2.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(WeightsMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(VectorWritable.class);
    job.setCombinerClass(VectorSumReducer.class);
    job.setReducerClass(VectorSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(VectorWritable.class);
    SequenceFileInputFormat.setInputPaths(job, input);
    SequenceFileOutputFormat.setOutputPath(job, output);
    if (job.waitForCompletion(true)) {
      return 0;
    }
    return -1;
  }
}

It can be invoked standalone as follows:
usage: <command> [Generic Options] [Job-Specific Options]
Generic Options: same as in the first listing above.
Job-Specific Options:
  --input (-i) input               Path to job input directory.
  --output (-o) output             The directory pathname for output.
  --labelNumber (-ln) labelNumber  The number of labels
  --help (-h)                      Print out help
  --tempDir tempDir                Intermediate output directory
  --startPhase startPhase          First phase to run
  --endPhase endPhase              Last phase to run

In effect, this job only adds an option for the number of labels; everything else follows AbstractJob's default parameters.
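As a usage illustration, the job can also be launched from a small driver like the sketch below (the HDFS paths and the label count of 4 are assumptions, chosen to mirror the hard-coded paths and the four-label confusion matrix that appear later in this article):

package mahout.fansy.bayes;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class RunBayesJob2 {
  public static void main(String[] args) throws Exception {
    // Hypothetical paths: the input is job 1's output directory.
    String[] jobArgs = {"-i", "hdfs://ubuntu:9000/user/mahout/output_bayes/job1",
                        "-o", "hdfs://ubuntu:9000/user/mahout/output_bayes/job2",
                        "-ln", "4"};  // assumed number of labels
    ToolRunner.run(new Configuration(), new BayesJob2(), jobArgs);
  }
}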
5. Writing the Bayes model to a file:
This step converts the output of steps 3 and 4 into the components of the Bayes model and then writes the model to a file. Both the conversion and the file writing follow the corresponding methods in BayesUtils. The code is as follows:
package mahout.fansy.bayes;

import java.io.IOException;

import mahout.fansy.bayes.util.OperateArgs;

import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.mahout.classifier.naivebayes.NaiveBayesModel;
import org.apache.mahout.classifier.naivebayes.training.ThetaMapper;
import org.apache.mahout.classifier.naivebayes.training.TrainNaiveBayesJob;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.SparseMatrix;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

import com.google.common.base.Preconditions;

public class WriteBayesModel extends OperateArgs {

  /**
   * The -i and -o arguments are not used here: the real inputs are the
   * outputs of job 1 and job 2, and the model is stored as the file
   * naiveBayesModel.bin under the model path.
   * @throws ParseException
   * @throws IOException
   */
  public static void main(String[] args) throws IOException, ParseException {
    String[] arg = {"-jt", "ubuntu:9001",
        "-i", "",
        "-o", "",
        "-mp", "hdfs://ubuntu:9000/user/mahout/output_bayes/bayesModel",
        "-bj1", "hdfs://ubuntu:9000/user/mahout/output_bayes/job1",
        "-bj2", "hdfs://ubuntu:9000/user/mahout/output_bayes/job2"};
    new WriteBayesModel().run(arg);
  }

  /**
   * Write the model to a file.
   */
  public int run(String[] args) throws IOException, ParseException {
    // modelPath
    setOption("mp", "modelPath", true, "the path for bayesian model to store", true);
    // bayes job 1 path
    setOption("bj1", "bayesJob1", true, "the path for bayes job 1", true);
    // bayes job 2 path
    setOption("bj2", "bayesJob2", true, "the path for bayes job 2", true);
    if (!parseArgs(args)) {
      return -1;
    }
    String job1Path = getNameValue("bj1");
    String job2Path = getNameValue("bj2");
    Configuration conf = getConf();
    String modelPath = getNameValue("mp");
    NaiveBayesModel naiveBayesModel = readFromPaths(job1Path, job2Path, conf);
    naiveBayesModel.validate();
    naiveBayesModel.serialize(new Path(modelPath), getConf());
    System.out.println("Write bayesian model to '" + modelPath + "/naiveBayesModel.bin'");
    return 0;
  }

  /**
   * Adapted from the readModelFromDir method of BayesUtils; only the
   * paths were changed.
   */
  public NaiveBayesModel readFromPaths(String job1Path, String job2Path, Configuration conf) {
    float alphaI = conf.getFloat(ThetaMapper.ALPHA_I, 1.0f);
    // read feature sums and label sums
    Vector scoresPerLabel = null;
    Vector scoresPerFeature = null;
    for (Pair<Text,VectorWritable> record : new SequenceFileDirIterable<Text, VectorWritable>(
        new Path(job2Path), PathType.LIST, PathFilters.partFilter(), conf)) {
      String key = record.getFirst().toString();
      VectorWritable value = record.getSecond();
      if (key.equals(TrainNaiveBayesJob.WEIGHTS_PER_FEATURE)) {
        scoresPerFeature = value.get();
      } else if (key.equals(TrainNaiveBayesJob.WEIGHTS_PER_LABEL)) {
        scoresPerLabel = value.get();
      }
    }
    Preconditions.checkNotNull(scoresPerFeature);
    Preconditions.checkNotNull(scoresPerLabel);
    Matrix scoresPerLabelAndFeature = new SparseMatrix(scoresPerLabel.size(), scoresPerFeature.size());
    for (Pair<IntWritable,VectorWritable> entry : new SequenceFileDirIterable<IntWritable,VectorWritable>(
        new Path(job1Path), PathType.LIST, PathFilters.partFilter(), conf)) {
      scoresPerLabelAndFeature.assignRow(entry.getFirst().get(), entry.getSecond().get());
    }
    Vector perlabelThetaNormalizer = scoresPerLabel.like();
    return new NaiveBayesModel(scoresPerLabelAndFeature, scoresPerFeature, scoresPerLabel,
        perlabelThetaNormalizer, alphaI);
  }
}
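To sanity-check the serialized model, it can be loaded back with NaiveBayesModel.materialize, the same call the classification mapper in step 6 below relies on. A minimal sketch (the model directory is an assumption taken from the hard-coded arguments above, with materialize expected to resolve naiveBayesModel.bin beneath it, matching the serialize call):

package mahout.fansy.bayes;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.classifier.naivebayes.NaiveBayesModel;

public class CheckBayesModel {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical model directory, as written by WriteBayesModel above.
    Path modelDir = new Path("hdfs://ubuntu:9000/user/mahout/output_bayes/bayesModel");
    NaiveBayesModel model = NaiveBayesModel.materialize(modelDir, conf);
    model.validate();  // re-run the same invariant checks used before serialization
    System.out.println("model loaded and validated");
  }
}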
6. Applying the Bayes model to classify the original data:

This part is also largely based on the Mahout naive Bayes source code; only the parsing code was modified. The details are as follows:
package mahout.fansy.bayes;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.classifier.naivebayes.AbstractNaiveBayesClassifier;
import org.apache.mahout.classifier.naivebayes.NaiveBayesModel;
import org.apache.mahout.classifier.naivebayes.StandardNaiveBayesClassifier;
import org.apache.mahout.classifier.naivebayes.training.WeightsMapper;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

/**
 * Job used for classification.
 * @author Administrator
 */
public class BayesClassifyJob extends AbstractJob {

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new BayesClassifyJob(), args);
  }

  @Override
  public int run(String[] args) throws Exception {
    addInputOption();
    addOutputOption();
    addOption("model", "m", "The file where the bayesian model is stored");
    addOption("labelNumber", "ln", "The number of labels");
    if (parseArguments(args) == null) {
      return -1;
    }
    Path input = getInputPath();
    Path output = getOutputPath();
    String labelNumber = getOption("labelNumber");
    String modelPath = getOption("model");
    Configuration conf = getConf();
    conf.set(WeightsMapper.class.getName() + ".numLabels", labelNumber);
    HadoopUtil.cacheFiles(new Path(modelPath), conf);
    HadoopUtil.delete(conf, output);
    Job job = new Job(conf);
    job.setJobName("Use bayesian model to classify the input:" + input.getName());
    job.setJarByClass(BayesClassifyJob.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(BayesClasifyMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(VectorWritable.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(VectorWritable.class);
    SequenceFileInputFormat.setInputPaths(job, input);
    SequenceFileOutputFormat.setOutputPath(job, output);
    if (job.waitForCompletion(true)) {
      return 0;
    }
    return -1;
  }

  /**
   * Custom Mapper; only the parsing code was changed.
   * @author Administrator
   */
  public static class BayesClasifyMapper extends Mapper<Text, VectorWritable, Text, VectorWritable> {
    private AbstractNaiveBayesClassifier classifier;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
      System.out.println("Setup");
      Configuration conf = context.getConfiguration();
      Path modelPath = HadoopUtil.cachedFile(conf);
      NaiveBayesModel model = NaiveBayesModel.materialize(modelPath, conf);
      classifier = new StandardNaiveBayesClassifier(model);
    }

    @Override
    public void map(Text key, VectorWritable value, Context context)
        throws IOException, InterruptedException {
      Vector result = classifier.classifyFull(value.get());
      // the key is the expected (true) label
      context.write(new Text(key.toString()), new VectorWritable(result));
    }
  }
}

To run this step on its own, refer to the following usage:
usage: <command> [Generic Options] [Job-Specific Options]
Generic Options: same as in the first listing above.
Job-Specific Options:
  --input (-i) input               Path to job input directory.
  --output (-o) output             The directory pathname for output.
  --model (-m) model               The file where the bayesian model is stored
  --labelNumber (-ln) labelNumber  The number of labels
  --help (-h)                      Print out help
  --tempDir tempDir                Intermediate output directory
  --startPhase startPhase          First phase to run
  --endPhase endPhase              Last phase to run

Beyond the input and output, only two parameters need to be supplied: the model path and the number of labels.
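As with step 4, a standalone driver call might look like the following sketch (all paths are assumptions patterned on the hard-coded arguments used elsewhere in this series; -m is assumed to point at the model location written in step 5):

package mahout.fansy.bayes;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class RunBayesClassifyJob {
  public static void main(String[] args) throws Exception {
    // Hypothetical paths; the output directory feeds AnalyzeBayesModel in step 7.
    String[] jobArgs = {"-i", "hdfs://ubuntu:9000/user/mahout/input_bayes",
                        "-o", "hdfs://ubuntu:9000/user/mahout/output_bayes/classifyJob",
                        "-m", "hdfs://ubuntu:9000/user/mahout/output_bayes/bayesModel",
                        "-ln", "4"};  // assumed number of labels
    ToolRunner.run(new Configuration(), new BayesClassifyJob(), jobArgs);
  }
}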
7. Evaluating the classification results from step 6. The code for this part is as follows:
package mahout.fansy.bayes;

import java.io.IOException;
import java.util.Map;

import mahout.fansy.bayes.util.OperateArgs;

import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.mahout.classifier.ClassifierResult;
import org.apache.mahout.classifier.ResultAnalyzer;
import org.apache.mahout.classifier.naivebayes.BayesUtils;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AnalyzeBayesModel extends OperateArgs {

  /**
   * The input is the output of BayesClassifyJob; the -o argument is unused.
   */
  private static final Logger log = LoggerFactory.getLogger(AnalyzeBayesModel.class);

  public static void main(String[] args) throws IOException, ParseException {
    String[] arg = {"-jt", "ubuntu:9001",
        "-i", "hdfs://ubuntu:9000/user/mahout/output_bayes/classifyJob",
        "-o", "",
        "-li", "hdfs://ubuntu:9000/user/mahout/output_bayes/index.bin"};
    new AnalyzeBayesModel().run(arg);
  }

  /**
   * Compare the BayesClassifyJob output files against the labelIndex and
   * compute the accuracy.
   */
  public int run(String[] args) throws IOException, ParseException {
    // labelIndex
    setOption("li", "labelIndex", true, "the path where labelIndex store", true);
    if (!parseArgs(args)) {
      return -1;
    }
    Configuration conf = getConf();
    String labelIndex = getNameValue("labelIndex");
    String input = getInput();
    Path inputPath = new Path(input);
    // load the labels
    Map<Integer, String> labelMap = BayesUtils.readLabelIndex(getConf(), new Path(labelIndex));
    // loop over the results and create the confusion matrix
    SequenceFileDirIterable<Text, VectorWritable> dirIterable =
        new SequenceFileDirIterable<Text, VectorWritable>(inputPath, PathType.LIST,
            PathFilters.partFilter(), conf);
    ResultAnalyzer analyzer = new ResultAnalyzer(labelMap.values(), "DEFAULT");
    analyzeResults(labelMap, dirIterable, analyzer);
    log.info("{} Results: {}", "Standard NB", analyzer);
    return 0;
  }

  /**
   * Taken from the analyzeResults method of TestNaiveBayesDriver.
   */
  private void analyzeResults(Map<Integer, String> labelMap,
      SequenceFileDirIterable<Text, VectorWritable> dirIterable,
      ResultAnalyzer analyzer) {
    for (Pair<Text, VectorWritable> pair : dirIterable) {
      int bestIdx = Integer.MIN_VALUE;
      double bestScore = Long.MIN_VALUE;
      for (Vector.Element element : pair.getSecond().get()) {
        if (element.get() > bestScore) {
          bestScore = element.get();
          bestIdx = element.index();
        }
      }
      if (bestIdx != Integer.MIN_VALUE) {
        ClassifierResult classifierResult =
            new ClassifierResult(labelMap.get(bestIdx), bestScore);
        analyzer.addInstance(pair.getFirst().toString(), classifierResult);
      }
    }
  }
}

Classifying the data from Extension Part 1 with the trained model produces the following results:
13/09/14 14:52:13 INFO bayes.AnalyzeBayesModel: Standard NB Results:
=======================================================
Summary
-------------------------------------------------------
Correctly Classified Instances   :  7    70%
Incorrectly Classified Instances :  3    30%
Total Classified Instances       : 10
=======================================================
Confusion Matrix
-------------------------------------------------------
a  b  c  d  <--Classified as
3  0  0  0   |  3  a = 1
0  1  0  1   |  2  b = 2
1  1  2  0   |  4  c = 3
0  0  0  1   |  1  d = 4

Reading the confusion matrix: the diagonal entries (3 + 1 + 2 + 1 = 7) are the correctly classified instances, which gives the 70% accuracy reported in the summary.
After the run, the output directories can be seen on HDFS as follows: [screenshot omitted]
The task list is as follows: [screenshot omitted]
Share, grow, be happy.
When reposting, please cite the blog address: http://blog.csdn.net/fansy1990