在执行Reduce Shuffle的过程中,偶尔会遇到Shuffle Error,但是重启任务之后,Shuffle Error会消失,当然这只是在某些特定情况下才会报出来的错误。虽然在每次执行很短的时间报出这个错误,但是如果单个Reducer的错误数量超出maxAttempt,就会导致整个任务失败。
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#50 at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157) Caused by: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56) at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46) at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63) at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297) at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)
本分析过程同时借鉴了这篇blog:
结合hadoop 2.2.0的源代码来对整个失败过程进行简要分析。
从代码分析来看,最底层Fetcher.run方法执行时出现的错误,在Shuffle.run方法中,会启动一定数量的Fetcher线程(数量由参数mapreduce.reduce.shuffle.parallelcopies决定,我们配置的事50个,是不是有点多,默认是5),Fetcher线程用来从map端copy数据到Reducer端本地。
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers]; for (int i=0; i < numFetchers; ++i) { fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret()); fetchers[i].start(); } // Wait for shuffle to complete successfully while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) { reporter.progress(); synchronized (this) { if (throwable != null) { throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable); } } }
当任意一个Fetcher发生异常时,就会在scheduler的等待后能够在主线程发现,停掉整个Reducer。
public synchronized void reportException(Throwable t) { if (throwable == null) { throwable = t; throwingThreadName = Thread.currentThread().getName(); // Notify the scheduler so that the reporting thread finds the // exception immediately. synchronized (scheduler) { scheduler.notifyAll(); } } }
在异常堆栈发生的地方,Fetcher中调用copyFromHost方法,调用到Fetcher的114行,merger.reserve方法会调用MergerManagerImpl.reserve
@Override public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, long requestedSize, int fetcher ) throws IOException { if (!canShuffleToMemory(requestedSize)) { LOG.info(mapId + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + maxSingleShuffleLimit + ")"); return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize, jobConf, mapOutputFile, fetcher, true); } ...
重点是这个canShuffleToMemory方法,它会决定是启动OnDiskMapOutput还是InMemoryMapOutput类,标准就是需要的内存数量小于设置的限制。
private boolean canShuffleToMemory(long requestedSize) { return (requestedSize < maxSingleShuffleLimit); }
在初始化MergerManageImpl的时候设置了这个限制,MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES(mapreduce.reduce.memory.totalbytes)这个参数我们并没有设置,因此使用的是Runtime.getRuntime.maxMemory()*maxInMemCopyUse, MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT(mapreduce.reduce.shuffle.input.buffer.percent) 参数使用的是0.70,也就是最大内存的70%用于做Shuffle/Merge,比如当前Reducer端内存设置成2G,那么就会有1.4G内存。
final float maxInMemCopyUse = jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f); this.memoryLimit = (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse); final float singleShuffleMemoryLimitPercent = jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT); this.maxSingleShuffleLimit = (long)(memoryLimit * singleShuffleMemoryLimitPercent);
而单个Shuffle最大能够使用多少内存,还需要再乘一个参数:MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT(mapreduce.reduce.shuffle.memory.limit.percent),我们当前并没有设置这个参数,那么默认值为0.25f,此时单个Shuffle最大能够使用1.4G*0.25f=350M内存。
InMemory会在初始化时接收一个size参数,这个size的计算方式暂时未知,用于初始化其BoundedByteArrayOutputStream,
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId, MergeManagerImpl<K, V> merger, int size, CompressionCodec codec, boolean primaryMapOutput) {
这个size也就是BoundedByteArrayOutputStream作为byte[]的大小:
public BoundedByteArrayOutputStream(int capacity, int limit) { this(new byte[capacity], 0, limit); }
OOM也就是出现在这一行。
而我们出的错可能就是出现在判定为使用InMemoryMapOutput但是分配内存时出现的错误,试想使用50个Fetcher线程,单个线程设置为最大接收350M,而堆的最大内存为2G,这样只要有7个Fetcher线程判断为使用InMemoryMapOutput,且同时开始接收数据,就可能造成Java Heap的OOM错误,从而导致Shuffle Error。
我觉得我们可以对使用的参数进行一定的调整,比如说减少Fetcher线程的数量,减少单个Shuffle使用InMemory操作的比例让其OnDisk操作等等,来避免这个问题。
相关推荐
Hadoop Mapreduce过程shuffle过程全解析,Shuffle过程
基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...
【基于Hadoop的电影影评数据分析】是一项大数据课程的大作业,旨在利用Hadoop的分布式处理能力来分析电影影评数据。Hadoop是一个由Apache软件基金会开发的开源框架,专为处理和存储大规模数据而设计。它由四个核心...
综上所述,“Hadoop之外卖订单数据分析系统”涵盖了大数据处理的多个方面,包括数据存储、处理、分析和可视化。通过合理运用Hadoop及其生态中的工具,我们可以对海量的外卖订单数据进行深度挖掘,为企业提供有价值的...
在Hadoop MapReduce框架中,shuffle和排序是两个至关重要的步骤,它们发生在map阶段和reduce阶段之间,确保数据被正确地处理和聚合。...理解和掌握这一过程对于优化Hadoop作业性能和解决可能出现的问题至关重要。
简单说一下hadoop和spark的shuffle过程
基于Hadoop网站流量日志数据分析系统项目源码+教程.zip网站流量日志数据分析系统 典型的离线流数据分析系统 技术分析 hadoop nginx flume hive sqoop mysql springboot+mybatisplus+vcharts 基于Hadoop网站流量日志...
Hadoop应运而生,作为开源的分布式文件系统和并行计算框架,它有效地解决了大规模数据处理的问题,被广泛应用于各行各业。 Hadoop的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce。HDFS是一个高...
标题中的“基于Hadoop的股票大数据分析系统”指的是利用Apache Hadoop框架来处理和分析海量的股票市场数据。Hadoop是一个开源的分布式计算框架,它允许在大规模集群中存储和处理大量数据。在这个系统中,Hadoop可能...
而实现这种大规模数据分析的关键技术就是Apache Hadoop,一种分布式计算框架,它使得处理和存储海量数据成为可能。 Hadoop的核心由两个主要组件构成:Hadoop Distributed File System (HDFS) 和 MapReduce。HDFS是...
Hadoop豆瓣电影数据分析(Hadoop)操作源码
【基于Hadoop豆瓣电影数据分析实验报告】 在大数据时代,对海量信息进行高效处理和分析是企业决策的关键。Hadoop作为一款强大的分布式计算框架,自2006年诞生以来,已经在多个领域展现了其卓越的数据处理能力。本...
然后,讨论了基于Hadoop的成绩分析系统的需求分析和开发工具。接着,详细介绍了Hadoop集群的搭建过程,包括VMWARE安装、CENTOS6.8安装和Hadoop的安装与配置。 在编码实现部分,本文介绍了使用MapReduce实现成绩分析...
在大数据处理领域,Hadoop是一个不可或缺的核心框架...通过Hadoop、HBase和Kafka的组合,我们可以高效地处理、分析和传递海量通话记录数据,从而为电信公司提供深度洞见,优化服务,提升客户满意度,甚至预测市场趋势。
本文档将详细介绍如何利用Hadoop进行网站日志分析的实践过程,涵盖了从数据获取、Hadoop环境搭建到数据处理和存储的全过程。 1. 项目概述: 该项目主要针对一个由技术培训机构主办的技术学习论坛的日志数据进行分析...
在交通大数据的实际应用中,例如车辆行驶状况分析、交通事故的判断分析、高速道路车辆异常事件的检测等,Hadoop分布式存储和分析平台提供了强大的数据处理能力。它能够支持海量的交通视频数据高效、精准的查询和分析...
4. **数据探索**:通过Hive或Spark SQL进行初步的数据分析和探索。 5. **深度分析**:使用Spark或其他工具进行复杂的数据挖掘和机器学习任务。 6. **结果可视化**:将分析结果通过Tableau、QlikView等工具进行可视...
通过对问题的分析和解决,可以总结出以下知识点: 1. Hadoop 文件上传失败的原因分析: 在上传文件到 Hadoop HDFS 文件系统中失败的原因是因为权限问题,具体来说,是因为当前用户没有写入权限。在 Tomcat 中观察...