数据倾斜是指:map/reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点大很多,从而导致某几个节点迟迟运行不完。
在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显。主要是因为在job完成后所得到的Counters是整个job的总和,优化是基于这些Counters得出的平均值,而由于数据倾斜的原因造成map处理数据量的差异过大,使得这些平均值能代表的价值降低。hive的执行时分阶段的,map处理数据量的差异取决于一个stage的reduce输出,所以如何将数据均匀的分配到各个reduce中,就是解决数据倾斜的根本所在。
1、数据倾斜的原因
1.1操作
关键词 | 情形 | 后果 |
join | 其中一个表较小,但是key集中 | 分发都某一个或几个reduce上的数据远高于平均值 |
大表与大表,但是分桶的判断字段0值和空值过多 | 这些空值都是由一个reduce处理,非常慢 | |
gorup by | group by维度过少,某值的数量过多 | 处理某值的reduce非常耗时 |
count distinct | 某特殊值过多 | 处理此特殊值的reduce耗时 |
order by |
|
1.2原因
- key分布不均匀
- 业务数据本身的特性
- 建表时考虑不周
- 某型sql语句本身就有数据倾斜
1.3表现
任务进度长时间维持在99%,查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。最长时长远大于平均时长。
2、数据倾斜的解决方法
2.1参数调整
- hive.map.aggr=true----map端部分聚合,相当于Combiner;
- hive.groupby.skewindata=true----有数据倾斜的时候进行负载均衡,当选项设定为true,生成的查询会有两个MR job。第一个MR job中,Map的输出结果集合会随机分不到Reduce中,每个Reduce做聚合操作,并输出结果,这样处理的结果是相同的group by key有可能被分配到不同的Reduce中,从而达到负载均衡的目的;第二个MR job再根据预处理的数据结果按照Group by key分布到reduce中(这个过程可以保证相同的Group By key分布到同一个Reduce中),最后完成最终的聚合操作。
2.2SQL语句调节
①关于join----关于驱动表的选取,选用join key分布最均匀的表作为驱动表。做好裁减和filter操作,以达到两表做join的时候,数据量相对变小的效果。
- 大小表join:使用map join让小的维度表(1000条以下的记录表)先进内存。在map端完成reduce。
- 大表join大表:把空值的key变成一个字符串加上随机数,把倾斜的数据分配到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
②count distinct大量相同特殊值
count distinct时将值为空的情况单独处理,如果是计算count distinct,可以不处理,直接过滤,在最后结果中加1.如果还有其他计算,需要进行group by,可以先将值为空记录单独处理,再和其他计算结果进行union。
③gorup by维度过小:采用sum() group by的方式来替换count(distinct)完成计算。
④特殊情况特殊处理:在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。
3、典型的业务场景
3.1空值产生的数据倾斜
场景:如日志中,常会有信息丢失的问题,比如日志中的user_id,如果取其中的user_id和用户表中的user_id关联,会碰到数据倾斜的问题。
解决方法1:user_id为空的不参与关联
select * from log a join users b on a.user_id is not null and a.user_id = b.user_id union all select * from log a where a.user_id is null;
解决方法2:赋予空值分新的key值
select * from log a left outer join users b on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
结论:方法2比方法1效率更好,不但IO少了,而且作业数也少了。解决方法1中log读取两次,jobs是2.方法2job是1。这个优化适合无效ID产生的倾斜问题。把空值key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,解决数据倾斜问题。
3.2小表不小不大,怎么用map join解决倾斜问题
使用map join解决小表关联大表的数据倾斜问题,这个方法使用频率非常高,单如果小表很大,大到map join会出现bug或异常,这时候需要特殊的处理。如下:
select * from log a left outer join users b on a.user_id = b.user_id;
users表有600w+的记录,把users分发到所有map上也是不小的开销,而且map join不支持这么大的小表。如果用普通的join,又会碰到数据倾斜的问题。解决方法如下:
select /*+mapjoin(x)*/* from log a left outer join ( select /*+mapjoin(c)*/d.* from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;
假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。
3.3不同数据类型关联产生数据倾斜
场景:用户表user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的join操作时,默认的hash操作会按照int类型来进行分配,这样导致所有string类型Id的记录分配到一个reduce中。
解决方法:把数字类型转换成字符串类型
select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)
4、总结
使map的输出数据更均匀的分布到reduce中去,是我们的最终目标。由于Hash算法的局限性,按key Hash会或多或少的造成数据倾斜。大量经验表明数据倾斜的原因是人为的建表疏忽或业务逻辑可以规避的。在此给出较为通用的步骤:
- 采用log表,那些user_id比较倾斜,得到一个结果表temp1。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样时并不可少的。
- 数据的分布符合社会学统计规则,贫富不均。倾斜的key不会太多,就想一个社会的富人不多,奇特的人不多一样。所以temp1记录数会很少。把temp1和users做map join生成temp2,把temp2读到distribute file cache。这是一个map过程。
- map读入users和log,假设记录来自log,则检查user_id是否在temp2里,如果是,则输出到本地文件a,否则生成<user_id,value>的key,value对,假如记录来自member,生成<user_id,value>的key,value对,进入reduce阶段。
- 最终把a文件,把stage3 reduce阶段输出的文件合并起写到hdfs。
如果确认业务需要这样倾斜的逻辑,考虑以下的优化方案:
1、对于join,在判断小表不大于1G的情况下,使用map join
2、对于group by或distinct,设定 hive.groupby.skewindata=true
3、尽量使用上述的SQL语句调节进行优化
相关推荐
- Hive支持内连接(INNER JOIN)、左连接(LEFT JOIN)、右连接(RIGHT JOIN)以及全连接(FULL OUTER JOIN),但在某些情况下可能需要特殊处理以避免数据倾斜。 - **Inner Join**: - `SELECT t1.*, t2.* FROM ...
在Hive中,处理数据倾斜的常见方法是增加Map/Reduce作业的数量,通过增加更多的分区来分散数据负载,这类似于将大数据任务拆分成多个小任务来执行。 HiveQL执行过程中,可以将其视为底层的Map/Reduce程序来优化。...
- **手动分区**:通过对数据进行预处理,例如按照某个键进行分组,然后再加载到Hive表中,从而实现手动分区,减轻数据倾斜的影响。 通过上述方法,不仅可以提高Hive的查询性能,还能确保数据处理流程更加高效稳定。...
- 通过调整并行度或使用广播join参数来缓解数据倾斜。 - 对于严重倾斜的数据,可以通过预处理脚本来增加数据的离散度。 - 实现skew join技术,进一步优化数据分布。 2. **监控机制**: - 利用Spark UI手动监控...
3. **UDAF优化**:像`sum`, `count`, `max`, `min`等聚合函数,Hadoop在Map端进行了汇总合并优化,能有效应对数据倾斜问题。然而,`count(distinct)`在大数据量场景下效率低,因为它涉及分组和排序,容易导致倾斜。 ...
数据量大不是问题,数据倾斜是个问题 jobs数比较多的作业运行效率相对比较低,比如及时有几百行的表,如果多次关联汇总,产生十几个jobs,耗时很长,原因是map reduce作业初始化的时间是比较长的
在实际应用中,Hadoop可能会遇到数据倾斜、网络延迟、性能瓶颈等问题。解决这些问题通常需要优化数据分布策略、调整MapReduce参数、增加硬件资源或者采用更高效的数据处理框架如Spark。 总结来说,Hadoop在淘宝网的...
Hive的核心设计目标是为大数据提供便捷的数据汇总、分析和查询能力,同时保持高度的可伸缩性和容错性。它通过将SQL查询转换为MapReduce任务在Hadoop集群上执行,使得非程序员也能轻松操作大数据。 **Hive的架构组件...
涉及的知识点包括但不限于 Spark 优化与任务管理,RDD 特性,Spark 缓存与 Checkpoint 差异,数据倾斜解决方案,Spark SQL 内部工作流程,数据仓库理论基础,Hive 内外表特性对比及调优技巧,YARN 资源调度详解,...
1. 数据倾斜解决方案:数据倾斜是指数据分布不均衡,导致某些节点上的数据量远远大于其他节点。解决数据倾斜的方法包括使用SkewJoin、使用更高效的数据存储格式等。 2. 小文件问题解决方案:小文件问题是指大量的小...
**Hive的优化**: Hive可以通过设置各种配置参数来优化性能,例如调整map/reduce任务的数量、内存分配、数据倾斜处理等。另外,使用Tez或Spark作为执行引擎可以提升查询速度,因为它们提供了更高效的执行模型。 **...
数据倾斜排查与解决:数据倾斜是MapReduce作业执行不均衡的现象,可以通过多种方法解决,比如对数据进行预处理、调整分区策略等。 Hive小文件问题解决:可以通过合并小文件、调整HDFS的块大小来解决。 Hive优化:...
在Hive中解决数据倾斜的常见策略包括:调整map阶段输出键值的分布、使用salting技术添加随机前缀、采用分区裁剪和小表广播等。 Hive的HSQL转换为MapReduce的过程: Hive SQL语句首先被编译器转换为Hive的抽象语法树...
- 对于 5 秒钟一个窗口的设置,这种数据倾斜可能会导致某些 Task 的处理时间较长,而其他 Task 的处理时间较短,从而影响整体处理效率。 - **Spark Shuffle 理解:** - Shuffle 是指在 Map 阶段结束后将数据重新...
解决方法包括增加并行度、使用Rebalance/Rescale算子、通过HashJoin减少数据倾斜等。 - 对于特定场景(如热门视频UV统计),可以考虑使用Random Key技术或者Split Distinct来分散热点数据的压力。底层原理主要是...
系统设计者需要合理安排数据的分布,以避免数据倾斜,并确保各个节点间能够有效协调工作。同时,系统还需具备处理节点故障的能力,保证数据的完整性和一致性,这对于系统的可靠性至关重要。通过适当的容错机制和数据...