前面是单机版的实现,现在通过MapReduce来实现FPGrowth算法,主要用了两个MR,具体过程如下:
1、第一个MR扫描所有数据集统计数据集中的频繁一项集,即每个项的出现次数。
2、读取第一个MR产生的文件,对频繁一项集排序,然后上传到HDFS上。
3、第二个MR扫描所有数据集,并根据第二步产生的排序好的频繁一项集来得出频繁项集。
第二个MR的Map阶段过程:首先根据排好序的频繁一项集将事务数据排好序,然后遍历排好序的事务数据,以频繁项为键,事务数据为值传递给Reduce阶段。
第二个MR的Reduce阶段过程:Reduce节点接收到从Map节点过来的数据,遍历这个频繁项对应的事务数据集,将它们构建起该频繁项的条件FP树。从条件FP树进而得到包含本频繁项的频繁项集。
FPGrowth算法MapReduce简单实现:
public class FPGrowthJob { private Configuration conf = null; //频繁一项集生成 public String frequency_1_itemset_gen(String input, String minSupport) { String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID(); String[] inputArgs = new String[]{input, output, minSupport}; Frequency1ItemSetMR.main(inputArgs); return output; } //频繁一项集排序 public String frequency_1_itemset_sort(String input) { Map<String, Integer> map = new HashMap<String, Integer>(); SequenceFile.Reader reader = null; try { Path dirPath = new Path(input); Path[] paths = HDFSUtils.getPathFiles(conf, dirPath); FileSystem fs = FileSystem.get(conf); reader = new SequenceFile.Reader(fs, paths[0], conf); Text key = (Text) ReflectionUtils.newInstance( reader.getKeyClass(), conf); IntWritable value = new IntWritable(); while (reader.next(key, value)) { map.put(key.toString(), value.get()); key = new Text(); value = new IntWritable(); } } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(reader); } List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(); for (Map.Entry<String, Integer> entry : map.entrySet()) { entries.add(entry); } //根据出现频次排序项 Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { return ((Integer) o2.getValue()).compareTo((Integer) o1.getValue()); } }); String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID() + File.separator + IdentityUtils.generateUUID(); SequenceFile.Writer writer = null; try { Path path = new Path(output); FileSystem fs = FileSystem.get(conf); writer = SequenceFile.createWriter(fs, conf, path, Text.class, IntWritable.class); for (Map.Entry<String, Integer> entry : entries) { writer.append(new Text(entry.getKey()), new IntWritable(entry.getValue())); } } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(writer); } return output; } //频繁项集生成 public void frequency_itemset_gen(String input, String output, String sort_input) { System.out.println("frequency_itemset_gen input: " + input); System.out.println("frequency_itemset_gen sort input: " + sort_input); String[] inputArgs = new String[]{input, output, sort_input}; FPGrowthMR.main(inputArgs); } public void run(String[] args) { if (null == conf) conf = new Configuration(); try { String[] inputArgs = new GenericOptionsParser( conf, args).getRemainingArgs(); if (inputArgs.length != 3) { System.out.println("error"); System.out.println("1. input path."); System.out.println("2. output path."); System.out.println("3. min support."); System.exit(2); } String fre1_output = frequency_1_itemset_gen(inputArgs[0], inputArgs[2]); String fre1_sort_output = frequency_1_itemset_sort(fre1_output); frequency_itemset_gen(inputArgs[0], inputArgs[1], fre1_sort_output); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { FPGrowthJob job = new FPGrowthJob(); long startTime = System.currentTimeMillis(); job.run(args); long endTime = System.currentTimeMillis(); System.out.println("spend time: " + (endTime - startTime)); } }
第一个MR很简单就不上了,直接贴第二个MR
public class FPGrowthMR { private static void configureJob(Job job) { job.setJarByClass(FPGrowthMR.class); job.setMapperClass(FPGrowthMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(FPGrowthReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); } public static void main(String[] args) { Configuration configuration = new Configuration(); try { String[] inputArgs = new GenericOptionsParser( configuration, args).getRemainingArgs(); if (inputArgs.length != 3) { System.out.println("error"); System.out.println("error, please input two path. input and output"); System.out.println("1. input path."); System.out.println("2. output path."); System.out.println("3. sort input path."); System.exit(2); } // configuration.set("mapred.job.queue.name", "q_hudong"); configuration.set("sort.input.path", inputArgs[2]); Path sortPath = new Path(inputArgs[2]); DistributedCache.addCacheFile(sortPath.toUri(), configuration); Job job = new Job(configuration, "FPGrowth Algorithm"); FileInputFormat.setInputPaths(job, new Path(inputArgs[0])); FileOutputFormat.setOutputPath(job, new Path(inputArgs[1])); configureJob(job); System.out.println(job.waitForCompletion(true) ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } } } class FPGrowthMapper extends Mapper<LongWritable, Text, Text, Text> { private List<Map.Entry<String, Integer>> entries = null; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); URI[] uris = DistributedCache.getCacheFiles(conf); Map<String, Integer> map = new HashMap<String, Integer>(); SequenceFile.Reader reader = null; try { Path path = new Path(uris[0]); FileSystem fs = FileSystem.get(conf); reader = new SequenceFile.Reader(fs, path, conf); Text key = (Text) ReflectionUtils.newInstance( reader.getKeyClass(), conf); IntWritable value = new IntWritable(); while (reader.next(key, value)) { map.put(key.toString(), value.get()); key = new Text(); value = new IntWritable(); } } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(reader); } entries = new ArrayList<Map.Entry<String, Integer>>(); for (Map.Entry<String, Integer> entry : map.entrySet()) { entries.add(entry); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); tokenizer.nextToken(); List<String> results = new ArrayList<String>(); while (tokenizer.hasMoreTokens()) { String token = tokenizer.nextToken(); String[] items = token.split(","); for (Map.Entry<String, Integer> entry : entries) { String eKey = entry.getKey(); for (String item : items) { if (eKey.equals(item)) { results.add(eKey); break; } } } } String[] values = results.toArray(new String[0]); StringBuilder sb = new StringBuilder(); for (String v : values) { sb.append(v).append(","); } if (sb.length() > 0) sb.deleteCharAt(sb.length() - 1); for (String v : values) { context.write(new Text(v), new Text(sb.toString())); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } class FPGrowthReducer extends Reducer<Text, Text, Text, IntWritable> { @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String keyItem = key.toString(); System.out.println("key: " + keyItem); Data data = new Data(); for (Text value : values) { Instance instance = new Instance(); StringTokenizer tokenizer = new StringTokenizer(value.toString()); String token = tokenizer.nextToken(); String[] items = token.split(","); List<String> temp = new ArrayList<String>(); for (String item : items) { if (keyItem.equals(item)) { break; } temp.add(item); } instance.setValues(temp.toArray(new String[0])); data.getInstances().add(instance); } context.write(new Text(keyItem), new IntWritable(data.getInstances().size())); FPGrowthBuilder fpBuilder = new FPGrowthBuilder(); fpBuilder.build(data, null); List<List<ItemSet>> frequencies = fpBuilder.obtainFrequencyItemSet(); for (List<ItemSet> frequency : frequencies) { for (ItemSet itemSet : frequency) { StringBuilder sb = new StringBuilder(); for (String i : itemSet.getItems()) { sb.append(i).append(","); } sb.append(keyItem); context.write(new Text(sb.toString()), new IntWritable(itemSet.getSupport())); } } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } }
HDFS上查看结果如下:
尿布 5 莴苣,尿布 4 葡萄酒,尿布 4 豆奶,尿布 3 橙汁,尿布 2 莴苣,葡萄酒,尿布 3 莴苣,豆奶,尿布 2 葡萄酒,豆奶,尿布 2 橙汁,豆奶,尿布 2 橙汁 2 豆奶,橙汁 2 莴苣 5 豆奶,莴苣 3 葡萄酒 4 莴苣,葡萄酒 3 豆奶,葡萄酒 2 豆奶 4
相关推荐
此外,关联规则分析是发现数据中频繁项集和强关联规则的过程,Apriori和FP-growth是经典的关联规则挖掘算法。 关联规则不仅用于市场预测,还可以帮助决策制定。支持度和置信度是评估关联规则强度的关键指标,频繁项...
关联规则学习,如Apriori和FP-Growth,用于找出项集之间的频繁模式,常用于市场篮子分析。回归分析则是预测连续变量的方法,如线性回归和逻辑回归。 在《数据挖掘》课程学习笔记中,还可能包含了特征选择的内容。...
10. **FP-Growth**:另一种用于关联规则学习的算法,相比Apriori更加高效。 #### 四、总结 通过对数据预处理方法的详细介绍以及数据挖掘经典算法的概述,我们可以看到,数据预处理是确保数据挖掘结果准确性和有效...
关联规则挖掘的算法包括Apriori算法、Eclat算法、FP-Growth算法等。 决策树是一种常见的分类算法,它可以用于解决分类问题。决策树的算法包括ID3算法、C4.5算法、 CART算法等。 随机森林是一种ensemble学习算法,...
建模阶段则会应用各种算法,如分类(如决策树、随机森林)、回归、聚类(如K-means、DBSCAN)和关联规则学习(如Apriori、FP-Growth)。评估阶段通过交叉验证、混淆矩阵等方法检验模型的性能和泛化能力。 在课程中...
4. **关联规则**:Apriori、FP-Growth等算法用于找出项集之间的频繁模式。例如,mlxtend库包含了这些算法。 五、特征工程 特征选择和特征提取是提高模型性能的关键步骤。Python的FeatureSelection模块可以帮助进行...
关联规则挖掘是数据挖掘领域中的一个重要研究方向,关联规则挖掘算法的目标是发现数据库中强关联规则,即那些具有高支持度(Support)和高置信度(Confidence)的规则,其中支持度表示规则中所有项集在所有交易中...
Apriori和FP-Growth是两种常见的关联规则挖掘算法,Apriori通过迭代生成频繁项集,而FP-Growth则利用压缩数据结构降低存储和计算需求。 聚类分析则是将数据分组,使同一组内的数据相似度高,组间差异大。在物联网中...
- 关联规则学习:寻找项集之间的频繁模式,如Apriori算法和FP-Growth算法,常用于市场篮子分析。 - 回归:预测连续值,如线性回归、多项式回归和岭回归。 - 异常检测:识别数据中的异常值,常用的方法有基于统计...
关联分析的经典算法有Apriori和FP-Growth,其中FP-Growth通过构建树形结构优化了频繁项集的查找,减少了对数据库的扫描次数。 2. **聚类分析**:聚类是将相似数据分组在一起,形成不同的类别或簇。它是一种无监督...
频繁模式挖掘是数据挖掘领域的一个重要组成部分,主要目标是从交易数据库或序列数据中找出频繁出现的项集或模式。这些模式可以用于市场篮子分析、关联规则学习、用户行为模式识别等多种应用场景。 在Python中,我们...
2. 关联规则算法实现:常见的关联规则挖掘算法有Apriori、FP-Growth等。这些算法会找出频繁项集(一组经常一起出现的项目),然后生成满足最小支持度和置信度阈值的关联规则。 3. 结果可视化:Jupyter Notebook中的...
常见的挖掘技术有分类(如决策树、随机森林)、聚类(如K-means、DBSCAN)、关联规则(如Apriori、FP-Growth)、序列模式挖掘和回归等。 3. 模式评估与解释:找到的模式需要经过验证和解释,以确定其在业务或研究中...
数据挖掘算法可以分为分类(如决策树、随机森林、支持向量机)、聚类(K-Means、DBSCAN)、关联规则(Apriori、FP-Growth)、序列挖掘等。Jupyter Notebook中的代码很可能会展示如何应用这些算法。 3. **模式评估**...
Apriori算法和FP-growth是典型的关联规则挖掘算法。 5. 序列和时间序列分析:在处理具有时间顺序的数据时,时间序列分析如ARIMA模型和状态空间模型可以帮助我们理解和预测未来的趋势。 6. 异常检测:数据中可能...
Apriori算法基于候选生成和检验的思想,而FPGrowth算法采用FP树的结构来挖掘频繁项集,前者适合小规模数据集,后者适合大规模数据集。 总结来说,北京邮电大学的这篇文档通过梳理各种机器学习算法的基本概念、核心...