`
fighting_2013
  • 浏览: 15702 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

数据挖掘笔记-关联规则-FPGrowth-2

 
阅读更多

前面是单机版的实现,现在通过MapReduce来实现FPGrowth算法,主要用了两个MR,具体过程如下:

1、第一个MR扫描所有数据集统计数据集中的频繁一项集,即每个项的出现次数。

2、读取第一个MR产生的文件,对频繁一项集排序,然后上传到HDFS上。

3、第二个MR扫描所有数据集,并根据第二步产生的排序好的频繁一项集来得出频繁项集。

第二个MR的Map阶段过程:首先根据排好序的频繁一项集将事务数据排好序,然后遍历排好序的事务数据,以频繁项为键,事务数据为值传递给Reduce阶段。

第二个MR的Reduce阶段过程:Reduce节点接收到从Map节点过来的数据,遍历这个频繁项对应的事务数据集,将它们构建起该频繁项的条件FP树。从条件FP树进而得到包含本频繁项的频繁项集。

 

FPGrowth算法MapReduce简单实现:
public class FPGrowthJob {
	
	private Configuration conf = null;
	
	//频繁一项集生成
	public String frequency_1_itemset_gen(String input, String minSupport) {
		String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID();
		String[] inputArgs = new String[]{input, output, minSupport};
		Frequency1ItemSetMR.main(inputArgs);
		return output;
	}
	
	//频繁一项集排序
	public String frequency_1_itemset_sort(String input) {
		Map<String, Integer> map = new HashMap<String, Integer>();
		SequenceFile.Reader reader = null;
		try {
			Path dirPath = new Path(input);
			Path[] paths = HDFSUtils.getPathFiles(conf, dirPath);
			FileSystem fs = FileSystem.get(conf);
			reader = new SequenceFile.Reader(fs, paths[0], conf);
			Text key = (Text) ReflectionUtils.newInstance(
					reader.getKeyClass(), conf);
			IntWritable value = new IntWritable();
			while (reader.next(key, value)) {
				map.put(key.toString(), value.get());
				key = new Text();
				value = new IntWritable();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeQuietly(reader);
		}
		List<Map.Entry<String, Integer>> entries = 
				new ArrayList<Map.Entry<String, Integer>>(); 
		for (Map.Entry<String, Integer> entry : map.entrySet()) {
			entries.add(entry);
		}
		//根据出现频次排序项
		Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
			public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
				return ((Integer) o2.getValue()).compareTo((Integer) o1.getValue());
			}
		});
		String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID()
				+ File.separator + IdentityUtils.generateUUID();
		SequenceFile.Writer writer = null;
		try {
			Path path = new Path(output);
			FileSystem fs = FileSystem.get(conf);
			writer = SequenceFile.createWriter(fs, conf, path,
					Text.class, IntWritable.class);
			for (Map.Entry<String, Integer> entry : entries) {
				writer.append(new Text(entry.getKey()), new IntWritable(entry.getValue()));
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeQuietly(writer);
		}
		return output;
	}
	
	//频繁项集生成
	public void frequency_itemset_gen(String input, String output, String sort_input) {
		System.out.println("frequency_itemset_gen input: " + input);
		System.out.println("frequency_itemset_gen sort input: " + sort_input);
		String[] inputArgs = new String[]{input, output, sort_input};
		FPGrowthMR.main(inputArgs);
	}
	
	public void run(String[] args) {
		if (null == conf) conf = new Configuration();
		try {
			String[] inputArgs = new GenericOptionsParser(
					conf, args).getRemainingArgs();
			if (inputArgs.length != 3) {
				System.out.println("error");
				System.out.println("1. input path.");
				System.out.println("2. output path.");
				System.out.println("3. min support.");
				System.exit(2);
			}
			String fre1_output = frequency_1_itemset_gen(inputArgs[0], inputArgs[2]);
			String fre1_sort_output = frequency_1_itemset_sort(fre1_output);
			frequency_itemset_gen(inputArgs[0], inputArgs[1], fre1_sort_output);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		FPGrowthJob job = new FPGrowthJob();
		long startTime = System.currentTimeMillis();
		job.run(args);
		long endTime = System.currentTimeMillis();
		System.out.println("spend time: " + (endTime - startTime));
	}
}

第一个MR很简单就不上了,直接贴第二个MR

public class FPGrowthMR {
	
	private static void configureJob(Job job) {
		job.setJarByClass(FPGrowthMR.class);
		
		job.setMapperClass(FPGrowthMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(FPGrowthReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
	}

	public static void main(String[] args) {
		Configuration configuration = new Configuration();
		try {
			String[] inputArgs = new GenericOptionsParser(
						configuration, args).getRemainingArgs();
			if (inputArgs.length != 3) {
				System.out.println("error");
				System.out.println("error, please input two path. input and output");
				System.out.println("1. input path.");
				System.out.println("2. output path.");
				System.out.println("3. sort input path.");
				System.exit(2);
			}
//			configuration.set("mapred.job.queue.name", "q_hudong");
			configuration.set("sort.input.path", inputArgs[2]);
			
			Path sortPath = new Path(inputArgs[2]);
			DistributedCache.addCacheFile(sortPath.toUri(), configuration);
			
			Job job = new Job(configuration, "FPGrowth Algorithm");
			
			FileInputFormat.setInputPaths(job, new Path(inputArgs[0]));
			FileOutputFormat.setOutputPath(job, new Path(inputArgs[1]));
			
			configureJob(job);
			
			System.out.println(job.waitForCompletion(true) ? 0 : 1);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

class FPGrowthMapper extends Mapper<LongWritable, Text, Text, Text> {
	
	private List<Map.Entry<String, Integer>> entries = null;
	
	@Override
	protected void setup(Context context) throws IOException,
			InterruptedException {
		super.setup(context);
		Configuration conf = context.getConfiguration();
		URI[] uris = DistributedCache.getCacheFiles(conf);
		Map<String, Integer> map = new HashMap<String, Integer>();
		SequenceFile.Reader reader = null;
		try {
			Path path = new Path(uris[0]);
			FileSystem fs = FileSystem.get(conf);
			reader = new SequenceFile.Reader(fs, path, conf);
			Text key = (Text) ReflectionUtils.newInstance(
					reader.getKeyClass(), conf);
			IntWritable value = new IntWritable();
			while (reader.next(key, value)) {
				map.put(key.toString(), value.get());
				key = new Text();
				value = new IntWritable();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeQuietly(reader);
		}
		entries = new ArrayList<Map.Entry<String, Integer>>(); 
		for (Map.Entry<String, Integer> entry : map.entrySet()) {
			entries.add(entry);
		}
	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		StringTokenizer tokenizer = new StringTokenizer(value.toString());
		tokenizer.nextToken();
		List<String> results = new ArrayList<String>();
		while (tokenizer.hasMoreTokens()) {
			String token = tokenizer.nextToken();
			String[] items = token.split(",");
			for (Map.Entry<String, Integer> entry : entries) {
				String eKey = entry.getKey();
				for (String item : items) {
					if (eKey.equals(item)) {
						results.add(eKey);
						break;
					}
				}
			}
		}
		String[] values = results.toArray(new String[0]);
		StringBuilder sb = new StringBuilder();
		for (String v : values) {
			sb.append(v).append(",");
		}
		if (sb.length() > 0) sb.deleteCharAt(sb.length() - 1);
		for (String v : values) {
			context.write(new Text(v), new Text(sb.toString()));
		}
	}
	
	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		super.cleanup(context);
	}
}

class FPGrowthReducer extends Reducer<Text, Text, Text, IntWritable> {
	
	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		super.setup(context);
	}
	
	@Override
	protected void reduce(Text key, Iterable<Text> values,
			Context context) throws IOException, InterruptedException {
		String keyItem = key.toString();
		System.out.println("key: " + keyItem);
		Data data = new Data();
		for (Text value : values) {
			Instance instance = new Instance();
			StringTokenizer tokenizer = new StringTokenizer(value.toString());
			String token = tokenizer.nextToken();
			String[] items = token.split(",");
			List<String> temp = new ArrayList<String>();
			for (String item : items) {
				if (keyItem.equals(item)) {
					break;
				}
				temp.add(item);
			}
			instance.setValues(temp.toArray(new String[0]));
			data.getInstances().add(instance);
		}
		context.write(new Text(keyItem), new IntWritable(data.getInstances().size()));
		FPGrowthBuilder fpBuilder = new FPGrowthBuilder();
		fpBuilder.build(data, null);
		List<List<ItemSet>> frequencies = fpBuilder.obtainFrequencyItemSet();
		for (List<ItemSet> frequency : frequencies) {
			for (ItemSet itemSet : frequency) {
				StringBuilder sb = new StringBuilder();
				for (String i : itemSet.getItems()) {
					sb.append(i).append(",");
				}
				sb.append(keyItem);
				context.write(new Text(sb.toString()), new IntWritable(itemSet.getSupport()));
			}
		}
	}
	
	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		super.cleanup(context);
	}
	
}

HDFS上查看结果如下:

尿布	5
莴苣,尿布	4
葡萄酒,尿布	4
豆奶,尿布	3
橙汁,尿布	2
莴苣,葡萄酒,尿布	3
莴苣,豆奶,尿布	2
葡萄酒,豆奶,尿布	2
橙汁,豆奶,尿布	2
橙汁	2
豆奶,橙汁	2
莴苣	5
豆奶,莴苣	3
葡萄酒	4
莴苣,葡萄酒	3
豆奶,葡萄酒	2
豆奶	4

 

分享到:
评论

相关推荐

    数据挖掘笔记思维导图1

    此外,关联规则分析是发现数据中频繁项集和强关联规则的过程,Apriori和FP-growth是经典的关联规则挖掘算法。 关联规则不仅用于市场预测,还可以帮助决策制定。支持度和置信度是评估关联规则强度的关键指标,频繁项...

    《数据挖掘技术》课程学习笔记

    关联规则学习,如Apriori和FP-Growth,用于找出项集之间的频繁模式,常用于市场篮子分析。回归分析则是预测连续变量的方法,如线性回归和逻辑回归。 在《数据挖掘》课程学习笔记中,还可能包含了特征选择的内容。...

    学习笔记5:数据预处理与数据挖掘十大经典算法.docx

    10. **FP-Growth**:另一种用于关联规则学习的算法,相比Apriori更加高效。 #### 四、总结 通过对数据预处理方法的详细介绍以及数据挖掘经典算法的概述,我们可以看到,数据预处理是确保数据挖掘结果准确性和有效...

    学习笔记5:大数据预处理与大数据挖掘十大经典算法.docx

    关联规则挖掘的算法包括Apriori算法、Eclat算法、FP-Growth算法等。 决策树是一种常见的分类算法,它可以用于解决分类问题。决策树的算法包括ID3算法、C4.5算法、 CART算法等。 随机森林是一种ensemble学习算法,...

    数据挖掘打印课件.7z

    建模阶段则会应用各种算法,如分类(如决策树、随机森林)、回归、聚类(如K-means、DBSCAN)和关联规则学习(如Apriori、FP-Growth)。评估阶段通过交叉验证、混淆矩阵等方法检验模型的性能和泛化能力。 在课程中...

    北邮计算机学院Python程序设计:数据挖掘类作业.zip

    4. **关联规则**:Apriori、FP-Growth等算法用于找出项集之间的频繁模式。例如,mlxtend库包含了这些算法。 五、特征工程 特征选择和特征提取是提高模型性能的关键步骤。Python的FeatureSelection模块可以帮助进行...

    基于分辨矩阵的含负属性项关联规则挖掘 (2012年)

    关联规则挖掘是数据挖掘领域中的一个重要研究方向,关联规则挖掘算法的目标是发现数据库中强关联规则,即那些具有高支持度(Support)和高置信度(Confidence)的规则,其中支持度表示规则中所有项集在所有交易中...

    物联网中的智能决策概述.pptx

    Apriori和FP-Growth是两种常见的关联规则挖掘算法,Apriori通过迭代生成频繁项集,而FP-Growth则利用压缩数据结构降低存储和计算需求。 聚类分析则是将数据分组,使同一组内的数据相似度高,组间差异大。在物联网中...

    图书馆:研究-数据挖掘

    - 关联规则学习:寻找项集之间的频繁模式,如Apriori算法和FP-Growth算法,常用于市场篮子分析。 - 回归:预测连续值,如线性回归、多项式回归和岭回归。 - 异常检测:识别数据中的异常值,常用的方法有基于统计...

    物联网导论第13章_物联网中的智能决策v1135.pptx

    关联分析的经典算法有Apriori和FP-Growth,其中FP-Growth通过构建树形结构优化了频繁项集的查找,减少了对数据库的扫描次数。 2. **聚类分析**:聚类是将相似数据分组在一起,形成不同的类别或簇。它是一种无监督...

    FP_Mining_New

    频繁模式挖掘是数据挖掘领域的一个重要组成部分,主要目标是从交易数据库或序列数据中找出频繁出现的项集或模式。这些模式可以用于市场篮子分析、关联规则学习、用户行为模式识别等多种应用场景。 在Python中,我们...

    Association-rule-for-movies

    2. 关联规则算法实现:常见的关联规则挖掘算法有Apriori、FP-Growth等。这些算法会找出频繁项集(一组经常一起出现的项目),然后生成满足最小支持度和置信度阈值的关联规则。 3. 结果可视化:Jupyter Notebook中的...

    Datamining

    常见的挖掘技术有分类(如决策树、随机森林)、聚类(如K-means、DBSCAN)、关联规则(如Apriori、FP-Growth)、序列模式挖掘和回归等。 3. 模式评估与解释:找到的模式需要经过验证和解释,以确定其在业务或研究中...

    DataMiningStands

    数据挖掘算法可以分为分类(如决策树、随机森林、支持向量机)、聚类(K-Means、DBSCAN)、关联规则(Apriori、FP-Growth)、序列挖掘等。Jupyter Notebook中的代码很可能会展示如何应用这些算法。 3. **模式评估**...

    DataMining

    Apriori算法和FP-growth是典型的关联规则挖掘算法。 5. 序列和时间序列分析:在处理具有时间顺序的数据时,时间序列分析如ARIMA模型和状态空间模型可以帮助我们理解和预测未来的趋势。 6. 异常检测:数据中可能...

    机器学习常见算法思想梳理

    Apriori算法基于候选生成和检验的思想,而FPGrowth算法采用FP树的结构来挖掘频繁项集,前者适合小规模数据集,后者适合大规模数据集。 总结来说,北京邮电大学的这篇文档通过梳理各种机器学习算法的基本概念、核心...

Global site tag (gtag.js) - Google Analytics