`
wbj0110
  • 浏览: 1604113 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

mahout关联规则源码分析 Part 1

阅读更多

最近看了关联规则的相关算法,着重看了mahout的具体实现,mahout官网上面给出了好多算法,具体网址如下:https://cwiki.apache.org/confluence/display/MAHOUT/Parallel+Frequent+Pattern+Mining 。

先说下命令行运行关联规则,关联规则的算法在mahout-core-0,7.jar包下面,命令行运行如下:

 

[java] view plaincopy
 
  1. fansy@fansypc:~/hadoop-1.0.2$ bin/hadoop jar ../mahout-pure-0.7/core/target/mahout-core-0.7.jar  
  2.  org.apache.mahout.fpm.pfpgrowth.FPGrowthDriver -i input/retail.dat -o date1101/fpgrowthdriver00 -s 2 -method mapreduce -regex '[\ ]'  
  3. 12/11/01 16:31:39 INFO common.AbstractJob:  
  4.  Command line arguments: {--encoding=[UTF-8], --endPhase=[2147483647],   
  5. --input=[input/retail.dat], --maxHeapSize=[50], --method=[mapreduce], --minSupport=[2], --numGroups=[1000],   
  6. --numTreeCacheEntries=[5], --output=[date1101/fpgrowthdriver00], --splitterPattern=[[\ ]], --startPhase=[0], --tempDir=[temp]}  

最后的 -regex '[\ ]' 一定是需要的对于输入数据 retail.dat来说,因为mahout默认的item的分隔符是没有空格的;

 

而且这里只讨论 并行的程序,所以使用 -method mapreduce

下面分析源码:

在分析源码之前,先看一张图:

这张图很好的说明了mahout实现关联规则思想,或者说是流程;

首先,读入数据,比如上图的5个transactions(事务),接着根据一张总表(这张总表是每个item的次数从大到小的一个排列,同时这张表还去除了出现次数小于min_support的item)把这些transactions 去除一些项目并按照总表的顺序排序,得到另外的一个transaction A,接着map的输出就是根据transaction A输出规则,从出现次数最小的item开始输出直到出现次数第二大的item。

Reduce收集map输出相同的key值,把他们的value值放一个集合set 中,然后在统计这些集合中item出现的次数,如果次数大于min_confidence(本例中为3),那么就输出key和此item的规则;

命令行运行时可以看到三个MR,即可以把关联规则的算法分为三部分,但是个人觉得可以分为四个部分,其中的一部分就是总表的获得;鉴于目前本人只看了一个MR和总表的获得部分的源码,今天就只分享这两个部分;

贴代码先,基本都是源码来的,只是稍微改了下:

第一个MR的驱动程序:PFGrowth_ParallelCounting.java:

 

[java] view plaincopy
 
  1. package org.fansy.date1101.pfgrowth;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.io.LongWritable;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.mapreduce.Job;  
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  10. import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;  
  11. import org.apache.mahout.common.HadoopUtil;  
  12. public class PFGrowth_ParallelCounting {  
  13.     public boolean runParallelCountingJob(String input,String output) throws IOException, ClassNotFoundException, InterruptedException{  
  14.         Configuration conf=new Configuration();  
  15.         Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);  
  16.         job.setJarByClass(PFGrowth_ParallelCounting.class);  
  17.         job.setMapperClass(PFGrowth_ParallelCountingM.class);  
  18.         job.setCombinerClass(PFGrowth_ParallelCountingR.class);  
  19.         job.setReducerClass(PFGrowth_ParallelCountingR.class);  
  20.         job.setOutputFormatClass(SequenceFileOutputFormat.class); //  get rid of this line you can get the text file  
  21.         job.setOutputKeyClass(Text.class);  
  22.         job.setOutputValueClass(LongWritable.class);      
  23.         FileInputFormat.setInputPaths(job,new Path( input));  
  24.         Path outPut=new Path(output,"parallelcounting");  
  25.         HadoopUtil.delete(conf, outPut);  
  26.         FileOutputFormat.setOutputPath(job, outPut);          
  27.         boolean succeeded = job.waitForCompletion(true);  
  28.         if (!succeeded) {  
  29.           throw new IllegalStateException("Job failed!");  
  30.         }     
  31.         return succeeded;  
  32.     }  
  33. }  

第一个MR的M:PFGrowth_ParallelCountingM.java:

 

 

[java] view plaincopy
 
  1. package org.fansy.date1101.pfgrowth;  
  2. import java.io.IOException;  
  3. import java.util.regex.Pattern;  
  4. import org.apache.hadoop.io.LongWritable;  
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.mapreduce.Mapper;  
  7. public class PFGrowth_ParallelCountingM extends Mapper<LongWritable,Text,Text,LongWritable> {  
  8.      private static final LongWritable ONE = new LongWritable(1);  
  9.       private Pattern splitter=Pattern.compile("[ ,\t]*[ ,|\t][ ,\t]*");  
  10.       @Override  
  11.       protected void map(LongWritable offset, Text input, Context context) throws IOException,  
  12.                                                                           InterruptedException {  
  13.         String[] items = splitter.split(input.toString());  
  14.         for (String item : items) {  
  15.           if (item.trim().isEmpty()) {  
  16.             continue;  
  17.           }  
  18.           context.setStatus("Parallel Counting Mapper: " + item);  
  19.           context.write(new Text(item), ONE);  
  20.         }  
  21.       }    
  22. }  

上面的代码中的间隔符号修改了源码,加上了空格;

 

第一个MR的R:PFGrowth_ParallelCountingR.java:

 

[java] view plaincopy
 
  1. package org.fansy.date1101.pfgrowth;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.io.LongWritable;  
  4. import org.apache.hadoop.io.Text;  
  5. import org.apache.hadoop.mapreduce.Reducer;  
  6. public class PFGrowth_ParallelCountingR extends Reducer<Text,LongWritable,Text,LongWritable>{  
  7.     protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,  
  8.             InterruptedException {  
  9.         long sum = 0;  
  10.         for (LongWritable value : values) {  
  11.         context.setStatus("Parallel Counting Reducer :" + key);  
  12.         sum += value.get();  
  13.         }  
  14.         context.setStatus("Parallel Counting Reducer: " + key + " => " + sum);  
  15.         context.write(key, new LongWritable(sum));  
  16.     }  
  17. }  

其实第一个MR还是比较好理解的,M分解每个transaction的item,然后输出<item_id ,1>,然后R针对每个item_id 把value值相加求和,这个和wordcount的例子是一样的,当然这里也可以加combine操作的。

 

接着是总表的获得:

PFGrowth_Driver.java ,同时这个程序也调用第一个MR,也就是说可以直接运行这个文件就可以同时运行第一个MR和获得总表了。

 

[java] view plaincopy
 
  1. package org.fansy.date1101.pfgrowth;  
  2. import java.io.IOException;  
  3. import java.util.Comparator;  
  4. import java.util.List;  
  5. import java.util.PriorityQueue;  
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.filecache.DistributedCache;  
  8. import org.apache.hadoop.fs.FileSystem;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.io.LongWritable;  
  11. import org.apache.hadoop.io.SequenceFile;  
  12. import org.apache.hadoop.io.Text;  
  13. import org.apache.mahout.common.HadoopUtil;  
  14. import org.apache.mahout.common.Pair;  
  15. import org.apache.mahout.common.Parameters;  
  16. import org.apache.mahout.common.iterator.sequencefile.PathType;  
  17. import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;  
  18. import com.google.common.collect.Lists;  
  19. class MyComparator implements Comparator<Pair<String,Long>>{  
  20.      @Override  
  21.      public int compare(Pair<String,Long> o1, Pair<String,Long> o2) {  
  22.        int ret = o2.getSecond().compareTo(o1.getSecond());  
  23.        if (ret != 0) {  
  24.          return ret;  
  25.        }  
  26.        return o1.getFirst().compareTo(o2.getFirst());  
  27.      }    
  28. }  
  29. public class PFGrowth_Driver {  
  30.     public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException{  
  31.         if(args.length!=3){  
  32.             System.out.println("wrong input args");  
  33.             System.out.println("usage: <intput><output><minsupport>");  
  34.             System.exit(-1);  
  35.         }  
  36.         // set parameters  
  37.         Parameters params=new Parameters();  
  38.         params.set("INPUT", args[0]);  
  39.         params.set("OUTPUT", args[1]);  
  40.         params.set("MIN_SUPPORT", args[2]);  
  41.         // get parameters  
  42.         String input=params.get("INPUT");  
  43.         String output=params.get("OUTPUT");  
  44.         //  run the first job  
  45.         PFGrowth_ParallelCounting ppc=new PFGrowth_ParallelCounting();  
  46.         ppc.runParallelCountingJob(input, output);    
  47.         //  read input and set the fList  
  48.          List<Pair<String,Long>> fList = readFList(params);  
  49.          Configuration conf=new Configuration();  
  50.          saveFList(fList, params, conf);           
  51.     }     
  52.     /** 
  53.        * Serializes the fList and returns the string representation of the List 
  54.        *  
  55.        * @return Serialized String representation of List 
  56.        */  
  57.       public static void saveFList(Iterable<Pair<String,Long>> flist, Parameters params, Configuration conf)  
  58.         throws IOException {  
  59.         Path flistPath = new Path(params.get("OUTPUT"), "fList");  
  60.         FileSystem fs = FileSystem.get(flistPath.toUri(), conf);  
  61.         flistPath = fs.makeQualified(flistPath);  
  62.         HadoopUtil.delete(conf, flistPath);  
  63.         SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath, Text.class, LongWritable.class);  
  64.         try {  
  65.           for (Pair<String,Long> pair : flist) {  
  66.             writer.append(new Text(pair.getFirst()), new LongWritable(pair.getSecond()));  
  67.           }  
  68.         } finally {  
  69.           writer.close();  
  70.         }  
  71.         DistributedCache.addCacheFile(flistPath.toUri(), conf);  
  72.       }  
  73.     public static List<Pair<String,Long>> readFList(Parameters params) {  
  74.         int minSupport = Integer.valueOf(params.get("MIN_SUPPORT"));  
  75.         Configuration conf = new Configuration();      
  76.         Path parallelCountingPath = new Path(params.get("OUTPUT"),"parallelcounting");  
  77.         //  add MyComparator  
  78.         PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(11,new MyComparator());  
  79.         // sort according to the occur times from large to small   
  80.   
  81.         for (Pair<Text,LongWritable> record  
  82.              : new SequenceFileDirIterable<Text,LongWritable>(new Path(parallelCountingPath, "part-*"),  
  83.                                                             PathType.GLOB, nullnulltrue, conf)) {  
  84.           long value = record.getSecond().get();  
  85.           if (value >= minSupport) {   // get rid of the item which is below the minimum support  
  86.             queue.add(new Pair<String,Long>(record.getFirst().toString(), value));  
  87.           }  
  88.         }  
  89.         List<Pair<String,Long>> fList = Lists.newArrayList();  
  90.         while (!queue.isEmpty()) {  
  91.           fList.add(queue.poll());  
  92.         }  
  93.         return fList;  
  94.       }   
  95. }  

第一个MR运行完毕后,调用readFList()函数,把第一个MR的输出按照item出现的次数从大到小放入一个列表List中,然后调用saveFList()函数把上面求得的List存入HDFS文件中,不过存入的格式是被序列话的,可以另外编写函数查看文件是否和自己的假设相同;

 

FList 文件反序列化如下:

http://blog.csdn.net/fansy1990/article/details/8137942

分享到:
评论

相关推荐

    mahout关联推荐算法

    总的来说,Mahout的PFPGrowth算法是一种强大的工具,适用于需要对大规模数据进行关联规则挖掘的场景,如电商、社交媒体分析等。通过合理配置参数和利用并行计算能力,PFPGrowth能够在处理海量数据的同时保持高效,为...

    mahout Algorithms源码分析

    Mahout是一个Apache Software Foundation(ASF)旗下的开源项目,主要用途是提供可扩展的机器学习算法的实现,帮助开发人员更方便快捷地创建智能应用程序。Mahout包含了很多算法的实现,包括聚类(Clustering)、...

    Mahout源码

    - **关联规则学习(Association Rule Learning)**: 通过发现项集之间的频繁模式,来挖掘数据中的有趣关系。 **2. Mahout 与 Maven** 作为基于Maven的项目,Mahout的源码组织遵循Maven的标准结构。每个模块对应...

    mahout源码

    在大数据时代,Mahout已经成为数据科学家和工程师们的重要工具,尤其在文本分析、推荐系统和分类任务中扮演着关键角色。本篇将深入探讨Mahout中的朴素贝叶斯分类以及中文分词这两个核心功能。 一、Mahout与朴素...

    mahout in action中的源码

    4. **关联规则学习(Association Rule Learning)**:例如Apriori算法,用于发现数据集中的频繁项集和有趣的关联规则,常见于购物篮分析。 5. **主成分分析(Principal Component Analysis, PCA)**:这是一种降维...

    mahout0.9 源码

    2. **机器学习算法库**:Mahout包含了多种机器学习算法,包括分类、聚类、推荐系统和关联规则挖掘等。这些算法是实现数据分析和预测的关键。 3. **可扩展性**:由于其模块化设计,Mahout允许开发者轻松地添加新的...

    mahout0.9源码(支持hadoop2)

    mahout0.9的源码,支持hadoop2,需要自行使用mvn编译。mvn编译使用命令: mvn clean install -Dhadoop2 -Dhadoop.2.version=2.2.0 -DskipTests

    Mahout教程内含源码以及说明书可以自己运行复现.zip

    通过源码和说明书,你可以亲自运行示例,加深对Mahout的理解。 **0. Mahout 简介** Apache Mahout起源于2009年,是基于Hadoop的数据挖掘工具,主要目标是提供可扩展的、高效的机器学习算法。它支持多种类型的机器...

    mahout-distribution-0.5-src.zip mahout 源码包

    mahout-distribution-0.5-src.zip mahout 源码包

    FP关联规则置信度

    FP关联规则计算置信度的方法:参考Mahout FP算法相关相关源码。 只是单机版的实现,并没有MapReduce的代码,可以参考: http://blog.csdn.net/fansy1990/article/details/41279833 实现思路

    人工智能-推荐系统-新闻推荐-基于Mahout的新闻推荐系统

    Mahout:整体框架,实现了协同过滤 Deeplearning4j,构建VSM Jieba:分词,关键词提取 HanLP:分词,关键词提取 Spring Boot:提供API、ORM 关键实现 基于用户的协同过滤 直接调用Mahout相关接口即可 选择不同...

    机器学习算法关联规则贝叶斯SVM、ME、kmeans、knn

    1. **关联规则**:关联规则挖掘是一种数据挖掘技术,用于发现数据集中的项集之间的有趣关系,如“如果顾客购买了尿布,那么他们很可能也会购买啤酒”。Apriori、FP-Growth等算法是关联规则挖掘的经典方法。在Java中...

    Mahout算法调用展示平台2.1-part3

    第三部分 功能主要包括四个方面:集群配置、集群算法监控、Hadoop模块、Mahout模块。 详情参考《Mahout算法调用展示平台2.1》

    mahout 原理 简介

    1. **Weka**:自1999年起开发的 Java 机器学习工具包,包含了大量的数据预处理、分类、回归、聚类、关联规则以及可视化功能。 2. **Java 机器学习项目**:截至当前,已有超过38个 Java 项目在 mloss.org 上列出,...

    [Mahout] Windows下Mahout单机安装

    1. Java Development Kit (JDK):Mahout依赖Java环境,所以你需要先安装JDK,并确保`JAVA_HOME`环境变量设置正确,指向JDK的安装目录。 2. Apache Maven:Mahout的构建工具,负责下载依赖和构建项目。确保Maven已...

    mahout所需jar包

    Mahout的目标是帮助开发人员构建智能应用程序,如推荐系统、分类和聚类算法,这些在大数据分析领域中极为重要。 **K-Means聚类算法** K-Means是一种无监督学习的聚类算法,用于将数据集分成不同的群组或类别。在...

    Mahout算法调用展示平台2.1-part2

    第二部分 功能主要包括四个方面:集群配置、集群算法监控、Hadoop模块、Mahout模块。 详情参考《Mahout算法调用展示平台2.1》

    mahout-0.3.tar.gz

    - **关联规则学习(Association Rule Learning)**: 用于发现数据集中的频繁项集和规则,如购物篮分析。 - **向量空间模型(Vector Space Models)**: 提供了TF-IDF(Term Frequency-Inverse Document Frequency)等...

Global site tag (gtag.js) - Google Analytics