`
liujiekasini0312
  • 浏览: 147419 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hadoop实战应用

 
阅读更多



1、环境说明

部署节点操作系统为CentOS,防火墙和SElinux禁用,创建了一个shiyanlou用户并在系统根目录下创建/app目录,用于存放Hadoop等组件运行包。因为该目录用于安装hadoop等组件程序,用户对shiyanlou必须赋予rwx权限(一般做法是root用户在根目录下创建/app目录,并修改该目录拥有者为shiyanlou(chown–R shiyanlou:shiyanlou /app)。

Hadoop搭建环境:

l虚拟机操作系统:CentOS6.664位,单核,1G内存

lJDK:1.7.0_55 64位

lHadoop:1.1.2

2、准备测试数据

测试数据包括两个文件dept(部门)和emp(员工),其中各字段用逗号分隔:

dept文件内容:

10,ACCOUNTING,NEW YORK

20,RESEARCH,DALLAS

30,SALES,CHICAGO

40,OPERATIONS,BOSTON

emp文件内容:

7369,SMITH,CLERK,7902,17-12月-80,800,,20

7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30

7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30

7566,JONES,MANAGER,7839,02-4月-81,2975,,20

7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30

7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30

7782,CLARK,MANAGER,7839,09-6月-81,2450,,10

7839,KING,PRESIDENT,,17-11月-81,5000,,10

7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30

7900,JAMES,CLERK,7698,03-12月-81,950,,30

7902,FORD,ANALYST,7566,03-12月-81,3000,,20

7934,MILLER,CLERK,7782,23-1月-82,1300,,10

在/home/shiyanlou/install-pack/class6目录可以找到这两个文件,把这两个文件上传到HDFS中/class6/input目录中,执行如下命令:

cd /home/shiyanlou/install-pack/class6

hadoop fs -mkdir -p /class6/input

hadoop fs -copyFromLocal dept /class6/input

hadoop fs -copyFromLocal emp /class6/input

hadoop fs -ls /class6/input

clip_image002

3、应用案例

3.1测试例子1:求各个部门的总工资

3.1.1问题分析

MapReduce中的join分为好几种,比如有最常见的reduce side join、map side join和semi join等。reduce join在shuffle阶段要进行大量的数据传输,会造成大量的网络IO效率低下,而map side join在处理多个小表关联大表时非常有用 。

Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:

(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。

(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

在下面代码中,将会把数据量小的表(部门dept)缓存在内存中,在Mapper阶段对员工部门编号映射成部门名称,该名称作为key输出到Reduce中,在Reduce中计算按照部门计算各个部门的总工资。

3.1.2处理流程图

clip_image004

3.1.3测试代码

Q1SumDeptSalary.java代码(vi编辑代码是不能存在中文):

复制代码
  1 import java.io.BufferedReader;  2 import java.io.FileReader;  3 import java.io.IOException;  4 import java.util.HashMap;  5 import java.util.Map;  6   7 import org.apache.hadoop.conf.Configuration;  8 import org.apache.hadoop.conf.Configured;  9 import org.apache.hadoop.filecache.DistributedCache; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 20 import org.apache.hadoop.util.GenericOptionsParser; 21 import org.apache.hadoop.util.Tool; 22 import org.apache.hadoop.util.ToolRunner; 23  24 public class Q1SumDeptSalary extends Configured implements Tool { 25  26     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 27  28         // 用于缓存 dept文件中的数据 29         private Map<String, String> deptMap = new HashMap<String, String>(); 30         private String[] kv; 31  32         // 此方法会在Map方法执行之前执行且执行一次 33         @Override 34         protected void setup(Context context) throws IOException, InterruptedException { 35             BufferedReader in = null; 36             try { 37  38                 // 从当前作业中获取要缓存的文件 39                 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 40                 String deptIdName = null; 41                 for (Path path : paths) { 42  43                     // 对部门文件字段进行拆分并缓存到deptMap中 44                     if (path.toString().contains("dept")) { 45                         in = new BufferedReader(new FileReader(path.toString())); 46                         while (null != (deptIdName = in.readLine())) { 47                              48                             // 对部门文件字段进行拆分并缓存到deptMap中 49                             // 其中Map中key为部门编号,value为所在部门名称 50                             deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]); 51                         } 52                     } 53                 } 54             } catch (IOException e) { 55                 e.printStackTrace(); 56             } finally { 57                 try { 58                     if (in != null) { 59                         in.close(); 60                     } 61                 } catch (IOException e) { 62                     e.printStackTrace(); 63                 } 64             } 65         } 66  67 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 68  69             // 对员工文件字段进行拆分 70             kv = value.toString().split(","); 71  72             // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资 73             if (deptMap.containsKey(kv[7])) { 74                 if (null != kv[5] && !"".equals(kv[5].toString())) { 75                     context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim())); 76                 } 77             } 78         } 79     } 80  81     public static class Reduce extends Reducer<Text, Text, Text, LongWritable> { 82  83 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 84  85             // 对同一部门的员工工资进行求和 86             long sumSalary = 0; 87             for (Text val : values) { 88                 sumSalary += Long.parseLong(val.toString()); 89             } 90  91             // 输出key为部门名称和value为该部门员工工资总和 92             context.write(key, new LongWritable(sumSalary)); 93         } 94     } 95  96     @Override 97     public int run(String[] args) throws Exception { 98  99         // 实例化作业对象,设置作业名称、Mapper和Reduce类100         Job job = new Job(getConf(), "Q1SumDeptSalary");101         job.setJobName("Q1SumDeptSalary");102         job.setJarByClass(Q1SumDeptSalary.class);103         job.setMapperClass(MapClass.class);104         job.setReducerClass(Reduce.class);105 106         // 设置输入格式类107         job.setInputFormatClass(TextInputFormat.class);108 109         // 设置输出格式110         job.setOutputFormatClass(TextOutputFormat.class);111         job.setOutputKeyClass(Text.class);112         job.setOutputValueClass(Text.class);113 114         // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径115     String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();116     DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());117         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));118         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));119 120         job.waitForCompletion(true);121         return job.isSuccessful() ? 0 : 1;122     }123 124     /**125      * 主方法,执行入口126      * @param args 输入参数127      */128     public static void main(String[] args) throws Exception {129         int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);130         System.exit(res);131     }132 }
复制代码

3.1.4编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q1SumDeptSalary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q1SumDeptSalary.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q1SumDeptSalary.java

编译代码

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q1SumDeptSalary.java

把编译好的代码打成jar包(如果不打成jar形式运行会提示class无法找到的错误)

jar cvf ./Q1SumDeptSalary.jar ./Q1SumDept*.class

mv *.jar ../..

rm Q1SumDept*.class

clip_image006

3.1.5运行并查看结果

运行Q1SumDeptSalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率

l员工数据路径:hdfs://hadoop:9000/class6/input/emp

l输出路径:hdfs://hadoop:9000/class6/out1

运行如下命令:

cd /app/hadoop-1.1.2

hadoop jar Q1SumDeptSalary.jar Q1SumDeptSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out1

clip_image008

运行成功后,刷新CentOS HDFS中的输出路径/class6/out1目录,打开part-r-00000文件

hadoop fs -ls /class6/out1

hadoop fs -cat /class6/out1/part-r-00000

可以看到运行结果:

ACCOUNTING8750

RESEARCH6775

SALES9400

clip_image010

3.2测试例子2:求各个部门的人数和平均工资

3.2.1问题分析

求各个部门的人数和平均工资,需要得到各部门工资总数和部门人数,通过两者相除获取各部门平均工资。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段抽取出部门编号和员工工资,利用缓存部门数据把部门编号对应为部门名称,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工工资的列表,最后在Reduce中按照部门归组,遍历部门所有员工,求出总数和员工数,输出部门名称和平均工资。

3.2.2处理流程图

clip_image012

3.2.3编写代码

Q2DeptNumberAveSalary.java代码:

复制代码
  1 import java.io.BufferedReader;  2 import java.io.FileReader;  3 import java.io.IOException;  4 import java.util.HashMap;  5 import java.util.Map;  6   7 import org.apache.hadoop.conf.Configuration;  8 import org.apache.hadoop.conf.Configured;  9 import org.apache.hadoop.filecache.DistributedCache; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 20 import org.apache.hadoop.util.GenericOptionsParser; 21 import org.apache.hadoop.util.Tool; 22 import org.apache.hadoop.util.ToolRunner; 23  24 public class Q2DeptNumberAveSalary extends Configured implements Tool { 25  26     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 27  28         // 用于缓存 dept文件中的数据 29         private Map<String, String> deptMap = new HashMap<String, String>(); 30         private String[] kv; 31  32         // 此方法会在Map方法执行之前执行且执行一次 33         @Override 34         protected void setup(Context context) throws IOException, InterruptedException { 35             BufferedReader in = null; 36             try { 37                 // 从当前作业中获取要缓存的文件 38                 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 39                 String deptIdName = null; 40                 for (Path path : paths) { 41  42                     // 对部门文件字段进行拆分并缓存到deptMap中 43                     if (path.toString().contains("dept")) { 44                         in = new BufferedReader(new FileReader(path.toString())); 45                         while (null != (deptIdName = in.readLine())) { 46                              47                             // 对部门文件字段进行拆分并缓存到deptMap中 48                             // 其中Map中key为部门编号,value为所在部门名称 49                             deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]); 50                         } 51                     } 52                 } 53             } catch (IOException e) { 54                 e.printStackTrace(); 55             } finally { 56                 try { 57                     if (in != null) { 58                         in.close(); 59                     } 60                 } catch (IOException e) { 61                     e.printStackTrace(); 62                 } 63             } 64         } 65  66     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 67  68             // 对员工文件字段进行拆分 69             kv = value.toString().split(","); 70  71             // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资 72             if (deptMap.containsKey(kv[7])) { 73                 if (null != kv[5] && !"".equals(kv[5].toString())) { 74                     context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim())); 75                 } 76             } 77         } 78     } 79  80     public static class Reduce extends Reducer<Text, Text, Text, Text> { 81  82     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 83  84             long sumSalary = 0; 85             int deptNumber = 0; 86  87             // 对同一部门的员工工资进行求和 88             for (Text val : values) { 89                 sumSalary += Long.parseLong(val.toString()); 90                 deptNumber++; 91             } 92  93             // 输出key为部门名称和value为该部门员工工资平均值 94     context.write(key, new Text("Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber)); 95         } 96     } 97  98     @Override 99     public int run(String[] args) throws Exception {100 101         // 实例化作业对象,设置作业名称、Mapper和Reduce类102         Job job = new Job(getConf(), "Q2DeptNumberAveSalary");103         job.setJobName("Q2DeptNumberAveSalary");104         job.setJarByClass(Q2DeptNumberAveSalary.class);105         job.setMapperClass(MapClass.class);106         job.setReducerClass(Reduce.class);107 108         // 设置输入格式类109         job.setInputFormatClass(TextInputFormat.class);110 111         // 设置输出格式类112         job.setOutputFormatClass(TextOutputFormat.class);113         job.setOutputKeyClass(Text.class);114         job.setOutputValueClass(Text.class);115 116         // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径117     String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();118         DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());119         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));120         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));121 122         job.waitForCompletion(true);123         return job.isSuccessful() ? 0 : 1;124     }125 126     /**127      * 主方法,执行入口128      * @param args 输入参数129      */130     public static void main(String[] args) throws Exception {131         int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args);132         System.exit(res);133     }134 }
复制代码

3.2.4编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q2DeptNumberAveSalary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q2DeptNumberAveSalary.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q2DeptNumberAveSalary.java

编译代码

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q2DeptNumberAveSalary.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

jar cvf ./Q2DeptNumberAveSalary.jar ./Q2DeptNum*.class

mv *.jar ../..

rm Q2DeptNum*.class

clip_image014

3.2.5运行并查看结果

运行Q2DeptNumberAveSalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率

l员工数据路径:hdfs://hadoop:9000/class6/input/emp

l输出路径:hdfs://hadoop:9000/class6/out2

运行如下命令:

cd /app/hadoop-1.1.2

hadoop jar Q2DeptNumberAveSalary.jar Q2DeptNumberAveSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out2

clip_image016

运行成功后,刷新CentOS HDFS中的输出路径/class6/out2目录

hadoop fs -ls /class6/out2

hadoop fs -cat /class6/out2/part-r-00000

打开part-r-00000文件,可以看到运行结果:

ACCOUNTINGDept Number:3,Ave Salary:2916

RESEARCHDept Number:3,Ave Salary:2258

SALESDept Number:6,Ave Salary:1566

clip_image018

3.3测试例子3:求每个部门最早进入公司的员工姓名

3.3.1问题分析

求每个部门最早进入公司员工姓名,需要得到各部门所有员工的进入公司日期,通过比较获取最早进入公司员工姓名。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后Mapper阶段抽取出key为部门名称(利用缓存部门数据把部门编号对应为部门名称),value为员工姓名和进入公司日期,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工+进入公司日期的列表,最后在Reduce中按照部门归组,遍历部门所有员工,找出最早进入公司的员工并输出。

3.3.2处理流程图

clip_image020

3.3.3编写代码

复制代码
  1 import java.io.BufferedReader;  2 import java.io.FileReader;  3 import java.io.IOException;  4 import java.text.DateFormat;  5 import java.text.ParseException;  6 import java.text.SimpleDateFormat;  7 import java.util.Date;  8 import java.util.HashMap;  9 import java.util.Map; 10  11 import org.apache.hadoop.conf.Configuration; 12 import org.apache.hadoop.conf.Configured; 13 import org.apache.hadoop.filecache.DistributedCache; 14 import org.apache.hadoop.fs.Path; 15 import org.apache.hadoop.io.LongWritable; 16 import org.apache.hadoop.io.Text; 17 import org.apache.hadoop.mapreduce.Job; 18 import org.apache.hadoop.mapreduce.Mapper; 19 import org.apache.hadoop.mapreduce.Reducer; 20 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 21 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 22 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 23 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 24 import org.apache.hadoop.util.GenericOptionsParser; 25 import org.apache.hadoop.util.Tool; 26 import org.apache.hadoop.util.ToolRunner; 27  28 public class Q3DeptEarliestEmp extends Configured implements Tool { 29  30     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 31  32         // 用于缓存 dept文件中的数据 33         private Map<String, String> deptMap = new HashMap<String, String>(); 34         private String[] kv; 35  36         // 此方法会在Map方法执行之前执行且执行一次 37         @Override 38         protected void setup(Context context) throws IOException, InterruptedException { 39             BufferedReader in = null; 40             try { 41                 // 从当前作业中获取要缓存的文件 42                 Path[] paths =     DistributedCache.getLocalCacheFiles(context.getConfiguration()); 43                 String deptIdName = null; 44                 for (Path path : paths) { 45                     if (path.toString().contains("dept")) { 46                         in = new BufferedReader(new FileReader(path.toString())); 47                         while (null != (deptIdName = in.readLine())) { 48  49                             // 对部门文件字段进行拆分并缓存到deptMap中 50                             // 其中Map中key为部门编号,value为所在部门名称 51                             deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]); 52                         } 53                     } 54                 } 55             } catch (IOException e) { 56                 e.printStackTrace(); 57             } finally { 58                 try { 59                     if (in != null) { 60                         in.close(); 61                     } 62                 } catch (IOException e) { 63                     e.printStackTrace(); 64                 } 65             } 66         } 67  68         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException { 69  70             // 对员工文件字段进行拆分 71             kv = value.toString().split(","); 72  73             // map join: 在map阶段过滤掉不需要的数据 74             // 输出key为部门名称和value为员工姓名+","+员工进入公司日期 75             if (deptMap.containsKey(kv[7])) { 76                 if (null != kv[4] && !"".equals(kv[4].toString())) { 77                     context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[1].trim()                     + "," + kv[4].trim())); 78                 } 79             } 80         } 81     } 82  83     public static class Reduce extends Reducer<Text, Text, Text, Text> { 84  85         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,         InterruptedException { 86  87             // 员工姓名和进入公司日期 88             String empName = null; 89             String empEnterDate = null; 90  91             // 设置日期转换格式和最早进入公司的员工、日期 92             DateFormat df = new SimpleDateFormat("dd-MM月-yy"); 93  94             Date earliestDate = new Date(); 95             String earliestEmp = null; 96  97             // 遍历该部门下所有员工,得到最早进入公司的员工信息 98             for (Text val : values) { 99                 empName = val.toString().split(",")[0];100                 empEnterDate = val.toString().split(",")[1].toString().trim();101                 try {102                     System.out.println(df.parse(empEnterDate));103                     if (df.parse(empEnterDate).compareTo(earliestDate) < 0) {104                         earliestDate = df.parse(empEnterDate);105                         earliestEmp = empName;106                     }107                 } catch (ParseException e) {108                     e.printStackTrace();109                 }110             }111 112             // 输出key为部门名称和value为该部门最早进入公司员工113             context.write(key, new Text("The earliest emp of dept:" + earliestEmp + ", Enter             date:" + new SimpleDateFormat("yyyy-MM-dd").format(earliestDate)));114         }115     }116 117     @Override118     public int run(String[] args) throws Exception {119 120         // 实例化作业对象,设置作业名称121         Job job = new Job(getConf(), "Q3DeptEarliestEmp");122         job.setJobName("Q3DeptEarliestEmp");123 124         // 设置Mapper和Reduce类125         job.setJarByClass(Q3DeptEarliestEmp.class);126         job.setMapperClass(MapClass.class);127         job.setReducerClass(Reduce.class);128 129         // 设置输入格式类130         job.setInputFormatClass(TextInputFormat.class);131 132         // 设置输出格式类133         job.setOutputFormatClass(TextOutputFormat.class);134         job.setOutputKeyClass(Text.class);135         job.setOutputValueClass(Text.class);136 137         // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第三个参数为输出路径138     String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();139     DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());140         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));141         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));142 143         job.waitForCompletion(true);144         return job.isSuccessful() ? 0 : 1;145     }146 147     /**148      * 主方法,执行入口149      * @param args 输入参数150      */151     public static void main(String[] args) throws Exception {152         int res = ToolRunner.run(new Configuration(), new Q3DeptEarliestEmp(), args);153         System.exit(res);154     }155 }
复制代码

3.3.4编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q3DeptEarliestEmp.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q3DeptEarliestEmp.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q3DeptEarliestEmp.java

编译代码

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q3DeptEarliestEmp.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

jar cvf ./Q3DeptEarliestEmp.jar ./Q3DeptEar*.class

mv *.jar ../..

rm Q3DeptEar*.class

clip_image022

3.3.5运行并查看结果

运行Q3DeptEarliestEmp时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率

l员工数据路径:hdfs://hadoop:9000/class6/input/emp

l输出路径:hdfs://hadoop:9000/class6/out3

运行如下命令:

cd /app/hadoop-1.1.2

hadoop jar Q3DeptEarliestEmp.jar Q3DeptEarliestEmp hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out3

clip_image024

运行成功后,刷新CentOS HDFS中的输出路径/class6/out3目录

hadoop fs -ls /class6/out3

hadoop fs -cat /class6/out3/part-r-00000

打开part-r-00000文件,可以看到运行结果:

ACCOUNTINGThe earliest emp of dept:CLARK, Enter date:1981-06-09

RESEARCHThe earliest emp of dept:SMITH, Enter date:1980-12-17

SALESThe earliest emp of dept:ALLEN, Enter date:1981-02-20

clip_image026

3.4测试例子4:求各个城市的员工的总工资

3.4.1问题分析

求各个城市员工的总工资,需要得到各个城市所有员工的工资,通过对各个城市所有员工工资求和得到总工资。首先和测试例子1类似在Mapper的Setup阶段缓存部门对应所在城市数据,然后在Mapper阶段抽取出key为城市名称(利用缓存数据把部门编号对应为所在城市名称),value为员工工资,接着在Shuffle阶段把传过来的数据处理为城市名称对应该城市所有员工工资,最后在Reduce中按照城市归组,遍历城市所有员工,求出工资总数并输出。

3.4.2处理流程图

clip_image028

3.4.3编写代码

复制代码
  1 import java.io.BufferedReader;  2 import java.io.FileReader;  3 import java.io.IOException;  4 import java.util.HashMap;  5 import java.util.Map;  6   7 import org.apache.hadoop.conf.Configuration;  8 import org.apache.hadoop.conf.Configured;  9 import org.apache.hadoop.filecache.DistributedCache; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 20 import org.apache.hadoop.util.GenericOptionsParser; 21 import org.apache.hadoop.util.Tool; 22 import org.apache.hadoop.util.ToolRunner; 23  24 public class Q4SumCitySalary extends Configured implements Tool { 25  26     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 27  28         // 用于缓存 dept文件中的数据 29         private Map<String, String> deptMap = new HashMap<String, String>(); 30         private String[] kv; 31  32         // 此方法会在Map方法执行之前执行且执行一次 33         @Override 34         protected void setup(Context context) throws IOException, InterruptedException { 35             BufferedReader in = null; 36             try { 37                 // 从当前作业中获取要缓存的文件 38                 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 39                 String deptIdName = null; 40                 for (Path path : paths) { 41                     if (path.toString().contains("dept")) { 42                         in = new BufferedReader(new FileReader(path.toString())); 43                         while (null != (deptIdName = in.readLine())) { 44  45                             // 对部门文件字段进行拆分并缓存到deptMap中 46                             // 其中Map中key为部门编号,value为所在城市名称 47                             deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[2]); 48                         } 49                     } 50                 } 51             } catch (IOException e) { 52                 e.printStackTrace(); 53             } finally { 54                 try { 55                     if (in != null) { 56                         in.close(); 57                     } 58                 } catch (IOException e) { 59                     e.printStackTrace(); 60                 } 61             } 62         } 63  64         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException { 65  66             // 对员工文件字段进行拆分 67             kv = value.toString().split(","); 68  69             // map join: 在map阶段过滤掉不需要的数据,输出key为城市名称和value为员工工资 70             if (deptMap.containsKey(kv[7])) { 71                 if (null != kv[5] && !"".equals(kv[5].toString())) { 72                     context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim())); 73                 } 74             } 75         } 76     } 77  78     public static class Reduce extends Reducer<Text, Text, Text, LongWritable> { 79  80         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,         InterruptedException { 81  82             // 对同一城市的员工工资进行求和 83             long sumSalary = 0; 84             for (Text val : values) { 85                 sumSalary += Long.parseLong(val.toString()); 86             } 87  88             // 输出key为城市名称和value为该城市工资总和 89             context.write(key, new LongWritable(sumSalary)); 90         } 91     } 92  93     @Override 94     public int run(String[] args) throws Exception { 95  96         // 实例化作业对象,设置作业名称 97         Job job = new Job(getConf(), "Q4SumCitySalary"); 98         job.setJobName("Q4SumCitySalary"); 99 100         // 设置Mapper和Reduce类101         job.setJarByClass(Q4SumCitySalary.class);102         job.setMapperClass(MapClass.class);103         job.setReducerClass(Reduce.class);104 105         // 设置输入格式类106         job.setInputFormatClass(TextInputFormat.class);107 108         // 设置输出格式类109         job.setOutputFormatClass(TextOutputFormat.class);110         job.setOutputKeyClass(Text.class);111         job.setOutputValueClass(Text.class);112 113         // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径114     String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();115     DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());116         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));117         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));118 119         job.waitForCompletion(true);120         return job.isSuccessful() ? 0 : 1;121     }122 123     /**124      * 主方法,执行入口125      * @param args 输入参数126      */127     public static void main(String[] args) throws Exception {128         int res = ToolRunner.run(new Configuration(), new Q4SumCitySalary(), args);129         System.exit(res);130     }131 }
复制代码

3.4.4编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q4SumCitySalary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q4SumCitySalary.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q4SumCitySalary.java

编译代码

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q4SumCitySalary.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

jar cvf ./Q4SumCitySalary.jar ./Q4SumCity*.class

mv *.jar ../..

rm Q4SumCity*.class

clip_image030

3.4.5运行并查看结果

运行Q4SumCitySalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率

l员工数据路径:hdfs://hadoop:9000/class6/input/emp

l输出路径:hdfs://hadoop:9000/class6/out4

运行如下命令:

cd /app/hadoop-1.1.2

hadoop jar Q4SumCitySalary.jar Q4SumCitySalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out4

clip_image032

运行成功后,刷新CentOS HDFS中的输出路径/class6/out4目录

hadoop fs -ls /class6/out4

hadoop fs -cat /class6/out4/part-r-00000

打开part-r-00000文件,可以看到运行结果:

CHICAGO9400

DALLAS6775

NEW YORK8750

clip_image034

3.5测试例子5:列出工资比上司高的员工姓名及其工资

3.5.1问题分析

求工资比上司高的员工姓名及工资,需要得到上司工资及上司所有下属员工,通过比较他们工资高低得到比上司工资高的员工。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为员工编号、value为"M,该员工工资",员工对应经理表数据key为经理编号、value为"E,该员工姓名,该员工工资";然后在Shuffle阶段把传过来的经理数据和员工对应经理表数据进行归组,如编号为7698员工,value中标志M为自己工资,value中标志E为其下属姓名及工资;最后在Reduce中遍历比较员工与经理工资高低,输出工资高于经理的员工。

3.5.2处理流程图

clip_image036

3.5.3编写代码

复制代码
  1 import java.io.IOException;  2 import java.util.HashMap;  3   4 import org.apache.hadoop.conf.Configuration;  5 import org.apache.hadoop.conf.Configured;  6 import org.apache.hadoop.fs.Path;  7 import org.apache.hadoop.io.LongWritable;  8 import org.apache.hadoop.io.Text;  9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 import org.apache.hadoop.util.GenericOptionsParser; 17 import org.apache.hadoop.util.Tool; 18 import org.apache.hadoop.util.ToolRunner; 19  20 public class Q5EarnMoreThanManager extends Configured implements Tool { 21  22     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 23  24         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException { 25  26             // 对员工文件字段进行拆分 27             String[] kv = value.toString().split(","); 28  29             // 输出经理表数据,其中key为员工编号和value为M+该员工工资 30             context.write(new Text(kv[0].toString()), new Text("M," + kv[5])); 31  32             // 输出员工对应经理表数据,其中key为经理编号和value为(E,该员工姓名,该员工工资) 33             if (null != kv[3] && !"".equals(kv[3].toString())) { 34                 context.write(new Text(kv[3].toString()), new Text("E," + kv[1] + "," + kv[5])); 35             } 36         } 37     } 38  39     public static class Reduce extends Reducer<Text, Text, Text, Text> { 40  41         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,         InterruptedException { 42  43             // 定义员工姓名、工资和存放部门员工Map 44             String empName; 45             long empSalary = 0; 46             HashMap<String, Long> empMap = new HashMap<String, Long>(); 47              48             // 定义经理工资变量 49             long mgrSalary = 0; 50  51             for (Text val : values) { 52                 if (val.toString().startsWith("E")) { 53                     // 当是员工标示时,获取该员工对应的姓名和工资并放入Map中 54                     empName = val.toString().split(",")[1]; 55                     empSalary = Long.parseLong(val.toString().split(",")[2]); 56                     empMap.put(empName, empSalary); 57                 } else { 58                     // 当时经理标志时,获取该经理工资 59                     mgrSalary = Long.parseLong(val.toString().split(",")[1]); 60                 } 61             } 62  63             // 遍历该经理下属,比较员工与经理工资高低,输出工资高于经理的员工 64             for (java.util.Map.Entry<String, Long> entry : empMap.entrySet()) { 65                 if (entry.getValue() > mgrSalary) { 66                     context.write(new Text(entry.getKey()), new Text("" + entry.getValue())); 67                 } 68             } 69         } 70     } 71  72     @Override 73     public int run(String[] args) throws Exception { 74  75         // 实例化作业对象,设置作业名称 76         Job job = new Job(getConf(), "Q5EarnMoreThanManager"); 77         job.setJobName("Q5EarnMoreThanManager"); 78  79         // 设置Mapper和Reduce类 80         job.setJarByClass(Q5EarnMoreThanManager.class); 81         job.setMapperClass(MapClass.class); 82         job.setReducerClass(Reduce.class); 83  84         // 设置输入格式类 85         job.setInputFormatClass(TextInputFormat.class); 86  87         // 设置输出格式类 88         job.setOutputFormatClass(TextOutputFormat.class); 89         job.setOutputKeyClass(Text.class); 90         job.setOutputValueClass(Text.class); 91  92         // 第1个参数为员工数据路径和第2个参数为输出路径 93 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); 94         FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 95         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 96  97         job.waitForCompletion(true); 98         return job.isSuccessful() ? 0 : 1; 99     }100 101     /**102      * 主方法,执行入口103      * @param args 输入参数104      */105     public static void main(String[] args) throws Exception {106         int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args);107         System.exit(res);108     }109 }
复制代码

3.5.4编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q5EarnMoreThanManager.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q5EarnMoreThanManager.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q5EarnMoreThanManager.java

编译代码

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q5EarnMoreThanManager.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

jar cvf ./Q5EarnMoreThanManager.jar ./Q5EarnMore*.class

mv *.jar ../..

rm Q5EarnMore*.class

clip_image038

3.5.5运行并查看结果

运行Q5EarnMoreThanManager运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l员工数据路径:hdfs://hadoop:9000/class6/input/emp

l输出路径:hdfs://hadoop:9000/class6/out5

运行如下命令:

cd /app/hadoop-1.1.2

hadoop jar Q5EarnMoreThanManager.jar Q5EarnMoreThanManager hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out5

clip_image040

运行成功后,刷新CentOS HDFS中的输出路径/class6/out5目录

hadoop fs -ls /class6/out5

hadoop fs -cat /class6/out5/part-r-00000

打开part-r-00000文件,可以看到运行结果:

FORD3000

clip_image042

3.6测试例子6:列出工资比公司平均工资要高的员工姓名及其工资

3.6.1问题分析

求工资比公司平均工资要高的员工姓名及工资,需要得到公司的平均工资和所有员工工资,通过比较得出工资比平均工资高的员工姓名及工资。这个问题可以分两个作业进行解决,先求出公司的平均工资,然后与所有员工进行比较得到结果;也可以在一个作业进行解决,这里就得使用作业setNumReduceTasks方法,设置Reduce任务数为1,保证每次运行一个reduce任务,从而能先求出平均工资,然后进行比较得出结果。

在Mapper阶段输出两份所有员工数据,其中一份key为0、value为该员工工资,另外一份key为0、value为"该员工姓名,员工工资";然后在Shuffle阶段把传过来数据按照key进行归组,在该任务中有key值为0和1两组数据;最后在Reduce中对key值0的所有员工求工资总数和员工数,获得平均工资;对key值1,比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资。

3.6.2处理流程图

clip_image044

3.6.3编写代码

复制代码
  1 import java.io.IOException;  2 import org.apache.hadoop.conf.Configuration;  3 import org.apache.hadoop.conf.Configured;  4 import org.apache.hadoop.fs.Path;  5 import org.apache.hadoop.io.IntWritable;  6 import org.apache.hadoop.io.LongWritable;  7 import org.apache.hadoop.io.Text;  8 import org.apache.hadoop.mapreduce.Job;  9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mapreduce.Reducer; 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 14 import org.apache.hadoop.util.GenericOptionsParser; 15 import org.apache.hadoop.util.Tool; 16 import org.apache.hadoop.util.ToolRunner; 17  18 public class Q6HigherThanAveSalary extends Configured implements Tool { 19  20     public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { 21  22         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException { 23  24             // 对员工文件字段进行拆分 25             String[] kv = value.toString().split(","); 26  27             // 获取所有员工数据,其中key为0和value为该员工工资 28             context.write(new IntWritable(0), new Text(kv[5])); 29  30             // 获取所有员工数据,其中key为0和value为(该员工姓名 ,员工工资) 31             context.write(new IntWritable(1), new Text(kv[1] + "," + kv[5])); 32         } 33     } 34  35     public static class Reduce extends Reducer<IntWritable, Text, Text, Text> { 36  37         // 定义员工工资、员工数和平均工资 38         private long allSalary = 0; 39         private int allEmpCount = 0; 40         private long aveSalary = 0; 41          42         // 定义员工工资变量 43         private long empSalary = 0; 44  45         public void reduce(IntWritable key, Iterable<Text> values, Context context) throws         IOException, InterruptedException { 46  47             for (Text val : values) { 48                 if (0 == key.get()) { 49                     // 获取所有员工工资和员工数 50                     allSalary += Long.parseLong(val.toString()); 51                     allEmpCount++; 52                     System.out.println("allEmpCount = " + allEmpCount); 53                 } else if (1 == key.get()) { 54                     if (aveSalary == 0) { 55                         aveSalary = allSalary / allEmpCount; 56                         context.write(new Text("Average Salary = "), new Text("" + aveSalary)); 57                         context.write(new Text("Following employees have salarys higher than                         Average:"), new Text("")); 58                     } 59  60                     // 获取员工的平均工资 61                     System.out.println("Employee salary = " + val.toString()); 62                     aveSalary = allSalary / allEmpCount; 63                      64                     // 比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资 65                     empSalary = Long.parseLong(val.toString().split(",")[1]); 66                     if (empSalary > aveSalary) { 67                         context.write(new Text(val.toString().split(",")[0]), new Text("" +                         empSalary)); 68                     } 69                 } 70             } 71         } 72     } 73  74     @Override 75     public int run(String[] args) throws Exception { 76  77         // 实例化作业对象,设置作业名称 78         Job job = new Job(getConf(), "Q6HigherThanAveSalary"); 79         job.setJobName("Q6HigherThanAveSalary"); 80  81         // 设置Mapper和Reduce类 82         job.setJarByClass(Q6HigherThanAveSalary.class); 83         job.setMapperClass(MapClass.class); 84         job.setReducerClass(Reduce.class); 85  86         // 必须设置Reduce任务数为1 # -D mapred.reduce.tasks = 1 87         // 这是该作业设置的核心,这样才能够保证各reduce是串行的 88         job.setNumReduceTasks(1); 89  90         // 设置输出格式类 91         job.setMapOutputKeyClass(IntWritable.class); 92         job.setMapOutputValueClass(Text.class); 93  94         // 设置输出键和值类型 95         job.setOutputFormatClass(TextOutputFormat.class); 96         job.setOutputKeyClass(Text.class); 97         job.setOutputValueClass(LongWritable.class); 98  99         // 第1个参数为员工数据路径和第2个参数为输出路径100 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();101         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));102         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));103 104         job.waitForCompletion(true);105         return job.isSuccessful() ? 0 : 1;106     }107 108     /**109      * 主方法,执行入口110      * @param args 输入参数111      */112     public static void main(String[] args) throws Exception {113         int res = ToolRunner.run(new Configuration(), new Q6HigherThanAveSalary(), args);114         System.exit(res);115     }116 }
复制代码

3.6.4编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q5EarnMoreThanManager.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q6HigherThanAveSalary.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q6HigherThanAveSalary.java

编译代码

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q6HigherThanAveSalary.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

jar cvf ./Q6HigherThanAveSalary.jar ./Q6HigherThan*.class

mv *.jar ../..

rm Q6HigherThan*.class

clip_image046

3.6.5运行并查看结果

运行Q6HigherThanAveSalary运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l员工数据路径:hdfs://hadoop:9000/class6/input/emp

l输出路径:hdfs://hadoop:9000/class6/out6

运行如下命令:

cd /app/hadoop-1.1.2

hadoop jar Q6HigherThanAveSalary.jar Q6HigherThanAveSalary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out6

clip_image048

运行成功后,刷新CentOS HDFS中的输出路径/class6/out6目录

hadoop fs -ls /class6/out6

hadoop fs -cat /class6/out6/part-r-00000

打开part-r-00000文件,可以看到运行结果:

Average Salary = 2077

Following employees have salarys higher than Average:

FORD3000

CLARK2450

KING5000

JONES2975

BLAKE2850

clip_image050

3.7测试例子7:列出名字以J开头的员工姓名及其所属部门名称

3.7.1问题分析

求名字以J开头的员工姓名机器所属部门名称,只需判断员工姓名是否以J开头。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段判断员工姓名是否以J开头,如果是抽取出员工姓名和员工所在部门编号,利用缓存部门数据把部门编号对应为部门名称,转换后输出结果。

3.7.2处理流程图

clip_image052

3.7.3编写代码

复制代码
  1 import java.io.BufferedReader;  2 import java.io.FileReader;  3 import java.io.IOException;  4 import java.util.HashMap;  5 import java.util.Map;  6   7 import org.apache.hadoop.conf.Configuration;  8 import org.apache.hadoop.conf.Configured;  9 import org.apache.hadoop.filecache.DistributedCache; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 19 import org.apache.hadoop.util.GenericOptionsParser; 20 import org.apache.hadoop.util.Tool; 21 import org.apache.hadoop.util.ToolRunner; 22  23 public class Q7NameDeptOfStartJ extends Configured implements Tool { 24  25     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 26  27         // 用于缓存 dept文件中的数据 28         private Map<String, String> deptMap = new HashMap<String, String>(); 29         private String[] kv; 30  31         // 此方法会在Map方法执行之前执行且执行一次 32         @Override 33         protected void setup(Context context) throws IOException, InterruptedException { 34             BufferedReader in = null; 35             try { 36  37                 // 从当前作业中获取要缓存的文件 38                 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 39                 String deptIdName = null; 40                 for (Path path : paths) { 41  42                     // 对部门文件字段进行拆分并缓存到deptMap中 43                     if (path.toString().contains("dept")) { 44                         in = new BufferedReader(new FileReader(path.toString())); 45                         while (null != (deptIdName = in.readLine())) { 46                              47                             // 对部门文件字段进行拆分并缓存到deptMap中 48                             // 其中Map中key为部门编号,value为所在部门名称 49                             deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]); 50                         } 51                     } 52                 } 53             } catch (IOException e) { 54                 e.printStackTrace(); 55             } finally { 56                 try { 57                     if (in != null) { 58                         in.close(); 59                     } 60                 } catch (IOException e) { 61                     e.printStackTrace(); 62                 } 63             } 64         } 65  66         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException { 67  68             // 对员工文件字段进行拆分 69             kv = value.toString().split(","); 70  71             // 输出员工姓名为J开头的员工信息,key为员工姓名和value为员工所在部门名称 72             if (kv[1].toString().trim().startsWith("J")) { 73                 context.write(new Text(kv[1].trim()), new Text(deptMap.get(kv[7].trim()))); 74             } 75         } 76     } 77  78     @Override 79     public int run(String[] args) throws Exception { 80  81         // 实例化作业对象,设置作业名称 82         Job job = new Job(getConf(), "Q7NameDeptOfStartJ"); 83         job.setJobName("Q7NameDeptOfStartJ"); 84  85         // 设置Mapper和Reduce类 86         job.setJarByClass(Q7NameDeptOfStartJ.class); 87         job.setMapperClass(MapClass.class); 88  89         // 设置输入格式类 90         job.setInputFormatClass(TextInputFormat.class); 91  92         // 设置输出格式类 93         job.setOutputFormatClass(TextOutputFormat.class); 94         job.setOutputKeyClass(Text.class); 95         job.setOutputValueClass(Text.class); 96  97         // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径 98 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); 99     DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());100         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));101         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));102 103         job.waitForCompletion(true);104         return job.isSuccessful() ? 0 : 1;105     }106 107     /**108      * 主方法,执行入口109      * @param args 输入参数110      */111     public static void main(String[] args) throws Exception {112         int res = ToolRunner.run(new Configuration(), new Q7NameDeptOfStartJ(), args);113         System.exit(res);114     }115 }
复制代码

3.7.4编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q7NameDeptOfStartJ.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q7NameDeptOfStartJ.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q7NameDeptOfStartJ.java

编译代码

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q7NameDeptOfStartJ.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

jar cvf ./Q7NameDeptOfStartJ.jar ./Q7NameDept*.class

mv *.jar ../..

rm Q7NameDept*.class

clip_image054

3.7.5运行并查看结果

运行Q7NameDeptOfStartJ时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率

l员工数据路径:hdfs://hadoop:9000/class6/input/emp

l输出路径:hdfs://hadoop:9000/class6/out7

运行如下命令:

cd /app/hadoop-1.1.2

hadoop jar Q7NameDeptOfStartJ.jar Q7NameDeptOfStartJ hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out7

clip_image056

运行成功后,刷新CentOS HDFS中的输出路径/class6/out7目录

hadoop fs -ls /class6/out7

hadoop fs -cat /class6/out7/part-r-00000

打开part-r-00000文件,可以看到运行结果:

JAMESSALES

JONESRESEARCH

clip_image058

3.8测试例子8:列出工资最高的头三名员工姓名及其工资

3.8.1问题分析

求工资最高的头三名员工姓名及工资,可以通过冒泡法得到。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为0值、value为"员工姓名,员工工资";最后在Reduce中通过冒泡法遍历所有员工,比较员工工资多少,求出前三名。

3.8.2处理流程图

clip_image060

3.8.3编写代码

复制代码
  1 import java.io.IOException;  2   3 import org.apache.hadoop.conf.Configuration;  4 import org.apache.hadoop.conf.Configured;  5 import org.apache.hadoop.fs.Path;  6 import org.apache.hadoop.io.IntWritable;  7 import org.apache.hadoop.io.LongWritable;  8 import org.apache.hadoop.io.Text;  9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 import org.apache.hadoop.util.GenericOptionsParser; 17 import org.apache.hadoop.util.Tool; 18 import org.apache.hadoop.util.ToolRunner; 19  20 public class Q8SalaryTop3Salary extends Configured implements Tool { 21  22     public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { 23  24         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException { 25  26             // 对员工文件字段进行拆分 27             String[] kv = value.toString().split(","); 28  29             // 输出key为0和value为员工姓名+","+员工工资 30             context.write(new IntWritable(0), new Text(kv[1].trim() + "," + kv[5].trim())); 31         } 32     } 33  34     public static class Reduce extends Reducer<IntWritable, Text, Text, Text> { 35  36         public void reduce(IntWritable key, Iterable<Text> values, Context context) throws         IOException, InterruptedException { 37  38             // 定义工资前三员工姓名 39             String empName; 40             String firstEmpName = ""; 41             String secondEmpName = ""; 42             String thirdEmpName = ""; 43              44             // 定义工资前三工资 45             long empSalary = 0; 46             long firstEmpSalary = 0; 47             long secondEmpSalary = 0; 48             long thirdEmpSalary = 0; 49  50             // 通过冒泡法遍历所有员工,比较员工工资多少,求出前三名 51             for (Text val : values) { 52                 empName = val.toString().split(",")[0]; 53                 empSalary = Long.parseLong(val.toString().split(",")[1]); 54                  55                 if(empSalary > firstEmpSalary) { 56                     thirdEmpName = secondEmpName; 57                     thirdEmpSalary = secondEmpSalary; 58                     secondEmpName = firstEmpName; 59                     secondEmpSalary = firstEmpSalary; 60                     firstEmpName = empName; 61                     firstEmpSalary = empSalary; 62                 } else if (empSalary > secondEmpSalary) { 63                     thirdEmpName = secondEmpName; 64                     thirdEmpSalary = secondEmpSalary; 65                     secondEmpName = empName; 66                     secondEmpSalary = empSalary; 67                 } else if (empSalary > thirdEmpSalary) { 68                     thirdEmpName = empName; 69                     thirdEmpSalary = empSalary; 70                 } 71             } 72              73             // 输出工资前三名信息 74             context.write(new Text( "First employee name:" + firstEmpName), new Text("Salary:"             + firstEmpSalary)); 75             context.write(new Text( "Second employee name:" + secondEmpName), new                     Text("Salary:" + secondEmpSalary)); 76             context.write(new Text( "Third employee name:" + thirdEmpName), new Text("Salary:"             + thirdEmpSalary)); 77         } 78     } 79  80     @Override 81     public int run(String[] args) throws Exception { 82  83         // 实例化作业对象,设置作业名称 84         Job job = new Job(getConf(), "Q8SalaryTop3Salary"); 85         job.setJobName("Q8SalaryTop3Salary"); 86  87         // 设置Mapper和Reduce类 88         job.setJarByClass(Q8SalaryTop3Salary.class); 89         job.setMapperClass(MapClass.class); 90         job.setReducerClass(Reduce.class); 91         job.setMapOutputKeyClass(IntWritable.class);  92         job.setMapOutputValueClass(Text.class); 93  94         // 设置输入格式类 95         job.setInputFormatClass(TextInputFormat.class); 96  97         // 设置输出格式类 98         job.setOutputKeyClass(Text.class); 99         job.setOutputFormatClass(TextOutputFormat.class);100         job.setOutputValueClass(Text.class);101 102         // 第1个参数为员工数据路径和第2个参数为输出路径103         String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),                     args).getRemainingArgs();104         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));105         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));106 107         job.waitForCompletion(true);108         return job.isSuccessful() ? 0 : 1;109     }110 111     /**112      * 主方法,执行入口113      * @param args 输入参数114      */115     public static void main(String[] args) throws Exception {116         int res = ToolRunner.run(new Configuration(), new Q8SalaryTop3Salary(), args);117         System.exit(res);118     }119 }
复制代码

3.8.4编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q8SalaryTop3Salary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q8SalaryTop3Salary.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q8SalaryTop3Salary.java

编译代码

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q8SalaryTop3Salary.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

jar cvf ./Q8SalaryTop3Salary.jar ./Q8SalaryTop3*.class

mv *.jar ../..

rm Q8SalaryTop3*.class

clip_image062

3.8.5运行并查看结果

运行Q8SalaryTop3Salary运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l员工数据路径:hdfs://hadoop:9000/class6/input/emp

l输出路径:hdfs://hadoop:9000/class6/out8

运行如下命令:

cd /app/hadoop-1.1.2

hadoop jar Q8SalaryTop3Salary.jar Q8SalaryTop3Salary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out8

clip_image064

运行成功后,刷新CentOS HDFS中的输出路径/class6/out8目录

hadoop fs -ls /class6/out8

hadoop fs -cat /class6/out8/part-r-00000

打开part-r-00000文件,可以看到运行结果:

First employee name:KINGSalary:5000

Second employee name:FORDSalary:3000

Third employee name:JONESSalary:2975

clip_image066

3.9测试例子9:将全体员工按照总收入(工资+提成)从高到低排列

3.9.1问题分析

求全体员工总收入降序排列,获得所有员工总收入并降序排列即可。在Mapper阶段输出所有员工总工资数据,其中key为员工总工资、value为员工姓名,在Mapper阶段的最后会先调用job.setPartitionerClass对数据进行分区,每个分区映射到一个reducer,每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。由于在本作业中Map的key只有0值,故能实现对所有数据进行排序。

3.9.2处理流程图

clip_image068

3.9.3编写代码

复制代码
 1 import java.io.IOException; 2  3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.conf.Configured; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.IntWritable; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.io.WritableComparable;10 import org.apache.hadoop.mapreduce.Job;11 import org.apache.hadoop.mapreduce.Mapper;12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;14 import org.apache.hadoop.util.GenericOptionsParser;15 import org.apache.hadoop.util.Tool;16 import org.apache.hadoop.util.ToolRunner;17 18 public class Q9EmpSalarySort extends Configured implements Tool {19 20     public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {21 22         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {23 24             // 对员工文件字段进行拆分25             String[] kv = value.toString().split(",");26 27             // 输出key为员工所有工资和value为员工姓名28             int empAllSalary = "".equals(kv[6]) ? Integer.parseInt(kv[5]) :                             Integer.parseInt(kv[5]) + Integer.parseInt(kv[6]);29             context.write(new IntWritable(empAllSalary), new Text(kv[1]));30         }31     }32 33     /**34      * 递减排序算法35      */36     public static class DecreaseComparator extends IntWritable.Comparator {37         public int compare(WritableComparable a, WritableComparable b) {38             return -super.compare(a, b);39         }40 41         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {42             return -super.compare(b1, s1, l1, b2, s2, l2);43         }44     }45 46     @Override47     public int run(String[] args) throws Exception {48 49         // 实例化作业对象,设置作业名称50         Job job = new Job(getConf(), "Q9EmpSalarySort");51         job.setJobName("Q9EmpSalarySort");52 53         // 设置Mapper和Reduce类54         job.setJarByClass(Q9EmpSalarySort.class);55         job.setMapperClass(MapClass.class);56 57         // 设置输出格式类58         job.setMapOutputKeyClass(IntWritable.class);59         job.setMapOutputValueClass(Text.class);60         job.setSortComparatorClass(DecreaseComparator.class);61 62         // 第1个参数为员工数据路径和第2个参数为输出路径63         String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),                     args).getRemainingArgs();64         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));65         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));66 67         job.waitForCompletion(true);68         return job.isSuccessful() ? 0 : 1;69     }70 71     /**72      * 主方法,执行入口73      * @param args 输入参数74      */75     public static void main(String[] args) throws Exception {76         int res = ToolRunner.run(new Configuration(), new Q9EmpSalarySort(), args);77         System.exit(res);78     }79 }
复制代码

3.9.4编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q9EmpSalarySort.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q9EmpSalarySort.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q9EmpSalarySort.java

编译代码

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q9EmpSalarySort.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

jar cvf ./Q9EmpSalarySort.jar ./Q9EmpSalary*.class

mv *.jar ../..

rm Q9EmpSalary*.class

clip_image070

3.9.5运行并查看结果

运行Q9EmpSalarySort运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l员工数据路径:hdfs://hadoop:9000/class6/input/emp

l输出路径:hdfs://hadoop:9000/class6/out9

运行如下命令:

cd /app/hadoop-1.1.2

hadoop jar Q9EmpSalarySort.jar Q9EmpSalarySort hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out9

clip_image072

运行成功后,刷新CentOS HDFS中的输出路径/class6/out9目录

hadoop fs -ls /class6/out9

hadoop fs -cat /class6/out9/part-r-00000

打开part-r-00000文件,可以看到运行结果:

5000KING

3000FORD

2975JONES

2850BLAKE

......

clip_image074

3.10测试例子10:求任何两名员工信息传递所需要经过的中间节点数

3.10.1问题分析

该公司所有员工可以形成入下图的树形结构,求两个员工的沟通的中间节点数,可转换在员工树中求两个节点连通所经过的节点数,即从其中一节点到汇合节点经过节点数加上另一节点到汇合节点经过节点数。例如求M到Q所需节点数,可以先找出M到A经过的节点数,然后找出Q到A经过的节点数,两者相加得到M到Q所需节点数。

clip_image076

在作业中首先在Mapper阶段所有员工数据,其中经理数据key为0值、value为"员工编号,员工经理编号",然后在Reduce阶段把所有员工放到员工列表和员工对应经理链表Map中,最后在Reduce的Cleanup中按照上面说所算法对任意两个员工计算出沟通的路径长度并输出。

3.10.2处理流程图

clip_image078

3.10.3编写代码

复制代码
  1 import java.io.IOException;  2 import java.util.ArrayList;  3 import java.util.HashMap;  4 import java.util.List;  5 import java.util.Map;  6   7 import org.apache.hadoop.conf.Configuration;  8 import org.apache.hadoop.conf.Configured;  9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.IntWritable; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.NullWritable; 13 import org.apache.hadoop.io.Text; 14 import org.apache.hadoop.mapreduce.Job; 15 import org.apache.hadoop.mapreduce.Mapper; 16 import org.apache.hadoop.mapreduce.Reducer; 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 20 import org.apache.hadoop.util.GenericOptionsParser; 21 import org.apache.hadoop.util.Tool; 22 import org.apache.hadoop.util.ToolRunner; 23  24 public class Q10MiddlePersonsCountForComm extends Configured implements Tool { 25  26     public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { 27  28         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException { 29  30             // 对员工文件字段进行拆分 31             String[] kv = value.toString().split(","); 32  33             // 输出key为0和value为员工编号+","+员工经理编号 34             context.write(new IntWritable(0), new Text(kv[0] + "," + ("".equals(kv[3]) ? " " : kv[3]))); 35         } 36     } 37  38     public static class Reduce extends Reducer<IntWritable, Text, NullWritable, Text> { 39  40         // 定义员工列表和员工对应经理Map 41         List<String> employeeList = new ArrayList<String>(); 42         Map<String, String> employeeToManagerMap = new HashMap<String, String>(); 43  44         public void reduce(IntWritable key, Iterable<Text> values, Context context) throws         IOException, InterruptedException { 45  46             // 在reduce阶段把所有员工放到员工列表和员工对应经理Map中 47             for (Text value : values) { 48                 employeeList.add(value.toString().split(",")[0].trim()); 49                 employeeToManagerMap.put(value.toString().split(",")[0].trim(),                             value.toString().split(",")[1].trim()); 50             } 51         } 52  53         @Override 54         protected void cleanup(Context context) throws IOException, InterruptedException { 55             int totalEmployee = employeeList.size(); 56             int i, j; 57             int distance; 58             System.out.println(employeeList); 59             System.out.println(employeeToManagerMap); 60  61             // 对任意两个员工计算出沟通的路径长度并输出 62             for (i = 0; i < (totalEmployee - 1); i++) { 63                 for (j = (i + 1); j < totalEmployee; j++) { 64                     distance = calculateDistance(i, j); 65                     String value = employeeList.get(i) + " and " + employeeList.get(j) + " =                     " + distance; 66                     context.write(NullWritable.get(), new Text(value));  67                 } 68             } 69         } 70  71         /** 72          * 该公司可以由所有员工形成树形结构,求两个员工的沟通的中间节点数,可以转换在员工树中两员工之间的距离 73          * 由于在树中任意两点都会在某上级节点汇合,根据该情况设计了如下算法 74          */ 75         private int calculateDistance(int i, int j) { 76             String employeeA = employeeList.get(i); 77             String employeeB = employeeList.get(j); 78             int distance = 0; 79  80             // 如果A是B的经理,反之亦然 81             if (employeeToManagerMap.get(employeeA).equals(employeeB) ||                                     employeeToManagerMap.get(employeeB).equals(employeeA)) { 82                 distance = 0; 83             } 84             // A和B在同一经理下 85             else if  (employeeToManagerMap.get(employeeA).equals( 86                     employeeToManagerMap.get(employeeB))) { 87                 distance = 0; 88             } else { 89                 // 定义A和B对应经理链表 90                 List<String> employeeA_ManagerList = new ArrayList<String>(); 91                 List<String> employeeB_ManagerList = new ArrayList<String>(); 92  93                 // 获取从A开始经理链表 94                 employeeA_ManagerList.add(employeeA); 95                 String current = employeeA; 96                 while (false == employeeToManagerMap.get(current).isEmpty()) { 97                     current = employeeToManagerMap.get(current); 98                     employeeA_ManagerList.add(current); 99                 }100 101                 // 获取从B开始经理链表102                 employeeB_ManagerList.add(employeeB);103                 current = employeeB;104                 while (false == employeeToManagerMap.get(current).isEmpty()) {105                     current = employeeToManagerMap.get(current);106                     employeeB_ManagerList.add(current);107                 }108 109                 int ii = 0, jj = 0;110                 String currentA_manager, currentB_manager;111                 boolean found = false;112 113                 // 遍历A与B开始经理链表,找出汇合点计算114                 for (ii = 0; ii < employeeA_ManagerList.size(); ii++) {115                     currentA_manager = employeeA_ManagerList.get(ii);116                     for (jj = 0; jj < employeeB_ManagerList.size(); jj++) {117                         currentB_manager = employeeB_ManagerList.get(jj);118                         if (currentA_manager.equals(currentB_manager)) {119                             found = true;120                             break;121                         }122                     }123 124                     if (found) {125                         break;126                     }127                 }128 129                 // 最后获取两只之前的路径130                 distance = ii + jj - 1;131             }132 133             return distance;134         }135     }136 137     @Override138     public int run(String[] args) throws Exception {139 140         // 实例化作业对象,设置作业名称141         Job job = new Job(getConf(), "Q10MiddlePersonsCountForComm");142         job.setJobName("Q10MiddlePersonsCountForComm");143 144         // 设置Mapper和Reduce类145         job.setJarByClass(Q10MiddlePersonsCountForComm.class);146         job.setMapperClass(MapClass.class);147         job.setReducerClass(Reduce.class);148 149         // 设置Mapper输出格式类150         job.setMapOutputKeyClass(IntWritable.class);151         job.setMapOutputValueClass(Text.class);152 153         // 设置Reduce输出键和值类型154         job.setOutputFormatClass(TextOutputFormat.class);155         job.setOutputKeyClass(NullWritable.class);156         job.setOutputValueClass(Text.class);157 158         // 第1个参数为员工数据路径和第2个参数为输出路径159         String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),                     args).getRemainingArgs();160         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));161         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));162 163         job.waitForCompletion(true);164         return job.isSuccessful() ? 0 : 1;165     }166 167     /**168      * 主方法,执行入口169      * @param args 输入参数170      */171     public static void main(String[] args) throws Exception {172         int res = ToolRunner.run(new Configuration(), new Q10MiddlePersonsCountForComm(), args);173         System.exit(res);174     }175 }
复制代码

3.10.4编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q10MiddlePersonsCountForComm.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q10MiddlePersonsCountForComm.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q10MiddlePersonsCountForComm.java

编译代码

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q10MiddlePersonsCountForComm.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

jar cvf ./Q10MiddlePersonsCountForComm.jar ./Q10MiddlePersons*.class

mv *.jar ../..

rm Q10MiddlePersons*.class

clip_image080

3.10.5运行并查看结果

运行Q10MiddlePersonsCountForComm运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l员工数据路径:hdfs://hadoop:9000/class6/input/emp

l输出路径:hdfs://hadoop:9000/class6/out10

运行如下命令:

cd /app/hadoop-1.1.2

hadoop jar Q10MiddlePersonsCountForComm.jar Q10MiddlePersonsCountForComm hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out10

clip_image082

运行成功后,刷新CentOS HDFS中的输出路径/class6/out10目录

hadoop fs -ls /class6/out10

hadoop fs -cat /class6/out10/part-r-00000

打开part-r-00000文件,可以看到运行结果:

7369 and 7499 = 4

7369 and 7521 = 4

7369 and 7566 = 1

7369 and 7654 = 4

7369 and 7698 = 3

......

clip_image084


分享到:
评论

相关推荐

    《Hadoop实战》中文版电子书下载

    《Hadoop实战》一书通过一系列实战案例,展示了如何在实际项目中运用Hadoop解决具体问题。例如,书中可能包含了数据分析、日志处理、推荐系统构建等方面的应用案例,这些案例不仅提供了理论指导,还详细介绍了具体的...

    Hadoop实战大数据大作业

    ### Hadoop实战大数据大作业——基于Hadoop的单词统计系统 #### 一、课题简介与研究意义 **课题简介:** 本课题旨在设计一个简单的基于Hadoop平台进行的单词统计系统。该系统需要自行搭建Hadoop伪分布式架构,并...

    hadoop 实战 中文版

    《Hadoop实战》中文版是一本深入探讨Apache Hadoop生态系统技术的书籍,旨在帮助读者理解和掌握分布式计算的核心概念以及在实际项目中的应用。Hadoop Action是本书的副标题,暗示了其强调实践性的特点,旨在通过具体...

    hadoop实战第二版(陆嘉恒)

    《Hadoop实战第二版》是由国内知名大数据专家陆嘉恒编著的一本深入浅出的Hadoop技术指南。这本书针对Hadoop生态系统进行了详尽的解析,尤其适合对大数据处理感兴趣的IT从业者、学生以及研究人员阅读。作为第二版,...

    Hadoop实战.源代码

    《Hadoop实战.源代码》这个压缩包包含的资源是关于Hadoop技术的实战指南以及相关的源代码,对于想要深入理解和应用Hadoop的IT从业者来说,这是一个宝贵的资料库。Hadoop是一个开源的分布式计算框架,它允许在大量...

    Hadoop实战-陆嘉恒(高清完整版)

    《Hadoop实战-陆嘉恒(高清完整版)》是一本深入浅出介绍Hadoop技术的书籍,尤其适合初学者作为入门教材。Hadoop作为大数据处理领域的基石,其重要性不言而喻。这本书详细讲解了Hadoop的核心概念、架构以及实际应用,...

    大数据云计算技术 hadoop实战培训(共56页).ppt

    总结来说,Hadoop实战培训涵盖了大数据处理的挑战、解决方案、Hadoop的核心概念、生态系统及其在各行业的应用实例。对于处理和分析大规模数据,Hadoop提供了强大的工具和框架,是现代大数据时代不可或缺的技术。随着...

    《Hadoop实战》PDF版本下载.txt

    根据提供的文件信息,我们可以推断出这是一份关于获取《Hadoop实战》一书PDF版本的资源链接。接下来,我们将围绕这一主题展开讨论,重点介绍Hadoop的基本概念、功能特性以及如何利用这本书籍来深入学习Hadoop技术。 ...

    hadoop 实战 dev_03

    内容概述:本次分享是关于Hadoop实战的第三部分,专注于Hadoop系统的应用开发实例。 知识点详细说明: 1. Hadoop系统:Hadoop是一个能够存储和处理大量数据的分布式系统框架,是大数据技术的重要组成部分。它主要...

    Hadoop实战+源代码

    《Hadoop实战+源代码》是一本深度探讨Hadoop技术的书籍,结合源代码提供了丰富的实践指导。Hadoop是Apache基金会开发的一个开源分布式计算框架,它允许在廉价硬件上处理和存储海量数据,是大数据处理领域的核心工具...

    [Hadoop实战].源代码

    【Hadoop实战】源代码是针对大数据处理框架Hadoop的一份实践性教程的配套源码。这份源码包含了多个章节的示例代码,旨在帮助读者深入理解和应用Hadoop技术。通过对这些代码的学习和实践,我们可以掌握Hadoop的核心...

    Hadoop实战.Hadoop.in.Action.Chuck.Lam.文字版

    #### 三、Hadoop实战应用 ##### 2.1 编写基本的MapReduce程序 - **Mapper**:将输入数据转换成键值对形式。 - **Reducer**:处理Mapper产生的中间结果,进一步加工形成最终输出。 - 通过实例讲解如何编写简单的...

    Hadoop实战-附目录

    #### 三、Hadoop实战应用 ##### 3.1 HDFS实战 - **文件上传与下载**:了解如何使用Hadoop命令行工具将本地文件上传到HDFS以及从HDFS下载文件。 - **文件管理**:掌握HDFS文件管理操作,如创建目录、删除文件、查看...

    hadoop实战源代码Java

    总之,`hadoop实战源代码Java`提供了学习和实践Hadoop与Java集成的机会,这对于大数据开发者来说是非常宝贵的资源。通过深入研究这些示例代码,你可以更好地理解Hadoop的API用法,提升处理大数据的能力。在实践中,...

    实战hadoop中的源码

    【标题】"实战hadoop中的源码"涵盖了在大数据处理领域深入理解并应用Apache Hadoop的核心技术。Hadoop是开源的分布式计算框架,它允许在大规模集群上存储和处理海量数据。通过研究Hadoop的源码,开发者可以深入了解...

    (数据挖掘三部曲3)HADOOP实战第二版中文清晰版

    《数据挖掘三部曲3》中的HADOOP实战第二版中文清晰版是一本专为初学者和有经验的IT从业者设计的指南,旨在帮助读者深入理解并掌握大数据处理框架Hadoop的核心技术和应用。Hadoop是Apache软件基金会开发的一个开源...

    hadoop 实战开发

    本篇将基于书名和描述,结合相关标签,深入探讨Hadoop的核心概念、实战应用以及高级编程技巧。 Hadoop是一个开源的分布式计算框架,由Apache软件基金会维护,其核心设计理念是处理和存储海量数据。Hadoop的主要组成...

    Hadoop实战中文版

    #### 四、Hadoop实战应用案例 - **日志分析**:收集来自网站或应用的日志数据,并使用Hadoop进行处理,提取有用的信息,如用户访问趋势、热门页面等。 - **推荐系统**:利用用户的历史行为数据,通过MapReduce等...

    Hadoop实战+Hadoop权威指南(第二版)+Hadoop源码分析(完整版)_PDF文件

    首先,《Hadoop实战》这本书旨在帮助初学者快速掌握Hadoop的实际应用。书中会介绍如何安装和配置Hadoop环境,以及如何使用Hadoop MapReduce进行大规模数据处理。MapReduce是Hadoop的核心组件之一,它将大型数据集...

Global site tag (gtag.js) - Google Analytics