在MR的时候经常会遇到多数据源join的问题,如果简单的分析任务采用hive处理就好,如果复杂一点需要自己写MR。
多数据源采用MultipleInputs类的addInputPath方法添加。
Job类
public class EfcOrderProRangeOdJob extends Configured implements Tool { //TODO 路径 private final static String INTPUT_A = "D:/order/order/"; private final static String INTPUT_B = "D:/order/address/"; private final static String OUTPUT = "D:/testAAAAA/"; // private final static String OUTPUT = "/warehouse/tmp/pt_eft_order_pro_range/"; private final static String OUTPUT_TABLE = "fct_pt_icr_trade_day"; public static void main(String[] args) { try { int res = ToolRunner.run(new Configuration(), new EfcOrderProRangeOdJob(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); } } @Override public int run(String[] args) throws Exception { try { String start = "20130217"; //TODO Configuration conf = ConfUtil.getConf(getConf()); conf.set("start", start); Job job1 = Job.getInstance(conf, "pt_eft_order_pro_range_first"); Path pathOrder = new Path(INTPUT_A); Path pathAddress = new Path(INTPUT_B); Path output = new Path(OUTPUT + start + "/"); FileSystem fs = FileSystem.get(conf); if(fs.exists(output)){ fs.delete(output,true); } job1.setMapOutputKeyClass(TextPair.class); job1.setMapOutputValueClass(Text.class); FileOutputFormat.setOutputPath(job1, output); MultipleInputs.addInputPath(job1, pathOrder, TextInputFormat.class, EfcOrderProRangeOrderMapper.class); MultipleInputs.addInputPath(job1, pathAddress, TextInputFormat.class, EfcOrderProRangeAddressMapper.class); job1.setReducerClass(EfcOrderProRangeReducer.class); job1.setJarByClass(EfcOrderProRangeOdJob.class); Job job2 = Job.getInstance(conf,"pt_eft_order_pro_range_second"); FileInputFormat.setInputPaths(job2, output); job2.setMapperClass(EfcOrderProRangeSecondMapper.class); job2.setMapOutputKeyClass(Text.class); job2.setMapOutputValueClass(IntWritable.class); TableMapReduceUtil.initTableReducerJob(OUTPUT_TABLE, EfcOrderProRangeSecondReducer.class, job2); return JobChainHandler.handleJobChain(job1, job2, "pt_eft_order_pro_range"); } catch (Exception e) { e.printStackTrace(); return 0; } } public static class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } public int compareTo(TextPair tp) { return first.compareTo(tp.first); } } }
mapper1类
public class EfcOrderProRangeOrderMapper extends Mapper<LongWritable, Text, TextPair, Text>{ private static final int ORDER_ID_INDEX = 2; private static final int ORDER_STATUS_INDEX = 5; private static final String EFFECTIVE_STATUS = "3"; private static final String COL_SPLITER = "\001"; @Override public void map(LongWritable key, Text value, Context context) { try { String [] order = value.toString().split(COL_SPLITER); String orderId = order[ORDER_ID_INDEX]; String status = order[ORDER_STATUS_INDEX]; if(!EFFECTIVE_STATUS.equals(status)){ return; } TextPair textPair = new TextPair(new Text(orderId),new Text("order")); context.write(textPair, new Text(status)); } catch (Exception e) { e.printStackTrace(); } } }
mapper2类
public class EfcOrderProRangeAddressMapper extends Mapper<LongWritable, Text, TextPair, Text>{ //TODO 通过hivemeta去取index private static final int ORDER_ID_INDEX = 0; private static final int PROVINCE_ID_INDEX = 1; private static final String COL_SPLITER = "\001"; @Override public void map(LongWritable key, Text value, Context context) { try { String [] address = value.toString().split(COL_SPLITER); String orderId = address[ORDER_ID_INDEX]; String province = address[PROVINCE_ID_INDEX]; TextPair textPair = new TextPair(new Text(orderId),new Text("address")); context.write(textPair, new Text(province)); } catch (Exception e) { e.printStackTrace(); } } }
reducer端做join操作,通过TextPair中的second来获取来源,取得需要取得的维度。
public class EfcOrderProRangeReducer extends Reducer<TextPair,Text,Text,Text>{ private static final String COL_SPLITER = "\001"; @Override protected void reduce(TextPair key, Iterable<Text> values, Context context) { try { Text tag = key.getSecond(); Text orderId = key.getFirst(); String status = null;String province = null; StringBuilder out = new StringBuilder(); for (Text value : values) { if(tag.toString().equals("order")){ status = value.toString(); } if(tag.toString().equals("address")){ province = value.toString(); } } if (province != null && status != null){ out.append(orderId.toString()).append(COL_SPLITER).append(status).append(COL_SPLITER).append(province); context.write(null, new Text(out.toString())); } } catch (Exception e) { e.printStackTrace(); } } }
相关推荐
ReduceJoin 的实现原理是,在 Map 阶段将来自不同数据源的数据输出到 Reduce 阶段,然后在 Reduce 阶段对数据进行 Join 操作。 ReduceJoin 的优点是可以处理大量的数据,但是,ReduceJoin 也存在一些缺点,例如,...
在处理涉及多数据集的任务时,Join 操作是必不可少的,它用于合并来自不同数据源的相关数据。本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在...
在Hadoop MapReduce环境中,处理大数据时经常遇到多表关联(Join)的需求,尤其是在复杂的业务逻辑中。MapReduce提供了一种分布式计算模型,能够高效地处理大规模数据集,但面对多表关联,尤其是多个Job之间的依赖和...
3) 内连接(Inner Join)和左连接(Left Join)可以通过一次MapReduce作业实现,Map阶段将JOIN键和对应数据发送到同一Reducer,Reduce阶段根据JOIN条件进行匹配。 在提供的"mapreduce-sql"压缩包文件中,很可能包含...
在Hadoop MapReduce中,数据处理的核心任务之一就是JOIN操作,它相当于关系数据库中的连接操作,用于合并来自不同数据源的相关信息。本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码...
在Hadoop MapReduce中,Reduce Join是一种实现大规模数据集间连接的高效方法。本文将探讨Reduce Join的工作原理,以及如何利用MRV2(MapReduce v2)API对它进行重写。 首先,我们来理解什么是Reduce Join。在关系...
这些运算符包括数据源(DataSource)、应用(Apply)、窗口(Window)、选择(Select)、连接(Join)、并集(Union)和排序(Sort)等操作。在LOT之上,还有一个名为伏羲的调度系统,负责调度LOT的任务执行。 为了...
13. 使用 MapReduce 实现 Join 操作:使用 MapReduce 来实现数据的 Join 操作,以便将多个数据源合并成一个结果。 14. 使用 MapReduce 实现排序:使用 MapReduce 来实现数据的排序,以便对数据进行排序处理。 15. ...
3. 协调机制:多路联接中涉及到多个数据源,有效的协调机制可以提高数据联接的效率,减少不必要的计算和数据传输。 4. 中间结果大小:中间结果的大小直接影响到整个MapReduce作业的性能,尤其是在多路联接中,如何...
在项目中,你会看到源代码文件,包括`WordCount.java`,这是实现MapReduce任务的主要类。这个类会包含Map和Reduce函数的定义,以及主程序逻辑,用于提交作业到Hadoop集群。 在Eclipse中编译并运行程序后,Hadoop会...
通过组合多个MapReduce任务,可以解决更复杂的数据处理问题,如Join操作和聚合查询。此外,Hadoop的YARN框架提供了资源管理和调度功能,使得多种计算框架如Spark和Flink能在同一集群上协同工作,进一步提升了大数据...
源代码可能会初始化MPI环境,定义并行任务,利用MapReduce思想处理数据,以及通过pthread库进行多线程编程。通过对这个源码的分析和运行,学习者可以深入理解并行计算的基本原理和实际应用,进一步提升在分布式系统...
技术点7 将HBase 作为MapReduce 的数据源 2.3 将数据导出Hadoop 2.3.1 将数据导入本地文件系统 技术点8 自动复制HDFS 中的文件 2.3.2 数据库 技术点9 使用Sqoop 将数据导入MySQL 2.3.3 Hbase 技术点...
在“MapReduce-master”这个压缩包中,可能包含了MapReduce作业的源代码、数据文件、作业描述以及可能的测试脚本。通过分析和运行这些代码,学习者可以更好地理解和实践MapReduce编程模型,从而提升在大数据处理领域...
4. Join:合并来自不同数据源的数据,例如,将销售数据与客户信息进行关联。 5. Filtering:根据条件过滤数据,只保留满足条件的记录。 6. Sorting:对数据进行排序,例如,按销售额排序产品列表。 7. Sampling:从...
3. **join操作**:在分布式环境中合并来自不同数据源的数据。例如,可以将用户信息与购买记录进行关联,以便进行更复杂的分析。 4. **数据分析**:例如,统计网页链接关系、分析用户行为模式等,这些都是大数据分析...
1. **实时JOIN处理**:通过集成实时JOIN功能,系统能够高效地处理来自不同数据源的流数据,实现数据间的实时关联操作。 2. **灵活的Condition管理**:通过对Condition的灵活管理和监控,确保数据处理过程中各种...