`
cloudtech
  • 浏览: 4814969 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
文章分类
社区版块
存档分类
最新评论

Hadoop k-means 算法实现

 
阅读更多

经过昨天的准备工作,今天基本就可以编写整个k-means算法程序了。今天编写的时候遇到了一个问题,是combine操作时遇到的问题。除了这个问题基本都按照原来的思路进行。先说下我的思路吧。

准备工作:在上传数据文件到HDFS上之前,先应该产生一个中心文件,比如我的输入文件如下:

0.0	0.2	0.4
0.3	0.2	0.4
0.4	0.2	0.4
0.5	0.2	0.4
5.0	5.2	5.4
6.0	5.2	6.4
4.0	5.2	4.4
10.3	10.4	10.5
10.3	10.4	10.5
10.3	10.4	10.5
然后要产生中心文件,可以使用如下命令来操作:

(1)、获取文件的总行数: wc data.txt 。可以得到文件的行数是 :10

(2)、因为我要分为三类,所以10/3=3,那么我取的行数就是1,3,6(这个行数可以自己选择,比如也可以直接去前三行 head -n 3 data.txt >centers.txt),然后使用如下命令:awk 'NR==1||NR==3||NR==6' data.txt > centers.txt,然后再把centers.txt上传到HDFS上就可以了。

(下面我使用的是前三行作为数据中心文件)

下面的程序中就不用设置要分 的类别和数据文件的维度数了,我在写这篇和前篇文章的时候参考了这篇文章:http://www.cnblogs.com/zhangchaoyang/articles/2634365.html,这篇里面要在代码中自己设置要分的类别以及数据文件的维度数。

下面是map-combine-reduce 操作:

map: map的setup()函数主要是读取中心文件把文件的中心点读入一个double[][]中,然后是map。数据转换为:

Text(包含数据的字符串)--》[index,DataPro(Text(包含数据文件的字符串),IntWritable(1))]

combine:

[index,DataPro(Text(包含数据文件的字符串),IntWritable(1))]-->[index,DataPro(Text(包含数据文件相同index的相加的结果的字符串),IntWritable(sum(1)))]

reduce: reduce的setup()函数主要是读取数据中心文件,然后取出其中的数据维度信息(在reduce操作中需要数组赋值需要知道数据维度),

[index,DataPro(Text(包含数据文件相同index的相加的结果的字符串),IntWritable(sum(1)))]--》[index,DataPro(Text(包含数据文件相同index的相加的结果的字符串),IntWritable(sum(1)))]--》[index,Text(相同index的数据相加的平均值)]

上面的是循环的过程,最后一个job任务是输出分类的结果。

下面贴代码:

KmeansDriver:

package org.fansy.date928;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
//import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class KmeansDriver {

	/**
	 *   k-means algorithm program  
	 */
	private static final String temp_path="hdfs://fansyPC:9000/user/fansy/date928/kmeans/temp_center/";
	private static final String dataPath="hdfs://fansyPC:9000/user/fansy/input/smallkmeansdata";
	private static final int iterTime=300;
	private static int iterNum=1;
	private static final double threadHold=0.01;
	
	private static Log log=LogFactory.getLog(KmeansDriver.class);
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration conf=new Configuration();
		
		// set the centers data file
		Path centersFile=new Path("hdfs://fansyPC:9000/user/fansy/input/centers");
		DistributedCache.addCacheFile(centersFile.toUri(), conf);
		
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    if (otherArgs.length != 1) {
	      System.err.println("Usage: KmeansDriver <indatafile> ");
	      System.exit(2);
	    }
	    Job job = new Job(conf, "kmeans job 0");
	    job.setJarByClass(KmeansDriver.class);
	    job.setMapperClass(KmeansM.class);
	    job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(DataPro.class);
	    job.setNumReduceTasks(1);
	    job.setCombinerClass(KmeansC.class);
	    job.setReducerClass(KmeansR.class);
	    job.setOutputKeyClass(NullWritable.class);
	    job.setOutputValueClass(Text.class);    
	    FileInputFormat.addInputPath(job, new Path(dataPath));
	    FileOutputFormat.setOutputPath(job, new Path(temp_path+0+"/"));  
	    if(!job.waitForCompletion(true)){
	    	System.exit(1); // run error then exit
	    }
	    //  do iteration
	    boolean flag=true;
		while(flag&&iterNum<iterTime){
			Configuration conf1=new Configuration();
			
			// set the centers data file
			Path centersFile1=new Path(temp_path+(iterNum-1)+"/part-r-00000");  //  the new centers file
			DistributedCache.addCacheFile(centersFile1.toUri(), conf1);
			boolean iterflag=doIteration(conf1,iterNum);
			if(!iterflag){
				log.error("job fails");
				System.exit(1);
			}
			//  set the flag based on the old centers and the new centers
			
			Path oldCentersFile=new Path(temp_path+(iterNum-1)+"/part-r-00000");
			Path newCentersFile=new Path(temp_path+iterNum+"/part-r-00000");
			FileSystem fs1=FileSystem.get(oldCentersFile.toUri(),conf1);
			FileSystem fs2=FileSystem.get(oldCentersFile.toUri(),conf1);
			if(!(fs1.exists(oldCentersFile)&&fs2.exists(newCentersFile))){
				log.info("the old centers and new centers should exist at the same time");
				System.exit(1);
			}
			String line1,line2;
			FSDataInputStream in1=fs1.open(oldCentersFile);
			FSDataInputStream in2=fs2.open(newCentersFile);
			InputStreamReader istr1=new InputStreamReader(in1);
			InputStreamReader istr2=new InputStreamReader(in2);
			BufferedReader br1=new BufferedReader(istr1);
			BufferedReader br2=new BufferedReader(istr2);
			double error=0.0;
			while((line1=br1.readLine())!=null&&((line2=br2.readLine())!=null)){
				String[] str1=line1.split("\t");
				String[] str2=line2.split("\t");
				for(int i=0;i<str1.length;i++){
					error+=(Double.parseDouble(str1[i])-Double.parseDouble(str2[i]))*(Double.parseDouble(str1[i])-Double.parseDouble(str2[i]));
				}
			}
			if(error<threadHold){
				flag=false;
			}
			iterNum++;
			
		}
		// the last job , classify the data
		
		Configuration conf2=new Configuration();
		// set the centers data file
		Path centersFile2=new Path(temp_path+(iterNum-1)+"/part-r-00000");  //  the new centers file
		DistributedCache.addCacheFile(centersFile2.toUri(), conf2);
		lastJob(conf2,iterNum);
		System.out.println(iterNum);
	}
	
	public static boolean doIteration(Configuration conf,int iterNum) throws IOException, ClassNotFoundException, InterruptedException{
		boolean flag=false;
		Job job = new Job(conf, "kmeans job"+" "+iterNum);
	    job.setJarByClass(KmeansDriver.class);
	    job.setMapperClass(KmeansM.class);
	    job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(DataPro.class);
	    job.setNumReduceTasks(1);
	    job.setCombinerClass(KmeansC.class);
	    job.setReducerClass(KmeansR.class);
	    job.setOutputKeyClass(NullWritable.class);
	    job.setOutputValueClass(Text.class);    
	    FileInputFormat.addInputPath(job, new Path(dataPath));
	    FileOutputFormat.setOutputPath(job, new Path(temp_path+iterNum+"/"));  
	    flag=job.waitForCompletion(true);
		return flag;
	}
	public static void lastJob(Configuration conf,int iterNum) throws IOException, ClassNotFoundException, InterruptedException{
		Job job = new Job(conf, "kmeans job"+" "+iterNum);
	    job.setJarByClass(KmeansDriver.class);
	    job.setMapperClass(KmeansLastM.class);
	    job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
	    job.setNumReduceTasks(4);
	  //  job.setCombinerClass(KmeansC.class);
	    job.setReducerClass(KmeansLastR.class);
	    job.setOutputKeyClass(IntWritable.class);
	    job.setOutputValueClass(Text.class);    
	    FileInputFormat.addInputPath(job, new Path(dataPath));
	    FileOutputFormat.setOutputPath(job, new Path(temp_path+iterNum+"/"));  
	    job.waitForCompletion(true);
	}

}

Mapper:
package org.fansy.date928;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KmeansM extends Mapper<LongWritable,Text,IntWritable,DataPro>{
	private static Log log=LogFactory.getLog(KmeansM.class);
	
	private double[][] centers;
	private int dimention_m;  //  this is the k 
	private int dimention_n;   //  this is the features 

	
    static enum Counter{Fansy_Miss_Records};
	@Override
	public void setup(Context context) throws IOException,InterruptedException{
		Path[] caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
		if(caches==null||caches.length<=0){
			log.error("center file does not exist");
			System.exit(1);
		}
		BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
		String line;
		List<ArrayList<Double>> temp_centers=new ArrayList<ArrayList<Double>>();
		ArrayList<Double> center=null;
		//  get the file data
		while((line=br.readLine())!=null){
			center=new ArrayList<Double>();
			String[] str=line.split("\t");
		//	String[] str=line.split("\\s+");
			for(int i=0;i<str.length;i++){
				center.add(Double.parseDouble(str[i]));
			//	center.add((double)Float.parseFloat(str[i]));
			}
			temp_centers.add(center);
		}
		try {
			br.close();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//  fill the centers 
		@SuppressWarnings("unchecked")
		ArrayList<Double>[] newcenters=temp_centers.toArray(new ArrayList[]{});
		 dimention_m=temp_centers.size();
		 dimention_n=newcenters[0].size();
		centers=new double[dimention_m][dimention_n];
		for(int i=0;i<dimention_m;i++){
			Double[] temp_double=newcenters[i].toArray(new Double[]{});
			for(int j=0;j<dimention_n;j++){
				centers[i][j]=temp_double[j];
		//		System.out.print(temp_double[j]+",");
			}
		//	System.out.println();
		}
	}
		
		
	public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
		String[] values=value.toString().split("\t");
	//	String[] values=value.toString().split("\\s+");
		if(values.length!=dimention_n){
			context.getCounter(Counter.Fansy_Miss_Records).increment(1);
			return;
		}
		double[] temp_double=new double[values.length];
		for(int i=0;i<values.length;i++){
			temp_double[i]=Double.parseDouble(values[i]);
		}
		//  set the index
		double distance=Double.MAX_VALUE;
		double temp_distance=0.0;
		int index=0;
		for(int i=0;i<dimention_m;i++){
			double[] temp_center=centers[i];
			temp_distance=getEnumDistance(temp_double,temp_center);
			if(temp_distance<distance){
				 index=i;
				distance=temp_distance;
			}
		}
		DataPro newvalue=new DataPro();
		newvalue.set(value, new IntWritable(1));
	//	System.out.println("the map out:"+index+","+value);
		context.write(new IntWritable(index), newvalue);
		
	}
	public static double getEnumDistance(double[] source,double[] other){  //  get the distance
		double distance=0.0;
		if(source.length!=other.length){
			return Double.MAX_VALUE;
		}
		for(int i=0;i<source.length;i++){
			distance+=(source[i]-other[i])*(source[i]-other[i]);
		}
		distance=Math.sqrt(distance);
		return distance;
	}
}

Combiner:
package org.fansy.date928;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KmeansC extends Reducer<IntWritable,DataPro,IntWritable,DataPro> {
	private static int dimension=0;
	
	private static Log log =LogFactory.getLog(KmeansC.class);
	// the main purpose of the sutup() function is to get the dimension of the original data
	public void setup(Context context) throws IOException{
		Path[] caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
		if(caches==null||caches.length<=0){
			log.error("center file does not exist");
			System.exit(1);
		}
		BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
		String line;
		while((line=br.readLine())!=null){
			String[] str=line.split("\t");
		//	String[] str=line.split("\\s+");
			dimension=str.length;
			break;
		}
		try {
			br.close();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	
	public void reduce(IntWritable key,Iterable<DataPro> values,Context context)throws InterruptedException, IOException{	
		double[] sum=new double[dimension];
		int sumCount=0;
		// operation two
		for(DataPro val:values){
			String[] datastr=val.getCenter().toString().split("\t");
	//		String[] datastr=val.getCenter().toString().split("\\s+");
			sumCount+=val.getCount().get();
			for(int i=0;i<dimension;i++){
				sum[i]+=Double.parseDouble(datastr[i]);
			}
		}
		//  calculate the new centers
//		double[] newcenter=new double[dimension];
		StringBuffer sb=new StringBuffer();
		for(int i=0;i<dimension;i++){
			sb.append(sum[i]+"\t");
		}
	//	System.out.println("combine text:"+sb.toString());
	//	System.out.println("combine sumCount:"+sumCount);
		DataPro newvalue=new DataPro();
		newvalue.set(new Text(sb.toString()), new IntWritable(sumCount));
		context.write(key, newvalue);
	}
}

Reducer:
package org.fansy.date928;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KmeansR extends Reducer<IntWritable,DataPro,NullWritable,Text> {
	private static int dimension=0;
	
	private static Log log =LogFactory.getLog(KmeansC.class);
	// the main purpose of the sutup() function is to get the dimension of the original data
	public void setup(Context context) throws IOException{
		Path[] caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
		if(caches==null||caches.length<=0){
			log.error("center file does not exist");
			System.exit(1);
		}
		BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
		String line;
		while((line=br.readLine())!=null){
			String[] str=line.split("\t");
			dimension=str.length;
			break;
		}
		try {
			br.close();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	public void reduce(IntWritable key,Iterable<DataPro> values,Context context)throws InterruptedException, IOException{
	
		double[] sum=new double[dimension];
		int sumCount=0;
		for(DataPro val:values){
			String[] datastr=val.getCenter().toString().split("\t");
		//	String[] datastr=val.getCenter().toString().split("\\s+");
			sumCount+=val.getCount().get();
			for(int i=0;i<dimension;i++){
				sum[i]+=Double.parseDouble(datastr[i]);
			}
		}
		//  calculate the new centers
//		double[] newcenter=new double[dimension];
		StringBuffer sb=new StringBuffer();
		for(int i=0;i<dimension;i++){
			sb.append(sum[i]/sumCount+"\t");
		//	sb.append(sum[i]/sumCount+"\\s+");
		}
		context.write(null, new Text(sb.toString()));
	}
}

LastMapper:
package org.fansy.date928;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KmeansLastM extends Mapper<LongWritable,Text,IntWritable,Text>{
	private static Log log=LogFactory.getLog(KmeansLastM.class);
	
	private double[][] centers;
	private int dimention_m;  //  this is the k 
	private int dimention_n;   //  this is the features 

	
    static enum Counter{Fansy_Miss_Records};
	@Override
	public void setup(Context context) throws IOException,InterruptedException{
		Path[] caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
		if(caches==null||caches.length<=0){
			log.error("center file does not exist");
			System.exit(1);
		}
		@SuppressWarnings("resource")
		BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
		String line;
		List<ArrayList<Double>> temp_centers=new ArrayList<ArrayList<Double>>();
		ArrayList<Double> center=null;
		//  get the file data
		while((line=br.readLine())!=null){
			center=new ArrayList<Double>();
			String[] str=line.split("\t");
			for(int i=0;i<str.length;i++){
				center.add(Double.parseDouble(str[i]));
			}
			temp_centers.add(center);
		}
		//  fill the centers 
		@SuppressWarnings("unchecked")
		ArrayList<Double>[] newcenters=temp_centers.toArray(new ArrayList[]{});
		 dimention_m=temp_centers.size();
		 dimention_n=newcenters[0].size();
		centers=new double[dimention_m][dimention_n];
		for(int i=0;i<dimention_m;i++){
			Double[] temp_double=newcenters[i].toArray(new Double[]{});
			for(int j=0;j<dimention_n;j++){
				centers[i][j]=temp_double[j];
		//		System.out.print(temp_double[j]+",");
			}
	//		System.out.println();
		}
	}
		
		
	public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
		String[] values=value.toString().split("\t");
	//	String[] values=value.toString().split("\\s+");
		if(values.length!=dimention_n){
			context.getCounter(Counter.Fansy_Miss_Records).increment(1);
			return;
		}
		double[] temp_double=new double[values.length];
		for(int i=0;i<values.length;i++){
			temp_double[i]=Double.parseDouble(values[i]);
		}
		//  set the index
		double distance=Double.MAX_VALUE;
		double temp_distance=0.0;
		int index=0;
		for(int i=0;i<dimention_m;i++){
			double[] temp_center=centers[i];
			temp_distance=getEnumDistance(temp_double,temp_center);
			if(temp_distance<distance){
				 index=i;
				distance=temp_distance;
			}
		}
		context.write(new IntWritable(index), value);
		
	}
	public static double getEnumDistance(double[] source,double[] other){  //  get the distance
		double distance=0.0;
		if(source.length!=other.length){
			return Double.MAX_VALUE;
		}
		for(int i=0;i<source.length;i++){
			distance+=(source[i]-other[i])*(source[i]-other[i]);
		}
		distance=Math.sqrt(distance);
		return distance;
	}
}

LastReducer:
package org.fansy.date928;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KmeansLastR extends Reducer<IntWritable,Text,IntWritable,Text> {
	
	public void reduce(IntWritable key,Iterable<Text> values,Context context)throws InterruptedException, IOException{

		//  output the data directly
		for(Text val:values){
			context.write(key, val);
		}
		
	}
}

上面就是全部的代码了,下面贴出结果:
0	0.0	0.2	0.4
1	0.3	0.2	0.4
1	0.4	0.2	0.4
1	0.5	0.2	0.4
2	5.0	5.2	5.4
2	6.0	5.2	6.4
2	4.0	5.2	4.4
2	10.3	10.4	10.5
2	10.3	10.4	10.5
2	10.3	10.4	10.5

由最终的结果可以看出分类的结果不是很好,所以说初始的数据中心点一定要选好才行。

下面说下我遇到的问题,刚开始的时候我获取数据维度的方法不是使用读入文件然后再取得相应的信息,而是按照下面的方法:

WrongCombine:

package org.fansy.date927;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KmeansC extends Reducer<IntWritable,DataPro,IntWritable,DataPro> {
	
	public void reduce(IntWritable key,Iterable<DataPro> values,Context context)throws InterruptedException, IOException{
		// get dimension first
		Iterator<DataPro> iter=values.iterator();
		int dimension=0;
		for(DataPro val:values){
			String[] datastr=val.getCenter().toString().split("\t");
			dimension=datastr.length;
			break;
		}
		
		double[] sum=new double[dimension];
		int sumCount=0;
		//  operation one
		while(iter.hasNext()){
			DataPro val=iter.next();
			String[] datastr=val.getCenter().toString().split("\t");
			sumCount+=val.getCount().get();
			for(int i=0;i<dimension;i++){
				sum[i]+=Double.parseDouble(datastr[i]);
			}
		}
		
		// operation two
		/*for(DataPro val:values){
			String[] datastr=val.getCenter().toString().split("\t");
			sumCount+=val.getCount().get();
			for(int i=0;i<dimension;i++){
				sum[i]+=Double.parseDouble(datastr[i]);
			}
		}*/
		//  calculate the new centers
//		double[] newcenter=new double[dimension];
		StringBuffer sb=new StringBuffer();
		for(int i=0;i<dimension;i++){
			sb.append(sum[i]+"\t");
		}
		System.out.println("combine text:"+sb.toString());
		System.out.println("combine sumCount:"+sumCount);
		DataPro newvalue=new DataPro();
		newvalue.set(new Text(sb.toString()), new IntWritable(sumCount));
		context.write(key, newvalue);
	}
}

从第16到20行是我获得数据维度的方法,但是虽然维度获得了,但是,后面的操作就出问题了,从我加入的调试提示信息可以看出是出了什么问题了:
12/09/28 14:40:40 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@435e331b
12/09/28 14:40:40 INFO mapred.MapTask: io.sort.mb = 100
12/09/28 14:40:40 INFO mapred.MapTask: data buffer = 79691776/99614720
12/09/28 14:40:40 INFO mapred.MapTask: record buffer = 262144/327680
0.0,0.0,0.0,
5.0,5.0,5.0,
10.0,10.0,10.0,
the map out:0,0.0	0.2	0.4
the map out:0,0.3	0.2	0.4
the map out:0,0.4	0.2	0.4
the map out:0,0.5	0.2	0.4
the map out:1,5.0	5.2	5.4
the map out:1,6.0	5.2	6.4
the map out:1,4.0	5.2	4.4
the map out:2,10.3	10.4	10.5
the map out:2,10.3	10.4	10.5
the map out:2,10.3	10.4	10.5
12/09/28 14:40:40 INFO mapred.MapTask: Starting flush of map output
combine text:1.2	0.6000000000000001	1.2000000000000002	
combine sumCount:3
combine text:10.0	10.4	10.8	
combine sumCount:2
combine text:20.6	20.8	21.0	
combine sumCount:2
12/09/28 14:40:40 INFO mapred.MapTask: Finished spill 0
12/09/28 14:40:40 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting

有上面的信息可以看出首先map操作没有问题,但是combine操作时计算相同的index的个数不对,每个都少了1个,而且计算的总和也不对,所以我这样猜想,是不是通过这样的操作:
for(DataPro val:values){
			String[] datastr=val.getCenter().toString().split("\t");
			dimension=datastr.length;
			break;
		}
可能改变了values的指针或者其他的什么东西之类的,导致后面我再使用这样的操作读取的时候就会从第二个开始读了。这个就不知到了。





分享,快乐,成长



分享到:
评论

相关推荐

    hadoop k-means算法实现(可直接命令行运行)

    hadoop k-means算法实现java工程的打包类,可直接在terminal中运行,运行命令为: $HADOOP_HOME/bin/hadoop jar ClusterDemo.jar main.Cluster 然后直接确定就可以看到提示的运行参数或者参考下面: +"&lt;input&gt; ...

    hadoop k-means实现

    hadoop实现k-means算法的java工程,具体过程参考http://blog.csdn.net/fansy1990/article/details/8028546这个,此工程和上面页面的思路稍有改动,大致思路一致;

    分布式K-means聚类算法研究与实现.pdf

    在Hadoop平台上实现分布式K-means算法,首先要考虑的是数据的分片存储,即将大数据集分散存储到多个节点上。其次,算法的设计和实现策略也至关重要,包括中心点的选取、数据点与中心点的距离计算以及中心点的更新等...

    hadoop-kmeans:使用 Hadoop 实现 K-Means 算法

    使用 Hadoop 的 K-Means 算法实现。 该算法不执行任何初始质心的计算,必须给出这些。 用法 家庭输入集群数量输出增量最大 hadoop jar HadoopKMeans.jar com.jgalilee.hadoop.kmeans.driver.Driver \ input/...

    java实现的K-means算法

    Java实现的K-means算法是一种在数据挖掘领域广泛使用的无监督学习方法,主要用于聚类分析。K-means算法的核心思想是将数据集分成K个不同的类别(或簇),使得每个数据点都尽可能接近其所在簇的中心,而不同簇之间的...

    基于Hadoop的灰狼优化K-means算法在主题发现的研究.docx

    接着,我们详细介绍了基于Hadoop的灰狼优化K-means算法的实现过程,包括算法的改进和优化。最后,我们讨论了该算法的应用前景和发展方向。 本文研究了基于Hadoop的灰狼优化K-means算法在主题发现中的应用,旨在提高...

    基于Spark的K-means算法的并行化实现与优化1

    【基于Spark的K-means算法的并行化实现与优化】是针对大数据时代下,如何高效处理海量数据聚类问题的研究。K-means算法是一种广泛应用的无监督学习方法,用于发现数据集中潜在的分组结构。然而,随着数据规模的增长...

    MPI与Hadoop在K-means算法上的性能比较分析1

    论文中,作者蒋鑫对比了MPI和Hadoop在执行K-means算法时的性能,实验环境为CentOS6.5,使用了Hadoop-2.6和openmpi-1.8.4版本。通过生成的随机数据进行实验,结果显示在地质学模拟中的复杂迭代计算,MPI相比Hadoop...

    自适应布谷鸟搜索的并行K-means聚类算法

    针对K-means聚类算法受初始类中心影响,聚类结果容易陷入局部最优导致聚类准确率较低的问题,提出了一种基于自适应布谷鸟搜索的K-means聚类改进算法,并利用MapReduce编程模型实现了改进算法的并行化。通过搭建的Hadoop...

    面向大数据的K-means算法综述.zip

    面向大数据的K-means算法广泛应用于市场细分、图像分割、推荐系统、文本分类等场景,帮助企业从海量数据中发现潜在规律,实现智能决策。 总结,K-means算法在大数据时代发挥着重要作用,但同时也面临诸多挑战。通过...

    Hadoop课程实验和报告——K-Means算法并行实现

    Hadoop课程实验和报告——K-Means算法并行实现

    基于Hadoop的Kmeans算法实现

    《基于Hadoop的Kmeans算法实现详解》 Kmeans算法是一种广泛应用的无监督学习方法,主要用于数据聚类,它通过将数据点分配到最近的聚类中心来形成多个紧密聚集的簇。在大数据处理领域,结合Hadoop框架,Kmeans算法...

    改进K-means算法下大数据精准挖掘.zip

    这个压缩包文件“改进K-means算法下大数据精准挖掘.zip”可能包含了对如何优化K-means算法以适应大数据环境的研究,从而实现更精确的聚类效果。 K-means算法的基本原理是将数据集划分为K个簇,通过迭代过程不断调整...

    基于Hadoop的Canopy-K-means并行算法的学生成绩与毕业流向关系分析.docx

    在本研究中,使用Canopy-K-means算法可以更高效地处理学生成绩数据。通过对成绩数据进行预聚类,可以快速识别出表现相似的学生群体,进而深入分析这些群体的成绩特点以及他们毕业后的流向趋势。 #### 知识点三:...

    k-means聚类算法的java实现描述

    总的来说,k-means算法的Java实现涉及到数据结构的设计、距离计算、聚类中心的更新以及迭代过程的控制。在实际应用中,还需要考虑异常处理、性能优化以及如何选择合适的k值等问题。对于大数据集,可能需要采用分布式...

    一种基于遗传算法的K-means聚类算法.docx

    K-means算法是一种广泛应用的数据聚类方法,它将数据集合分成K个不重叠的子集,每个子集称为一个簇。该算法的目标是使得同一簇内的数据点相互间距离尽可能小,而不同簇间的数据点距离尽可能大。在K-means算法中,...

    关于k-means的一篇好的总结论文

    K-Means及其变种算法在大规模数据集上的应用变得更加普遍,同时,为了应对更高维度和更大规模数据的挑战,研究人员还探索了如何利用分布式计算框架(如Hadoop和Spark)来实现高效的并行K-Means算法。 #### 四、K-...

    基于云计算和改进K-means算法的海量用电数据分析方法.pdf

    最终,基于Hadoop集群的实验结果表明,改进的K-means算法运行稳定、可靠,具有很好的聚类效果。 在介绍的这项研究中,关键词包括用电数据、云计算、改进K-means算法、MapReduce模型和并行化。这些关键词不仅反映了...

Global site tag (gtag.js) - Google Analytics