`
philip_kissme
  • 浏览: 16613 次
  • 来自: ...
社区版块
存档分类
最新评论

Hadoop关于大量小压缩文件的问题和解决方法

阅读更多

之前一段时间偶尔会收到 hadoop 集群的 nagios 监控报警,具体报警是几个 resource-manager 节点一直负载超过阀值.找了个空闲时间分析了一下 job-history,发现是一个小伙伴的 job在段时间内创建了大量 map-task 导致的,在解决问题后做个笔记备忘

 

首先分析 job-history 的统计数据



  • 可以发现 map 任务执行的时间很短,但是同时会有大量的 map 任务
  • 与小伙伴沟通后,了解到他的 job 是根据运营侧需求,本周都在应用埋点日志中提取指定按钮的点击计数
  • 应用埋点的日志记录了每个用户的所有访问路径和参数
    1) log-agent 通过 logback 将日志记录到本地
    2)每小时生成一个 gz 压缩包,并上传至 hdfs 指定目录(根据应用标识+ip 生产目录规则)

根据 hadoop 的 map split 机制我们可以得出如下结论

  • 每个 inputfile 会对应多个 map split(根据 hdfs 的 block zise切分)
  • 每个map split会对应一个 map task
  • 由于每个小时生成的 gz 文件均未超过hdfs block zise(128m)
  • 小伙伴要统计的集群中有三个应用节点,排除凌晨时段没有日志产出的情况,大概一周的日志文件树=24*7*3~=350

解决问题

  • 我们知道 hadoop中是可以利用CombineFileInputFormat来合并大量小文件输入,提高 map 性能的.
  • 但默认实现只提供了CombineSequenceFileInputFormat和CombineTextFileInputFormat,没有压缩文件的支持.
  • 所以这里要实现自定义的CombineFileInputFormat来解决该问题

自定义CompressedCombineFileInputFormat

package ctu.components.amada.hadoop.usertrace;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import java.io.IOException;

/**
 * Created by pippo on 14/12/23.
 */
public class CompressedCombineFileInputFormat extends CombineFileInputFormat<CompressedRecordKey, Text> {

	public RecordReader<CompressedRecordKey, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException {
		return new CombineFileRecordReader<>((CombineFileSplit) split,
				context,
				CompressedCombineFileRecordReader.class);
	}

}

 

 CompressedCombineFileRecordReader

package ctu.components.amada.hadoop.usertrace;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Created by pippo on 14/12/23.
 */
public class CompressedCombineFileRecordReader extends RecordReader<CompressedRecordKey, Text> {

	private long offset;
	private long end;
	private long pos;
	private CompressedRecordKey key;
	private Text value = new Text();
	private CompressTrunk trunk;
	private LineReader reader;

	public CompressedCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
			throws IOException {

		/*多个压缩文件合并为一个ombine file, 那么实际的压缩文件就是file中的一个trunk*/
		this.trunk = new CompressTrunk(context.getConfiguration(), split.getPath(index));
		/*trunk在combine 中的起始位置*/
		this.offset = split.getOffset(index);
		/* trunk在combine file中的结束位置*/
		this.end = offset + (trunk.isCompress() ? trunk.getFileLength() : split.getLength(index));

		boolean skipFirstLine = false;
		FSDataInputStream in = trunk.open();

		if (offset != 0) {
			skipFirstLine = true;
			--offset;
			in.seek(offset);
		}

		reader = new LineReader(trunk.open());

		// skip first line and re-establish "offset".
		if (skipFirstLine) {
			offset += reader.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - offset));
		}
		this.pos = offset;
	}

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

	}

	@Override
	public void close() throws IOException {
		trunk.close();
	}

	@Override
	public float getProgress() throws IOException {
		if (offset == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - offset) / (float) (end - offset));
		}
	}

	@Override
	public boolean nextKeyValue() throws IOException {
		initKey();
		initValue();

		//指定当前记录的读取起始位置
		key.offset = pos;

		int readed = 0;

		//读取一条记录
		if (pos < end) {
			readed = reader.readLine(value);
			pos += readed;
		}

		//如果没有读到任何记录,说明当前 trunk 已经没有更多记录了
		if (readed == 0) {
			key = null;
			value = null;
			return false;
		} else {
			return true;
		}
	}

	private void initKey() {
		if (key == null) {
			key = new CompressedRecordKey();
			key.fileName = trunk.getFileName();
		}
	}

	private void initValue() {
		if (value == null) {
			value = new Text();
		}
	}

	@Override
	public CompressedRecordKey getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	public static class CompressTrunk {

		public CompressTrunk(Configuration configuration, Path compressFile) throws IOException {
			this.configuration = configuration;
			this.compressFile = compressFile;
			this.fs = compressFile.getFileSystem(configuration);
			this.factory = new CompressionCodecFactory(configuration);
			this.codec = factory.getCodec(compressFile);

			prepareReadableFile();
		}

		/*将trunk解压缩到一个临时目录,并提供inputStream供读取*/
		protected void prepareReadableFile() throws IOException {
			if (!isCompress()) {
				readableFile = compressFile;
				return;
			}

			String _readFile = CompressionCodecFactory.removeSuffix(compressFile.toString(),
					codec.getDefaultExtension());
			readableFile = new Path(_readFile);

			InputStream in = null;
			OutputStream out = null;
			try {
				in = codec.createInputStream(fs.open(compressFile));
				out = fs.create(readableFile);
				IOUtils.copyBytes(in, out, configuration);
			} finally {
				IOUtils.closeStream(in);
				IOUtils.closeStream(out);
			}
		}

		private Configuration configuration;
		//源文件
		private Path compressFile;
		//解压后的文件
		private Path readableFile;
		private FileSystem fs;
		private CompressionCodecFactory factory;
		private CompressionCodec codec;

		public boolean isCompress() {
			return codec != null;
		}

		public String getFileName() {
			return readableFile.getName();
		}

		public long getFileLength() throws IOException {
			return fs.getFileStatus(readableFile).getLen();
		}

		private FSDataInputStream in;

		public FSDataInputStream open() throws IOException {
			if (in == null) {
				in = fs.open(readableFile);
			}
			return in;
		}

		//处理完毕后删除临时文件
		public void close() throws IOException {
			if (in != null) {
				in.close();
			}

			if (isCompress()) {
				fs.delete(readableFile, false);
			}
		}
	}

}

 

CompressedRecordKey 

package ctu.components.amada.hadoop.usertrace;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by pippo on 14/12/23.
 */
public class CompressedRecordKey implements WritableComparable {

	//记录所在的文件
	public String fileName;
	//记录在文件中所在的位置
	public long offset;

	public CompressedRecordKey() {
		super();
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.offset = in.readLong();
		this.fileName = in.readUTF();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(offset);
		out.writeUTF(fileName);
	}

	@Override
	public int compareTo(Object o) {
		CompressedRecordKey that = (CompressedRecordKey) o;

		int f = this.fileName.compareTo(that.fileName);
		if (f == 0) {
			return (int) Math.signum((double) (this.offset - that.offset));
		}
		return f;
	}

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof CompressedRecordKey) {
			return this.compareTo(obj) == 0;
		}
		return false;
	}

	@Override
	public int hashCode() {

		final int hashPrime = 47;
		int hash = 13;
		hash = hashPrime * hash + (this.fileName != null ? this.fileName.hashCode() : 0);
		hash = hashPrime * hash + (int) (this.offset ^ (this.offset >>> 16));

		return hash;
	}

	@Override
	public String toString() {
		return this.fileName + "-" + this.offset;
	}

}

 

JOB 配置

	private void buildMapper(Job job) {
		job.setInputFormatClass(CompressedCombineFileInputFormat.class);
		job.setMapperClass(LogMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(UserTrace.class);
		job.setCombinerClass(LogCombiner.class);
	}

 

验证结果 


 

 

  1. 如图所示所有的输入被合并为三个map task处理
  2. 共处理了3.7G的数据(File:Number of bytes read)/解压后37G(HDFS:Number of bytes read)

新的问题

  1. CombineFileInputFormat没有split,导致只有三个map taks
  2. 每个map task的输出文件过大,shuffle 消耗了1个多小时

问题定位

通过查看 hadoop 源码发现,在使用CombineFileInputFormat时,如果没有显示设定CombineFileInputFormat.SPLIT_MAXSIZE,那么在一个 hadoop node 上只会有一个 split


 

问题解决

将CombineFileInputFormat.SPLIT_MAXSIZE设置为和 hadoop 的 block size 一样大小

 

 

结果检验

  • 处理4.5g 的日志(解压后45g) 共耗时20分钟
  • 其中 map 处理1.4亿条记录耗时5分51秒
  • map 的 output 进行 lz4压缩,shuffle 的时间缩短到11分钟


 



 

  • 大小: 72.4 KB
  • 大小: 74 KB
  • 大小: 83.4 KB
  • 大小: 274.6 KB
  • 大小: 223.3 KB
  • 大小: 72.5 KB
  • 大小: 140.1 KB
分享到:
评论

相关推荐

    hadoop压缩支持包和native 文件

    在分布式计算领域,Hadoop是一个不可或缺的开源框架,它主要用于处理和存储大量数据。Hadoop的压缩支持包和native文件是其高效运行的关键组成部分。本文将深入探讨这两个概念,以及它们在Windows和Linux环境下的应用...

    Hadoop 2.2.0 64位native文件(重编译)

    Hadoop 2.2.0 是一个重要的分布式计算框架,主要设计用于处理和存储大量数据。这个64位的native文件是Hadoop针对64位Linux操作系统编译的一...在遇到与native库相关的问题时,更新或替换这些文件是解决问题的有效途径。

    支持snappy压缩的hadoop2.7.2

    Snappy以其快速的压缩和解压速度以及相对较低的内存消耗而闻名,对于提升Hadoop集群的性能有着显著的作用。 首先,让我们深入了解一下Snappy压缩算法。Snappy是由Google开发的,它的设计目标是追求速度而非最高压缩...

    基于LZO的Hadoop文件归档优化方法.docx

    通过对Hadoop架构、LZO压缩算法以及文件归档优化的详细分析,本文为读者揭示了大数据处理和分析中的关键技术和优化策略,有助于计算机科学与技术、软件工程等相关专业的学生以及从业者更好地理解和应用Hadoop,以...

    Hadoop本地环境配置 需要的文件hadoopdll和winutilexe.zip

    2. **解压Hadoop**: 解压缩下载的Hadoop文件到你希望的目录,例如`C:\hadoop`. 3. **配置环境变量**: 在系统环境变量中添加`HADOOP_HOME`,值设为Hadoop的安装路径,如`C:\hadoop`。 4. **配置PATH**: 同样在环境...

    基于hadoop的简易云盘实现.zip

    Hadoop是一个开源的分布式计算框架,它允许处理和存储大量数据,尤其适合大数据处理场景。这个简易云盘的实现可能是为了让学生或开发者了解如何在分布式环境中构建存储系统,或者作为学习Hadoop应用的一个实践项目。...

    (orc + snappy / zlib ) 多线程并行合并小文件工具类 (出自:flink自定义合并orc小文件处)

    通过多线程并行处理和优化的压缩策略,它可以在不牺牲性能的前提下,有效解决小文件问题,提高数据处理的效率和存储的利用率。在实际应用中,可以根据集群资源和数据量调整参数,以达到最佳的合并效果。

    22、MapReduce使用Gzip压缩、Snappy压缩和Lzo压缩算法写文件和读取相应的文件

    本篇文章将详细探讨MapReduce如何使用Gzip、Snappy和Lzo这三种压缩算法来写入和读取文件。 1. Gzip压缩 Gzip是一种广泛使用的压缩算法,其压缩率较高,但压缩和解压缩速度相对较慢。在MapReduce中,通过设置`...

    hadoop2.7.1-win32.zip

    6. `winutils.pdb`:这是程序数据库文件,用于在调试过程中存储有关编译的源代码和调试信息,帮助开发者解决运行时错误和性能问题。 在实际使用中,用户需要将这些文件正确地配置到Hadoop的环境变量中,以确保...

    Hadoop2.7.3 Window10 hadoop.dll winutils.exe

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在普通硬件上处理和存储大量数据。对于Windows用户,配置Hadoop环境可能会遇到一些挑战,因为Hadoop最初是为Linux设计的。然而,通过使用正确的工具和...

    支持snappy压缩的hadoop压缩包

    这个“支持snappy压缩的hadoop压缩包”是专为优化Hadoop性能而设计的,它包含了配置和可能的库文件,使得用户可以直接在本地环境中使用。 Snappy是由Google开发的一种快速、轻量级的压缩算法,它的主要目标不是最高...

    hadoop上传文件共5页.pdf.zip

    【压缩包子文件的文件名称列表】: "赚钱项目" 这个条目可能表示压缩包内的PDF文件内容是关于如何利用Hadoop进行数据驱动的盈利项目,或者是在Hadoop环境中执行的数据分析项目,旨在帮助用户或企业通过数据分析来提高...

    hadoop2.6,window7 32bit,hadoop.dll、winutils.exe等文件

    Hadoop 2.6是Apache Hadoop项目的一个重要版本,它提供了大量的改进和新特性,包括YARN(Yet Another Resource Negotiator)资源管理器,增强了集群资源调度和管理的效率。对于Windows用户来说,尽管Hadoop最初设计...

    基于hadoop的云盘系统

    【压缩包子文件的文件名称列表】:hadoop_disk-master 这个文件名暗示了源代码仓库可能包含整个云盘系统的源代码,其中可能有以下目录和文件: - `src/main/java`: Java源代码,包括SpringBoot微服务、数据访问...

    hadoop.dll 2.7.3

    在压缩包子文件的文件名称列表中,我们看到了hadoop.dll和winutils.exe这两个文件。这通常意味着这是一个针对Windows用户的Hadoop开发或调试包,用户可以直接将这两个文件放置到适当的位置,例如Hadoop的安装目录...

    hadoop-2.6.0

    标签“hadoop-2.6.0”再次强调了讨论的主题,表明所有问题和解决方案都应针对这个特定的Hadoop版本。 在提供的压缩包子文件“2.6.0-hadoop_dll”中,可能包含的就是适用于Hadoop 2.6.0版本的`hadoop.dll`文件。用户...

    hadoop-lzo-0.4.20.jar

    总的来说,Hadoop-LZO是一个针对Hadoop的高效压缩解决方案,适合对数据处理速度有较高要求的场景,但需要根据实际需求和资源限制来选择是否使用。对于Hadoop `3.1.3`版本,使用Hadoop-LZO `0.4.20`是一个合理的版本...

    大数据Hadoop存储与分析处理平台建设方案-大数据Hadoop平台集成实施服务解决方案.docx

    Hadoop知识学习篇主要包括FileSystem总结、文件读取过程、文件写入过程、Hadoop均衡器、Hadoop存档、数据完整性和压缩等几个方面。其中,FileSystem总结是Hadoop文件系统的总结,用于介绍Hadoop文件系统的基本概念和...

    hadoop经典实战教程

    ### Hadoop经典实战教程知识点详解 #### 一、Hadoop简介与生态系统 ...更重要的是,通过具体案例的分析,读者可以更好地理解和应用Hadoop解决实际问题的能力。希望本教程能够帮助大家在大数据处理领域取得更大的成就。

    hadoop-2.7.1.tar.gz.zip

    这是一个压缩文件,最外层是.zip格式,内部包含的是一个名为“hadoop-2.7.1.tar.gz”的文件。在Linux或Mac系统中,通常会先使用`unzip`命令解压最外层的zip文件,然后用`tar -zxvf`命令解压内层的tar.gz文件。...

Global site tag (gtag.js) - Google Analytics