Optimizer:
public void initialize(HiveConf hiveConf) {
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) {
transformations.add(new ColumnPruner());
}
}
create table tab1 (col1 string, col2 string, col3 int, col4 string, col5 string, col6 string, col7 string);
explain select col1, col2 from tab1 where col3>5;
hive> explain select col1, col2 from tab1 where col3>5;
OK
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_TABREF tab1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL col1)) (TOK_SELEXPR (TOK_TABLE_OR_COL col2))) (TOK_WHERE (> (TOK_TABLE_OR_COL col3) 5))))
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
tab1
TableScan
alias: tab1
Filter Operator
predicate:
expr: (col3 > 5)
type: boolean
Filter Operator
predicate:
expr: (col3 > 5)
type: boolean
Select Operator
expressions:
expr: col1
type: string
expr: col2
type: string
outputColumnNames: _col0, _col1
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Stage: Stage-0
Fetch Operator
limit: -1
Time taken: 0.379 seconds
public class ColumnPruner implements Transform {
public ParseContext transform(ParseContext pactx) throws SemanticException {
pGraphContext = pactx;
opToParseCtxMap = pGraphContext.getOpParseCtx();
// generate pruned column list for all relevant operators
ColumnPrunerProcCtx cppCtx = new ColumnPrunerProcCtx(opToParseCtxMap);
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack. The dispatcher
// generates the plan from the operator tree
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(new RuleRegExp("R1", "FIL%"), ColumnPrunerProcFactory
.getFilterProc());
opRules.put(new RuleRegExp("R2", "GBY%"), ColumnPrunerProcFactory
.getGroupByProc());
opRules.put(new RuleRegExp("R3", "RS%"), ColumnPrunerProcFactory
.getReduceSinkProc());
opRules.put(new RuleRegExp("R4", "SEL%"), ColumnPrunerProcFactory
.getSelectProc());
opRules.put(new RuleRegExp("R5", "JOIN%"), ColumnPrunerProcFactory
.getJoinProc());
opRules.put(new RuleRegExp("R6", "MAPJOIN%"), ColumnPrunerProcFactory
.getMapJoinProc());
opRules.put(new RuleRegExp("R7", "TS%"), ColumnPrunerProcFactory
.getTableScanProc());
opRules.put(new RuleRegExp("R8", "LVJ%"), ColumnPrunerProcFactory
.getLateralViewJoinProc());
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(ColumnPrunerProcFactory
.getDefaultProc(), opRules, cppCtx);
GraphWalker ogw = new ColumnPrunerWalker(disp);
// Create a list of topop nodes
ArrayList<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pGraphContext.getTopOps().values());
ogw.startWalking(topNodes, null);
return pGraphContext;
}
}
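The rule table above pairs an operator-name pattern with a processor, and the dispatcher falls back to the default processor when nothing matches. The following is only a minimal sketch of that lookup-with-fallback idea, assuming a plain exact match on the operator's short name; it does not reproduce Hive's RuleRegExp or DefaultRuleDispatcher, and `RuleDispatchSketch`, `dispatch` and the processor labels are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RuleDispatchSketch {
    // Hypothetical stand-in for opRules: operator short name -> processor label.
    private static final Map<String, String> RULES = new LinkedHashMap<>();
    static {
        RULES.put("FIL", "FilterProc");
        RULES.put("SEL", "SelectProc");
        RULES.put("TS", "TableScanProc");
    }

    // Label used when no rule matches (assumption: plays the role of getDefaultProc()).
    private static final String DEFAULT = "DefaultProc";

    /** Returns the label of the processor chosen for the given operator name. */
    static String dispatch(String opName) {
        return RULES.getOrDefault(opName, DEFAULT);
    }

    public static void main(String[] args) {
        for (String op : new String[] {"TS", "FIL", "SEL", "FS"}) {
            System.out.println(op + " -> " + dispatch(op));
        }
    }
}
```

In this sketch "FS" has no entry, so it is routed to the default label, just as an operator without a registered rule would reach the default processor.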
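1) FilterOperator (ColumnPrunerFilterProc): required columns = columns used by the filter predicate + columns required by its child operators.
2) GroupByOperator (ColumnPrunerGroupByProc): required columns = columns appearing in the group-by keys + columns appearing in the aggregation functions.
3) ReduceSinkOperator (ColumnPrunerReduceSinkProc): required columns depend on the child: (1) the child is a JoinOperator; (2) the child is not a JoinOperator, in which case they are the columns appearing in the key + the columns appearing in the value.
4) SelectOperator (ColumnPrunerSelectProc): 4.1) if any child is a FileSinkOperator, ScriptOperator, UDTFOperator, LimitOperator or UnionOperator, the required columns are taken from the SelectOperator's own expressions. 4.2)
5) JoinOperator (ColumnPrunerJoinProc): if any child is a FileSinkOperator, nothing is done. Otherwise:
6) MapJoinOperator (ColumnPrunerMapJoinProc)
7) TableScanOperator (ColumnPrunerTableScanProc): required columns = columns required by its child operators.
8) LateralViewJoinOperator (ColumnPrunerLateralViewJoinProc)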
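Rules 1, 4.1 and 7 are enough to follow the example query by hand. The sketch below is a hypothetical, heavily simplified model (a linear chain TS -> FIL -> SEL -> FS with a single filter and plain string column names), not Hive code; `RequiredColumnsSketch`, `Op`, `union`, `collect` and `example` are names invented for illustration. It visits each child before its parent so that the child's column list is already known when the parent is processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class RequiredColumnsSketch {
    /** Hypothetical, heavily simplified operator with at most one child. */
    static final class Op {
        final String kind;           // "TS", "FIL", "SEL" or "FS"
        final List<String> ownCols;  // columns this operator references itself
        final Op child;              // null for the last operator in the chain

        Op(String kind, List<String> ownCols, Op child) {
            this.kind = kind;
            this.ownCols = ownCols;
            this.child = child;
        }
    }

    /** Order-preserving union of two column lists. */
    static List<String> union(List<String> first, List<String> second) {
        Set<String> merged = new LinkedHashSet<>(first);
        merged.addAll(second);
        return new ArrayList<>(merged);
    }

    /** Visits the child before the parent and records each operator's required columns. */
    static List<String> collect(Op op, Map<String, List<String>> required) {
        List<String> fromChild = op.child == null
                ? Collections.<String>emptyList()
                : collect(op.child, required);
        List<String> needed;
        switch (op.kind) {
            case "FIL": // rule 1: child columns merged with the predicate columns
                needed = union(fromChild, op.ownCols);
                break;
            case "SEL": // rule 4.1 only: the child is a FileSink, so use the select expressions
                needed = new ArrayList<>(op.ownCols);
                break;
            default:    // rule 7 for TS; FS has nothing below it, so its list is empty
                needed = new ArrayList<>(fromChild);
        }
        required.put(op.kind, needed);
        return needed;
    }

    /** Builds TS -> FIL -> SEL -> FS for "select col1, col2 from tab1 where col3>5". */
    static Map<String, List<String>> example() {
        Op fs = new Op("FS", Collections.<String>emptyList(), null);
        Op sel = new Op("SEL", Arrays.asList("col1", "col2"), fs);
        Op fil = new Op("FIL", Arrays.asList("col3"), sel);
        Op ts = new Op("TS", Collections.<String>emptyList(), fil);
        Map<String, List<String>> required = new LinkedHashMap<>();
        collect(ts, required);
        return required;
    }

    public static void main(String[] args) {
        // prints {FS=[], SEL=[col1, col2], FIL=[col1, col2, col3], TS=[col1, col2, col3]}
        System.out.println(example());
    }
}
```

The table scan ends up needing only col1, col2 and col3 out of the seven columns of tab1, which is the point of the optimization.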
public static class ColumnPrunerSelectProc implements NodeProcessor {
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs) throws SemanticException {
SelectOperator op = (SelectOperator) nd; //org.apache.hadoop.hive.ql.exec.SelectOperator@347448
ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx; // org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcCtx@bec43f
List<String> cols = new ArrayList<String>();
if (op.getChildOperators() != null) {
for (Operator<? extends Serializable> child : op.getChildOperators()) {
// If one of my children is a FileSink or Script, return all columns.
// Without this break, a bug in ReduceSink to Extract edge column
// pruning will manifest
// which should be fixed before remove this
if ((child instanceof FileSinkOperator)
|| (child instanceof ScriptOperator)
|| (child instanceof UDTFOperator)
|| (child instanceof LimitOperator)
|| (child instanceof UnionOperator)) {
// child = org.apache.hadoop.hive.ql.exec.FileSinkOperator@de6570
cppCtx.getPrunedColLists()
.put(op, cppCtx.getColsFromSelectExpr(op));
return null; // return null;
}
}
}
cols = cppCtx.genColLists(op);
SelectDesc conf = op.getConf();
// The input to the select does not matter. Go over the expressions
// and return the ones which have a marked column
cppCtx.getPrunedColLists().put(op,
cppCtx.getSelectColsFromChildren(op, cols));
if (conf.isSelStarNoCompute()) {
return null;
}
// do we need to prune the select operator?
List<ExprNodeDesc> originalColList = op.getConf().getColList();
List<String> columns = new ArrayList<String>();
for (ExprNodeDesc expr : originalColList) {
Utilities.mergeUniqElems(columns, expr.getCols());
}
// by now, 'prunedCols' are columns used by child operators, and 'columns'
// are columns used by this select operator.
ArrayList<String> originalOutputColumnNames = conf.getOutputColumnNames();
if (cols.size() < originalOutputColumnNames.size()) {
ArrayList<ExprNodeDesc> newColList = new ArrayList<ExprNodeDesc>();
ArrayList<String> newOutputColumnNames = new ArrayList<String>();
ArrayList<ColumnInfo> rs_oldsignature = op.getSchema().getSignature();
ArrayList<ColumnInfo> rs_newsignature = new ArrayList<ColumnInfo>();
RowResolver old_rr = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
RowResolver new_rr = new RowResolver();
for (String col : cols) {
int index = originalOutputColumnNames.indexOf(col);
newOutputColumnNames.add(col);
newColList.add(originalColList.get(index));
rs_newsignature.add(rs_oldsignature.get(index));
String[] tabcol = old_rr.reverseLookup(col);
ColumnInfo columnInfo = old_rr.get(tabcol[0], tabcol[1]);
new_rr.put(tabcol[0], tabcol[1], columnInfo);
}
cppCtx.getOpToParseCtxMap().get(op).setRowResolver(new_rr);
op.getSchema().setSignature(rs_newsignature);
conf.setColList(newColList);
conf.setOutputColumnNames(newOutputColumnNames);
handleChildren(op, cols, cppCtx);
}
return null;
}
}
ColumnPrunerProcCtx:
private final Map<Operator<? extends Serializable>, List<String>> prunedColLists; // {}
ColumnPrunerProcCtx:
public List<String> getColsFromSelectExpr(SelectOperator op) {
List<String> cols = new ArrayList<String>();
SelectDesc conf = op.getConf(); // org.apache.hadoop.hive.ql.plan.SelectDesc@1995c9a
ArrayList<ExprNodeDesc> exprList = conf.getColList(); //[Column[col1], Column[col2]]
for (ExprNodeDesc expr : exprList) {
cols = Utilities.mergeUniqElems(cols, expr.getCols());
}
return cols; // [col1, col2]
}
After ColumnPrunerSelectProc's process has run:
ColumnPrunerProcCtx:
private final Map<Operator<? extends Serializable>, List<String>> prunedColLists; // {org.apache.hadoop.hive.ql.exec.SelectOperator@347448=[col1, col2]}
public static class ColumnPrunerFilterProc implements NodeProcessor {
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs) throws SemanticException {
FilterOperator op = (FilterOperator) nd; //org.apache.hadoop.hive.ql.exec.FilterOperator@1bcfbeb
ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx; //org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcCtx@bec43f
ExprNodeDesc condn = op.getConf().getPredicate(); //class org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge(Column[col3], Const int 5()
// Get this FilterOperator's predicate, i.e. the filter condition used to drop unwanted rows.
// get list of columns used in the filter
List<String> cl = condn.getCols(); //[col3] the columns this predicate uses
// merge it with the downstream col list
cppCtx.getPrunedColLists().put(op,
Utilities.mergeUniqElems(cppCtx.genColLists(op), cl));
pruneOperator(cppCtx, op, cppCtx.getPrunedColLists().get(op));
return null; //
}
}
// Collect all columns required by all child operators of curOp.
public List<String> genColLists(Operator<? extends Serializable> curOp)
throws SemanticException {
// curOp = org.apache.hadoop.hive.ql.exec.FilterOperator@1bcfbeb
List<String> colList = new ArrayList<String>(); // accumulates the columns required by the child operators
if (curOp.getChildOperators() != null) {
for (Operator<? extends Serializable> child : curOp.getChildOperators()) { // iterate over all child operators
// child = org.apache.hadoop.hive.ql.exec.SelectOperator@347448
if (child instanceof CommonJoinOperator) {
int tag = child.getParentOperators().indexOf(curOp);
List<String> prunList = joinPrunedColLists.get(child).get((byte) tag);
colList = Utilities.mergeUniqElems(colList, prunList);
} else {
colList = Utilities
.mergeUniqElems(colList, prunedColLists.get(child)); // [col1, col2] add the columns this child requires to colList
}
}
}
return colList; // [col1, col2]
}
public static List<String> mergeUniqElems(List<String> src, List<String> dest) {
// src=[col1, col2] dest = [col3]
if (dest == null) {
return src;
}
if (src == null) {
return dest;
}
int pos = 0;
while (pos < dest.size()) {
if (!src.contains(dest.get(pos))) {
src.add(dest.get(pos));
}
pos++;
}
return src; // [col1, col2, col3]
}
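Note that mergeUniqElems appends to src in place and returns that same list, keeping the existing order and skipping elements that are already present. The standalone sketch below copies that logic into a hypothetical class `MergeUniqSketch` (no Hive dependency) so the behavior can be tried directly; the input values are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MergeUniqSketch {
    /** Same logic as the method above: append the missing elements of dest to src, in place. */
    static List<String> mergeUniqElems(List<String> src, List<String> dest) {
        if (dest == null) {
            return src;
        }
        if (src == null) {
            return dest;
        }
        for (String col : dest) {
            if (!src.contains(col)) {
                src.add(col);
            }
        }
        return src;
    }

    public static void main(String[] args) {
        List<String> childCols = new ArrayList<>(Arrays.asList("col1", "col2"));
        List<String> merged = mergeUniqElems(childCols, Arrays.asList("col3", "col1"));
        System.out.println(merged);              // [col1, col2, col3]
        System.out.println(merged == childCols); // true: the first argument was modified
    }
}
```

Because the first argument is mutated, src must be a modifiable list; passing the fixed-size result of Arrays.asList as src would throw UnsupportedOperationException on the first add.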
After ColumnPrunerFilterProc's process has run:
ColumnPrunerProcCtx:
private final Map<Operator<? extends Serializable>, List<String>> prunedColLists; // {org.apache.hadoop.hive.ql.exec.SelectOperator@347448=[col1, col2], org.apache.hadoop.hive.ql.exec.FilterOperator@1bcfbeb=[col1, col2, col3]}
ColumnPrunerProcFactory:
private static void pruneOperator(NodeProcessorCtx ctx,
Operator<? extends Serializable> op,
List<String> cols)
throws SemanticException { // preserves the column order
// op = org.apache.hadoop.hive.ql.exec.FilterOperator@1bcfbeb
// cols = [col1, col2, col3]
// the pruning needs to preserve the order of columns in the input schema
RowSchema inputSchema = op.getSchema(); // col1: stringcol2: stringcol3: intcol4: stringcol5: stringcol6: stringcol7: string)
if (inputSchema != null) {
ArrayList<ColumnInfo> rs = new ArrayList<ColumnInfo>();
ArrayList<ColumnInfo> inputCols = inputSchema.getSignature(); //[col1: string, col2: string, col3: int, col4: string, col5: string, col6: string, col7: string]
for (ColumnInfo i: inputCols) {
if (cols.contains(i.getInternalName())) {
rs.add(i); // rs = [col1: string, col2: string, col3: int]
}
}
op.getSchema().setSignature(rs);
}
}
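pruneOperator iterates over the input schema rather than over cols, so the surviving columns keep the schema's order regardless of the order in cols. A minimal sketch of that idea, assuming columns are plain strings instead of ColumnInfo objects; `PruneSchemaSketch` and `prune` are hypothetical names, and the HashSet is a substitution made here for the membership test, not what the code above does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PruneSchemaSketch {
    /** Keeps the schema columns that appear in 'needed', in schema order. */
    static List<String> prune(List<String> schema, List<String> needed) {
        Set<String> wanted = new HashSet<>(needed); // set lookup instead of List.contains
        List<String> kept = new ArrayList<>();
        for (String col : schema) {
            if (wanted.contains(col)) {
                kept.add(col);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList(
                "col1", "col2", "col3", "col4", "col5", "col6", "col7");
        // The needed list is deliberately out of order.
        System.out.println(prune(schema, Arrays.asList("col3", "col1"))); // [col1, col3]
    }
}
```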
Operator:
public RowSchema getSchema() {
return rowSchema;
}
RowSchema:
public void setSignature(ArrayList<ColumnInfo> signature) {
// this.signature = [col1: string, col2: string, col3: int, col4: string, col5: string, col6: string, col7: string]
// signature = [col1: string, col2: string, col3: int]
this.signature = signature;
}
public static class ColumnPrunerTableScanProc implements NodeProcessor {
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs) throws SemanticException {
TableScanOperator scanOp = (TableScanOperator) nd; // org.apache.hadoop.hive.ql.exec.TableScanOperator@5bda13
ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx; // org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcCtx@bec43f
List<String> cols = cppCtx
.genColLists((Operator<? extends Serializable>) nd); // [col1, col2, col3]
cppCtx.getPrunedColLists().put((Operator<? extends Serializable>) nd,
cols);
ArrayList<Integer> needed_columns = new ArrayList<Integer>();
RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver(); // tab1{(col1,col1: string)(col2,col2: string)(col3,col3: int)(col4,col4: string)(col5,col5: string)(col6,col6: string)(col7,col7: string)}
for (int i = 0; i < cols.size(); i++) {
int position = inputRR.getPosition(cols.get(i));
if (position >=0) {
needed_columns.add(position); // [0, 1, 2]
}
}
scanOp.setNeededColumnIDs(needed_columns); // scanOp=org.apache.hadoop.hive.ql.exec.TableScanOperator@5bda13
return null; //
}
}
ColumnPrunerProcCtx:
public HashMap<Operator<? extends Serializable>, OpParseContext> getOpToParseCtxMap() {
return opToParseCtxMap; // {org.apache.hadoop.hive.ql.exec.TableScanOperator@5bda13=org.apache.hadoop.hive.ql.parse.OpParseContext@19e3bdd, org.apache.hadoop.hive.ql.exec.FilterOperator@1bcfbeb=org.apache.hadoop.hive.ql.parse.OpParseContext@16c5f50, org.apache.hadoop.hive.ql.exec.SelectOperator@347448=org.apache.hadoop.hive.ql.parse.OpParseContext@1e5a0cb, org.apache.hadoop.hive.ql.exec.FileSinkOperator@de6570=org.apache.hadoop.hive.ql.parse.OpParseContext@9f9761}
}
TableScanOperator:
public void setNeededColumnIDs(java.util.ArrayList<Integer> orign_columns) {
neededColumnIDs = orign_columns; // [0, 1, 2]
}
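The last step of the table scan processor turns column names into positions and drops names that cannot be resolved. The sketch below illustrates this with a plain list of names, assuming List.indexOf as a stand-in for RowResolver.getPosition; `NeededColumnIdsSketch` and `neededColumnIds` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NeededColumnIdsSketch {
    /** Maps each required column name to its position in the schema, skipping unknown names. */
    static List<Integer> neededColumnIds(List<String> schema, List<String> cols) {
        List<Integer> ids = new ArrayList<>();
        for (String col : cols) {
            int position = schema.indexOf(col); // -1 when the column is not in the schema
            if (position >= 0) {
                ids.add(position);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList(
                "col1", "col2", "col3", "col4", "col5", "col6", "col7");
        System.out.println(neededColumnIds(schema, Arrays.asList("col1", "col2", "col3"))); // [0, 1, 2]
    }
}
```

For the example query this yields [0, 1, 2], matching the neededColumnIDs value recorded in the trace above.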