In earlier posts I covered Hadoop's map-side join and reduce-side join; this time we look at an approach that sits between the two: the semi-join.
A semi-join (SemiJoin) filters out records that cannot participate in the join already on the map side, which greatly shortens the reduce-side shuffle. With a plain reduce-side join, records that will never match anything are only recognized as useless when the reduce function actually runs; by that point they have already gone through shuffle, merge, and sort, wasting network and disk I/O. Overall this hurts performance, and the more dead data the input contains, the more pronounced the effect.
The semi-join is therefore really a variant of the reduce-side join: by discarding non-matching records on the map side, we reduce the shuffle work of the reduce stage and gain performance.
The mechanism again relies on the DistributedCache to distribute the small table to every node. In the Mapper's setup() method the cached file is read and only the small table's join keys are stored in a HashSet. In map(), each record is checked: if its join key is empty or does not exist in the HashSet, the record is treated as useless, so it is never partitioned, written to disk, or shuffled and sorted for the reduce stage, which improves join performance to a certain degree. Note that if the small table's key set is still very large, the program may run out of memory (OOM); in that case another join strategy has to be considered.
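Before walking through the full implementation, here is a condensed sketch of just the filtering step described above. It is only an illustration: the class name SemiJoinFilterSketch and the choice to emit the whole record as a plain Text value are my own simplifications, while the full code below wraps each record in a custom Writable and tags it with a source flag.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SemiJoinFilterSketch extends Mapper<LongWritable, Text, Text, Text> {

    private final HashSet<String> smallTableKeys = new HashSet<String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // load only the join keys of the cached small table (a.txt) into memory
        for (Path p : DistributedCache.getLocalCacheFiles(context.getConfiguration())) {
            if (p.getName().endsWith("a.txt")) {
                BufferedReader br = new BufferedReader(new FileReader(p.toString()));
                String line;
                while ((line = br.readLine()) != null) {
                    smallTableKeys.add(line.split(",")[0]);
                }
                br.close();
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        // drop any record whose join key is empty or absent from the small table:
        // it never reaches the shuffle, which is where the saving comes from
        if (fields[0].isEmpty() || !smallTableKeys.contains(fields[0])) {
            return;
        }
        context.write(new Text(fields[0]), value);
    }
}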
The test data is as follows.
Simulated small-table data (a.txt):
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862
Simulated big-table data (b.txt):
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
5,E,100,2013-09-09
6,H,200,2014-01-10
The code is as follows:
package com.semijoin;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/***
 * Hadoop semi-join (SemiJoin) implementation, for Hadoop 1.2.
 *
 * @author qindongliang
 *
 * Big-data discussion group: 376932160
 * Search-technology discussion group: 324714439
 * **/
public class Semjoin {

    /**
     * Custom entity emitted by the mappers: the join key, a source flag,
     * and the rest of the record.
     * **/
    private static class CombineEntity implements WritableComparable<CombineEntity> {

        private Text joinKey;    // join key
        private Text flag;       // flag marking which file the record came from
        private Text secondPart; // all fields of the record except the key

        public CombineEntity() {
            this.joinKey = new Text();
            this.flag = new Text();
            this.secondPart = new Text();
        }

        public Text getJoinKey() {
            return joinKey;
        }

        public void setJoinKey(Text joinKey) {
            this.joinKey = joinKey;
        }

        public Text getFlag() {
            return flag;
        }

        public void setFlag(Text flag) {
            this.flag = flag;
        }

        public Text getSecondPart() {
            return secondPart;
        }

        public void setSecondPart(Text secondPart) {
            this.secondPart = secondPart;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.joinKey.readFields(in);
            this.flag.readFields(in);
            this.secondPart.readFields(in);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            this.joinKey.write(out);
            this.flag.write(out);
            this.secondPart.write(out);
        }

        @Override
        public int compareTo(CombineEntity o) {
            return this.joinKey.compareTo(o.joinKey);
        }
    }
    private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity> {

        private CombineEntity combine = new CombineEntity();
        private Text flag = new Text();
        private Text joinKey = new Text();
        private Text secondPart = new Text();

        /**
         * Holds the join keys of the small table.
         * */
        private HashSet<String> joinKeySet = new HashSet<String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // reader for the cached file
            BufferedReader br = null;
            String temp;
            // fetch the files shared through the DistributedCache
            Path path[] = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            for (Path p : path) {
                if (p.getName().endsWith("a.txt")) {
                    br = new BufferedReader(new FileReader(p.toString()));
                    while ((temp = br.readLine()) != null) {
                        String ss[] = temp.split(",");
                        joinKeySet.add(ss[0]); // keep only the small table's join keys
                    }
                    br.close();
                }
            }
        }
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // path of the file this input split belongs to
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

            if (pathName.endsWith("a.txt")) {

                String valueItems[] = value.toString().split(",");
                /**
                 * Keep only records whose join key is present in the small-table key set.
                 * */
                if (joinKeySet.contains(valueItems[0])) {
                    // source flag
                    flag.set("0");
                    // join key
                    joinKey.set(valueItems[0]);
                    // remaining fields
                    secondPart.set(valueItems[1] + "\t" + valueItems[2]);
                    // assemble the entity
                    combine.setFlag(flag);             // source flag
                    combine.setJoinKey(joinKey);       // join key
                    combine.setSecondPart(secondPart); // remaining fields
                    // emit
                    context.write(combine.getJoinKey(), combine);
                } else {
                    System.out.println("in a.txt");
                    System.out.println("key not in the small table, record filtered out!");
                    for (String v : valueItems) {
                        System.out.print(v + " ");
                    }
                    return;
                }

            } else if (pathName.endsWith("b.txt")) {

                String valueItems[] = value.toString().split(",");
                /**
                 * Check whether the join key exists in the small-table key set.
                 * */
                if (joinKeySet.contains(valueItems[0])) {
                    // source flag
                    flag.set("1");
                    // remaining fields; note the two files have different column counts
                    joinKey.set(valueItems[0]);
                    secondPart.set(valueItems[1] + "\t" + valueItems[2] + "\t" + valueItems[3]);
                    // assemble the entity
                    combine.setFlag(flag);             // source flag
                    combine.setJoinKey(joinKey);       // join key
                    combine.setSecondPart(secondPart); // remaining fields
                    // emit
                    context.write(combine.getJoinKey(), combine);
                } else {
                    // filter the record out
                    System.out.println("in b.txt");
                    System.out.println("key not in the small table, record filtered out!");
                    for (String v : valueItems) {
                        System.out.print(v + " ");
                    }
                    return;
                }
            }
        }
    }
    private static class JReduce extends Reducer<Text, CombineEntity, Text, Text> {

        // left-table records within one group
        private List<Text> leftTable = new ArrayList<Text>();
        // right-table records within one group
        private List<Text> rightTable = new ArrayList<Text>();
        private Text secondPart = null;
        private Text output = new Text();

        // called once per group
        @Override
        protected void reduce(Text key, Iterable<CombineEntity> values, Context context)
                throws IOException, InterruptedException {
            leftTable.clear();  // clear the previous group's data
            rightTable.clear(); // clear the previous group's data
            /**
             * Split the records of one group into two lists according to
             * their source file; note that a very large group can still
             * cause an OOM error here.
             * **/
            for (CombineEntity ce : values) {
                this.secondPart = new Text(ce.getSecondPart().toString());
                if (ce.getFlag().toString().trim().equals("0")) {
                    leftTable.add(secondPart);  // left table
                } else if (ce.getFlag().toString().trim().equals("1")) {
                    rightTable.add(secondPart); // right table
                }
            }
            // cross product of the two lists within the group
            for (Text left : leftTable) {
                for (Text right : rightTable) {
                    output.set(left + "\t" + right); // concatenate left and right parts
                    context.write(key, output);      // emit
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(Semjoin.class);
        conf.set("mapred.job.tracker", "192.168.75.130:9001");
        conf.setJar("tt.jar");

        // the small table to be shared
        String bpath = "hdfs://192.168.75.130:9000/root/dist/a.txt";
        // add it to the DistributedCache
        DistributedCache.addCacheFile(new URI(bpath), conf);

        Job job = new Job(conf, "aaaaa");
        job.setJarByClass(Semjoin.class);
        System.out.println("Mode: " + conf.get("mapred.job.tracker"));

        // custom Mapper and Reducer classes
        job.setMapperClass(JMapper.class);
        job.setReducerClass(JReduce.class);

        // map-side output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineEntity.class);

        // reduce-side output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileSystem fs = FileSystem.get(conf);
        Path op = new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew4");
        if (fs.exists(op)) {
            fs.delete(op, true);
            System.out.println("Output path already exists, deleted!");
        }

        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));
        FileOutputFormat.setOutputPath(job, op);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
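Assuming the project has been packaged into tt.jar (the jar name set in main() above) and a Hadoop 1.x client is available on the submitting machine, the job can then be launched with something along the lines of:

hadoop jar tt.jar com.semijoin.Semjoin

The exact packaging step depends on your own build environment.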
The run log is as follows:
Mode: 192.168.75.130:9001
Output path already exists, deleted!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404260312_0002
INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) | map 50% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404260312_0002
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) | Job Counters
INFO - Counters.log(589) | Launched reduce tasks=1
INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=12445
INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) | Launched map tasks=2
INFO - Counters.log(589) | Data-local map tasks=2
INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9801
INFO - Counters.log(587) | File Output Format Counters
INFO - Counters.log(589) | Bytes Written=172
INFO - Counters.log(587) | FileSystemCounters
INFO - Counters.log(589) | FILE_BYTES_READ=237
INFO - Counters.log(589) | HDFS_BYTES_READ=455
INFO - Counters.log(589) | FILE_BYTES_WRITTEN=169503
INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=172
INFO - Counters.log(587) | File Input Format Counters
INFO - Counters.log(589) | Bytes Read=227
INFO - Counters.log(587) | Map-Reduce Framework
INFO - Counters.log(589) | Map output materialized bytes=243
INFO - Counters.log(589) | Map input records=10
INFO - Counters.log(589) | Reduce shuffle bytes=243
INFO - Counters.log(589) | Spilled Records=16
INFO - Counters.log(589) | Map output bytes=215
INFO - Counters.log(589) | Total committed heap usage (bytes)=336338944
INFO - Counters.log(589) | CPU time spent (ms)=1770
INFO - Counters.log(589) | Combine input records=0
INFO - Counters.log(589) | SPLIT_RAW_BYTES=228
INFO - Counters.log(589) | Reduce input records=8
INFO - Counters.log(589) | Reduce input groups=4
INFO - Counters.log(589) | Combine output records=0
INFO - Counters.log(589) | Physical memory (bytes) snapshot=442564608
INFO - Counters.log(589) | Reduce output records=4
INFO - Counters.log(589) | Virtual memory (bytes) snapshot=2184306688
INFO - Counters.log(589) | Map output records=8
The records filtered out on the map side can be inspected in the task logs through the JobTracker web UI on port 50030.
The result of the run is as follows:
1 三劫散仙 13575468248 B 89 2013-02-05
2 凤舞九天 18965235874 C 69 2013-03-09
3 忙忙碌碌 15986854789 A 99 2013-03-05
3 忙忙碌碌 15986854789 D 56 2013-06-07
With that, the semi-join is complete and the result is correct. Among Hadoop's several join approaches, only the map-side join is really efficient, but the right choice still depends on the concrete situation.