`
cloudtech
  • 浏览: 4767023 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
文章分类
社区版块
存档分类
最新评论

Hadoop的“全局变量”

 
阅读更多

以前有做过在Hadoop编写程序时使用全局变量的想法,但是最后却没有实现,上网查才看到说Hadoop不支持全局变量。但是有时候编程的时候又会用到,比如编写k-means算法的时候,如果可以有个全局变量存储中心点该多好呀。其实在hadoop中确实是有相关的实现的,比如可以在mapper中的setup函数中读取一个小文件,然后从这个文件中取出全局变量的值。

那具体如何实现呢?首先提出一个问题,然后利用这种思想去解决会比较好。首先说下我要实现的问题:我现在有输入数据如下:

0.0	0.2	0.4
0.3	0.2	0.4
0.4	0.2	0.4
0.5	0.2	0.4
5.0	5.2	5.4
6.0	5.2	6.4
4.0	5.2	4.4
10.3	10.4	10.5
10.3	10.4	10.5
10.3	10.4	10.5
而且还有一个小数据文件(中心点)如下:

0	0	0
5	5	5
10	10	10
我想做的事情就是把输入数据按照中心点求平均值,即首先我把输入数据分类,比如倒数三行应该都是属于(10,10,10)这个中心点的,那么我的map就把倒数三行的key都赋值为2,然后value值还是保持这三行不变。在reduce阶段,我求出相同key的sum值,同时求出一共的行数count,最后我用sum/count得到我想要的按照中心点求出的平均值了。

下面贴代码:

KmeansDriver:

package org.fansy.date927;

import java.io.IOException;

//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
//import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class KmeansDriver {
	/**
	 *   k-means algorithm program  
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration conf=new Configuration();
		// set the centers data file
		Path centersFile=new Path("hdfs://fansyPC:9000/user/fansy/input/centers");
		DistributedCache.addCacheFile(centersFile.toUri(), conf);
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: KmeansDriver <in> <out>");
	      System.exit(2);
	    }
	    Job job = new Job(conf, "kmeans job");
	    job.setJarByClass(KmeansDriver.class);
	    job.setMapperClass(KmeansM.class);
	    job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(DataPro.class);
	    job.setNumReduceTasks(2);
	    	    //    job.setCombinerClass(KmeansC.class);
	    job.setReducerClass(KmeansR.class);
	    job.setOutputKeyClass(NullWritable.class);
	    job.setOutputValueClass(Text.class);
	    
	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	    
	    if(!job.waitForCompletion(true)){
	    	System.exit(1); // run error then exit
	    }	
	}

}
上面代码中加红的部分比较重要,是mapper的setup函数实现读取文件数据的关键;

Mapper:

package org.fansy.date927;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KmeansM extends Mapper<LongWritable,Text,IntWritable,DataPro>{
	private static Log log=LogFactory.getLog(KmeansM.class);
	
	private double[][] centers;
	private int dimention_m;  //  this is the k 
	private int dimention_n;   //  this is the features 

	
    static enum Counter{Fansy_Miss_Records};
	@Override
	public void setup(Context context) throws IOException,InterruptedException{
		Path[] caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
		if(caches==null||caches.length<=0){
			log.error("center file does not exist");
			System.exit(1);
		}
		@SuppressWarnings("resource")
		BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
		String line;
		List<ArrayList<Double>> temp_centers=new ArrayList<ArrayList<Double>>();
		ArrayList<Double> center=null;
		//  get the file data
		while((line=br.readLine())!=null){
			center=new ArrayList<Double>();
			String[] str=line.split("\t");
			for(int i=0;i<str.length;i++){
				center.add(Double.parseDouble(str[i]));
			}
			temp_centers.add(center);
		}
		//  fill the centers 
		@SuppressWarnings("unchecked")
		ArrayList<Double>[] newcenters=temp_centers.toArray(new ArrayList[]{});
		 dimention_m=temp_centers.size();
		 dimention_n=newcenters[0].size();
		centers=new double[dimention_m][dimention_n];
		for(int i=0;i<dimention_m;i++){
			Double[] temp_double=newcenters[i].toArray(new Double[]{});
			for(int j=0;j<dimention_n;j++){
				centers[i][j]=temp_double[j];
		//		System.out.print(temp_double[j]+",");
			}
	//		System.out.println();
		}
	}
			
	public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
		String[] values=value.toString().split("\t");
		
		if(values.length!=dimention_n){
			context.getCounter(Counter.Fansy_Miss_Records).increment(1);
			return;
		}
		double[] temp_double=new double[values.length];
		for(int i=0;i<values.length;i++){
			temp_double[i]=Double.parseDouble(values[i]);
		}
		//  set the index
		double distance=Double.MAX_VALUE;
		double temp_distance=0.0;
		int index=0;
		for(int i=0;i<dimention_m;i++){
			double[] temp_center=centers[i];
			temp_distance=getEnumDistance(temp_double,temp_center);
			if(temp_distance<distance){
				 index=i;
				distance=temp_distance;
			}
		}
		DataPro newvalue=new DataPro();
		newvalue.set(value, new IntWritable(1));
		context.write(new IntWritable(index), newvalue);
		
	}
	public static double getEnumDistance(double[] source,double[] other){  //  get the distance
		double distance=0.0;
		if(source.length!=other.length){
			return Double.MAX_VALUE;
		}
		for(int i=0;i<source.length;i++){
			distance+=(source[i]-other[i])*(source[i]-other[i]);
		}
		distance=Math.sqrt(distance);
		return distance;
	}
}
红色代码部分是读取文件值,然后赋值为这个job的全局变量值,这样这个map任务就可以把 centers当作全局变量来使用了(同时centers里面存放了centers文件中的值)
Reducer:

package org.fansy.date927;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KmeansR extends Reducer<IntWritable,DataPro,NullWritable,Text> {
	
	public void reduce(IntWritable key,Iterable<DataPro> values,Context context)throws InterruptedException, IOException{
		// get dimension first
		int dimension=0;
		for(DataPro val:values){
			String[] datastr=val.getCenter().toString().split("\t");
			dimension=datastr.length;
			break;
		}
		double[] sum=new double[dimension];
		int sumCount=0;
		for(DataPro val:values){
			String[] datastr=val.getCenter().toString().split("\t");
			sumCount+=val.getCount().get();
			for(int i=0;i<dimension;i++){
				sum[i]+=Double.parseDouble(datastr[i]);
			}
		}
		//  calculate the new centers
//		double[] newcenter=new double[dimension];
		StringBuffer sb=new StringBuffer();
		for(int i=0;i<dimension;i++){
			sb.append(sum[i]/sumCount+"\t");
		}
		context.write(null, new Text(sb.toString()));
	}
}

DataPro:

package org.fansy.date927;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class DataPro implements WritableComparable<DataPro>{

	private Text center;
	private IntWritable count;
	
	public DataPro(){
		set(new Text(),new IntWritable());
	}
	public void set(Text text, IntWritable intWritable) {
		// TODO Auto-generated method stub
		this.center=text;
		this.count=intWritable;
	}
	
	public Text getCenter(){
		return center;
	}
	public IntWritable getCount(){
		return count;
	}
	
	
	@Override
	public void readFields(DataInput arg0) throws IOException {
		// TODO Auto-generated method stub
		center.readFields(arg0);
		count.readFields(arg0);
	}

	@Override
	public void write(DataOutput arg0) throws IOException {
		// TODO Auto-generated method stub
		center.write(arg0);
		count.write(arg0);
	}

	@Override
	public int compareTo(DataPro o) {
		// TODO Auto-generated method stub
		int cmp=count.compareTo(o.count);
		if(cmp!=0){
			return cmp;
		}
		return center.compareTo(o.center);
	}

}
这里自定义了一个DataPro数据类型,主要是为了为以后编写真正的k-means算法时使用combiner做准备,具体思想可以参考上篇combine操作。

输出文件如下:

0.39999999999999997	0.20000000000000004	0.4000000000000001	
5.0	5.2	5.4	
10.3	10.4	10.5	
这篇文章参考了 http://www.cnblogs.com/zhangchaoyang/articles/2634365.html部分实现,在那篇文章中的k-means思想的主要思想是:使用map读入centers文件值,然后把数据文件data作为一个全局量,然后reduce在进行求中心点的操作。(或许我理解错了也说不定)

做完这一步后,如果要编写K-means算法就可以说是已经实现了大半了,剩下的就是设置下输入和输出路径,然后进行迭代了。



分享到:
评论

相关推荐

    hadoop config 配置文件

    这个文件包含了Hadoop核心的全局设置,如文件系统的默认地址、临时目录设置、IO流缓冲区大小等。例如,`fs.defaultFS`属性定义了HDFS的默认命名节点地址。 3. **hdfs-site.xml** HDFS相关的配置都在这里,如副本...

    hadoop2.9.1 winutils.exe hadoop.dll

    4. 配置环境变量,添加`HADOOP_HOME`并将其值设置为Hadoop的安装目录,同时将`PATH`环境变量更新,包含`%HADOOP_HOME%\bin`。 配置完成后,用户就可以在Windows上执行Hadoop相关的操作,如启动HDFS、运行MapReduce...

    hadoop-3.1.1-winutils.rar

    3. 将`hadoop\bin`目录添加到系统的PATH环境变量中,确保可以全局访问winutils.exe。 **四、配置Hadoop** 1. 创建Hadoop的配置文件夹:`C:\hadoop\etc\hadoop`。 2. 在配置文件夹中创建两个XML文件:`core-site.xml...

    Hadoop入门教程

    这涉及安装Java运行环境、配置Hadoop环境变量、修改Hadoop配置文件等步骤。《Hadoop入门教程》将详细讲解这些过程,以帮助初学者顺利启动Hadoop。 六、Hadoop编程 了解Hadoop的API和编程模型是必不可少的。...

    hadoop-3.1.1winutils.rar

    在Hadoop生态系统中,`winutils.exe` 是一个关键组件,它提供了在Windows上运行Hadoop所必需的一些功能,如配置环境变量、管理HDFS(Hadoop Distributed File System)等。这个压缩包可能包含了对Hadoop进行本地化...

    hadoop-2.7.2.tar.gz

    1. YARN(Yet Another Resource Negotiator):YARN是Hadoop 2引入的一个重大变化,它作为全局资源管理系统,负责集群资源的分配和调度,提高了资源利用率和系统性能。 2. HA(High Availability):Hadoop 2.7.2...

    hadoop-eclipse-plugin-3.1.1.tar.gz

    - **调试支持**:支持本地和远程的MapReduce任务调试,可以设置断点,单步执行,查看变量状态等。 5. **版本3.1.1**: Hadoop-Eclipse-Plugin 3.1.1是该插件的一个特定版本,可能包含了一些针对Hadoop 3.x版本的...

    Hadoop大数据平台构建、HDFS配置、启动与验证教学课件.pptx

    编辑/etc/profile文件,添加HADOOP_HOME环境变量,并将Hadoop的bin和sbin目录添加到PATH中,以便全局访问Hadoop命令。 任务四涉及分发Hadoop文件,这通常通过SSH或脚本实现,将配置文件和Hadoop安装包复制到集群的...

    hadoop2.8.2版本 linxu

    2. **配置环境变量**:为了能在系统中全局访问Hadoop,需要在`.bashrc`或`.bash_profile`文件中设置环境变量。添加以下行: ``` export HADOOP_HOME=/path/to/hadoop-2.8.2 export PATH=$PATH:$HADOOP_HOME/bin:$...

    hadoop-eclipse-plugin

    同时,正确配置Hadoop的环境变量和Eclipse插件的连接参数也是必不可少的步骤。 6. **优化与进阶** 随着Hadoop的发展,除了基本的MapReduce编程模型,还有Pig、Hive、Spark等更高级的数据处理工具。开发者可以结合...

    linux下的hadoop安装及配置详解

    4. **Path 设置**:将 JDK 的 bin 目录添加到系统的 PATH 变量中,方便全局调用。 5. **Hadoop 软件下载并解压**:下载 Hadoop 的安装包,并解压到指定目录。 6. **修改 Hadoop 配置文件**:对 Hadoop 解压文件夹下...

    细细品味Hadoop_Hadoop集群(第5期)_Hadoop安装配置

    2. 设置环境变量,如JAVA_HOME,确保Hadoop能找到Java环境。 3. 下载Hadoop二进制包,并解压到适当目录。 4. 配置Hadoop的配置文件,如hdfs-site.xml(HDFS相关配置)和mapred-site.xml(MapReduce相关配置)。 5. ...

    hadoop-2.6.1安装教程

    保存并关闭文件后,通过执行`source /etc/profile`命令来使全局变量生效。 **2. SSH 免密码登录设置** 为实现集群内的无密码登录,需进行SSH配置。主要包括: - **2.1 启用公钥验证** 修改`/etc/ssh/sshd_...

    Hadoop2.4.1的JAR包

    在使用Hadoop 2.4.1的JAR包时,你需要根据你的操作系统(Windows或Linux)正确配置环境变量,并将这些JAR包添加到你的类路径(Classpath)中,以便Java虚拟机能找到并加载所需库。在开发和运行Hadoop程序时,这一步...

    hadoop2.6.5自动化编译

    你需要下载并安装JDK,然后设置`JAVA_HOME`、`JRE_HOME`、`PATH`等环境变量,确保可以全局执行`java -version`命令验证安装成功。 **Hadoop2.6.5自动化编译步骤:** 1. **获取源码**:从Apache官方网站下载Hadoop...

    细细品味Hadoop

    一旦配置完成,我们需要启动Hadoop的各个守护进程,包括DataNode(数据节点,存储数据块)、NodeManager(YARN的节点管理器)、ResourceManager(YARN的全局资源调度器)、NameNode和Secondary NameNode。...

    DANY资源-hadoop.zip

    安装Hadoop时,我们需要设置环境变量,如`HADOOP_HOME`,并将`bin`目录添加到`PATH`中。然后,可以使用`hadoop dfsadmin`或`hadoop fs`命令与HDFS交互,`hadoop jar`命令用于运行MapReduce作业。 在集群环境中,...

    hadoop本地windows开发环境配置

    - 同时,创建一个新的系统变量`HADOOP_HOME`,其值为你解压后的Hadoop文件夹路径。 3. **验证安装** 打开命令提示符,输入`hadoop version`命令来验证Hadoop是否安装成功。 #### 二、IDEA Scala插件安装 为了...

    Hadoop认证复习

    4. shell变量管理:使用`export`命令定义shell全局变量。 5. 文件权限和属性:例如,权限`drwxrwxr-x`表示一个目录,其所有者和所属组具有读写执行权限,其他用户具有读执行权限。 6. 进程管理:使用`su`命令切换...

    hadoop集群部署.docx

    3.3 配置环境变量:编辑bash配置文件(如`~/.bashrc`或`/etc/profile`),添加Hadoop的路径到PATH和JAVA_HOME变量中,使系统能够找到Hadoop命令。 3.4 Hadoop的配置:在Hadoop的配置文件夹(如 `conf`)内,主要...

Global site tag (gtag.js) - Google Analytics