近期看mahout的关联规则源码,颇为头痛,本来打算写一个系列分析关联规则的源码的,但是后面看到有点乱了,可能是稍微有点复杂吧,所以就打算先实现最简单的二项集关联规则。
算法的思想还是参考上次的图片:
这里实现分为五个步骤:
- 针对原始输入计算每个项目出现的次数;
- 按出现次数从大到小(排除出现次数小于阈值的项目)生成frequence list file;
- 针对原始输入的事务进行按frequence list file进行排序并剪枝;
- 生成二项集规则;
- 计算二项集规则出现的次数,并删除小于阈值的二项集规则;
第一步的实现:包括步骤1和步骤2,代码如下:
GetFlist.java:
- package org.fansy.date1108.fpgrowth.twodimension;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.util.ArrayList;
- import java.util.Comparator;
- import java.util.Iterator;
- import java.util.List;
- import java.util.PriorityQueue;
- import java.util.regex.Pattern;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- // the specific comparator
- class MyComparator implements Comparator<String>{
- private String splitter=",";
- public MyComparator(String splitter){
- this.splitter=splitter;
- }
- @Override
- public int compare(String o1, String o2) {
- // TODO Auto-generated method stub
- String[] str1=o1.toString().split(splitter);
- String[] str2=o2.toString().split(splitter);
- int num1=Integer.parseInt(str1[1]);
- int num2=Integer.parseInt(str2[1]);
- if(num1>num2){
- return -1;
- }else if(num1<num2){
- return 1;
- }else{
- return str1[0].compareTo(str2[0]);
- }
- }
- }
- public class GetFList {
- /**
- * the program is based on the picture
- */
- // Mapper
- public static class MapperGF extends Mapper<LongWritable ,Text ,Text,IntWritable>{
- private Pattern splitter=Pattern.compile("[ ]*[ ,|\t]");
- private final IntWritable newvalue=new IntWritable(1);
- public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
- String [] items=splitter.split(value.toString());
- for(String item:items){
- context.write(new Text(item), newvalue);
- }
- }
- }
- // Reducer
- public static class ReducerGF extends Reducer<Text,IntWritable,Text ,IntWritable>{
- public void reduce(Text key,Iterable<IntWritable> value,Context context) throws IOException, InterruptedException{
- int temp=0;
- for(IntWritable v:value){
- temp+=v.get();
- }
- context.write(key, new IntWritable(temp));
- }
- }
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- // TODO Auto-generated method stub
- if(args.length!=3){
- System.out.println("Usage: <input><output><min_support>");
- System.exit(1);
- }
- String input=args[0];
- String output=args[1];
- int minSupport=0;
- try {
- minSupport=Integer.parseInt(args[2]);
- } catch (NumberFormatException e) {
- // TODO Auto-generated catch block
- minSupport=3;
- }
- Configuration conf=new Configuration();
- String temp=args[1]+"_temp";
- Job job=new Job(conf,"the get flist job");
- job.setJarByClass(GetFList.class);
- job.setMapperClass(MapperGF.class);
- job.setCombinerClass(ReducerGF.class);
- job.setReducerClass(ReducerGF.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileInputFormat.setInputPaths(job, new Path(input));
- FileOutputFormat.setOutputPath(job, new Path(temp));
- boolean succeed=job.waitForCompletion(true);
- if(succeed){
- // read the temp output and write the data to the final output
- List<String> list=readFList(temp+"/part-r-00000",minSupport);
- System.out.println("the frequence list has generated ... ");
- // generate the frequence file
- generateFList(list,output);
- System.out.println("the frequence file has generated ... ");
- }else{
- System.out.println("the job is failed");
- System.exit(1);
- }
- }
- // read the temp_output and return the frequence list
- public static List<String> readFList(String input,int minSupport) throws IOException{
- // read the hdfs file
- Configuration conf=new Configuration();
- Path path=new Path(input);
- FileSystem fs=FileSystem.get(path.toUri(),conf);
- FSDataInputStream in1=fs.open(path);
- PriorityQueue<String> queue=new PriorityQueue<String>(15,new MyComparator("\t"));
- InputStreamReader isr1=new InputStreamReader(in1);
- BufferedReader br=new BufferedReader(isr1);
- String line;
- while((line=br.readLine())!=null){
- int num=0;
- try {
- num=Integer.parseInt(line.split("\t")[1]);
- } catch (NumberFormatException e) {
- // TODO Auto-generated catch block
- num=0;
- }
- if(num>minSupport){
- queue.add(line);
- }
- }
- br.close();
- isr1.close();
- in1.close();
- List<String> list=new ArrayList<String>();
- while(!queue.isEmpty()){
- list.add(queue.poll());
- }
- return list;
- }
- // generate the frequence file
- public static void generateFList(List<String> list,String output) throws IOException{
- Configuration conf=new Configuration();
- Path path=new Path(output);
- FileSystem fs=FileSystem.get(path.toUri(),conf);
- FSDataOutputStream writer=fs.create(path);
- Iterator<String> i=list.iterator();
- while(i.hasNext()){
- writer.writeBytes(i.next()+"\n");// in the last line add a \n which is not supposed to exist
- }
- writer.close();
- }
- }
步骤1的实现其实就是最简单的wordcount程序的实现,在步骤2中涉及到HDFS文件的读取以及写入。在生成frequence list file时排序时用到了PriorityQueue类,同时自定义了一个类用来定义排序规则;
第二步:步骤3,代码如下:
SortAndCut.java:
- package org.fansy.date1108.fpgrowth.twodimension;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.net.URI;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.LinkedHashSet;
- import java.util.Set;
- import java.util.regex.Pattern;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- public class SortAndCut {
- /**
- * sort and cut the items
- */
- public static class M extends Mapper<LongWritable,Text,NullWritable,Text>{
- private LinkedHashSet<String> list=new LinkedHashSet<String>();
- private Pattern splitter=Pattern.compile("[ ]*[ ,|\t]");
- public void setup(Context context) throws IOException{
- String input=context.getConfiguration().get("FLIST");
- FileSystem fs=FileSystem.get(URI.create(input),context.getConfiguration());
- Path path=new Path(input);
- FSDataInputStream in1=fs.open(path);
- InputStreamReader isr1=new InputStreamReader(in1);
- BufferedReader br=new BufferedReader(isr1);
- String line;
- while((line=br.readLine())!=null){
- String[] str=line.split("\t");
- if(str.length>0){
- list.add(str[0]);
- }
- }
- }
- // map
- public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
- String [] items=splitter.split(value.toString());
- Set<String> set=new HashSet<String>();
- set.clear();
- for(String s:items){
- set.add(s);
- }
- Iterator<String> iter=list.iterator();
- StringBuffer sb=new StringBuffer();
- sb.setLength(0);
- int num=0;
- while(iter.hasNext()){
- String item=iter.next();
- if(set.contains(item)){
- sb.append(item+",");
- num++;
- }
- }
- if(num>0){
- context.write(NullWritable.get(), new Text(sb.toString()));
- }
- }
- }
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- // TODO Auto-generated method stub
- if(args.length!=3){
- System.out.println("Usage: <input><output><fListPath>");
- System.exit(1);
- }
- String input=args[0];
- String output=args[1];
- String fListPath=args[2];
- Configuration conf=new Configuration();
- conf.set("FLIST", fListPath);
- Job job=new Job(conf,"the sort and cut the items job");
- job.setJarByClass(SortAndCut.class);
- job.setMapperClass(M.class);
- job.setNumReduceTasks(0);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
- FileInputFormat.setInputPaths(job, new Path(input));
- FileOutputFormat.setOutputPath(job, new Path(output));
- boolean succeed=job.waitForCompletion(true);
- if(succeed){
- System.out.println(job.getJobName()+" succeed ... ");
- }
- }
- }
在本阶段的Mapper的setup中读取frequence file到一个LinkedHashSet(可以保持原始的插入顺序)中,然后在map中针对一个事务输出这个LinkedHashSet,不过限制输出是在这个事务中出现的项目而已。
第三步:步骤4和步骤5,代码如下:
OutRules.java
- package org.fansy.date1108.fpgrowth.twodimension;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.Map.Entry;
- import java.util.Stack;
- import java.util.TreeSet;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- public class OutRules {
- public static class M extends Mapper<LongWritable,Text,Text,Text>{
- public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
- String str=value.toString();
- String[] s=str.split(",");
- if(s.length<=1){
- return;
- }
- Stack<String> stack=new Stack<String>();
- for(int i=0;i<s.length;i++){
- stack.push(s[i]);
- }
- int num=str.length();
- while(stack.size()>1){
- num=num-2;
- context.write(new Text(stack.pop()),new Text(str.substring(0,num)));
- }
- }
- }
- // Reducer
- public static class R extends Reducer<Text ,Text,Text,Text>{
- private int minConfidence=0;
- public void setup(Context context){
- String str=context.getConfiguration().get("MIN");
- try {
- minConfidence=Integer.parseInt(str);
- } catch (NumberFormatException e) {
- // TODO Auto-generated catch block
- minConfidence=3;
- }
- }
- public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
- HashMap<String,Integer> hm=new HashMap<String ,Integer>();
- for(Text v:values){
- String[] str=v.toString().split(",");
- for(int i=0;i<str.length;i++){
- if(hm.containsKey(str[i])){
- int temp=hm.get(str[i]);
- hm.put(str[i], temp+1);
- }else{
- hm.put(str[i], 1);
- }
- }
- }
- // end of for
- TreeSet<String> sss=new TreeSet<String>(new MyComparator(" "));
- Iterator<Entry<String,Integer>> iter=hm.entrySet().iterator();
- while(iter.hasNext()){
- Entry<String,Integer> k=iter.next();
- if(k.getValue()>minConfidence&&!key.toString().equals(k.getKey())){
- sss.add(k.getKey()+" "+k.getValue());
- }
- }
- Iterator<String> iters=sss.iterator();
- StringBuffer sb=new StringBuffer();
- while(iters.hasNext()){
- sb.append(iters.next()+"|");
- }
- context.write(key, new Text(":\t"+sb.toString()));
- }
- }
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- // TODO Auto-generated method stub
- if(args.length!=3){
- System.out.println("Usage: <input><output><min_confidence>");
- System.exit(1);
- }
- String input=args[0];
- String output=args[1];
- String minConfidence=args[2];
- Configuration conf=new Configuration();
- conf.set("MIN", minConfidence);
- Job job=new Job(conf,"the out rules job");
- job.setJarByClass(OutRules.class);
- job.setMapperClass(M.class);
- job.setNumReduceTasks(1);
- job.setReducerClass(R.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- FileInputFormat.setInputPaths(job, new Path(input));
- FileOutputFormat.setOutputPath(job, new Path(output));
- boolean succeed=job.waitForCompletion(true);
- if(succeed){
- System.out.println(job.getJobName()+" succeed ... ");
- }
- }
- }
在map阶段使用了Stack 和字符串操作实现类似下面的功能:
- input:p,x,z,y,a,b
- output:
- b:p,x,z,y,a
- a:p,x,z,y
- y:p,x,z
- z:p,x
- x:p
在reduce阶段只是统计下项目出现的次数而已,用到了一个HashMap,又如果输出是根据项目出现的次数从大到小的一个排序那就更好了,所以又用到了TreeSet.
其中上面所有的输出文件中的格式都只是拼串而已,所以其中的格式可以按照自己的要求进行更改。
比如,我的输出如下:
- 0 : 39 125|48 99|32 44|41 37|38 26|310 17|5 16|65 14|1 13|89 13|1144 12|225 12|60 11|604 11|
- 1327 10|237 10|101 9|147 9|270 9|533 9|9 9|107 8|11 8|117 8|170 8|271 8|334 8|549 8|62 8|812 8|10 7|
- 1067 7|12925 7|23 7|255 7|279 7|548 7|783 7|14098 6|2 6|208 6|22 6|36 6|413 6|789 6|824 6|961 6|110 5|
- 120 5|12933 5|201 5|2238 5|2440 5|2476 5|251 5|286 5|2879 5|3 5|4105 5|415 5|438 5|467 5|475 5|479 5|49 5|
- 592 5|675 5|715 5|740 5|791 5|830 5|921 5|9555 5|976 5|979 5|1001 4|1012 4|1027 4|1055 4|1146 4|12 4|13334 4|
- 136 4|1393 4|16 4|1600 4|165 4|167 4|1819 4|1976 4|2051 4|2168 4|2215 4|2284 4|2353 4|2524 4|261 4|267 4|269 4|
- 27 4|2958 4|297 4|3307 4|338 4|420 4|4336 4|4340 4|488 4|4945 4|5405 4|58 4|589 4|75 4|766 4|795 4|809 4|880 4|8978 4|916 4|94 4|956 4|
冒号前面是项目,后面的39是项目再后面是<0,39>出现的次数,即125次,<0,48>出现的次数是99次;
总结,mahout的源代码确实比较难啃,所以要先对算法非常熟悉,然后去看源码的话 应该会比较容易点;
http://blog.csdn.net/fansy1990/article/details/8160956
相关推荐
总结起来,"基于Apriori算法的频繁项集Hadoop mapreduce"是一个利用大数据处理框架Hadoop的MapReduce模型,高效地执行经典数据挖掘算法Apriori的过程,旨在发现大规模数据集中的频繁项集和关联规则,为商业智能、...
《基于Hadoop实现的关联规则挖掘在图书数据分析推荐系统中的应用》 图书数据分析推荐系统是当前数字图书馆和在线书店中广泛应用的技术,它旨在通过分析用户的阅读习惯和偏好,为用户提供个性化的图书推荐。本项目...
标题中的“基于Apriori算法的频繁项集Hadoop mapreduce”揭示了我们要讨论的核心内容:使用经典的Apriori算法在大数据环境下,通过Hadoop的MapReduce框架进行频繁项集挖掘。Apriori算法是一种在数据库中寻找频繁项集...
Spark中的MLlib库提供了FP-Growth算法的实现,使得在大规模数据集上执行关联规则挖掘成为可能。 具体实践步骤大致如下: 1. 数据预处理:首先,需要将原始数据转化为适合关联规则挖掘的格式,通常是一个交易集合,...
3. Apriori算法:Apriori算法是一种关联规则挖掘算法,通过频繁项集的挖掘来发现关联规则。 4. Hadoop:Hadoop是基于Java的开源大数据处理平台,提供了分布式计算和存储的能力。 5. HDFS:HDFS(Hadoop ...
本实验是关于如何在Hadoop平台上并行实现经典的Apriori算法,这是一门重要的数据挖掘技术,主要用于关联规则学习。下面将详细阐述Apriori算法的基本原理、并行化实现的关键点以及在Hadoop上的应用。 Apriori算法是...
Apriori算法是一种经典的关联规则学习算法,主要用于发现频繁项集和生成关联规则。它的基本思想是从已知的频繁项集中生成新的频繁项集。Apriori算法的核心步骤包括: - 生成候选频繁项集 - 扫描数据库,计算候选项...
除此之外,MapReduce框架下的关联规则挖掘还涉及到多种解决方案,例如:使用Hadoop的MapReduce编程模型来实现数据的并行处理和分析,以及通过优化算法和数据结构来提高效率。这些解决方案利用了MapReduce框架的map和...
Apriori算法是一种经典的频繁项集挖掘算法,用于发现数据集中事物之间的关联规则。该算法通过迭代的方式,从频繁1项集开始,逐步生成更长的频繁项集,直到找不到新的频繁项集为止。在每次迭代中,通过扫描数据集计算...
Apriori算法的基本思想是通过频繁项集的生成和剪枝来挖掘强关联规则,以此推测用户可能感兴趣但尚未接触的图书。 【Hadoop】Hadoop是Apache软件基金会开发的开源框架,主要用于处理和存储大量数据。它采用分片...
Apriori算法是最经典的关联规则算法之一,该算法通过逐步增加项集事务数量来发现频繁集,然后以频繁集为基准去发现关联规则。Apriori算法在数据挖掘和机器学习领域中有着广泛的应用。 3. 并行化Apriori算法的研究与...
关联规则挖掘是一种数据挖掘技术,用于发现数据集中项集之间的有趣关系。在推荐系统中,它可以帮助发现用户购买行为的模式,例如“买了商品A的人常常也会买商品B”。这种技术可以增强推荐的精准度,提供更相关的商品...
关联规则挖掘是数据挖掘中的一项重要技术,其目的是发现在大量数据中频繁出现的模式、相关性及结构化信息,其中,Apriori算法和FP-Growth算法是最常用的两种关联规则挖掘算法。 二、大数据环境下的挑战 大数据环境...