`

Custom KeyValueTextInputFormat

 
阅读更多

在看老版的API时,发现旧的KeyValueTextInputFormat的作者基本上都是拿算法自己写,hadoop源码的很多地方都是不会拿现成的api来用,都是自己定义,这样做对性能的可控性是很强,这也折射出国外程序员跟国内程序员的差异,国内提倡拿来主义,国外可能更强调创新精神吧。

 

而我属于前者:拿来主义者

 

自定义的KeyValueInputFormat:

 

package cn.edu.xmu.dm.mpdemo.ioformat;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * desc: custom KeyValueInputFormat
 * <code>DMKeyValueInputFormat</code>
 * 
 * @author chenwq (irwenqiang@gmail.com)
 * @version 1.0 2012/05/19
 */
public class DMKeyValueInputFormat extends FileInputFormat<Text, Text> {

	protected static class KeyVRecordReader extends RecordReader<Text, Text> {

		private static final Log LOG = LogFactory
				.getLog(KeyVRecordReader.class);

		private CompressionCodecFactory compressionCodecs = null;
		private long start;
		private long pos;
		private long end;
		private LineReader in;
		private int maxLineLength;
		private Text key = null;
		private Text value = null;
		private String separator = "\t";

		@Override
		public void initialize(InputSplit genericSplit,
				TaskAttemptContext context) throws IOException,
				InterruptedException {

			FileSplit split = (FileSplit) genericSplit;
			Configuration job = context.getConfiguration();
			this.maxLineLength = job.getInt(
					"mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
			this.separator = job.get("key.value.separator.in.input.line", "\t");
			start = split.getStart();
			end = start + split.getLength();
			final Path file = split.getPath();
			compressionCodecs = new CompressionCodecFactory(job);
			final CompressionCodec codec = compressionCodecs.getCodec(file);

			FileSystem fs = file.getFileSystem(job);
			FSDataInputStream fileIn = fs.open(split.getPath());
			boolean skipFirstLine = false;
			if (codec != null) {
				in = new LineReader(codec.createInputStream(fileIn), job);
				end = Long.MAX_VALUE;
			} else {
				if (start != 0) {
					skipFirstLine = true;
					--start;
					fileIn.seek(start);
				}
				in = new LineReader(fileIn, job);
			}
			if (skipFirstLine) {
				start += in.readLine(new Text(), 0,
						(int) Math.min((long) Integer.MAX_VALUE, end - start));
			}
			this.pos = start;

		}

		@Override
		public synchronized void close() throws IOException {
			if (in != null) {
				in.close();
			}
		}

		@Override
		public Text getCurrentKey() throws IOException, InterruptedException {
			return key;
		}

		@Override
		public Text getCurrentValue() throws IOException, InterruptedException {
			return value;
		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			if (start == end) {
				return 0.0f;
			} else {
				return Math.min(1.0f, (pos - start) / (float) (end - start));
			}
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			Text line = new Text();
			if (key == null) {
				key = new Text();
			}
			if (value == null) {
				value = new Text();
			}
			int newSize = 0;
			while (pos < end) {
				newSize = in.readLine(line, maxLineLength, Math.max(
						(int) Math.min(Integer.MAX_VALUE, end - pos),
						maxLineLength));
				// 此处添加额外处理即可,其他地方与TextInputFormat一样。
				if (null != line) {
					String[] kv = line.toString().split(this.separator);
					if (kv.length == 2) {
						key.set(kv[0]);
						value.set(kv[1]);
					} else {
						LOG.info("Skipped line has no separator");
						key.set(line.toString());
						value.set("");
					}
				}
				if (newSize == 0) {
					break;
				}
				pos += newSize;
				if (newSize < maxLineLength) {
					break;
				}

				LOG.info("Skipped line of size " + newSize + " at pos "
						+ (pos - newSize));
			}
			if (newSize == 0) {
				key = null;
				value = null;
				return false;
			} else {
				return true;
			}
		}

	}

	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new KeyVRecordReader();
	}

	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		CompressionCodec codec = new CompressionCodecFactory(
				context.getConfiguration()).getCodec(file);
		return codec == null;
	}
}

 

测试定义的KeyValueInputForamt:

package cn.edu.xmu.dm.mpdemo.ioformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * desc: Test custom KeyValueTextInputFormat
 * <code>KeyValueTextInputFormatDemo</code>
 * 
 * @author chenwq (irwenqiang@gmail.com)
 * @version 1.0 2012/05/19
 */
public class KeyValueTextInputFormatDemo extends Configured implements Tool {
	
	public static class KVMapper extends Mapper<Text, Text, Text, Text>{

		private final static Logger LOG = LoggerFactory
				.getLogger(KVMapper.class);
		@Override
		protected void map(Text key, Text value, Context context)
				throws IOException, InterruptedException {
			System.out.println(key);
			System.out.println(value);
			LOG.info(key.toString());
			LOG.info(value.toString());
			context.write(key, value);
		}
	}
	
	@Override
	public int run(String[] args) throws Exception {
		
		String input = "input";
		String output = "output";
		Path inputDir = new Path(input);
		Path outputDir = new Path(output);
		Configuration conf = new Configuration();
		Job job = new Job(conf, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(KVMapper.class);
		job.setNumReduceTasks(0);

		job.setInputFormatClass(DMKeyValueInputFormat.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, inputDir);
		FileOutputFormat.setOutputPath(job, outputDir);
		return job.waitForCompletion(true) ? 0 : -1;
	}

	public static void main(String[] args) throws Exception {
		ToolRunner.run(new KeyValueTextInputFormatDemo(), args);
	}
}
 

 

 

 

0
0
分享到:
评论

相关推荐

    OfficeCustomUIEditorFiles_CustomUI_MicrosoftOffice_

    【标题】"OfficeCustomUIEditorFiles_CustomUI_MicrosoftOffice_" 提到的是一个与 Microsoft Office 自定义用户界面(Custom UI)编辑器相关的文件集合。这个工具主要用于帮助用户自定义 Office 应用程序,如 Word、...

    UDP-Custom-Device.zip

    《NI VeriStand UDP通信Custom Device详解》 在自动化测试和控制领域,National Instruments(NI)的VeriStand是一款广泛使用的实时测试系统软件。它提供了一个灵活的平台,用于快速构建和部署各种测试系统,包括...

    Office Word,Excel Tab页制作工具 customUI

    在Microsoft Office中,CustomUI(自定义用户界面)是一个强大的功能,允许用户对Word和Excel等应用程序的 Ribbon 用户界面进行个性化设置。这个标题和描述提到的"Office Word,Excel Tab页制作工具 customUI"显然是...

    Custom UI Editor

    CustomUIEditor 是一种工具,用于创建和编辑 Microsoft Office 的自定义用户界面。通过 CustomUIEditor,用户可以自定义 Office 应用程序的功能区、菜单和工具栏,以满足特定的需求。这样可以提高用户的工作效率,使...

    office custom UI editor(office2007自定义选项卡制作工具)

    Office Custom UI Editor是一款专为Microsoft Office 2007设计的工具,旨在帮助用户自定义其界面,特别是针对2007版本中的 Ribbon 用户界面。Ribbon 是Office 2007引入的一项重大改变,它将传统的菜单和工具栏转换为...

    Custom.mxtpro

    Custom.mxtpro

    jquery-ui-1.8.2.custom.min.js,jquery-ui-1.8.4.custom.css

    本篇文章将详细探讨两个特定版本的jQuery UI——1.8.2.custom.min.js和1.8.4.custom.css,以及它们在实际应用中的作用。 首先,我们来看`jquery-ui-1.8.2.custom.min.js`。这是一个压缩和优化过的JavaScript文件,...

    mobiscroll-custom-2.17.0插件

    **移动端开发日期插件——Mobiscroll Custom 2.17.0详解** 在移动应用开发中,用户界面的交互性和用户体验是至关重要的。对于日期选择这类常见的功能,一款高效、美观且易于使用的插件至关重要。`Mobiscroll Custom...

    jquery-ui-1.10.2.custom

    在这个主题中,我们将深入探讨的是 `jquery-ui-1.10.2.custom` 版本,这是一个定制化的版本,包含了对 jQuery 1.9.1 的支持。 首先,让我们从 `jquery-ui-1.10.2.custom` 的核心组件开始。这个版本提供了包括但不...

    MFC控件Custom Control使用源码

    在Microsoft Foundation Classes (MFC)库中,Custom Control(自定义控件)是开发者为了实现特定功能或界面效果,通过扩展标准Windows控件而创建的。MFC为开发者提供了便捷的方式来实现这一目标,使得我们可以利用...

    CustomStyle

    在IT行业中,`CustomStyle`通常指的是自定义样式或者风格,这主要涉及到软件开发,特别是前端Web开发领域。自定义样式允许开发者根据自己的需求或设计规范来调整元素的外观,使其区别于默认样式,创造出独特的用户...

    jquery-ui-1.8.16.custom.min.js/jquery-ui-1.8.16.custom.css

    这个压缩包包含两个关键文件:`jquery-ui-1.8.16.custom.min.js` 和 `jquery-ui-1.8.16.custom.css`,这些都是jQuery UI的特定版本,即1.8.16。这个版本在当时是一个广泛使用的稳定版本,提供了丰富的功能和组件。 ...

    CustomControl控件使用vc++

    在Windows编程中,CustomControl(自定义控件)是一种允许开发者根据特定需求创建自定义用户界面元素的方法。在VC++环境中,开发自定义控件通常涉及到MFC(Microsoft Foundation Classes)框架的应用。当我们遇到...

    CustomSet.zip

    在给定的“CustomSet.zip”压缩包中,我们看到一个名为“CustomSet.java”的文件,这很可能是用户自定义的一个简易版本的HashSet实现。这个自定义版本通常是为了学习和理解HashSet的底层工作原理而创建的,或者是...

    CustomUIEditor.zip

    在Excel中,我们常常需要执行复杂的宏命令来自动化日常任务,而CustomUIEditor工具则提供了一个方便的方式来将这些宏转换为用户友好的按钮,使得操作更加直观,极大地提升了工作效率。这个工具允许用户自定义Excel的...

    Office Custom UI Editor

    Office Custom UI Editor是Office2007以上版本菜单、工具栏编辑器。Office2007以上版本是一个完全开放式的界面,开发者可以自己定义Ribbon工具栏和菜单。而Office Custom UI Editor可以轻松地实现这一目的。

    Building.Android.UIs.with.Custom.Views

    Move beyond default UI templates, create and customize amazing UIs with Android Custom View Enable smooth data flow and create futuristic UIs by creating flexible custom views Scale your apps with ...

    custom-filestroe-2.16-SNAPSHOT.jar

    custom-filestroe-2.16,geoserver的自定义切片存储路径插件,基于geoserver2.16x开发。可发布arcgis server、cesiumlab等切片

    Custom Message Support.zip

    You can create your own ROS custom messages and use them in MATLAB® and ROS networks to transmit information. Use roboticsAddons to install the necessary software for custom message support. To learn...

Global site tag (gtag.js) - Google Analytics