`

Custom KeyValueTextInputFormat

 
阅读更多

在看老版的API时,发现旧的KeyValueTextInputFormat的作者基本上都是拿算法自己写,hadoop源码的很多地方都是不会拿现成的api来用,都是自己定义,这样做对性能的可控性是很强,这也折射出国外程序员跟国内程序员的差异,国内提倡拿来主义,国外可能更强调创新精神吧。

 

而我属于前者:拿来主义者

 

自定义的KeyValueInputFormat:

 

package cn.edu.xmu.dm.mpdemo.ioformat;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * desc: custom KeyValueInputFormat
 * <code>DMKeyValueInputFormat</code>
 * 
 * @author chenwq (irwenqiang@gmail.com)
 * @version 1.0 2012/05/19
 */
public class DMKeyValueInputFormat extends FileInputFormat<Text, Text> {

	protected static class KeyVRecordReader extends RecordReader<Text, Text> {

		private static final Log LOG = LogFactory
				.getLog(KeyVRecordReader.class);

		private CompressionCodecFactory compressionCodecs = null;
		private long start;
		private long pos;
		private long end;
		private LineReader in;
		private int maxLineLength;
		private Text key = null;
		private Text value = null;
		private String separator = "\t";

		@Override
		public void initialize(InputSplit genericSplit,
				TaskAttemptContext context) throws IOException,
				InterruptedException {

			FileSplit split = (FileSplit) genericSplit;
			Configuration job = context.getConfiguration();
			this.maxLineLength = job.getInt(
					"mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
			this.separator = job.get("key.value.separator.in.input.line", "\t");
			start = split.getStart();
			end = start + split.getLength();
			final Path file = split.getPath();
			compressionCodecs = new CompressionCodecFactory(job);
			final CompressionCodec codec = compressionCodecs.getCodec(file);

			FileSystem fs = file.getFileSystem(job);
			FSDataInputStream fileIn = fs.open(split.getPath());
			boolean skipFirstLine = false;
			if (codec != null) {
				in = new LineReader(codec.createInputStream(fileIn), job);
				end = Long.MAX_VALUE;
			} else {
				if (start != 0) {
					skipFirstLine = true;
					--start;
					fileIn.seek(start);
				}
				in = new LineReader(fileIn, job);
			}
			if (skipFirstLine) {
				start += in.readLine(new Text(), 0,
						(int) Math.min((long) Integer.MAX_VALUE, end - start));
			}
			this.pos = start;

		}

		@Override
		public synchronized void close() throws IOException {
			if (in != null) {
				in.close();
			}
		}

		@Override
		public Text getCurrentKey() throws IOException, InterruptedException {
			return key;
		}

		@Override
		public Text getCurrentValue() throws IOException, InterruptedException {
			return value;
		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			if (start == end) {
				return 0.0f;
			} else {
				return Math.min(1.0f, (pos - start) / (float) (end - start));
			}
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			Text line = new Text();
			if (key == null) {
				key = new Text();
			}
			if (value == null) {
				value = new Text();
			}
			int newSize = 0;
			while (pos < end) {
				newSize = in.readLine(line, maxLineLength, Math.max(
						(int) Math.min(Integer.MAX_VALUE, end - pos),
						maxLineLength));
				// 此处添加额外处理即可,其他地方与TextInputFormat一样。
				if (null != line) {
					String[] kv = line.toString().split(this.separator);
					if (kv.length == 2) {
						key.set(kv[0]);
						value.set(kv[1]);
					} else {
						LOG.info("Skipped line has no separator");
						key.set(line.toString());
						value.set("");
					}
				}
				if (newSize == 0) {
					break;
				}
				pos += newSize;
				if (newSize < maxLineLength) {
					break;
				}

				LOG.info("Skipped line of size " + newSize + " at pos "
						+ (pos - newSize));
			}
			if (newSize == 0) {
				key = null;
				value = null;
				return false;
			} else {
				return true;
			}
		}

	}

	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new KeyVRecordReader();
	}

	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		CompressionCodec codec = new CompressionCodecFactory(
				context.getConfiguration()).getCodec(file);
		return codec == null;
	}
}

 

测试定义的KeyValueInputForamt:

package cn.edu.xmu.dm.mpdemo.ioformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * desc: Test custom KeyValueTextInputFormat
 * <code>KeyValueTextInputFormatDemo</code>
 * 
 * @author chenwq (irwenqiang@gmail.com)
 * @version 1.0 2012/05/19
 */
public class KeyValueTextInputFormatDemo extends Configured implements Tool {
	
	public static class KVMapper extends Mapper<Text, Text, Text, Text>{

		private final static Logger LOG = LoggerFactory
				.getLogger(KVMapper.class);
		@Override
		protected void map(Text key, Text value, Context context)
				throws IOException, InterruptedException {
			System.out.println(key);
			System.out.println(value);
			LOG.info(key.toString());
			LOG.info(value.toString());
			context.write(key, value);
		}
	}
	
	@Override
	public int run(String[] args) throws Exception {
		
		String input = "input";
		String output = "output";
		Path inputDir = new Path(input);
		Path outputDir = new Path(output);
		Configuration conf = new Configuration();
		Job job = new Job(conf, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(KVMapper.class);
		job.setNumReduceTasks(0);

		job.setInputFormatClass(DMKeyValueInputFormat.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, inputDir);
		FileOutputFormat.setOutputPath(job, outputDir);
		return job.waitForCompletion(true) ? 0 : -1;
	}

	public static void main(String[] args) throws Exception {
		ToolRunner.run(new KeyValueTextInputFormatDemo(), args);
	}
}
 

 

 

 

0
0
分享到:
评论

相关推荐

    OfficeCustomUIEditorFiles_CustomUI_MicrosoftOffice_

    【标题】"OfficeCustomUIEditorFiles_CustomUI_MicrosoftOffice_" 提到的是一个与 Microsoft Office 自定义用户界面(Custom UI)编辑器相关的文件集合。这个工具主要用于帮助用户自定义 Office 应用程序,如 Word、...

    UDP-Custom-Device.zip

    《NI VeriStand UDP通信Custom Device详解》 在自动化测试和控制领域,National Instruments(NI)的VeriStand是一款广泛使用的实时测试系统软件。它提供了一个灵活的平台,用于快速构建和部署各种测试系统,包括...

    Office Word,Excel Tab页制作工具 customUI

    在Microsoft Office中,CustomUI(自定义用户界面)是一个强大的功能,允许用户对Word和Excel等应用程序的 Ribbon 用户界面进行个性化设置。这个标题和描述提到的"Office Word,Excel Tab页制作工具 customUI"显然是...

    custom-wavwview软件问题

    ### Custom-WavWView软件问题解析与解决方案 #### 软件概述 Custom-WavWView是一款用于音频处理的专业软件,常被应用于音频文件的播放、编辑及分析等多个领域。然而,在实际使用过程中,用户可能会遇到一些技术难题...

    Custom UI Editor

    CustomUIEditor 是一种工具,用于创建和编辑 Microsoft Office 的自定义用户界面。通过 CustomUIEditor,用户可以自定义 Office 应用程序的功能区、菜单和工具栏,以满足特定的需求。这样可以提高用户的工作效率,使...

    office custom UI editor(office2007自定义选项卡制作工具)

    Office Custom UI Editor是一款专为Microsoft Office 2007设计的工具,旨在帮助用户自定义其界面,特别是针对2007版本中的 Ribbon 用户界面。Ribbon 是Office 2007引入的一项重大改变,它将传统的菜单和工具栏转换为...

    CustomUIEditor

    【CustomUIEditor】是一款专为Excel用户设计的工具,它允许用户自定义Excel界面的按钮,将宏和VBA(Visual Basic for Applications)代码转换成直观的按钮,从而简化操作流程,提高工作效率。通过这款软件,用户不再...

    Custom.mxtpro

    Custom.mxtpro

    jquery-ui-1.8.2.custom.min.js,jquery-ui-1.8.4.custom.css

    本篇文章将详细探讨两个特定版本的jQuery UI——1.8.2.custom.min.js和1.8.4.custom.css,以及它们在实际应用中的作用。 首先,我们来看`jquery-ui-1.8.2.custom.min.js`。这是一个压缩和优化过的JavaScript文件,...

    custom-entity-_2.rar_custom entity_自定义实体

    在AutoCAD或其他基于ObjectARX的CAD软件开发中,自定义实体(Custom Entity)是创建扩展功能和定制工作流程的关键部分。自定义实体允许开发者构建特定于应用需求的图形对象,这些对象可以拥有独特的属性、行为和交互...

    mobiscroll-custom-2.17.0插件

    **移动端开发日期插件——Mobiscroll Custom 2.17.0详解** 在移动应用开发中,用户界面的交互性和用户体验是至关重要的。对于日期选择这类常见的功能,一款高效、美观且易于使用的插件至关重要。`Mobiscroll Custom...

    jquery-ui-1.10.2.custom

    在这个主题中,我们将深入探讨的是 `jquery-ui-1.10.2.custom` 版本,这是一个定制化的版本,包含了对 jQuery 1.9.1 的支持。 首先,让我们从 `jquery-ui-1.10.2.custom` 的核心组件开始。这个版本提供了包括但不...

    MFC控件Custom Control使用源码

    在Microsoft Foundation Classes (MFC)库中,Custom Control(自定义控件)是开发者为了实现特定功能或界面效果,通过扩展标准Windows控件而创建的。MFC为开发者提供了便捷的方式来实现这一目标,使得我们可以利用...

    Custom Cursors(替换光标)-4.1.0.zip

    名称:Custom Cursors(替换光标) ---------------------------------------- 版本:4.1.0 作者:cute-cursors.com 分类:网页增强 ---------------------------------------- 概述:用可爱,有趣和新潮的东西替换...

    CustomStyle

    在IT行业中,`CustomStyle`通常指的是自定义样式或者风格,这主要涉及到软件开发,特别是前端Web开发领域。自定义样式允许开发者根据自己的需求或设计规范来调整元素的外观,使其区别于默认样式,创造出独特的用户...

    Office Custom UI Editor

    Office Custom UI Editor是Office2007以上版本菜单、工具栏编辑器。Office2007以上版本是一个完全开放式的界面,开发者可以自己定义Ribbon工具栏和菜单。而Office Custom UI Editor可以轻松地实现这一目的。

    jquery-ui-1.8.16.custom.min.js/jquery-ui-1.8.16.custom.css

    这个压缩包包含两个关键文件:`jquery-ui-1.8.16.custom.min.js` 和 `jquery-ui-1.8.16.custom.css`,这些都是jQuery UI的特定版本,即1.8.16。这个版本在当时是一个广泛使用的稳定版本,提供了丰富的功能和组件。 ...

    CustomControl控件使用vc++

    在Windows编程中,CustomControl(自定义控件)是一种允许开发者根据特定需求创建自定义用户界面元素的方法。在VC++环境中,开发自定义控件通常涉及到MFC(Microsoft Foundation Classes)框架的应用。当我们遇到...

    CustomSet.zip

    在给定的“CustomSet.zip”压缩包中,我们看到一个名为“CustomSet.java”的文件,这很可能是用户自定义的一个简易版本的HashSet实现。这个自定义版本通常是为了学习和理解HashSet的底层工作原理而创建的,或者是...

Global site tag (gtag.js) - Google Analytics