- 浏览: 2189116 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
Hadoop的MR作业支持链式处理,类似在一个生产牛奶的流水线上,每一个阶段都有特定的任务要处理,比如提供牛奶盒,装入牛奶,封盒,打印出厂日期,等等,通过这样进一步的分工,从而提高了生产效率,那么在我们的Hadoop的MapReduce中也是如此,支持链式的处理方式,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper的输入,形成一个流水线,而这一点与Lucene和Solr中的Filter机制是非常类似的,Hadoop项目源自Lucene,自然也借鉴了一些Lucene中的处理方式。
举个例子,比如处理文本中的一些禁用词,或者敏感词,等等,Hadoop里的链式操作,支持的形式类似正则Map+ Rrduce Map*,代表的意思是全局只能有一个唯一的Reduce,但是在Reduce的前后是可以存在无限多个Mapper来进行一些预处理或者善后工作的。
下面来看下的散仙今天的测试例子,先看下我们的数据,以及需求。
数据如下:
需求是:
散仙的hadoop版本是1.2的,在1.2的版本里,hadoop支持新的API,但是链式的ChainMapper类和ChainReduce类却不支持新 的,新的在hadoop2.x里面可以使用,差别不大,散仙今天给出的是旧的API的,需要注意一下。
代码如下:
运行日志如下:
产生的数据如下:
总结,测试过程中,发现如果Reduce后面,还有Mapper执行,那么注意一定要,在ChainReducer里面先set一个全局唯一的Reducer,然后再add一个Mapper,否则,在运行的时候,会报空指针异常,这一点需要特别注意!
举个例子,比如处理文本中的一些禁用词,或者敏感词,等等,Hadoop里的链式操作,支持的形式类似正则Map+ Rrduce Map*,代表的意思是全局只能有一个唯一的Reduce,但是在Reduce的前后是可以存在无限多个Mapper来进行一些预处理或者善后工作的。
下面来看下的散仙今天的测试例子,先看下我们的数据,以及需求。
数据如下:
手机 5000 电脑 2000 衣服 300 鞋子 1200 裙子 434 手套 12 图书 12510 小商品 5 小商品 3 订餐 2
需求是:
/** * 需求: * 在第一个Mapper里面过滤大于10000万的数据 * 第二个Mapper里面过滤掉大于100-10000的数据 * Reduce里面进行分类汇总并输出 * Reduce后的Mapper里过滤掉商品名长度大于3的数据 */
预计处理完的结果是: 手套 12 订餐 2
散仙的hadoop版本是1.2的,在1.2的版本里,hadoop支持新的API,但是链式的ChainMapper类和ChainReduce类却不支持新 的,新的在hadoop2.x里面可以使用,差别不大,散仙今天给出的是旧的API的,需要注意一下。
代码如下:
package com.qin.test.hadoop.chain; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.ChainMapper; import org.apache.hadoop.mapred.lib.ChainReducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import com.qin.reducejoin.NewReduceJoin2; /** * * 测试Hadoop里面的 * ChainMapper和ReduceMapper的使用 * * @author qindongliang * @date 2014年5月7日 * * 大数据交流群: 376932160 * * * * * ***/ public class HaoopChain { /** * 需求: * 在第一个Mapper里面过滤大于10000万的数据 * 第二个Mapper里面过滤掉大于100-10000的数据 * Reduce里面进行分类汇总并输出 * Reduce后的Mapper里过滤掉商品名长度大于3的数据 */ /** * * 过滤掉大于10000万的数据 * * */ private static class AMapper01 extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>{ @Override public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String text=value.toString(); String texts[]=text.split(" "); System.out.println("AMapper01里面的数据: "+text); if(texts[1]!=null&&texts[1].length()>0){ int count=Integer.parseInt(texts[1]); if(count>10000){ System.out.println("AMapper01过滤掉大于10000数据: "+value.toString()); return; }else{ output.collect(new Text(texts[0]), new Text(texts[1])); } } } } /** * * 过滤掉大于100-10000的数据 * * */ private static class AMapper02 extends MapReduceBase implements Mapper<Text, Text, Text, Text>{ @Override public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { int count=Integer.parseInt(value.toString()); if(count>=100&&count<=10000){ System.out.println("AMapper02过滤掉的小于10000大于100的数据: "+key+" "+value); return; } else{ output.collect(key, value); } } } /** * Reuduce里面对同种商品的 * 数量相加数据即可 * * **/ private static class AReducer03 extends MapReduceBase implements Reducer<Text, Text, Text, Text>{ @Override public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { int sum=0; System.out.println("进到Reduce里了"); while(values.hasNext()){ Text t=values.next(); sum+=Integer.parseInt(t.toString()); } //旧API的集合,不支持foreach迭代 // for(Text t:values){ // sum+=Integer.parseInt(t.toString()); // } output.collect(key, new Text(sum+"")); } } /*** * * Reduce之后的Mapper过滤 * 过滤掉长度大于3的商品名 * * **/ private static class AMapper04 extends MapReduceBase implements Mapper<Text, Text, Text, Text>{ @Override public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { int len=key.toString().trim().length(); if(len>=3){ System.out.println("Reduce后的Mapper过滤掉长度大于3的商品名: "+ key.toString()+" "+value.toString()); return ; }else{ output.collect(key, value); } } } /*** * 驱动主类 * **/ public static void main(String[] args) throws Exception{ //Job job=new Job(conf,"myjoin"); JobConf conf=new JobConf(HaoopChain.class); conf.set("mapred.job.tracker","192.168.75.130:9001"); conf.setJobName("t7"); conf.setJar("tt.jar"); conf.setJarByClass(HaoopChain.class); // Job job=new Job(conf, "2222222"); // job.setJarByClass(HaoopChain.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; // job.setMapOutputKeyClass(Text.class); // job.setMapOutputValueClass(Text.class); //Map1的过滤 JobConf mapA01=new JobConf(false); ChainMapper.addMapper(conf, AMapper01.class, LongWritable.class, Text.class, Text.class, Text.class, false, mapA01); //Map2的过滤 JobConf mapA02=new JobConf(false); ChainMapper.addMapper(conf, AMapper02.class, Text.class, Text.class, Text.class, Text.class, false, mapA02); //设置Reduce JobConf recduceFinallyConf=new JobConf(false); ChainReducer.setReducer(conf, AReducer03.class, Text.class, Text.class, Text.class, Text.class, false, recduceFinallyConf); //Reduce过后的Mapper过滤 JobConf reduceA01=new JobConf(false); ChainReducer.addMapper(conf, AMapper04.class, Text.class, Text.class, Text.class, Text.class, true, reduceA01); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); conf.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class); conf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class); FileSystem fs=FileSystem.get(conf); // Path op=new Path("hdfs://192.168.75.130:9000/root/outputchain"); if(fs.exists(op)){ fs.delete(op, true); System.out.println("存在此输出路径,已删除!!!"); } // // org.apache.hadoop.mapred.FileInputFormat.setInputPaths(conf, new Path("hdfs://192.168.75.130:9000/root/inputchain")); org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(conf, op); // //System.exit(conf.waitForCompletion(true)?0:1); JobClient.runJob(conf); } }
运行日志如下:
模式: 192.168.75.130:9001 存在此输出路径,已删除!!! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - FileInputFormat.listStatus(199) | Total input paths to process : 1 INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201405072054_0009 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 50% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201405072054_0009 INFO - Counters.log(585) | Counters: 30 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=11357 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=2 INFO - Counters.log(589) | Data-local map tasks=2 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9972 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=183 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=19 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=57 INFO - Counters.log(589) | HDFS_BYTES_READ=391 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=174859 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=19 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=63 INFO - Counters.log(589) | Map input records=10 INFO - Counters.log(589) | Reduce shuffle bytes=63 INFO - Counters.log(589) | Spilled Records=8 INFO - Counters.log(589) | Map output bytes=43 INFO - Counters.log(589) | Total committed heap usage (bytes)=336338944 INFO - Counters.log(589) | CPU time spent (ms)=1940 INFO - Counters.log(589) | Map input bytes=122 INFO - Counters.log(589) | SPLIT_RAW_BYTES=208 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | Reduce input records=4 INFO - Counters.log(589) | Reduce input groups=3 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=460980224 INFO - Counters.log(589) | Reduce output records=2 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=2184105984 INFO - Counters.log(589) | Map output records=4
产生的数据如下:
总结,测试过程中,发现如果Reduce后面,还有Mapper执行,那么注意一定要,在ChainReducer里面先set一个全局唯一的Reducer,然后再add一个Mapper,否则,在运行的时候,会报空指针异常,这一点需要特别注意!
发表评论
-
Apache Flink在阿里的使用(译)
2019-02-21 21:18 1211Flink是未来大数据实时 ... -
计算机图形处理的一些知识
2018-04-25 17:46 1235最近在搞opencv来做一些 ... -
如何在kylin中构建一个cube
2017-07-11 19:06 1284前面的文章介绍了Apache Kylin的安装及数据仓 ... -
Apache Kylin的入门安装
2017-06-27 21:27 2148Apache Kylin™是一个开源的分布式分析引擎,提供 ... -
ES-Hadoop插件介绍
2017-04-27 18:07 1997上篇文章,写了使用spark集成es框架,并向es写入数据,虽 ... -
如何在Scala中读取Hadoop集群上的gz压缩文件
2017-04-05 18:51 2140存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩 ... -
如何收集项目日志统一发送到kafka中?
2017-02-07 19:07 2798上一篇(http://qindongliang.iteye. ... -
Hue+Hive临时目录权限不够解决方案
2016-06-14 10:40 4725安装Hue后,可能会分配多个账户给一些业务部门操作hive,虽 ... -
Hadoop的8088页面失效问题
2016-03-31 11:21 4458前两天重启了测试的hadoop集群,今天访问集群的8088任 ... -
Hadoop+Hbase集群数据迁移问题
2016-03-23 21:00 2528数据迁移或备份是任何 ... -
如何监控你的Hadoop+Hbase集群?
2016-03-21 16:10 4924前言 监控hadoop的框架 ... -
Logstash与Kafka集成
2016-02-24 18:44 11652在ELKK的架构中,各个框架的角色分工如下: Elastic ... -
Kakfa集群搭建
2016-02-23 15:36 2654先来整体熟悉下Kafka的一些概念和架构 (一)什么是Ka ... -
大数据日志收集框架之Flume入门
2016-02-02 14:25 4193Flume是Cloudrea公司开源的一款优秀的日志收集框架 ... -
Apache Tez0.7编译笔记
2016-01-15 16:33 2538目前最新的Tez版本是0.8,但还不是稳定版,所以大家还 ... -
Bug死磕之hue集成的oozie+pig出现资源任务死锁问题
2016-01-14 15:52 3844这两天,打算给现有的 ... -
Hadoop2.7.1和Hbase0.98添加LZO压缩
2016-01-04 17:46 26091,执行命令安装一些依赖组件 yum install -y ... -
Hadoop2.7.1配置NameNode+ResourceManager高可用原理分析
2015-11-11 19:51 3185关于NameNode高可靠需要配置的文件有core-site ... -
设置Hadoop+Hbase集群pid文件存储位置
2015-10-20 13:40 2865有时候,我们对运行几 ... -
Hadoop+Maven项目打包异常
2015-08-11 19:36 1595先简单说下业务:有一个单独的模块,可以在远程下载Hadoop上 ...
相关推荐
1. Hadoop简介2.... Hadoop在yahoo的应用附录A: 云计算在线监测平台附录B: Hadoop安装、运行、使用说明附录C:使用DistributedCache的MapReduce程序附录D:使用ChainMapper和ChainReducer的MapReduce程序
1. Hadoop简介2.... Hadoop在yahoo的应用附录A: 云计算在线监测平台附录B: Hadoop安装、运行、使用说明附录C:使用DistributedCache的MapReduce程序附录D:使用ChainMapper和ChainReducer的MapReduce程序
如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop...
使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop...
**使用Hadoop实现WordCount实验报告** 实验报告的目的是详细记录使用Hadoop在Windows环境下实现WordCount应用的过程,包括环境配置、WordCount程序的实现以及实验结果分析。本实验旨在理解Hadoop分布式计算的基本...
Hadoop安装使用教程Hadoop安装使用教程Hadoop安装使用教程Hadoop安装使用教程Hadoop安装使用教程Hadoop安装使用教程Hadoop安装使用教程Hadoop安装使用教程Hadoop安装使用教程Hadoop安装使用教程Hadoop安装使用教程...
使用hadoop进行数据分析需要注意哪些事项?重点做好哪些问题?.zip 使用hadoop进行数据分析需要注意哪些事项?重点做好哪些问题?.zip 使用hadoop进行数据分析需要注意哪些事项?重点做好哪些问题?.zip 使用hadoop...
资源名称:使用Hadoop构建云计算平台内容简介:• 核心框架: HDFS和MapReduce• MapReduce — 任务的分解与结果的汇总• HDFS — Hadoop Distributed File System• — 分布式计算的基石Hadoop是一个Apache的开源...
### Hadoop in Action:...书中在第104至107页提到了ChainMapper和ChainReducer的用法。 通过深入理解这些技术和概念,读者能够更好地掌握Hadoop及其生态系统的强大功能,从而在大数据处理和分析领域取得更佳成果。
在Hadoop生态系统中,`hadoop.dll`和`winutils.exe`是两个关键组件,尤其对于Windows用户来说,它们在本地开发和运行Hadoop相关应用时必不...了解并正确使用这两个文件,对于在Windows上搭建和管理Hadoop环境至关重要。
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop 2.7.3是Hadoop发展中的一个重要版本,它包含了众多的优化和改进,旨在提高性能、稳定性和易用性。在这个版本中,`hadoop.dll`...
- **ChainMapper和ChainReducer**:在一个MapReduce作业中串联多个Map和Reduce任务。 - **Counters和Progress监控**:监控作业进度和性能指标。 #### Hadoop编程实践 除了基础的MapReduce编程外,Hadoop生态系统还...
在Hadoop生态系统中,调试工具对于开发者和管理员来说至关重要,特别是在Windows环境中。本文将深入探讨三个关键组件:hadoop....理解和掌握这些工具的使用,将有助于你在Windows环境下更有效地开发和管理Hadoop集群。
使用Hadoop.dll和winutils.exe,用户需要正确设置环境变量,配置Hadoop的配置文件(如core-site.xml、hdfs-site.xml),并确保所有依赖项都已安装,包括Java运行时环境(JRE)。 6. 安全性与稳定性:虽然Hadoop在...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop 2.7.3是这个框架的一个稳定版本,它包含了多个改进和优化,以提高性能和稳定性。在这个版本中,Winutils.exe和hadoop.dll是两...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。它是由Apache软件基金会开发并维护的,旨在提供可靠、可扩展的数据处理能力。标题中的"hadoop.dll"和"winutils.exe"是Hadoop在...
在提供的信息中,我们关注的是"Hadoop的dll文件",这是一个动态链接库(DLL)文件,通常在Windows操作系统中使用,用于存储可由多个程序共享的功能和资源。Hadoop本身是基于Java的,因此在Windows环境下运行Hadoop...
在Windows环境下安装Hadoop 3.1.0是学习和使用大数据处理技术的重要步骤。Hadoop是一个开源框架,主要用于分布式存储和处理大规模数据集。在这个过程中,我们将详细讲解Hadoop 3.1.0在Windows上的安装过程以及相关...
使用Hadoop进行数据分析是一个涉及多个步骤的过程,特别是在处理大规模数据集时。以下是一个基本的步骤指南,帮助你使用Hadoop进行数据分析: 1. 环境搭建 安装Hadoop:在集群上安装Hadoop,并配置HDFS(Hadoop ...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分析。它最初设计的目标是处理和存储海量数据,尤其适合那些不适合在单台机器上运行的大型数据集。本篇将详细介绍如何在Windows系统中安装Hadoop...