
hadoop reduce-side join ---> tagging records by source file

 

 

 

0 Introduction:

 

Read two input files:

hello:

1,zhangsan
2,lisi
3,wangwu

 

hello1:

1,45
2,56
3,89

 

The goal is to join them into the following output:

zhangsan,45
lisi,56
wangwu,89

 

0.1) The data comes from two separate files. On the map side each record is tagged according to the file it came from; the joined result is then assembled on the reduce side. Because the matching halves of a record live in different files, the join can only be performed in the reducer, and it depends on the source tag the mapper attaches beforehand, as sketched below.
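To make the data flow concrete, here is roughly what the intermediate records look like (an informal sketch of what the code below produces; "#" tags a name from hello, "*" tags a score from hello1):

map output (key = id, value = tagged field):
(1, "#zhangsan")  (2, "#lisi")  (3, "#wangwu")   <- from hello
(1, "*45")        (2, "*56")    (3, "*89")       <- from hello1

reduce input, after the shuffle groups values by key:
(1, ["#zhangsan", "*45"])  ->  write zhangsan, 45
(2, ["#lisi", "*56"])      ->  write lisi, 56
(3, ["#wangwu", "*89"])    ->  write wangwu, 89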

 

1 The full code follows; the custom map and reduce classes are the parts to focus on.

 

package join;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Driver for the reduce-side join. (Despite the name MapJoinApp, the join
 * itself happens in the reducer; the mapper only tags records by source file.)
 */
public class MapJoinApp {

	static String FILE_ROOT = "hdfs://master:9000/";
	static String FILE_INPUT = "hdfs://master:9000/files";
	static String FILE_OUTPUT = "hdfs://master:9000/out";
	public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
		
		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf);
		Path outpath = new Path(FILE_OUTPUT);
		if(fileSystem.exists(outpath)){
			fileSystem.delete(outpath, true);
		}
		
		// 0 Create the job that will do the work
		Job job = new Job(conf);
		job.setJarByClass(MapJoinApp.class); // so the cluster can locate the jar containing this class
		// 1.1 Point the job at its input: each line of the HDFS files is parsed
		// into a <k,v> pair, and map() is called once per pair
		FileInputFormat.setInputPaths(job, FILE_INPUT);
		// Specify how the input is formatted: each line becomes one key/value pair
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2 Specify the custom mapper class
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		
		//1.3 Partitioning: use a single reduce task
		job.setNumReduceTasks(1);
		
		//1.4 TODO sorting and grouping -- defaults are used for now
		//1.5 TODO combiner
		
		//2.2 Specify the custom reducer class
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		//2.3 Specify where the output is written
		FileOutputFormat.setOutputPath(job, outpath);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// Run the job and wait for it to finish
		job.waitForCompletion(true);
		
	}
	
}

/**
 * Mapper: splits each line on "," and emits <id, tagged value>, where the tag
 * records the source file: "#" for hello (names), "*" for hello1 (scores).
 */
class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
	@Override
	protected void map(LongWritable k1, Text v1, Context context)
			throws IOException, InterruptedException {
		// Figure out which file this record came from
		FileSplit split = (FileSplit)context.getInputSplit();
		String filename = split.getPath().getName(); // "hello" or "hello1"
		//String pathStr = split.getPath().toString();// full path, e.g. hdfs://master:9000/files/hello
		System.out.println(filename);
		String[] v1s = v1.toString().split(",");
		String v2Str = "";
		if("hello".equals(filename)){ // lines in hello look like: 1,zhangsan
			v2Str = "#" + v1s[1]; // tag names with "#"
			System.out.println("hello : " +  v2Str);
		}
		if("hello1".equals(filename)){ // lines in hello1 look like: 1,45
			v2Str = "*" + v1s[1]; // tag scores with "*"
			System.out.println("hello1 : " +  v2Str);
		}
		// Emit <id, tagged value>; the shuffle groups both files' values by id
		context.write(new LongWritable(Long.parseLong(v1s[0])), new Text(v2Str));
	}

}

/**
 * Reducer: receives all tagged values for one id, strips the tags, and
 * writes <name, score>.
 */
class MyReducer extends Reducer<LongWritable, Text, Text, Text>{

	@Override
	protected void reduce(LongWritable k2, Iterable<Text> v2s, Context ctx)
			throws IOException, InterruptedException {
		System.out.println("reduce ...");
		
		String k3Str = "";
		String v3Str = "";
		
		// Strip the tags: "#..." is the name from hello, "*..." is the score from hello1
		for(Text v2 : v2s){
			if(v2.toString().startsWith("#")){
				k3Str = v2.toString().substring(1);
			}
			if(v2.toString().startsWith("*")){
				v3Str = v2.toString().substring(1);
			}
		}
		
		ctx.write(new Text(k3Str), new Text(v3Str));
	}
	
}
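
For reference, running the job might look roughly like this (a sketch: the jar name join.jar is an assumption, and the HDFS paths match the constants in the driver; adjust to your environment):

hadoop fs -mkdir /files
hadoop fs -put hello hello1 /files
hadoop jar join.jar join.MapJoinApp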


 

2 Result:

 

[root@master local]# hadoop fs -text /out/part-r-00000
Warning: $HADOOP_HOME is deprecated.

zhangsan        45
lisi    56
wangwu  89
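
Note that the two columns are separated by a tab rather than the comma shown in the goal above: a tab is TextOutputFormat's default key/value separator.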

 

 
