比较简单,直接上代码:
这是MapReduce功能代码:
package org.edu.bupt.xiaoye.hadooptest;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyInverseIndex {
public static final String INPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_in";
public static final String OUTPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_out";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
final Path outPath = new Path(OUTPUT_PATH);
if (fileSystem.exists(outPath)) {
fileSystem.delete(outPath, true);
}
conf.set("hadoop.job.user","hadoop");
conf.set("mapred.job.tracker", "10.103.240.160:9001");
final Job job = new Job(conf, MyInverseIndex.class.getSimpleName());
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setJarByClass(MyInverseIndex.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);//设置个数为1
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
}
/**
* 只适用于文档中只出现一行 (可以一次读取整个文档)
*
* @author hadoop
*
*/
public static class MyMapper extends Mapper<NullWritable, BytesWritable, Text, Text> {
Map<String, Integer> map = new HashMap();
@Override
public void map(NullWritable key, BytesWritable value, Context context)
throws IOException, InterruptedException {
String line = new String(value.getBytes(), "utf-8").trim();
String[] words = line.split(" ");
for (String s : words) {
if (map.containsKey(s)) {
map.put(s, map.get(s) + 1);
} else {
map.put(s, 1);
}
}
Set<String> keys = map.keySet();
for (Iterator it = keys.iterator(); it.hasNext();) {
String s = (String) it.next();
context.write(new Text(s),
new Text(((FileSplit) context.getInputSplit()).getPath().getName().toString() + ":" + map.get(s)));
}
map.clear();
}
}
public static class MyReduce extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
// 实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuffer fileList = new StringBuffer();
for(Text value : values){
fileList.append(value.toString()+";");
}
result.set(fileList.toString());
context.write(key, result);
}
}
}
这是重写的InputFormat类:
package org.edu.bupt.xiaoye.hadooptest;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* 为了阻止文件分片,并将该整个文件作为一条记录处理
*
* @author Xiaoye
*
*/
public class WholeFileInputFormat extends
FileInputFormat<NullWritable, BytesWritable> {
/**
* 重写该方法将阻止将一个文件分片
*/
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
/**
* 获取RecordReader,该类用来将分片分割成记录,从而生成key和value值给map处理。
*
*/
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
}
这是RecordReader实现类:
package org.edu.bupt.xiaoye.hadooptest;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* 继承RecordReader
* 该类用来将分片分割成记录,从而生成key和value。例如TextInputFormat中的key和value就是RecordReader的子类产生的。
* 在这里,我们继承了这个类,将重写了key和value的生成算法。对一个分片来说,只生成一个key-value对。其中key为空,value为该分片
* 的所有内容
* @author Xiaoye
*/
public class WholeFileRecordReader extends
RecordReader<NullWritable, BytesWritable> {
// 用来盛放传递过来的分片
private FileSplit fileSplit;
private Configuration conf;
//将作为key-value中的value值返回
private BytesWritable value = new BytesWritable();
// 因为只生成一条记录,所以只需要调用一次。因此第一次调用过后将processed赋值为true,从而结束key和value的生成
private boolean processed = false;
/**
* 设置RecordReader的分片和配置对象。
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
/**
* 核心算法
* 用来产生key-value值
* 将生成的value值存入value对象中
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
/*
* 注意这儿,fileSplit中只是存放着待处理内容的位置 大小等信息,并没有实际的内容
* 因此这里会通过fileSplit找到待处理文件,然后再读入内容到value中
*/
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException,
InterruptedException {
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return processed ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException {
//do nothing
}
}
其中有需要注意的 fileSplit中存放的是分片信息,并没有分片内容。
在BytesWritable 转为byte[]再转为String类型时需要调用trim()方法,不然会有空格影响key值的group。
分享到:
相关推荐
综上所述,InputFormat是Hadoop MapReduce编程模型中的核心组件之一,它通过定义数据的分片和读取机制,允许开发者以灵活的方式处理各种格式的数据。理解InputFormat的设计和实现,对于有效地使用Hadoop进行大规模...
例如,如果你正在处理一种结构化的日志文件,其中每个事件由开始和结束的特定字符串包围,你可以在RecordReader中寻找这些限定符,然后将限定符之间的内容作为键值对返回。这样,Mapper就可以直接处理事件数据,而...
每个InputSplit由一条条记录组成,通过RecordReader的实现来读取InputSplit中的每条记录,并将读取的记录交给Map函数来处理。 3. 记录读取器(RecordReader):RecordReader是InputFormat的实现类,负责读取...
RecordReader负责读取数据文件并按需拆分记录,而InputFormat负责初始化RecordReader并确定数据分片(Splits)。 4. **实例代码:按照空格拆分日志文件** 假设我们有一个日志文件,每行包含多字段,字段之间用空格...
1. **TextInputFormat**:这是最基础的InputFormat实现,它将每一行视为一个记录,键是LongWritable类型的偏移量,表示行在文件中的起始位置,值是Text类型的行内容,不包括行结束符。例如,一个InputSplit可能包含...
小文件合并是指将多个小文件整合成少数几个大文件,以减少MapReduce任务的数量,提高数据读取和处理速度。在Hive中,我们可以通过Hadoop的InputFormat类和RecordReader类来实现自定义的文件合并逻辑。 Java代码实现...
Hadoop作为开源框架之一,实现了MapReduce的核心思想,为大数据处理提供了强大的支持。 在Hadoop MapReduce框架中,数据处理主要分为两个阶段:Map阶段和Reduce阶段。这两个阶段的工作流程如下: 1. **用户提交...
HDFS是Google的GFS(Google File System)的开源实现,它是一个分布式文件系统,设计用于跨多台服务器存储和处理大型数据集。在HDFS中,NameNode作为主节点,负责元数据管理,包括文件系统的命名空间和文件的块映射...
如果我们使用`TextInputFormat`,那么每条日志可能会被错误地分割为多行,而使用`SdfTextInputFormat`,我们可以确保每个Map任务处理的是完整的一条日志记录,避免了数据的割裂和处理错误。 为了使用这个自定义输入...
5. **InputFormat与RecordReader**: `InputFormat`负责将输入数据分割成适合`Mapper`处理的小块,`RecordReader`则从这些块中读取记录。 6. **OutputFormat与RecordWriter**: `OutputFormat`定义如何将`Reducer`...
5. **NLineInputFormat**: 这个输入格式将每个N行作为一个split,常用于将多行数据作为单个输入处理,例如在处理CSV文件时。 6. **CombineFileInputFormat**: 该类用于合并多个文件输入格式,减少Map任务的数量,...
`RecordReader`和`RecordWriter`处理单个输入和输出记录。`TaskTracker`和`JobTracker`是MapReduce的调度和管理组件,负责任务分配和监控。 【源码学习的重要性】 通过阅读Hadoop源代码,开发者可以: 1. 理解...
TaskTracker 向 JobTracker 索求下一个 Map/Reduce 任务,Mapper 任务从 InputFormat 创建 RecordReader,循环读入 FileSplits 的内容生成 Key 与 Value,传给 Mapper 函数,处理完后中间结果写成 SequenceFile。...
- YARN(Yet Another Resource Negotiator)在Hadoop 2.x中作为资源管理系统,负责分配内存、CPU等资源,确保文件分发和处理的高效进行。 总的来说,"大数据MapReduce文件分发"涵盖了从数据分块、数据传输、任务...
8. **RecordReader与RecordWriter**:RecordReader从InputFormat创建的输入分片中读取键值对,RecordWriter则负责将Reduce输出写入OutputFormat定义的输出格式。 9. **JobTracker与TaskTracker**:在Hadoop 1.x中,...
在分布式计算领域,Hadoop MapReduce 是一个广泛使用的框架,用于处理和存储大规模数据集。在某些场景下,我们可能需要将多个小文件合并成一个大文件,以便更有效地进行MapReduce作业。"HadoopMR-CombineLocalFiles...
大数据平台中的MapReduce是由Google公司的Jeffrey Dean和Sanjay Ghemawat开发的一个针对大规模群组中的海量数据处理的分布式编程模型。MapReduce实现了两个功能:Map函数应用于集合中的所有成员,然后返回一个基于这...
2. **输入分片**:InputFormat组件负责将输入文件切分成逻辑上的片段(InputSplit),并为每个片段分配一个RecordReader。 3. **Map阶段**:RecordReader读取数据并传递给Mapper,Mapper对输入数据进行处理,并输出...
2. **InputFormat和RecordReader**:InputFormat定义了如何将数据切分成输入块,并且负责将这些块转化为键值对供Map任务处理。RecordReader则从输入块中读取单个记录。 3. **OutputFormat和RecordWriter**:...
2. **InputFormat和OutputFormat**:这两个接口定义了如何将数据分成输入分片(InputSplit)以及如何将这些分片映射到Map任务。`TextInputFormat`和`TextOutputFormat`是最基础的实现,分别用于处理文本文件的输入和...