- 浏览: 112041 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
土豆蛋儿:
我想读取一个外部文件,以什么方式好了? 文件内容经常编辑
flume 自定义source -
土豆蛋儿:
大神,您好。
flume 自定义source
我们Hadoop集群中将近百分之80的作业是通过Hive来提交的,由于Hive写起来简单便捷,而且我们又提供了Hive Web Client,所以使用范围很广,包括ba,pm,po,sales都在使用hive进行ad-hoc查询,但是hive在降低用户使用门槛的同时,也使得用户经常写不合理开销很大的语句,生成了很多的mapreduce job,占用了大量slot数,其中最典型的例子就是分区表查询,不指定分区条件,导致hive没有做partition pruner优化,进而读入了所有的表数据,占用大量IO和计算资源。
为了尽可能规避这种情况,我们可以利用了hive的hook机制,在hook中实现一些方法来对语句做预判,第一期先不会直接block住语句,而是记录有问题的语句来公告警示.
具体做法是实现HiveSemanticAnalyzerHook接口,preAnalyze方法和postAnalyze方法会分别在compile函数之前和之后执行,我们只要实现preAnalyze方法,遍历传进来的ASTNode抽象语法树,获取左子树的From表名和右子树的where判断条件key值,如果该From表是分区表的话,会通过metastore client获取它的所有分区key名字,用户指定的where条件中只要出现任何一个分区key,则此语句通过检测,否则会在标准错误中输出一条warning,并且在后台log中记录用户名和执行语句,每隔一段时间会将这些bad case在hive-user组邮箱进行公示,希望能通过这种方式来起到相互警示和学习的效果.
compile函数中根据hiveconf中指定的hive.semantic.analyzer.hook来反射实例化hook类,此处为实现AbstractSemanticAnalyzerHook的DPSemanticAnalyzerHook
package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.shims.ShimLoader;
public class DPSemanticAnalyzerHook extends AbstractSemanticAnalyzerHook {
private final static String NO_PARTITION_WARNING = "WARNING: HQL is not efficient, Please specify partition condition! HQL:%s ;USERNAME:%s";
private final SessionState ss = SessionState.get();
private final LogHelper console = SessionState.getConsole();
private Hive hive = null;
private String username;
private String currentDatabase = "default";
private String hql;
private String whereHql;
private String tableAlias;
private String tableName;
private String tableDatabaseName;
private Boolean needCheckPartition = false;
@Override
public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
throws SemanticException {
try {
hql = ss.getCmd().toLowerCase();
hql = StringUtils.replaceChars(hql, '\n', ' ');
if (hql.contains("where")) {
whereHql = hql.substring(hql.indexOf("where"));
}
username = ShimLoader.getHadoopShims().getUserName(context.getConf());
if (ast.getToken().getType() == HiveParser.TOK_QUERY) {
try {
hive = context.getHive();
currentDatabase = hive.getCurrentDatabase();
} catch (HiveException e) {
throw new SemanticException(e);
}
extractFromClause((ASTNode) ast.getChild(0));
if (needCheckPartition && !StringUtils.isBlank(tableName)) {
String dbname = StringUtils.isEmpty(tableDatabaseName) ? currentDatabase
: tableDatabaseName;
String tbname = tableName;
String[] parts = tableName.split(".");
if (parts.length == 2) {
dbname = parts[0];
tbname = parts[1];
}
Table t = hive.getTable(dbname, tbname);
if (t.isPartitioned()) {
if (StringUtils.isBlank(whereHql)) {
console.printError(String.format(NO_PARTITION_WARNING, hql, username));
} else {
List<FieldSchema> partitionKeys = t.getPartitionKeys();
List<String> partitionNames = new ArrayList<String>();
for (int i = 0; i < partitionKeys.size(); i++) {
partitionNames.add(partitionKeys.get(i).getName().toLowerCase());
}
if (!containsPartCond(partitionNames, whereHql, tableAlias)) {
console.printError(String.format(NO_PARTITION_WARNING, hql, username));
}
}
}
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
return ast;
}
private boolean containsPartCond(List<String> partitionKeys, String sql, String alias) {
for (String pk : partitionKeys) {
if (sql.contains(pk)) {
return true;
}
if (!StringUtils.isEmpty(alias) && sql.contains(alias + "." + pk)) {
return true;
}
}
return false;
}
private void extractFromClause(ASTNode ast) {
if (HiveParser.TOK_FROM == ast.getToken().getType()) {
ASTNode refNode = (ASTNode) ast.getChild(0);
if (refNode.getToken().getType() == HiveParser.TOK_TABREF && ast.getChildCount() == 1) {
ASTNode tabNameNode = (ASTNode) (refNode.getChild(0));
int refNodeChildCount = refNode.getChildCount();
if (tabNameNode.getToken().getType() == HiveParser.TOK_TABNAME) {
if (tabNameNode.getChildCount() == 2) {
tableDatabaseName = tabNameNode.getChild(0).getText().toLowerCase();
tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(1))
.toLowerCase();
} else if (tabNameNode.getChildCount() == 1) {
tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(0))
.toLowerCase();
} else {
return;
}
if (refNodeChildCount == 2) {
tableAlias = BaseSemanticAnalyzer.unescapeIdentifier(refNode.getChild(1).getText())
.toLowerCase();
}
needCheckPartition = true;
}
}
}
}
@Override
public void postAnalyze(HiveSemanticAnalyzerHookContext context,
List<Task<? extends Serializable>> rootTasks) throws SemanticException {
// LogHelper console = SessionState.getConsole();
// Set<ReadEntity> readEntitys = context.getInputs();
// console.printInfo("Total Read Entity Size:" + readEntitys.size());
// for (ReadEntity readEntity : readEntitys) {
// Partition p = readEntity.getPartition();
// Table t = readEntity.getTable();
// }
}
}
为了尽可能规避这种情况,我们可以利用了hive的hook机制,在hook中实现一些方法来对语句做预判,第一期先不会直接block住语句,而是记录有问题的语句来公告警示.
具体做法是实现HiveSemanticAnalyzerHook接口,preAnalyze方法和postAnalyze方法会分别在compile函数之前和之后执行,我们只要实现preAnalyze方法,遍历传进来的ASTNode抽象语法树,获取左子树的From表名和右子树的where判断条件key值,如果该From表是分区表的话,会通过metastore client获取它的所有分区key名字,用户指定的where条件中只要出现任何一个分区key,则此语句通过检测,否则会在标准错误中输出一条warning,并且在后台log中记录用户名和执行语句,每隔一段时间会将这些bad case在hive-user组邮箱进行公示,希望能通过这种方式来起到相互警示和学习的效果.
compile函数中根据hiveconf中指定的hive.semantic.analyzer.hook来反射实例化hook类,此处为实现AbstractSemanticAnalyzerHook的DPSemanticAnalyzerHook
package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.shims.ShimLoader;
public class DPSemanticAnalyzerHook extends AbstractSemanticAnalyzerHook {
private final static String NO_PARTITION_WARNING = "WARNING: HQL is not efficient, Please specify partition condition! HQL:%s ;USERNAME:%s";
private final SessionState ss = SessionState.get();
private final LogHelper console = SessionState.getConsole();
private Hive hive = null;
private String username;
private String currentDatabase = "default";
private String hql;
private String whereHql;
private String tableAlias;
private String tableName;
private String tableDatabaseName;
private Boolean needCheckPartition = false;
@Override
public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
throws SemanticException {
try {
hql = ss.getCmd().toLowerCase();
hql = StringUtils.replaceChars(hql, '\n', ' ');
if (hql.contains("where")) {
whereHql = hql.substring(hql.indexOf("where"));
}
username = ShimLoader.getHadoopShims().getUserName(context.getConf());
if (ast.getToken().getType() == HiveParser.TOK_QUERY) {
try {
hive = context.getHive();
currentDatabase = hive.getCurrentDatabase();
} catch (HiveException e) {
throw new SemanticException(e);
}
extractFromClause((ASTNode) ast.getChild(0));
if (needCheckPartition && !StringUtils.isBlank(tableName)) {
String dbname = StringUtils.isEmpty(tableDatabaseName) ? currentDatabase
: tableDatabaseName;
String tbname = tableName;
String[] parts = tableName.split(".");
if (parts.length == 2) {
dbname = parts[0];
tbname = parts[1];
}
Table t = hive.getTable(dbname, tbname);
if (t.isPartitioned()) {
if (StringUtils.isBlank(whereHql)) {
console.printError(String.format(NO_PARTITION_WARNING, hql, username));
} else {
List<FieldSchema> partitionKeys = t.getPartitionKeys();
List<String> partitionNames = new ArrayList<String>();
for (int i = 0; i < partitionKeys.size(); i++) {
partitionNames.add(partitionKeys.get(i).getName().toLowerCase());
}
if (!containsPartCond(partitionNames, whereHql, tableAlias)) {
console.printError(String.format(NO_PARTITION_WARNING, hql, username));
}
}
}
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
return ast;
}
private boolean containsPartCond(List<String> partitionKeys, String sql, String alias) {
for (String pk : partitionKeys) {
if (sql.contains(pk)) {
return true;
}
if (!StringUtils.isEmpty(alias) && sql.contains(alias + "." + pk)) {
return true;
}
}
return false;
}
private void extractFromClause(ASTNode ast) {
if (HiveParser.TOK_FROM == ast.getToken().getType()) {
ASTNode refNode = (ASTNode) ast.getChild(0);
if (refNode.getToken().getType() == HiveParser.TOK_TABREF && ast.getChildCount() == 1) {
ASTNode tabNameNode = (ASTNode) (refNode.getChild(0));
int refNodeChildCount = refNode.getChildCount();
if (tabNameNode.getToken().getType() == HiveParser.TOK_TABNAME) {
if (tabNameNode.getChildCount() == 2) {
tableDatabaseName = tabNameNode.getChild(0).getText().toLowerCase();
tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(1))
.toLowerCase();
} else if (tabNameNode.getChildCount() == 1) {
tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(0))
.toLowerCase();
} else {
return;
}
if (refNodeChildCount == 2) {
tableAlias = BaseSemanticAnalyzer.unescapeIdentifier(refNode.getChild(1).getText())
.toLowerCase();
}
needCheckPartition = true;
}
}
}
}
@Override
public void postAnalyze(HiveSemanticAnalyzerHookContext context,
List<Task<? extends Serializable>> rootTasks) throws SemanticException {
// LogHelper console = SessionState.getConsole();
// Set<ReadEntity> readEntitys = context.getInputs();
// console.printInfo("Total Read Entity Size:" + readEntitys.size());
// for (ReadEntity readEntity : readEntitys) {
// Partition p = readEntity.getPartition();
// Table t = readEntity.getTable();
// }
}
}
发表评论
-
hive + hbase
2015-01-04 10:42 773环境配置: hadoop-2.0.0-cdh4.3.0 (4 ... -
hive 数据倾斜
2014-08-27 09:03 687链接:http://www.alidata.org/archi ... -
hive 分通总结
2014-08-27 08:42 576总结分析: 1. 定义了桶,但要生成桶的数据,只能是由其他表 ... -
深入了解Hive Index具体实现
2014-08-25 08:51 739索引是标准的数据库技术,hive 0.7版本之后支持索引。hi ... -
explain hive index
2014-08-24 16:44 1148设置索引: 使用聚合索引优化groupby操作 hive> ... -
Hive 中内部表与外部表的区别与创建方法
2014-08-15 17:11 763分类: Hive 2013-12-07 11:56 ... -
hive map和reduce的控制
2014-08-15 16:14 625一、 控制hive任务中的map数: 1. 通 ... -
hive 压缩策略
2014-08-15 15:16 1769Hive使用的是Hadoop的文件 ... -
hive 在mysql中创建备用数据库
2014-08-15 09:21 881修改hive-site.xml <property> ... -
HIVE 窗口及分析函数
2014-08-11 16:21 1189HIVE 窗口及分析函数 使 ... -
hive 内置函数
2014-08-11 09:06 30701.sort_array(): sort_array(arra ... -
hive lateral view
2014-08-09 14:59 2026通过Lateral view可以方便的将UDTF得到的行转列的 ... -
hive数据的导出
2014-07-28 21:53 445在本博客的《Hive几种数据导入方式》文章中,谈到了Hive中 ... -
hive udaf
2014-07-25 16:11 755package com.lwz.udaf; import o ... -
hive自定义InputFormat
2014-07-25 09:13 862自定义分隔符 package com.lwz.inputf; ... -
HiveServer2连接ZooKeeper出现Too many connections问题的解决
2014-07-24 08:49 1768HiveServer2连接ZooKeeper出现Too man ... -
hive 常用命令
2014-07-17 22:22 6961.hive通过外部设置参数传入脚本中: hiv ... -
CouderaHadoop中hive的Hook扩展
2014-07-16 21:18 3338最近在做关于CDH4.3.0的hive封装,其中遇到了很多问题 ... -
hive 的常用命令
2014-07-16 10:07 0设置、查看hive当前的角色: set sys ... -
hive 授权
2014-07-15 10:51 934Hive授权(Security配置) 博客分类: Hive分 ...
相关推荐
Hive表分区,里面有比较详细的Hive表分区方法,希望能够有所帮助。
Hive查询表分区的MR原理启动详解 Hive是一款基于Hadoop的数据仓库工具,主要用于处理结构化和半结构化的数据。MR(MapReduce)是Hadoop中的一种编程模型,用于处理大规模数据。在Hive中,MR原理启动是指使用...
### 修改Hive表分区名称的方法 在大数据处理领域中,Apache Hive是一款广泛使用的数据仓库工具,它能够将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能,使得用户能够通过简单的SQL语句来处理存储在...
- **提高查询性能**:利用分区裁剪技术,可以在查询执行阶段排除不相关的分区,显著提升查询速度。 - **简化数据管理**:对于大量数据的管理变得更加简单,如删除旧数据或导入新数据等操作仅涉及特定分区。 #### ...
01.hive查询语法--基本查询--条件查询--关联查询.mp4
【标题】:“Hive分区导入”是大数据处理中常见的操作,它涉及到Hadoop生态中的Hive组件,用于高效管理和查询大规模数据。Hive是基于Hadoop的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供SQL类...
1. **表分区**:分区是Hive提高查询效率的重要手段。通过将大表按特定字段(如日期、地区等)划分为小块,可以减少不必要的数据扫描,提高查询速度。例如,对于日志数据,可以按照日期进行分区。 2. **选择合适的...
### Hive查询优化详解 #### 一、Hive基础与架构 **Hive**作为Hadoop生态中的重要组成部分,被广泛应用于大数据分析领域。它通过提供类SQL语言(HiveQL)来简化对Hadoop分布式文件系统(HDFS)中存储的大规模数据集...
在Java开发中,有时我们需要与大数据处理平台...通过以上步骤,你将能够利用Java与Hive进行有效的交互,执行简单的查询以及更复杂的分析任务。确保理解Hive的架构和最佳实践,这将有助于提高查询效率并确保数据安全性。
在Hive中,内置的时间函数可以满足大部分日常需求,如获取年份、月份、日期等,但并不直接支持按月份加减。为了实现这样的功能,我们可以自定义UDF(User Defined Function),扩展Hive的功能。标题中的“hive时间按...
其实是因为Hive存放的数据是没有索引的,如果没有建立分区直接查询,Hive就会暴力查询,效率很低,所以通过分区能很好提高Hive的查询效率。分区还能够更加方便的管理一些特殊数据,例如一些日志数据,可以是一个天一...
- **背景**:Hive不支持直接使用`HAVING`关键字,但可以通过嵌套子查询并在外层查询中使用`WHERE`条件来实现类似的功能。 - **示例**:如果想实现如下标准SQL的`HAVING`查询: ```sql SELECT gender, COUNT(*) as...
DBMS_REDEFINITION方法是Oracle提供的一个工具包,用于在不锁定原表的情况下,将非分区表在线转换为分区表。通过DBMS_REDEFINITION包中的步骤和函数,可以实现分区表的在线重定义,这种方式对系统影响最小,但需要...
在Hive中,查询操作是数据处理的核心,它允许用户从大数据存储中提取所需的信息。以下将详细解析Hive查询语法及其基本查询方法。 一、Hive查询语法 Hive的查询语句遵循标准SQL的基本结构,但也有一些Hive特有的扩展...
hive双分区外部表复合数据结构博客的数据资料,欢迎下载。
**动态分区**是指在运行时确定分区的值,这样可以在不知道具体分区信息的情况下灵活地添加新分区。 - 设置为非严格模式:`set hive.exec.dynamic.partition.mode=nonstrict;` - 启动动态分区:`set hive.exec....
在大数据处理领域,Hive作为一个基于Hadoop的数据仓库工具,被广泛用于数据查询和分析。在某些场景下,我们可能需要对数据进行加密或者解密操作,以保护敏感信息或实现特定的数据处理需求。Base64是一种常见的编码...
### Hive 数据加载与导出详解 #### 创建 Hive 表 在使用 Hive 进行数据分析之前,首先需要创建一张表来存储数据。以下是一段创建分区表的 SQL 语句示例: ```sql CREATE TABLE db_0309.emp ( empno INT, ename ...
### Spark或MR引擎插入的数据,Hive表查询数据为0的问题解析 #### 问题背景与现象 在大数据处理场景中,经常会遇到使用不同执行引擎(如Spark、MapReduce (MR) 或 Tez)进行数据处理的情况。其中一种常见的问题是...
总结来说,这个示例展示了如何利用Hive的UDF功能解决特定问题,即在Hive查询中进行日期的月份级别加减。通过创建和注册自定义的Java类,我们可以扩展Hive的内置功能,以适应更复杂的数据处理场景。对于大数据分析...