`
fighting_2013
  • 浏览: 15516 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

数据挖掘笔记-聚类-Canopy-2

阅读更多
Canopy并行化处理在Mahout里面有很好的实现,网上有很多人都做过相关的分析,有的写的很详细,本来只想看看Mahout Canopy源码就好了,但还是觉得自己记录下也好。我看的是mahout-distribution-0.9版本。
 
首先先看下CanopyDriver类:
run(String[] args)方法里面是一些参数的设置。
public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2, double t3, double t4, 
      int clusterFilter, boolean runClustering, double clusterClassificationThreshold, boolean runSequential)
    throws IOException, InterruptedException, ClassNotFoundException {
    Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3,
        t4, clusterFilter, runSequential);
    if (runClustering) {
      clusterData(conf, input, clustersOut, output, clusterClassificationThreshold, runSequential);
    }
  }
buildClusters:构建一个Canopy聚类的目录,根据runSequential参数来决定是通过什么方式来构建。可以是顺序单机序列化执行,也可以是基于MapReduce执行。
clusterData:根据buildClusters产生的Canopy聚类的目录去聚类数据。它的执行过程主要是在ClusterClassificationDriver这个类里面去执行的。
public static Path buildClusters(Configuration conf, Path input, Path output,
      DistanceMeasure measure, double t1, double t2, double t3, double t4,
      int clusterFilter, boolean runSequential) throws IOException,
      InterruptedException, ClassNotFoundException {
    log.info("Build Clusters Input: {} Out: {} Measure: {} t1: {} t2: {}",
             input, output, measure, t1, t2);
    if (runSequential) {
      return buildClustersSeq(input, output, measure, t1, t2, clusterFilter);
    } else {
      return buildClustersMR(conf, input, output, measure, t1, t2, t3, t4,
          clusterFilter);
    }
  }
buildClustersSeq:实际就是顺序执行单机版本的Canopy算法,最后将Canopy聚类序列化写入到HDFS上。
buildClustersMR:Canopy并行处理核心,基于Hadoop MapReduce的处理。主要涉及有两个类CanopyMapper、CanopyReducer。
 
接着看CanopyMapper类:
protected void setup(Context context) throws IOException,
      InterruptedException {
    super.setup(context);
    canopyClusterer = new CanopyClusterer(context.getConfiguration());
    clusterFilter = Integer.parseInt(context.getConfiguration().get(
        CanopyConfigKeys.CF_KEY));
  }
setup:主要是做一些参数的初始化工作。
protected void map(WritableComparable<?> key, VectorWritable point,
      Context context) throws IOException, InterruptedException {
    canopyClusterer.addPointToCanopies(point.get(), canopies);
  }
map:是对每一个点向量做Canopy算法,将其归入相应的Canopy中。
protected void cleanup(Context context) throws IOException,
      InterruptedException {
    for (Canopy canopy : canopies) {
      canopy.computeParameters();
      if (canopy.getNumObservations() > clusterFilter) {
        context.write(new Text("centroid"), new VectorWritable(canopy.getCenter()));
      }
    }
    super.cleanup(context);
  }
cleanup:遍历所有Canopy,调用computeParameters去计算并更新相关参数,然后把符合条件的Canopy写入。
下面看下几个核心的方法:
public void addPointToCanopies(Vector point, Collection<Canopy> canopies) {
    boolean pointStronglyBound = false;
    for (Canopy canopy : canopies) {
      double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point);
      if (dist < t1) {
        if (log.isDebugEnabled()) {
          log.debug("Added point: {} to canopy: {}", AbstractCluster.formatVector(point, null), canopy.getIdentifier());
        }
        canopy.observe(point);
      }
      pointStronglyBound = pointStronglyBound || dist < t2;
    }
    if (!pointStronglyBound) {
      if (log.isDebugEnabled()) {
        log.debug("Created new Canopy:{} at center:{}", nextCanopyId, AbstractCluster.formatVector(point, null));
      }
      canopies.add(new Canopy(point, nextCanopyId++, measure));
    }
  }
计算点到每个Canopy的距离,如果小于T1,则将其加入到相应的Canopy中,同时更新相关参数S0、S1、S2,如果距离小于T2,pointStronglyBound为true,则不在往下走了,反之则新增一个Canopy。即点到所有Canopy的距离都大于T2或者等于T2的都作为新的一个Canopy。
public void observe(Vector x) {
    setS0(getS0() + 1);
    if (getS1() == null) {
      setS1(x.clone());
    } else {
      getS1().assign(x, Functions.PLUS);
    }
    Vector x2 = x.times(x);
    if (getS2() == null) {
      setS2(x2);
    } else {
      getS2().assign(x2, Functions.PLUS);
    }
  }
Canopy每次新增一个点都去更新相关的参数。S0:表示Canopy包含点的权重之和。S1:表示各点的加权和。S2:表示各点平方的加权和。
public void computeParameters() {
    if (getS0() == 0) {
      return;
    }
    setNumObservations((long) getS0());
    setTotalObservations(getTotalObservations() + getNumObservations());
    setCenter(getS1().divide(getS0()));
    // compute the component stds
    if (getS0() > 1) {
      setRadius(getS2().times(getS0()).minus(getS1().times(getS1())).assign(new SquareRootFunction()).divide(getS0()));
    }
    setS0(0);
    setS1(center.like());
    setS2(center.like());
  }
computeParameters:主要是计算中心点与半径,同时也更新了S0、S1、S2。
 
下面接着看CanopyReducer类:
protected void setup(Context context) throws IOException,
      InterruptedException {
    super.setup(context);
    canopyClusterer = new CanopyClusterer(context.getConfiguration());
    canopyClusterer.useT3T4();
    clusterFilter = Integer.parseInt(context.getConfiguration().get(
        CanopyConfigKeys.CF_KEY));
  }
setup:同Mapper一样是参数的初始化,但注意这里useT3T4(),用的是T3、T4,而Mapper用的是T1、T2,两者可以不同。
protected void reduce(Text arg0, Iterable<VectorWritable> values,
      Context context) throws IOException, InterruptedException {
    for (VectorWritable value : values) {
      Vector point = value.get();
      canopyClusterer.addPointToCanopies(point, canopies);
    }
    for (Canopy canopy : canopies) {
      canopy.computeParameters();
      if (canopy.getNumObservations() > clusterFilter) {
        ClusterWritable clusterWritable = new ClusterWritable();
        clusterWritable.setValue(canopy);
        context.write(new Text(canopy.getIdentifier()), clusterWritable);
      }
    }
  }
reduce:迭代点向量集合,将其归入相应的Canopy聚类中。然后有遍历所有的Canopy聚类,将符合条件的全局Canopy写入到序列化文件中。
这样Canopy聚类的MR过程就完成了。下面看下之后的方法:
private static void clusterData(Configuration conf,
                                  Path points,
                                  Path canopies,
                                  Path output,
                                  double clusterClassificationThreshold,
                                  boolean runSequential)
    throws IOException, InterruptedException, ClassNotFoundException {
    ClusterClassifier.writePolicy(new CanopyClusteringPolicy(), canopies);
    ClusterClassificationDriver.run(conf, points, output, new Path(output, PathDirectory.CLUSTERED_POINTS_DIRECTORY),
                                    clusterClassificationThreshold, true, runSequential);
  }
writePolicy:是将Canopy算法中的T1、T2序列化写入到HDFS中
ClusterClassificationDriver启动另外一个Job来进行聚类数据。
public static void run(Configuration conf, Path input, Path clusteringOutputPath, Path output,
      double clusterClassificationThreshold, boolean emitMostLikely, boolean runSequential) throws IOException,
      InterruptedException, ClassNotFoundException {
    if (runSequential) {
      classifyClusterSeq(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
    } else {
      classifyClusterMR(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
    }
   
  }
同样是分为顺序单机序列化版本与MapReduce版本,这里只看下MR版本。里面只有一个Mapper,没有Reducer.
 
接着看ClusterClassificationMapper类:
protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
   
    Configuration conf = context.getConfiguration();
    String clustersIn = conf.get(ClusterClassificationConfigKeys.CLUSTERS_IN);
    threshold = conf.getFloat(ClusterClassificationConfigKeys.OUTLIER_REMOVAL_THRESHOLD, 0.0f);
    emitMostLikely = conf.getBoolean(ClusterClassificationConfigKeys.EMIT_MOST_LIKELY, false);
   
    clusterModels = Lists.newArrayList();
   
    if (clustersIn != null && !clustersIn.isEmpty()) {
      Path clustersInPath = new Path(clustersIn);
      clusterModels = populateClusterModels(clustersInPath, conf);
      ClusteringPolicy policy = ClusterClassifier
          .readPolicy(finalClustersPath(clustersInPath));
      clusterClassifier = new ClusterClassifier(clusterModels, policy);
    }
    clusterId = new IntWritable();
  }
setup:一些参数的初始化工作,其中包括读取上一步MR产生的全局Canopy聚类集合和读取聚类策略生成clusterClassifier等。
protected void map(WritableComparable<?> key, VectorWritable vw, Context context)
    throws IOException, InterruptedException {
    if (!clusterModels.isEmpty()) {
      Class<? extends Vector> vectorClass = vw.get().getClass();
      Vector vector = vw.get();
      if (!vectorClass.equals(NamedVector.class)) {
        if (key.getClass().equals(Text.class)) {
          vector = new NamedVector(vector, key.toString());
        } else if (key.getClass().equals(IntWritable.class)) {
          vector = new NamedVector(vector, Integer.toString(((IntWritable) key).get()));
        }
      }
      Vector pdfPerCluster = clusterClassifier.classify(vector);
      if (shouldClassify(pdfPerCluster)) {
        if (emitMostLikely) {
          int maxValueIndex = pdfPerCluster.maxValueIndex();
          write(new VectorWritable(vector), context, maxValueIndex, 1.0);
        } else {
          writeAllAboveThreshold(new VectorWritable(vector), context, pdfPerCluster);
        }
      }
    }
  }
map方法里面主要就是将输入路径中的向量分入到不同的聚类中。然后将其序列化到HDFS上。
自此Mahout里的Canopy算法处理的整个过程基本看完了。虽然很粗糙,但是大体上还是理解了整个执行过程。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics