- 浏览: 2193348 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
散仙,在有关Hadoop的上篇博客里,给出了基于Reduce侧的表连接,今天,散仙,就再来看下如何在Map侧高效完成的join,因为在reduce侧进行join在shuffle阶段会消耗大量的时间,如果在Map端进行Join,那么就会节省大量的资源,当然,这也是有具体的应用场景的。
使用场景:一张表十分小、一张表很大。
用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。
模拟的测试数据如下:
小表: HDFS路径:hdfs://192.168.75.130:9000/root/dist/a.txt
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862
大表:HDFS路径:hdfs://192.168.75.130:9000/root/inputjoindb/b.txt
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
使用Hadoop1.2的版本进行实现,源码如下:
运行日志:
结果如下:
可以看出,结果是正确的,这种方式,非常高效,但通常,只适应于两个表里面,一个表非常大,而另外一张表,则非常小,究竟什么样的算小,基本上当你的内存能够,很轻松的装下,并不会对主程序造成很大影响的时候,我们就可以在Map端通过利用DistributeCached复制链接技术进行Join了。
使用场景:一张表十分小、一张表很大。
用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。
模拟的测试数据如下:
小表: HDFS路径:hdfs://192.168.75.130:9000/root/dist/a.txt
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862
大表:HDFS路径:hdfs://192.168.75.130:9000/root/inputjoindb/b.txt
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
使用Hadoop1.2的版本进行实现,源码如下:
package com.mapjoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /*** * 基于Map侧的复制链接 *Hadoop技术交流群: 37693216 * @author qindongliang ***/ public class MapJoin { /*** * 在Map侧setup方法里,取出缓存文件 * 放入HashMap中进行join * * * **/ public static class MMppe extends Mapper<Object, Text, Text, Text>{ /** * 此map是存放小表数据用的 * 注意小表的key是不重复的 * 类似与数据库的外键表 * 在这里的小表,就相当于一个外键表 * * * **/ private HashMap<String,String> map=new HashMap<String, String>(); /** * 输出的Key * */ private Text outputKey=new Text(); /** * 输出的Value * * */ private Text outputValue=new Text(); //存放map的一行数据 String mapInputStr=null; //存放主表的整个列值 String mapInputStrs[] =null; //存放外键表(小表)的,除了链接键之外的整个其他列的拼接字符串 String mapSecondPart=null; /** * Map的初始化方法 * * 主要任务是将小表存入到一个Hash中 * 格式,k=外键 === v=其他列拼接的字符串 * * * **/ @Override protected void setup(Context context)throws IOException, InterruptedException { //读取文件流 BufferedReader br=null; String temp; // 获取DistributedCached里面 的共享文件 Path path[]=DistributedCache.getLocalCacheFiles(context.getConfiguration()); for(Path p:path){ if(p.getName().endsWith("a.txt")){ br=new BufferedReader(new FileReader(p.toString())); //List<String> list=Files.readAllLines(Paths.get(p.getName()), Charset.forName("UTF-8")); while((temp=br.readLine())!=null){ String ss[]=temp.split(","); map.put(ss[0], ss[1]+"\t"+ss[2]);//放入hash表中 } } } //System.out.println("map完:"+map); } /** * * 在map里,直接读取数据,从另一个表的map里 * 获取key进行join就可以了 * * * ***/ @Override protected void map(Object key, Text value,Context context)throws IOException, InterruptedException { //空值跳过 if(value==null||value.toString().equals("")){ return; } this.mapInputStr=value.toString();//读取输入的值 this.mapInputStrs=this.mapInputStr.split(",");//拆分成数组 this.mapSecondPart=map.get(mapInputStrs[0]);//获取外键表的部分 //如果存在此key if(this.mapSecondPart!=null){ this.outputKey.set(mapInputStrs[0]);//输出的key //输出的value是拼接的两个表的数据 this.outputValue.set(this.mapSecondPart+"\t"+mapInputStrs[1]+"\t"+mapInputStrs[2]+"\t"+mapInputStrs[3]); //写入磁盘 context.write(this.outputKey, this.outputValue); } } //驱动类 public static void main(String[] args)throws Exception { JobConf conf=new JobConf(MMppe.class); //小表共享 String bpath="hdfs://192.168.75.130:9000/root/dist/a.txt"; //添加到共享cache里 DistributedCache.addCacheFile(new URI(bpath), conf); conf.set("mapred.job.tracker","192.168.75.130:9001"); conf.setJar("tt.jar"); Job job=new Job(conf, "2222222"); job.setJarByClass(MapJoin.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; //设置Map和Reduce自定义类 job.setMapperClass(MMppe.class); job.setNumReduceTasks(0); //设置Map端输出 // job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置Reduce端的输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileSystem fs=FileSystem.get(conf); Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew3/"); if(fs.exists(op)){ fs.delete(op, true); System.out.println("存在此输出路径,已删除!!!"); } FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb/b.txt")); FileOutputFormat.setOutputPath(job, op); System.exit(job.waitForCompletion(true)?0:1); } } }
运行日志:
模式: 192.168.75.130:9001 存在此输出路径,已删除!!! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404250130_0011 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404250130_0011 INFO - Counters.log(585) | Counters: 19 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=9878 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=0 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=172 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | HDFS_BYTES_READ=188 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=55746 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=172 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=74 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map input records=4 INFO - Counters.log(589) | Physical memory (bytes) snapshot=78663680 INFO - Counters.log(589) | Spilled Records=0 INFO - Counters.log(589) | CPU time spent (ms)=230 INFO - Counters.log(589) | Total committed heap usage (bytes)=15728640 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=725975040 INFO - Counters.log(589) | Map output records=4 INFO - Counters.log(589) | SPLIT_RAW_BYTES=114
结果如下:
3 忙忙碌碌 15986854789 A 99 2013-03-05 1 三劫散仙 13575468248 B 89 2013-02-05 2 凤舞九天 18965235874 C 69 2013-03-09 3 忙忙碌碌 15986854789 D 56 2013-06-07
可以看出,结果是正确的,这种方式,非常高效,但通常,只适应于两个表里面,一个表非常大,而另外一张表,则非常小,究竟什么样的算小,基本上当你的内存能够,很轻松的装下,并不会对主程序造成很大影响的时候,我们就可以在Map端通过利用DistributeCached复制链接技术进行Join了。
发表评论
-
Apache Flink在阿里的使用(译)
2019-02-21 21:18 1232Flink是未来大数据实时 ... -
计算机图形处理的一些知识
2018-04-25 17:46 1237最近在搞opencv来做一些 ... -
如何在kylin中构建一个cube
2017-07-11 19:06 1297前面的文章介绍了Apache Kylin的安装及数据仓 ... -
Apache Kylin的入门安装
2017-06-27 21:27 2152Apache Kylin™是一个开源的分布式分析引擎,提供 ... -
ES-Hadoop插件介绍
2017-04-27 18:07 2002上篇文章,写了使用spark集成es框架,并向es写入数据,虽 ... -
如何在Scala中读取Hadoop集群上的gz压缩文件
2017-04-05 18:51 2144存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩 ... -
如何收集项目日志统一发送到kafka中?
2017-02-07 19:07 2803上一篇(http://qindongliang.iteye. ... -
Hue+Hive临时目录权限不够解决方案
2016-06-14 10:40 4741安装Hue后,可能会分配多个账户给一些业务部门操作hive,虽 ... -
Hadoop的8088页面失效问题
2016-03-31 11:21 4488前两天重启了测试的hadoop集群,今天访问集群的8088任 ... -
Hadoop+Hbase集群数据迁移问题
2016-03-23 21:00 2549数据迁移或备份是任何 ... -
如何监控你的Hadoop+Hbase集群?
2016-03-21 16:10 4929前言 监控hadoop的框架 ... -
Logstash与Kafka集成
2016-02-24 18:44 11668在ELKK的架构中,各个框架的角色分工如下: Elastic ... -
Kakfa集群搭建
2016-02-23 15:36 2660先来整体熟悉下Kafka的一些概念和架构 (一)什么是Ka ... -
大数据日志收集框架之Flume入门
2016-02-02 14:25 4196Flume是Cloudrea公司开源的一款优秀的日志收集框架 ... -
Apache Tez0.7编译笔记
2016-01-15 16:33 2548目前最新的Tez版本是0.8,但还不是稳定版,所以大家还 ... -
Bug死磕之hue集成的oozie+pig出现资源任务死锁问题
2016-01-14 15:52 3854这两天,打算给现有的 ... -
Hadoop2.7.1和Hbase0.98添加LZO压缩
2016-01-04 17:46 26121,执行命令安装一些依赖组件 yum install -y ... -
Hadoop2.7.1配置NameNode+ResourceManager高可用原理分析
2015-11-11 19:51 3190关于NameNode高可靠需要配置的文件有core-site ... -
设置Hadoop+Hbase集群pid文件存储位置
2015-10-20 13:40 2880有时候,我们对运行几 ... -
Hadoop+Maven项目打包异常
2015-08-11 19:36 1607先简单说下业务:有一个单独的模块,可以在远程下载Hadoop上 ...
相关推荐
本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...
### Hadoop Map-Reduce 教程详析 #### 目标与作用 Hadoop Map-Reduce框架是设计用于处理大规模数据集(多太字节级)的软件框架,它允许在大量廉价硬件集群上(可达数千节点)进行并行处理,确保了数据处理的可靠性...
### Hadoop Map-Reduce 教程 #### 一、Hadoop Map-Reduce 概述 Hadoop Map-Reduce 是一种编程模型,用于处理大规模数据集(通常为TB级或以上)。这种模型支持分布式计算,可以在成百上千台计算机上运行。Map-...
本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在数据库中是非常常见的,它用于将来自两个或更多表的数据根据某些共享字段(即键)关联起来。在 ...
本篇文章将深入探讨“远程调用执行Hadoop Map/Reduce”的概念、原理及其实现过程,同时结合标签“源码”和“工具”,我们将涉及到如何通过编程接口与Hadoop集群进行交互。 Hadoop MapReduce是一种编程模型,用于大...
在大数据处理领域,Hadoop是不可或缺的核心框架,其核心组件MapReduce则是分布式计算的重要实现方式。MapReduce的设计理念源于Google的同名论文,它通过将大规模数据处理任务分解为两个阶段:Map(映射)和Reduce...
### Hadoop MapReduce 教程知识点详解 #### 一、Hadoop MapReduce 概述 - **定义**:Hadoop MapReduce 是一个基于 Java 的分布式数据处理框架,它能够高效地处理大规模数据集。该框架将任务分解为一系列较小的任务...
每个案例都详细列出了实践步骤,包括如何编写 Map 和 Reduce 函数、如何配置 Hadoop 环境、如何运行 MapReduce 任务等。 #### 六、总结 Hadoop MapReduce 是一种非常强大的分布式数据处理工具,它通过简单的编程...
Hadoop Map Reduce 教程.doc
通常,Hadoop中的Join可以分为几种类型:Bucket Join、Sort-Merge Join、Replicated Join和Map-Side Join等。每种Join策略都有其适用场景和优缺点。 `hadoop_join.jar`是一个针对Hadoop环境设计的Join查询工具,它...
标题“hadoop map reduce hbase 一人一档”揭示了这个系统的核心组成部分。Hadoop MapReduce是一种分布式计算框架,用于处理和存储大规模数据集。它通过将复杂任务分解为可并行处理的“映射”和“化简”阶段,使得在...
### Hadoop Join Implementation:关键技术与优化策略 #### 摘要 在大数据处理领域,Hadoop作为主流的大规模数据处理框架之一,其MapReduce模型在并行数据处理方面展现出巨大优势。然而,对于数据间的连接操作(即...
hadoop-0.21.0-datajoin.jar
在Windows平台上进行Hadoop的Map/Reduce开发可能会比在Linux环境下多一些挑战,但通过详细的步骤和理解Map/Reduce的工作机制,开发者可以有效地克服这些困难。以下是对标题和描述中涉及知识点的详细说明: **Hadoop...
Hadoop源代码分析(MapTask) Hadoop的MapTask类是Hadoop MapReduce框架中的一部分,负责执行Map任务。MapTask类继承自Task类,是MapReduce框架中的一个重要组件。本文将对MapTask类的源代码进行分析,了解其内部...
【标题】:“最高气温 map reduce hadoop 实例” 在大数据处理领域,Hadoop是一个不可或缺的开源框架,它专为分布式存储和处理大量数据而设计。本实例将介绍如何使用Hadoop MapReduce解决一个实际问题——找出给定...
【标题】"基于Hadoop的好友推荐系统"揭示了如何利用大数据处理框架Hadoop来构建一个高效、可扩展的社交网络中的好友推荐功能。在现代的社交媒体平台中,好友推荐是提升用户粘性和互动性的重要手段,通过分析用户的...
使用Hadoop Map Reduce分析股票市场 如何运行程序? 首先在您的系统中安装Hadoop。 请按照以下步骤进行安装 然后开始执行给定的命令 cd hadoop-3.2.2 / sbin ./start-dfs.sh ./start-yarn.sh jps 导出HADOOP_...
java运行依赖jar包