- 浏览: 147419 次
- 性别:
- 来自: 北京
文章分类
最新评论
hadoop实战应用
1、环境说明
部署节点操作系统为CentOS,防火墙和SElinux禁用,创建了一个shiyanlou用户并在系统根目录下创建/app目录,用于存放Hadoop等组件运行包。因为该目录用于安装hadoop等组件程序,用户对shiyanlou必须赋予rwx权限(一般做法是root用户在根目录下创建/app目录,并修改该目录拥有者为shiyanlou(chown–R shiyanlou:shiyanlou /app)。
Hadoop搭建环境:
l虚拟机操作系统:CentOS6.664位,单核,1G内存
lJDK:1.7.0_55 64位
lHadoop:1.1.2
2、准备测试数据
测试数据包括两个文件dept(部门)和emp(员工),其中各字段用逗号分隔:
dept文件内容:
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
emp文件内容:
7369,SMITH,CLERK,7902,17-12月-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30
7566,JONES,MANAGER,7839,02-4月-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30
7782,CLARK,MANAGER,7839,09-6月-81,2450,,10
7839,KING,PRESIDENT,,17-11月-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30
7900,JAMES,CLERK,7698,03-12月-81,950,,30
7902,FORD,ANALYST,7566,03-12月-81,3000,,20
7934,MILLER,CLERK,7782,23-1月-82,1300,,10
在/home/shiyanlou/install-pack/class6目录可以找到这两个文件,把这两个文件上传到HDFS中/class6/input目录中,执行如下命令:
cd /home/shiyanlou/install-pack/class6
hadoop fs -mkdir -p /class6/input
hadoop fs -copyFromLocal dept /class6/input
hadoop fs -copyFromLocal emp /class6/input
hadoop fs -ls /class6/input
3、应用案例
3.1测试例子1:求各个部门的总工资
3.1.1问题分析
MapReduce中的join分为好几种,比如有最常见的reduce side join、map side join和semi join等。reduce join在shuffle阶段要进行大量的数据传输,会造成大量的网络IO效率低下,而map side join在处理多个小表关联大表时非常有用 。
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
在下面代码中,将会把数据量小的表(部门dept)缓存在内存中,在Mapper阶段对员工部门编号映射成部门名称,该名称作为key输出到Reduce中,在Reduce中计算按照部门计算各个部门的总工资。
3.1.2处理流程图
3.1.3测试代码
Q1SumDeptSalary.java代码(vi编辑代码是不能存在中文):
1 import java.io.BufferedReader; 2 import java.io.FileReader; 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.conf.Configured; 9 import org.apache.hadoop.filecache.DistributedCache; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 20 import org.apache.hadoop.util.GenericOptionsParser; 21 import org.apache.hadoop.util.Tool; 22 import org.apache.hadoop.util.ToolRunner; 23 24 public class Q1SumDeptSalary extends Configured implements Tool { 25 26 public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 27 28 // 用于缓存 dept文件中的数据 29 private Map<String, String> deptMap = new HashMap<String, String>(); 30 private String[] kv; 31 32 // 此方法会在Map方法执行之前执行且执行一次 33 @Override 34 protected void setup(Context context) throws IOException, InterruptedException { 35 BufferedReader in = null; 36 try { 37 38 // 从当前作业中获取要缓存的文件 39 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 40 String deptIdName = null; 41 for (Path path : paths) { 42 43 // 对部门文件字段进行拆分并缓存到deptMap中 44 if (path.toString().contains("dept")) { 45 in = new BufferedReader(new FileReader(path.toString())); 46 while (null != (deptIdName = in.readLine())) { 47 48 // 对部门文件字段进行拆分并缓存到deptMap中 49 // 其中Map中key为部门编号,value为所在部门名称 50 deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]); 51 } 52 } 53 } 54 } catch (IOException e) { 55 e.printStackTrace(); 56 } finally { 57 try { 58 if (in != null) { 59 in.close(); 60 } 61 } catch (IOException e) { 62 e.printStackTrace(); 63 } 64 } 65 } 66 67 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 68 69 // 对员工文件字段进行拆分 70 kv = value.toString().split(","); 71 72 // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资 73 if (deptMap.containsKey(kv[7])) { 74 if (null != kv[5] && !"".equals(kv[5].toString())) { 75 context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim())); 76 } 77 } 78 } 79 } 80 81 public static class Reduce extends Reducer<Text, Text, Text, LongWritable> { 82 83 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 84 85 // 对同一部门的员工工资进行求和 86 long sumSalary = 0; 87 for (Text val : values) { 88 sumSalary += Long.parseLong(val.toString()); 89 } 90 91 // 输出key为部门名称和value为该部门员工工资总和 92 context.write(key, new LongWritable(sumSalary)); 93 } 94 } 95 96 @Override 97 public int run(String[] args) throws Exception { 98 99 // 实例化作业对象,设置作业名称、Mapper和Reduce类100 Job job = new Job(getConf(), "Q1SumDeptSalary");101 job.setJobName("Q1SumDeptSalary");102 job.setJarByClass(Q1SumDeptSalary.class);103 job.setMapperClass(MapClass.class);104 job.setReducerClass(Reduce.class);105 106 // 设置输入格式类107 job.setInputFormatClass(TextInputFormat.class);108 109 // 设置输出格式110 job.setOutputFormatClass(TextOutputFormat.class);111 job.setOutputKeyClass(Text.class);112 job.setOutputValueClass(Text.class);113 114 // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径115 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();116 DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());117 FileInputFormat.addInputPath(job, new Path(otherArgs[1]));118 FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));119 120 job.waitForCompletion(true);121 return job.isSuccessful() ? 0 : 1;122 }123 124 /**125 * 主方法,执行入口126 * @param args 输入参数127 */128 public static void main(String[] args) throws Exception {129 int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);130 System.exit(res);131 }132 }
3.1.4编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q1SumDeptSalary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q1SumDeptSalary.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q1SumDeptSalary.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q1SumDeptSalary.java
把编译好的代码打成jar包(如果不打成jar形式运行会提示class无法找到的错误)
jar cvf ./Q1SumDeptSalary.jar ./Q1SumDept*.class
mv *.jar ../..
rm Q1SumDept*.class
3.1.5运行并查看结果
运行Q1SumDeptSalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l员工数据路径:hdfs://hadoop:9000/class6/input/emp
l输出路径:hdfs://hadoop:9000/class6/out1
运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q1SumDeptSalary.jar Q1SumDeptSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out1
运行成功后,刷新CentOS HDFS中的输出路径/class6/out1目录,打开part-r-00000文件
hadoop fs -ls /class6/out1
hadoop fs -cat /class6/out1/part-r-00000
可以看到运行结果:
ACCOUNTING8750
RESEARCH6775
SALES9400
3.2测试例子2:求各个部门的人数和平均工资
3.2.1问题分析
求各个部门的人数和平均工资,需要得到各部门工资总数和部门人数,通过两者相除获取各部门平均工资。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段抽取出部门编号和员工工资,利用缓存部门数据把部门编号对应为部门名称,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工工资的列表,最后在Reduce中按照部门归组,遍历部门所有员工,求出总数和员工数,输出部门名称和平均工资。
3.2.2处理流程图
3.2.3编写代码
Q2DeptNumberAveSalary.java代码:
1 import java.io.BufferedReader; 2 import java.io.FileReader; 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.conf.Configured; 9 import org.apache.hadoop.filecache.DistributedCache; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 20 import org.apache.hadoop.util.GenericOptionsParser; 21 import org.apache.hadoop.util.Tool; 22 import org.apache.hadoop.util.ToolRunner; 23 24 public class Q2DeptNumberAveSalary extends Configured implements Tool { 25 26 public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 27 28 // 用于缓存 dept文件中的数据 29 private Map<String, String> deptMap = new HashMap<String, String>(); 30 private String[] kv; 31 32 // 此方法会在Map方法执行之前执行且执行一次 33 @Override 34 protected void setup(Context context) throws IOException, InterruptedException { 35 BufferedReader in = null; 36 try { 37 // 从当前作业中获取要缓存的文件 38 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 39 String deptIdName = null; 40 for (Path path : paths) { 41 42 // 对部门文件字段进行拆分并缓存到deptMap中 43 if (path.toString().contains("dept")) { 44 in = new BufferedReader(new FileReader(path.toString())); 45 while (null != (deptIdName = in.readLine())) { 46 47 // 对部门文件字段进行拆分并缓存到deptMap中 48 // 其中Map中key为部门编号,value为所在部门名称 49 deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]); 50 } 51 } 52 } 53 } catch (IOException e) { 54 e.printStackTrace(); 55 } finally { 56 try { 57 if (in != null) { 58 in.close(); 59 } 60 } catch (IOException e) { 61 e.printStackTrace(); 62 } 63 } 64 } 65 66 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 67 68 // 对员工文件字段进行拆分 69 kv = value.toString().split(","); 70 71 // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资 72 if (deptMap.containsKey(kv[7])) { 73 if (null != kv[5] && !"".equals(kv[5].toString())) { 74 context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim())); 75 } 76 } 77 } 78 } 79 80 public static class Reduce extends Reducer<Text, Text, Text, Text> { 81 82 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 83 84 long sumSalary = 0; 85 int deptNumber = 0; 86 87 // 对同一部门的员工工资进行求和 88 for (Text val : values) { 89 sumSalary += Long.parseLong(val.toString()); 90 deptNumber++; 91 } 92 93 // 输出key为部门名称和value为该部门员工工资平均值 94 context.write(key, new Text("Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber)); 95 } 96 } 97 98 @Override 99 public int run(String[] args) throws Exception {100 101 // 实例化作业对象,设置作业名称、Mapper和Reduce类102 Job job = new Job(getConf(), "Q2DeptNumberAveSalary");103 job.setJobName("Q2DeptNumberAveSalary");104 job.setJarByClass(Q2DeptNumberAveSalary.class);105 job.setMapperClass(MapClass.class);106 job.setReducerClass(Reduce.class);107 108 // 设置输入格式类109 job.setInputFormatClass(TextInputFormat.class);110 111 // 设置输出格式类112 job.setOutputFormatClass(TextOutputFormat.class);113 job.setOutputKeyClass(Text.class);114 job.setOutputValueClass(Text.class);115 116 // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径117 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();118 DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());119 FileInputFormat.addInputPath(job, new Path(otherArgs[1]));120 FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));121 122 job.waitForCompletion(true);123 return job.isSuccessful() ? 0 : 1;124 }125 126 /**127 * 主方法,执行入口128 * @param args 输入参数129 */130 public static void main(String[] args) throws Exception {131 int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args);132 System.exit(res);133 }134 }
3.2.4编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q2DeptNumberAveSalary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q2DeptNumberAveSalary.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q2DeptNumberAveSalary.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q2DeptNumberAveSalary.java
把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误
jar cvf ./Q2DeptNumberAveSalary.jar ./Q2DeptNum*.class
mv *.jar ../..
rm Q2DeptNum*.class
3.2.5运行并查看结果
运行Q2DeptNumberAveSalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l员工数据路径:hdfs://hadoop:9000/class6/input/emp
l输出路径:hdfs://hadoop:9000/class6/out2
运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q2DeptNumberAveSalary.jar Q2DeptNumberAveSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out2
运行成功后,刷新CentOS HDFS中的输出路径/class6/out2目录
hadoop fs -ls /class6/out2
hadoop fs -cat /class6/out2/part-r-00000
打开part-r-00000文件,可以看到运行结果:
ACCOUNTINGDept Number:3,Ave Salary:2916
RESEARCHDept Number:3,Ave Salary:2258
SALESDept Number:6,Ave Salary:1566
3.3测试例子3:求每个部门最早进入公司的员工姓名
3.3.1问题分析
求每个部门最早进入公司员工姓名,需要得到各部门所有员工的进入公司日期,通过比较获取最早进入公司员工姓名。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后Mapper阶段抽取出key为部门名称(利用缓存部门数据把部门编号对应为部门名称),value为员工姓名和进入公司日期,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工+进入公司日期的列表,最后在Reduce中按照部门归组,遍历部门所有员工,找出最早进入公司的员工并输出。
3.3.2处理流程图
3.3.3编写代码
1 import java.io.BufferedReader; 2 import java.io.FileReader; 3 import java.io.IOException; 4 import java.text.DateFormat; 5 import java.text.ParseException; 6 import java.text.SimpleDateFormat; 7 import java.util.Date; 8 import java.util.HashMap; 9 import java.util.Map; 10 11 import org.apache.hadoop.conf.Configuration; 12 import org.apache.hadoop.conf.Configured; 13 import org.apache.hadoop.filecache.DistributedCache; 14 import org.apache.hadoop.fs.Path; 15 import org.apache.hadoop.io.LongWritable; 16 import org.apache.hadoop.io.Text; 17 import org.apache.hadoop.mapreduce.Job; 18 import org.apache.hadoop.mapreduce.Mapper; 19 import org.apache.hadoop.mapreduce.Reducer; 20 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 21 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 22 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 23 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 24 import org.apache.hadoop.util.GenericOptionsParser; 25 import org.apache.hadoop.util.Tool; 26 import org.apache.hadoop.util.ToolRunner; 27 28 public class Q3DeptEarliestEmp extends Configured implements Tool { 29 30 public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 31 32 // 用于缓存 dept文件中的数据 33 private Map<String, String> deptMap = new HashMap<String, String>(); 34 private String[] kv; 35 36 // 此方法会在Map方法执行之前执行且执行一次 37 @Override 38 protected void setup(Context context) throws IOException, InterruptedException { 39 BufferedReader in = null; 40 try { 41 // 从当前作业中获取要缓存的文件 42 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 43 String deptIdName = null; 44 for (Path path : paths) { 45 if (path.toString().contains("dept")) { 46 in = new BufferedReader(new FileReader(path.toString())); 47 while (null != (deptIdName = in.readLine())) { 48 49 // 对部门文件字段进行拆分并缓存到deptMap中 50 // 其中Map中key为部门编号,value为所在部门名称 51 deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]); 52 } 53 } 54 } 55 } catch (IOException e) { 56 e.printStackTrace(); 57 } finally { 58 try { 59 if (in != null) { 60 in.close(); 61 } 62 } catch (IOException e) { 63 e.printStackTrace(); 64 } 65 } 66 } 67 68 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 69 70 // 对员工文件字段进行拆分 71 kv = value.toString().split(","); 72 73 // map join: 在map阶段过滤掉不需要的数据 74 // 输出key为部门名称和value为员工姓名+","+员工进入公司日期 75 if (deptMap.containsKey(kv[7])) { 76 if (null != kv[4] && !"".equals(kv[4].toString())) { 77 context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[1].trim() + "," + kv[4].trim())); 78 } 79 } 80 } 81 } 82 83 public static class Reduce extends Reducer<Text, Text, Text, Text> { 84 85 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 86 87 // 员工姓名和进入公司日期 88 String empName = null; 89 String empEnterDate = null; 90 91 // 设置日期转换格式和最早进入公司的员工、日期 92 DateFormat df = new SimpleDateFormat("dd-MM月-yy"); 93 94 Date earliestDate = new Date(); 95 String earliestEmp = null; 96 97 // 遍历该部门下所有员工,得到最早进入公司的员工信息 98 for (Text val : values) { 99 empName = val.toString().split(",")[0];100 empEnterDate = val.toString().split(",")[1].toString().trim();101 try {102 System.out.println(df.parse(empEnterDate));103 if (df.parse(empEnterDate).compareTo(earliestDate) < 0) {104 earliestDate = df.parse(empEnterDate);105 earliestEmp = empName;106 }107 } catch (ParseException e) {108 e.printStackTrace();109 }110 }111 112 // 输出key为部门名称和value为该部门最早进入公司员工113 context.write(key, new Text("The earliest emp of dept:" + earliestEmp + ", Enter date:" + new SimpleDateFormat("yyyy-MM-dd").format(earliestDate)));114 }115 }116 117 @Override118 public int run(String[] args) throws Exception {119 120 // 实例化作业对象,设置作业名称121 Job job = new Job(getConf(), "Q3DeptEarliestEmp");122 job.setJobName("Q3DeptEarliestEmp");123 124 // 设置Mapper和Reduce类125 job.setJarByClass(Q3DeptEarliestEmp.class);126 job.setMapperClass(MapClass.class);127 job.setReducerClass(Reduce.class);128 129 // 设置输入格式类130 job.setInputFormatClass(TextInputFormat.class);131 132 // 设置输出格式类133 job.setOutputFormatClass(TextOutputFormat.class);134 job.setOutputKeyClass(Text.class);135 job.setOutputValueClass(Text.class);136 137 // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第三个参数为输出路径138 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();139 DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());140 FileInputFormat.addInputPath(job, new Path(otherArgs[1]));141 FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));142 143 job.waitForCompletion(true);144 return job.isSuccessful() ? 0 : 1;145 }146 147 /**148 * 主方法,执行入口149 * @param args 输入参数150 */151 public static void main(String[] args) throws Exception {152 int res = ToolRunner.run(new Configuration(), new Q3DeptEarliestEmp(), args);153 System.exit(res);154 }155 }
3.3.4编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q3DeptEarliestEmp.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q3DeptEarliestEmp.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q3DeptEarliestEmp.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q3DeptEarliestEmp.java
把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误
jar cvf ./Q3DeptEarliestEmp.jar ./Q3DeptEar*.class
mv *.jar ../..
rm Q3DeptEar*.class
3.3.5运行并查看结果
运行Q3DeptEarliestEmp时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l员工数据路径:hdfs://hadoop:9000/class6/input/emp
l输出路径:hdfs://hadoop:9000/class6/out3
运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q3DeptEarliestEmp.jar Q3DeptEarliestEmp hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out3
运行成功后,刷新CentOS HDFS中的输出路径/class6/out3目录
hadoop fs -ls /class6/out3
hadoop fs -cat /class6/out3/part-r-00000
打开part-r-00000文件,可以看到运行结果:
ACCOUNTINGThe earliest emp of dept:CLARK, Enter date:1981-06-09
RESEARCHThe earliest emp of dept:SMITH, Enter date:1980-12-17
SALESThe earliest emp of dept:ALLEN, Enter date:1981-02-20
3.4测试例子4:求各个城市的员工的总工资
3.4.1问题分析
求各个城市员工的总工资,需要得到各个城市所有员工的工资,通过对各个城市所有员工工资求和得到总工资。首先和测试例子1类似在Mapper的Setup阶段缓存部门对应所在城市数据,然后在Mapper阶段抽取出key为城市名称(利用缓存数据把部门编号对应为所在城市名称),value为员工工资,接着在Shuffle阶段把传过来的数据处理为城市名称对应该城市所有员工工资,最后在Reduce中按照城市归组,遍历城市所有员工,求出工资总数并输出。
3.4.2处理流程图
3.4.3编写代码
1 import java.io.BufferedReader; 2 import java.io.FileReader; 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.conf.Configured; 9 import org.apache.hadoop.filecache.DistributedCache; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 20 import org.apache.hadoop.util.GenericOptionsParser; 21 import org.apache.hadoop.util.Tool; 22 import org.apache.hadoop.util.ToolRunner; 23 24 public class Q4SumCitySalary extends Configured implements Tool { 25 26 public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 27 28 // 用于缓存 dept文件中的数据 29 private Map<String, String> deptMap = new HashMap<String, String>(); 30 private String[] kv; 31 32 // 此方法会在Map方法执行之前执行且执行一次 33 @Override 34 protected void setup(Context context) throws IOException, InterruptedException { 35 BufferedReader in = null; 36 try { 37 // 从当前作业中获取要缓存的文件 38 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 39 String deptIdName = null; 40 for (Path path : paths) { 41 if (path.toString().contains("dept")) { 42 in = new BufferedReader(new FileReader(path.toString())); 43 while (null != (deptIdName = in.readLine())) { 44 45 // 对部门文件字段进行拆分并缓存到deptMap中 46 // 其中Map中key为部门编号,value为所在城市名称 47 deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[2]); 48 } 49 } 50 } 51 } catch (IOException e) { 52 e.printStackTrace(); 53 } finally { 54 try { 55 if (in != null) { 56 in.close(); 57 } 58 } catch (IOException e) { 59 e.printStackTrace(); 60 } 61 } 62 } 63 64 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 65 66 // 对员工文件字段进行拆分 67 kv = value.toString().split(","); 68 69 // map join: 在map阶段过滤掉不需要的数据,输出key为城市名称和value为员工工资 70 if (deptMap.containsKey(kv[7])) { 71 if (null != kv[5] && !"".equals(kv[5].toString())) { 72 context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim())); 73 } 74 } 75 } 76 } 77 78 public static class Reduce extends Reducer<Text, Text, Text, LongWritable> { 79 80 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 81 82 // 对同一城市的员工工资进行求和 83 long sumSalary = 0; 84 for (Text val : values) { 85 sumSalary += Long.parseLong(val.toString()); 86 } 87 88 // 输出key为城市名称和value为该城市工资总和 89 context.write(key, new LongWritable(sumSalary)); 90 } 91 } 92 93 @Override 94 public int run(String[] args) throws Exception { 95 96 // 实例化作业对象,设置作业名称 97 Job job = new Job(getConf(), "Q4SumCitySalary"); 98 job.setJobName("Q4SumCitySalary"); 99 100 // 设置Mapper和Reduce类101 job.setJarByClass(Q4SumCitySalary.class);102 job.setMapperClass(MapClass.class);103 job.setReducerClass(Reduce.class);104 105 // 设置输入格式类106 job.setInputFormatClass(TextInputFormat.class);107 108 // 设置输出格式类109 job.setOutputFormatClass(TextOutputFormat.class);110 job.setOutputKeyClass(Text.class);111 job.setOutputValueClass(Text.class);112 113 // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径114 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();115 DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());116 FileInputFormat.addInputPath(job, new Path(otherArgs[1]));117 FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));118 119 job.waitForCompletion(true);120 return job.isSuccessful() ? 0 : 1;121 }122 123 /**124 * 主方法,执行入口125 * @param args 输入参数126 */127 public static void main(String[] args) throws Exception {128 int res = ToolRunner.run(new Configuration(), new Q4SumCitySalary(), args);129 System.exit(res);130 }131 }
3.4.4编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q4SumCitySalary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q4SumCitySalary.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q4SumCitySalary.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q4SumCitySalary.java
把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误
jar cvf ./Q4SumCitySalary.jar ./Q4SumCity*.class
mv *.jar ../..
rm Q4SumCity*.class
3.4.5运行并查看结果
运行Q4SumCitySalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l员工数据路径:hdfs://hadoop:9000/class6/input/emp
l输出路径:hdfs://hadoop:9000/class6/out4
运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q4SumCitySalary.jar Q4SumCitySalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out4
运行成功后,刷新CentOS HDFS中的输出路径/class6/out4目录
hadoop fs -ls /class6/out4
hadoop fs -cat /class6/out4/part-r-00000
打开part-r-00000文件,可以看到运行结果:
CHICAGO9400
DALLAS6775
NEW YORK8750
3.5测试例子5:列出工资比上司高的员工姓名及其工资
3.5.1问题分析
求工资比上司高的员工姓名及工资,需要得到上司工资及上司所有下属员工,通过比较他们工资高低得到比上司工资高的员工。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为员工编号、value为"M,该员工工资",员工对应经理表数据key为经理编号、value为"E,该员工姓名,该员工工资";然后在Shuffle阶段把传过来的经理数据和员工对应经理表数据进行归组,如编号为7698员工,value中标志M为自己工资,value中标志E为其下属姓名及工资;最后在Reduce中遍历比较员工与经理工资高低,输出工资高于经理的员工。
3.5.2处理流程图
3.5.3编写代码
1 import java.io.IOException; 2 import java.util.HashMap; 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.conf.Configured; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 import org.apache.hadoop.util.GenericOptionsParser; 17 import org.apache.hadoop.util.Tool; 18 import org.apache.hadoop.util.ToolRunner; 19 20 public class Q5EarnMoreThanManager extends Configured implements Tool { 21 22 public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 23 24 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 25 26 // 对员工文件字段进行拆分 27 String[] kv = value.toString().split(","); 28 29 // 输出经理表数据,其中key为员工编号和value为M+该员工工资 30 context.write(new Text(kv[0].toString()), new Text("M," + kv[5])); 31 32 // 输出员工对应经理表数据,其中key为经理编号和value为(E,该员工姓名,该员工工资) 33 if (null != kv[3] && !"".equals(kv[3].toString())) { 34 context.write(new Text(kv[3].toString()), new Text("E," + kv[1] + "," + kv[5])); 35 } 36 } 37 } 38 39 public static class Reduce extends Reducer<Text, Text, Text, Text> { 40 41 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 42 43 // 定义员工姓名、工资和存放部门员工Map 44 String empName; 45 long empSalary = 0; 46 HashMap<String, Long> empMap = new HashMap<String, Long>(); 47 48 // 定义经理工资变量 49 long mgrSalary = 0; 50 51 for (Text val : values) { 52 if (val.toString().startsWith("E")) { 53 // 当是员工标示时,获取该员工对应的姓名和工资并放入Map中 54 empName = val.toString().split(",")[1]; 55 empSalary = Long.parseLong(val.toString().split(",")[2]); 56 empMap.put(empName, empSalary); 57 } else { 58 // 当时经理标志时,获取该经理工资 59 mgrSalary = Long.parseLong(val.toString().split(",")[1]); 60 } 61 } 62 63 // 遍历该经理下属,比较员工与经理工资高低,输出工资高于经理的员工 64 for (java.util.Map.Entry<String, Long> entry : empMap.entrySet()) { 65 if (entry.getValue() > mgrSalary) { 66 context.write(new Text(entry.getKey()), new Text("" + entry.getValue())); 67 } 68 } 69 } 70 } 71 72 @Override 73 public int run(String[] args) throws Exception { 74 75 // 实例化作业对象,设置作业名称 76 Job job = new Job(getConf(), "Q5EarnMoreThanManager"); 77 job.setJobName("Q5EarnMoreThanManager"); 78 79 // 设置Mapper和Reduce类 80 job.setJarByClass(Q5EarnMoreThanManager.class); 81 job.setMapperClass(MapClass.class); 82 job.setReducerClass(Reduce.class); 83 84 // 设置输入格式类 85 job.setInputFormatClass(TextInputFormat.class); 86 87 // 设置输出格式类 88 job.setOutputFormatClass(TextOutputFormat.class); 89 job.setOutputKeyClass(Text.class); 90 job.setOutputValueClass(Text.class); 91 92 // 第1个参数为员工数据路径和第2个参数为输出路径 93 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); 94 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 95 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 96 97 job.waitForCompletion(true); 98 return job.isSuccessful() ? 0 : 1; 99 }100 101 /**102 * 主方法,执行入口103 * @param args 输入参数104 */105 public static void main(String[] args) throws Exception {106 int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args);107 System.exit(res);108 }109 }
3.5.4编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q5EarnMoreThanManager.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q5EarnMoreThanManager.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q5EarnMoreThanManager.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q5EarnMoreThanManager.java
把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误
jar cvf ./Q5EarnMoreThanManager.jar ./Q5EarnMore*.class
mv *.jar ../..
rm Q5EarnMore*.class
3.5.5运行并查看结果
运行Q5EarnMoreThanManager运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l员工数据路径:hdfs://hadoop:9000/class6/input/emp
l输出路径:hdfs://hadoop:9000/class6/out5
运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q5EarnMoreThanManager.jar Q5EarnMoreThanManager hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out5
运行成功后,刷新CentOS HDFS中的输出路径/class6/out5目录
hadoop fs -ls /class6/out5
hadoop fs -cat /class6/out5/part-r-00000
打开part-r-00000文件,可以看到运行结果:
FORD3000
3.6测试例子6:列出工资比公司平均工资要高的员工姓名及其工资
3.6.1问题分析
求工资比公司平均工资要高的员工姓名及工资,需要得到公司的平均工资和所有员工工资,通过比较得出工资比平均工资高的员工姓名及工资。这个问题可以分两个作业进行解决,先求出公司的平均工资,然后与所有员工进行比较得到结果;也可以在一个作业进行解决,这里就得使用作业setNumReduceTasks方法,设置Reduce任务数为1,保证每次运行一个reduce任务,从而能先求出平均工资,然后进行比较得出结果。
在Mapper阶段输出两份所有员工数据,其中一份key为0、value为该员工工资,另外一份key为0、value为"该员工姓名,员工工资";然后在Shuffle阶段把传过来数据按照key进行归组,在该任务中有key值为0和1两组数据;最后在Reduce中对key值0的所有员工求工资总数和员工数,获得平均工资;对key值1,比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资。
3.6.2处理流程图
3.6.3编写代码
1 import java.io.IOException; 2 import org.apache.hadoop.conf.Configuration; 3 import org.apache.hadoop.conf.Configured; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Job; 9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mapreduce.Reducer; 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 13 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 14 import org.apache.hadoop.util.GenericOptionsParser; 15 import org.apache.hadoop.util.Tool; 16 import org.apache.hadoop.util.ToolRunner; 17 18 public class Q6HigherThanAveSalary extends Configured implements Tool { 19 20 public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { 21 22 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 23 24 // 对员工文件字段进行拆分 25 String[] kv = value.toString().split(","); 26 27 // 获取所有员工数据,其中key为0和value为该员工工资 28 context.write(new IntWritable(0), new Text(kv[5])); 29 30 // 获取所有员工数据,其中key为0和value为(该员工姓名 ,员工工资) 31 context.write(new IntWritable(1), new Text(kv[1] + "," + kv[5])); 32 } 33 } 34 35 public static class Reduce extends Reducer<IntWritable, Text, Text, Text> { 36 37 // 定义员工工资、员工数和平均工资 38 private long allSalary = 0; 39 private int allEmpCount = 0; 40 private long aveSalary = 0; 41 42 // 定义员工工资变量 43 private long empSalary = 0; 44 45 public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 46 47 for (Text val : values) { 48 if (0 == key.get()) { 49 // 获取所有员工工资和员工数 50 allSalary += Long.parseLong(val.toString()); 51 allEmpCount++; 52 System.out.println("allEmpCount = " + allEmpCount); 53 } else if (1 == key.get()) { 54 if (aveSalary == 0) { 55 aveSalary = allSalary / allEmpCount; 56 context.write(new Text("Average Salary = "), new Text("" + aveSalary)); 57 context.write(new Text("Following employees have salarys higher than Average:"), new Text("")); 58 } 59 60 // 获取员工的平均工资 61 System.out.println("Employee salary = " + val.toString()); 62 aveSalary = allSalary / allEmpCount; 63 64 // 比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资 65 empSalary = Long.parseLong(val.toString().split(",")[1]); 66 if (empSalary > aveSalary) { 67 context.write(new Text(val.toString().split(",")[0]), new Text("" + empSalary)); 68 } 69 } 70 } 71 } 72 } 73 74 @Override 75 public int run(String[] args) throws Exception { 76 77 // 实例化作业对象,设置作业名称 78 Job job = new Job(getConf(), "Q6HigherThanAveSalary"); 79 job.setJobName("Q6HigherThanAveSalary"); 80 81 // 设置Mapper和Reduce类 82 job.setJarByClass(Q6HigherThanAveSalary.class); 83 job.setMapperClass(MapClass.class); 84 job.setReducerClass(Reduce.class); 85 86 // 必须设置Reduce任务数为1 # -D mapred.reduce.tasks = 1 87 // 这是该作业设置的核心,这样才能够保证各reduce是串行的 88 job.setNumReduceTasks(1); 89 90 // 设置输出格式类 91 job.setMapOutputKeyClass(IntWritable.class); 92 job.setMapOutputValueClass(Text.class); 93 94 // 设置输出键和值类型 95 job.setOutputFormatClass(TextOutputFormat.class); 96 job.setOutputKeyClass(Text.class); 97 job.setOutputValueClass(LongWritable.class); 98 99 // 第1个参数为员工数据路径和第2个参数为输出路径100 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();101 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));102 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));103 104 job.waitForCompletion(true);105 return job.isSuccessful() ? 0 : 1;106 }107 108 /**109 * 主方法,执行入口110 * @param args 输入参数111 */112 public static void main(String[] args) throws Exception {113 int res = ToolRunner.run(new Configuration(), new Q6HigherThanAveSalary(), args);114 System.exit(res);115 }116 }
3.6.4编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q5EarnMoreThanManager.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q6HigherThanAveSalary.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q6HigherThanAveSalary.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q6HigherThanAveSalary.java
把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误
jar cvf ./Q6HigherThanAveSalary.jar ./Q6HigherThan*.class
mv *.jar ../..
rm Q6HigherThan*.class
3.6.5运行并查看结果
运行Q6HigherThanAveSalary运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l员工数据路径:hdfs://hadoop:9000/class6/input/emp
l输出路径:hdfs://hadoop:9000/class6/out6
运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q6HigherThanAveSalary.jar Q6HigherThanAveSalary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out6
运行成功后,刷新CentOS HDFS中的输出路径/class6/out6目录
hadoop fs -ls /class6/out6
hadoop fs -cat /class6/out6/part-r-00000
打开part-r-00000文件,可以看到运行结果:
Average Salary = 2077
Following employees have salarys higher than Average:
FORD3000
CLARK2450
KING5000
JONES2975
BLAKE2850
3.7测试例子7:列出名字以J开头的员工姓名及其所属部门名称
3.7.1问题分析
求名字以J开头的员工姓名机器所属部门名称,只需判断员工姓名是否以J开头。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段判断员工姓名是否以J开头,如果是抽取出员工姓名和员工所在部门编号,利用缓存部门数据把部门编号对应为部门名称,转换后输出结果。
3.7.2处理流程图
3.7.3编写代码
1 import java.io.BufferedReader; 2 import java.io.FileReader; 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.conf.Configured; 9 import org.apache.hadoop.filecache.DistributedCache; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 19 import org.apache.hadoop.util.GenericOptionsParser; 20 import org.apache.hadoop.util.Tool; 21 import org.apache.hadoop.util.ToolRunner; 22 23 public class Q7NameDeptOfStartJ extends Configured implements Tool { 24 25 public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { 26 27 // 用于缓存 dept文件中的数据 28 private Map<String, String> deptMap = new HashMap<String, String>(); 29 private String[] kv; 30 31 // 此方法会在Map方法执行之前执行且执行一次 32 @Override 33 protected void setup(Context context) throws IOException, InterruptedException { 34 BufferedReader in = null; 35 try { 36 37 // 从当前作业中获取要缓存的文件 38 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 39 String deptIdName = null; 40 for (Path path : paths) { 41 42 // 对部门文件字段进行拆分并缓存到deptMap中 43 if (path.toString().contains("dept")) { 44 in = new BufferedReader(new FileReader(path.toString())); 45 while (null != (deptIdName = in.readLine())) { 46 47 // 对部门文件字段进行拆分并缓存到deptMap中 48 // 其中Map中key为部门编号,value为所在部门名称 49 deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]); 50 } 51 } 52 } 53 } catch (IOException e) { 54 e.printStackTrace(); 55 } finally { 56 try { 57 if (in != null) { 58 in.close(); 59 } 60 } catch (IOException e) { 61 e.printStackTrace(); 62 } 63 } 64 } 65 66 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 67 68 // 对员工文件字段进行拆分 69 kv = value.toString().split(","); 70 71 // 输出员工姓名为J开头的员工信息,key为员工姓名和value为员工所在部门名称 72 if (kv[1].toString().trim().startsWith("J")) { 73 context.write(new Text(kv[1].trim()), new Text(deptMap.get(kv[7].trim()))); 74 } 75 } 76 } 77 78 @Override 79 public int run(String[] args) throws Exception { 80 81 // 实例化作业对象,设置作业名称 82 Job job = new Job(getConf(), "Q7NameDeptOfStartJ"); 83 job.setJobName("Q7NameDeptOfStartJ"); 84 85 // 设置Mapper和Reduce类 86 job.setJarByClass(Q7NameDeptOfStartJ.class); 87 job.setMapperClass(MapClass.class); 88 89 // 设置输入格式类 90 job.setInputFormatClass(TextInputFormat.class); 91 92 // 设置输出格式类 93 job.setOutputFormatClass(TextOutputFormat.class); 94 job.setOutputKeyClass(Text.class); 95 job.setOutputValueClass(Text.class); 96 97 // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径 98 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); 99 DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());100 FileInputFormat.addInputPath(job, new Path(otherArgs[1]));101 FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));102 103 job.waitForCompletion(true);104 return job.isSuccessful() ? 0 : 1;105 }106 107 /**108 * 主方法,执行入口109 * @param args 输入参数110 */111 public static void main(String[] args) throws Exception {112 int res = ToolRunner.run(new Configuration(), new Q7NameDeptOfStartJ(), args);113 System.exit(res);114 }115 }
3.7.4编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q7NameDeptOfStartJ.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q7NameDeptOfStartJ.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q7NameDeptOfStartJ.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q7NameDeptOfStartJ.java
把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误
jar cvf ./Q7NameDeptOfStartJ.jar ./Q7NameDept*.class
mv *.jar ../..
rm Q7NameDept*.class
3.7.5运行并查看结果
运行Q7NameDeptOfStartJ时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l员工数据路径:hdfs://hadoop:9000/class6/input/emp
l输出路径:hdfs://hadoop:9000/class6/out7
运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q7NameDeptOfStartJ.jar Q7NameDeptOfStartJ hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out7
运行成功后,刷新CentOS HDFS中的输出路径/class6/out7目录
hadoop fs -ls /class6/out7
hadoop fs -cat /class6/out7/part-r-00000
打开part-r-00000文件,可以看到运行结果:
JAMESSALES
JONESRESEARCH
3.8测试例子8:列出工资最高的头三名员工姓名及其工资
3.8.1问题分析
求工资最高的头三名员工姓名及工资,可以通过冒泡法得到。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为0值、value为"员工姓名,员工工资";最后在Reduce中通过冒泡法遍历所有员工,比较员工工资多少,求出前三名。
3.8.2处理流程图
3.8.3编写代码
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.conf.Configured; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.IntWritable; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 import org.apache.hadoop.util.GenericOptionsParser; 17 import org.apache.hadoop.util.Tool; 18 import org.apache.hadoop.util.ToolRunner; 19 20 public class Q8SalaryTop3Salary extends Configured implements Tool { 21 22 public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { 23 24 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 25 26 // 对员工文件字段进行拆分 27 String[] kv = value.toString().split(","); 28 29 // 输出key为0和value为员工姓名+","+员工工资 30 context.write(new IntWritable(0), new Text(kv[1].trim() + "," + kv[5].trim())); 31 } 32 } 33 34 public static class Reduce extends Reducer<IntWritable, Text, Text, Text> { 35 36 public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 37 38 // 定义工资前三员工姓名 39 String empName; 40 String firstEmpName = ""; 41 String secondEmpName = ""; 42 String thirdEmpName = ""; 43 44 // 定义工资前三工资 45 long empSalary = 0; 46 long firstEmpSalary = 0; 47 long secondEmpSalary = 0; 48 long thirdEmpSalary = 0; 49 50 // 通过冒泡法遍历所有员工,比较员工工资多少,求出前三名 51 for (Text val : values) { 52 empName = val.toString().split(",")[0]; 53 empSalary = Long.parseLong(val.toString().split(",")[1]); 54 55 if(empSalary > firstEmpSalary) { 56 thirdEmpName = secondEmpName; 57 thirdEmpSalary = secondEmpSalary; 58 secondEmpName = firstEmpName; 59 secondEmpSalary = firstEmpSalary; 60 firstEmpName = empName; 61 firstEmpSalary = empSalary; 62 } else if (empSalary > secondEmpSalary) { 63 thirdEmpName = secondEmpName; 64 thirdEmpSalary = secondEmpSalary; 65 secondEmpName = empName; 66 secondEmpSalary = empSalary; 67 } else if (empSalary > thirdEmpSalary) { 68 thirdEmpName = empName; 69 thirdEmpSalary = empSalary; 70 } 71 } 72 73 // 输出工资前三名信息 74 context.write(new Text( "First employee name:" + firstEmpName), new Text("Salary:" + firstEmpSalary)); 75 context.write(new Text( "Second employee name:" + secondEmpName), new Text("Salary:" + secondEmpSalary)); 76 context.write(new Text( "Third employee name:" + thirdEmpName), new Text("Salary:" + thirdEmpSalary)); 77 } 78 } 79 80 @Override 81 public int run(String[] args) throws Exception { 82 83 // 实例化作业对象,设置作业名称 84 Job job = new Job(getConf(), "Q8SalaryTop3Salary"); 85 job.setJobName("Q8SalaryTop3Salary"); 86 87 // 设置Mapper和Reduce类 88 job.setJarByClass(Q8SalaryTop3Salary.class); 89 job.setMapperClass(MapClass.class); 90 job.setReducerClass(Reduce.class); 91 job.setMapOutputKeyClass(IntWritable.class); 92 job.setMapOutputValueClass(Text.class); 93 94 // 设置输入格式类 95 job.setInputFormatClass(TextInputFormat.class); 96 97 // 设置输出格式类 98 job.setOutputKeyClass(Text.class); 99 job.setOutputFormatClass(TextOutputFormat.class);100 job.setOutputValueClass(Text.class);101 102 // 第1个参数为员工数据路径和第2个参数为输出路径103 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();104 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));105 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));106 107 job.waitForCompletion(true);108 return job.isSuccessful() ? 0 : 1;109 }110 111 /**112 * 主方法,执行入口113 * @param args 输入参数114 */115 public static void main(String[] args) throws Exception {116 int res = ToolRunner.run(new Configuration(), new Q8SalaryTop3Salary(), args);117 System.exit(res);118 }119 }
3.8.4编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q8SalaryTop3Salary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q8SalaryTop3Salary.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q8SalaryTop3Salary.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q8SalaryTop3Salary.java
把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误
jar cvf ./Q8SalaryTop3Salary.jar ./Q8SalaryTop3*.class
mv *.jar ../..
rm Q8SalaryTop3*.class
3.8.5运行并查看结果
运行Q8SalaryTop3Salary运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l员工数据路径:hdfs://hadoop:9000/class6/input/emp
l输出路径:hdfs://hadoop:9000/class6/out8
运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q8SalaryTop3Salary.jar Q8SalaryTop3Salary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out8
运行成功后,刷新CentOS HDFS中的输出路径/class6/out8目录
hadoop fs -ls /class6/out8
hadoop fs -cat /class6/out8/part-r-00000
打开part-r-00000文件,可以看到运行结果:
First employee name:KINGSalary:5000
Second employee name:FORDSalary:3000
Third employee name:JONESSalary:2975
3.9测试例子9:将全体员工按照总收入(工资+提成)从高到低排列
3.9.1问题分析
求全体员工总收入降序排列,获得所有员工总收入并降序排列即可。在Mapper阶段输出所有员工总工资数据,其中key为员工总工资、value为员工姓名,在Mapper阶段的最后会先调用job.setPartitionerClass对数据进行分区,每个分区映射到一个reducer,每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。由于在本作业中Map的key只有0值,故能实现对所有数据进行排序。
3.9.2处理流程图
3.9.3编写代码
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.conf.Configured; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.IntWritable; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.io.WritableComparable;10 import org.apache.hadoop.mapreduce.Job;11 import org.apache.hadoop.mapreduce.Mapper;12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;14 import org.apache.hadoop.util.GenericOptionsParser;15 import org.apache.hadoop.util.Tool;16 import org.apache.hadoop.util.ToolRunner;17 18 public class Q9EmpSalarySort extends Configured implements Tool {19 20 public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {21 22 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {23 24 // 对员工文件字段进行拆分25 String[] kv = value.toString().split(",");26 27 // 输出key为员工所有工资和value为员工姓名28 int empAllSalary = "".equals(kv[6]) ? Integer.parseInt(kv[5]) : Integer.parseInt(kv[5]) + Integer.parseInt(kv[6]);29 context.write(new IntWritable(empAllSalary), new Text(kv[1]));30 }31 }32 33 /**34 * 递减排序算法35 */36 public static class DecreaseComparator extends IntWritable.Comparator {37 public int compare(WritableComparable a, WritableComparable b) {38 return -super.compare(a, b);39 }40 41 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {42 return -super.compare(b1, s1, l1, b2, s2, l2);43 }44 }45 46 @Override47 public int run(String[] args) throws Exception {48 49 // 实例化作业对象,设置作业名称50 Job job = new Job(getConf(), "Q9EmpSalarySort");51 job.setJobName("Q9EmpSalarySort");52 53 // 设置Mapper和Reduce类54 job.setJarByClass(Q9EmpSalarySort.class);55 job.setMapperClass(MapClass.class);56 57 // 设置输出格式类58 job.setMapOutputKeyClass(IntWritable.class);59 job.setMapOutputValueClass(Text.class);60 job.setSortComparatorClass(DecreaseComparator.class);61 62 // 第1个参数为员工数据路径和第2个参数为输出路径63 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();64 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));65 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));66 67 job.waitForCompletion(true);68 return job.isSuccessful() ? 0 : 1;69 }70 71 /**72 * 主方法,执行入口73 * @param args 输入参数74 */75 public static void main(String[] args) throws Exception {76 int res = ToolRunner.run(new Configuration(), new Q9EmpSalarySort(), args);77 System.exit(res);78 }79 }
3.9.4编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q9EmpSalarySort.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q9EmpSalarySort.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q9EmpSalarySort.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q9EmpSalarySort.java
把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误
jar cvf ./Q9EmpSalarySort.jar ./Q9EmpSalary*.class
mv *.jar ../..
rm Q9EmpSalary*.class
3.9.5运行并查看结果
运行Q9EmpSalarySort运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l员工数据路径:hdfs://hadoop:9000/class6/input/emp
l输出路径:hdfs://hadoop:9000/class6/out9
运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q9EmpSalarySort.jar Q9EmpSalarySort hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out9
运行成功后,刷新CentOS HDFS中的输出路径/class6/out9目录
hadoop fs -ls /class6/out9
hadoop fs -cat /class6/out9/part-r-00000
打开part-r-00000文件,可以看到运行结果:
5000KING
3000FORD
2975JONES
2850BLAKE
......
3.10测试例子10:求任何两名员工信息传递所需要经过的中间节点数
3.10.1问题分析
该公司所有员工可以形成入下图的树形结构,求两个员工的沟通的中间节点数,可转换在员工树中求两个节点连通所经过的节点数,即从其中一节点到汇合节点经过节点数加上另一节点到汇合节点经过节点数。例如求M到Q所需节点数,可以先找出M到A经过的节点数,然后找出Q到A经过的节点数,两者相加得到M到Q所需节点数。
在作业中首先在Mapper阶段所有员工数据,其中经理数据key为0值、value为"员工编号,员工经理编号",然后在Reduce阶段把所有员工放到员工列表和员工对应经理链表Map中,最后在Reduce的Cleanup中按照上面说所算法对任意两个员工计算出沟通的路径长度并输出。
3.10.2处理流程图
3.10.3编写代码
1 import java.io.IOException; 2 import java.util.ArrayList; 3 import java.util.HashMap; 4 import java.util.List; 5 import java.util.Map; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.conf.Configured; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.IntWritable; 11 import org.apache.hadoop.io.LongWritable; 12 import org.apache.hadoop.io.NullWritable; 13 import org.apache.hadoop.io.Text; 14 import org.apache.hadoop.mapreduce.Job; 15 import org.apache.hadoop.mapreduce.Mapper; 16 import org.apache.hadoop.mapreduce.Reducer; 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 20 import org.apache.hadoop.util.GenericOptionsParser; 21 import org.apache.hadoop.util.Tool; 22 import org.apache.hadoop.util.ToolRunner; 23 24 public class Q10MiddlePersonsCountForComm extends Configured implements Tool { 25 26 public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { 27 28 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 29 30 // 对员工文件字段进行拆分 31 String[] kv = value.toString().split(","); 32 33 // 输出key为0和value为员工编号+","+员工经理编号 34 context.write(new IntWritable(0), new Text(kv[0] + "," + ("".equals(kv[3]) ? " " : kv[3]))); 35 } 36 } 37 38 public static class Reduce extends Reducer<IntWritable, Text, NullWritable, Text> { 39 40 // 定义员工列表和员工对应经理Map 41 List<String> employeeList = new ArrayList<String>(); 42 Map<String, String> employeeToManagerMap = new HashMap<String, String>(); 43 44 public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 45 46 // 在reduce阶段把所有员工放到员工列表和员工对应经理Map中 47 for (Text value : values) { 48 employeeList.add(value.toString().split(",")[0].trim()); 49 employeeToManagerMap.put(value.toString().split(",")[0].trim(), value.toString().split(",")[1].trim()); 50 } 51 } 52 53 @Override 54 protected void cleanup(Context context) throws IOException, InterruptedException { 55 int totalEmployee = employeeList.size(); 56 int i, j; 57 int distance; 58 System.out.println(employeeList); 59 System.out.println(employeeToManagerMap); 60 61 // 对任意两个员工计算出沟通的路径长度并输出 62 for (i = 0; i < (totalEmployee - 1); i++) { 63 for (j = (i + 1); j < totalEmployee; j++) { 64 distance = calculateDistance(i, j); 65 String value = employeeList.get(i) + " and " + employeeList.get(j) + " = " + distance; 66 context.write(NullWritable.get(), new Text(value)); 67 } 68 } 69 } 70 71 /** 72 * 该公司可以由所有员工形成树形结构,求两个员工的沟通的中间节点数,可以转换在员工树中两员工之间的距离 73 * 由于在树中任意两点都会在某上级节点汇合,根据该情况设计了如下算法 74 */ 75 private int calculateDistance(int i, int j) { 76 String employeeA = employeeList.get(i); 77 String employeeB = employeeList.get(j); 78 int distance = 0; 79 80 // 如果A是B的经理,反之亦然 81 if (employeeToManagerMap.get(employeeA).equals(employeeB) || employeeToManagerMap.get(employeeB).equals(employeeA)) { 82 distance = 0; 83 } 84 // A和B在同一经理下 85 else if (employeeToManagerMap.get(employeeA).equals( 86 employeeToManagerMap.get(employeeB))) { 87 distance = 0; 88 } else { 89 // 定义A和B对应经理链表 90 List<String> employeeA_ManagerList = new ArrayList<String>(); 91 List<String> employeeB_ManagerList = new ArrayList<String>(); 92 93 // 获取从A开始经理链表 94 employeeA_ManagerList.add(employeeA); 95 String current = employeeA; 96 while (false == employeeToManagerMap.get(current).isEmpty()) { 97 current = employeeToManagerMap.get(current); 98 employeeA_ManagerList.add(current); 99 }100 101 // 获取从B开始经理链表102 employeeB_ManagerList.add(employeeB);103 current = employeeB;104 while (false == employeeToManagerMap.get(current).isEmpty()) {105 current = employeeToManagerMap.get(current);106 employeeB_ManagerList.add(current);107 }108 109 int ii = 0, jj = 0;110 String currentA_manager, currentB_manager;111 boolean found = false;112 113 // 遍历A与B开始经理链表,找出汇合点计算114 for (ii = 0; ii < employeeA_ManagerList.size(); ii++) {115 currentA_manager = employeeA_ManagerList.get(ii);116 for (jj = 0; jj < employeeB_ManagerList.size(); jj++) {117 currentB_manager = employeeB_ManagerList.get(jj);118 if (currentA_manager.equals(currentB_manager)) {119 found = true;120 break;121 }122 }123 124 if (found) {125 break;126 }127 }128 129 // 最后获取两只之前的路径130 distance = ii + jj - 1;131 }132 133 return distance;134 }135 }136 137 @Override138 public int run(String[] args) throws Exception {139 140 // 实例化作业对象,设置作业名称141 Job job = new Job(getConf(), "Q10MiddlePersonsCountForComm");142 job.setJobName("Q10MiddlePersonsCountForComm");143 144 // 设置Mapper和Reduce类145 job.setJarByClass(Q10MiddlePersonsCountForComm.class);146 job.setMapperClass(MapClass.class);147 job.setReducerClass(Reduce.class);148 149 // 设置Mapper输出格式类150 job.setMapOutputKeyClass(IntWritable.class);151 job.setMapOutputValueClass(Text.class);152 153 // 设置Reduce输出键和值类型154 job.setOutputFormatClass(TextOutputFormat.class);155 job.setOutputKeyClass(NullWritable.class);156 job.setOutputValueClass(Text.class);157 158 // 第1个参数为员工数据路径和第2个参数为输出路径159 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();160 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));161 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));162 163 job.waitForCompletion(true);164 return job.isSuccessful() ? 0 : 1;165 }166 167 /**168 * 主方法,执行入口169 * @param args 输入参数170 */171 public static void main(String[] args) throws Exception {172 int res = ToolRunner.run(new Configuration(), new Q10MiddlePersonsCountForComm(), args);173 System.exit(res);174 }175 }
3.10.4编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q10MiddlePersonsCountForComm.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q10MiddlePersonsCountForComm.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q10MiddlePersonsCountForComm.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q10MiddlePersonsCountForComm.java
把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误
jar cvf ./Q10MiddlePersonsCountForComm.jar ./Q10MiddlePersons*.class
mv *.jar ../..
rm Q10MiddlePersons*.class
3.10.5运行并查看结果
运行Q10MiddlePersonsCountForComm运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l员工数据路径:hdfs://hadoop:9000/class6/input/emp
l输出路径:hdfs://hadoop:9000/class6/out10
运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q10MiddlePersonsCountForComm.jar Q10MiddlePersonsCountForComm hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out10
运行成功后,刷新CentOS HDFS中的输出路径/class6/out10目录
hadoop fs -ls /class6/out10
hadoop fs -cat /class6/out10/part-r-00000
打开part-r-00000文件,可以看到运行结果:
7369 and 7499 = 4
7369 and 7521 = 4
7369 and 7566 = 1
7369 and 7654 = 4
7369 and 7698 = 3
......
相关推荐
《Hadoop实战》一书通过一系列实战案例,展示了如何在实际项目中运用Hadoop解决具体问题。例如,书中可能包含了数据分析、日志处理、推荐系统构建等方面的应用案例,这些案例不仅提供了理论指导,还详细介绍了具体的...
### Hadoop实战大数据大作业——基于Hadoop的单词统计系统 #### 一、课题简介与研究意义 **课题简介:** 本课题旨在设计一个简单的基于Hadoop平台进行的单词统计系统。该系统需要自行搭建Hadoop伪分布式架构,并...
《Hadoop实战》中文版是一本深入探讨Apache Hadoop生态系统技术的书籍,旨在帮助读者理解和掌握分布式计算的核心概念以及在实际项目中的应用。Hadoop Action是本书的副标题,暗示了其强调实践性的特点,旨在通过具体...
《Hadoop实战第二版》是由国内知名大数据专家陆嘉恒编著的一本深入浅出的Hadoop技术指南。这本书针对Hadoop生态系统进行了详尽的解析,尤其适合对大数据处理感兴趣的IT从业者、学生以及研究人员阅读。作为第二版,...
《Hadoop实战.源代码》这个压缩包包含的资源是关于Hadoop技术的实战指南以及相关的源代码,对于想要深入理解和应用Hadoop的IT从业者来说,这是一个宝贵的资料库。Hadoop是一个开源的分布式计算框架,它允许在大量...
《Hadoop实战-陆嘉恒(高清完整版)》是一本深入浅出介绍Hadoop技术的书籍,尤其适合初学者作为入门教材。Hadoop作为大数据处理领域的基石,其重要性不言而喻。这本书详细讲解了Hadoop的核心概念、架构以及实际应用,...
总结来说,Hadoop实战培训涵盖了大数据处理的挑战、解决方案、Hadoop的核心概念、生态系统及其在各行业的应用实例。对于处理和分析大规模数据,Hadoop提供了强大的工具和框架,是现代大数据时代不可或缺的技术。随着...
根据提供的文件信息,我们可以推断出这是一份关于获取《Hadoop实战》一书PDF版本的资源链接。接下来,我们将围绕这一主题展开讨论,重点介绍Hadoop的基本概念、功能特性以及如何利用这本书籍来深入学习Hadoop技术。 ...
内容概述:本次分享是关于Hadoop实战的第三部分,专注于Hadoop系统的应用开发实例。 知识点详细说明: 1. Hadoop系统:Hadoop是一个能够存储和处理大量数据的分布式系统框架,是大数据技术的重要组成部分。它主要...
《Hadoop实战+源代码》是一本深度探讨Hadoop技术的书籍,结合源代码提供了丰富的实践指导。Hadoop是Apache基金会开发的一个开源分布式计算框架,它允许在廉价硬件上处理和存储海量数据,是大数据处理领域的核心工具...
【Hadoop实战】源代码是针对大数据处理框架Hadoop的一份实践性教程的配套源码。这份源码包含了多个章节的示例代码,旨在帮助读者深入理解和应用Hadoop技术。通过对这些代码的学习和实践,我们可以掌握Hadoop的核心...
#### 三、Hadoop实战应用 ##### 2.1 编写基本的MapReduce程序 - **Mapper**:将输入数据转换成键值对形式。 - **Reducer**:处理Mapper产生的中间结果,进一步加工形成最终输出。 - 通过实例讲解如何编写简单的...
#### 三、Hadoop实战应用 ##### 3.1 HDFS实战 - **文件上传与下载**:了解如何使用Hadoop命令行工具将本地文件上传到HDFS以及从HDFS下载文件。 - **文件管理**:掌握HDFS文件管理操作,如创建目录、删除文件、查看...
总之,`hadoop实战源代码Java`提供了学习和实践Hadoop与Java集成的机会,这对于大数据开发者来说是非常宝贵的资源。通过深入研究这些示例代码,你可以更好地理解Hadoop的API用法,提升处理大数据的能力。在实践中,...
【标题】"实战hadoop中的源码"涵盖了在大数据处理领域深入理解并应用Apache Hadoop的核心技术。Hadoop是开源的分布式计算框架,它允许在大规模集群上存储和处理海量数据。通过研究Hadoop的源码,开发者可以深入了解...
《数据挖掘三部曲3》中的HADOOP实战第二版中文清晰版是一本专为初学者和有经验的IT从业者设计的指南,旨在帮助读者深入理解并掌握大数据处理框架Hadoop的核心技术和应用。Hadoop是Apache软件基金会开发的一个开源...
本篇将基于书名和描述,结合相关标签,深入探讨Hadoop的核心概念、实战应用以及高级编程技巧。 Hadoop是一个开源的分布式计算框架,由Apache软件基金会维护,其核心设计理念是处理和存储海量数据。Hadoop的主要组成...
#### 四、Hadoop实战应用案例 - **日志分析**:收集来自网站或应用的日志数据,并使用Hadoop进行处理,提取有用的信息,如用户访问趋势、热门页面等。 - **推荐系统**:利用用户的历史行为数据,通过MapReduce等...
首先,《Hadoop实战》这本书旨在帮助初学者快速掌握Hadoop的实际应用。书中会介绍如何安装和配置Hadoop环境,以及如何使用Hadoop MapReduce进行大规模数据处理。MapReduce是Hadoop的核心组件之一,它将大型数据集...