0 引子:
读取两个文件:
hello:
1,zhangsan
2,lisi
3,wangwu
hello1:
1,45
2,56
3,89
最后实现如下输出:
zhangsan,45
lisi,56
wangwu,89
0.1) 从两个文件中得到数据,在map端根据文件名做记录,后在reduce上实现输出, 因为数据在不同文件中,因此必须也只能在reduce端做join操作,在join之前需要依赖map端做的针对文件来源做标记
1 代入如下,主要看自定义map和reduce的写法
package join; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /** * */ public class MapJoinApp { static String FILE_ROOT = "hdfs://master:9000/"; static String FILE_INPUT = "hdfs://master:9000/files"; static String FILE_OUTPUT = "hdfs://master:9000/out"; public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf); Path outpath = new Path(FILE_OUTPUT); if(fileSystem.exists(outpath)){ fileSystem.delete(outpath, true); } // 0 定义干活的人 Job job = new Job(conf); // 1.1 告诉干活的人 输入流位置 读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数 FileInputFormat.setInputPaths(job, FILE_INPUT); // 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 job.setInputFormatClass(TextInputFormat.class); //1.2 指定自定义的map类 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); //1.3 分区 job.setNumReduceTasks(1); //1.4 TODO 排序、分组 目前按照默认方式执行 //1.5 TODO 规约 //2.2 指定自定义reduce类 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //2.3 指定写出到哪里 FileOutputFormat.setOutputPath(job, outpath); job.setOutputFormatClass(TextOutputFormat.class); // 让干活的人干活s job.waitForCompletion(true); } } /** * */ class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ String line = ""; @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit)context.getInputSplit(); String filename = split.getPath().getName(); // hello or hello1 文件名 //String pathStr = split.getPath().toString();// hdfs://master:9000/files/hello or hdfs://master:9000/files/hello1 System.out.println(filename); line = v1.toString();// 逐行执行 最后一行就是文件的最后一样内容 String[] v1s = v1.toString().split(","); String v2Str = ""; if("hello".equals(filename)){ // hello文件内容格式为: 1 zhangsan v2Str = "#" + v1s[1]; System.out.println("hello : " + v2Str); } if("hello1".equals(filename)){ // hello1文件内容格式为: 1 45 v2Str = "*" + v1s[1]; System.out.println("hello1 : " + v2Str); } //for(String word : v1s){ context.write(new LongWritable(Long.parseLong(v1s[0])), new Text(v2Str)); //} } } /** */ class MyReducer extends Reducer<LongWritable, Text, Text, Text>{ protected void reduce(LongWritable k2, Iterable<Text> v2s, Context ctx) throws IOException, InterruptedException { System.out.println("reduce ..."); String k3Str = ""; String v3Str = ""; for(Text v2 : v2s){ //System.out.println("k2: " + k2.get() + " v2: " + l.toString()); if(v2.toString().startsWith("#")){ k3Str = v2.toString().substring(1, v2.toString().length()); } if(v2.toString().startsWith("*")){ v3Str = v2.toString().substring(1, v2.toString().length()); } } ctx.write(new Text(k3Str), new Text(v3Str)); } }
2 结果:
[root@master local]# hadoop fs -text /out/part-r-00000 Warning: $HADOOP_HOME is deprecated. zhangsan 45 lisi 56 wangwu 89
相关推荐
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
标题中的"apache-hadoop-3.1.0-winutils-master.zip"是一个针对Windows用户的Hadoop工具包,它包含了运行Hadoop所需的特定于Windows的工具和配置。`winutils.exe`是这个工具包的关键组件,它是Hadoop在Windows上的一...
Eclipse集成Hadoop2.10.0的插件,使用`ant`对hadoop的jar包进行打包并适应Eclipse加载,所以参数里有hadoop和eclipse的目录. 必须注意对于不同的hadoop版本,` HADDOP_INSTALL_PATH/share/hadoop/common/lib`下的jar包...
Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop ...
赠送jar包:hadoop-yarn-client-2.6.5.jar; 赠送原API文档:hadoop-yarn-client-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.6.5.pom;...
`hadoop-common-2.6.0-bin-master.zip` 是一个针对Hadoop 2.6.0版本的压缩包,特别适用于在Windows环境下进行本地开发和测试。这个版本的Hadoop包含了对Windows系统的优化,比如提供了`winutils.exe`,这是在Windows...
赠送jar包:hadoop-mapreduce-client-common-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.6.5-sources.jar; 赠送Maven依赖信息...
hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包
3. **验证安装**: 重启Eclipse后,如果插件安装成功,你可以在`File` -> `New` -> `Project`中看到新的模板,如"Hadoop Map/Reduce Project"。 4. **配置Hadoop连接**: 在创建新的Hadoop Map/Reduce项目时,需要...
Hadoop-Eclipse-Plugin-3.1.1是一款专为Eclipse集成开发环境设计的插件,用于方便地在Hadoop分布式文件系统(HDFS)上进行开发和调试MapReduce程序。这款插件是Hadoop生态系统的组成部分,它使得Java开发者能够更加...
赠送jar包:hadoop-yarn-common-2.6.5.jar 赠送原API文档:hadoop-yarn-common-2.6.5-javadoc.jar 赠送源代码:hadoop-yarn-common-2.6.5-sources.jar 包含翻译后的API文档:hadoop-yarn-common-2.6.5-javadoc-...
Apache Flink 是一个流行的开源大数据处理框架,而 `flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip` 文件是针对 Flink 优化的一个特殊版本的 Hadoop 库。这个压缩包中的 `flink-shaded-hadoop-2-uber-2.7.5-10.0....
为了方便开发者在Eclipse或MyEclipse这样的集成开发环境中高效地进行Hadoop应用开发,Hadoop-Eclipse-Plugin应运而生。这个插件允许开发者直接在IDE中对Hadoop集群进行操作,如创建、编辑和运行MapReduce任务,极大...
Flink1.10.1编译hadoop2.7.2 编译flink-shaded-hadoop-2-uber
在这个特定的兼容包中,我们可以看到两个文件:flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(实际的兼容库)和._flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(可能是Mac OS的元数据文件,通常...
hadoop-eclipse-plugin-3.1.3,eclipse版本为eclipse-jee-2020-03
# 解压命令 tar -zxvf flink-shaded-hadoop-2-uber-3.0.0-cdh6.2.0-7.0.jar.tar.gz # 介绍 用于CDH部署 Flink所依赖的jar包
赠送jar包:hadoop-yarn-server-resourcemanager-2.6.0.jar; 赠送原API文档:hadoop-yarn-server-resourcemanager-2.6.0-javadoc.jar; 赠送源代码:hadoop-yarn-server-resourcemanager-2.6.0-sources.jar; 赠送...
《Hadoop-eclipse-plugin-2.7.2:在Eclipse中轻松开发Hadoop应用》 在大数据处理领域,Hadoop作为一个开源的分布式计算框架,因其高效、可扩展的特性而备受青睐。然而,对于开发者而言,有效地集成开发环境至关重要...