`
乡里伢崽
  • 浏览: 112041 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

利用SemanticAnalyzerHook回过滤不加分区条件的Hive查询

    博客分类:
  • hive
 
阅读更多
我们Hadoop集群中将近百分之80的作业是通过Hive来提交的,由于Hive写起来简单便捷,而且我们又提供了Hive Web Client,所以使用范围很广,包括ba,pm,po,sales都在使用hive进行ad-hoc查询,但是hive在降低用户使用门槛的同时,也使得用户经常写不合理开销很大的语句,生成了很多的mapreduce job,占用了大量slot数,其中最典型的例子就是分区表查询,不指定分区条件,导致hive没有做partition pruner优化,进而读入了所有的表数据,占用大量IO和计算资源。
       
    为了尽可能规避这种情况,我们可以利用了hive的hook机制,在hook中实现一些方法来对语句做预判,第一期先不会直接block住语句,而是记录有问题的语句来公告警示.
       
    具体做法是实现HiveSemanticAnalyzerHook接口,preAnalyze方法和postAnalyze方法会分别在compile函数之前和之后执行,我们只要实现preAnalyze方法,遍历传进来的ASTNode抽象语法树,获取左子树的From表名和右子树的where判断条件key值,如果该From表是分区表的话,会通过metastore client获取它的所有分区key名字,用户指定的where条件中只要出现任何一个分区key,则此语句通过检测,否则会在标准错误中输出一条warning,并且在后台log中记录用户名和执行语句,每隔一段时间会将这些bad case在hive-user组邮箱进行公示,希望能通过这种方式来起到相互警示和学习的效果.

compile函数中根据hiveconf中指定的hive.semantic.analyzer.hook来反射实例化hook类,此处为实现AbstractSemanticAnalyzerHook的DPSemanticAnalyzerHook
package org.apache.hadoop.hive.ql.parse;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.shims.ShimLoader;

public class DPSemanticAnalyzerHook extends AbstractSemanticAnalyzerHook {
  private final static String NO_PARTITION_WARNING = "WARNING: HQL is not efficient, Please specify partition condition! HQL:%s ;USERNAME:%s";

  private final SessionState ss = SessionState.get();
  private final LogHelper console = SessionState.getConsole();
  private Hive hive = null;
  private String username;
  private String currentDatabase = "default";
  private String hql;
  private String whereHql;
  private String tableAlias;
  private String tableName;
  private String tableDatabaseName;
  private Boolean needCheckPartition = false;

  @Override
  public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
      throws SemanticException {
    try {
      hql = ss.getCmd().toLowerCase();
      hql = StringUtils.replaceChars(hql, '\n', ' ');
      if (hql.contains("where")) {
        whereHql = hql.substring(hql.indexOf("where"));
      }
      username = ShimLoader.getHadoopShims().getUserName(context.getConf());

      if (ast.getToken().getType() == HiveParser.TOK_QUERY) {
        try {
          hive = context.getHive();
          currentDatabase = hive.getCurrentDatabase();
        } catch (HiveException e) {
          throw new SemanticException(e);
        }

        extractFromClause((ASTNode) ast.getChild(0));

        if (needCheckPartition && !StringUtils.isBlank(tableName)) {
          String dbname = StringUtils.isEmpty(tableDatabaseName) ? currentDatabase
              : tableDatabaseName;
          String tbname = tableName;
          String[] parts = tableName.split(".");
          if (parts.length == 2) {
            dbname = parts[0];
            tbname = parts[1];
          }
          Table t = hive.getTable(dbname, tbname);
          if (t.isPartitioned()) {
            if (StringUtils.isBlank(whereHql)) {
              console.printError(String.format(NO_PARTITION_WARNING, hql, username));
            } else {
              List<FieldSchema> partitionKeys = t.getPartitionKeys();
              List<String> partitionNames = new ArrayList<String>();
              for (int i = 0; i < partitionKeys.size(); i++) {
                partitionNames.add(partitionKeys.get(i).getName().toLowerCase());
              }

              if (!containsPartCond(partitionNames, whereHql, tableAlias)) {
                console.printError(String.format(NO_PARTITION_WARNING, hql, username));
              }
            }
          }
        }

      }
    } catch (Exception ex) {
      ex.printStackTrace();
    }
    return ast;
  }

  private boolean containsPartCond(List<String> partitionKeys, String sql, String alias) {
    for (String pk : partitionKeys) {
      if (sql.contains(pk)) {
        return true;
      }
      if (!StringUtils.isEmpty(alias) && sql.contains(alias + "." + pk)) {
        return true;
      }
    }
    return false;
  }

  private void extractFromClause(ASTNode ast) {
    if (HiveParser.TOK_FROM == ast.getToken().getType()) {
      ASTNode refNode = (ASTNode) ast.getChild(0);
      if (refNode.getToken().getType() == HiveParser.TOK_TABREF && ast.getChildCount() == 1) {
        ASTNode tabNameNode = (ASTNode) (refNode.getChild(0));
        int refNodeChildCount = refNode.getChildCount();
        if (tabNameNode.getToken().getType() == HiveParser.TOK_TABNAME) {
          if (tabNameNode.getChildCount() == 2) {
            tableDatabaseName = tabNameNode.getChild(0).getText().toLowerCase();
            tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(1))
                .toLowerCase();
          } else if (tabNameNode.getChildCount() == 1) {
            tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(0))
                .toLowerCase();
          } else {
            return;
          }

          if (refNodeChildCount == 2) {
            tableAlias = BaseSemanticAnalyzer.unescapeIdentifier(refNode.getChild(1).getText())
                .toLowerCase();
          }
          needCheckPartition = true;
        }
      }
    }
  }

  @Override
  public void postAnalyze(HiveSemanticAnalyzerHookContext context,
      List<Task<? extends Serializable>> rootTasks) throws SemanticException {
    // LogHelper console = SessionState.getConsole();
    // Set<ReadEntity> readEntitys = context.getInputs();
    // console.printInfo("Total Read Entity Size:" + readEntitys.size());
    // for (ReadEntity readEntity : readEntitys) {
    // Partition p = readEntity.getPartition();
    // Table t = readEntity.getTable();
    // }
  }
}
分享到:
评论

相关推荐

    Hive表分区

    Hive表分区,里面有比较详细的Hive表分区方法,希望能够有所帮助。

    龙战于野大数据MR原理启动hive查询表分区.docx

    Hive查询表分区的MR原理启动详解 Hive是一款基于Hadoop的数据仓库工具,主要用于处理结构化和半结构化的数据。MR(MapReduce)是Hadoop中的一种编程模型,用于处理大规模数据。在Hive中,MR原理启动是指使用...

    修改hive表分区名称

    ### 修改Hive表分区名称的方法 在大数据处理领域中,Apache Hive是一款广泛使用的数据仓库工具,它能够将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能,使得用户能够通过简单的SQL语句来处理存储在...

    hive数据分区时分区字段不可为中文。.doc

    - **提高查询性能**:利用分区裁剪技术,可以在查询执行阶段排除不相关的分区,显著提升查询速度。 - **简化数据管理**:对于大量数据的管理变得更加简单,如删除旧数据或导入新数据等操作仅涉及特定分区。 #### ...

    01.hive查询语法--基本查询--条件查询--关联查询.mp4

    01.hive查询语法--基本查询--条件查询--关联查询.mp4

    hive分区导入

    【标题】:“Hive分区导入”是大数据处理中常见的操作,它涉及到Hadoop生态中的Hive组件,用于高效管理和查询大规模数据。Hive是基于Hadoop的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供SQL类...

    Hive查询优化整理与Hive简易版思维导图

    1. **表分区**:分区是Hive提高查询效率的重要手段。通过将大表按特定字段(如日期、地区等)划分为小块,可以减少不必要的数据扫描,提高查询速度。例如,对于日志数据,可以按照日期进行分区。 2. **选择合适的...

    hive查询优化

    ### Hive查询优化详解 #### 一、Hive基础与架构 **Hive**作为Hadoop生态中的重要组成部分,被广泛应用于大数据分析领域。它通过提供类SQL语言(HiveQL)来简化对Hadoop分布式文件系统(HDFS)中存储的大规模数据集...

    java_hive简单查询

    在Java开发中,有时我们需要与大数据处理平台...通过以上步骤,你将能够利用Java与Hive进行有效的交互,执行简单的查询以及更复杂的分析任务。确保理解Hive的架构和最佳实践,这将有助于提高查询效率并确保数据安全性。

    hive时间按月份加减UDF

    在Hive中,内置的时间函数可以满足大部分日常需求,如获取年份、月份、日期等,但并不直接支持按月份加减。为了实现这样的功能,我们可以自定义UDF(User Defined Function),扩展Hive的功能。标题中的“hive时间按...

    Hive的分区表

    其实是因为Hive存放的数据是没有索引的,如果没有建立分区直接查询,Hive就会暴力查询,效率很低,所以通过分区能很好提高Hive的查询效率。分区还能够更加方便的管理一些特殊数据,例如一些日志数据,可以是一个天一...

    部分普通sql查询在hive中的实现方式

    - **背景**:Hive不支持直接使用`HAVING`关键字,但可以通过嵌套子查询并在外层查询中使用`WHERE`条件来实现类似的功能。 - **示例**:如果想实现如下标准SQL的`HAVING`查询: ```sql SELECT gender, COUNT(*) as...

    BLOG_如何将一个普通表转换为分区表.pdf

    DBMS_REDEFINITION方法是Oracle提供的一个工具包,用于在不锁定原表的情况下,将非分区表在线转换为分区表。通过DBMS_REDEFINITION包中的步骤和函数,可以实现分区表的在线重定义,这种方式对系统影响最小,但需要...

    Hive中查询操作

    在Hive中,查询操作是数据处理的核心,它允许用户从大数据存储中提取所需的信息。以下将详细解析Hive查询语法及其基本查询方法。 一、Hive查询语法 Hive的查询语句遵循标准SQL的基本结构,但也有一些Hive特有的扩展...

    hive 双分区外部表 复合数据结构 样例

    hive双分区外部表复合数据结构博客的数据资料,欢迎下载。

    hive分区表分通表建表语句详解和例子

    **动态分区**是指在运行时确定分区的值,这样可以在不知道具体分区信息的情况下灵活地添加新分区。 - 设置为非严格模式:`set hive.exec.dynamic.partition.mode=nonstrict;` - 启动动态分区:`set hive.exec....

    base64加密解密的hive udf函数

    在大数据处理领域,Hive作为一个基于Hadoop的数据仓库工具,被广泛用于数据查询和分析。在某些场景下,我们可能需要对数据进行加密或者解密操作,以保护敏感信息或实现特定的数据处理需求。Base64是一种常见的编码...

    hive数据加载导出查询

    ### Hive 数据加载与导出详解 #### 创建 Hive 表 在使用 Hive 进行数据分析之前,首先需要创建一张表来存储数据。以下是一段创建分区表的 SQL 语句示例: ```sql CREATE TABLE db_0309.emp ( empno INT, ename ...

    spark或mr引擎插入的数据,hive表查询数据为0

    ### Spark或MR引擎插入的数据,Hive表查询数据为0的问题解析 #### 问题背景与现象 在大数据处理场景中,经常会遇到使用不同执行引擎(如Spark、MapReduce (MR) 或 Tez)进行数据处理的情况。其中一种常见的问题是...

    hive按月份加减udf范例

    总结来说,这个示例展示了如何利用Hive的UDF功能解决特定问题,即在Hive查询中进行日期的月份级别加减。通过创建和注册自定义的Java类,我们可以扩展Hive的内置功能,以适应更复杂的数据处理场景。对于大数据分析...

Global site tag (gtag.js) - Google Analytics