The API in Hadoop 0.20.2 was heavily refactored: many classes were marked @Deprecated, yet the new API is still incomplete, which makes it awkward for newcomers to learn. Community developers still recommend that users of Hadoop 0.20.2 stick with the old API. In the spirit of learning, I used the new API to reimplement KeyValueTextInputFormat, a class that exists in the old API; it is really just a small modification of TextInputFormat.
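To make the behavior concrete before the code: with the default tab separator, a hypothetical input line such as

    hadoop\tan open-source MapReduce framework

(where \t stands for a tab character) is turned into the pair key = "key:hadoop", value = "value:an open-source MapReduce framework"; the "key:"/"value:" prefixes are demo markers added in nextKeyValue() below. A line containing no separator becomes the key on its own, with an empty value.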
package cn.fnst.cspf.output;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
public class KeyValueInputFormat extends FileInputFormat<Text, Text> {

    protected static class KeyVRecordReader extends RecordReader<Text, Text> {
        private static final Log LOG = LogFactory.getLog(KeyVRecordReader.class);

        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private long pos;
        private long end;
        private LineReader in;
        private int maxLineLength;
        private Text key = null;
        private Text value = null;
        private String separator = "\t";
        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                    Integer.MAX_VALUE);
            this.separator = job.get("key.value.separator.in.input.line", "\t");
            start = split.getStart();
            end = start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            boolean skipFirstLine = false;
            if (codec != null) {
                // Compressed input is not splittable, so read the whole stream.
                in = new LineReader(codec.createInputStream(fileIn), job);
                end = Long.MAX_VALUE;
            } else {
                if (start != 0) {
                    // Not the first split: back up one byte and discard the first
                    // (possibly partial) line, which belongs to the previous split.
                    skipFirstLine = true;
                    --start;
                    fileIn.seek(start);
                }
                in = new LineReader(fileIn, job);
            }
            if (skipFirstLine) {
                start += in.readLine(new Text(), 0,
                        (int) Math.min((long) Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }
        @Override
        public synchronized void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (start == end) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (pos - start) / (float) (end - start));
            }
        }
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            Text line = new Text();
            if (key == null) {
                key = new Text();
            }
            if (value == null) {
                value = new Text();
            }
            int newSize = 0;
            while (pos < end) {
                newSize = in.readLine(line, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                                maxLineLength));
                if (newSize == 0) {
                    break; // end of stream
                }
                // This is the only extra handling relative to TextInputFormat:
                // split the line into key and value at the first separator.
                // String.split() treats the separator as a regex; the limit of 2
                // keeps any further separators inside the value, matching the
                // behavior of the old KeyValueTextInputFormat.
                String[] kv = line.toString().split(this.separator, 2);
                if (kv.length == 2) {
                    // The "key:"/"value:" prefixes are demo markers; drop them
                    // for a true drop-in replacement.
                    key.set("key:" + kv[0]);
                    value.set("value:" + kv[1]);
                } else {
                    LOG.info("Line has no separator; using the whole line as key");
                    key.set(line.toString());
                    value.set("");
                }
                pos += newSize;
                if (newSize < maxLineLength) {
                    break;
                }
                // The line exceeded maxLineLength and was truncated; skip it.
                LOG.info("Skipped line of size " + newSize + " at pos " +
                        (pos - newSize));
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            } else {
                return true;
            }
        }
    }
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new KeyVRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Compressed files cannot be split into multiple input splits.
        CompressionCodec codec =
                new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        return codec == null;
    }
}
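For completeness, here is a minimal driver sketch showing one way to wire the class into a job with the new org.apache.hadoop.mapreduce API. The class name KeyValueInputFormatDemo and the two command-line path arguments are placeholders of mine, not part of the original post; the identity Mapper simply passes through the pairs produced by the record reader, and setNumReduceTasks(0) makes the job map-only so those pairs are written out unchanged.

    package cn.fnst.cspf.output;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class KeyValueInputFormatDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Optional: override the key/value separator (defaults to "\t").
            conf.set("key.value.separator.in.input.line", "\t");
            Job job = new Job(conf, "KeyValueInputFormat demo"); // 0.20.x-style constructor
            job.setJarByClass(KeyValueInputFormatDemo.class);
            job.setInputFormatClass(KeyValueInputFormat.class);
            job.setMapperClass(Mapper.class); // identity mapper: emits the pairs as-is
            job.setNumReduceTasks(0);         // map-only job
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }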
While reading the old API, I noticed that the author of the old KeyValueTextInputFormat essentially wrote the algorithms by hand; in many places the Hadoop source code does not reuse existing APIs but defines its own implementations instead, which gives very fine-grained control over performance. This also reflects a difference between programmers abroad and at home: in China the "take and reuse" approach is encouraged, whereas abroad there may be more emphasis on inventing things yourself.