hadoop0.20.2中的API进行了大幅度的重构,导致很多API变成了@Deprecated,而新的API又不全,所以新手学起来感觉很变扭,社区开发者还是建议用hadoop0.20.2的可以沿用老版的API,本着学习的态度,用新的API添加了KeyValueTextInputFormat这个在老版API中存在的类,实际上是对TextInputFormat进行了小幅的修改。
package cn.fnst.cspf.output;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
public class KeyValueInputFormat extends FileInputFormat<Text, Text>{
protected static class KeyVRecordReader extends RecordReader<Text,Text>{
private static final Log LOG = LogFactory.getLog(KeyVRecordReader.class);
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private Text key = null;
private Text value = null;
private String separator = "\t";
@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
this.separator = job.get("key.value.separator.in.input.line", "\t");
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) {
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
@Override
public synchronized void close() throws IOException {
if(in!=null){
in.close();
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float)(end - start));
}
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
Text line = new Text();
if (key == null) {
key = new Text();
}
if (value == null) {
value = new Text();
}
int newSize = 0;
while (pos < end) {
newSize = in.readLine(line, maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
maxLineLength));
//此处添加额外处理即可,其他地方与TextInputFormat一样。
if(null!=line){
String[] kv = line.toString().split(this.separator);
if(kv.length==2){
key.set("key:"+kv[0]);
value.set("value:"+kv[1]);
}else{
LOG.info("Skipped line has no separator");
key.set(line.toString());
value.set("");
}
}
if (newSize == 0) {
break;
}
pos += newSize;
if (newSize < maxLineLength) {
break;
}
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException {
return new KeyVRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
return codec == null;
}
}
在看老版的API时,发现旧的KeyValueTextInputFormat的作者基本上都是拿算法自己写,hadoop源码的很多地方都是不会拿现成的api来用,都是自己定义,这样做对性能的可控性是很强,这也折射出国外程序员跟国内程序员的差异,国内提倡拿来主义,国外可能更强调创新精神吧。
分享到:
相关推荐
自定义控件是C#编程中一个重要的概念,它允许开发者根据需求扩展或修改内置控件的功能和外观,以满足特定项目的需求。在本案例中,我们将深入探讨如何自定义ComboBox控件。 ComboBox控件是Windows Forms中一个非常...
本篇文章将深入探讨如何定义和优化自定义模块,以及处理自定义事件。 首先,理解FreeSwitch模块的生命周期是至关重要的。一个模块通常包括加载、初始化、运行和卸载四个阶段。在加载阶段,模块被FreeSwitch核心加载...
在QT编程中,自定义窗口是一项常见的需求,它允许开发者根据应用的需求来设计独特的界面元素和交互方式。本文将深入探讨如何在QT中创建自定义窗口,并实现自由拖动和自定义标题的功能。 首先,我们需要了解QT中的...
鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例...
在.NET Framework中,C#提供了一个强大的平台来创建自定义组件和控件,这使得开发者可以扩展.NET Framework的基础功能,实现特定的需求。本项目聚焦于C#中的自定义组件和控件开发,涵盖了一些常见的实用功能,如速选...
本主题将深入探讨如何在Spring Boot工程中通过自定义response注解、利用Java反射机制、设置自定义拦截器以及实现WebMvcConfigurer接口来实现这一目标。 首先,我们来看自定义response注解。在Spring Boot中,可以...
本话题将深入探讨如何实现UITableViewCell的自定义,特别是实现等高的自定义。 一、UITableViewCell自定义基础 自定义UITableViewCell主要涉及以下几个方面: 1. 创建UITableViewCell子类:通过继承...
在APICloud平台上,开发者可以利用其提供的自定义模块功能,扩展原生的API接口,以满足特定项目的需求。本文将详细解析如何进行自定义模块的开发、打包以及使用过程,结合给定的压缩包文件中的源码和截图,我们将...
在C#编程中,自定义用户控件是提高应用程序界面美观性和功能多样性的重要手段。本教程将基于给定的"C#自定义漂亮按钮"主题,深入讲解如何利用C#和Visual Studio 2010创建一个自定义的按钮控件。我们将主要探讨以下几...
自定义用户控件是提升应用程序功能和界面个性化的重要手段。在这个场景中,我们关注的是一个特定的自定义控件,即基于PictureBox的扩展。PictureBox是.NET Framework提供的一个标准控件,用于显示图像,而自定义用户...
因此,开发者经常需要自定义日期选择器来提供更符合应用风格或特定功能的交互体验。这篇内容将深入探讨如何在Android中创建一个自定义日期选择器,并通过源码分析来增强我们的理解。 首先,我们要明白自定义日期...
在Android开发中,自定义View和自定义属性是提升应用个性化和功能扩展性的重要手段。本文将深入探讨这两个核心概念,以及如何在实际项目中应用它们。 ### 自定义View 自定义View允许开发者创建自己的视图组件,以...
这个自定义键盘的实现主要涉及到UIKeyboardType的扩展以及自定义UIView的使用。 首先,我们了解iOS系统键盘的默认类型。iOS提供了多种键盘类型,如UIKeyboardTypeASCIICapable、UIKeyboardTypeNumberPad等。其中,...
在AutoCAD平台上,开发者可以利用ObjectARX(Autodesk Reactor Extension)库来创建自定义实体,这是一种基于C++的编程接口,允许程序员深入到AutoCAD的内部工作流程,实现扩展功能和定制化操作。本篇文章将详细探讨...
在实际的企业环境中,根据业务需求,我们可能需要对默认的CAS登录页面进行自定义,以提供更符合品牌形象或用户体验的界面。下面将详细讲解如何配置和实现CAS的自定义登录页面。 一、CAS自定义登录页面概述 CAS的...
在Android开发中,自定义组件是一项重要的技能,它允许开发者根据特定需求打造独特的用户界面。本主题主要关注如何在自定义的LinearLayout中嵌入自定义的View,并利用GridView来有效地组织这些视图。 首先,我们要...
在C# WinForm应用开发中,自定义RadioButton控件是一种常见的需求,这通常涉及到扩展.NET Framework提供的默认RadioButton控件的功能,以满足特定的设计或交互需求。本教程将深入讲解如何在Visual Studio 2005及其更...
vb.net 自定义控件 自定义属性 UITypeEditor UI 类型编辑器 实例 提供一个示例 UITypeEditor,它使用 IWindowsFormsEditorService 显示用于用户输入的 Form。 IWindowsFormsEditorService 只能通过 PropertyGrid ...
本篇文章将详细探讨如何在Qt中实现一个自定义的提示框(Tip),即“自定义tip”。 首先,我们要理解“tooltip”在UI设计中的作用。Tooltip是当鼠标悬停在某个控件上时,会短暂显示的一段文字信息,用于提供关于该控件...
在开发这些应用程序时,有时我们需要根据项目需求创建特定的用户界面元素,这正是自定义控件的作用所在。本主题将深入探讨如何在C#中自定义一个类似于系统默认`MessageBox`的控件。 `MessageBox`是.NET Framework...