- 浏览: 565371 次
- 性别:
- 来自: 济南
文章分类
- 全部博客 (270)
- Ask chenwq (10)
- JSF (2)
- ExtJS (5)
- Life (19)
- jQuery (5)
- ASP (7)
- JavaScript (5)
- SQL Server (1)
- MySQL (4)
- En (1)
- development tools (14)
- Data mining related (35)
- Hadoop (33)
- Oracle (13)
- To Do (2)
- SSO (2)
- work/study diary (10)
- SOA (6)
- Ubuntu (7)
- J2SE (18)
- NetWorks (1)
- Struts2 (2)
- algorithm (9)
- funny (1)
- BMP (1)
- Paper Reading (2)
- MapReduce (23)
- Weka (3)
- web design (1)
- Data visualisation&R (1)
- Mahout (7)
- Social Recommendation (1)
- statistical methods (1)
- Git&GitHub (1)
- Python (1)
- Linux (1)
最新评论
-
brandNewUser:
楼主你好,问个问题,为什么我写的如下的:JobConf pha ...
Hadoop ChainMap -
Molisa:
Molisa 写道mapred.min.split.size指 ...
Hadoop MapReduce Job性能调优——修改Map和Reduce个数 -
Molisa:
mapred.min.split.size指的是block数, ...
Hadoop MapReduce Job性能调优——修改Map和Reduce个数 -
heyongcs:
请问导入之后,那些错误怎么解决?
Eclipse导入Mahout -
a420144030:
看了你的文章深受启发,想请教你几个问题我的数据都放到hbase ...
Mahout clustering Canopy+K-means 源码分析
Mahout下处理的文件必须是SequenceFile格式的,所以需要把txtfile转换成sequenceFile。SequenceFile是Hadoop中的一个类,允许我们向文件中写入二进制的键值对。
Mahout中提供了一种将指定文件下的文件转换成sequenceFile的方式。(You may find Tika (http://lucene.apache.org/tika) helpful in converting binary documents to text.)
使用方法如下:
$MAHOUT_HOME/bin/mahout seqdirectory \ --input <PARENT DIR WHERE DOCS ARE LOCATED> --output <OUTPUT DIRECTORY> \ <-c <CHARSET NAME OF THE INPUT DOCUMENTS> {UTF-8|cp1252|ascii...}> \ <-chunk <MAX SIZE OF EACH CHUNK in Megabytes> 64> \ <-prefix <PREFIX TO ADD TO THE DOCUMENT ID>>
举个例子:
bin/mahout seqdirectory --input /hive/hadoopuser/ --output /mahout/seq/ --charset UTF-8
运行k-means例子
Kmeans算法思想
首先从n个数据对象任意选择 k 个对象作为初始聚类中心;而对于所剩下其它对象,则根据它们与这些聚类中心的相似度(距离),分别将它们分配给与其最相似的(聚类中心所代表的)聚类;然后再计算每个所获新聚类的聚类中心(该聚类中所有对象的均值);不断重复这一过程直到标准测度函数开始收敛为止。
运行过程:参照官网https://cwiki.apache.org/confluence/display/MAHOUT/Clustering+of+synthetic+control+data的 步骤:
首先,下载数据集synthetic_control.data,在以上官网上的Input data set. Download it here点击可下载,并将其导入到分布式文件系统上,$HADOOP_HOME/bin/hadoop fs -mkdir testdata
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/synthetic_control.data testdata
其次,使用k-means算法,在mahout的安装目录下直接mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job或是$HADOOP_HOME/bin/hadoop jar /home/hadoop/mahout-distribution-0.4/mahout-examples-0.4-job.jar org.apache.mahout.clustering.syntheticcontrol.kmeans.Job这里运行时间会长点,因为迭代,请耐心等待
最后,查看运行结果,如果在控制台直接显示结果:mahout vectordump --seqFile /user/hadoop/output/data/part-00000,或者依次运行命令:$HADOOP_HOME/bin/hadoop fs -lsr output $HADOOP_HOME/bin/hadoop fs -get output $MAHOUT_HOME/examples(将结果从分布式文件系统上导下来),$cd MAHOUT_HOME/examples/output 看到以下结果,那么算法运行成功:canopies clusters-1 clusters-3 clusters-5 clusters-7 points
clusters-0 clusters-2 clusters-4 clusters-6 data
好长一段时间都不知知道怎么查看kmeans的结果,例如想查看clusters-i中的 part-r-00000时,应该将其从分布式上导入到本地的txt格式(命令):
./mahout seqdumper -s /user/hadoop /output/cluster-9/part-r-00000 -o /home/hadoop/out/part-0
其中n为某类的样本数目,c为各类各属性的中心,r为各类属性的半径。
mahout Kmeans聚类实现 :
(1)参数input指定待聚类的所有数据点,clusters指定初始聚类中心
如果指定参数k,由org.apache.mahout.clustering.kmeans.RandomSeedGenerator.buildRandom,通过org.apache.hadoop.fs直接从input指定文件中随机读取k个点放入clusters中
(2)根据原数据点和上一次迭代(或初始聚类)的聚类中心计算本次迭代的聚类中心,输出到clusters-N目录下。
该过程由org.apache.mahout.clustering.kmeans下的KMeansMapper\KMeansCombiner\KMeansReducer\KMeansDriver实现
KMeansMapper:在configure中初始化mapper时读入上一次迭代产生或初始聚类中心(每个mapper都读入所有的聚类中心);map方法对输入的每个点,计算距离其最近的类,并加入其中输出key为该点所属聚类ID,value为KMeansInfo实例,包含点的个数和各分量的累加和。KMeansCombiner:本地累加KMeansMapper输出的同一聚类ID下的点个数和各分量的和KMeansReducer:累加同一聚类ID下的点个数和各分量的和,求本次迭代的聚类中心;并根据输入Delta判断该聚类是否已收敛:上一次迭代聚类中心与本次迭代聚类中心距离<Delta;输出各聚类中心和其是否收敛标记。KMeansDriver:控制迭代过程直至超过最大迭代次数或所有聚类都已收敛,每轮迭代后,KMeansDriver读取其clusters-N目录下的所有聚类,若所有聚类已收敛,则整个Kmeans聚类过程收敛了。
bin/mahout kmeans \
-i <input vectors directory> \
-c <input clusters directory> \
-o <output working directory> \
-k <optional number of initial clusters to sample from input vectors> \
-dm <DistanceMeasure> \
-x <maximum number of iterations> \
-cd <optional convergence delta. Default is 0.5> \
-ow <overwrite output directory if present>
-cl <run input vector clustering after computing Canopies>
-xm <execution method: sequential or mapreduce>
注意: 当-k被指定的时候,-c目录下的所有聚类都将被重写,将从输入的数据向量中随机抽取-k个点作为初始聚类的中心。
参数调整 :mahout Kmeans聚类有两个重要参数:收敛Delta和最大迭代次数。个人觉得Delta值越小,表示收敛条件越高,因此最终收敛的聚类数可能会降低,而最大迭代次数可通过观察每次迭代后收敛聚类数决定,当收敛聚类数几乎不再变化或震荡时可停止迭代了。
评论
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.conversion.InputDriver;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.ClassUtils;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
import org.apache.mahout.utils.clustering.ClusterDumper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class Job extends AbstractJob {
private static final Logger log = LoggerFactory.getLogger(Job.class);
private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
private Job() {
}
public static void main(String[] args) throws Exception {
if (args.length > 0) {
log.info("Running with only user-supplied arguments");
ToolRunner.run(new Configuration(), new Job(), args);
} else {
log.info("Running with default arguments");
Path output = new Path("output");
Configuration conf = new Configuration();
HadoopUtil.delete(conf, output);
run(conf, new Path("testdata"), output,
new EuclideanDistanceMeasure(), 6, 0.5, 10);
}
}
@Override
public int run(String[] args) throws Exception {
System.out.println("run args");
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.distanceMeasureOption().create());
addOption(DefaultOptionCreator.numClustersOption().create());
addOption(DefaultOptionCreator.t1Option().create());
addOption(DefaultOptionCreator.t2Option().create());
addOption(DefaultOptionCreator.convergenceOption().create());
addOption(DefaultOptionCreator.maxIterationsOption().create());
addOption(DefaultOptionCreator.overwriteOption().create());
Map<String, String> argMap = parseArguments(args);
if (argMap == null) {
return -1;
}
Path input = getInputPath();
Path output = getOutputPath();
String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
if (measureClass == null) {
measureClass = SquaredEuclideanDistanceMeasure.class.getName();
}
double convergenceDelta = Double
.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
int maxIterations = Integer
.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
HadoopUtil.delete(getConf(), output);
}
DistanceMeasure measure = ClassUtils.instantiateAs(measureClass,
DistanceMeasure.class);
if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
int k = Integer
.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
run(getConf(), input, output, measure, k, convergenceDelta,
maxIterations);
} else {
double t1 = Double
.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
double t2 = Double
.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
System.out.println("output: " + output);
System.out.println("t1: " + t1);
System.out.println("t2: " + t2);
run(getConf(), input, output, measure, t1, t2, convergenceDelta,
maxIterations);
}
return 0;
}
/**
* Run the kmeans clustering job on an input dataset using the given the
* number of clusters k and iteration parameters. All output data will be
* written to the output directory, which will be initially deleted if it
* exists. The clustered points will reside in the path
* <output>/clustered-points. By default, the job expects a file containing
* equal length space delimited data that resides in a directory named
* "testdata", and writes output to a directory named "output".
*
* @param conf
* the Configuration to use
* @param input
* the String denoting the input directory path
* @param output
* the String denoting the output directory path
* @param measure
* the DistanceMeasure to use
* @param k
* the number of clusters in Kmeans
* @param convergenceDelta
* the double convergence criteria for iterations
* @param maxIterations
* the int maximum number of iterations
*/
public static void run(Configuration conf, Path input, Path output,
DistanceMeasure measure, int k, double convergenceDelta,
int maxIterations) throws Exception {
Path directoryContainingConvertedInput = new Path(output,
DIRECTORY_CONTAINING_CONVERTED_INPUT);
log.info("Preparing Input");
InputDriver.runJob(input, directoryContainingConvertedInput,
"org.apache.mahout.math.RandomAccessSparseVector");
log.info("Running random seed to get initial clusters");
Path clusters = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
clusters = RandomSeedGenerator.buildRandom(conf,
directoryContainingConvertedInput, clusters, k, measure);
log.info("Running KMeans");
KMeansDriver.run(conf, directoryContainingConvertedInput, clusters,
output, measure, convergenceDelta, maxIterations, true, false);
// run ClusterDumper
ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
output, maxIterations), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(null);
}
/**
* Run the kmeans clustering job on an input dataset using the given
* distance measure, t1, t2 and iteration parameters. All output data will
* be written to the output directory, which will be initially deleted if it
* exists. The clustered points will reside in the path
* <output>/clustered-points. By default, the job expects the a file
* containing synthetic_control.data as obtained from
* http://archive.ics.uci.
* edu/ml/datasets/Synthetic+Control+Chart+Time+Series resides in a
* directory named "testdata", and writes output to a directory named
* "output".
*
* @param conf
* the Configuration to use
* @param input
* the String denoting the input directory path
* @param output
* the String denoting the output directory path
* @param measure
* the DistanceMeasure to use
* @param t1
* the canopy T1 threshold
* @param t2
* the canopy T2 threshold
* @param convergenceDelta
* the double convergence criteria for iterations
* @param maxIterations
* the int maximum number of iterations
*/
public static void run(Configuration conf, Path input, Path output,
DistanceMeasure measure, double t1, double t2,
double convergenceDelta, int maxIterations) throws Exception {
System.out.println("run canopy output: " + output);
Path directoryContainingConvertedInput = new Path(output,
DIRECTORY_CONTAINING_CONVERTED_INPUT);
log.info("Preparing Input");
InputDriver.runJob(input, directoryContainingConvertedInput,
"org.apache.mahout.math.RandomAccessSparseVector");
log.info("Running Canopy to get initial clusters");
CanopyDriver.run(conf, directoryContainingConvertedInput, output,
measure, t1, t2, false, false);
log.info("Running KMeans");
System.out.println("kmeans cluster starting...");
KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(
output, Cluster.INITIAL_CLUSTERS_DIR+"-final"), output, measure,
convergenceDelta, maxIterations, true, false);
// run ClusterDumper
ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
output, maxIterations), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(null);
}
/**
* Return the path to the final iteration's clusters
*/
private static Path finalClusterPath(Configuration conf, Path output,
int maxIterations) throws IOException {
FileSystem fs = FileSystem.get(conf);
for (int i = maxIterations; i >= 0; i--) {
Path clusters = new Path(output, "clusters-" + i);
if (fs.exists(clusters)) {
return clusters;
}
}
return null;
}
}
发表评论
-
Mahout RandomForest Example使用步骤
2012-06-15 16:59 0处理数据集 #hadoop ja ... -
Mahout资源
2012-06-14 16:38 818Quickstart Creating Vect ... -
Mahout Creating Vectors from Weka's ARFF Format
2012-06-12 17:00 1595转自: https://cwiki.apache.org/ ... -
学习Mahout
2012-06-03 16:53 0http://www.cnblogs.com/vivounic ... -
Mahout clustering Canopy+K-means 源码分析
2012-06-03 16:10 4046聚类分析 聚类(Clustering) ... -
Mahout实现的机器学习算法
2012-06-01 20:37 2492使用命令:mahout -h 在Mahout实现 ... -
Mahout文件系统结构说明
2012-06-01 20:35 1418Mahout项目是由多个子项目组成的,各子项目分别位于源 ... -
Eclipse导入Mahout
2012-06-01 20:33 60551、环境配置 a)JDK,使用1.6版本。需要说明一 ...
相关推荐
### Hadoop入门进阶课程之Mahout介绍、安装与应用案例 #### Mahout概述 Mahout作为Apache Software Foundation(ASF)旗下的一个开源项目,致力于提供一系列可扩展的机器学习算法实现,以帮助开发者更轻松地构建...
Mahout提供了多种聚类算法,如K-Means、Fuzzy K-Means和Canopy Clustering。K-Means是最常见的方法,通过迭代寻找最佳的簇中心,将每个数据点分配到最近的簇。在大数据环境中,Mahout利用Hadoop进行分布式处理,提高...
3. **k-means聚类算法**:k-means是一种常见的无监督学习算法,用于将数据集划分为k个互不重叠的类别。在Java中实现k-means,涉及计算欧氏距离、初始化质心、迭代调整等步骤。通过阅读源码,可以学习到算法的细节和...
- **关键技术**:Mahout支持多种聚类算法,包括K-Means、Canopy Clustering、Fuzzy K-Means等。 - **分类(Classification)** - **概念**:分类是一种有监督的学习过程,其目标是根据已有的训练数据来预测新的...
在聚类算法方面,Mahout提供了多种实现,包括k-means、模糊k-means、流k-means和谱聚类等。这些无监督学习算法用于将数据对象分组到相似的簇中。例如,k-means算法通过迭代找到最佳的聚类中心,将数据点分配到最近的...
在Mahout中,聚类算法如K-means、Canopy和Fuzzy K-means被用于无监督学习任务,以发现数据中的自然群体结构。这些算法在市场细分、社交媒体分析和内容过滤等方面有广泛应用。 **分类**是机器学习中的监督学习任务,...
Clustering algorithms in Mahout**:详细介绍了Mahout支持的各种聚类算法,包括K-means、Canopy聚类等,并给出了相应的实现代码。 - **10. Evaluating and improving clustering quality**:讲解如何评估聚类结果...
聚类则是将相似数据分组,如K-means算法;分类和回归则用于预测离散或连续的目标变量,如朴素贝叶斯、决策树和线性回归等。每个算法都会通过实例来演示如何在Mahout中实现,并解释其背后的数学原理。 除了基本的...
在大数据挖掘的实际操作中,Mahout的聚类算法具有特别的意义,尤其是k-means算法。k-means聚类算法在数据挖掘领域应用广泛,它可以帮助我们按照某种相似度或距离将数据分为k个簇。无论是在学术研究还是商业应用中,...
具体介绍了如何使用Mahout的K-means算法进行聚类分析。 - **数据加载**:将数据加载到Hadoop环境。 - **模型训练**:使用K-means算法训练模型。 - **结果评估**:评估聚类结果的有效性。 ##### 7.4 总结 总结Mahout...
时间序列分析和聚类方法在此领域有广泛应用,如K-means、DBSCAN等。 五、数据挖掘技术 数据挖掘技术在Web数据挖掘中扮演核心角色,包括分类、关联规则、聚类和回归等。例如,分类算法如决策树、随机森林用于预测...
7. 聚类算法:K-means是一种常用的聚类方法,用于将数据集分成K个簇。EM算法(期望最大化算法)通常用于处理包含隐变量的数据。 8. 主成分分析(PCA):PCA是一种统计方法,用于数据降维,它通过线性变换将数据转换到...
4. K-means是一种常用的聚类算法,通过迭代地将数据点分配到离它最近的簇中心,从而将数据集分成K个簇。 5. EM算法(期望最大化算法)是处理含有隐变量的概率模型的迭代方法,用于极大化对数似然函数。 6. 需要...
9. **聚类算法**:如K-means、DBSCAN等,用于无监督学习中数据的分组。 四、实战与优化 在实际应用中,我们需要关注数据预处理、特征工程、模型选择、调参等环节。Java中的工具如Apache Commons Math库提供了丰富...