`
刘小小尘
  • 浏览: 67553 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

sequencefile处理小文件实例

 
阅读更多

今天没事,写了下sequencefile处理小文件实例

废话不说,直接上代码


WholeFileRecordReader:

package com.pzoom.mr.sequence;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
	private FileSplit fileSplit ;
	private Configuration conf ;
	private boolean processed = false ;
	private BytesWritable value = new BytesWritable() ;
	@Override
	public void close() throws IOException {
	}
	@Override
	public NullWritable getCurrentKey() throws IOException,
			InterruptedException {
		return NullWritable.get();
	}
	@Override
	public BytesWritable getCurrentValue() throws IOException,
			InterruptedException {
		return value;
	}
	@Override
	public float getProgress() throws IOException, InterruptedException {
		return processed ? 1.0f : 0.0f;
	}
	@Override
	public void initialize(InputSplit inputsplit,
			TaskAttemptContext taskattemptcontext) throws IOException,
			InterruptedException {
		this.fileSplit = (FileSplit) inputsplit ;
		this.conf = taskattemptcontext.getConfiguration();
	}
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (!processed) {
			byte[]  contents = new byte[(int)fileSplit.getLength()] ;
			Path file = fileSplit.getPath();
			FileSystem fs = file.getFileSystem(conf);
			FSDataInputStream in = null ;
			
			in = fs.open(file);
			IOUtils.readFully(in, contents, 0, contents.length);
			value.set(contents, 0, contents.length);
			IOUtils.closeStream(in);
			processed = true ;
			return true ;
		}
		return false;
	}



}


WholeFileInputFormat:只需要重载createRecordReader()和isSplitable()方法就行,代码如下:

package com.pzoom.mr.sequence;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{
	
	
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false ;
	}

	@Override
	public RecordReader<NullWritable, BytesWritable> createRecordReader(
			InputSplit inputsplit, TaskAttemptContext taskattemptcontext)
			throws IOException, InterruptedException {
		WholeFileRecordReader reader = new WholeFileRecordReader();
		reader.initialize(inputsplit, taskattemptcontext);
		return reader;
	}
	
}

测试类:SmallFilesToSequenceFileConverter

package com.pzoom.mr.sequence;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SmallFilesToSequenceFileConverter {
	static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
		private Text filenameKey;
		
		@Override
		protected void setup(Context context) throws IOException,
		InterruptedException {
			InputSplit split = context.getInputSplit();
			Path path = ((FileSplit)split).getPath();
			filenameKey = new Text(path.toString());
		}

		@Override
		protected void map(NullWritable key, BytesWritable value,
				Context context) throws IOException, InterruptedException {
			context.write(filenameKey, value);
		}

	}
	
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration() ;
		Job job = new Job(conf,"fdaf");
		
		FileInputFormat.setInputPaths(job, new Path("D:/keywordzip-zip/testNull")) ;
		FileOutputFormat.setOutputPath(job, new Path("D:/mapreduce-out/1AA" + new Random().nextInt(100))) ;
		job.setJarByClass(SmallFilesToSequenceFileConverter.class);
		job.setInputFormatClass(WholeFileInputFormat.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(BytesWritable.class);
		job.setMapperClass(SequenceFileMapper.class);
		
		job.waitForCompletion(true);
	}
}







以上代码可以正确运行,输出二进制文件

分享到:
评论

相关推荐

    SequenceFile转换成MapFile

    SequenceFile是一种二进制文件格式,常用于存储键值对数据,适合大规模数据的处理和传输。MapFile则是SequenceFile的一种优化形式,它增加了索引机制,使得随机访问和查找性能得到提升。在某些场景下,我们可能需要...

    HDFS小文件处理方案

    ### HDFS小文件处理方案详解 #### 一、概述与挑战 HDFS(Hadoop Distributed File System)设计初衷是为了高效地存储和处理大型文件。然而,在面对大量的小文件时,HDFS面临着诸多挑战: 1. **内存限制**:...

    云计算技术实验报告六SequenceFile使用

    2. **SequenceFile封装**:接下来,使用Hadoop的SequenceFile API将这些小文件封装成一个单一的SequenceFile。SequenceFile允许高效地存储和检索键值对,且支持压缩,可以显著提高数据存储和传输的效率。在实验中,...

    content.zip

    本篇将深入探讨SequenceFile及其在Java环境下的操作,结合给定的"content.zip"压缩包,我们将分析如何进行小文件合并、读取和写入SequenceFile。 SequenceFile的特性在于它的紧凑性和可伸缩性。每个记录由一个键...

    hadoop权威指南 ncdc2015年数据

    (2) 使用Hadoop的SequenceFile、Avro或Parquet等格式,这些格式能更高效地处理小文件;(3) 使用HBase等NoSQL数据库进行数据存储,它们更适用于大量小键值对。 5. **NCDC数据应用**:NCDC的气候数据可以用于多种分析...

    Java流机制在Hadoop分布式文件系统中的应用.zip

    总的来说,Java流机制在Hadoop中的应用使得开发者能够轻松地实现数据的读取、写入、传输和处理,从而充分发挥Hadoop分布式文件系统的优势,处理海量数据。无论是简单的文件读写,还是复杂的MapReduce任务,Java流都...

    Hadoop文件的存储格式实例详解

    本篇文章将深入探讨Hadoop文件的存储格式,尤其是SequenceFile格式,它是一种广泛使用的二进制文件格式,适合大规模数据处理。 首先,我们要了解最基础的1.txt纯文本格式。这种格式是最直观易读的,它由多行记录...

    《Hive数据仓库案例教程》教学大纲.pdf

    Hive作为一个基于Hadoop的数据仓库工具,它的主要功能是将结构化的数据文件映射为数据库表,并提供SQL-like查询功能,方便用户对大规模数据进行批处理分析。 课程内容分为十一个章节,涵盖了从基础理论到实际操作的...

    Hive的案例详解.pdf

    2. **灵活性**:Hive支持多种数据存储格式,例如文本文件、CSV文件、SequenceFile等,并且还支持自定义数据存储格式,这使得Hive能够灵活应对各种不同的数据来源和需求。 3. **可扩展性**:Hive的设计使其能够轻松地...

    MapReduce专家级版本总结

    **SequenceFile**是Hadoop API提供的一种二进制文件存储方式,可以直接将键值对序列化到文件中。它的优点包括支持压缩、良好的本地化支持以及高效的序列化速度。但同时也存在需要合并文件、文件较大等缺点。 ### ...

    分布式编程模式MapReduce应用[参考].pdf

    TaskTracker 向 JobTracker 索求下一个 Map/Reduce 任务,Mapper 任务从 InputFormat 创建 RecordReader,循环读入 FileSplits 的内容生成 Key 与 Value,传给 Mapper 函数,处理完后中间结果写成 SequenceFile。...

    Hadoop权威指南(第三版)

    Hadoop的核心组件包括Hadoop分布式文件系统(HDFS)和MapReduce计算框架,它们共同提供了一种高效、可靠的方式来处理大规模数据。 #### MapReduce简介 MapReduce是一种编程模型和相关的实现,用于处理并生成大数据...

    hive工作调优小结

    - **本地模式**:适用于小规模数据处理,可提高处理速度。 - **伪分布式模式**:适用于测试环境或小规模集群。 - **完全分布式模式**:适用于大规模生产环境。 **优化建议**: - 通过设置**Hive.exec.mode.local....

    Hadoop权威指引---中文版.pdf

    - 基于文件的数据结构如SequenceFile和Avro,方便高效地存储和处理结构化和半结构化数据。 5. **MapReduce应用开发** - 开发MapReduce作业需要配置API,并设置合适的开发环境。 - 单元测试确保代码质量,本地...

    大数据技术分享 Hadoop运行原理分析 共3页.pdf

    - **DataNode**:负责处理文件系统客户端或NameNode发送来的文件系统元数据命令。 2. **MapReduce**:一种编程模型,用于处理大规模数据集。MapReduce的主要组成部分包括JobTracker和TaskTracker。 - **...

    Hadoop权威指南(中文版)2015上传.rar

    Nutch系统利用Hadoop进行数据处理的精选实例 总结 Rackspace的日志处理 简史 选择Hadoop 收集和存储 日志的MapReduce模型 关于Cascading 字段、元组和管道 操作 Tap类,Scheme对象和Flow对象 Cascading实战 灵活性 ...

    Hadoop权威指南 第二版(中文版)

     Nutch系统利用Hadoop进行数据处理的精选实例  总结  Rackspace的日志处理  简史  选择Hadoop  收集和存储  日志的MapReduce模型  关于Cascading  字段、元组和管道  操作  Tap类,Scheme对象和Flow对象 ...

    Hadoop The Definitive Guide PDF

    - **文件基数据结构**:Hadoop 支持多种文件基数据结构,如 SequenceFile 和 MapFile,这些结构有助于更高效地访问和处理大数据集。 #### 5. 进阶MapReduce主题 - **排序和连接数据**:MapReduce 可以用来对大型...

Global site tag (gtag.js) - Google Analytics