`
小野bupt
  • 浏览: 14728 次
  • 性别: Icon_minigender_1
文章分类
社区版块
存档分类
最新评论

重写RecordReader和InputFormat实现单个文件不分片,整个分片作为一条记录处理。(倒排索引)

 
阅读更多

比较简单,直接上代码:

这是MapReduce功能代码:

package org.edu.bupt.xiaoye.hadooptest;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyInverseIndex {
	public static final String INPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_in";
	public static final String OUTPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_out";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUTPUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}
		conf.set("hadoop.job.user","hadoop");
		conf.set("mapred.job.tracker", "10.103.240.160:9001");
		
		final Job job = new Job(conf, MyInverseIndex.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setJarByClass(MyInverseIndex.class);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReduce.class);
		job.setInputFormatClass(WholeFileInputFormat.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(1);//设置个数为1
		FileOutputFormat.setOutputPath(job, outPath);
		job.waitForCompletion(true);
		
	}

	/**
	 * 只适用于文档中只出现一行 (可以一次读取整个文档)
	 * 
	 * @author hadoop
	 * 
	 */
	public static class MyMapper extends Mapper<NullWritable, BytesWritable, Text, Text> {
		Map<String, Integer> map = new HashMap();

		@Override
		public  void map(NullWritable key, BytesWritable value, Context context)
				throws IOException, InterruptedException {
			String line = new String(value.getBytes(), "utf-8").trim();
			String[] words = line.split(" ");
			for (String s : words) {
				if (map.containsKey(s)) {
					map.put(s, map.get(s) + 1);
				} else {
					map.put(s, 1);
				}
			}
			Set<String> keys = map.keySet();
			for (Iterator it = keys.iterator(); it.hasNext();) {
				String s = (String) it.next();
				context.write(new Text(s),
						new Text(((FileSplit) context.getInputSplit()).getPath().getName().toString() + ":" + map.get(s)));
			}
			map.clear();
		}
	}

	public static class MyReduce extends Reducer<Text, Text, Text, Text> {		
		private Text result = new Text();
		// 实现reduce函数
		public void reduce(Text key, Iterable<Text> values, Context context)
		throws IOException, InterruptedException {
			StringBuffer fileList = new StringBuffer();
			for(Text value : values){
				fileList.append(value.toString()+";");
			}
			result.set(fileList.toString());
			context.write(key, result);
		}
	}
		
}
这是重写的InputFormat类:

package org.edu.bupt.xiaoye.hadooptest;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * 为了阻止文件分片,并将该整个文件作为一条记录处理
 * 
 * @author Xiaoye
 * 
 */
public class WholeFileInputFormat extends
		FileInputFormat<NullWritable, BytesWritable> {

	/**
	 * 重写该方法将阻止将一个文件分片
	 */
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}

	/**
	 * 获取RecordReader,该类用来将分片分割成记录,从而生成key和value值给map处理。
	 * 
	 */
	@Override
	public RecordReader<NullWritable, BytesWritable> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		WholeFileRecordReader reader = new WholeFileRecordReader();
		reader.initialize(split, context);
		return reader;
	}
}

这是RecordReader实现类:


package org.edu.bupt.xiaoye.hadooptest;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * 继承RecordReader
 * 该类用来将分片分割成记录,从而生成key和value。例如TextInputFormat中的key和value就是RecordReader的子类产生的。
 * 在这里,我们继承了这个类,将重写了key和value的生成算法。对一个分片来说,只生成一个key-value对。其中key为空,value为该分片
 * 的所有内容
 * @author Xiaoye
 */
public class WholeFileRecordReader extends
		RecordReader<NullWritable, BytesWritable> {
	// 用来盛放传递过来的分片
	private FileSplit fileSplit;
	private Configuration conf;
	//将作为key-value中的value值返回
	private BytesWritable value = new BytesWritable();
	// 因为只生成一条记录,所以只需要调用一次。因此第一次调用过后将processed赋值为true,从而结束key和value的生成
	private boolean processed = false;

	/**
	 * 设置RecordReader的分片和配置对象。
	 */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		this.fileSplit = (FileSplit) split;
		this.conf = context.getConfiguration();
	}

	/**
	 * 核心算法
	 * 用来产生key-value值
	 * 将生成的value值存入value对象中
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (!processed) {
			/*
			 * 注意这儿,fileSplit中只是存放着待处理内容的位置 大小等信息,并没有实际的内容
			 * 因此这里会通过fileSplit找到待处理文件,然后再读入内容到value中
			 */
			byte[] contents = new byte[(int) fileSplit.getLength()];
			Path file = fileSplit.getPath();
			FileSystem fs = file.getFileSystem(conf);
			FSDataInputStream in = null;
			try {
				in = fs.open(file);
				IOUtils.readFully(in, contents, 0, contents.length);
				value.set(contents, 0, contents.length);
			} finally {
				IOUtils.closeStream(in);
			}
			processed = true;
			return true;
		}
		return false;
	}

	@Override
	public NullWritable getCurrentKey() throws IOException,
			InterruptedException {
		return NullWritable.get();
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException,
			InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return processed ? 1.0f : 0.0f;
	}

	@Override
	public void close() throws IOException {
		//do nothing
	}

}

其中有需要注意的 fileSplit中存放的是分片信息,并没有分片内容。

在BytesWritable 转为byte[]再转为String类型时需要调用trim()方法,不然会有空格影响key值的group。

分享到:
评论

相关推荐

    Hadoop源码解析---MapReduce之InputFormat

    综上所述,InputFormat是Hadoop MapReduce编程模型中的核心组件之一,它通过定义数据的分片和读取机制,允许开发者以灵活的方式处理各种格式的数据。理解InputFormat的设计和实现,对于有效地使用Hadoop进行大规模...

    自定义MapReduce的InputFormat

    例如,如果你正在处理一种结构化的日志文件,其中每个事件由开始和结束的特定字符串包围,你可以在RecordReader中寻找这些限定符,然后将限定符之间的内容作为键值对返回。这样,Mapper就可以直接处理事件数据,而...

    MapReduce计算模型详讲(结合源码深入解读)

    每个InputSplit由一条条记录组成,通过RecordReader的实现来读取InputSplit中的每条记录,并将读取的记录交给Map函数来处理。 3. 记录读取器(RecordReader):RecordReader是InputFormat的实现类,负责读取...

    hive inputformat

    RecordReader负责读取数据文件并按需拆分记录,而InputFormat负责初始化RecordReader并确定数据分片(Splits)。 4. **实例代码:按照空格拆分日志文件** 假设我们有一个日志文件,每行包含多字段,字段之间用空格...

    【MapReduce篇03】MapReduce之InputFormat数据输入1

    1. **TextInputFormat**:这是最基础的InputFormat实现,它将每一行视为一个记录,键是LongWritable类型的偏移量,表示行在文件中的起始位置,值是Text类型的行内容,不包括行结束符。例如,一个InputSplit可能包含...

    hive数据表-小文件合并代码(java)

    小文件合并是指将多个小文件整合成少数几个大文件,以减少MapReduce任务的数量,提高数据读取和处理速度。在Hive中,我们可以通过Hadoop的InputFormat类和RecordReader类来实现自定义的文件合并逻辑。 Java代码实现...

    Hadoop-海量文件的分布式计算处理方案.docx

    HDFS是Google的GFS(Google File System)的开源实现,它是一个分布式文件系统,设计用于跨多台服务器存储和处理大型数据集。在HDFS中,NameNode作为主节点,负责元数据管理,包括文件系统的命名空间和文件的块映射...

    SdfTextInputFormat.java

    如果我们使用`TextInputFormat`,那么每条日志可能会被错误地分割为多行,而使用`SdfTextInputFormat`,我们可以确保每个Map任务处理的是完整的一条日志记录,避免了数据的割裂和处理错误。 为了使用这个自定义输入...

    Hadoop客户端Java代码.zip

    5. **InputFormat与RecordReader**: `InputFormat`负责将输入数据分割成适合`Mapper`处理的小块,`RecordReader`则从这些块中读取记录。 6. **OutputFormat与RecordWriter**: `OutputFormat`定义如何将`Reducer`...

    hadoop 1.04 api

    5. **NLineInputFormat**: 这个输入格式将每个N行作为一个split,常用于将多行数据作为单个输入处理,例如在处理CSV文件时。 6. **CombineFileInputFormat**: 该类用于合并多个文件输入格式,减少Map任务的数量,...

    hadoop source code源代码

    `RecordReader`和`RecordWriter`处理单个输入和输出记录。`TaskTracker`和`JobTracker`是MapReduce的调度和管理组件,负责任务分配和监控。 【源码学习的重要性】 通过阅读Hadoop源代码,开发者可以: 1. 理解...

    分布式编程模式MapReduce应用[参考].pdf

    TaskTracker 向 JobTracker 索求下一个 Map/Reduce 任务,Mapper 任务从 InputFormat 创建 RecordReader,循环读入 FileSplits 的内容生成 Key 与 Value,传给 Mapper 函数,处理完后中间结果写成 SequenceFile。...

    大数据MapReduce文件分发

    - YARN(Yet Another Resource Negotiator)在Hadoop 2.x中作为资源管理系统,负责分配内存、CPU等资源,确保文件分发和处理的高效进行。 总的来说,"大数据MapReduce文件分发"涵盖了从数据分块、数据传输、任务...

    mapreduce源码

    8. **RecordReader与RecordWriter**:RecordReader从InputFormat创建的输入分片中读取键值对,RecordWriter则负责将Reduce输出写入OutputFormat定义的输出格式。 9. **JobTracker与TaskTracker**:在Hadoop 1.x中,...

    HadoopMR-CombineLocalFiles:它将目录中的所有本地文件合并为一个文件

    在分布式计算领域,Hadoop MapReduce 是一个广泛使用的框架,用于处理和存储大规模数据集。在某些场景下,我们可能需要将多个小文件合并成一个大文件,以便更有效地进行MapReduce作业。"HadoopMR-CombineLocalFiles...

    大数据平台-MapReduce介绍.pdf

    大数据平台中的MapReduce是由Google公司的Jeffrey Dean和Sanjay Ghemawat开发的一个针对大规模群组中的海量数据处理的分布式编程模型。MapReduce实现了两个功能:Map函数应用于集合中的所有成员,然后返回一个基于这...

    Hadoop分布式文件系统(HDFS)运行测试

    2. **输入分片**:InputFormat组件负责将输入文件切分成逻辑上的片段(InputSplit),并为每个片段分配一个RecordReader。 3. **Map阶段**:RecordReader读取数据并传递给Mapper,Mapper对输入数据进行处理,并输出...

    hadoop的java_api

    2. **InputFormat和RecordReader**:InputFormat定义了如何将数据切分成输入块,并且负责将这些块转化为键值对供Map任务处理。RecordReader则从输入块中读取单个记录。 3. **OutputFormat和RecordWriter**:...

    hadoop api及Hadoop结构与设计

    2. **InputFormat和OutputFormat**:这两个接口定义了如何将数据分成输入分片(InputSplit)以及如何将这些分片映射到Map任务。`TextInputFormat`和`TextOutputFormat`是最基础的实现,分别用于处理文本文件的输入和...

Global site tag (gtag.js) - Google Analytics