以前有做过在Hadoop编写程序时使用全局变量的想法,但是最后却没有实现,上网查才看到说Hadoop不支持全局变量。但是有时候编程的时候又会用到,比如编写k-means算法的时候,如果可以有个全局变量存储中心点该多好呀。其实在hadoop中确实是有相关的实现的,比如可以在mapper中的setup函数中读取一个小文件,然后从这个文件中取出全局变量的值。
那具体如何实现呢?首先提出一个问题,然后利用这种思想去解决会比较好。首先说下我要实现的问题:我现在有输入数据如下:
0.0 0.2 0.4
0.3 0.2 0.4
0.4 0.2 0.4
0.5 0.2 0.4
5.0 5.2 5.4
6.0 5.2 6.4
4.0 5.2 4.4
10.3 10.4 10.5
10.3 10.4 10.5
10.3 10.4 10.5
而且还有一个小数据文件(中心点)如下:
0 0 0
5 5 5
10 10 10
我想做的事情就是把输入数据按照中心点求平均值,即首先我把输入数据分类,比如倒数三行应该都是属于(10,10,10)这个中心点的,那么我的map就把倒数三行的key都赋值为2,然后value值还是保持这三行不变。在reduce阶段,我求出相同key的sum值,同时求出一共的行数count,最后我用sum/count得到我想要的按照中心点求出的平均值了。
下面贴代码:
KmeansDriver:
package org.fansy.date927;
import java.io.IOException;
//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
//import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class KmeansDriver {
/**
* k-means algorithm program
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf=new Configuration();
// set the centers data file
Path centersFile=new Path("hdfs://fansyPC:9000/user/fansy/input/centers");
DistributedCache.addCacheFile(centersFile.toUri(), conf);
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: KmeansDriver <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "kmeans job");
job.setJarByClass(KmeansDriver.class);
job.setMapperClass(KmeansM.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(DataPro.class);
job.setNumReduceTasks(2);
// job.setCombinerClass(KmeansC.class);
job.setReducerClass(KmeansR.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
if(!job.waitForCompletion(true)){
System.exit(1); // run error then exit
}
}
}
上面代码中加红的部分比较重要,是mapper的setup函数实现读取文件数据的关键;
Mapper:
package org.fansy.date927;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class KmeansM extends Mapper<LongWritable,Text,IntWritable,DataPro>{
private static Log log=LogFactory.getLog(KmeansM.class);
private double[][] centers;
private int dimention_m; // this is the k
private int dimention_n; // this is the features
static enum Counter{Fansy_Miss_Records};
@Override
public void setup(Context context) throws IOException,InterruptedException{
Path[] caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());
if(caches==null||caches.length<=0){
log.error("center file does not exist");
System.exit(1);
}
@SuppressWarnings("resource")
BufferedReader br=new BufferedReader(new FileReader(caches[0].toString()));
String line;
List<ArrayList<Double>> temp_centers=new ArrayList<ArrayList<Double>>();
ArrayList<Double> center=null;
// get the file data
while((line=br.readLine())!=null){
center=new ArrayList<Double>();
String[] str=line.split("\t");
for(int i=0;i<str.length;i++){
center.add(Double.parseDouble(str[i]));
}
temp_centers.add(center);
}
// fill the centers
@SuppressWarnings("unchecked")
ArrayList<Double>[] newcenters=temp_centers.toArray(new ArrayList[]{});
dimention_m=temp_centers.size();
dimention_n=newcenters[0].size();
centers=new double[dimention_m][dimention_n];
for(int i=0;i<dimention_m;i++){
Double[] temp_double=newcenters[i].toArray(new Double[]{});
for(int j=0;j<dimention_n;j++){
centers[i][j]=temp_double[j];
// System.out.print(temp_double[j]+",");
}
// System.out.println();
}
}
public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
String[] values=value.toString().split("\t");
if(values.length!=dimention_n){
context.getCounter(Counter.Fansy_Miss_Records).increment(1);
return;
}
double[] temp_double=new double[values.length];
for(int i=0;i<values.length;i++){
temp_double[i]=Double.parseDouble(values[i]);
}
// set the index
double distance=Double.MAX_VALUE;
double temp_distance=0.0;
int index=0;
for(int i=0;i<dimention_m;i++){
double[] temp_center=centers[i];
temp_distance=getEnumDistance(temp_double,temp_center);
if(temp_distance<distance){
index=i;
distance=temp_distance;
}
}
DataPro newvalue=new DataPro();
newvalue.set(value, new IntWritable(1));
context.write(new IntWritable(index), newvalue);
}
public static double getEnumDistance(double[] source,double[] other){ // get the distance
double distance=0.0;
if(source.length!=other.length){
return Double.MAX_VALUE;
}
for(int i=0;i<source.length;i++){
distance+=(source[i]-other[i])*(source[i]-other[i]);
}
distance=Math.sqrt(distance);
return distance;
}
}
红色代码部分是读取文件值,然后赋值为这个job的全局变量值,这样这个map任务就可以把 centers当作全局变量来使用了(同时centers里面存放了centers文件中的值)
Reducer:
package org.fansy.date927;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class KmeansR extends Reducer<IntWritable,DataPro,NullWritable,Text> {
public void reduce(IntWritable key,Iterable<DataPro> values,Context context)throws InterruptedException, IOException{
// get dimension first
int dimension=0;
for(DataPro val:values){
String[] datastr=val.getCenter().toString().split("\t");
dimension=datastr.length;
break;
}
double[] sum=new double[dimension];
int sumCount=0;
for(DataPro val:values){
String[] datastr=val.getCenter().toString().split("\t");
sumCount+=val.getCount().get();
for(int i=0;i<dimension;i++){
sum[i]+=Double.parseDouble(datastr[i]);
}
}
// calculate the new centers
// double[] newcenter=new double[dimension];
StringBuffer sb=new StringBuffer();
for(int i=0;i<dimension;i++){
sb.append(sum[i]/sumCount+"\t");
}
context.write(null, new Text(sb.toString()));
}
}
DataPro:
package org.fansy.date927;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class DataPro implements WritableComparable<DataPro>{
private Text center;
private IntWritable count;
public DataPro(){
set(new Text(),new IntWritable());
}
public void set(Text text, IntWritable intWritable) {
// TODO Auto-generated method stub
this.center=text;
this.count=intWritable;
}
public Text getCenter(){
return center;
}
public IntWritable getCount(){
return count;
}
@Override
public void readFields(DataInput arg0) throws IOException {
// TODO Auto-generated method stub
center.readFields(arg0);
count.readFields(arg0);
}
@Override
public void write(DataOutput arg0) throws IOException {
// TODO Auto-generated method stub
center.write(arg0);
count.write(arg0);
}
@Override
public int compareTo(DataPro o) {
// TODO Auto-generated method stub
int cmp=count.compareTo(o.count);
if(cmp!=0){
return cmp;
}
return center.compareTo(o.center);
}
}
这里自定义了一个DataPro数据类型,主要是为了为以后编写真正的k-means算法时使用combiner做准备,具体思想可以参考上篇combine操作。
输出文件如下:
0.39999999999999997 0.20000000000000004 0.4000000000000001
5.0 5.2 5.4
10.3 10.4 10.5
这篇文章参考了
http://www.cnblogs.com/zhangchaoyang/articles/2634365.html部分实现,在那篇文章中的k-means思想的主要思想是:使用map读入centers文件值,然后把数据文件data作为一个全局量,然后reduce在进行求中心点的操作。(或许我理解错了也说不定)
做完这一步后,如果要编写K-means算法就可以说是已经实现了大半了,剩下的就是设置下输入和输出路径,然后进行迭代了。
分享到:
相关推荐
这个文件包含了Hadoop核心的全局设置,如文件系统的默认地址、临时目录设置、IO流缓冲区大小等。例如,`fs.defaultFS`属性定义了HDFS的默认命名节点地址。 3. **hdfs-site.xml** HDFS相关的配置都在这里,如副本...
4. 配置环境变量,添加`HADOOP_HOME`并将其值设置为Hadoop的安装目录,同时将`PATH`环境变量更新,包含`%HADOOP_HOME%\bin`。 配置完成后,用户就可以在Windows上执行Hadoop相关的操作,如启动HDFS、运行MapReduce...
3. 将`hadoop\bin`目录添加到系统的PATH环境变量中,确保可以全局访问winutils.exe。 **四、配置Hadoop** 1. 创建Hadoop的配置文件夹:`C:\hadoop\etc\hadoop`。 2. 在配置文件夹中创建两个XML文件:`core-site.xml...
这涉及安装Java运行环境、配置Hadoop环境变量、修改Hadoop配置文件等步骤。《Hadoop入门教程》将详细讲解这些过程,以帮助初学者顺利启动Hadoop。 六、Hadoop编程 了解Hadoop的API和编程模型是必不可少的。...
在Hadoop生态系统中,`winutils.exe` 是一个关键组件,它提供了在Windows上运行Hadoop所必需的一些功能,如配置环境变量、管理HDFS(Hadoop Distributed File System)等。这个压缩包可能包含了对Hadoop进行本地化...
1. YARN(Yet Another Resource Negotiator):YARN是Hadoop 2引入的一个重大变化,它作为全局资源管理系统,负责集群资源的分配和调度,提高了资源利用率和系统性能。 2. HA(High Availability):Hadoop 2.7.2...
- **调试支持**:支持本地和远程的MapReduce任务调试,可以设置断点,单步执行,查看变量状态等。 5. **版本3.1.1**: Hadoop-Eclipse-Plugin 3.1.1是该插件的一个特定版本,可能包含了一些针对Hadoop 3.x版本的...
编辑/etc/profile文件,添加HADOOP_HOME环境变量,并将Hadoop的bin和sbin目录添加到PATH中,以便全局访问Hadoop命令。 任务四涉及分发Hadoop文件,这通常通过SSH或脚本实现,将配置文件和Hadoop安装包复制到集群的...
2. **配置环境变量**:为了能在系统中全局访问Hadoop,需要在`.bashrc`或`.bash_profile`文件中设置环境变量。添加以下行: ``` export HADOOP_HOME=/path/to/hadoop-2.8.2 export PATH=$PATH:$HADOOP_HOME/bin:$...
同时,正确配置Hadoop的环境变量和Eclipse插件的连接参数也是必不可少的步骤。 6. **优化与进阶** 随着Hadoop的发展,除了基本的MapReduce编程模型,还有Pig、Hive、Spark等更高级的数据处理工具。开发者可以结合...
4. **Path 设置**:将 JDK 的 bin 目录添加到系统的 PATH 变量中,方便全局调用。 5. **Hadoop 软件下载并解压**:下载 Hadoop 的安装包,并解压到指定目录。 6. **修改 Hadoop 配置文件**:对 Hadoop 解压文件夹下...
2. 设置环境变量,如JAVA_HOME,确保Hadoop能找到Java环境。 3. 下载Hadoop二进制包,并解压到适当目录。 4. 配置Hadoop的配置文件,如hdfs-site.xml(HDFS相关配置)和mapred-site.xml(MapReduce相关配置)。 5. ...
保存并关闭文件后,通过执行`source /etc/profile`命令来使全局变量生效。 **2. SSH 免密码登录设置** 为实现集群内的无密码登录,需进行SSH配置。主要包括: - **2.1 启用公钥验证** 修改`/etc/ssh/sshd_...
在使用Hadoop 2.4.1的JAR包时,你需要根据你的操作系统(Windows或Linux)正确配置环境变量,并将这些JAR包添加到你的类路径(Classpath)中,以便Java虚拟机能找到并加载所需库。在开发和运行Hadoop程序时,这一步...
你需要下载并安装JDK,然后设置`JAVA_HOME`、`JRE_HOME`、`PATH`等环境变量,确保可以全局执行`java -version`命令验证安装成功。 **Hadoop2.6.5自动化编译步骤:** 1. **获取源码**:从Apache官方网站下载Hadoop...
一旦配置完成,我们需要启动Hadoop的各个守护进程,包括DataNode(数据节点,存储数据块)、NodeManager(YARN的节点管理器)、ResourceManager(YARN的全局资源调度器)、NameNode和Secondary NameNode。...
安装Hadoop时,我们需要设置环境变量,如`HADOOP_HOME`,并将`bin`目录添加到`PATH`中。然后,可以使用`hadoop dfsadmin`或`hadoop fs`命令与HDFS交互,`hadoop jar`命令用于运行MapReduce作业。 在集群环境中,...
- 同时,创建一个新的系统变量`HADOOP_HOME`,其值为你解压后的Hadoop文件夹路径。 3. **验证安装** 打开命令提示符,输入`hadoop version`命令来验证Hadoop是否安装成功。 #### 二、IDEA Scala插件安装 为了...
4. shell变量管理:使用`export`命令定义shell全局变量。 5. 文件权限和属性:例如,权限`drwxrwxr-x`表示一个目录,其所有者和所属组具有读写执行权限,其他用户具有读执行权限。 6. 进程管理:使用`su`命令切换...
3.3 配置环境变量:编辑bash配置文件(如`~/.bashrc`或`/etc/profile`),添加Hadoop的路径到PATH和JAVA_HOME变量中,使系统能够找到Hadoop命令。 3.4 Hadoop的配置:在Hadoop的配置文件夹(如 `conf`)内,主要...