浏览 1926 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2014-04-22
我们都知道在数据库里,多个表之间是可以根据某个链接键进行join的,这也是数据库的范式规范,通过主外键的关联,由此来减少数据冗余,提高性能。当然近几年,随着NOSQL的兴起,出现了基于列的的列式数据库,典型的有Hbase,MongonDB,Cassdran,等等,NOSQL数据库弱化了关联,直接将一整条数据,存入一列,以及去掉了数据库的部分事务特性,从而在海量数据面前显得游刃有余,当然,大部分的NOSQL不支持join操作,也没有绝对的必要支持,因为现在,我们完全是把一整条数据存在了一起,虽然多了许多冗余,但也换来了比较高检索性能,扩展性能,可靠性能。但某些业务场景下,我们仍然需要Join操作,这时候怎么办?
如果数据量比较大的情况下,我们可以使用Hadoop的MapReduce来完成大表join,尤其对Hbase的某些表进行join操作,当然我们也可以使用Hive或Pig来完成,其实质在后台还是运行的一个MR程序。 那么,散仙今天就来看下如何使用MapReduce来完成一个join操作,Hadoop的join分为很多种例如;Reduce链接,Map侧链接,半链接和Reduce侧链接+BloomFilter等等,各个链接都有自己特定的应用场景,没有绝对的谁好谁坏。 今天散仙要说的是,基于Reduce侧的链接,原理如下: 1、在Reudce端进行连接。 在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下: Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。 reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。 本次的实现是基于hadoop的旧版API+contribu扩展包里的,DataJoin的工具类辅助来完成的,下篇博客,将会给出,基于新版API,独立来完成Reduce侧的连接示例。 现在看下散仙的两个文件的测试数据,一个是a.txt,另一个是b.txt <pre name="code" class="java"> a文件的数据 1,三劫散仙,13575468248 2,凤舞九天,18965235874 3,忙忙碌碌,15986854789 4,少林寺方丈,15698745862 </pre> <pre name="code" class="java"> b文件的数据 3,A,99,2013-03-05 1,B,89,2013-02-05 2,C,69,2013-03-09 3,D,56,2013-06-07 </pre> 源码如下: <pre name="code" class="java">package com.qin.reducejoin; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import com.qin.joinreduceerror.JoinReduce; /*** * * Hadoop1.2的版本,旧版本实现的Reduce侧连接 * * @author qindongliang * * 大数据交流群:376932160 * 搜索技术交流群:324714439 * * */ public class DataJoin extends Configured implements Tool { /** * * Map实现 * * */ public static class MapClass extends DataJoinMapperBase { /** * 读取输入的文件路径 * * **/ protected Text generateInputTag(String inputFile) { //返回文件路径,做标记 return new Text(inputFile); } /*** * 分组的Key * * **/ protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); String[] tokens = line.split(","); String groupKey = tokens[0]; return new Text(groupKey); } protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } /** * * Reduce进行笛卡尔积 * * **/ public static class Reduce extends DataJoinReducerBase { /*** * 笛卡尔积 * * */ protected TaggedMapOutput combine(Object[] tags, Object[] values) { if (tags.length &lt; 2) return null; String joinedStr = ""; for (int i=0; i&lt;values.length; i++) { if (i &gt; 0) {joinedStr += ",";} TaggedWritable tw = (TaggedWritable) values[i]; String line = ((Text) tw.getData()).toString(); String[] tokens = line.split(",", 2); joinedStr += tokens[1]; } TaggedWritable retv = new TaggedWritable(new Text(joinedStr)); retv.setTag((Text) tags[0]); return retv; } } /** * * 自定义的输出类型 * * ***/ public static class TaggedWritable extends TaggedMapOutput { private Writable data; /** * 注意加上构造方法 * * */ public TaggedWritable() { // TODO Auto-generated constructor stub } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } public Writable getData() { return data; } public void write(DataOutput out) throws IOException { this.tag.write(out); //此行代码很重要 out.writeUTF(this.data.getClass().getName()); this.data.write(out); } public void readFields(DataInput in) throws IOException { this.tag.readFields(in); //加入此部分代码,否则,可能报空指针异常 String temp=in.readUTF(); if (this.data == null|| !this.data.getClass().getName().equals(temp)) { try { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(temp), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, DataJoin.class); job.set("mapred.job.tracker","192.168.75.130:9001"); ////读取person中的数据字段 job.setJar("tt.jar"); job.setJarByClass(DataJoin.class); System.out.println("模式: "+job.get("mapred.job.tracker"));; String path="hdfs://192.168.75.130:9000/root/outputjoindb"; FileSystem fs=FileSystem.get(conf); Path p=new Path(path); if(fs.exists(p)){ fs.delete(p, true); System.out.println("输出路径存在,已删除!"); } Path in = new Path("hdfs://192.168.75.130:9000/root/inputjoindb"); // Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, p); job.setJobName("cee"); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator", ","); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new DataJoin(), args); System.exit(res); } } </pre> 运行,日志 <pre name="code" class="java">模式: 192.168.75.130:9001 输出路径存在,已删除! INFO - NativeCodeLoader.&lt;clinit&gt;(43) | Loaded the native-hadoop library WARN - LoadSnappy.&lt;clinit&gt;(46) | Snappy native library not loaded INFO - FileInputFormat.listStatus(199) | Total input paths to process : 2 INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404222310_0025 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 33% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404222310_0025 INFO - Counters.log(585) | Counters: 30 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=14335 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=3 INFO - Counters.log(589) | Data-local map tasks=3 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9868 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=207 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=172 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=837 INFO - Counters.log(589) | HDFS_BYTES_READ=513 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=221032 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=172 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=849 INFO - Counters.log(589) | Map input records=8 INFO - Counters.log(589) | Reduce shuffle bytes=849 INFO - Counters.log(589) | Spilled Records=16 INFO - Counters.log(589) | Map output bytes=815 INFO - Counters.log(589) | Total committed heap usage (bytes)=496644096 INFO - Counters.log(589) | CPU time spent (ms)=2080 INFO - Counters.log(589) | Map input bytes=187 INFO - Counters.log(589) | SPLIT_RAW_BYTES=306 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | Reduce input records=8 INFO - Counters.log(589) | Reduce input groups=4 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=623570944 INFO - Counters.log(589) | Reduce output records=4 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=2908262400 INFO - Counters.log(589) | Map output records=8 </pre> 运行结果,如下图所示: 可以看出,MR正确的完成了join操作,需要注意的是Reduce侧连接的不足之处: 之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。 另外一点需要注意的是,散仙在eclipse里进行调试,Local模式下会报异常,建议提交到hadoop的测试集群上进行测试。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |