`
weitao1026
  • 浏览: 1056992 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

使用半链接的方式,来实现的表join

阅读更多

使用半链接的方式,来实现的表join,注意中间存储小表的key,是用HashSet实现的,也就是把数据存在内存里,在map侧,进行key过滤后,然后再Reduce侧,实现join,但如果数据量非常大的情况下,HashSet来存放海量的key可能就会出现OOM的情况,这时候,我们就可以采用另一种join方式,也就是今天散仙将要说的BloomFilter+Reduce Join的方式。

这两种方式,其实没什么太大的区别,唯一的不同就是上次使用的是HashSet来存放key值,这次我们将key存入BloomFilter里,这样以来我们就可以实现无须担心内存不足的缺陷了。



下面我们先来简单了解下什么是布隆过滤器?

Bloom Filter的中文翻译叫做布隆过滤器,是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。如文章标题所述,本文只是做简单介绍,属于科普文章。

应用场景
在正式介绍Bloom Filter算法之前,先来看看什么时候需要用到Bloom Filter算法。
1. HTTP缓存服务器、Web爬虫等
主要工作是判断一条URL是否在现有的URL集合之中(可以认为这里的数据量级上亿)。
对于HTTP缓存服务器,当本地局域网中的PC发起一条HTTP请求时,缓存服务器会先查看一下这个URL是否已经存在于缓存之中,如果存在的话就没有必要去原始的服务器拉取数据了(为了简单起见,我们假设数据没有发生变化),这样既能节省流量,还能加快访问速度,以提高用户体验。
对于Web爬虫,要判断当前正在处理的网页是否已经处理过了,同样需要当前URL是否存在于已经处理过的URL列表之中。

2. 垃圾邮件过滤
假设邮件服务器通过发送方的邮件域或者IP地址对垃圾邮件进行过滤,那么就需要判断当前的邮件域或者IP地址是否处于黑名单之中。如果邮件服务器的通信邮件数量非常大(也可以认为数据量级上亿),那么也可以使用Bloom Filter算法。


1、BloomFilter能解决什么问题?
     以少量的内存空间判断一个元素是否属于这个集合, 代价是有一定的错误率

2、工作原理
     1. 初始化一个数组, 所有位标为0,  A={x1, x2, x3,…,xm}  (x1, x2, x3,…,xm 初始为0)
     2. 将已知集合S中的每一个数组, 按以下方式映射到A中
          2.0  选取n个互相独立的hash函数 h1, h2, … hk
          2.1  将元素通过以上hash函数得到一组索引值 h1(xi), h2(xi),…,hk(xi)
          2.2  将集合A中的上述索引值标记为1(如果不同元素有重复, 则重复覆盖为1, 这是一个觅等操作)
     3.  对于一个元素x, 将其根据2.0中选取的hash函数, 进行hash, 得到一组索引值 h1(x), h2(x), …,hk(x)
          如果集合A中的这些索引位置上的值都是1, 表示这个元素属于集合S, 否则则不属于S


3、几个前提
     1. hash函数的计算不能性能太差, 否则得不偿失
     2. 任意两个hash函数之间必须是独立的.
          即任意两个hash函数不存在单一相关性, 否则hash到其中一个索引上的元素也必定会hash到另一个相关的索引上, 这样多个hash没有意义


4、错误率
     工作原理的第3步, 的出来的结论, 一个是绝对靠谱的, 一个是不能100%靠谱的。在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。关于具体的错误率,这和最优的哈希函数个数以及位数组的大小有关,而这是可以估算求得一个最优解的:
哈希函数个数k、位数组大小m及字符串数量n之间存在相互关系。相关文献证明了对于给定的m、n,当 k = ln(2)* m/n 时出错的概率是最小的。  具体的请看:http://blog.csdn.net/jiaomeng/article/details/1495500


5、基本特征
从以上对基本原理和数学基础的分析,我们可以得到Bloom filter的如下基本特征,用于指导实际应用。
(1)存在一定错误率,发生在正向判断上(存在性),反向判断不会发生错误(不存在性);
(2)错误率是可控制的,通过改变位数组大小、hash函数个数或更低碰撞率的hash函数来调节;
(3)保持较低的错误率,位数组空位至少保持在一半以上;
(4)给定m和n,可以确定最优hash个数,即k = ln2 * (m/n),此时错误率最小;
(5)给定允许的错误率E,可以确定合适的位数组大小,即m >= log2(e) * (n * log2(1/E)),继而确定hash函数个数k;
(6)正向错误率无法完全消除,即使不对位数组大小和hash函数个数进行限制,即无法实现零错误率;
(7)空间效率高,仅保存“存在状态”,但无法存储完整信息,需要其他数据结构辅助存储;
(8)不支持元素删除操作,因为不能保证删除的安全性。


6、应用场景举例:
(1)拼写检查、数据库系统、文件系统
(2)假设要你写一个网络蜘蛛(web crawler)。由于网络间的链接错综复杂,蜘蛛在网络间爬行很可能会形成“环”。为了避免形成“环”,就需要知道蜘蛛已经访问过那些URL。给一个URL,怎样知道蜘蛛是否已经访问过呢?
(3)网络应用
  P2P网络中查找资源操作,可以对每条网络通路保存Bloom Filter,当命中时,则选择该通路访问。
  广播消息时,可以检测某个IP是否已发包。
  检测广播消息包的环路,将Bloom Filter保存在包里,每个节点将自己添加入Bloom Filter。
  信息队列管理,使用Counter Bloom Filter管理信息流量。
(4)垃圾邮件地址过滤
  像网易,QQ这样的公众电子邮件(email)提供商,总是需要过滤来自发送垃圾邮件的人(spamer)的垃圾邮件。一个办法就是记录下那些发垃圾邮件的email 地址。由于那些发送者不停地在注册新的地址,全世界少说也有几十亿个发垃圾邮件的地址,将他们都存起来则需要大量的网络服务器。如果用哈希表,每存储一亿个 email 地址,就需要1.6GB 的内存(用哈希表实现的具体办法是将每一个email 地址对应成一个八字节的信息指纹,然后将这些信息指纹存入哈希表,由于哈希表的存储效率一般只有50%,因此一个email 地址需要占用十六个字节。一亿个地址大约要1.6GB, 即十六亿字节的内存)。因此存贮几十亿个邮件地址可能需要上百GB 的内存。而Bloom Filter只需要哈希表1/8 到1/4 的大小就能解决同样的问题。Bloom Filter决不会漏掉任何一个在黑名单中的可疑地址。而至于误判问题,常见的补救办法是在建立一个小的白名单,存储那些可能别误判的邮件地址。
(5)Bloomfilter在HBase中的作用
      HBase利用Bloomfilter来提高随机读(Get)的性能,对于顺序读(Scan)而言,设置Bloomfilter是没有作用的(0.92以后,如果设置了bloomfilter为ROWCOL,对于指定了qualifier的Scan有一定的优化,但不是那种直接过滤文件,排除在查找范围的形式)
      Bloomfilter在HBase中的开销?
Bloomfilter是一个列族(cf)级别的配置属性,如果你在表中设置了Bloomfilter,那么HBase会在生成StoreFile时包含一份bloomfilter结构的数据,称其为MetaBlock;MetaBlock与DataBlock(真实的KeyValue数据)一起由LRUBlockCache维护。所以,开启bloomfilter会有一定的存储及内存cache开销。
     Bloomfilter如何提高随机读(Get)的性能?
对于某个region的随机读,HBase会遍历读memstore及storefile(按照一定的顺序),将结果合并返回给客户端。如果你设置了bloomfilter,那么在遍历读storefile时,就可以利用bloomfilter,忽略某些storefile。
     注意:hbase的bloom filter是惰性加载的,在写压力比较大的情况下,会有不停的compact并产生storefile,那么新的storefile是不会马上将bloom filter加载到内存的,等到读请求来的时候才加载。
这样问题就来了,第一,如果storefile设置的比较大,max size为2G,这会导致bloom filter也比较大;第二,系统的读写压力都比较大。这样或许会经常出现单个 GET请求花费3-5秒的超时现象。

使用布隆过滤器,之前,需要先对需要处理的key,生成处理的二进制文件,代码如下:

Java代码 复制代码 收藏代码
  1. package com.reducebloomfilterjoin;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.IOException;  
  5. import java.io.InputStreamReader;  
  6. import java.util.zip.GZIPInputStream;  
  7.   
  8. import org.apache.hadoop.conf.Configuration;  
  9. import org.apache.hadoop.fs.FSDataOutputStream;  
  10. import org.apache.hadoop.fs.FileStatus;  
  11. import org.apache.hadoop.fs.FileSystem;  
  12. import org.apache.hadoop.fs.Path;  
  13. import org.apache.hadoop.util.bloom.BloomFilter;  
  14. import org.apache.hadoop.util.bloom.Key;  
  15. import org.apache.hadoop.util.hash.Hash;  
  16.   
  17. /** 
  18.  * 布隆过滤器 
  19.  * 生成二进制文件数据 
  20.  *  
  21.  * **/  
  22. public class TrainingBloomData {  
  23.   
  24.     public static int getOptimalBloomFilterSize(int numRecords,  
  25.   
  26.     float falsePosRate) {  
  27.         int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math  
  28.                 .pow(Math.log(2), 2));  
  29.   
  30.         return size;  
  31.   
  32.     }  
  33.   
  34.       
  35.     public static int getOptimalK(float numMembers, float vectorSize) {  
  36.           
  37.                 return (int) Math.round(vectorSize / numMembers * Math.log(2));  
  38.           
  39.             }  
  40.       
  41.     public static void main(String[] args) throws IOException {  
  42.   
  43.            
  44.   
  45.         Path inputFile = new Path("hdfs://192.168.75.130:9000/root/bloom/kk.txt");  
  46.   
  47.         int numMembers = Integer.parseInt("10");  
  48.   
  49.         float falsePosRate = Float.parseFloat("0.01");  
  50.   
  51.         Path bfFile = new Path("hdfs://192.168.75.130:9000/root/bloom/bloom.bin");  
  52.   
  53.           
  54.          FileSystem fss=FileSystem.get(new Configuration());  
  55.          if(fss.exists(bfFile)){  
  56.              fss.delete(bfFile, true);  
  57.              System.out.println("存在此路径,已经删除!");  
  58.          }  
  59.    
  60.   
  61.         // Calculate our vector size and optimal K value based on approximations  
  62.   
  63.         int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);  
  64.   
  65.         int nbHash = getOptimalK(numMembers, vectorSize);  
  66.   
  67.    
  68.   
  69.         // create new Bloom filter  
  70.   
  71.         BloomFilter filter = new BloomFilter(vectorSize, nbHash,  
  72.   
  73.                 Hash.MURMUR_HASH);  
  74.   
  75.   
  76.   
  77.         // Open file for read  
  78.   
  79.    
  80.   
  81.         System.out.println("Training Bloom filter of size " + vectorSize  
  82.   
  83.                 + " with " + nbHash + " hash functions, " + numMembers  
  84.   
  85.                 + " approximate number of records, and " + falsePosRate  
  86.   
  87.                 + " false positive rate");  
  88.   
  89.    
  90.   
  91.         String line = null;  
  92.   
  93.         int numRecords = 0;  
  94.   
  95.         FileSystem fs = FileSystem.get(new Configuration());  
  96.   
  97.         for (FileStatus status : fs.listStatus(inputFile)) {  
  98.   
  99.             BufferedReader rdr;  
  100.   
  101.             // if file is gzipped, wrap it in a GZIPInputStream  
  102.   
  103.             if (status.getPath().getName().endsWith(".gz")) {  
  104.   
  105.                 rdr = new BufferedReader(new InputStreamReader(  
  106.   
  107.                         new GZIPInputStream(fs.open(status.getPath()))));  
  108.   
  109.             } else {  
  110.   
  111.                 rdr = new BufferedReader(new InputStreamReader(fs.open(status  
  112.   
  113.                         .getPath())));  
  114.   
  115.             }  
  116.   
  117.    
  118.   
  119.             System.out.println("Reading " + status.getPath());  
  120.   
  121.             while ((line = rdr.readLine()) != null) {  
  122.   
  123.                 filter.add(new Key(line.getBytes()));  
  124.   
  125.                 ++numRecords;  
  126.   
  127.             }  
  128.   
  129.    
  130.   
  131.             rdr.close();  
  132.   
  133.         }  
  134.   
  135.    
  136.   
  137.         System.out.println("Trained Bloom filter with " + numRecords  
  138.   
  139.                 + " entries.");  
  140.   
  141.    
  142.   
  143.         System.out.println("Serializing Bloom filter to HDFS at " + bfFile);  
  144.   
  145.         FSDataOutputStream strm = fs.create(bfFile);  
  146.   
  147.         filter.write(strm);  
  148.   
  149.    
  150.   
  151.         strm.flush();  
  152.   
  153.         strm.close();  
  154.   
  155.    
  156.   
  157.         System.out.println("Done training Bloom filter.");  
  158.   
  159.    
  160.   
  161.     }  
  162.   
  163.       
  164.       
  165.       
  166.       
  167.   
  168. }  
package com.reducebloomfilterjoin;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

/**
 * 布隆过滤器
 * 生成二进制文件数据
 * 
 * **/
public class TrainingBloomData {

	public static int getOptimalBloomFilterSize(int numRecords,

	float falsePosRate) {
		int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math
				.pow(Math.log(2), 2));

		return size;

	}

	
	public static int getOptimalK(float numMembers, float vectorSize) {
		
		        return (int) Math.round(vectorSize / numMembers * Math.log(2));
		
		    }
	
	public static void main(String[] args) throws IOException {

		 

        Path inputFile = new Path("hdfs://192.168.75.130:9000/root/bloom/kk.txt");

        int numMembers = Integer.parseInt("10");

        float falsePosRate = Float.parseFloat("0.01");

        Path bfFile = new Path("hdfs://192.168.75.130:9000/root/bloom/bloom.bin");

        
         FileSystem fss=FileSystem.get(new Configuration());
         if(fss.exists(bfFile)){
        	 fss.delete(bfFile, true);
        	 System.out.println("存在此路径,已经删除!");
         }
 

        // Calculate our vector size and optimal K value based on approximations

        int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);

        int nbHash = getOptimalK(numMembers, vectorSize);

 

        // create new Bloom filter

        BloomFilter filter = new BloomFilter(vectorSize, nbHash,

                Hash.MURMUR_HASH);



        // Open file for read

 

        System.out.println("Training Bloom filter of size " + vectorSize

                + " with " + nbHash + " hash functions, " + numMembers

                + " approximate number of records, and " + falsePosRate

                + " false positive rate");

 

        String line = null;

        int numRecords = 0;

        FileSystem fs = FileSystem.get(new Configuration());

        for (FileStatus status : fs.listStatus(inputFile)) {

            BufferedReader rdr;

            // if file is gzipped, wrap it in a GZIPInputStream

            if (status.getPath().getName().endsWith(".gz")) {

                rdr = new BufferedReader(new InputStreamReader(

                        new GZIPInputStream(fs.open(status.getPath()))));

            } else {

                rdr = new BufferedReader(new InputStreamReader(fs.open(status

                        .getPath())));

            }

 

            System.out.println("Reading " + status.getPath());

            while ((line = rdr.readLine()) != null) {

                filter.add(new Key(line.getBytes()));

                ++numRecords;

            }

 

            rdr.close();

        }

 

        System.out.println("Trained Bloom filter with " + numRecords

                + " entries.");

 

        System.out.println("Serializing Bloom filter to HDFS at " + bfFile);

        FSDataOutputStream strm = fs.create(bfFile);

        filter.write(strm);

 

        strm.flush();

        strm.close();

 

        System.out.println("Done training Bloom filter.");

 

    }

	
	
	
	

}


测试数据如下:
小表模拟数据
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862

大表模拟数据
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
5,E,100,2013-09-09
6,H,200,2014-01-10


代码如下:

Java代码 复制代码 收藏代码
  1. package com.reducebloomfilterjoin;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.DataInput;  
  5. import java.io.DataInputStream;  
  6. import java.io.DataOutput;  
  7. import java.io.FileInputStream;  
  8. import java.io.FileReader;  
  9. import java.io.IOException;  
  10. import java.net.URI;  
  11. import java.util.ArrayList;  
  12. import java.util.HashSet;  
  13. import java.util.List;  
  14.   
  15. import org.apache.hadoop.conf.Configuration;  
  16. import org.apache.hadoop.filecache.DistributedCache;  
  17. import org.apache.hadoop.fs.FileSystem;  
  18. import org.apache.hadoop.fs.Path;  
  19. import org.apache.hadoop.io.LongWritable;  
  20. import org.apache.hadoop.io.Text;  
  21. import org.apache.hadoop.io.WritableComparable;  
  22.   
  23. import org.apache.hadoop.mapred.JobConf;  
  24. import org.apache.hadoop.mapreduce.Job;  
  25. import org.apache.hadoop.mapreduce.Mapper;  
  26. import org.apache.hadoop.mapreduce.Reducer;  
  27. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  28. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  29. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  30. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  31. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  32. import org.apache.hadoop.util.bloom.BloomFilter;  
  33. import org.apache.hadoop.util.bloom.Key;  
  34.   
  35. /*** 
  36.  *  
  37.  * Hadoop1.2的版本 
  38.  *  
  39.  *  
  40.  * ReduceJoin+布隆过滤器   实现Reduce Join 
  41.  *  
  42.  * @author qindongliang 
  43.  *  
  44.  *    大数据交流群:376932160 
  45.  *  搜索技术交流群:324714439 
  46.  *  
  47.  *  
  48.  *  
  49.  * **/  
  50. public class ReduceBoolmFilter {  
  51.       
  52.       
  53.        
  54.     /** 
  55.      *  
  56.      *  
  57.      * 自定义一个输出实体 
  58.      *  
  59.      * **/  
  60.     private static class CombineEntity implements WritableComparable<CombineEntity>{  
  61.   
  62.           
  63.         private Text joinKey;//连接key  
  64.         private Text flag;//文件来源标志  
  65.         private Text secondPart;//除了键外的其他部分的数据  
  66.           
  67.           
  68.         public CombineEntity() {  
  69.             // TODO Auto-generated constructor stub  
  70.             this.joinKey=new Text();  
  71.             this.flag=new Text();  
  72.             this.secondPart=new Text();  
  73.         }  
  74.           
  75.         public Text getJoinKey() {  
  76.             return joinKey;  
  77.         }  
  78.   
  79.         public void setJoinKey(Text joinKey) {  
  80.             this.joinKey = joinKey;  
  81.         }  
  82.   
  83.         public Text getFlag() {  
  84.             return flag;  
  85.         }  
  86.   
  87.         public void setFlag(Text flag) {  
  88.             this.flag = flag;  
  89.         }  
  90.   
  91.         public Text getSecondPart() {  
  92.             return secondPart;  
  93.         }  
  94.   
  95.         public void setSecondPart(Text secondPart) {  
  96.             this.secondPart = secondPart;  
  97.         }  
  98.   
  99.         @Override  
  100.         public void readFields(DataInput in) throws IOException {  
  101.             this.joinKey.readFields(in);  
  102.             this.flag.readFields(in);  
  103.             this.secondPart.readFields(in);  
  104.               
  105.         }  
  106.   
  107.         @Override  
  108.         public void write(DataOutput out) throws IOException {  
  109.             this.joinKey.write(out);  
  110.             this.flag.write(out);  
  111.             this.secondPart.write(out);  
  112.               
  113.         }  
  114.   
  115.         @Override  
  116.         public int compareTo(CombineEntity o) {  
  117.             // TODO Auto-generated method stub  
  118.             return this.joinKey.compareTo(o.joinKey);  
  119.         }  
  120.           
  121.           
  122.           
  123.     }  
  124.       
  125.       
  126.       
  127.       
  128.     private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{  
  129.           
  130.         private CombineEntity combine=new CombineEntity();  
  131.         private Text flag=new Text();  
  132.         private  Text joinKey=new Text();  
  133.         private Text secondPart=new Text();  
  134.         /** 
  135.          *使用布隆过滤器存储key 
  136.          * 代替原来的HashSet存储 
  137.          *  
  138.          * */  
  139.         //private HashSet<String> joinKeySet=new HashSet<String>();  
  140.           
  141.         BloomFilter filter=new BloomFilter();  
  142.         @Override  
  143.         protected void setup(Context context)throws IOException, InterruptedException {  
  144.            
  145.             //读取文件流  
  146.             BufferedReader br=null;  
  147.             String temp;  
  148.             // 获取DistributedCached里面 的共享文件  
  149.             Path paths[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());  
  150.                   for (Path path : paths) {  
  151.                         
  152.                                           if (path.toString().contains("bloom.bin")) {  
  153.                         
  154.                                               DataInputStream strm = new DataInputStream(  
  155.                         
  156.                                                       new FileInputStream(path.toString()));  
  157.                         
  158.                                               // Read into our Bloom filter.  
  159.                         
  160.                                               filter.readFields(strm);  
  161.                         
  162.                                               strm.close();  
  163.                         
  164.                                           }  
  165.                         
  166.                                       }  
  167.                     
  168.   
  169.            
  170.               
  171.               
  172.         }  
  173.           
  174.           
  175.           
  176.         @Override  
  177.         protected void map(LongWritable key, Text value,Context context)  
  178.                 throws IOException, InterruptedException {  
  179.               
  180.           
  181.                //获得文件输入路径  
  182.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();  
  183.           
  184.             if(pathName.endsWith("a.txt")){  
  185.                   
  186.                 String  valueItems[]=value.toString().split(",");  
  187.                   
  188.                   
  189.                 /** 
  190.                  * 在这里过滤必须要的连接字符 
  191.                  *  
  192.                  * */  
  193.                 //if(joinKeySet.contains(valueItems[0])){  
  194.                 if(filter.membershipTest(new Key(valueItems[0].getBytes()))){  
  195.                     //设置标志位  
  196.                     flag.set("0");     
  197.                     //设置链接键  
  198.                     joinKey.set(valueItems[0]);            
  199.                     //设置第二部分  
  200.                     secondPart.set(valueItems[1]+"\t"+valueItems[2]);  
  201.                       
  202.                     //封装实体  
  203.                     combine.setFlag(flag);//标志位  
  204.                     combine.setJoinKey(joinKey);//链接键  
  205.                     combine.setSecondPart(secondPart);//其他部分  
  206.                       
  207.                      //写出  
  208.                     context.write(combine.getJoinKey(), combine);     
  209.                 }else{  
  210.                     System.out.println("a.txt里");  
  211.                     System.out.println("在小表中无此记录,执行过滤掉!");  
  212.                     for(String v:valueItems){  
  213.                         System.out.print(v+"   ");  
  214.                     }  
  215.                       
  216.                     return ;  
  217.                       
  218.                 }  
  219.                   
  220.                   
  221.                   
  222.             }else if(pathName.endsWith("b.txt")){  
  223.                 String  valueItems[]=value.toString().split(",");  
  224.                   
  225.                 /** 
  226.                  *  
  227.                  * 判断是否在集合中 
  228.                  *  
  229.                  * */  
  230.                 if(filter.membershipTest(new Key(valueItems[0].getBytes()))){  
  231.                 //if(joinKeySet.contains(valueItems[0])){  
  232.                     //if(joinKeySet.contains(valueItems[0])){  
  233.                     //设置标志位  
  234.                     flag.set("1");     
  235.                       
  236.                     //设置链接键  
  237.                     joinKey.set(valueItems[0]);  
  238.             
  239.                     //设置第二部分注意不同的文件的列数不一样  
  240.                     secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);  
  241.                       
  242.                     //封装实体  
  243.                     combine.setFlag(flag);//标志位  
  244.                     combine.setJoinKey(joinKey);//链接键  
  245.                     combine.setSecondPart(secondPart);//其他部分  
  246.                       
  247.                      //写出  
  248.                     context.write(combine.getJoinKey(), combine);  
  249.                       
  250.                       
  251.                 }else{                    
  252.                     //执行过滤 ......  
  253.                     System.out.println("b.txt里");  
  254.                     System.out.println("在小表中无此记录,执行过滤掉!");  
  255.                     for(String v:valueItems){  
  256.                         System.out.print(v+"   ");  
  257.                     }  
  258.                       
  259.                     return ;  
  260.                       
  261.                       
  262.                 }  
  263.                   
  264.               
  265.                   
  266.                   
  267.             }  
  268.               
  269.               
  270.               
  271.               
  272.                
  273.            
  274.               
  275.         }  
  276.           
  277.     }  
  278.       
  279.       
  280.     private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{  
  281.           
  282.           
  283.         //存储一个分组中左表信息  
  284.         private List<Text> leftTable=new ArrayList<Text>();  
  285.         //存储一个分组中右表信息  
  286.         private List<Text> rightTable=new ArrayList<Text>();  
  287.           
  288.         private Text secondPart=null;  
  289.           
  290.         private Text output=new Text();  
  291.           
  292.           
  293.            
  294.         //一个分组调用一次  
  295.         @Override  
  296.         protected void reduce(Text key, Iterable<CombineEntity> values,Context context)  
  297.                 throws IOException, InterruptedException {  
  298.              leftTable.clear();//清空分组数据  
  299.              rightTable.clear();//清空分组数据  
  300.                
  301.                
  302.              /** 
  303.               * 将不同文件的数据,分别放在不同的集合 
  304.               * 中,注意数据量过大时,会出现 
  305.               * OOM的异常 
  306.               *  
  307.               * **/  
  308.                
  309.              for(CombineEntity ce:values){  
  310.                    
  311.                  this.secondPart=new Text(ce.getSecondPart().toString());  
  312.                    
  313.                    
  314.                  //左表  
  315.                    
  316.                  if(ce.getFlag().toString().trim().equals("0")){  
  317.                      leftTable.add(secondPart);  
  318.                        
  319.                  }else if(ce.getFlag().toString().trim().equals("1")){  
  320.                        
  321.                      rightTable.add(secondPart);  
  322.                        
  323.                  }  
  324.                    
  325.                    
  326.                    
  327.                    
  328.              }  
  329.                
  330.              //=====================  
  331.              for(Text left:leftTable){  
  332.                    
  333.                  for(Text right:rightTable){  
  334.                        
  335.                      output.set(left+"\t"+right);//连接左右数据  
  336.                      context.write(key, output);//输出  
  337.                  }  
  338.                    
  339.              }  
  340.                
  341.                
  342.                
  343.               
  344.         }  
  345.           
  346.     }  
  347.       
  348.       
  349.       
  350.       
  351.       
  352.       
  353.       
  354.       
  355.     public static void main(String[] args)throws Exception {  
  356.           
  357.            
  358.       
  359.       
  360.          //Job job=new Job(conf,"myjoin");  
  361.          JobConf conf=new JobConf(ReduceBoolmFilter.class);   
  362.            conf.set("mapred.job.tracker","192.168.75.130:9001");  
  363.             conf.setJar("tt.jar");  
  364.             
  365.             
  366.             //小表共享  
  367.             String bpath="hdfs://192.168.75.130:9000/root/bloom/bloom.bin";  
  368.             //添加到共享cache里  
  369.         DistributedCache.addCacheFile(new URI(bpath), conf);  
  370.           
  371.           Job job=new Job(conf, "aaaaa");  
  372.          job.setJarByClass(ReduceBoolmFilter.class);  
  373.          System.out.println("模式:  "+conf.get("mapred.job.tracker"));;  
  374.            
  375.            
  376.          //设置Map和Reduce自定义类  
  377.          job.setMapperClass(JMapper.class);  
  378.          job.setReducerClass(JReduce.class);  
  379.            
  380.          //设置Map端输出  
  381.          job.setMapOutputKeyClass(Text.class);  
  382.          job.setMapOutputValueClass(CombineEntity.class);  
  383.            
  384.          //设置Reduce端的输出  
  385.          job.setOutputKeyClass(Text.class);  
  386.          job.setOutputValueClass(Text.class);  
  387.            
  388.       
  389.          job.setInputFormatClass(TextInputFormat.class);  
  390.          job.setOutputFormatClass(TextOutputFormat.class);  
  391.            
  392.        
  393.          FileSystem fs=FileSystem.get(conf);  
  394.            
  395.          Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew5");  
  396.            
  397.          if(fs.exists(op)){  
  398.              fs.delete(op, true);  
  399.              System.out.println("存在此输出路径,已删除!!!");  
  400.          }  
  401.            
  402.            
  403.       FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));  
  404.       FileOutputFormat.setOutputPath(job, op);  
  405.          
  406.       System.exit(job.waitForCompletion(true)?0:1);  
  407.         
  408.         
  409.            
  410.            
  411.            
  412.            
  413.            
  414.            
  415.            
  416.                   
  417.                    
  418.           
  419.           
  420.           
  421.     }  
  422.       
  423.       
  424.       
  425.   
  426. }  
package com.reducebloomfilterjoin;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

/***
 * 
 * Hadoop1.2的版本
 * 
 * 
 * ReduceJoin+布隆过滤器   实现Reduce Join
 * 
 * @author qindongliang
 * 
 *    大数据交流群:376932160
 *  搜索技术交流群:324714439
 * 
 * 
 * 
 * **/
public class ReduceBoolmFilter {
	
	
	 
	/**
	 * 
	 * 
	 * 自定义一个输出实体
	 * 
	 * **/
	private static class CombineEntity implements WritableComparable<CombineEntity>{

		
		private Text joinKey;//连接key
		private Text flag;//文件来源标志
		private Text secondPart;//除了键外的其他部分的数据
		
		
		public CombineEntity() {
			// TODO Auto-generated constructor stub
			this.joinKey=new Text();
			this.flag=new Text();
			this.secondPart=new Text();
		}
		
		public Text getJoinKey() {
			return joinKey;
		}

		public void setJoinKey(Text joinKey) {
			this.joinKey = joinKey;
		}

		public Text getFlag() {
			return flag;
		}

		public void setFlag(Text flag) {
			this.flag = flag;
		}

		public Text getSecondPart() {
			return secondPart;
		}

		public void setSecondPart(Text secondPart) {
			this.secondPart = secondPart;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.joinKey.readFields(in);
			this.flag.readFields(in);
			this.secondPart.readFields(in);
			
		}

		@Override
		public void write(DataOutput out) throws IOException {
			this.joinKey.write(out);
			this.flag.write(out);
			this.secondPart.write(out);
			
		}

		@Override
		public int compareTo(CombineEntity o) {
			// TODO Auto-generated method stub
			return this.joinKey.compareTo(o.joinKey);
		}
		
		
		
	}
	
	
	
	
	private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{
		
		private CombineEntity combine=new CombineEntity();
		private Text flag=new Text();
		private  Text joinKey=new Text();
		private Text secondPart=new Text();
		/**
		 *使用布隆过滤器存储key
		 * 代替原来的HashSet存储
		 * 
		 * */
		//private HashSet<String> joinKeySet=new HashSet<String>();
		
		BloomFilter filter=new BloomFilter();
		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
		 
			//读取文件流
			BufferedReader br=null;
			String temp;
			// 获取DistributedCached里面 的共享文件
			Path paths[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());
				  for (Path path : paths) {
					  
					                      if (path.toString().contains("bloom.bin")) {
					  
					                          DataInputStream strm = new DataInputStream(
					  
					                                  new FileInputStream(path.toString()));
					  
					                          // Read into our Bloom filter.
					  
					                          filter.readFields(strm);
					  
					                          strm.close();
					  
					                      }
					  
					                  }
				  

		 
			
			
		}
		
		
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			
		
			   //获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
		
            if(pathName.endsWith("a.txt")){
            	
            	String  valueItems[]=value.toString().split(",");
            	
            	
            	/**
            	 * 在这里过滤必须要的连接字符
            	 * 
            	 * */
            	//if(joinKeySet.contains(valueItems[0])){
            	if(filter.membershipTest(new Key(valueItems[0].getBytes()))){
            		//设置标志位
                	flag.set("0");   
                	//设置链接键
                	joinKey.set(valueItems[0]);          
                	//设置第二部分
                	secondPart.set(valueItems[1]+"\t"+valueItems[2]);
                	
                	//封装实体
                	combine.setFlag(flag);//标志位
                	combine.setJoinKey(joinKey);//链接键
                	combine.setSecondPart(secondPart);//其他部分
                	
                	 //写出
                	context.write(combine.getJoinKey(), combine);	
            	}else{
            		System.out.println("a.txt里");
            		System.out.println("在小表中无此记录,执行过滤掉!");
            		for(String v:valueItems){
            			System.out.print(v+"   ");
            		}
            		
            		return ;
            		
            	}
            	
            	
            	
            }else if(pathName.endsWith("b.txt")){
            	String  valueItems[]=value.toString().split(",");
            	
            	/**
            	 * 
            	 * 判断是否在集合中
            	 * 
            	 * */
            	if(filter.membershipTest(new Key(valueItems[0].getBytes()))){
            	//if(joinKeySet.contains(valueItems[0])){
            		//if(joinKeySet.contains(valueItems[0])){
                	//设置标志位
                	flag.set("1");   
                	
                	//设置链接键
                	joinKey.set(valueItems[0]);
          
                	//设置第二部分注意不同的文件的列数不一样
                	secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);
                	
                	//封装实体
                	combine.setFlag(flag);//标志位
                	combine.setJoinKey(joinKey);//链接键
                	combine.setSecondPart(secondPart);//其他部分
                	
                	 //写出
                	context.write(combine.getJoinKey(), combine);
            		
            		
            	}else{            		
            		//执行过滤 ......
            		System.out.println("b.txt里");
            		System.out.println("在小表中无此记录,执行过滤掉!");
            		for(String v:valueItems){
            			System.out.print(v+"   ");
            		}
            		
            		return ;
            		
            		
            	}
            	
            
            	
            	
            }
            
            
			
			
			 
		 
			
		}
		
	}
	
	
	private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{
		
		
		//存储一个分组中左表信息
		private List<Text> leftTable=new ArrayList<Text>();
		//存储一个分组中右表信息
		private List<Text> rightTable=new ArrayList<Text>();
		
		private Text secondPart=null;
		
		private Text output=new Text();
		
		
		 
		//一个分组调用一次
		@Override
		protected void reduce(Text key, Iterable<CombineEntity> values,Context context)
				throws IOException, InterruptedException {
			 leftTable.clear();//清空分组数据
			 rightTable.clear();//清空分组数据
			 
			 
			 /**
			  * 将不同文件的数据,分别放在不同的集合
			  * 中,注意数据量过大时,会出现
			  * OOM的异常
			  * 
			  * **/
			 
			 for(CombineEntity ce:values){
				 
				 this.secondPart=new Text(ce.getSecondPart().toString());
				 
				 
				 //左表
				 
				 if(ce.getFlag().toString().trim().equals("0")){
					 leftTable.add(secondPart);
					 
				 }else if(ce.getFlag().toString().trim().equals("1")){
					 
					 rightTable.add(secondPart);
					 
				 }
				 
				 
				 
				 
			 }
			 
			 //=====================
			 for(Text left:leftTable){
				 
				 for(Text right:rightTable){
					 
					 output.set(left+"\t"+right);//连接左右数据
					 context.write(key, output);//输出
				 }
				 
			 }
			 
			 
			 
			
		}
		
	}
	
	
	
	
	
	
	
	
	public static void main(String[] args)throws Exception {
		
		 
	
	
		 //Job job=new Job(conf,"myjoin");
		 JobConf conf=new JobConf(ReduceBoolmFilter.class); 
		   conf.set("mapred.job.tracker","192.168.75.130:9001");
		    conf.setJar("tt.jar");
		  
		  
			//小表共享
			String bpath="hdfs://192.168.75.130:9000/root/bloom/bloom.bin";
			//添加到共享cache里
		DistributedCache.addCacheFile(new URI(bpath), conf);
		
		  Job job=new Job(conf, "aaaaa");
		 job.setJarByClass(ReduceBoolmFilter.class);
		 System.out.println("模式:  "+conf.get("mapred.job.tracker"));;
		 
		 
		 //设置Map和Reduce自定义类
		 job.setMapperClass(JMapper.class);
		 job.setReducerClass(JReduce.class);
		 
		 //设置Map端输出
		 job.setMapOutputKeyClass(Text.class);
		 job.setMapOutputValueClass(CombineEntity.class);
		 
		 //设置Reduce端的输出
		 job.setOutputKeyClass(Text.class);
		 job.setOutputValueClass(Text.class);
		 
	
		 job.setInputFormatClass(TextInputFormat.class);
		 job.setOutputFormatClass(TextOutputFormat.class);
		 
	 
		 FileSystem fs=FileSystem.get(conf);
		 
		 Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew5");
		 
		 if(fs.exists(op)){
			 fs.delete(op, true);
			 System.out.println("存在此输出路径,已删除!!!");
		 }
		 
		 
	  FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));
	  FileOutputFormat.setOutputPath(job, op);
	   
	  System.exit(job.waitForCompletion(true)?0:1);
	  
	  
		 
		 
		 
		 
		 
		 
		 
				
				 
		
		
		
	}
	
	
	

}



运行日志:

Java代码 复制代码 收藏代码
  1. 模式:  192.168.75.130:9001  
  2. 存在此输出路径,已删除!!!  
  3. WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  
  4. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2  
  5. WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  
  6. WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded  
  7. INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404300232_0003  
  8. INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%  
  9. INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%  
  10. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%  
  11. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%  
  12. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%  
  13. INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404300232_0003  
  14. INFO - Counters.log(585) | Counters: 29  
  15. INFO - Counters.log(587) |   Job Counters   
  16. INFO - Counters.log(589) |     Launched reduce tasks=1  
  17. INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=12165  
  18. INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0  
  19. INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0  
  20. INFO - Counters.log(589) |     Launched map tasks=2  
  21. INFO - Counters.log(589) |     Data-local map tasks=2  
  22. INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9890  
  23. INFO - Counters.log(587) |   File Output Format Counters   
  24. INFO - Counters.log(589) |     Bytes Written=172  
  25. INFO - Counters.log(587) |   FileSystemCounters  
  26. INFO - Counters.log(589) |     FILE_BYTES_READ=237  
  27. INFO - Counters.log(589) |     HDFS_BYTES_READ=455  
  28. INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=169734  
  29. INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172  
  30. INFO - Counters.log(587) |   File Input Format Counters   
  31. INFO - Counters.log(589) |     Bytes Read=227  
  32. INFO - Counters.log(587) |   Map-Reduce Framework  
  33. INFO - Counters.log(589) |     Map output materialized bytes=243  
  34. INFO - Counters.log(589) |     Map input records=10  
  35. INFO - Counters.log(589) |     Reduce shuffle bytes=243  
  36. INFO - Counters.log(589) |     Spilled Records=16  
  37. INFO - Counters.log(589) |     Map output bytes=215  
  38. INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944  
  39. INFO - Counters.log(589) |     CPU time spent (ms)=1810  
  40. INFO - Counters.log(589) |     Combine input records=0  
  41. INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228  
  42. INFO - Counters.log(589) |     Reduce input records=8  
  43. INFO - Counters.log(589) |     Reduce input groups=4  
  44. INFO - Counters.log(589) |     Combine output records=0  
  45. INFO - Counters.log(589) |     Physical memory (bytes) snapshot=444403712  
  46. INFO - Counters.log(589) |     Reduce output records=4  
  47. INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184323072  
  48. INFO - Counters.log(589) |     Map output records=8  
模式:  192.168.75.130:9001
存在此输出路径,已删除!!!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404300232_0003
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404300232_0003
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters 
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=12165
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=2
INFO - Counters.log(589) |     Data-local map tasks=2
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9890
INFO - Counters.log(587) |   File Output Format Counters 
INFO - Counters.log(589) |     Bytes Written=172
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=237
INFO - Counters.log(589) |     HDFS_BYTES_READ=455
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=169734
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=227
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=243
INFO - Counters.log(589) |     Map input records=10
INFO - Counters.log(589) |     Reduce shuffle bytes=243
INFO - Counters.log(589) |     Spilled Records=16
INFO - Counters.log(589) |     Map output bytes=215
INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944
INFO - Counters.log(589) |     CPU time spent (ms)=1810
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228
INFO - Counters.log(589) |     Reduce input records=8
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=444403712
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184323072
INFO - Counters.log(589) |     Map output records=8


连接的结果如下:

Java代码 复制代码 收藏代码
  1. 1   三劫散仙    13575468248 B   89  2013-02-05  
  2. 2   凤舞九天    18965235874 C   69  2013-03-09  
  3. 3   忙忙碌碌    15986854789 A   99  2013-03-05  
  4. 3   忙忙碌碌    15986854789 D   56  2013-06-07  
1	三劫散仙	13575468248	B	89	2013-02-05
2	凤舞九天	18965235874	C	69	2013-03-09
3	忙忙碌碌	15986854789	A	99	2013-03-05
3	忙忙碌碌	15986854789	D	56	2013-06-07


至此,测试正确,布隆过滤器实际上采用了空间换时间的策略,来提升访问性能。所以在海量数据去重时非常高效。

分享到:
评论

相关推荐

    MapReduce实现join连接

    简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接

    AE缓冲区代码 + ArcEngine连接表join

    本文将详细介绍 AE 缓冲区代码的实现步骤,并结合 ArcEngine 连接表 join 进行 sâu 细化。 AE 缓冲区代码的实现步骤: 1. 首先,需要声明变量,包括点、Envelope、-map、ActiveView 等变量。 2. 然后,使用 New_...

    在SAP前台用TCODE:SE16H实现表连接查询

    今天,我们将详细介绍如何使用 SE16H 实现表连接查询,并提供一个实际的示例。 SE16H 介绍 SE16H 是 SAP 提供的一个数据浏览器工具,能够帮助用户快速查询和浏览 SAP 系统中的数据。它支持多种查询方式,包括简单...

    Oracle数据库3种主要表连接方式对比

    在Oracle数据库中,为了获取来自多个表的数据,我们需要使用不同的连接方式来实现这一目的。本文将详细介绍三种主要的表连接方式:嵌套循环连接(Nested Loop Join,简称NL Join)、排序合并连接(Sort Merge Join,...

    inner join-outer join-cross join查询方式总结

    根据连接方式的不同,可以分为内连接(INNER JOIN)、外连接(LEFT JOIN、RIGHT JOIN 和 FULL JOIN)以及交叉连接(CROSS JOIN)。下面我们将详细探讨每种连接方式的特点、适用场景及其具体的实现方法。 #### 内...

    SQL 连接 JOIN 例解左连接,右连接,全连接,内连接,交叉连接,自连接

    为了实现这一目标,SQL 提供了一种强大的工具——JOIN,用于连接两个或多个表的数据。本文将详细介绍六种主要类型的 JOIN:左连接(LEFT JOIN)、右连接(RIGHT JOIN)、全连接(FULL JOIN)、内连接(INNER JOIN)...

    基于Solr的多表join查询加速方法

    本文将围绕"基于Solr的多表join查询加速方法"这一主题,深入探讨如何在Solr中实现类似join的功能,并优化查询性能。 在Solr中模拟join操作通常有两种策略:嵌入式数据模型和外部数据源查询(ExternalFileField或...

    Spark实现用户订单数据表连接

    本文将深入探讨如何使用 Spark 实现用户订单数据表的连接操作,这在数据分析、业务洞察以及构建复杂的数据管道时尤其重要。 首先,我们需要理解数据表连接的基本概念。在数据库中,连接(JOIN)是将两个或更多表的...

    from 多张表 等于 left join 其他表.

    标题与描述中的“from多张表等于left join其他表”主要涉及的是SQL查询语句中两种不同的连接方式:内连接(通常在FROM子句后直接使用WHERE子句实现)和左连接(LEFT JOIN)。这两种连接方式在处理多表查询时有着不同...

    ORACLE表连接方式

    本文将对Oracle中的几种主要表连接方式进行详细介绍:内连接(Inner Join)、自然连接(Natural Join)、左外连接(Left Outer Join)、右外连接(Right Outer Join)、笛卡尔连接(Cartesian Join)、索引连接...

    SQLServerUpdate多表Join

    本文将详细介绍如何使用 `UPDATE` 语句结合 `JOIN` 来实现对多个表的更新操作。 #### 1. 基本概念与语法 在 SQL Server 中,`UPDATE` 语句用于修改已存在的记录。通常情况下,我们只需要更新一个表的数据。但在...

    几种常用的表连接方式

    ### 几种常用的表连接方式详解 在数据库领域,表连接是数据检索和管理的核心技术之一,用于将多个数据表中的信息结合在一起,形成更完整、更有意义的数据集。本文将深入探讨四种常用的表连接方式:嵌套循环连接、...

    Oracle数据库中表的四种连接方式讲解

    相等连接是最常见的连接类型,基于两个或更多表之间相同列的值相等来合并数据。例如,在`EMP`和`DEPT`表中,当`EMP.DEPTNO`等于`DEPT.DEPTNO`时,会返回员工和他们所在部门的信息。这可以通过简单的逗号分隔表名或...

    实现SQL中JOIN联接多个表查询.doc

    实现 SQL 中 JOIN 联接多个表查询 在数据查询中,JOIN 联接是一种非常重要的操作,它可以将多个表中的数据结合起来,实现跨表查询。...我们可以使用 INNER JOIN、LEFT JOIN 和 RIGHT JOIN 等方式来实现多个表查询。

    java 实现两张表的等值连接

    在数据库领域,等值连接(Equi-Join)是一种常见的操作,它用于将两个或多个表的数据基于一个或多个共享的列进行合并。在Java编程中,虽然原生的Java库并不直接支持数据库查询,但我们可以通过编写代码来模拟等值...

    基于开源的flink对其实时sql进行扩展;主要实现了流与维表的join

    主要实现了流与维表的join”揭示了这个项目的核心内容,即在Flink开源框架上扩展实时SQL功能,特别是流处理与维表(通常用于存储静态参考数据)的连接操作。Flink作为一个强大的流处理和批处理框架,提供了一种高效...

    Oracle中表的四种连接方式讲解

    总结来说,Oracle中的表连接提供了丰富的数据组合方式,使我们能够灵活地从多个表中提取所需信息。正确理解和运用这些连接类型是数据库查询和数据分析的基础。在实际应用中,应根据具体需求选择合适的连接类型,以...

    Oracle表连接的具体讲解

    内连接可以使用 INNER JOIN 或 JOIN 关键字来实现。例如: SQL&gt; Select a.id,a.name,b.name from dave a inner join bl b on a.id=b.id; 内连接的优点是可以减少数据的重复性,但是它也存在一些缺陷,例如如果某个...

    Linq to datable(Left join right join full join)实例

    标题 "Linq to datable(Left join right join full join)实例" 涉及到的是在.NET框架中,使用LINQ(Language Integrated Query)查询语言处理DataTable对象时,执行不同类型的连接操作,包括左连接(Left Join)、右...

    数据库多表连接查询详解

    在连接查询中,还可以使用 NATURAL JOIN 关键字来实现自然连接。例如: ```sql SELECT * FROM authors AS a NATURAL JOIN publishers AS p; ``` 这条语句使用了自然连接将 authors 和 publishers 两个表连接起来,...

Global site tag (gtag.js) - Google Analytics