hadoop0.20.2中的API进行了大幅度的重构,导致很多API变成了@Deprecated,而新的API又不全,所以新手学起来感觉很变扭,社区开发者还是建议用hadoop0.20.2的可以沿用老版的API,本着学习的态度,用新的API添加了KeyValueTextInputFormat这个在老版API中存在的类,实际上是对TextInputFormat进行了小幅的修改。
package cn.fnst.cspf.output;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
public class KeyValueInputFormat extends FileInputFormat<Text, Text>{
protected static class KeyVRecordReader extends RecordReader<Text,Text>{
private static final Log LOG = LogFactory.getLog(KeyVRecordReader.class);
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private Text key = null;
private Text value = null;
private String separator = "\t";
@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
this.separator = job.get("key.value.separator.in.input.line", "\t");
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) {
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
@Override
public synchronized void close() throws IOException {
if(in!=null){
in.close();
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float)(end - start));
}
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
Text line = new Text();
if (key == null) {
key = new Text();
}
if (value == null) {
value = new Text();
}
int newSize = 0;
while (pos < end) {
newSize = in.readLine(line, maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
maxLineLength));
//此处添加额外处理即可,其他地方与TextInputFormat一样。
if(null!=line){
String[] kv = line.toString().split(this.separator);
if(kv.length==2){
key.set("key:"+kv[0]);
value.set("value:"+kv[1]);
}else{
LOG.info("Skipped line has no separator");
key.set(line.toString());
value.set("");
}
}
if (newSize == 0) {
break;
}
pos += newSize;
if (newSize < maxLineLength) {
break;
}
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException {
return new KeyVRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
return codec == null;
}
}
在看老版的API时,发现旧的KeyValueTextInputFormat的作者基本上都是拿算法自己写,hadoop源码的很多地方都是不会拿现成的api来用,都是自己定义,这样做对性能的可控性是很强,这也折射出国外程序员跟国内程序员的差异,国内提倡拿来主义,国外可能更强调创新精神吧。
分享到:
相关推荐
自定义控件是C#编程中一个重要的概念,它允许开发者根据需求扩展或修改内置控件的功能和外观,以满足特定项目的需求。在本案例中,我们将深入探讨如何自定义ComboBox控件。 ComboBox控件是Windows Forms中一个非常...
本篇文章将深入探讨如何定义和优化自定义模块,以及处理自定义事件。 首先,理解FreeSwitch模块的生命周期是至关重要的。一个模块通常包括加载、初始化、运行和卸载四个阶段。在加载阶段,模块被FreeSwitch核心加载...
在QT编程中,自定义窗口是一项常见的需求,它允许开发者根据应用的需求来设计独特的界面元素和交互方式。本文将深入探讨如何在QT中创建自定义窗口,并实现自由拖动和自定义标题的功能。 首先,我们需要了解QT中的...
"C#自定义控件库"是指使用C#语言编写的、由开发者自定义的控件集合,这些控件可以扩展.NET Framework的标准控件集,为用户提供更丰富的界面元素和功能。自定义控件是软件开发中的一个重要环节,特别是在UI设计和用户...
本篇将深入探讨如何自定义FreeMarker标签,以扩展其功能并适应特定项目需求。 首先,理解FreeMarker的默认标签语法至关重要。FreeMarker使用${...}表达式来插入变量,#{...}用于输出注释,以及、等控制结构进行条件...
本教程将深入讲解如何在WinForms中自定义CheckBox控件,以满足特定的界面或功能需求。 首先,自定义CheckBox控件主要是为了扩展其默认功能,比如改变其外观、添加额外的事件处理或者提供更复杂的交互逻辑。在VS2005...
鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例鸿蒙自定义组件实例...
本主题将深入探讨如何在Spring Boot工程中通过自定义response注解、利用Java反射机制、设置自定义拦截器以及实现WebMvcConfigurer接口来实现这一目标。 首先,我们来看自定义response注解。在Spring Boot中,可以...
在APICloud平台上,开发者可以利用其提供的自定义模块功能,扩展原生的API接口,以满足特定项目的需求。本文将详细解析如何进行自定义模块的开发、打包以及使用过程,结合给定的压缩包文件中的源码和截图,我们将...
在C#编程中,自定义用户控件是提高应用程序界面美观性和功能多样性的重要手段。本教程将基于给定的"C#自定义漂亮按钮"主题,深入讲解如何利用C#和Visual Studio 2010创建一个自定义的按钮控件。我们将主要探讨以下几...
自定义用户控件是提升应用程序功能和界面个性化的重要手段。在这个场景中,我们关注的是一个特定的自定义控件,即基于PictureBox的扩展。PictureBox是.NET Framework提供的一个标准控件,用于显示图像,而自定义用户...
在Android开发中,自定义View和自定义属性是提升应用个性化和功能扩展性的重要手段。本文将深入探讨这两个核心概念,以及如何在实际项目中应用它们。 ### 自定义View 自定义View允许开发者创建自己的视图组件,以...
在AutoCAD平台上,开发者可以利用ObjectARX(Autodesk Reactor Extension)库来创建自定义实体,这是一种基于C++的编程接口,允许程序员深入到AutoCAD的内部工作流程,实现扩展功能和定制化操作。本篇文章将详细探讨...
在实际的企业环境中,根据业务需求,我们可能需要对默认的CAS登录页面进行自定义,以提供更符合品牌形象或用户体验的界面。下面将详细讲解如何配置和实现CAS的自定义登录页面。 一、CAS自定义登录页面概述 CAS的...
在C# WinForm应用开发中,自定义RadioButton控件是一种常见的需求,这通常涉及到扩展.NET Framework提供的默认RadioButton控件的功能,以满足特定的设计或交互需求。本教程将深入讲解如何在Visual Studio 2005及其更...
本篇文章将详细探讨如何在Qt中实现一个自定义的提示框(Tip),即“自定义tip”。 首先,我们要理解“tooltip”在UI设计中的作用。Tooltip是当鼠标悬停在某个控件上时,会短暂显示的一段文字信息,用于提供关于该控件...
在开发这些应用程序时,有时我们需要根据项目需求创建特定的用户界面元素,这正是自定义控件的作用所在。本主题将深入探讨如何在C#中自定义一个类似于系统默认`MessageBox`的控件。 `MessageBox`是.NET Framework...
这个自定义键盘的实现主要涉及到UIKeyboardType的扩展以及自定义UIView的使用。 首先,我们了解iOS系统键盘的默认类型。iOS提供了多种键盘类型,如UIKeyboardTypeASCIICapable、UIKeyboardTypeNumberPad等。其中,...
在Microsoft Foundation Classes (MFC)库中,开发者可以利用自定义消息和自定义类来扩展框架的功能,以满足特定项目的需求。自定义消息是Windows消息系统的一部分,允许程序员定义自己的消息类型,而自定义类则提供...
本资料包“C#自定义控件.rar”显然是关于如何在C#环境中创建和使用自定义控件的教程或示例代码。 自定义控件的创建通常分为几个步骤: 1. **基础类选择**:首先,你需要选择一个基础类来继承。这可以是系统提供的...