- 浏览: 2189257 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
在hadoop里面处理的数据,默认按输入内容的key进行排序的,大部分情况下,都可以满足的我们的业务需求,但有时候,可能出现类似以下的需求,输入内容:
要求输出1:
要求输出2:
注意上面的输出1,和输出2,其实都是一样的逻辑,只不过,输出的形式稍微改了下,那么今天散仙,就来分析下,怎么在hadoop里面,实现这样的需求。
其实这样的需求,就类似数据库的标准SQL分组
SELECT A,B FROM TABLE GROUP BY A,B ORDER BY A,B
当然也不一定,是2个字段分组,可能有2个或2个以上的多个字段分组。
下面,我们先来看下MapReduce内部执行2次排序的流程图,这图是散仙收集的,画的很不错。
由上图可知,Map在处理数据时,先由InputFormat组件提供输入格式,然后Split一行数据,默认的是TextInputFormat,Key为字节偏移量,Value为内容,然后把这行数据,传给Map,Map根据某种约定的分隔符,进行拆分数据,进行业务处理,如果是计数的直接在Value上输出1,在Map输出前,如果有Combine组件,则会执行Combine阶段,进行本地Reduce,一般是用来优化程序用的,Combine执行完后,会执行Partition组件,进行数据分区,默认的是HashPartition,按照输出的Key的哈希值与上Integer的最大值,然后对reduce的个数进行取余得到的值,经过Partition后,数据就会被按桶输出到本地磁盘上,在输出的时候,会按照Key进行排序,然后等所有的Map执行完毕后,就会进入Reduce阶段,这个阶段会进行一个大的混洗阶段,术语叫shuffle,每个reduce都会去每个map输出的分区里面,拉取对应的一部分数据,这个时候,是最耗网络IO,以及磁盘IO的,是影响性能的一个重要瓶颈,当Reduce把所有的数据拉取完毕后,就会进行分组并按照Key进行排序,每处理好一个分组,都会调用一次Reduce函数,进行累加,或其他的业务处理,处理完毕后,就会通过OutputFormat进行输出到HDFS上,至此,整个流程就执行完毕。
代码如下:
在eclipse下,执行,打印日志内容如下:
执行完,我们在输出目录里里面查看
执行完,内容如下:
我们发现,跟我们预期的结果一致,熟悉MapReduce的执行原理,可以帮助我们更好的使用Hive,因为Hive本身就是一个或多个MapReduce作业构成的,Hive语句的优化,对MapReduce作业的影响的性能也是不容忽视的,所以我们一定要多熟悉熟悉MapReduce编程的模型,以便于我们对它有一个更清晰的认识和了解。
秦东亮;72 秦东亮;34 秦东亮;100 三劫;899 三劫;32 三劫;1 a;45 b;567 b;12
要求输出1:
a 45 b 12,567 三劫 1,32,899 秦东亮 34,72,100
要求输出2:
a 45 b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100
注意上面的输出1,和输出2,其实都是一样的逻辑,只不过,输出的形式稍微改了下,那么今天散仙,就来分析下,怎么在hadoop里面,实现这样的需求。
其实这样的需求,就类似数据库的标准SQL分组
SELECT A,B FROM TABLE GROUP BY A,B ORDER BY A,B
当然也不一定,是2个字段分组,可能有2个或2个以上的多个字段分组。
下面,我们先来看下MapReduce内部执行2次排序的流程图,这图是散仙收集的,画的很不错。
由上图可知,Map在处理数据时,先由InputFormat组件提供输入格式,然后Split一行数据,默认的是TextInputFormat,Key为字节偏移量,Value为内容,然后把这行数据,传给Map,Map根据某种约定的分隔符,进行拆分数据,进行业务处理,如果是计数的直接在Value上输出1,在Map输出前,如果有Combine组件,则会执行Combine阶段,进行本地Reduce,一般是用来优化程序用的,Combine执行完后,会执行Partition组件,进行数据分区,默认的是HashPartition,按照输出的Key的哈希值与上Integer的最大值,然后对reduce的个数进行取余得到的值,经过Partition后,数据就会被按桶输出到本地磁盘上,在输出的时候,会按照Key进行排序,然后等所有的Map执行完毕后,就会进入Reduce阶段,这个阶段会进行一个大的混洗阶段,术语叫shuffle,每个reduce都会去每个map输出的分区里面,拉取对应的一部分数据,这个时候,是最耗网络IO,以及磁盘IO的,是影响性能的一个重要瓶颈,当Reduce把所有的数据拉取完毕后,就会进行分组并按照Key进行排序,每处理好一个分组,都会调用一次Reduce函数,进行累加,或其他的业务处理,处理完毕后,就会通过OutputFormat进行输出到HDFS上,至此,整个流程就执行完毕。
代码如下:
package com.qin.groupsort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import com.qin.operadb.PersonRecoder; import com.qin.operadb.ReadMapDB; /** * @author qindongliang * * 大数据交流群:376932160 * * * **/ public class GroupSort { /** * map任务 * * */ public static class GMapper extends Mapper<LongWritable, Text, DescSort, IntWritable>{ private DescSort tx=new DescSort(); private IntWritable second=new IntWritable(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { System.out.println("执行map"); // System.out.println("进map了"); //mos.write(namedOutput, key, value); String ss[]=value.toString().split(";"); String mkey=ss[0]; int mvalue=Integer.parseInt(ss[1]); tx.setFirstKey(mkey); tx.setSecondKey(mvalue); second.set(mvalue); context.write(tx, second); } } /*** * Reduce任务 * * **/ public static class GReduce extends Reducer<DescSort, IntWritable, Text, Text>{ @Override protected void reduce(DescSort arg0, Iterable<IntWritable> arg1, Context ctx) throws IOException, InterruptedException { System.out.println("执行reduce"); StringBuffer sb=new StringBuffer(); for(IntWritable t:arg1){ // sb.append(t).append(","); //con ctx.write(new Text(arg0.getFirstKey()), new Text(t.toString())); /**这种写法,是这种输出 *a 45 *b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100 */ } if(sb.length()>0){ sb.deleteCharAt(sb.length()-1);//删除最后一位的逗号 } // 在循环里拼接,在循环外输出是这种格式 // b 12,567 // 三劫 1,32,899 // 秦东亮 34,72,100 // ctx.write(new Text(arg0.getFirstKey()), new Text(sb.toString())); } } /*** * * 自定义组合键 * **/ public static class DescSort implements WritableComparable{ public DescSort() { // TODO Auto-generated constructor stub } private String firstKey; private int secondKey; public String getFirstKey() { return firstKey; } public void setFirstKey(String firstKey) { this.firstKey = firstKey; } public int getSecondKey() { return secondKey; } public void setSecondKey(int secondKey) { this.secondKey = secondKey; } // @Override // public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, // int arg4, int arg5) { // return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序 // } // // @Override // public int compare(Object a, Object b) { // // return -super.compare(a, b);//注意使用负号来完成降序 // } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub firstKey=in.readUTF(); secondKey=in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(firstKey); out.writeInt(secondKey); } @Override public int compareTo(Object o) { // TODO Auto-generated method stub DescSort d=(DescSort)o; //this在前代表升序 return this.getFirstKey().compareTo(d.getFirstKey()); } } /** * 主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组 * * **/ public static class TextComparator extends WritableComparator{ public TextComparator() { // TODO Auto-generated constructor stub super(DescSort.class,true);//注册Comparator } @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("执行TextComparator分组排序"); DescSort d1=(DescSort)a; DescSort d2=(DescSort)b; return d1.getFirstKey().compareTo(d2.getFirstKey()); } } /** * 组内排序的策略 * 按照第二个字段排序 * * */ public static class TextIntCompartator extends WritableComparator{ public TextIntCompartator() { super(DescSort.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { DescSort d1=(DescSort)a; DescSort d2=(DescSort)b; System.out.println("执行组内排序TextIntCompartator"); if(!d1.getFirstKey().equals(d2.getFirstKey())){ return d1.getFirstKey().compareTo(d2.getFirstKey()); }else{ return d1.getSecondKey()-d2.getSecondKey();//0,-1,1 } } } /** * 分区策略 * * */ public static class KeyPartition extends Partitioner<DescSort, IntWritable>{ @Override public int getPartition(DescSort key, IntWritable arg1, int arg2) { // TODO Auto-generated method stub System.out.println("执行自定义分区KeyPartition"); return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%arg2; } } public static void main(String[] args) throws Exception{ JobConf conf=new JobConf(ReadMapDB.class); //Configuration conf=new Configuration(); conf.set("mapred.job.tracker","192.168.75.130:9001"); //读取person中的数据字段 conf.setJar("tt.jar"); //注意这行代码放在最前面,进行初始化,否则会报 /**Job任务**/ Job job=new Job(conf, "testpartion"); job.setJarByClass(GroupSort.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; // job.setCombinerClass(PCombine.class); // job.setNumReduceTasks(3);//设置为3 job.setMapperClass(GMapper.class); job.setReducerClass(GReduce.class); /**设置分区函数*/ job.setPartitionerClass(KeyPartition.class); //分组函数,Reduce前的一次排序 job.setGroupingComparatorClass(TextComparator.class); //组内排序Map输出完毕后,对key进行的一次排序 job.setSortComparatorClass(TextIntCompartator.class); //TextComparator.class //TextIntCompartator.class // job.setGroupingComparatorClass(TextIntCompartator.class); //组内排序Map输出完毕后,对key进行的一次排序 // job.setSortComparatorClass(TextComparator.class); job.setMapOutputKeyClass(DescSort.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String path="hdfs://192.168.75.130:9000/root/outputdb"; FileSystem fs=FileSystem.get(conf); Path p=new Path(path); if(fs.exists(p)){ fs.delete(p, true); System.out.println("输出路径存在,已删除!"); } FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input"); FileOutputFormat.setOutputPath(job,p ); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
在eclipse下,执行,打印日志内容如下:
模式: 192.168.75.130:9001 输出路径存在,已删除! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404152114_0003 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404152114_0003 INFO - Counters.log(585) | Counters: 29 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=7040 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9807 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=86 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=162 INFO - Counters.log(589) | HDFS_BYTES_READ=205 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=111232 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=86 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=93 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=162 INFO - Counters.log(589) | Map input records=9 INFO - Counters.log(589) | Reduce shuffle bytes=162 INFO - Counters.log(589) | Spilled Records=18 INFO - Counters.log(589) | Map output bytes=138 INFO - Counters.log(589) | Total committed heap usage (bytes)=176033792 INFO - Counters.log(589) | CPU time spent (ms)=970 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=112 INFO - Counters.log(589) | Reduce input records=9 INFO - Counters.log(589) | Reduce input groups=4 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=258830336 INFO - Counters.log(589) | Reduce output records=9 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=1461055488 INFO - Counters.log(589) | Map output records=9
执行完,我们在输出目录里里面查看
执行完,内容如下:
a 45 b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100
我们发现,跟我们预期的结果一致,熟悉MapReduce的执行原理,可以帮助我们更好的使用Hive,因为Hive本身就是一个或多个MapReduce作业构成的,Hive语句的优化,对MapReduce作业的影响的性能也是不容忽视的,所以我们一定要多熟悉熟悉MapReduce编程的模型,以便于我们对它有一个更清晰的认识和了解。
发表评论
-
Apache Flink在阿里的使用(译)
2019-02-21 21:18 1213Flink是未来大数据实时 ... -
计算机图形处理的一些知识
2018-04-25 17:46 1236最近在搞opencv来做一些 ... -
如何在kylin中构建一个cube
2017-07-11 19:06 1284前面的文章介绍了Apache Kylin的安装及数据仓 ... -
Apache Kylin的入门安装
2017-06-27 21:27 2149Apache Kylin™是一个开源的分布式分析引擎,提供 ... -
ES-Hadoop插件介绍
2017-04-27 18:07 1997上篇文章,写了使用spark集成es框架,并向es写入数据,虽 ... -
如何在Scala中读取Hadoop集群上的gz压缩文件
2017-04-05 18:51 2140存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩 ... -
如何收集项目日志统一发送到kafka中?
2017-02-07 19:07 2798上一篇(http://qindongliang.iteye. ... -
Hue+Hive临时目录权限不够解决方案
2016-06-14 10:40 4725安装Hue后,可能会分配多个账户给一些业务部门操作hive,虽 ... -
Hadoop的8088页面失效问题
2016-03-31 11:21 4458前两天重启了测试的hadoop集群,今天访问集群的8088任 ... -
Hadoop+Hbase集群数据迁移问题
2016-03-23 21:00 2528数据迁移或备份是任何 ... -
如何监控你的Hadoop+Hbase集群?
2016-03-21 16:10 4924前言 监控hadoop的框架 ... -
Logstash与Kafka集成
2016-02-24 18:44 11652在ELKK的架构中,各个框架的角色分工如下: Elastic ... -
Kakfa集群搭建
2016-02-23 15:36 2654先来整体熟悉下Kafka的一些概念和架构 (一)什么是Ka ... -
大数据日志收集框架之Flume入门
2016-02-02 14:25 4193Flume是Cloudrea公司开源的一款优秀的日志收集框架 ... -
Apache Tez0.7编译笔记
2016-01-15 16:33 2538目前最新的Tez版本是0.8,但还不是稳定版,所以大家还 ... -
Bug死磕之hue集成的oozie+pig出现资源任务死锁问题
2016-01-14 15:52 3844这两天,打算给现有的 ... -
Hadoop2.7.1和Hbase0.98添加LZO压缩
2016-01-04 17:46 26091,执行命令安装一些依赖组件 yum install -y ... -
Hadoop2.7.1配置NameNode+ResourceManager高可用原理分析
2015-11-11 19:51 3185关于NameNode高可靠需要配置的文件有core-site ... -
设置Hadoop+Hbase集群pid文件存储位置
2015-10-20 13:40 2865有时候,我们对运行几 ... -
Hadoop+Maven项目打包异常
2015-08-11 19:36 1595先简单说下业务:有一个单独的模块,可以在远程下载Hadoop上 ...
相关推荐
在Hadoop MapReduce的早期版本(0.20.0之前),二次排序通常通过设置`setPartitionerClass`、`setOutputKeyComparatorClass`和`setOutputValueGroupingComparator`来实现。而在0.20.0及之后的版本,这些设置被替换为...
在这个“hadoop分区二次排序示例.zip”压缩包中,我们重点探讨的是如何在Hadoop MapReduce中实现特定的排序逻辑,即二次排序和分区策略。 首先,我们需要理解什么是二次排序。在标准的MapReduce流程中,数据经过map...
在大数据处理领域,Hadoop是一个不可或缺的开源框架,它提供了分布式存储和计算的能力。本示例中的"had
**一、Hadoop二次排序** 在Hadoop MapReduce的默认过程中,键值对在Map阶段是局部排序的,然后在Reduce阶段进行全局排序。然而,这种排序可能不满足某些特定的应用场景,如按地理位置排序或者按时间戳排序等。这时,...
这时,我们需要利用二次排序来实现。二次排序通常包括两个部分:Combiner和Partitioner的定制以及Comparator的使用。 1. **Combiner定制**:Combiner是可选的,它可以在Map阶段对局部数据进行预处理,减少网络传输...
5. 实际案例展示如何在Hadoop中实现二次排序 6. 图形化解释帮助理解数据处理流程 通过对这些内容的学习,读者可以深入理解大数据排序的复杂性和Hadoop在其中的角色,进一步提升大数据处理的能力。
在实际应用中,二次排序的具体实现可以通过自定义Key类和比较器来完成。例如,通过实现WritableComparable接口,用户可以定义自己的Key类,并在其中实现自定义的比较逻辑。在自定义的Key类中,需要覆写compareTo方法...
二次排序在Hadoop中主要涉及Map、Reduce阶段以及自定义类的实现。 1. 二次排序原理: - Map阶段:Map任务接收输入数据,将其切分为多个Split,由InputFormat的RecordReader将数据读取并转化为键值对(通常是, Text...
二次排序 联接 map端联接 reduce端联接 边数据分布 利用JobConf来配置作业 分布式缓存 MapReduce库类 第9章 构建Hadoop集群 集群规范 网络拓扑 集群的构建和安装 安装Java 创建Hadoop用户...
- 二次排序:在compareTo方法中设置多个判断条件,如在bean对象排序中,可以实现更复杂的排序需求。 4. 自定义排序与WritableComparable接口: 当需要对自定义对象(如本案例中的FlowBean)进行排序时,需要实现...
此外,如果一个job配置了`groupingComparator`,则在相同的key之间,还可以按照value进行二次排序。 2. **分区间的排序(Between-partition sorting)**:在所有key内部排序完成后,shuffle阶段会按照Partitioner的...
【Hadoop二次开发必懂】深入理解MapReduce 在Hadoop的世界中,MapReduce是一种分布式计算模型,用于处理和生成大规模数据集。它基于“分而治之”的理念,将大任务拆解为一系列可并行执行的小任务,极大地提高了处理...
#### 二、Hadoop的组成 Hadoop主要由以下几个部分组成: 1. **Hadoop Common**:这是Hadoop的核心部分,包括文件系统(HDFS)和远程过程调用(RPC)等基础服务。 2. **HDFS (Hadoop Distributed File System)**:...
Hadoop的设计目标是在低成本的硬件集群上实现高可靠性和高性能的大规模数据处理。 **Hadoop的主要优点:** - **扩容能力强**:能够轻松扩展到数千台服务器。 - **成本低廉**:利用普通商用服务器构建大规模集群。 -...
然而,Secondary Sort允许我们在每个分区内部进行二次排序,通常是基于值或其他属性。在这个例子中,文档展示了如何通过自定义的`SecondarySortPartitioner`类实现这一功能。这个类继承了`Partitioner`接口,并重写...