`
zhenghangcx
  • 浏览: 19012 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

MapReduce实现reduce端join,多数据源

 
阅读更多

在MR的时候经常会遇到多数据源join的问题,如果简单的分析任务采用hive处理就好,如果复杂一点需要自己写MR。

 

多数据源采用MultipleInputs类的addInputPath方法添加。

 

Job类

public class EfcOrderProRangeOdJob extends Configured implements Tool {
	
	//TODO 路径
	private final static String INTPUT_A = "D:/order/order/";
	private final static String INTPUT_B = "D:/order/address/";
	private final static String OUTPUT = "D:/testAAAAA/";
//	private final static String OUTPUT = "/warehouse/tmp/pt_eft_order_pro_range/";
	private final static String OUTPUT_TABLE = "fct_pt_icr_trade_day";

	public static void main(String[] args) {
		try {
			int res = ToolRunner.run(new Configuration(), new EfcOrderProRangeOdJob(), args);
			System.exit(res);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		try {
			String start = "20130217"; //TODO
			Configuration conf = ConfUtil.getConf(getConf());
			conf.set("start", start);
			Job job1 = Job.getInstance(conf, "pt_eft_order_pro_range_first");
			
			Path pathOrder = new Path(INTPUT_A);
			Path pathAddress = new Path(INTPUT_B);
			Path output = new Path(OUTPUT + start + "/");
			
			FileSystem fs = FileSystem.get(conf);
			if(fs.exists(output)){
				fs.delete(output,true);
			}
			
			job1.setMapOutputKeyClass(TextPair.class);
			job1.setMapOutputValueClass(Text.class);
			
			FileOutputFormat.setOutputPath(job1, output);
			MultipleInputs.addInputPath(job1, pathOrder, TextInputFormat.class, EfcOrderProRangeOrderMapper.class);
			MultipleInputs.addInputPath(job1, pathAddress, TextInputFormat.class, EfcOrderProRangeAddressMapper.class);
			
			job1.setReducerClass(EfcOrderProRangeReducer.class);
			job1.setJarByClass(EfcOrderProRangeOdJob.class);
			
			Job job2 = Job.getInstance(conf,"pt_eft_order_pro_range_second");
			FileInputFormat.setInputPaths(job2, output);
			job2.setMapperClass(EfcOrderProRangeSecondMapper.class);
			job2.setMapOutputKeyClass(Text.class);
			job2.setMapOutputValueClass(IntWritable.class);
			TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, EfcOrderProRangeSecondReducer.class, job2);
			
			return JobChainHandler.handleJobChain(job1, job2, "pt_eft_order_pro_range");
		} catch (Exception e) {
			e.printStackTrace();
			return 0;
		}
	}
	
	public static class TextPair implements WritableComparable<TextPair> {
		private Text first;
		private Text second;

		public TextPair() {
			set(new Text(), new Text());
		}

		public TextPair(String first, String second) {
			set(new Text(first), new Text(second));
		}

		public TextPair(Text first, Text second) {
			set(first, second);
		}

		public void set(Text first, Text second) {
			this.first = first;
			this.second = second;
		}

		public Text getFirst() {
			return first;
		}

		public Text getSecond() {
			return second;
		}

		public void write(DataOutput out) throws IOException {
			first.write(out);
			second.write(out);
		}

		public void readFields(DataInput in) throws IOException {
			first.readFields(in);
			second.readFields(in);
		}

		public int compareTo(TextPair tp) {
			return first.compareTo(tp.first);
		}
	}
}

 

mapper1类

public class EfcOrderProRangeOrderMapper extends Mapper<LongWritable, Text, TextPair, Text>{
	private static final int ORDER_ID_INDEX = 2;
	private static final int ORDER_STATUS_INDEX = 5;
	private static final String EFFECTIVE_STATUS = "3";
	private static final String COL_SPLITER = "\001";
	
	@Override
	public void map(LongWritable key, Text value, Context context) {
		try {
			String [] order = value.toString().split(COL_SPLITER);
			String orderId = order[ORDER_ID_INDEX];
			String status = order[ORDER_STATUS_INDEX];
			if(!EFFECTIVE_STATUS.equals(status)){
				return;
			}
			TextPair textPair = new TextPair(new Text(orderId),new Text("order"));
			context.write(textPair, new Text(status));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

mapper2类

public class EfcOrderProRangeAddressMapper extends Mapper<LongWritable, Text, TextPair, Text>{
	//TODO 通过hivemeta去取index
	private static final int ORDER_ID_INDEX = 0;
	private static final int PROVINCE_ID_INDEX = 1;
	private static final String COL_SPLITER = "\001";
	@Override
	public void map(LongWritable key, Text value, Context context) {
		try {
			
			String [] address = value.toString().split(COL_SPLITER);
			String orderId = address[ORDER_ID_INDEX];
			String province = address[PROVINCE_ID_INDEX];
			
			TextPair textPair = new TextPair(new Text(orderId),new Text("address"));
			
			context.write(textPair, new Text(province));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

reducer端做join操作,通过TextPair中的second来获取来源,取得需要取得的维度。

public class EfcOrderProRangeReducer extends Reducer<TextPair,Text,Text,Text>{
	
	private static final String COL_SPLITER = "\001";
	@Override
	protected void reduce(TextPair key, Iterable<Text> values, Context context) {
		try {
			Text tag = key.getSecond();
			Text orderId = key.getFirst();
			String status = null;String province = null;
			StringBuilder out = new StringBuilder();
			
			for (Text value : values) {
				if(tag.toString().equals("order")){
					status = value.toString();
				}
				if(tag.toString().equals("address")){
					province = value.toString();
				}
			}
			if (province != null && status != null){
				out.append(orderId.toString()).append(COL_SPLITER).append(status).append(COL_SPLITER).append(province);
				context.write(null, new Text(out.toString()));
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

分享到:
评论

相关推荐

    【MapReduce篇06】MapReduce之MapJoin和ReduceJoin1

    ReduceJoin 的实现原理是,在 Map 阶段将来自不同数据源的数据输出到 Reduce 阶段,然后在 Reduce 阶段对数据进行 Join 操作。 ReduceJoin 的优点是可以处理大量的数据,但是,ReduceJoin 也存在一些缺点,例如,...

    19、Join操作map side join 和 reduce side join

    在处理涉及多数据集的任务时,Join 操作是必不可少的,它用于合并来自不同数据源的相关数据。本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在...

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    在Hadoop MapReduce环境中,处理大数据时经常遇到多表关联(Join)的需求,尤其是在复杂的业务逻辑中。MapReduce提供了一种分布式计算模型,能够高效地处理大规模数据集,但面对多表关联,尤其是多个Job之间的依赖和...

    大数据课程设计-Hadoop-MapReduce实现sql的统计、groupby和join-全部源码

    3) 内连接(Inner Join)和左连接(Left Join)可以通过一次MapReduce作业实现,Map阶段将JOIN键和对应数据发送到同一Reducer,Reduce阶段根据JOIN条件进行匹配。 在提供的"mapreduce-sql"压缩包文件中,很可能包含...

    hadoop Join代码(map join 和reduce join)

    在Hadoop MapReduce中,数据处理的核心任务之一就是JOIN操作,它相当于关系数据库中的连接操作,用于合并来自不同数据源的相关信息。本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码...

    Hadoop Reduce Join及基于MRV2 API 重写

    在Hadoop MapReduce中,Reduce Join是一种实现大规模数据集间连接的高效方法。本文将探讨Reduce Join的工作原理,以及如何利用MRV2(MapReduce v2)API对它进行重写。 首先,我们来理解什么是Reduce Join。在关系...

    少杰 (徐东):ODPS MapReduce对外开放实践

    这些运算符包括数据源(DataSource)、应用(Apply)、窗口(Window)、选择(Select)、连接(Join)、并集(Union)和排序(Sort)等操作。在LOT之上,还有一个名为伏羲的调度系统,负责调度LOT的任务执行。 为了...

    云应用系统开发第二次项目(mapreduce)

    13. 使用 MapReduce 实现 Join 操作:使用 MapReduce 来实现数据的 Join 操作,以便将多个数据源合并成一个结果。 14. 使用 MapReduce 实现排序:使用 MapReduce 来实现数据的排序,以便对数据进行排序处理。 15. ...

    使用MapReduce高效处理多路联接

    3. 协调机制:多路联接中涉及到多个数据源,有效的协调机制可以提高数据联接的效率,减少不必要的计算和数据传输。 4. 中间结果大小:中间结果的大小直接影响到整个MapReduce作业的性能,尤其是在多路联接中,如何...

    MapReduce版的HelloWorld

    在项目中,你会看到源代码文件,包括`WordCount.java`,这是实现MapReduce任务的主要类。这个类会包含Map和Reduce函数的定义,以及主程序逻辑,用于提交作业到Hadoop集群。 在Eclipse中编译并运行程序后,Hadoop会...

    《大数据导论》MapReduce的应用.docx

    通过组合多个MapReduce任务,可以解决更复杂的数据处理问题,如Join操作和聚合查询。此外,Hadoop的YARN框架提供了资源管理和调度功能,使得多种计算框架如Spark和Flink能在同一集群上协同工作,进一步提升了大数据...

    mpi.rar_MPI_mapReduce_pthread mpi_并行计算

    源代码可能会初始化MPI环境,定义并行任务,利用MapReduce思想处理数据,以及通过pthread库进行多线程编程。通过对这个源码的分析和运行,学习者可以深入理解并行计算的基本原理和实际应用,进一步提升在分布式系统...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    技术点7 将HBase 作为MapReduce 的数据源 2.3 将数据导出Hadoop 2.3.1 将数据导入本地文件系统 技术点8 自动复制HDFS 中的文件 2.3.2 数据库 技术点9 使用Sqoop 将数据导入MySQL 2.3.3 Hbase 技术点...

    MapReduce:这些是MapReduce(CS6240)中课程并行数据处理的作业

    在“MapReduce-master”这个压缩包中,可能包含了MapReduce作业的源代码、数据文件、作业描述以及可能的测试脚本。通过分析和运行这些代码,学习者可以更好地理解和实践MapReduce编程模型,从而提升在大数据处理领域...

    mapreduce-patterns-examples

    4. Join:合并来自不同数据源的数据,例如,将销售数据与客户信息进行关联。 5. Filtering:根据条件过滤数据,只保留满足条件的记录。 6. Sorting:对数据进行排序,例如,按销售额排序产品列表。 7. Sampling:从...

    Hadoop-MapReduce-Cookbook-Example-Code:Hadoop MapReduce Cookbook 示例代码

    3. **join操作**:在分布式环境中合并来自不同数据源的数据。例如,可以将用户信息与购买记录进行关联,以便进行更复杂的分析。 4. **数据分析**:例如,统计网页链接关系、分析用户行为模式等,这些都是大数据分析...

    阿里巴巴分布式流数据实时与持续计算 强琦

    1. **实时JOIN处理**:通过集成实时JOIN功能,系统能够高效地处理来自不同数据源的流数据,实现数据间的实时关联操作。 2. **灵活的Condition管理**:通过对Condition的灵活管理和监控,确保数据处理过程中各种...

Global site tag (gtag.js) - Google Analytics