- 浏览: 1791262 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
奔跑的小牛:
例子都打不开
如何使用JVisualVM进行性能分析 -
蜗牛coder:
好东西[color=blue][/color]
Lucene学习:全文检索的基本原理 -
lovesunweina:
不在haoop中是在linux系统中,映射IP的时候,不能使用 ...
java.io.IOException: Incomplete HDFS URI, no host -
evening_xxxy:
挺好的, 谢谢分享
如何利用 JConsole观察分析Java程序的运行,进行排错调优 -
di1984HIT:
学习了~~~
ant使用ssh和linux交互 如:上传文件
Hadoop 其实并非一个单纯用于存储的分布式文件系统,而是一个被设计用来在由普通硬件设备组成的大型集群上执行分布式应用的框架。 Hadoop 包含两个部分:一个分布式文件系统 HDFS (Hadoop Distributed File System),和一个Map-Reduce实现。
研究hadoop,从nutch入手是比较好的选择,分布式文件系统就不说了,下面说说MapReduce产生Job中设置的输入输出,一般new一个Job会这样设置 输入输出路径:
FileInputFormat.addInputPath(job, in); FileOutputFormat.setOutputPath(job, out);
从方法名称上,你可能会发现add、set的前缀,没错,输入可以添加多个路径,输出只能设置一个路径。
设置输入、输出格式:
job.setInputFormat(SequenceFileInputFormat.class); job.setOutputFormat(MapFileOutputFormat.class);
输出格式
看过nutch的同志,会发现nutch的一个精彩实现,就是实现OutputFormat接口的FetcherOutputFormat类,我们来看看怎么个回事。
接口 :org.apache.hadoop.mapred.OutputFormat<K , V >
public interface OutputFormat<K, V> { RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,String name, Progressable progress) throws IOException; void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException; }
checkOutputSpecs :检查job的输出路径是否存在,如果存在则抛出异常(IOException)。我这里的版本是0.19.2,还没有override的功能,可能后面会支持。
getRecordWriter :把输出键值对 output <key, value> 写入到输出路径中。
mapred下面的实现有三个,如下图:
基类FileOutputFormat :org.apache.hadoop.mapred.FileOutputFormat
public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> { public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,JobConf job, String name,Progressable progress) throws IOException; public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException { // Ensure that the output directory is set and not already there Path outDir = getOutputPath(job); if (outDir == null && job.getNumReduceTasks() != 0) { throw new InvalidJobConfException("Output directory not set in JobConf."); } if (outDir != null) { FileSystem fs = outDir.getFileSystem(job); // normalize the output directory outDir = fs.makeQualified(outDir); setOutputPath(job, outDir); // check its existence if (fs.exists(outDir)) { throw new FileAlreadyExistsException("Output directory " + outDir + " already exists"); } } }
这是个抽象类,实现了检查输入路径是否存在的方法,具体输出方式写成抽象方法预留给了子类。
子类见下图:
子类MapFileOutputFormat :org.apache.hadoop.mapred.MapFileOutputFormat
public class MapFileOutputFormat extends FileOutputFormat<WritableComparable, Writable> { public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { // get the path of the temporary output file Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); CompressionCodec codec = null; CompressionType compressionType = CompressionType.NONE; if (getCompressOutput(job)) { // find the kind of compression to do compressionType = SequenceFileOutputFormat.getOutputCompressionType(job); // find the right codec Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, DefaultCodec.class); codec = ReflectionUtils.newInstance(codecClass, job); } // ignore the progress parameter, since MapFile is local final MapFile.Writer out = new MapFile.Writer(job, fs, file.toString(), job.getOutputKeyClass().asSubclass(WritableComparable.class), job.getOutputValueClass().asSubclass(Writable.class), compressionType, codec, progress); return new RecordWriter<WritableComparable, Writable>() { public void write(WritableComparable key, Writable value) throws IOException { out.append(key, value); } public void close(Reporter reporter) throws IOException { out.close();} }; } }
关键点在于获取分布式文件输出句柄MapFile.Writer,完成输出任务后会关闭输出。每个实现都有特定用途,都需要弄清楚,在这里就不再一一介绍了。
上面是hadoop自己的实现,在具体的编程过程中,我们肯定会有自己的实现去定义输出格式。上面也讲到了job只能设置输出路径,不能添加多个输出路径,那么有什么解决措施呢?来看看nutch中的精彩实现,会给我们启示:
自己的实现: org.apache.nutch.parse.ParseOutputFormat
public class ParseOutputFormat implements OutputFormat<Text, Parse> { //这里不是检查输出路径,是检查数据路径下的子路径,改变了接口中的定义 public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException { Path out = FileOutputFormat.getOutputPath(job); if (fs.exists(new Path(out, CrawlDatum.PARSE_DIR_NAME))) throw new IOException("Segment already parsed!"); } //下面获取了三个输入句柄,分别向三个路径中输出键值对 public RecordWriter<Text, Parse> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress) throws IOException { ...... Path text = new Path(new Path(out, ParseText.DIR_NAME), name); // 一个输出路径 Path data = new Path(new Path(out, ParseData.DIR_NAME), name); //两个输出路径 Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);//三个输出路径 //一个写入 final MapFile.Writer textOut = new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class, CompressionType.RECORD, progress); //第二个写入 final MapFile.Writer dataOut = new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class, compType, progress); //第三个写入 final SequenceFile.Writer crawlOut = SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class, compType, progress); return new RecordWriter<Text, Parse>() { public void write(Text key, Parse parse)throws IOException { ...... crawlOut.append(key, d); ....... crawlOut.append(new Text(newUrl), newDatum); ...... crawlOut.append(key, adjust); ...... dataOut.append(key, parseData); ...... crawlOut.append(key, datum); } } //关闭三个句柄 public void close(Reporter reporter) throws IOException { textOut.close(); dataOut.close(); crawlOut.close(); } }; } }
ParseOutputFormat实现了OutputFormat接口,改变了job中设置的输出路径,并且把不同的内容输出到不同的路径,从而达到了多个输出(并且根据逻辑划分)。这个我觉得值得借鉴。
关于输入以及输入输出的各个实现都有什么用处,以后有机会再来写写。本人现在还是一知半解,见笑了。
发表评论
-
HBase配置LZO压缩
2011-07-10 22:40 6197系统: gentoo HDFS: hadoop:hado ... -
HBase RegionServer 退出 ( ZooKeeper session expired)
2011-04-23 08:32 9102RegionServer 由于 ZooKeeper sessi ... -
HBase迁移数据方案1(两个集群不能通信)
2011-03-30 18:23 3883前一篇文章里面介绍了 两个可以直接通信的集群之间很容易拷贝数据 ... -
HBase如何迁移数据
2011-03-10 13:42 6525HBase如何迁移数据?这里有个方案:http://blog. ... -
HBase如何存取多个版本的值
2011-03-07 16:11 27252HBase如何存取多个版本 ... -
HBase简介(很好的梳理资料)
2011-01-30 10:18 130760一、 简介 history s ... -
Google_三大论文中文版(Bigtable、 GFS、 Google MapReduce)
2010-11-28 16:30 22200做个中文版下载源: http://dl.iteye.c ... -
hadoop主节点(NameNode)备份策略以及恢复方法
2010-11-11 19:35 27830一、dits和fsimage 首先要提到 ... -
HRegionServer: ZooKeeper session expired
2010-11-01 14:21 11500Hbase不稳定,分析日志 ... -
Bad connect ack with firstBadLink
2010-10-25 13:20 8333hbase报的错误,经过分析是Hadoop不能写入数据了。可恶 ... -
hbase0.20.支持多个主节点容灾切换功能(只激活当前某个节点,其他节点备份)
2010-09-09 14:53 2879http://wiki.apache.org/hadoop/H ... -
java.io.IOException: Incomplete HDFS URI, no host
2010-09-07 08:31 16235ERROR org.apache.hadoop.hdfs.se ... -
升级hadoop0.20.2到hadoop-0.21.0
2010-09-05 11:52 7760按照新的文档来 更新配置: http://hadoop.apa ... -
hadoop-hdfs启动又自动退出的问题
2010-05-20 10:45 6165hadoop-hdfs启动又自动退出的问题,折腾了我1天时间啊 ... -
在windows平台下Eclipse调试Hadoop/Nutch
2010-04-29 14:34 3305即让碰到这个问题说明 准备工作都做好了,软件包,环境什么的这里 ... -
Hadoop运行mapreduce实例时,抛出错误 All datanodes xxx.xxx.xxx.xxx:xxx are bad. Aborting…
2010-04-29 14:26 6443Hadoop运行mapreduce实例时,抛出错误 All d ... -
cygwin 添加用户
2010-04-13 17:48 7425http://hi.baidu.com/skychen1900 ... -
nutch总体输入输出流程图解析
2010-04-12 16:58 2476附件里面有word文档,请下 ... -
nutch分布式搭建
2010-04-06 17:54 6832如何在eclipse中跑nutch :http://jiaj ... -
解析Nutch插件系统
2010-03-31 16:31 6623nutch系统架构的一个亮点就是插件,借鉴这个架构我们 ...
相关推荐
### Hadoop MapReduce任务提交与执行流程解析 #### 一、客户端提交任务 在Hadoop MapReduce框架中,客户端的任务提交是整个MapReduce作业启动的关键步骤。这一过程主要由`JobClient`类中的`runJob(JobConf job)`...
另外,它还支持直接在Eclipse中启动和停止Job,以及调试Map和Reduce函数,这对于调试和优化代码来说是非常重要的。 Hadoop-Eclipse-Plugin-2.6.4版本是针对Hadoop 2.x系列的,因此,它支持YARN资源管理器,这使得在...
3. **创建MapReduce项目**:在Eclipse中,你可以通过"File" -> "New" -> "Project" -> "Map/Reduce Project"创建一个新的Hadoop项目。选择合适的Hadoop版本(这里是2.7.6),然后为项目命名。 4. **编写MapReduce...
本文将深入探讨如何使用Eclipse IDE结合hadoop-eclipse-plugin-2.6.0.jar插件,实现在Windows环境下进行远程连接到Hadoop集群,尤其适用于64位操作系统。 首先,我们要理解Hadoop的核心概念。Hadoop是由Apache基金...
你需要导入Hadoop的相关库,创建Mapper和Reducer类,并实现它们的map()、reduce()方法。此外,还需要配置Job参数,如输入路径、输出路径、Mapper和Reducer类等,并提交Job到Hadoop集群执行。 压缩包中的"tfidf"文件...
1. 设置Hadoop安装路径:进入Eclipse的"Window" -> "Preferences" -> "Hadoop Map/Reduce",在"Local Hadoop Installation"中指定你的Hadoop安装目录。 2. 配置集群信息:如果你的Hadoop集群不是本地模式,需要在...
《Hadoop MapReduce实战指南——基于<hadoop-map-reduce-demo>项目解析》 在大数据处理领域,Hadoop MapReduce作为核心组件,承担着数据分布式计算的任务。本篇将通过一个名为“hadoop-map-reduce-demo”的示例项目...
### Map-Reduce 实现细节与问题解决 #### 客户端操作流程 Map-Reduce 的启动过程始于客户端向系统提交任务。此过程的核心是通过 `JobClient` 类的 `runJob` 静态方法来实现。具体步骤如下: 1. **JobClient 对象...
Hadoop是Apache软件基金会开发的一个开源框架,它实现了MapReduce模型,使得在大规模集群上进行数据处理变得更加简单。 在描述中提到的“hadoop测试代码”意味着这个压缩包可能包含了用Hadoop实现的MapReduce程序或...
标题中的"hadoop-training-map-reduce-example-4"表明这是一个关于Hadoop MapReduce的教程实例,很可能是第四个阶段或示例。Hadoop是Apache软件基金会的一个开源项目,它提供了分布式文件系统(HDFS)和MapReduce...
1. **创建MapReduce项目**:安装插件后,Eclipse会新增一个"New" -> "Other" -> "Hadoop Map/Reduce Project"选项,点击即可创建一个MapReduce项目,提供了模板化的Job类和Mapper/Reducer类。 2. **编辑HDFS文件**...
Hadoop Map-reduce Job Scheduler Resources Hadoop, Why? 数据太多了,需要能存储、快速分析Pb级数据集的系统 单机的存储、IO、内存、CPU有限,需要可扩展的集群 使用门槛低,数据分析是个庞杂的问题,MPI太复杂 ...
当你准备好测试或运行作业时,只需在Eclipse的"Run Configurations"中选择"Map/Reduce Job",设置好输入和输出路径,以及必要的参数,然后点击"Run",作业就会被提交到集群执行。 值得注意的是,hadoop-eclipse-...
本文将深入探讨Hadoop Common 2.6.0版本,特别是在MapReduce框架下进行Driver测试的关键知识点。这个压缩包“hadoop-common-2.6.0-bin-master.zip.rar”包含了运行和测试Hadoop MapReduce项目所需的核心组件。 首先...
在Hadoop MapReduce中,`JobConf`对象是任务配置的核心,它包含了关于输入格式、输出格式、Mapper、Reducer类以及各种配置参数。`InputFormat`和`OutputFormat`定义了如何读取和写入数据,而`Mapper`和`Reducer`则是...
完成代码编写后,可以通过Eclipse的“Run As”菜单选择“Hadoop Job”,指定输入数据的位置和输出结果的目录。插件会自动将程序打包成JAR文件,并提交到Hadoop集群执行。在运行过程中,Eclipse会显示作业的进度和...
在Hadoop生态系统中,MapReduce是一种分布式编程模型,主要用于处理和生成大数据集。它通过将大规模数据分割成小块,然后在多台机器上并行处理这些数据块,最后将结果汇总,从而实现高效的批量数据处理。MapReduce的...
Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在廉价硬件上处理大量数据,实现了大数据的存储和处理。`hadoop-3.1.3-src.tar.gz` 是Hadoop 3.1.3版本的源代码压缩包,包含了所有相关的Java源代码和...