- 浏览: 2183182 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
Hadoop里面的MapReduce编程模型,非常灵活,大部分环节我们都可以重写它的API,来灵活定制我们自己的一些特殊需求。
今天散仙要说的这个分区函数Partitioner,也是一样如此,下面我们先来看下Partitioner的作用:
对map端输出的数据key作一个散列,使数据能够均匀分布在各个reduce上进行后续操作,避免产生热点区。
Hadoop默认使用的分区函数是Hash Partitioner,源码如下:
大部分情况下,我们都会使用默认的分区函数,但有时我们又有一些,特殊的需求,而需要定制Partition来完成我们的业务,案例如下:
对如下数据,按字符串的长度分区,长度为1的放在一个,2的一个,3的各一个。
这时候,我们使用默认的分区函数,就不行了,所以需要我们定制自己的Partition,首先分析下,我们需要3个分区输出,所以在设置reduce的个数时,一定要设置为3,其次在partition里,进行分区时,要根据长度具体分区,而不是根据字符串的hash码来分区。核心代码如下:
全部代码如下:
运行情况如下:
运行后的结果文件如下:
其中,part-r-000000里面的数据
其中,part-r-000001里面的数据
其中,part-r-000002里面的数据
至此,我们使用自定义的分区策略完美的实现了,数据分区了。
总结:引用一段话
(Partition)分区出现的必要性,如何使用Hadoop产生一个全局排序的文件?最简单的方法就是使用一个分区,但是该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构的优势。事实上我们可以这样做,首先创建一系列排好序的文件;其次,串联这些文件(类似于归并排序);最后得到一个全局有序的文件。主要的思路是使用一个partitioner来描述全局排序的输出。比方说我们有1000个1-10000的数据,跑10个ruduce任务, 如果我们运行进行partition的时候,能够将在1-1000中数据的分配到第一个reduce中,1001-2000的数据分配到第二个reduce中,以此类推。即第n个reduce所分配到的数据全部大于第n-1个reduce中的数据。这样,每个reduce出来之后都是有序的了,我们只要cat所有的输出文件,变成一个大的文件,就都是有序的了
基本思路就是这样,但是现在有一个问题,就是数据的区间如何划分,在数据量大,还有我们并不清楚数据分布的情况下。一个比较简单的方法就是采样,假如有一亿的数据,我们可以对数据进行采样,如取10000个数据采样,然后对采样数据分区间。在Hadoop中,patition我们可以用TotalOrderPartitioner替换默认的分区。然后将采样的结果传给他,就可以实现我们想要的分区。在采样时,我们可以使用hadoop的几种采样工具,RandomSampler,InputSampler,IntervalSampler。
这样,我们就可以对利用分布式文件系统进行大数据量的排序了,我们也可以重写Partitioner类中的compare函数,来定义比较的规则,从而可以实现字符串或其他非数字类型的排序,也可以实现二次排序乃至多次排序。
今天散仙要说的这个分区函数Partitioner,也是一样如此,下面我们先来看下Partitioner的作用:
对map端输出的数据key作一个散列,使数据能够均匀分布在各个reduce上进行后续操作,避免产生热点区。
Hadoop默认使用的分区函数是Hash Partitioner,源码如下:
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.mapreduce.lib.partition; import org.apache.hadoop.mapreduce.Partitioner; /** Partition keys by their {@link Object#hashCode()}. */ public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { //默认使用key的hash值与上int的最大值,避免出现数据溢出 的情况 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
大部分情况下,我们都会使用默认的分区函数,但有时我们又有一些,特殊的需求,而需要定制Partition来完成我们的业务,案例如下:
对如下数据,按字符串的长度分区,长度为1的放在一个,2的一个,3的各一个。
河南省;1 河南;2 中国;3 中国人;4 大;1 小;3 中;11
这时候,我们使用默认的分区函数,就不行了,所以需要我们定制自己的Partition,首先分析下,我们需要3个分区输出,所以在设置reduce的个数时,一定要设置为3,其次在partition里,进行分区时,要根据长度具体分区,而不是根据字符串的hash码来分区。核心代码如下:
/** * Partitioner * * * */ public static class PPartition extends Partitioner<Text, Text>{ @Override public int getPartition(Text arg0, Text arg1, int arg2) { /** * 自定义分区,实现长度不同的字符串,分到不同的reduce里面 * * 现在只有3个长度的字符串,所以可以把reduce的个数设置为3 * 有几个分区,就设置为几 * */ String key=arg0.toString(); if(key.length()==1){ return 1%arg2; }else if(key.length()==2){ return 2%arg2; }else if(key.length()==3){ return 3%arg2; } return 0; } }
全部代码如下:
package com.partition.test; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import com.qin.operadb.PersonRecoder; import com.qin.operadb.ReadMapDB; /** * @author qindongliang * * 大数据交流群:376932160 * * * **/ public class MyTestPartition { /** * map任务 * * */ public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { // System.out.println("进map了"); //mos.write(namedOutput, key, value); String ss[]=value.toString().split(";"); context.write(new Text(ss[0]), new Text(ss[1])); } } /** * Partitioner * * * */ public static class PPartition extends Partitioner<Text, Text>{ @Override public int getPartition(Text arg0, Text arg1, int arg2) { /** * 自定义分区,实现长度不同的字符串,分到不同的reduce里面 * * 现在只有3个长度的字符串,所以可以把reduce的个数设置为3 * 有几个分区,就设置为几 * */ String key=arg0.toString(); if(key.length()==1){ return 1%arg2; }else if(key.length()==2){ return 2%arg2; }else if(key.length()==3){ return 3%arg2; } return 0; } } /*** * Reduce任务 * * **/ public static class PReduce extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2) throws IOException, InterruptedException { String key=arg0.toString().split(",")[0]; System.out.println("key==> "+key); for(Text t:arg1){ //System.out.println("Reduce: "+arg0.toString()+" "+t.toString()); arg2.write(arg0, t); } } } public static void main(String[] args) throws Exception{ JobConf conf=new JobConf(ReadMapDB.class); //Configuration conf=new Configuration(); conf.set("mapred.job.tracker","192.168.75.130:9001"); //读取person中的数据字段 conf.setJar("tt.jar"); //注意这行代码放在最前面,进行初始化,否则会报 /**Job任务**/ Job job=new Job(conf, "testpartion"); job.setJarByClass(MyTestPartition.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; // job.setCombinerClass(PCombine.class); job.setPartitionerClass(PPartition.class); job.setNumReduceTasks(3);//设置为3 job.setMapperClass(PMapper.class); // MultipleOutputs.addNamedOutput(job, "hebei", TextOutputFormat.class, Text.class, Text.class); // MultipleOutputs.addNamedOutput(job, "henan", TextOutputFormat.class, Text.class, Text.class); job.setReducerClass(PReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String path="hdfs://192.168.75.130:9000/root/outputdb"; FileSystem fs=FileSystem.get(conf); Path p=new Path(path); if(fs.exists(p)){ fs.delete(p, true); System.out.println("输出路径存在,已删除!"); } FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input"); FileOutputFormat.setOutputPath(job,p ); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
运行情况如下:
模式: 192.168.75.130:9001 输出路径存在,已删除! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0005 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 11% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 22% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 55% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0005 INFO - Counters.log(585) | Counters: 29 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=3 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=7422 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=30036 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=61 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=93 INFO - Counters.log(589) | HDFS_BYTES_READ=179 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=218396 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=61 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=68 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=93 INFO - Counters.log(589) | Map input records=7 INFO - Counters.log(589) | Reduce shuffle bytes=93 INFO - Counters.log(589) | Spilled Records=14 INFO - Counters.log(589) | Map output bytes=61 INFO - Counters.log(589) | Total committed heap usage (bytes)=207491072 INFO - Counters.log(589) | CPU time spent (ms)=2650 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=111 INFO - Counters.log(589) | Reduce input records=7 INFO - Counters.log(589) | Reduce input groups=7 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=422174720 INFO - Counters.log(589) | Reduce output records=7 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=2935713792 INFO - Counters.log(589) | Map output records=7
运行后的结果文件如下:
其中,part-r-000000里面的数据
中国人 4 河南省 1
其中,part-r-000001里面的数据
中 11 大 1 小 3
其中,part-r-000002里面的数据
中国 3 河南 2
至此,我们使用自定义的分区策略完美的实现了,数据分区了。
总结:引用一段话
(Partition)分区出现的必要性,如何使用Hadoop产生一个全局排序的文件?最简单的方法就是使用一个分区,但是该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构的优势。事实上我们可以这样做,首先创建一系列排好序的文件;其次,串联这些文件(类似于归并排序);最后得到一个全局有序的文件。主要的思路是使用一个partitioner来描述全局排序的输出。比方说我们有1000个1-10000的数据,跑10个ruduce任务, 如果我们运行进行partition的时候,能够将在1-1000中数据的分配到第一个reduce中,1001-2000的数据分配到第二个reduce中,以此类推。即第n个reduce所分配到的数据全部大于第n-1个reduce中的数据。这样,每个reduce出来之后都是有序的了,我们只要cat所有的输出文件,变成一个大的文件,就都是有序的了
基本思路就是这样,但是现在有一个问题,就是数据的区间如何划分,在数据量大,还有我们并不清楚数据分布的情况下。一个比较简单的方法就是采样,假如有一亿的数据,我们可以对数据进行采样,如取10000个数据采样,然后对采样数据分区间。在Hadoop中,patition我们可以用TotalOrderPartitioner替换默认的分区。然后将采样的结果传给他,就可以实现我们想要的分区。在采样时,我们可以使用hadoop的几种采样工具,RandomSampler,InputSampler,IntervalSampler。
这样,我们就可以对利用分布式文件系统进行大数据量的排序了,我们也可以重写Partitioner类中的compare函数,来定义比较的规则,从而可以实现字符串或其他非数字类型的排序,也可以实现二次排序乃至多次排序。
发表评论
-
Apache Flink在阿里的使用(译)
2019-02-21 21:18 1173Flink是未来大数据实时 ... -
计算机图形处理的一些知识
2018-04-25 17:46 1228最近在搞opencv来做一些 ... -
如何在kylin中构建一个cube
2017-07-11 19:06 1267前面的文章介绍了Apache Kylin的安装及数据仓 ... -
Apache Kylin的入门安装
2017-06-27 21:27 2144Apache Kylin™是一个开源的分布式分析引擎,提供 ... -
ES-Hadoop插件介绍
2017-04-27 18:07 1991上篇文章,写了使用spark集成es框架,并向es写入数据,虽 ... -
如何在Scala中读取Hadoop集群上的gz压缩文件
2017-04-05 18:51 2129存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩 ... -
如何收集项目日志统一发送到kafka中?
2017-02-07 19:07 2791上一篇(http://qindongliang.iteye. ... -
Hue+Hive临时目录权限不够解决方案
2016-06-14 10:40 4702安装Hue后,可能会分配多个账户给一些业务部门操作hive,虽 ... -
Hadoop的8088页面失效问题
2016-03-31 11:21 4448前两天重启了测试的hadoop集群,今天访问集群的8088任 ... -
Hadoop+Hbase集群数据迁移问题
2016-03-23 21:00 2523数据迁移或备份是任何 ... -
如何监控你的Hadoop+Hbase集群?
2016-03-21 16:10 4919前言 监控hadoop的框架 ... -
Logstash与Kafka集成
2016-02-24 18:44 11624在ELKK的架构中,各个框架的角色分工如下: Elastic ... -
Kakfa集群搭建
2016-02-23 15:36 2645先来整体熟悉下Kafka的一些概念和架构 (一)什么是Ka ... -
大数据日志收集框架之Flume入门
2016-02-02 14:25 4186Flume是Cloudrea公司开源的一款优秀的日志收集框架 ... -
Apache Tez0.7编译笔记
2016-01-15 16:33 2522目前最新的Tez版本是0.8,但还不是稳定版,所以大家还 ... -
Bug死磕之hue集成的oozie+pig出现资源任务死锁问题
2016-01-14 15:52 3831这两天,打算给现有的 ... -
Hadoop2.7.1和Hbase0.98添加LZO压缩
2016-01-04 17:46 26031,执行命令安装一些依赖组件 yum install -y ... -
Hadoop2.7.1配置NameNode+ResourceManager高可用原理分析
2015-11-11 19:51 3177关于NameNode高可靠需要配置的文件有core-site ... -
设置Hadoop+Hbase集群pid文件存储位置
2015-10-20 13:40 2848有时候,我们对运行几 ... -
Hadoop+Maven项目打包异常
2015-08-11 19:36 1564先简单说下业务:有一个单独的模块,可以在远程下载Hadoop上 ...
相关推荐
Hadoop 自定义 Partitioner 实现
Hadoop 自定义 Partitioner 实现
在分布式计算领域,Hadoop 是一个广泛使用的开源框架,它为大数据处理提供了高效、可靠的解决方案。其中,Partitioner 是 Hadoop MapReduce 框架中的关键组件,它负责决定 map 函数产生的中间键值对(key-value ...
在Hadoop MapReduce框架中,Partitioner、SortComparator和GroupingComparator是三个至关重要的组件,它们共同决定了数据如何被分发、排序以及分组,从而影响到整个MapReduce作业的性能和结果的正确性。接下来,我们...
这个名为"Partitioner.zip"的压缩包很可能包含了关于Hadoop MapReduce Partitioner的相关资料或实现代码。 Partitioner在Hadoop MapReduce的工作流程中主要负责将Map阶段产生的中间键值对分发到各个Reduce任务中去...
### Hadoop命令使用手册中文版知识点详解 #### 一、Hadoop概述 Hadoop是一款开源软件框架,主要用于处理大规模数据集(通常在集群环境中)。它能够高效地存储和处理非常大的数据集,使得用户能够在相对较低成本的...
5. **Chap 6 - 数据分析与挖掘**:此章节可能深入到使用Hadoop进行数据分析和挖掘,包括使用Mahout库进行机器学习,或者使用Spark进行实时流处理。 6. **Chap 7 - 高级MapReduce技术**:涵盖如使用Secondary Sort...
4. **MapReduce编程**:理解Map函数和Reduce函数的实现,以及Combiner、Partitioner和Reducer的角色。编写Java MapReduce程序,并学习如何优化性能,如使用CombineReducer和本地化数据处理。 5. **HDFS操作**:文件...
4. **Hadoop API**:学习使用Hadoop API进行数据读写和处理,例如FileSystem API用于文件操作,InputFormat和OutputFormat定义输入输出格式,Mapper和Reducer实现数据处理逻辑。 5. **MapReduce编程**:理解...
【Hadoop使用】知识点详解 Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在廉价硬件上处理大规模数据。Hadoop的核心组件包括Hadoop Distributed File System (HDFS)和MapReduce。以下是对Hadoop...
3. **MapReduce API**:掌握如何编写Mapper和Reducer类,理解InputFormat、OutputFormat以及Partitioner的工作原理,以及JobConf配置的使用。 4. **Hadoop配置**:了解如何通过XML配置文件设置Hadoop集群参数,例如...
7. **扩展性与插件开发**:学习如何为Hadoop开发自定义InputFormat、OutputFormat、Partitioner、Combiner等组件。 8. **实战项目**:结合实际案例,运用所学知识解决大数据处理问题,如日志分析、推荐系统等。 ...
同时,MapReduce的编程模型,如Mapper、Reducer、Partitioner、Combiner等,都在这个版本的源码中体现。 3. **YARN**:YARN是Hadoop 2.x引入的重要变化,它将资源管理和任务调度从MapReduce中分离出来,使得Hadoop...
本主题聚焦于如何使用Hadoop实现大矩阵乘法,这是一个在计算机科学和数据分析中常见的运算,特别是在机器学习和数值计算中。在Hadoop上实现大矩阵乘法,可以充分利用其并行计算的优势,提高计算效率。 大矩阵乘法的...
2. **HDFS操作**:源码可能包含使用HDFS API进行文件上传、下载、删除和检查等操作的示例,这些操作对于理解和使用Hadoop至关重要。 3. **MapReduce编程**:书中的例子会展示如何编写Map和Reduce函数,处理各种类型...
$ hadoop jar hadoop-streaming.jar -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner -mapper mymapper -reducer myreducer -input input -output output ``` 这里 `org.apache.hadoop.mapred...
3. **hadoop-mapreduce-client**: 提供了MapReduce编程接口,包括JobClient、Mapper、Reducer和Partitioner等类。 4. **hadoop-yarn**: YARN(Yet Another Resource Negotiator)是Hadoop的资源管理系统,负责集群...
10. Partitioner:Partitioner的作用是将key均匀分布到不同的ReduceTask上,以优化并行计算。 11. Zookeeper角色:Zookeeper集群包含Leader、Follower和Observer三种角色,用于分布式协调。 12. Znode类型:...
9. **Hadoop配置**:在使用Hadoop Java API时,通常需要通过`Configuration`对象设置参数,如HDFS的地址、MapReduce的配置信息等。 10. **Hadoop客户端**:在Hadoop 2.x中,客户端库包含了一组API,用于与Hadoop...
同时,源码包也方便了开发者进行扩展和优化,例如自定义InputFormat、OutputFormat、Partitioner、Reducer等,以适应特定的业务需求。 此外,由于这个源码包是基于Maven结构生成的,所以它应该包含了所有依赖项的...