`
bupt04406
  • 浏览: 348265 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hive ColumnPruner

    博客分类:
  • Hive
 
阅读更多
Optimizer

  public void initialize(HiveConf hiveConf) {
    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) {
      transformations.add(new ColumnPruner());
    }
  }


create table tab1 (col1 string, col2 string, col3 int, col4 string, col5 string, col6 string, col7 string);
explain select col1, col2 from tab1 where col3>5;


hive> explain select col1, col2 from tab1 where col3>5;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF tab1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL col1)) (TOK_SELEXPR (TOK_TABLE_OR_COL col2))) (TOK_WHERE (> (TOK_TABLE_OR_COL col3) 5))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        tab1
          TableScan
            alias: tab1
            Filter Operator
              predicate:
                  expr: (col3 > 5)
                  type: boolean
              Filter Operator
                predicate:
                    expr: (col3 > 5)
                    type: boolean
                Select Operator
                  expressions:
                        expr: col1
                        type: string
                        expr: col2
                        type: string
                  outputColumnNames: _col0, _col1
                  File Output Operator
                    compressed: false
                    GlobalTableId: 0
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1


Time taken: 0.379 seconds





public class ColumnPruner implements Transform {

  public ParseContext transform(ParseContext pactx) throws SemanticException {
    pGraphContext = pactx;
    opToParseCtxMap = pGraphContext.getOpParseCtx();

    // generate pruned column list for all relevant operators
    ColumnPrunerProcCtx cppCtx = new ColumnPrunerProcCtx(opToParseCtxMap);

    // create a walker which walks the tree in a DFS manner while maintaining
    // the operator stack. The dispatcher
    // generates the plan from the operator tree
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp("R1", "FIL%"), ColumnPrunerProcFactory
        .getFilterProc());
    opRules.put(new RuleRegExp("R2", "GBY%"), ColumnPrunerProcFactory
        .getGroupByProc());
    opRules.put(new RuleRegExp("R3", "RS%"), ColumnPrunerProcFactory
        .getReduceSinkProc());
    opRules.put(new RuleRegExp("R4", "SEL%"), ColumnPrunerProcFactory
        .getSelectProc());
    opRules.put(new RuleRegExp("R5", "JOIN%"), ColumnPrunerProcFactory
        .getJoinProc());
    opRules.put(new RuleRegExp("R6", "MAPJOIN%"), ColumnPrunerProcFactory
        .getMapJoinProc());
    opRules.put(new RuleRegExp("R7", "TS%"), ColumnPrunerProcFactory
        .getTableScanProc());
    opRules.put(new RuleRegExp("R8", "LVJ%"), ColumnPrunerProcFactory
        .getLateralViewJoinProc());
    // The dispatcher fires the processor corresponding to the closest matching
    // rule and passes the context along
    Dispatcher disp = new DefaultRuleDispatcher(ColumnPrunerProcFactory
        .getDefaultProc(), opRules, cppCtx);
    GraphWalker ogw = new ColumnPrunerWalker(disp);

    // Create a list of topop nodes
    ArrayList<Node> topNodes = new ArrayList<Node>();
    topNodes.addAll(pGraphContext.getTopOps().values());
    ogw.startWalking(topNodes, null);
    return pGraphContext;
  }

}




1)FilterOperator(ColumnPrunerFilterProc )所需字段为:过滤条件需要的字段+孩子节点需要的字段。
2)GroupByOperator(ColumnPrunerGroupByProc)所需字段为:出现在key中的字段+出现在聚合函数中的字段。
3)ReduceSinkOperator(ColumnPrunerReduceSinkProc)所需字段为:(1)孩子节点是JoinOperator(2)孩子节点不是JoinOperator,为出现在key中的字段+出现在value中的字段
4)SelectOperator(ColumnPrunerSelectProc)所需字段为:4.1)如果有孩子节点为FileSinkOperator或者ScriptOperator或者UDTFOperator或者LimitOperator或者UnionOperator,那么从SelectOperator中获取所需字段。  4.2)
5)JoinOperator(ColumnPrunerJoinProc)所需字段为:如果有孩子节点是FileSinkOperator,那么不处理。其他情况:
6)MapJoinOperator(ColumnPrunerMapJoinProc)
7)TableScanOperator(ColumnPrunerTableScanProc)所需字段为:孩子节点需要的字段。
8)LateralViewJoinOperator(ColumnPrunerLateralViewJoinProc)

  public static class ColumnPrunerSelectProc implements NodeProcessor {
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
        Object... nodeOutputs) throws SemanticException {
      SelectOperator op = (SelectOperator) nd; //org.apache.hadoop.hive.ql.exec.SelectOperator@347448
      ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx; // org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcCtx@bec43f
      List<String> cols = new ArrayList<String>();

      if (op.getChildOperators() != null) {
        for (Operator<? extends Serializable> child : op.getChildOperators()) {
          // If one of my children is a FileSink or Script, return all columns.
          // Without this break, a bug in ReduceSink to Extract edge column
          // pruning will manifest
          // which should be fixed before remove this
          if ((child instanceof FileSinkOperator)
              || (child instanceof ScriptOperator)
              || (child instanceof UDTFOperator)
              || (child instanceof LimitOperator)
              || (child instanceof UnionOperator)) {
           // child = org.apache.hadoop.hive.ql.exec.FileSinkOperator@de6570
            cppCtx.getPrunedColLists()
                .put(op, cppCtx.getColsFromSelectExpr(op));
            return null;  // return null;
          }
        }
      }
      cols = cppCtx.genColLists(op);

      SelectDesc conf = op.getConf();
      // The input to the select does not matter. Go over the expressions
      // and return the ones which have a marked column
      cppCtx.getPrunedColLists().put(op,
          cppCtx.getSelectColsFromChildren(op, cols));

      if (conf.isSelStarNoCompute()) {
        return null;
      }

      // do we need to prune the select operator?
      List<ExprNodeDesc> originalColList = op.getConf().getColList();
      List<String> columns = new ArrayList<String>();
      for (ExprNodeDesc expr : originalColList) {
        Utilities.mergeUniqElems(columns, expr.getCols());
      }
      // by now, 'prunedCols' are columns used by child operators, and 'columns'
      // are columns used by this select operator.
      ArrayList<String> originalOutputColumnNames = conf.getOutputColumnNames();
      if (cols.size() < originalOutputColumnNames.size()) {
        ArrayList<ExprNodeDesc> newColList = new ArrayList<ExprNodeDesc>();
        ArrayList<String> newOutputColumnNames = new ArrayList<String>();
        ArrayList<ColumnInfo> rs_oldsignature = op.getSchema().getSignature();
        ArrayList<ColumnInfo> rs_newsignature = new ArrayList<ColumnInfo>();
        RowResolver old_rr = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
        RowResolver new_rr = new RowResolver();
        for (String col : cols) {
          int index = originalOutputColumnNames.indexOf(col);
          newOutputColumnNames.add(col);
          newColList.add(originalColList.get(index));
          rs_newsignature.add(rs_oldsignature.get(index));
          String[] tabcol = old_rr.reverseLookup(col);
          ColumnInfo columnInfo = old_rr.get(tabcol[0], tabcol[1]);
          new_rr.put(tabcol[0], tabcol[1], columnInfo);
        }
        cppCtx.getOpToParseCtxMap().get(op).setRowResolver(new_rr);
        op.getSchema().setSignature(rs_newsignature);
        conf.setColList(newColList);
        conf.setOutputColumnNames(newOutputColumnNames);
        handleChildren(op, cols, cppCtx);
      }
      return null;
    }

}


ColumnPrunerProcCtx:
     private final Map<Operator<? extends Serializable>, List<String>> prunedColLists; // {}

ColumnPrunerProcCtx:
  public List<String> getColsFromSelectExpr(SelectOperator op) {
    List<String> cols = new ArrayList<String>();
    SelectDesc conf = op.getConf(); // org.apache.hadoop.hive.ql.plan.SelectDesc@1995c9a
    ArrayList<ExprNodeDesc> exprList = conf.getColList(); //[Column[col1], Column[col2]]
    for (ExprNodeDesc expr : exprList) {
      cols = Utilities.mergeUniqElems(cols, expr.getCols());
    }
    return cols; // [col1, col2]
  }

执行完ColumnPrunerSelectProc 的 process后
ColumnPrunerProcCtx:
     private final Map<Operator<? extends Serializable>, List<String>> prunedColLists; // {org.apache.hadoop.hive.ql.exec.SelectOperator@347448=[col1, col2]}


  public static class ColumnPrunerFilterProc implements NodeProcessor {
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
        Object... nodeOutputs) throws SemanticException {
      FilterOperator op = (FilterOperator) nd; //org.apache.hadoop.hive.ql.exec.FilterOperator@1bcfbeb
      ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx; //org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcCtx@bec43f
      ExprNodeDesc condn = op.getConf().getPredicate(); //class org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge(Column[col3], Const int 5()
//  获得这个FilterOperator的谓词,这是过滤条件,过滤掉不用的记录。
      // get list of columns used in the filter
      List<String> cl = condn.getCols(); //[col3]  获得这个谓词需要用到的columns
      // merge it with the downstream col list
      cppCtx.getPrunedColLists().put(op,
          Utilities.mergeUniqElems(cppCtx.genColLists(op), cl));
       
      pruneOperator(cppCtx, op, cppCtx.getPrunedColLists().get(op));

      return null; //
    }
  }

// 获得curOp的所有孩子节点需要用到的所有columns。
  public List<String> genColLists(Operator<? extends Serializable> curOp)
      throws SemanticException {
// curOp = org.apache.hadoop.hive.ql.exec.FilterOperator@1bcfbeb
    List<String> colList = new ArrayList<String>();   //记录孩子节点需要用到的所有columns
    if (curOp.getChildOperators() != null) {
      for (Operator<? extends Serializable> child : curOp.getChildOperators()) { //遍历所有的孩子节点
   // child = org.apache.hadoop.hive.ql.exec.SelectOperator@347448
        if (child instanceof CommonJoinOperator) {
          int tag = child.getParentOperators().indexOf(curOp);
          List<String> prunList = joinPrunedColLists.get(child).get((byte) tag);
          colList = Utilities.mergeUniqElems(colList, prunList);
        } else {
          colList = Utilities
              .mergeUniqElems(colList, prunedColLists.get(child)); // [col1, col2]    获得孩子节点需要的用到的columns,加入colList
        }
      }
    }
    return colList; // [col1, col2]
  }

  public static List<String> mergeUniqElems(List<String> src, List<String> dest) {
  //  src=[col1, col2]      dest = [col3]
    if (dest == null) {
      return src;
    }
    if (src == null) {
      return dest;
    }
    int pos = 0;

    while (pos < dest.size()) {
      if (!src.contains(dest.get(pos))) {
        src.add(dest.get(pos));
      }
      pos++;
    }

    return src; // [col1, col2, col3]
  }



执行完ColumnPrunerFilterProc 的 process后
ColumnPrunerProcCtx:
     private final Map<Operator<? extends Serializable>, List<String>> prunedColLists; // {org.apache.hadoop.hive.ql.exec.SelectOperator@347448=[col1, col2], org.apache.hadoop.hive.ql.exec.FilterOperator@1bcfbeb=[col1, col2, col3]}


ColumnPrunerProcFactory:
  private static void pruneOperator(NodeProcessorCtx ctx,
      Operator<? extends Serializable> op,
      List<String> cols)
      throws SemanticException {   //保持顺序
// op = org.apache.hadoop.hive.ql.exec.FilterOperator@1bcfbeb
// cols = [col1, col2, col3]
    // the pruning needs to preserve the order of columns in the input schema
    RowSchema inputSchema = op.getSchema(); // col1: stringcol2: stringcol3: intcol4: stringcol5: stringcol6: stringcol7: string)
    if (inputSchema != null) {
      ArrayList<ColumnInfo> rs = new ArrayList<ColumnInfo>();
      ArrayList<ColumnInfo> inputCols = inputSchema.getSignature(); //[col1: string, col2: string, col3: int, col4: string, col5: string, col6: string, col7: string]
    for (ColumnInfo i: inputCols) {
        if (cols.contains(i.getInternalName())) {
          rs.add(i);  // rs = [col1: string, col2: string, col3: int]
        }
    }
      op.getSchema().setSignature(rs);
    }
  }

Operator:
  public RowSchema getSchema() {
    return rowSchema;
  }
RowSchema:
  public void setSignature(ArrayList<ColumnInfo> signature) {
  // this.signature =  [col1: string, col2: string, col3: int, col4: string, col5: string, col6: string, col7: string]
  // signature =  [col1: string, col2: string, col3: int]
    this.signature = signature;
  }





  public static class ColumnPrunerTableScanProc implements NodeProcessor {
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
        Object... nodeOutputs) throws SemanticException {
      TableScanOperator scanOp = (TableScanOperator) nd; // org.apache.hadoop.hive.ql.exec.TableScanOperator@5bda13
      ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx; // org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcCtx@bec43f
      List<String> cols = cppCtx
          .genColLists((Operator<? extends Serializable>) nd);  // [col1, col2, col3]
      cppCtx.getPrunedColLists().put((Operator<? extends Serializable>) nd,
          cols);
      ArrayList<Integer> needed_columns = new ArrayList<Integer>();
      RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver(); // tab1{(col1,col1: string)(col2,col2: string)(col3,col3: int)(col4,col4: string)(col5,col5: string)(col6,col6: string)(col7,col7: string)}
      for (int i = 0; i < cols.size(); i++) {
        int position = inputRR.getPosition(cols.get(i));
        if (position >=0) {
          needed_columns.add(position); // [0, 1, 2]
        }
      }
      scanOp.setNeededColumnIDs(needed_columns);  // scanOp=org.apache.hadoop.hive.ql.exec.TableScanOperator@5bda13
      return null; //
    }
  }

ColumnPrunerProcCtx:
  public HashMap<Operator<? extends Serializable>, OpParseContext> getOpToParseCtxMap() {
    return opToParseCtxMap; // {org.apache.hadoop.hive.ql.exec.TableScanOperator@5bda13=org.apache.hadoop.hive.ql.parse.OpParseContext@19e3bdd, org.apache.hadoop.hive.ql.exec.FilterOperator@1bcfbeb=org.apache.hadoop.hive.ql.parse.OpParseContext@16c5f50, org.apache.hadoop.hive.ql.exec.SelectOperator@347448=org.apache.hadoop.hive.ql.parse.OpParseContext@1e5a0cb, org.apache.hadoop.hive.ql.exec.FileSinkOperator@de6570=org.apache.hadoop.hive.ql.parse.OpParseContext@9f9761}
  }

TableScanOperator:
  public void setNeededColumnIDs(java.util.ArrayList<Integer> orign_columns) {
    neededColumnIDs = orign_columns; // [0, 1, 2]
  }
分享到:
评论

相关推荐

    hive客户端安装_hive客户端安装_hive_

    在大数据处理领域,Hive是一个非常重要的工具,它提供了一个基于Hadoop的数据仓库基础设施,用于数据查询、分析和管理大规模数据集。本教程将详细讲解如何在Linux环境下安装Hive客户端,以便进行数据操作和分析。 ...

    HIVE安装及详解

    "HIVE安装及详解" HIVE是一种基于Hadoop的数据仓库工具,主要用于处理和分析大规模数据。下面是关于HIVE的安装及详解。 HIVE基本概念 HIVE是什么?HIVE是一种数据仓库工具,主要用于处理和分析大规模数据。它将...

    Hive_JDBC.zip_hive java_hive jdbc_hive jdbc pom_java hive_maven连

    在大数据处理领域,Apache Hive是一个基于Hadoop的数据仓库工具,它允许用户使用SQL(HQL,Hive Query Language)查询存储在Hadoop集群中的大型数据集。Hive JDBC(Java Database Connectivity)是Hive提供的一种...

    Ambari下Hive3.0升级到Hive4.0

    在大数据领域,Apache Ambari 是一个用于 Hadoop 集群管理和监控的开源工具,而 Hive 是一个基于 Hadoop 的数据仓库系统,用于处理和分析大规模数据集。本话题聚焦于如何在 Ambari 环境下将 Hive 3.0 升级到 Hive ...

    Hive驱动1.1.0.zip

    在大数据处理领域,Hive是一个基于Hadoop的数据仓库工具,它允许用户使用SQL(HQL,Hive Query Language)查询和管理存储在Hadoop分布式文件系统(HDFS)中的大量结构化数据。Hive 1.1.0是Hive的一个版本,提供了...

    连接hive依赖的jar包_hive连接方式

    在大数据处理领域,Hive是一个基于Hadoop的数据仓库工具,它可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,使得用户可以使用SQL语句来处理存储在Hadoop分布式文件系统(HDFS)上的大数据。...

    DBeaver链接hive驱动包下载: hive-jdbc-uber-2.6.5.0-292.jar

    《DBeaver与Hive连接:hive-jdbc-uber-2.6.5.0-292.jar驱动详解》 在大数据处理领域,Hive作为一个基于Hadoop的数据仓库工具,广泛用于数据查询和分析。而DBeaver,作为一款跨平台的数据库管理工具,以其用户友好的...

    Hive3.1.2编译源码

    使用hive3.1.2和spark3.0.0配置hive on spark的时候,发现官方下载的hive3.1.2和spark3.0.0不兼容,hive3.1.2对应的版本是spark2.3.0,而spark3.0.0对应的hadoop版本是hadoop2.6或hadoop2.7。 所以,如果想要使用高...

    apache-hive-2.3.9-bin.tar大数据HIVE.zip

    Apache Hive 是一个基于 Hadoop 的数据仓库工具,用于组织、查询和分析大量数据。它提供了一个SQL-like(HQL,Hive SQL)接口,使得非专业程序员也能方便地处理存储在Hadoop分布式文件系统(HDFS)中的大规模数据集...

    hive相关jar包

    在大数据处理领域,Hive是一个基于Hadoop的数据仓库工具,它允许用户使用SQL(HQL,Hive Query Language)查询和管理存储在Hadoop分布式文件系统(HDFS)中的大量数据。Hive提供了数据整合、元数据管理、查询和分析...

    《Hive数据仓库案例教程》教学课件 第5章 Hive数据操作.pdf

    《Hive数据仓库案例教程》教学课件 第5章 Hive数据操作.pdf《Hive数据仓库案例教程》教学课件 第5章 Hive数据操作.pdf《Hive数据仓库案例教程》教学课件 第5章 Hive数据操作.pdf《Hive数据仓库案例教程》教学课件 第...

    hive 驱动包 hive 链接 datagrip的驱动包

    在大数据处理领域,Hive是一款基于Hadoop的数据仓库工具,它允许用户使用SQL类的语言(称为HQL)来查询、管理、分析存储在Hadoop分布式文件系统中的大规模数据集。而DataGrip是一款由JetBrains公司开发的强大数据库...

    SpringBoot整合hive-jdbc示例

    **SpringBoot整合Hive-JDBC详解** 在大数据处理领域,Hadoop生态中的Hive作为一个数据仓库工具,常常用于处理大规模的数据分析任务。而SpringBoot作为Java开发中的微服务框架,以其简洁的配置和快速的开发能力深受...

    Hive表生成工具,Hive表生成工具Hive表生成工具

    Hive表生成工具,Hive表生成工具Hive表生成工具

    hive驱动包hive-jdbc-uber-2.6.5.0-292.jar(用户客户端连接使用)

    Hive是Apache Hadoop生态系统中的一个数据仓库工具,它允许我们对存储在HDFS上的大数据进行结构化查询和分析。Hive JDBC驱动是Hive与各种数据库管理工具、应用程序之间建立连接的关键组件,使得用户可以通过标准的...

    hive-site.xml

    hive+hadoop配置文件hive+hadoop配置文件hive+hadoop配置文件hive+hadoop配置文件hive+hadoop配置文件hive+hadoop配置文件hive+hadoop配置文件hive+hadoop配置文件hive+hadoop配置文件hive+hadoop配置文件hive+...

    如何在python中写hive脚本

    在Python中编写Hive脚本主要是为了方便地与Hadoop HIVE数据仓库进行交互,这样可以在数据分析和机器学习流程中无缝地集成大数据处理步骤。以下将详细介绍如何在Python环境中执行Hive查询和管理Hive脚本。 1. **直接...

    hive所有jar文件

    Hive和HBase是两种大数据处理工具,它们在大数据生态系统中各自扮演着重要角色。Hive是一个基于Hadoop的数据仓库工具,它允许用户使用SQL-like语法(HQL,Hive Query Language)对大规模数据集进行分析。而HBase是...

    数据库工具连接hive的驱动包

    在IT行业中,数据库管理和分析是至关重要的任务,而Hive作为一个大数据处理的仓库系统,它提供了对结构化数据的查询和分析能力。当需要通过图形化的数据库管理工具,如DBeaver,与Hive进行交互时,就需要用到特定的...

    Hive总结.docx

    【Hive原理】 Hive是基于Hadoop平台的数据仓库解决方案,它主要解决了在大数据场景下,业务人员和数据科学家能够通过熟悉的SQL语言进行数据分析的问题。Hive并不存储数据,而是依赖于HDFS进行数据存储,并利用...

Global site tag (gtag.js) - Google Analytics