- 浏览: 347781 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
lvyuan1234:
你好,你那个sample.txt文件可以分享给我吗
hive insert overwrite into -
107x:
不错,谢谢!
hive 表的一些默认值 -
on_way_:
赞
Hadoop相关书籍 -
bupt04406:
dengkanghua 写道出来这个问题该怎么解决?hbase ...
Unexpected state导致HMaster abort -
dengkanghua:
出来这个问题该怎么解决?hbase master启动不起来。
Unexpected state导致HMaster abort
hive> select distinct value from src;
hive> select max(key) from src;
因为没有grouping keys,所以只有一个reducer。
2.2 如果有聚合函数或者groupby,做如下处理:
插入一个select operator,选取所有的字段,用于优化阶段ColumnPruner的优化
2.2.1 hive.map.aggr为true,默认是true,开启的,在map端做部分聚合
2.2.1.1 hive.groupby.skewindata为false,默认是关闭的,groupby的数据没有倾斜。
生成的operator是: GroupByOperator+ReduceSinkOperator+GroupByOperator。
GroupByOperator+ReduceSinkOperator用于在map端做操作,第一个GroupByOperator在map端先做部分聚合。第二个用于在reduce端做GroupBy操作
2.2.1.2 hive.groupby.skewindata为true
生成的operator是: GroupbyOperator+ReduceSinkOperator+GroupbyOperator+ReduceSinkOperator +GroupByOperator
GroupbyOperator+ReduceSinkOperator(第一个MapredTask的map阶段)
GroupbyOperator(第一个MapredTask的reduce阶段)
ReduceSinkOperator (第二个MapredTask的map阶段)
GroupByOperator(第二个MapredTask的reduce阶段)
2.2.2 hive.map.aggr为false
2.2.2.1 hive.groupby.skewindata为true
生成的operator是: ReduceSinkOperator+GroupbyOperator+ReduceSinkOperator +GroupByOperator
ReduceSinkOperator(第一个MapredTask的map阶段)
GroupbyOperator(第一个MapredTask的reduce阶段)
ReduceSinkOperator (第二个MapredTask的map阶段)
GroupByOperator(第二个MapredTask的reduce阶段)
2.2.2.2 hive.groupby.skewindata为false
生成的operator是: ReduceSinkOperator(map阶段运行)+GroupbyOperator(reduce阶段运行)
第一种情况:
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
SemanticAnalyzer.genGroupByPlan1MR(){
(1)ReduceSinkOperator: It will put all Group By keys and the distinct field (if any) in the map-reduce sort key, and all other fields in the map-reduce value.
(2)GroupbyOperator:GroupByDesc.Mode.COMPLETE,Reducer: iterate/merge (mode = COMPLETE)
}
第二种情况:
set hive.map.aggr=true;
set hive.groupby.skewindata=false;
SemanticAnalyzer.genGroupByPlanMapAggr1MR(){
(1)GroupByOperator:GroupByDesc.Mode.HASH,The agggregation evaluation functions are as follows: Mapper: iterate/terminatePartial (mode = HASH)
(2)ReduceSinkOperator:Partitioning Key: grouping key。Sorting Key: grouping key if no DISTINCT grouping + distinct key if DISTINCT
(3)GroupByOperator:GroupByDesc.Mode.MERGEPARTIAL,Reducer: iterate/terminate if DISTINCT merge/terminate if NO DISTINCT (mode = MERGEPARTIAL)
}
第三种情况:
set hive.map.aggr=false;
set hive.groupby.skewindata=true;
SemanticAnalyzer.genGroupByPlan2MR(){
(1)ReduceSinkOperator:Partitioning Key: random() if no DISTINCT grouping + distinct key if DISTINCT。Sorting Key: grouping key if no DISTINCT grouping + distinct key if DISTINCT
(2)GroupbyOperator:GroupByDesc.Mode.PARTIAL1,Reducer: iterate/terminatePartial (mode = PARTIAL1)
(3)ReduceSinkOperator:Partitioning Key: grouping key。Sorting Key: grouping key if no DISTINCT grouping + distinct key if DISTINCT
(4)GroupByOperator:GroupByDesc.Mode.FINAL,Reducer: merge/terminate (mode = FINAL)
}
第四种情况:
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
SemanticAnalyzer.genGroupByPlanMapAggr2MR(){
(1)GroupbyOperator:GroupByDesc.Mode.HASH,Mapper: iterate/terminatePartial (mode = HASH)
(2)ReduceSinkOperator: Partitioning Key: random() if no DISTINCT grouping + distinct key if DISTINCT。 Sorting Key: grouping key if no DISTINCT grouping + distinct key if DISTINCT。
(3)GroupbyOperator:GroupByDesc.Mode.PARTIALS, Reducer: iterate/terminatePartial if DISTINCT merge/terminatePartial if NO DISTINCT (mode = MERGEPARTIAL)
(4)ReduceSinkOperator:Partitioining Key: grouping key。Sorting Key: grouping key if no DISTINCT grouping + distinct key if DISTINCT
(5)GroupByOperator:GroupByDesc.Mode.FINAL,Reducer: merge/terminate (mode = FINAL)
}
ReduceSinkOperator的processOp(Object row, int tag)会根据相应的条件设置Key的hash值,如第四种情况的第一个ReduceSinkOperator:Partitioning Key: random() if no DISTINCT grouping + distinct key if DISTINCT,如果没有DISTINCT字段,那么在OutputCollector.collect前会设置当前Key的hash值为一个随机数,random = new Random(12345);。如果有DISTINCT字段,那么key的hash值跟grouping + distinct key有关。
GroupByOperator:
initializeOp(Configuration hconf)
processOp(Object row, int tag)
closeOp(boolean abort)
forward(ArrayList<Object> keys, AggregationBuffer[] aggs)
groupby10.q groupby11.q
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
EXPLAIN
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key, count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5)) GROUP BY INPUT.key;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator // insertSelectAllPlanForGroupBy
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: complete
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=true;
set hive.groupby.skewindata=false;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Group By Operator
aggregations:
expr: count(substr(value, 5))
expr: count(DISTINCT substr(value, 5))
bucketGroup: false
keys:
expr: key
type: int
expr: substr(value, 5)
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Reduce Output Operator
key expressions:
expr: _col0
type: int
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col2
type: bigint
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=false;
set hive.groupby.skewindata=true;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: partial1
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-48-26_387_7978992474997402829/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
type: int
sort order: +
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(VALUE._col1)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: final
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Group By Operator
aggregations:
expr: count(substr(value, 5))
expr: count(DISTINCT substr(value, 5))
bucketGroup: false
keys:
expr: key
type: int
expr: substr(value, 5)
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Reduce Output Operator
key expressions:
expr: _col0
type: int
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col2
type: bigint
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: partials
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-49-25_899_4946067838822964610/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
type: int
sort order: +
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(VALUE._col1)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: final
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
EXPLAIN extended
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key, count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5)) GROUP BY INPUT.key;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Needs Tagging: false
Path -> Alias:
hdfs://localhost:54310/user/hive/warehouse/input [input]
Path -> Partition:
hdfs://localhost:54310/user/hive/warehouse/input
Partition
base file name: input
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,value
columns.types int:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/input
name input
serialization.ddl struct input { i32 key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523947
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,value
columns.types int:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/input
name input
serialization.ddl struct input { i32 key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523947
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: input
name: input
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: complete
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
NumFilesPerFileSink: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,val1,val2
columns.types int:int:int
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/dest1
name dest1
serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523946
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
TotalFiles: 1
MultiFileSpray: false
Stage: Stage-0
Move Operator
tables:
replace: true
source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,val1,val2
columns.types int:int:int
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/dest1
name dest1
serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523946
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10001
ABSTRACT SYNTAX TREE:
(TOK_QUERY
(TOK_FROM (TOK_TABREF INPUT))
(TOK_INSERT
(TOK_DESTINATION (TOK_TAB dest1))
(TOK_SELECT
(TOK_SELEXPR (. (TOK_TABLE_OR_COL INPUT) key))
(TOK_SELEXPR (TOK_FUNCTION count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
(TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
)
(TOK_GROUPBY (. (TOK_TABLE_OR_COL INPUT) key))
)
)
SemanticAnalyzer.genBodyPlan(QB qb, Operator input){
if (qbp.getAggregationExprsForClause(dest).size() != 0
|| getGroupByForClause(qbp, dest).size() > 0) { //如果有聚合函数或者有groupby,则执行下面的操作
//multiple distincts is not supported with skew in data
if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
.equalsIgnoreCase("true") &&
qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
getMsg());
}
// insert a select operator here used by the ColumnPruner to reduce
// the data to shuffle
curr = insertSelectAllPlanForGroupBy(dest, curr); //生成一个SelectOperator,所有的字段都会选取,selectStar=true。
if (conf.getVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)
.equalsIgnoreCase("true")) {
if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
.equalsIgnoreCase("false")) {
curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
} else {
curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
}
} else if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
.equalsIgnoreCase("true")) {
curr = genGroupByPlan2MR(dest, qb, curr);
} else {
curr = genGroupByPlan1MR(dest, qb, curr);
}
}
}
distince:
count.q.out
groupby11.q.out
groupby10.q.out
nullgroup4_multi_distinct.q.out
join18.q.out
groupby_bigdata.q.out
join18_multi_distinct.q.out
nullgroup4.q.out
auto_join18_multi_distinct.q.out
auto_join18.q.out
(1)map端部分聚合,数据无倾斜,一个MR生成。
genGroupByPlanMapAggr1MR,生成三个Operator:
(1.1)GroupByOperator:map-side partial aggregation,由genGroupByPlanMapGroupByOperator方法生成:
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,生成column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.HASH
outputColumnNames:groupby+Distinct+Aggregation
keys:groupby+Distinct
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(1.2)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(1.3)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.MERGEPARTIAL
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(2)map端部分聚合,数据倾斜,两个MR生成。
genGroupByPlanMapAggr2MR:
(2.1)GroupByOperator:map-side partial aggregation,由genGroupByPlanMapGroupByOperator方法生成:
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,生成column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.HASH
outputColumnNames:groupby+Distinct+Aggregation
keys:groupby+Distinct
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(2.2)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(2.3)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,生成column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.PARTIALS
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(2.4)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(2.5)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,需要做聚合的column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.FINAL
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(3)map端不部分聚合,数据倾斜,两个MR生成。
genGroupByPlan2MR:
(3.1)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(3.2)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,生成column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.PARTIAL1
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(3.3)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(3.4)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,需要做聚合的column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.FINAL
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(4)map端不部分聚合,数据无倾斜,一个MR生成。
genGroupByPlan1MR:
(4.1)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(4.2)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.COMPLETE
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
SemanticAnalyzer.genBodyPlan
optimizeMultiGroupBy (multi-group by with the same distinct)
groupby10.q groupby11.q
hive> select max(key) from src;
因为没有grouping keys,所以只有一个reducer。
2.2 如果有聚合函数或者groupby,做如下处理:
插入一个select operator,选取所有的字段,用于优化阶段ColumnPruner的优化
2.2.1 hive.map.aggr为true,默认是true,开启的,在map端做部分聚合
2.2.1.1 hive.groupby.skewindata为false,默认是关闭的,groupby的数据没有倾斜。
生成的operator是: GroupByOperator+ReduceSinkOperator+GroupByOperator。
GroupByOperator+ReduceSinkOperator用于在map端做操作,第一个GroupByOperator在map端先做部分聚合。第二个用于在reduce端做GroupBy操作
2.2.1.2 hive.groupby.skewindata为true
生成的operator是: GroupbyOperator+ReduceSinkOperator+GroupbyOperator+ReduceSinkOperator +GroupByOperator
GroupbyOperator+ReduceSinkOperator(第一个MapredTask的map阶段)
GroupbyOperator(第一个MapredTask的reduce阶段)
ReduceSinkOperator (第二个MapredTask的map阶段)
GroupByOperator(第二个MapredTask的reduce阶段)
2.2.2 hive.map.aggr为false
2.2.2.1 hive.groupby.skewindata为true
生成的operator是: ReduceSinkOperator+GroupbyOperator+ReduceSinkOperator +GroupByOperator
ReduceSinkOperator(第一个MapredTask的map阶段)
GroupbyOperator(第一个MapredTask的reduce阶段)
ReduceSinkOperator (第二个MapredTask的map阶段)
GroupByOperator(第二个MapredTask的reduce阶段)
2.2.2.2 hive.groupby.skewindata为false
生成的operator是: ReduceSinkOperator(map阶段运行)+GroupbyOperator(reduce阶段运行)
第一种情况:
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
SemanticAnalyzer.genGroupByPlan1MR(){
(1)ReduceSinkOperator: It will put all Group By keys and the distinct field (if any) in the map-reduce sort key, and all other fields in the map-reduce value.
(2)GroupbyOperator:GroupByDesc.Mode.COMPLETE,Reducer: iterate/merge (mode = COMPLETE)
}
第二种情况:
set hive.map.aggr=true;
set hive.groupby.skewindata=false;
SemanticAnalyzer.genGroupByPlanMapAggr1MR(){
(1)GroupByOperator:GroupByDesc.Mode.HASH,The agggregation evaluation functions are as follows: Mapper: iterate/terminatePartial (mode = HASH)
(2)ReduceSinkOperator:Partitioning Key: grouping key。Sorting Key: grouping key if no DISTINCT grouping + distinct key if DISTINCT
(3)GroupByOperator:GroupByDesc.Mode.MERGEPARTIAL,Reducer: iterate/terminate if DISTINCT merge/terminate if NO DISTINCT (mode = MERGEPARTIAL)
}
第三种情况:
set hive.map.aggr=false;
set hive.groupby.skewindata=true;
SemanticAnalyzer.genGroupByPlan2MR(){
(1)ReduceSinkOperator:Partitioning Key: random() if no DISTINCT grouping + distinct key if DISTINCT。Sorting Key: grouping key if no DISTINCT grouping + distinct key if DISTINCT
(2)GroupbyOperator:GroupByDesc.Mode.PARTIAL1,Reducer: iterate/terminatePartial (mode = PARTIAL1)
(3)ReduceSinkOperator:Partitioning Key: grouping key。Sorting Key: grouping key if no DISTINCT grouping + distinct key if DISTINCT
(4)GroupByOperator:GroupByDesc.Mode.FINAL,Reducer: merge/terminate (mode = FINAL)
}
第四种情况:
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
SemanticAnalyzer.genGroupByPlanMapAggr2MR(){
(1)GroupbyOperator:GroupByDesc.Mode.HASH,Mapper: iterate/terminatePartial (mode = HASH)
(2)ReduceSinkOperator: Partitioning Key: random() if no DISTINCT grouping + distinct key if DISTINCT。 Sorting Key: grouping key if no DISTINCT grouping + distinct key if DISTINCT。
(3)GroupbyOperator:GroupByDesc.Mode.PARTIALS, Reducer: iterate/terminatePartial if DISTINCT merge/terminatePartial if NO DISTINCT (mode = MERGEPARTIAL)
(4)ReduceSinkOperator:Partitioining Key: grouping key。Sorting Key: grouping key if no DISTINCT grouping + distinct key if DISTINCT
(5)GroupByOperator:GroupByDesc.Mode.FINAL,Reducer: merge/terminate (mode = FINAL)
}
ReduceSinkOperator的processOp(Object row, int tag)会根据相应的条件设置Key的hash值,如第四种情况的第一个ReduceSinkOperator:Partitioning Key: random() if no DISTINCT grouping + distinct key if DISTINCT,如果没有DISTINCT字段,那么在OutputCollector.collect前会设置当前Key的hash值为一个随机数,random = new Random(12345);。如果有DISTINCT字段,那么key的hash值跟grouping + distinct key有关。
GroupByOperator:
initializeOp(Configuration hconf)
processOp(Object row, int tag)
closeOp(boolean abort)
forward(ArrayList<Object> keys, AggregationBuffer[] aggs)
groupby10.q groupby11.q
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
EXPLAIN
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key, count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5)) GROUP BY INPUT.key;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator // insertSelectAllPlanForGroupBy
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: complete
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=true;
set hive.groupby.skewindata=false;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Group By Operator
aggregations:
expr: count(substr(value, 5))
expr: count(DISTINCT substr(value, 5))
bucketGroup: false
keys:
expr: key
type: int
expr: substr(value, 5)
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Reduce Output Operator
key expressions:
expr: _col0
type: int
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col2
type: bigint
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=false;
set hive.groupby.skewindata=true;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: partial1
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-48-26_387_7978992474997402829/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
type: int
sort order: +
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(VALUE._col1)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: final
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Group By Operator
aggregations:
expr: count(substr(value, 5))
expr: count(DISTINCT substr(value, 5))
bucketGroup: false
keys:
expr: key
type: int
expr: substr(value, 5)
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Reduce Output Operator
key expressions:
expr: _col0
type: int
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col2
type: bigint
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: partials
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-49-25_899_4946067838822964610/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
type: int
sort order: +
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(VALUE._col1)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: final
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
EXPLAIN extended
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key, count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5)) GROUP BY INPUT.key;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Needs Tagging: false
Path -> Alias:
hdfs://localhost:54310/user/hive/warehouse/input [input]
Path -> Partition:
hdfs://localhost:54310/user/hive/warehouse/input
Partition
base file name: input
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,value
columns.types int:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/input
name input
serialization.ddl struct input { i32 key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523947
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,value
columns.types int:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/input
name input
serialization.ddl struct input { i32 key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523947
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: input
name: input
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: complete
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
NumFilesPerFileSink: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,val1,val2
columns.types int:int:int
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/dest1
name dest1
serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523946
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
TotalFiles: 1
MultiFileSpray: false
Stage: Stage-0
Move Operator
tables:
replace: true
source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,val1,val2
columns.types int:int:int
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/dest1
name dest1
serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523946
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10001
ABSTRACT SYNTAX TREE:
(TOK_QUERY
(TOK_FROM (TOK_TABREF INPUT))
(TOK_INSERT
(TOK_DESTINATION (TOK_TAB dest1))
(TOK_SELECT
(TOK_SELEXPR (. (TOK_TABLE_OR_COL INPUT) key))
(TOK_SELEXPR (TOK_FUNCTION count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
(TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
)
(TOK_GROUPBY (. (TOK_TABLE_OR_COL INPUT) key))
)
)
SemanticAnalyzer.genBodyPlan(QB qb, Operator input){
if (qbp.getAggregationExprsForClause(dest).size() != 0
|| getGroupByForClause(qbp, dest).size() > 0) { //如果有聚合函数或者有groupby,则执行下面的操作
//multiple distincts is not supported with skew in data
if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
.equalsIgnoreCase("true") &&
qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
getMsg());
}
// insert a select operator here used by the ColumnPruner to reduce
// the data to shuffle
curr = insertSelectAllPlanForGroupBy(dest, curr); //生成一个SelectOperator,所有的字段都会选取,selectStar=true。
if (conf.getVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)
.equalsIgnoreCase("true")) {
if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
.equalsIgnoreCase("false")) {
curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
} else {
curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
}
} else if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
.equalsIgnoreCase("true")) {
curr = genGroupByPlan2MR(dest, qb, curr);
} else {
curr = genGroupByPlan1MR(dest, qb, curr);
}
}
}
distince:
count.q.out
groupby11.q.out
groupby10.q.out
nullgroup4_multi_distinct.q.out
join18.q.out
groupby_bigdata.q.out
join18_multi_distinct.q.out
nullgroup4.q.out
auto_join18_multi_distinct.q.out
auto_join18.q.out
(1)map端部分聚合,数据无倾斜,一个MR生成。
genGroupByPlanMapAggr1MR,生成三个Operator:
(1.1)GroupByOperator:map-side partial aggregation,由genGroupByPlanMapGroupByOperator方法生成:
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,生成column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.HASH
outputColumnNames:groupby+Distinct+Aggregation
keys:groupby+Distinct
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(1.2)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(1.3)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.MERGEPARTIAL
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(2)map端部分聚合,数据倾斜,两个MR生成。
genGroupByPlanMapAggr2MR:
(2.1)GroupByOperator:map-side partial aggregation,由genGroupByPlanMapGroupByOperator方法生成:
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,生成column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.HASH
outputColumnNames:groupby+Distinct+Aggregation
keys:groupby+Distinct
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(2.2)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(2.3)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,生成column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.PARTIALS
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(2.4)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(2.5)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,需要做聚合的column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.FINAL
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(3)map端不部分聚合,数据倾斜,两个MR生成。
genGroupByPlan2MR:
(3.1)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(3.2)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,生成column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.PARTIAL1
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(3.3)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(3.4)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames
处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,需要做聚合的column加入outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.FINAL
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
(4)map端不部分聚合,数据无倾斜,一个MR生成。
genGroupByPlan1MR:
(4.1)ReduceSinkOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // 为reduceKeys,groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; //reduceValues,聚合函数
this.outputKeyColumnNames = outputKeyColumnNames; //outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; //outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // 一般都是-1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
(4.2)GroupByOperator
处理groupby子句,getGroupByForClause,groupby的column加入reduceKeys和outputKeyColumnNames
处理聚合函数,getAggregationExprsForClause,需要做聚合的column加入reduceValues和outputValueColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode:GroupByDesc.Mode.COMPLETE
outputColumnNames:groupby+Aggregation
keys:groupby
aggregators:Aggregation
groupKeyNotReductionKey:false
groupByMemoryUsage:默认为0.5
memoryThreshold:默认为0.9
SemanticAnalyzer.genBodyPlan
optimizeMultiGroupBy (multi-group by with the same distinct)
groupby10.q groupby11.q
发表评论
-
hive rename table name
2013-09-18 14:28 2594hive rename tablename hive re ... -
hive的distribute by如何partition long型的数据
2013-08-20 10:15 2470有用户问:hive的distribute by分桶是怎么分 ... -
hive like vs rlike vs regexp
2013-04-11 18:53 11208like vs rlike vs regexp r ... -
hive sql where条件很简单,但是太多
2012-07-18 15:51 8734insert overwrite table aaaa ... -
insert into时(string->bigint)自动类型转换
2012-06-14 12:30 8277原表src: hive> desc src; ... -
通过复合结构来优化udf的调用
2012-05-11 14:07 1206select split("accba&quo ... -
RegexSerDe
2012-03-14 09:58 1544官方示例在: https://cwiki.apache.or ... -
Hive 的 OutputCommitter
2012-01-30 19:44 1813Hive 的 OutputCommitter publi ... -
hive LATERAL VIEW 行转列
2011-11-09 14:49 5442drop table lateralview; create ... -
hive complex type
2011-11-08 19:56 1360数据: 1,100|3,20|2,70|5,100 建表: ... -
hive转义字符
2011-10-25 16:41 6238CREATE TABLE escape (id STRING, ... -
hive 两个不同类型的columns进行比较
2011-09-19 13:46 3032select case when "ab1234&q ... -
lateral view
2011-09-18 04:04 0lateral view与udtf相关 -
udf 中获得 FileSystem
2011-09-14 10:28 0在udf中获得FileSystem,需要获得知道fs.defa ... -
hive union mapjoin
2011-09-09 16:29 0union union.q union2.q ... -
hive eclipse
2011-09-08 17:42 0eclipse-templates$ vi .classpat ... -
hive join filter
2011-09-07 23:05 0join16.q.out hive.optimize.ppd ... -
hive limit
2011-09-07 21:02 0limit 关键字: input4_limit.q.out ... -
hive convertMapJoin MapJoinProcessor
2011-09-06 21:17 0join25.q join26 ... -
hive hive.merge.mapfiles hive.merge.mapredfiles
2011-09-06 19:14 0HiveConf: HIVEMERGEMAPFILES ...
相关推荐
set hive.groupby.mapaggr.checkinterval = 100000000; //在 Map 端进行聚合操作的条目数目 set hive.groupby.skewindata = true; //解决数据倾斜的万能钥匙 当map阶段运行不了的时候,可以设置 set hive.map.aggr ...
3. 调整 Hive 的 group by 操作的键对应的记录条数,设置 set hive.groupby.mapaggr.checkinterval=100000; Hadoop HDFS 参数调优 1. 调整 HDFS 的存储格式,例如使用 SequenceFile格式; 2. 调整 HDFS 的压缩格式...
9. hive.groupby.skewindata 该参数决定了group by操作是否支持倾斜的数据。如果设置为true,则Hive将支持倾斜的数据,默认值为false。 10. hive.merge.mapfiles 该参数决定了是否开启合并Map端小文件,对于...
Hive支持大部分SQL标准,包括SELECT、FROM、WHERE、GROUP BY、JOIN等操作,使得用户可以方便地对Hadoop上的数据进行查询和分析。 【Hive函数】 Hive提供了丰富的内置函数,包括聚合函数(SUM、COUNT、AVG等)、数学...
它支持SELECT、INSERT、UPDATE、DELETE等基本操作,以及JOIN、GROUP BY、HAVING等复杂查询。 4. **编译与执行计划**:Hive将HQL语句转换为MapReduce任务,或者在更现代的Hadoop版本中,转换为Tez或Spark任务。这...
利用Hive进行复杂用户行为大数据分析及优化案例(全套视频+课件+代码+讲义+工具软件),具体内容包括: ...15_Hive中的数据倾斜及解决方案-group by 16_Hive中使用正则加载数据 17_Hive中使用Python脚本进行预处理
3.1 Group By 28 3.2 Order /Sort By 28 4. Hive Join 29 5. HIVE参数设置 31 6. HIVE UDF 33 6.1 基本函数 33 6.1.1 关系操作符 33 6.1.2 代数操作符 34 6.1.3 逻辑操作符 35 6.1.4 复杂类型操作符 35 6.1.5 内建...
包括 Hadoop 计算框架的特性、常用优化手段、全排序、笛卡尔积、EXIST 子句、决定 reducer 个数、合并 MapReduce 操作、Bucket 与 Sampling、Partition 和 JOIN 的优化、处理小文件和 GroupBy 的优化。 7. Hive ...
- **Group By的实现原理**:在Map阶段,Hive会将Group By字段组合成key,发送到相应的Reduce任务。Reduce阶段利用排序功能,保存最后一个键值,以处理每个分组的数据。 - **Distinct的实现原理**:对于单个...
2. **HQL语法**:详细解析Hive Query Language,包括SELECT、FROM、WHERE、GROUP BY、JOIN等基本和高级查询操作。 3. **表和分区**:讨论Hive中的表创建、分区策略以及如何有效管理大量数据。 4. **数据类型和函数...
- **背景**:在Hive中,对于单个`GROUP BY`子句下包含多个`COUNT(DISTINCT)`的情况,Hive只能支持其中一个`COUNT(DISTINCT)`。 - **示例**:下面的查询是可以在Hive中正确执行的: ```sql SELECT pv_users....
hive的group by 和集合函数 hive的Order By/Sort By/Distribute By Join查询,join只支持等值连接 LEFT,RIGHT 和 FULL OUTER JOIN LEFT SEMI JOIN Hive当前没有实现 IN/EXISTS 子查询,可以用 LEFT SEMI JOIN 重写子...
GROUP BY ...`用于对数据进行分组统计。 **3.2 排序** - `ORDER BY`和`SORT BY`分别用于全局排序和局部排序。 #### 四、Hive Join - Hive支持多种类型的连接操作,包括内连接、外连接等。 #### 五、Hive参数...
HQL支持常见的SQL操作,如SELECT、FROM、WHERE、GROUP BY等。 3. **表和分区(Tables and Partitions)**:Hive中的表是逻辑上的概念,对应HDFS上的目录。分区是对大表进行逻辑划分的一种方式,可以提高查询效率。 ...
在这里,我们将学习一些基本的Hive查询语句,包括SELECT、FROM、WHERE、GROUP BY、HAVING等语句。 四、总结 在这篇文章中,我们学习了如何配置Hive环境,包括安装Hive、配置Hadoop环境、配置Hive Metastore数据库...
- **分组与聚合**:GROUP BY和HAVING用于数据分组和条件过滤,配合COUNT、SUM、AVG等聚合函数进行统计分析。 - **窗口函数**:ROW_NUMBER、RANK、LEAD和LAG等窗口函数在分析排序数据时非常有用,例如计算排名或...
6. **HQL查询**:介绍Hive查询语言,包括SELECT、WHERE、GROUP BY、JOIN等基本操作,以及更复杂的子查询和聚合函数的使用。 7. **Hive函数**:本章涉及Hive内置函数的使用,如统计函数、日期函数、字符串函数等,使...
* GROUP BY 语句:用于对数据进行分组。 * HAVING 语句:用于指定分组的过滤条件。 Hive 的语法还提供了多种函数,例如 SUM、AVG、MAX、MIN 等,用于对数据进行聚合和分析。 ORACLE SQL 对应的 HSQL 语法支持 ...
针对`GROUP BY`和`JOIN`操作,可以通过`hive.groupby.skewindata`、`hive.optimize.skewjoin`等参数调整,使数据均匀分布。对于大小表JOIN,可调整`hive.mapjoin.smalltable.filesize`等相关参数。 3. **避免全局...
4. **GROUP BY与ORDER BY的使用**:GROUP BY后跟ORDER BY会导致额外的排序,可能增加计算成本。若无特别需求,可尽量避免。 5. **使用子查询替换自连接**:某些情况下,子查询可能比自连接更高效。 三、Hive性能...