Canopy并行化处理在Mahout里面有很好的实现,网上有很多人都做过相关的分析,有的写的很详细,本来只想看看Mahout Canopy源码就好了,但还是觉得自己记录下也好。我看的是mahout-distribution-0.9版本。
首先先看下CanopyDriver类:
run(String[] args)方法里面是一些参数的设置。
public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2, double t3, double t4, int clusterFilter, boolean runClustering, double clusterClassificationThreshold, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException { Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3, t4, clusterFilter, runSequential); if (runClustering) { clusterData(conf, input, clustersOut, output, clusterClassificationThreshold, runSequential); } }
buildClusters:构建一个Canopy聚类的目录,根据runSequential参数来决定是通过什么方式来构建。可以是顺序单机序列化执行,也可以是基于MapReduce执行。
clusterData:根据buildClusters产生的Canopy聚类的目录去聚类数据。它的执行过程主要是在ClusterClassificationDriver这个类里面去执行的。
public static Path buildClusters(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2, double t3, double t4, int clusterFilter, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException { log.info("Build Clusters Input: {} Out: {} Measure: {} t1: {} t2: {}", input, output, measure, t1, t2); if (runSequential) { return buildClustersSeq(input, output, measure, t1, t2, clusterFilter); } else { return buildClustersMR(conf, input, output, measure, t1, t2, t3, t4, clusterFilter); } }
buildClustersSeq:实际就是顺序执行单机版本的Canopy算法,最后将Canopy聚类序列化写入到HDFS上。
buildClustersMR:Canopy并行处理核心,基于Hadoop MapReduce的处理。主要涉及有两个类CanopyMapper、CanopyReducer。
接着看CanopyMapper类:
protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); canopyClusterer = new CanopyClusterer(context.getConfiguration()); clusterFilter = Integer.parseInt(context.getConfiguration().get( CanopyConfigKeys.CF_KEY)); }
setup:主要是做一些参数的初始化工作。
protected void map(WritableComparable<?> key, VectorWritable point, Context context) throws IOException, InterruptedException { canopyClusterer.addPointToCanopies(point.get(), canopies); }
map:是对每一个点向量做Canopy算法,将其归入相应的Canopy中。
protected void cleanup(Context context) throws IOException, InterruptedException { for (Canopy canopy : canopies) { canopy.computeParameters(); if (canopy.getNumObservations() > clusterFilter) { context.write(new Text("centroid"), new VectorWritable(canopy.getCenter())); } } super.cleanup(context); }
cleanup:遍历所有Canopy,调用computeParameters去计算并更新相关参数,然后把符合条件的Canopy写入。
下面看下几个核心的方法:
public void addPointToCanopies(Vector point, Collection<Canopy> canopies) { boolean pointStronglyBound = false; for (Canopy canopy : canopies) { double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point); if (dist < t1) { if (log.isDebugEnabled()) { log.debug("Added point: {} to canopy: {}", AbstractCluster.formatVector(point, null), canopy.getIdentifier()); } canopy.observe(point); } pointStronglyBound = pointStronglyBound || dist < t2; } if (!pointStronglyBound) { if (log.isDebugEnabled()) { log.debug("Created new Canopy:{} at center:{}", nextCanopyId, AbstractCluster.formatVector(point, null)); } canopies.add(new Canopy(point, nextCanopyId++, measure)); } }
计算点到每个Canopy的距离,如果小于T1,则将其加入到相应的Canopy中,同时更新相关参数S0、S1、S2,如果距离小于T2,pointStronglyBound为true,则不在往下走了,反之则新增一个Canopy。即点到所有Canopy的距离都大于T2或者等于T2的都作为新的一个Canopy。
public void observe(Vector x) { setS0(getS0() + 1); if (getS1() == null) { setS1(x.clone()); } else { getS1().assign(x, Functions.PLUS); } Vector x2 = x.times(x); if (getS2() == null) { setS2(x2); } else { getS2().assign(x2, Functions.PLUS); } }
Canopy每次新增一个点都去更新相关的参数。S0:表示Canopy包含点的权重之和。S1:表示各点的加权和。S2:表示各点平方的加权和。
public void computeParameters() { if (getS0() == 0) { return; } setNumObservations((long) getS0()); setTotalObservations(getTotalObservations() + getNumObservations()); setCenter(getS1().divide(getS0())); // compute the component stds if (getS0() > 1) { setRadius(getS2().times(getS0()).minus(getS1().times(getS1())).assign(new SquareRootFunction()).divide(getS0())); } setS0(0); setS1(center.like()); setS2(center.like()); }
computeParameters:主要是计算中心点与半径,同时也更新了S0、S1、S2。
下面接着看CanopyReducer类:
protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); canopyClusterer = new CanopyClusterer(context.getConfiguration()); canopyClusterer.useT3T4(); clusterFilter = Integer.parseInt(context.getConfiguration().get( CanopyConfigKeys.CF_KEY)); }
setup:同Mapper一样是参数的初始化,但注意这里useT3T4(),用的是T3、T4,而Mapper用的是T1、T2,两者可以不同。
protected void reduce(Text arg0, Iterable<VectorWritable> values, Context context) throws IOException, InterruptedException { for (VectorWritable value : values) { Vector point = value.get(); canopyClusterer.addPointToCanopies(point, canopies); } for (Canopy canopy : canopies) { canopy.computeParameters(); if (canopy.getNumObservations() > clusterFilter) { ClusterWritable clusterWritable = new ClusterWritable(); clusterWritable.setValue(canopy); context.write(new Text(canopy.getIdentifier()), clusterWritable); } } }
reduce:迭代点向量集合,将其归入相应的Canopy聚类中。然后有遍历所有的Canopy聚类,将符合条件的全局Canopy写入到序列化文件中。
这样Canopy聚类的MR过程就完成了。下面看下之后的方法:
private static void clusterData(Configuration conf, Path points, Path canopies, Path output, double clusterClassificationThreshold, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException { ClusterClassifier.writePolicy(new CanopyClusteringPolicy(), canopies); ClusterClassificationDriver.run(conf, points, output, new Path(output, PathDirectory.CLUSTERED_POINTS_DIRECTORY), clusterClassificationThreshold, true, runSequential); }
writePolicy:是将Canopy算法中的T1、T2序列化写入到HDFS中
ClusterClassificationDriver启动另外一个Job来进行聚类数据。
public static void run(Configuration conf, Path input, Path clusteringOutputPath, Path output, double clusterClassificationThreshold, boolean emitMostLikely, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException { if (runSequential) { classifyClusterSeq(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely); } else { classifyClusterMR(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely); } }
同样是分为顺序单机序列化版本与MapReduce版本,这里只看下MR版本。里面只有一个Mapper,没有Reducer.
接着看ClusterClassificationMapper类:
protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); String clustersIn = conf.get(ClusterClassificationConfigKeys.CLUSTERS_IN); threshold = conf.getFloat(ClusterClassificationConfigKeys.OUTLIER_REMOVAL_THRESHOLD, 0.0f); emitMostLikely = conf.getBoolean(ClusterClassificationConfigKeys.EMIT_MOST_LIKELY, false); clusterModels = Lists.newArrayList(); if (clustersIn != null && !clustersIn.isEmpty()) { Path clustersInPath = new Path(clustersIn); clusterModels = populateClusterModels(clustersInPath, conf); ClusteringPolicy policy = ClusterClassifier .readPolicy(finalClustersPath(clustersInPath)); clusterClassifier = new ClusterClassifier(clusterModels, policy); } clusterId = new IntWritable(); }
setup:一些参数的初始化工作,其中包括读取上一步MR产生的全局Canopy聚类集合和读取聚类策略生成clusterClassifier等。
protected void map(WritableComparable<?> key, VectorWritable vw, Context context) throws IOException, InterruptedException { if (!clusterModels.isEmpty()) { Class<? extends Vector> vectorClass = vw.get().getClass(); Vector vector = vw.get(); if (!vectorClass.equals(NamedVector.class)) { if (key.getClass().equals(Text.class)) { vector = new NamedVector(vector, key.toString()); } else if (key.getClass().equals(IntWritable.class)) { vector = new NamedVector(vector, Integer.toString(((IntWritable) key).get())); } } Vector pdfPerCluster = clusterClassifier.classify(vector); if (shouldClassify(pdfPerCluster)) { if (emitMostLikely) { int maxValueIndex = pdfPerCluster.maxValueIndex(); write(new VectorWritable(vector), context, maxValueIndex, 1.0); } else { writeAllAboveThreshold(new VectorWritable(vector), context, pdfPerCluster); } } } }
map方法里面主要就是将输入路径中的向量分入到不同的聚类中。然后将其序列化到HDFS上。
自此Mahout里的Canopy算法处理的整个过程基本看完了。虽然很粗糙,但是大体上还是理解了整个执行过程。
相关推荐
聚类分析是数据挖掘中的一个重要分支,主要用于发现数据集中的自然群体或类别,无需事先知道具体的分类信息。在这个数据挖掘作业中,我们将深入探讨聚类分析的概念、方法以及其在实际应用中的价值。 聚类分析的目标...
UCI(University of California, Irvine)机器学习仓库是数据挖掘和机器学习领域广泛使用的资源库,其中包含了众多经典的数据集,用于研究和教学目的。这个压缩包“UCI常用数据集-聚类、分类.zip”显然是针对聚类和...
基于密度方法,根据密度完成对象的聚类。 perl实现 OPTICS(Ordering Points To Identify the Clustering Structure):并不明确产生一个聚类,而是为自动交互的聚类分析计算出一个增强聚类顺序。。
K均值聚类(K-Means聚类)-聚类算法-聚类可视化-MATLAB代码 ...可保证运行,运行失败或报错免费解决。 k均值聚类算法的基本概念和原理 k均值聚类算法(k-...重复步骤2和3:不断重复分配对象和更新聚类中心的过程
聚类技术是数据挖掘中的一个重要分支,主要目的是通过无监督学习方法将数据集中的对象自动分成多个类别,使得同一类别的对象相似度较高,而不同类别间的对象相似度较低。本课程件将深入探讨聚类技术的原理、方法以及...
2. 选择聚类方法:SPSS提供了多种聚类算法,包括K-means聚类、分层聚类等。每种方法都有其适用场景和优缺点,需要根据数据特性和分析目的来选择合适的聚类方法。 3. 分析设置:在SPSS中选择相应的聚类分析模块后,...
数据挖掘考试题目-聚类.doc
聚类分析是数据挖掘领域中一种重要的无监督学习方法,主要用于探索性数据分析,它通过对数据集中的对象进行分组,形成由相似对象组成的簇,以揭示数据的内在结构和模式。在聚类分析中,数据对象被组织成若干个簇,...
复旦大学数据挖掘概念和技术-聚类分析 共26页.ppt
1.数据挖掘方法与应用-聚类
k-聚类是一种无监督学习方法,主要用于数据挖掘和机器学习领域,旨在将数据集中的对象分成不同的类别或簇,使得同一簇内的对象相似度较高,而不同簇间的对象相似度较低。在k-均值(k-means)算法中,“k”表示我们...
聚类分析是数据挖掘中一种常用的无监督学习方法,它通过将数据集中的样本划分为若干个类别或簇,使得同一类簇内的样本之间具有较高的相似性,而不同类簇间的样本差异性较大。K-means算法是最经典的聚类算法之一,因...
鸢尾花IRIS数据集-聚类分析机器学习
人工智能-项目实践-聚类-Chinese-whisper 聚类算法(由于涉及公司代码保护,只显示文档) 链接 https://github.com/ouprince/CW.git 说明 原版论文:《CW聚类算法.pdf》 作者翻译:《CW聚类算法论文翻译.doc》
机器学习-数据预处理-聚类-回归-单车数据集
MATLAB数据挖掘算法_回归算法_关联算法_聚类算法源代码.rar 一元回归 多元回归 逐步回归 Logistic回归 Apriori算法 FP-Growth算法 算法 相关系数法 K-means 层次聚类 神经网络聚类 模糊C均值聚类 高斯聚类