`

hadoop join实现

 
阅读更多

hadoop的join实现,实现符合关键字,多对多连接

 

key:

 

public class MultiKey implements WritableComparable<MultiKey> {

	private Text departId = new Text();
	private Text departNo = new Text();

	public Text getDepartId() {
		return departId;
	}

	public void setDepartId(String departId) {
		this.departId = new Text(departId);
	}

	public Text getDepartNo() {
		return departNo;
	}

	public void setDepartNo(String departNo) {
		this.departNo = new Text(departNo);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		departId.write(out);
		departNo.write(out);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.departId.readFields(in);
		this.departNo.readFields(in);
	}

	@Override
	public int compareTo(MultiKey o) {
		return (this.departId.compareTo(o.departId) !=0)? this.departId.compareTo(o.departId) : this.departNo.compareTo(o.departNo);
		
	}
	
	@Override
	public String toString(){
		return this.departId.toString()+" : "+this.departNo.toString();
	}
	
	@Override
	public int hashCode(){
		return 0;
	}
}
 

 

value:

 

public class Employee implements WritableComparable<Employee> {

	private String empName="";
	private String departId="";
	private String departNo="";
	private String departName="";
	private int flag;
	
	public int getFlag() {
		return flag;
	}

	public void setFlag(int flag) {
		this.flag = flag;
	}

	public String getEmpName() {
		return empName;
	}

	public void setEmpName(String empName) {
		this.empName = empName;
	}

	public String getDepartId() {
		return departId;
	}

	public void setDepartId(String departId) {
		this.departId = departId;
	}

	public String getDepartNo() {
		return departNo;
	}

	public void setDepartNo(String departNo) {
		this.departNo = departNo;
	}

	public String getDepartName() {
		return departName;
	}

	public void setDepartName(String departName) {
		this.departName = departName;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.empName);
		out.writeUTF(this.departId);
		out.writeUTF(this.departNo);
		out.writeUTF(this.departName);
		out.writeInt(this.flag);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.empName = in.readUTF();
		this.departId = in.readUTF();
		this.departNo = in.readUTF();
		this.departName = in.readUTF();
		this.flag = in.readInt();
	}
	
	public static void writeAllProperties(DataOutput out,Class<? extends WritableComparable<?>> type,Object obj) throws IllegalArgumentException, IllegalAccessException{
		Field[] fields =  type.getDeclaredFields();
		for (Field field : fields) {
			System.out.println(field.get(obj));
		}
	}

	@Override
	public int compareTo(Employee o) {
		return 0;
	}
	
	
	@Override
	public String toString(){
		return this.empName+"  "+this.departName;
	}

}
 

 

 

maper:

 

 

public class MyJoinMapper extends Mapper<LongWritable, Text, MultiKey, Employee>{
	@Override
	public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
		String line = value.toString();
		String[] array = line.split(",");
		visit(array,context);
	}
	
	private void visit(String[] array,Context context) throws IOException,InterruptedException{
		int i = Integer.valueOf(array[0]);
		MultiKey key = new MultiKey();
		Employee e = new Employee();
		switch (i) {
		case 1://name
			e.setEmpName(array[1]);
			e.setFlag(1);
			break;
		default://depart
			e.setDepartName(array[1]);
			e.setFlag(2);
			break;
		}
		e.setDepartId(array[2]);
		e.setDepartNo(array[3]);
		key.setDepartId(e.getDepartId());
		key.setDepartNo(e.getDepartNo());
		context.write(key, e);
	}
}
 

 

 

reducer:

 

 

public class MyJoinReducer extends Reducer<MultiKey, Employee, IntWritable, Text>{

	List<emp> empList = new LinkedList<emp>();
	List<depart> departList = new LinkedList<MyJoinReducer.depart>();
	
	@Override
	public void reduce(MultiKey key,Iterable<Employee> values,Context context) throws IOException,InterruptedException{
		for (Employee employee : values) {
			visite(employee);
		}
		System.out.println("----------");
		System.out.println(key);
		for (emp em : empList) {
			for (depart de : departList) {
				Employee e = new Employee();
				e.setDepartId(em.departId);
				e.setDepartName(de.departName);
				e.setDepartNo(em.departNo);
				e.setEmpName(em.empName);
				context.write(new IntWritable(1), new Text(e.toString()));
			}
		}

		empList = new LinkedList<emp>();
		departList = new LinkedList<MyJoinReducer.depart>();
	}
	
	private void visite(Employee e){
		switch (e.getFlag()) {
		case 1:
			emp em = new emp();
			em.departId = e.getDepartId();
			em.departNo = e.getDepartName();
			em.empName = e.getEmpName();
			empList.add(em);
			break;

		default:
			depart de = new depart();
			de.departName = e.getDepartName();
			departList.add(de);
			break;
		}
	}
	
	
	private class emp{
		public String empName;
		public String departId;
		public String departNo;
	}
	
	private class depart{
		public String departName;
	}
	
}

 

comparator

 

public class MyJoinComparator extends WritableComparator{

	protected MyJoinComparator() {
		super(MultiKey.class,true);
	}
	
}
 

 

groupcomparator:

 

 

public class MyJoinGroupComparator implements RawComparator<MultiKey> {

	private DataInputBuffer buffer = new DataInputBuffer();

	@Override
	public int compare(MultiKey key1, MultiKey key2) {
		return key1.compareTo(key2);
	}

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		return new MyJoinComparator().compare(b1, s1, l1, b2, s2, l2);
	}

}
 

 

今天iteye的编辑器好坑爹啊,不断的崩溃

 

 

补个测试类

 

public class MyJoinTest {
	
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		upload("dirk1.txt", "dirk.txt");
		upload("dirk2.txt","dirk2.txt");
		delete();
		Configuration conf = new Configuration();
		Job job = new Job(conf, "joinJob");
		job.setMapperClass(MyJoinMapper.class);
		job.setReducerClass(MyJoinReducer.class);
		job.setMapOutputKeyClass(MultiKey.class);
		job.setMapOutputValueClass(Employee.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		job.setGroupingComparatorClass(MyJoinGroupComparator.class);
		FileInputFormat.addInputPath(job, new Path("/user/dirk3/input"));
		FileOutputFormat.setOutputPath(job, new Path("/user/dirk3/output"));
		job.waitForCompletion(true);
	}
	
	public static void upload(String local,String remote) throws IOException{
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		String l = MyJoinTest.class.getResource("").getPath()+"/"+local;
		fs.copyFromLocalFile(false, true, new Path(l), new Path("/user/dirk3/input/"+remote));
	}
	
	public static void delete() throws IOException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path("/user/dirk3/output"), true);
	}
	
	public static void run(){
		JobConf jobConf = new JobConf();
		jobConf.setOutputKeyComparatorClass(MyJoinComparator.class);
		jobConf.setOutputValueGroupingComparator(MyJoinComparator.class);
	}

}
 

join的主要实现在reducer中

关于comparator,在通过maper向context中添加key value后,通过combine,partition之后,进入reducer阶段,进行groupComparator,决定哪些key同时进入一个reducer

 

分享到:
评论

相关推荐

    hadoop Join代码(map join 和reduce join)

    本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...

    hadoop join implement

    然而,join操作因其复杂性和数据分布的特点,在Hadoop中实现起来较为困难。具体而言: 1. **数据分布与倾斜问题**:在分布式环境中,数据的不均匀分布会导致join操作性能下降。 2. **MapReduce的局限性**:Hadoop的...

    hadoop_join.jar.zip_hadoop_hadoop query_reduce

    本文将深入探讨如何使用Hadoop和MapReduce进行高效的Join查询,并解析如何通过`hadoop_join.jar`这个工具来实现这一过程。 Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心组件包括HDFS(Hadoop ...

    大数据课程设计-Hadoop-MapReduce实现sql的统计、groupby和join-全部源码

    本课程设计主要围绕如何使用Hadoop的MapReduce实现SQL中的统计、GROUP BY和JOIN操作,这是一次深入理解大数据处理机制的实践过程。 首先,让我们来探讨SQL的统计功能。在SQL中,统计通常涉及到COUNT、SUM、AVG、MAX...

    Hadoop Reduce Join及基于MRV2 API 重写

    在Hadoop MapReduce中,Reduce Join是一种实现大规模数据集间连接的高效方法。本文将探讨Reduce Join的工作原理,以及如何利用MRV2(MapReduce v2)API对它进行重写。 首先,我们来理解什么是Reduce Join。在关系...

    Hadoop datajoin示例(客户和订单信息)

    文件汗有三个java类,两个测试文件txt ReduceClass.java MapClass.java TaggedRecordWritable.java customers.txt ...经过亲自测试,可以将两个文件中的信息以groupby的key关联起来,查出类似数据库的join.

    19、Join操作map side join 和 reduce side join

    本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在数据库中是非常常见的,它用于将来自两个或更多表的数据根据某些共享字段(即键)关联起来。在 ...

    elasticsearch与hadoop比较

    在使用上,Elasticsearch提供了RESTful接口,使得其可以被轻松集成到各种Web应用中,实现搜索功能。同时,Elasticsearch的聚合功能也非常出色,能够对数据进行高效的统计分析,这一点上Elasticsearch已经超越了传统...

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    - 这个应用程序很可能是一个示例,演示了如何在Hadoop MapReduce中实现多表关联并处理Job间的依赖和参数传递。它可能包括多个Job,每个Job负责一个或多个表的处理,并通过特定机制将结果传递给后续的Job。 5. **...

    浪潮hadoop

    同时,Hadoop通过MapReduce实现数据处理的并行化,极大地提升了处理大数据的速度。 Hadoop不仅作为数据存储的补充,还能作为数据湖,用于长期存储大量数据,以便在需要时进行分析。此外,Hadoop优化了大文件存储和...

    spark2.1.0-bin-hadoop2.7

    4. 流处理改进:Spark Streaming引入了新的DStream操作,如join和window,增强了实时流处理的能力。 二、Hadoop 2.7集成 Hadoop 2.7是一个稳定且广泛使用的分布式存储和计算框架,其YARN资源管理系统为Spark提供了...

    hadoop 实战 dev_02

    课程内容还涉及了如何从nginx日志中提取访问量最高的IP地址,使用Unix/Linux的工具链,如awk、grep、sort、join等进行简单的日志分析。 综上所述,本课程深入介绍了Hadoop在Web日志分析中的应用,从基本的日志概念...

    Hadoop和hive大数据面试题

    MapReduce则是用于并行处理和计算的大数据处理模型,由“Map”阶段和“Reduce”阶段组成,实现了数据的分而治之。 在Hadoop面试中,可能会遇到以下几个关键知识点: 1. Hadoop的架构:理解Hadoop的主节点...

    基于Hadoop的数据仓库Hive学习指南.doc

    - **不支持记录级别操作**:无法直接更新、插入或删除单条记录,通常通过创建新表或写入文件来实现数据更新。 - **ETL工具**:Hive支持数据提取、转换和加载,适合大规模数据的预处理和分析。 - **类SQL查询语言...

    Hadoop开发者第四期

    - MapReduce框架下的Join实现方法。 - 外部排序和归并Join算法的应用。 - Hive SQL中的Join操作及其优化策略。 - 实际应用场景中的性能对比和案例分析。 #### 四、配置Hive元数据DB为PostgreSQL - **目的**: 将...

    《Hadoop 数据分析平台》课程毕业测试题

    - **解释**: Sqoop是一个用于在Hadoop和关系型数据库之间传输数据的工具,它通过JDBC驱动程序连接到关系型数据库,从而实现数据的导入和导出。因此,正确答案是C:JDBC。 ### 27. Oracle数据导入HDFS的方法 - **...

Global site tag (gtag.js) - Google Analytics