使用半链接的方式,来实现的表join,注意中间存储小表的key,是用HashSet实现的,也就是把数据存在内存里,在map侧,进行key过滤后,然后再Reduce侧,实现join,但如果数据量非常大的情况下,HashSet来存放海量的key可能就会出现OOM的情况,这时候,我们就可以采用另一种join方式,也就是今天散仙将要说的BloomFilter+Reduce Join的方式。
这两种方式,其实没什么太大的区别,唯一的不同就是上次使用的是HashSet来存放key值,这次我们将key存入BloomFilter里,这样以来我们就可以实现无须担心内存不足的缺陷了。
下面我们先来简单了解下什么是布隆过滤器?
Bloom Filter的中文翻译叫做布隆过滤器,是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。如文章标题所述,本文只是做简单介绍,属于科普文章。
应用场景
在正式介绍Bloom Filter算法之前,先来看看什么时候需要用到Bloom Filter算法。
1. HTTP缓存服务器、Web爬虫等
主要工作是判断一条URL是否在现有的URL集合之中(可以认为这里的数据量级上亿)。
对于HTTP缓存服务器,当本地局域网中的PC发起一条HTTP请求时,缓存服务器会先查看一下这个URL是否已经存在于缓存之中,如果存在的话就没有必要去原始的服务器拉取数据了(为了简单起见,我们假设数据没有发生变化),这样既能节省流量,还能加快访问速度,以提高用户体验。
对于Web爬虫,要判断当前正在处理的网页是否已经处理过了,同样需要当前URL是否存在于已经处理过的URL列表之中。
2. 垃圾邮件过滤
假设邮件服务器通过发送方的邮件域或者IP地址对垃圾邮件进行过滤,那么就需要判断当前的邮件域或者IP地址是否处于黑名单之中。如果邮件服务器的通信邮件数量非常大(也可以认为数据量级上亿),那么也可以使用Bloom Filter算法。
1、BloomFilter能解决什么问题?
以少量的内存空间判断一个元素是否属于这个集合, 代价是有一定的错误率
2、工作原理
1. 初始化一个数组, 所有位标为0, A={x1, x2, x3,…,xm} (x1, x2, x3,…,xm 初始为0)
2. 将已知集合S中的每一个数组, 按以下方式映射到A中
2.0 选取n个互相独立的hash函数 h1, h2, … hk
2.1 将元素通过以上hash函数得到一组索引值 h1(xi), h2(xi),…,hk(xi)
2.2 将集合A中的上述索引值标记为1(如果不同元素有重复, 则重复覆盖为1, 这是一个觅等操作)
3. 对于一个元素x, 将其根据2.0中选取的hash函数, 进行hash, 得到一组索引值 h1(x), h2(x), …,hk(x)
如果集合A中的这些索引位置上的值都是1, 表示这个元素属于集合S, 否则则不属于S
3、几个前提
1. hash函数的计算不能性能太差, 否则得不偿失
2. 任意两个hash函数之间必须是独立的.
即任意两个hash函数不存在单一相关性, 否则hash到其中一个索引上的元素也必定会hash到另一个相关的索引上, 这样多个hash没有意义
4、错误率
工作原理的第3步, 的出来的结论, 一个是绝对靠谱的, 一个是不能100%靠谱的。在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。关于具体的错误率,这和最优的哈希函数个数以及位数组的大小有关,而这是可以估算求得一个最优解的:
哈希函数个数k、位数组大小m及字符串数量n之间存在相互关系。相关文献证明了对于给定的m、n,当 k = ln(2)* m/n 时出错的概率是最小的。 具体的请看:http://blog.csdn.net/jiaomeng/article/details/1495500
5、基本特征
从以上对基本原理和数学基础的分析,我们可以得到Bloom filter的如下基本特征,用于指导实际应用。
(1)存在一定错误率,发生在正向判断上(存在性),反向判断不会发生错误(不存在性);
(2)错误率是可控制的,通过改变位数组大小、hash函数个数或更低碰撞率的hash函数来调节;
(3)保持较低的错误率,位数组空位至少保持在一半以上;
(4)给定m和n,可以确定最优hash个数,即k = ln2 * (m/n),此时错误率最小;
(5)给定允许的错误率E,可以确定合适的位数组大小,即m >= log2(e) * (n * log2(1/E)),继而确定hash函数个数k;
(6)正向错误率无法完全消除,即使不对位数组大小和hash函数个数进行限制,即无法实现零错误率;
(7)空间效率高,仅保存“存在状态”,但无法存储完整信息,需要其他数据结构辅助存储;
(8)不支持元素删除操作,因为不能保证删除的安全性。
6、应用场景举例:
(1)拼写检查、数据库系统、文件系统
(2)假设要你写一个网络蜘蛛(web crawler)。由于网络间的链接错综复杂,蜘蛛在网络间爬行很可能会形成“环”。为了避免形成“环”,就需要知道蜘蛛已经访问过那些URL。给一个URL,怎样知道蜘蛛是否已经访问过呢?
(3)网络应用
P2P网络中查找资源操作,可以对每条网络通路保存Bloom Filter,当命中时,则选择该通路访问。
广播消息时,可以检测某个IP是否已发包。
检测广播消息包的环路,将Bloom Filter保存在包里,每个节点将自己添加入Bloom Filter。
信息队列管理,使用Counter Bloom Filter管理信息流量。
(4)垃圾邮件地址过滤
像网易,QQ这样的公众电子邮件(email)提供商,总是需要过滤来自发送垃圾邮件的人(spamer)的垃圾邮件。一个办法就是记录下那些发垃圾邮件的email 地址。由于那些发送者不停地在注册新的地址,全世界少说也有几十亿个发垃圾邮件的地址,将他们都存起来则需要大量的网络服务器。如果用哈希表,每存储一亿个 email 地址,就需要1.6GB 的内存(用哈希表实现的具体办法是将每一个email 地址对应成一个八字节的信息指纹,然后将这些信息指纹存入哈希表,由于哈希表的存储效率一般只有50%,因此一个email 地址需要占用十六个字节。一亿个地址大约要1.6GB, 即十六亿字节的内存)。因此存贮几十亿个邮件地址可能需要上百GB 的内存。而Bloom Filter只需要哈希表1/8 到1/4 的大小就能解决同样的问题。Bloom Filter决不会漏掉任何一个在黑名单中的可疑地址。而至于误判问题,常见的补救办法是在建立一个小的白名单,存储那些可能别误判的邮件地址。
(5)Bloomfilter在HBase中的作用
HBase利用Bloomfilter来提高随机读(Get)的性能,对于顺序读(Scan)而言,设置Bloomfilter是没有作用的(0.92以后,如果设置了bloomfilter为ROWCOL,对于指定了qualifier的Scan有一定的优化,但不是那种直接过滤文件,排除在查找范围的形式)
Bloomfilter在HBase中的开销?
Bloomfilter是一个列族(cf)级别的配置属性,如果你在表中设置了Bloomfilter,那么HBase会在生成StoreFile时包含一份bloomfilter结构的数据,称其为MetaBlock;MetaBlock与DataBlock(真实的KeyValue数据)一起由LRUBlockCache维护。所以,开启bloomfilter会有一定的存储及内存cache开销。
Bloomfilter如何提高随机读(Get)的性能?
对于某个region的随机读,HBase会遍历读memstore及storefile(按照一定的顺序),将结果合并返回给客户端。如果你设置了bloomfilter,那么在遍历读storefile时,就可以利用bloomfilter,忽略某些storefile。
注意:hbase的bloom filter是惰性加载的,在写压力比较大的情况下,会有不停的compact并产生storefile,那么新的storefile是不会马上将bloom filter加载到内存的,等到读请求来的时候才加载。
这样问题就来了,第一,如果storefile设置的比较大,max size为2G,这会导致bloom filter也比较大;第二,系统的读写压力都比较大。这样或许会经常出现单个 GET请求花费3-5秒的超时现象。
使用布隆过滤器,之前,需要先对需要处理的key,生成处理的二进制文件,代码如下:
- package com.reducebloomfilterjoin;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.util.zip.GZIPInputStream;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.util.bloom.BloomFilter;
- import org.apache.hadoop.util.bloom.Key;
- import org.apache.hadoop.util.hash.Hash;
- /**
- * 布隆过滤器
- * 生成二进制文件数据
- *
- * **/
- public class TrainingBloomData {
- public static int getOptimalBloomFilterSize(int numRecords,
- float falsePosRate) {
- int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math
- .pow(Math.log(2), 2));
- return size;
- }
- public static int getOptimalK(float numMembers, float vectorSize) {
- return (int) Math.round(vectorSize / numMembers * Math.log(2));
- }
- public static void main(String[] args) throws IOException {
- Path inputFile = new Path("hdfs://192.168.75.130:9000/root/bloom/kk.txt");
- int numMembers = Integer.parseInt("10");
- float falsePosRate = Float.parseFloat("0.01");
- Path bfFile = new Path("hdfs://192.168.75.130:9000/root/bloom/bloom.bin");
- FileSystem fss=FileSystem.get(new Configuration());
- if(fss.exists(bfFile)){
- fss.delete(bfFile, true);
- System.out.println("存在此路径,已经删除!");
- }
- // Calculate our vector size and optimal K value based on approximations
- int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);
- int nbHash = getOptimalK(numMembers, vectorSize);
- // create new Bloom filter
- BloomFilter filter = new BloomFilter(vectorSize, nbHash,
- Hash.MURMUR_HASH);
- // Open file for read
- System.out.println("Training Bloom filter of size " + vectorSize
- + " with " + nbHash + " hash functions, " + numMembers
- + " approximate number of records, and " + falsePosRate
- + " false positive rate");
- String line = null;
- int numRecords = 0;
- FileSystem fs = FileSystem.get(new Configuration());
- for (FileStatus status : fs.listStatus(inputFile)) {
- BufferedReader rdr;
- // if file is gzipped, wrap it in a GZIPInputStream
- if (status.getPath().getName().endsWith(".gz")) {
- rdr = new BufferedReader(new InputStreamReader(
- new GZIPInputStream(fs.open(status.getPath()))));
- } else {
- rdr = new BufferedReader(new InputStreamReader(fs.open(status
- .getPath())));
- }
- System.out.println("Reading " + status.getPath());
- while ((line = rdr.readLine()) != null) {
- filter.add(new Key(line.getBytes()));
- ++numRecords;
- }
- rdr.close();
- }
- System.out.println("Trained Bloom filter with " + numRecords
- + " entries.");
- System.out.println("Serializing Bloom filter to HDFS at " + bfFile);
- FSDataOutputStream strm = fs.create(bfFile);
- filter.write(strm);
- strm.flush();
- strm.close();
- System.out.println("Done training Bloom filter.");
- }
- }
package com.reducebloomfilterjoin; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.zip.GZIPInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; import org.apache.hadoop.util.hash.Hash; /** * 布隆过滤器 * 生成二进制文件数据 * * **/ public class TrainingBloomData { public static int getOptimalBloomFilterSize(int numRecords, float falsePosRate) { int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math .pow(Math.log(2), 2)); return size; } public static int getOptimalK(float numMembers, float vectorSize) { return (int) Math.round(vectorSize / numMembers * Math.log(2)); } public static void main(String[] args) throws IOException { Path inputFile = new Path("hdfs://192.168.75.130:9000/root/bloom/kk.txt"); int numMembers = Integer.parseInt("10"); float falsePosRate = Float.parseFloat("0.01"); Path bfFile = new Path("hdfs://192.168.75.130:9000/root/bloom/bloom.bin"); FileSystem fss=FileSystem.get(new Configuration()); if(fss.exists(bfFile)){ fss.delete(bfFile, true); System.out.println("存在此路径,已经删除!"); } // Calculate our vector size and optimal K value based on approximations int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate); int nbHash = getOptimalK(numMembers, vectorSize); // create new Bloom filter BloomFilter filter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH); // Open file for read System.out.println("Training Bloom filter of size " + vectorSize + " with " + nbHash + " hash functions, " + numMembers + " approximate number of records, and " + falsePosRate + " false positive rate"); String line = null; int numRecords = 0; FileSystem fs = FileSystem.get(new Configuration()); for (FileStatus status : fs.listStatus(inputFile)) { BufferedReader rdr; // if file is gzipped, wrap it in a GZIPInputStream if (status.getPath().getName().endsWith(".gz")) { rdr = new BufferedReader(new InputStreamReader( new GZIPInputStream(fs.open(status.getPath())))); } else { rdr = new BufferedReader(new InputStreamReader(fs.open(status .getPath()))); } System.out.println("Reading " + status.getPath()); while ((line = rdr.readLine()) != null) { filter.add(new Key(line.getBytes())); ++numRecords; } rdr.close(); } System.out.println("Trained Bloom filter with " + numRecords + " entries."); System.out.println("Serializing Bloom filter to HDFS at " + bfFile); FSDataOutputStream strm = fs.create(bfFile); filter.write(strm); strm.flush(); strm.close(); System.out.println("Done training Bloom filter."); } }
测试数据如下:
小表模拟数据
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862
大表模拟数据
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
5,E,100,2013-09-09
6,H,200,2014-01-10
代码如下:
- package com.reducebloomfilterjoin;
- import java.io.BufferedReader;
- import java.io.DataInput;
- import java.io.DataInputStream;
- import java.io.DataOutput;
- import java.io.FileInputStream;
- import java.io.FileReader;
- import java.io.IOException;
- import java.net.URI;
- import java.util.ArrayList;
- import java.util.HashSet;
- import java.util.List;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.bloom.BloomFilter;
- import org.apache.hadoop.util.bloom.Key;
- /***
- *
- * Hadoop1.2的版本
- *
- *
- * ReduceJoin+布隆过滤器 实现Reduce Join
- *
- * @author qindongliang
- *
- * 大数据交流群:376932160
- * 搜索技术交流群:324714439
- *
- *
- *
- * **/
- public class ReduceBoolmFilter {
- /**
- *
- *
- * 自定义一个输出实体
- *
- * **/
- private static class CombineEntity implements WritableComparable<CombineEntity>{
- private Text joinKey;//连接key
- private Text flag;//文件来源标志
- private Text secondPart;//除了键外的其他部分的数据
- public CombineEntity() {
- // TODO Auto-generated constructor stub
- this.joinKey=new Text();
- this.flag=new Text();
- this.secondPart=new Text();
- }
- public Text getJoinKey() {
- return joinKey;
- }
- public void setJoinKey(Text joinKey) {
- this.joinKey = joinKey;
- }
- public Text getFlag() {
- return flag;
- }
- public void setFlag(Text flag) {
- this.flag = flag;
- }
- public Text getSecondPart() {
- return secondPart;
- }
- public void setSecondPart(Text secondPart) {
- this.secondPart = secondPart;
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- this.joinKey.readFields(in);
- this.flag.readFields(in);
- this.secondPart.readFields(in);
- }
- @Override
- public void write(DataOutput out) throws IOException {
- this.joinKey.write(out);
- this.flag.write(out);
- this.secondPart.write(out);
- }
- @Override
- public int compareTo(CombineEntity o) {
- // TODO Auto-generated method stub
- return this.joinKey.compareTo(o.joinKey);
- }
- }
- private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{
- private CombineEntity combine=new CombineEntity();
- private Text flag=new Text();
- private Text joinKey=new Text();
- private Text secondPart=new Text();
- /**
- *使用布隆过滤器存储key
- * 代替原来的HashSet存储
- *
- * */
- //private HashSet<String> joinKeySet=new HashSet<String>();
- BloomFilter filter=new BloomFilter();
- @Override
- protected void setup(Context context)throws IOException, InterruptedException {
- //读取文件流
- BufferedReader br=null;
- String temp;
- // 获取DistributedCached里面 的共享文件
- Path paths[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());
- for (Path path : paths) {
- if (path.toString().contains("bloom.bin")) {
- DataInputStream strm = new DataInputStream(
- new FileInputStream(path.toString()));
- // Read into our Bloom filter.
- filter.readFields(strm);
- strm.close();
- }
- }
- }
- @Override
- protected void map(LongWritable key, Text value,Context context)
- throws IOException, InterruptedException {
- //获得文件输入路径
- String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
- if(pathName.endsWith("a.txt")){
- String valueItems[]=value.toString().split(",");
- /**
- * 在这里过滤必须要的连接字符
- *
- * */
- //if(joinKeySet.contains(valueItems[0])){
- if(filter.membershipTest(new Key(valueItems[0].getBytes()))){
- //设置标志位
- flag.set("0");
- //设置链接键
- joinKey.set(valueItems[0]);
- //设置第二部分
- secondPart.set(valueItems[1]+"\t"+valueItems[2]);
- //封装实体
- combine.setFlag(flag);//标志位
- combine.setJoinKey(joinKey);//链接键
- combine.setSecondPart(secondPart);//其他部分
- //写出
- context.write(combine.getJoinKey(), combine);
- }else{
- System.out.println("a.txt里");
- System.out.println("在小表中无此记录,执行过滤掉!");
- for(String v:valueItems){
- System.out.print(v+" ");
- }
- return ;
- }
- }else if(pathName.endsWith("b.txt")){
- String valueItems[]=value.toString().split(",");
- /**
- *
- * 判断是否在集合中
- *
- * */
- if(filter.membershipTest(new Key(valueItems[0].getBytes()))){
- //if(joinKeySet.contains(valueItems[0])){
- //if(joinKeySet.contains(valueItems[0])){
- //设置标志位
- flag.set("1");
- //设置链接键
- joinKey.set(valueItems[0]);
- //设置第二部分注意不同的文件的列数不一样
- secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);
- //封装实体
- combine.setFlag(flag);//标志位
- combine.setJoinKey(joinKey);//链接键
- combine.setSecondPart(secondPart);//其他部分
- //写出
- context.write(combine.getJoinKey(), combine);
- }else{
- //执行过滤 ......
- System.out.println("b.txt里");
- System.out.println("在小表中无此记录,执行过滤掉!");
- for(String v:valueItems){
- System.out.print(v+" ");
- }
- return ;
- }
- }
- }
- }
- private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{
- //存储一个分组中左表信息
- private List<Text> leftTable=new ArrayList<Text>();
- //存储一个分组中右表信息
- private List<Text> rightTable=new ArrayList<Text>();
- private Text secondPart=null;
- private Text output=new Text();
- //一个分组调用一次
- @Override
- protected void reduce(Text key, Iterable<CombineEntity> values,Context context)
- throws IOException, InterruptedException {
- leftTable.clear();//清空分组数据
- rightTable.clear();//清空分组数据
- /**
- * 将不同文件的数据,分别放在不同的集合
- * 中,注意数据量过大时,会出现
- * OOM的异常
- *
- * **/
- for(CombineEntity ce:values){
- this.secondPart=new Text(ce.getSecondPart().toString());
- //左表
- if(ce.getFlag().toString().trim().equals("0")){
- leftTable.add(secondPart);
- }else if(ce.getFlag().toString().trim().equals("1")){
- rightTable.add(secondPart);
- }
- }
- //=====================
- for(Text left:leftTable){
- for(Text right:rightTable){
- output.set(left+"\t"+right);//连接左右数据
- context.write(key, output);//输出
- }
- }
- }
- }
- public static void main(String[] args)throws Exception {
- //Job job=new Job(conf,"myjoin");
- JobConf conf=new JobConf(ReduceBoolmFilter.class);
- conf.set("mapred.job.tracker","192.168.75.130:9001");
- conf.setJar("tt.jar");
- //小表共享
- String bpath="hdfs://192.168.75.130:9000/root/bloom/bloom.bin";
- //添加到共享cache里
- DistributedCache.addCacheFile(new URI(bpath), conf);
- Job job=new Job(conf, "aaaaa");
- job.setJarByClass(ReduceBoolmFilter.class);
- System.out.println("模式: "+conf.get("mapred.job.tracker"));;
- //设置Map和Reduce自定义类
- job.setMapperClass(JMapper.class);
- job.setReducerClass(JReduce.class);
- //设置Map端输出
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(CombineEntity.class);
- //设置Reduce端的输出
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- FileSystem fs=FileSystem.get(conf);
- Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew5");
- if(fs.exists(op)){
- fs.delete(op, true);
- System.out.println("存在此输出路径,已删除!!!");
- }
- FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));
- FileOutputFormat.setOutputPath(job, op);
- System.exit(job.waitForCompletion(true)?0:1);
- }
- }
package com.reducebloomfilterjoin; import java.io.BufferedReader; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.FileInputStream; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; /*** * * Hadoop1.2的版本 * * * ReduceJoin+布隆过滤器 实现Reduce Join * * @author qindongliang * * 大数据交流群:376932160 * 搜索技术交流群:324714439 * * * * **/ public class ReduceBoolmFilter { /** * * * 自定义一个输出实体 * * **/ private static class CombineEntity implements WritableComparable<CombineEntity>{ private Text joinKey;//连接key private Text flag;//文件来源标志 private Text secondPart;//除了键外的其他部分的数据 public CombineEntity() { // TODO Auto-generated constructor stub this.joinKey=new Text(); this.flag=new Text(); this.secondPart=new Text(); } public Text getJoinKey() { return joinKey; } public void setJoinKey(Text joinKey) { this.joinKey = joinKey; } public Text getFlag() { return flag; } public void setFlag(Text flag) { this.flag = flag; } public Text getSecondPart() { return secondPart; } public void setSecondPart(Text secondPart) { this.secondPart = secondPart; } @Override public void readFields(DataInput in) throws IOException { this.joinKey.readFields(in); this.flag.readFields(in); this.secondPart.readFields(in); } @Override public void write(DataOutput out) throws IOException { this.joinKey.write(out); this.flag.write(out); this.secondPart.write(out); } @Override public int compareTo(CombineEntity o) { // TODO Auto-generated method stub return this.joinKey.compareTo(o.joinKey); } } private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{ private CombineEntity combine=new CombineEntity(); private Text flag=new Text(); private Text joinKey=new Text(); private Text secondPart=new Text(); /** *使用布隆过滤器存储key * 代替原来的HashSet存储 * * */ //private HashSet<String> joinKeySet=new HashSet<String>(); BloomFilter filter=new BloomFilter(); @Override protected void setup(Context context)throws IOException, InterruptedException { //读取文件流 BufferedReader br=null; String temp; // 获取DistributedCached里面 的共享文件 Path paths[]=DistributedCache.getLocalCacheFiles(context.getConfiguration()); for (Path path : paths) { if (path.toString().contains("bloom.bin")) { DataInputStream strm = new DataInputStream( new FileInputStream(path.toString())); // Read into our Bloom filter. filter.readFields(strm); strm.close(); } } } @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //获得文件输入路径 String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); if(pathName.endsWith("a.txt")){ String valueItems[]=value.toString().split(","); /** * 在这里过滤必须要的连接字符 * * */ //if(joinKeySet.contains(valueItems[0])){ if(filter.membershipTest(new Key(valueItems[0].getBytes()))){ //设置标志位 flag.set("0"); //设置链接键 joinKey.set(valueItems[0]); //设置第二部分 secondPart.set(valueItems[1]+"\t"+valueItems[2]); //封装实体 combine.setFlag(flag);//标志位 combine.setJoinKey(joinKey);//链接键 combine.setSecondPart(secondPart);//其他部分 //写出 context.write(combine.getJoinKey(), combine); }else{ System.out.println("a.txt里"); System.out.println("在小表中无此记录,执行过滤掉!"); for(String v:valueItems){ System.out.print(v+" "); } return ; } }else if(pathName.endsWith("b.txt")){ String valueItems[]=value.toString().split(","); /** * * 判断是否在集合中 * * */ if(filter.membershipTest(new Key(valueItems[0].getBytes()))){ //if(joinKeySet.contains(valueItems[0])){ //if(joinKeySet.contains(valueItems[0])){ //设置标志位 flag.set("1"); //设置链接键 joinKey.set(valueItems[0]); //设置第二部分注意不同的文件的列数不一样 secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]); //封装实体 combine.setFlag(flag);//标志位 combine.setJoinKey(joinKey);//链接键 combine.setSecondPart(secondPart);//其他部分 //写出 context.write(combine.getJoinKey(), combine); }else{ //执行过滤 ...... System.out.println("b.txt里"); System.out.println("在小表中无此记录,执行过滤掉!"); for(String v:valueItems){ System.out.print(v+" "); } return ; } } } } private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{ //存储一个分组中左表信息 private List<Text> leftTable=new ArrayList<Text>(); //存储一个分组中右表信息 private List<Text> rightTable=new ArrayList<Text>(); private Text secondPart=null; private Text output=new Text(); //一个分组调用一次 @Override protected void reduce(Text key, Iterable<CombineEntity> values,Context context) throws IOException, InterruptedException { leftTable.clear();//清空分组数据 rightTable.clear();//清空分组数据 /** * 将不同文件的数据,分别放在不同的集合 * 中,注意数据量过大时,会出现 * OOM的异常 * * **/ for(CombineEntity ce:values){ this.secondPart=new Text(ce.getSecondPart().toString()); //左表 if(ce.getFlag().toString().trim().equals("0")){ leftTable.add(secondPart); }else if(ce.getFlag().toString().trim().equals("1")){ rightTable.add(secondPart); } } //===================== for(Text left:leftTable){ for(Text right:rightTable){ output.set(left+"\t"+right);//连接左右数据 context.write(key, output);//输出 } } } } public static void main(String[] args)throws Exception { //Job job=new Job(conf,"myjoin"); JobConf conf=new JobConf(ReduceBoolmFilter.class); conf.set("mapred.job.tracker","192.168.75.130:9001"); conf.setJar("tt.jar"); //小表共享 String bpath="hdfs://192.168.75.130:9000/root/bloom/bloom.bin"; //添加到共享cache里 DistributedCache.addCacheFile(new URI(bpath), conf); Job job=new Job(conf, "aaaaa"); job.setJarByClass(ReduceBoolmFilter.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; //设置Map和Reduce自定义类 job.setMapperClass(JMapper.class); job.setReducerClass(JReduce.class); //设置Map端输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CombineEntity.class); //设置Reduce端的输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileSystem fs=FileSystem.get(conf); Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew5"); if(fs.exists(op)){ fs.delete(op, true); System.out.println("存在此输出路径,已删除!!!"); } FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb")); FileOutputFormat.setOutputPath(job, op); System.exit(job.waitForCompletion(true)?0:1); } }
运行日志:
- 模式: 192.168.75.130:9001
- 存在此输出路径,已删除!!!
- WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
- INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2
- WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
- INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404300232_0003
- INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 50% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100%
- INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404300232_0003
- INFO - Counters.log(585) | Counters: 29
- INFO - Counters.log(587) | Job Counters
- INFO - Counters.log(589) | Launched reduce tasks=1
- INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=12165
- INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0
- INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0
- INFO - Counters.log(589) | Launched map tasks=2
- INFO - Counters.log(589) | Data-local map tasks=2
- INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9890
- INFO - Counters.log(587) | File Output Format Counters
- INFO - Counters.log(589) | Bytes Written=172
- INFO - Counters.log(587) | FileSystemCounters
- INFO - Counters.log(589) | FILE_BYTES_READ=237
- INFO - Counters.log(589) | HDFS_BYTES_READ=455
- INFO - Counters.log(589) | FILE_BYTES_WRITTEN=169734
- INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=172
- INFO - Counters.log(587) | File Input Format Counters
- INFO - Counters.log(589) | Bytes Read=227
- INFO - Counters.log(587) | Map-Reduce Framework
- INFO - Counters.log(589) | Map output materialized bytes=243
- INFO - Counters.log(589) | Map input records=10
- INFO - Counters.log(589) | Reduce shuffle bytes=243
- INFO - Counters.log(589) | Spilled Records=16
- INFO - Counters.log(589) | Map output bytes=215
- INFO - Counters.log(589) | Total committed heap usage (bytes)=336338944
- INFO - Counters.log(589) | CPU time spent (ms)=1810
- INFO - Counters.log(589) | Combine input records=0
- INFO - Counters.log(589) | SPLIT_RAW_BYTES=228
- INFO - Counters.log(589) | Reduce input records=8
- INFO - Counters.log(589) | Reduce input groups=4
- INFO - Counters.log(589) | Combine output records=0
- INFO - Counters.log(589) | Physical memory (bytes) snapshot=444403712
- INFO - Counters.log(589) | Reduce output records=4
- INFO - Counters.log(589) | Virtual memory (bytes) snapshot=2184323072
- INFO - Counters.log(589) | Map output records=8
模式: 192.168.75.130:9001 存在此输出路径,已删除!!! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2 WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404300232_0003 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 50% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404300232_0003 INFO - Counters.log(585) | Counters: 29 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=12165 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=2 INFO - Counters.log(589) | Data-local map tasks=2 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9890 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=172 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=237 INFO - Counters.log(589) | HDFS_BYTES_READ=455 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=169734 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=172 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=227 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=243 INFO - Counters.log(589) | Map input records=10 INFO - Counters.log(589) | Reduce shuffle bytes=243 INFO - Counters.log(589) | Spilled Records=16 INFO - Counters.log(589) | Map output bytes=215 INFO - Counters.log(589) | Total committed heap usage (bytes)=336338944 INFO - Counters.log(589) | CPU time spent (ms)=1810 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=228 INFO - Counters.log(589) | Reduce input records=8 INFO - Counters.log(589) | Reduce input groups=4 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=444403712 INFO - Counters.log(589) | Reduce output records=4 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=2184323072 INFO - Counters.log(589) | Map output records=8
连接的结果如下:
- 1 三劫散仙 13575468248 B 89 2013-02-05
- 2 凤舞九天 18965235874 C 69 2013-03-09
- 3 忙忙碌碌 15986854789 A 99 2013-03-05
- 3 忙忙碌碌 15986854789 D 56 2013-06-07
1 三劫散仙 13575468248 B 89 2013-02-05 2 凤舞九天 18965235874 C 69 2013-03-09 3 忙忙碌碌 15986854789 A 99 2013-03-05 3 忙忙碌碌 15986854789 D 56 2013-06-07
至此,测试正确,布隆过滤器实际上采用了空间换时间的策略,来提升访问性能。所以在海量数据去重时非常高效。
相关推荐
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
本文将详细介绍 AE 缓冲区代码的实现步骤,并结合 ArcEngine 连接表 join 进行 sâu 细化。 AE 缓冲区代码的实现步骤: 1. 首先,需要声明变量,包括点、Envelope、-map、ActiveView 等变量。 2. 然后,使用 New_...
今天,我们将详细介绍如何使用 SE16H 实现表连接查询,并提供一个实际的示例。 SE16H 介绍 SE16H 是 SAP 提供的一个数据浏览器工具,能够帮助用户快速查询和浏览 SAP 系统中的数据。它支持多种查询方式,包括简单...
在Oracle数据库中,为了获取来自多个表的数据,我们需要使用不同的连接方式来实现这一目的。本文将详细介绍三种主要的表连接方式:嵌套循环连接(Nested Loop Join,简称NL Join)、排序合并连接(Sort Merge Join,...
根据连接方式的不同,可以分为内连接(INNER JOIN)、外连接(LEFT JOIN、RIGHT JOIN 和 FULL JOIN)以及交叉连接(CROSS JOIN)。下面我们将详细探讨每种连接方式的特点、适用场景及其具体的实现方法。 #### 内...
为了实现这一目标,SQL 提供了一种强大的工具——JOIN,用于连接两个或多个表的数据。本文将详细介绍六种主要类型的 JOIN:左连接(LEFT JOIN)、右连接(RIGHT JOIN)、全连接(FULL JOIN)、内连接(INNER JOIN)...
本文将围绕"基于Solr的多表join查询加速方法"这一主题,深入探讨如何在Solr中实现类似join的功能,并优化查询性能。 在Solr中模拟join操作通常有两种策略:嵌入式数据模型和外部数据源查询(ExternalFileField或...
本文将深入探讨如何使用 Spark 实现用户订单数据表的连接操作,这在数据分析、业务洞察以及构建复杂的数据管道时尤其重要。 首先,我们需要理解数据表连接的基本概念。在数据库中,连接(JOIN)是将两个或更多表的...
标题与描述中的“from多张表等于left join其他表”主要涉及的是SQL查询语句中两种不同的连接方式:内连接(通常在FROM子句后直接使用WHERE子句实现)和左连接(LEFT JOIN)。这两种连接方式在处理多表查询时有着不同...
本文将对Oracle中的几种主要表连接方式进行详细介绍:内连接(Inner Join)、自然连接(Natural Join)、左外连接(Left Outer Join)、右外连接(Right Outer Join)、笛卡尔连接(Cartesian Join)、索引连接...
本文将详细介绍如何使用 `UPDATE` 语句结合 `JOIN` 来实现对多个表的更新操作。 #### 1. 基本概念与语法 在 SQL Server 中,`UPDATE` 语句用于修改已存在的记录。通常情况下,我们只需要更新一个表的数据。但在...
### 几种常用的表连接方式详解 在数据库领域,表连接是数据检索和管理的核心技术之一,用于将多个数据表中的信息结合在一起,形成更完整、更有意义的数据集。本文将深入探讨四种常用的表连接方式:嵌套循环连接、...
相等连接是最常见的连接类型,基于两个或更多表之间相同列的值相等来合并数据。例如,在`EMP`和`DEPT`表中,当`EMP.DEPTNO`等于`DEPT.DEPTNO`时,会返回员工和他们所在部门的信息。这可以通过简单的逗号分隔表名或...
实现 SQL 中 JOIN 联接多个表查询 在数据查询中,JOIN 联接是一种非常重要的操作,它可以将多个表中的数据结合起来,实现跨表查询。...我们可以使用 INNER JOIN、LEFT JOIN 和 RIGHT JOIN 等方式来实现多个表查询。
在数据库领域,等值连接(Equi-Join)是一种常见的操作,它用于将两个或多个表的数据基于一个或多个共享的列进行合并。在Java编程中,虽然原生的Java库并不直接支持数据库查询,但我们可以通过编写代码来模拟等值...
主要实现了流与维表的join”揭示了这个项目的核心内容,即在Flink开源框架上扩展实时SQL功能,特别是流处理与维表(通常用于存储静态参考数据)的连接操作。Flink作为一个强大的流处理和批处理框架,提供了一种高效...
总结来说,Oracle中的表连接提供了丰富的数据组合方式,使我们能够灵活地从多个表中提取所需信息。正确理解和运用这些连接类型是数据库查询和数据分析的基础。在实际应用中,应根据具体需求选择合适的连接类型,以...
内连接可以使用 INNER JOIN 或 JOIN 关键字来实现。例如: SQL> Select a.id,a.name,b.name from dave a inner join bl b on a.id=b.id; 内连接的优点是可以减少数据的重复性,但是它也存在一些缺陷,例如如果某个...
标题 "Linq to datable(Left join right join full join)实例" 涉及到的是在.NET框架中,使用LINQ(Language Integrated Query)查询语言处理DataTable对象时,执行不同类型的连接操作,包括左连接(Left Join)、右...
在连接查询中,还可以使用 NATURAL JOIN 关键字来实现自然连接。例如: ```sql SELECT * FROM authors AS a NATURAL JOIN publishers AS p; ``` 这条语句使用了自然连接将 authors 和 publishers 两个表连接起来,...