
Hadoop 0.20+ custom MultipleOutputFormat

 

MultipleOutputFormat, the multi-file output mechanism, is not available in Hadoop 0.20.2's new API. The old 0.19.2-era class org.apache.hadoop.mapred.lib.MultipleOutputFormat can still be used with 0.20.2, but everything under org.apache.hadoop.mapred is marked as deprecated and may no longer work in the next Hadoop release. Hadoop 0.20.2 recommends replacing JobConf with Configuration, yet the old org.apache.hadoop.mapred.lib.MultipleOutputFormat is still built around JobConf; in other words, the new API does not offer a replacement yet.

In addition, Hadoop 0.20.2 is only a transitional release: not every API has been ported to the new packages, and whatever is missing has to be written by hand.
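
For reference, this is roughly what the API difference looks like in driver code. It is only a minimal sketch; the class name ApiComparison and the job name are made up for illustration and are not part of this project:

package cn.xmu.dm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ApiComparison {
	public static void main(String[] args) throws Exception {
		// Deprecated org.apache.hadoop.mapred style (still compiles in 0.20.2):
		//   JobConf jobConf = new JobConf(ApiComparison.class);
		//   jobConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);

		// Recommended 0.20 org.apache.hadoop.mapreduce style:
		Configuration conf = new Configuration();
		Job job = new Job(conf, "api comparison");
		job.setJarByClass(ApiComparison.class);
		// mapper, reducer, input/output paths and the output format
		// are all configured on this Job object from here on
	}
}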

 

Rewriting MultipleOutputFormat requires two classes:

LineRecordWriter

MultipleOutputFormat

PartitionByFilenameOutputFormat is the custom subclass needed in this experiment so that each file's results are written to an output file of their own; for example, a value whose text starts with "file1" followed by a tab ends up in a file named file1 under the job's output directory.

LineRecordWriter:

 

package cn.xmu.dm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
	private static final String utf8 = "UTF-8";

	protected DataOutputStream out;
	private final byte[] keyValueSeparator;

	public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
		this.out = out;
		try {
			this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
		} catch (UnsupportedEncodingException uee) {
			throw new IllegalArgumentException("can't find " + utf8
					+ " encoding");
		}
	}

	public LineRecordWriter(DataOutputStream out) {
		// default to a tab as the key/value separator
		this(out, "\t");
	}

	private void writeObject(Object o) throws IOException {
		if (o instanceof Text) {
			Text to = (Text) o;
			out.write(to.getBytes(), 0, to.getLength());
		} else {
			out.write(o.toString().getBytes(utf8));
		}
	}

	public synchronized void write(K key, V value) throws IOException {
		boolean nullKey = key == null || key instanceof NullWritable;
		boolean nullValue = value == null || value instanceof NullWritable;
		if (nullKey && nullValue) {
			return;
		}
		if (!nullKey) {
			writeObject(key);
		}
		if (!(nullKey || nullValue)) {
			out.write(keyValueSeparator);
		}
		if (!nullValue) {
			writeObject(value);
		}
		out.write("\r\n".getBytes());
	}

	public synchronized void close(TaskAttemptContext context)
			throws IOException {
		out.close();
	}
}

 

 

MultipleOutputFormat:

 

 

package cn.xmu.dm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
		extends FileOutputFormat<K, V> {
	private MultiRecordWriter writer = null;
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
			InterruptedException {
		if (writer == null) {
			writer = new MultiRecordWriter(job, getTaskOutputPath(job));
		}
		return writer;
	}
	private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
		Path workPath = null;
		OutputCommitter committer = super.getOutputCommitter(conf);
		if (committer instanceof FileOutputCommitter) {
			workPath = ((FileOutputCommitter) committer).getWorkPath();
		} else {
			Path outputPath = super.getOutputPath(conf);
			if (outputPath == null) {
				throw new IOException("Undefined job output-path");
			}
			workPath = outputPath;
		}
		return workPath;
	}
	
	/** Subclasses decide, per key/value pair, which file (relative to the output directory) the record goes to. */
	protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);
	public class MultiRecordWriter extends RecordWriter<K, V> {
		
		private HashMap<String, RecordWriter<K, V>> recordWriters = null;
		private TaskAttemptContext job = null;
		
		private Path workPath = null;
		public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
			super();
			this.job = job;
			this.workPath = workPath;
			recordWriters = new HashMap<String, RecordWriter<K, V>>();
		}
		@Override
		public void close(TaskAttemptContext context) throws IOException, InterruptedException {
			Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
			while (values.hasNext()) {
				values.next().close(context);
			}
			this.recordWriters.clear();
		}
		@Override
		public void write(K key, V value) throws IOException, InterruptedException {
			// derive the target file name from the record, then reuse or
			// lazily create the RecordWriter for that file
			String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
			RecordWriter<K, V> rw = this.recordWriters.get(baseName);
			if (rw == null) {
				rw = getBaseRecordWriter(job, baseName);
				this.recordWriters.put(baseName, rw);
			}
			rw.write(key, value);
		}
	
		// creates the real output file (optionally compressed) and wraps it in a LineRecordWriter
		private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
				throws IOException, InterruptedException {
			Configuration conf = job.getConfiguration();
			boolean isCompressed = getCompressOutput(job);
			String keyValueSeparator = ",";
			RecordWriter<K, V> recordWriter = null;
			if (isCompressed) {
				Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
						GzipCodec.class);
				CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
				Path file = new Path(workPath, baseName + codec.getDefaultExtension());
				FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
				recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec
						.createOutputStream(fileOut)), keyValueSeparator);
			} else {
				Path file = new Path(workPath, baseName);
				FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
				recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
			}
			return recordWriter;
		}
	}
}

 

 

PartitionByFilenameOutputFormat:

 

package cn.xmu.dm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

public class PartitionByFilenameOutputFormat extends MultipleOutputFormat<Text, Text> {

	@Override
	protected String generateFileNameForKeyValue(Text key, Text value,
			Configuration conf) {
		return value.toString().substring(0, value.toString().indexOf("\t"));
	}
}
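
To wire the format into a job, the driver registers it with setOutputFormatClass. The following is a minimal sketch assuming a Text/Text job; the class name PartitionDriver is a placeholder, and the mapper/reducer (which must emit Text values whose text begins with the target file name followed by a tab) are omitted:

package cn.xmu.dm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PartitionDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "partition by filename");
		job.setJarByClass(PartitionDriver.class);

		// job output types; the (omitted) mapper/reducer must emit Text values
		// whose text begins with the target file name followed by a tab
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// plug in the custom multi-file output format
		job.setOutputFormatClass(PartitionByFilenameOutputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}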

 
