https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create%2FDropTable
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[
[ROW FORMAT row_format] [STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)] (Note: only available starting with 0.6.0)
]
[LOCATION hdfs_path]
[TBLPROPERTIES (property_name=property_value, ...)] (Note: only available starting with 0.6.0)
[AS select_statement] (Note: this feature is only available starting with 0.5.0.)
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
LIKE existing_table_or_view_name
[LOCATION hdfs_path]
create table tablePartition(s string) partitioned by(pt string);
alter table tablePartition add if not exists partition(pt='1');
CliDriver:
CliDriver.main() {
ret = cli.processLine(line);
}
CliDriver:
public int processLine(String line) { // line = create table tablePartition(s string) partitioned by(pt string);
ret = processCmd(command);
}
CliDriver
public int processCmd(String cmd) { // cmd = create table tablePartition(s string) partitioned by(pt string)
CommandProcessor proc = CommandProcessorFactory.get(tokens[0]);
Driver qp = (Driver) proc;
ret = qp.run(cmd).getResponseCode();
}
CommandProcessor proc = CommandProcessorFactory.get(tokens[0]); // tokens[0] = create
For "create", the factory returns a new Driver() to handle the statement.
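To make that dispatch concrete, here is a small self-contained toy (plain Java, not the real Hive source; the actual factory also knows about built-in processors such as SetProcessor) that mirrors the behaviour just described: the first token of the command selects a processor, and anything that is not a built-in command falls through to the SQL driver.
// Toy illustration only; CommandProcessor here is a local interface,
// not org.apache.hadoop.hive.ql.processors.CommandProcessor.
interface CommandProcessor {
    int run(String cmd);
}
final class ToyCommandProcessorFactory {
    static CommandProcessor get(String firstToken) {
        if ("set".equalsIgnoreCase(firstToken)) {
            // built-in commands are handled by lightweight processors
            return cmd -> { System.out.println("SetProcessor: " + cmd); return 0; };
        }
        // "create", "select", "alter", ... are ordinary HiveQL and go to the Driver,
        // i.e. the compile() + execute() pipeline traced in the rest of this post
        return cmd -> { System.out.println("Driver: " + cmd); return 0; };
    }
    public static void main(String[] args) {
        String cmd = "create table tablePartition(s string) partitioned by(pt string)";
        get(cmd.split("\\s+")[0]).run(cmd);
    }
}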
Driver
CommandProcessorResponse run(String command) {
// command = create table tablePartition(s string) partitioned by(pt string)
int ret = compile(command);
ret = execute();
}
(1)compile
public int compile(String command) {
  // call chain (sketch):
  //   BaseSemanticAnalyzer sem = new SemanticAnalyzer(conf);   // CREATE TABLE uses SemanticAnalyzer
  //   sem.analyze(tree, ctx);
  //     -> SemanticAnalyzer.analyzeInternal(ASTNode ast)
  //        -> SemanticAnalyzer.analyzeCreateTable(ASTNode ast, QB qb)
  plan = new QueryPlan(command, sem);
}
SemanticAnalyzer:
public void analyzeInternal(ASTNode ast) throws SemanticException {
// a HiveQL statement containing CREATE goes through the code below
// analyze create table command
if (ast.getToken().getType() == HiveParser.TOK_CREATETABLE) {
isCreateTable = true;
// if it is not CTAS, we don't need to go further and just return
if ((child = analyzeCreateTable(ast, qb)) == null) {
return;
}
}
}
private ASTNode analyzeCreateTable(ASTNode ast, QB qb)
throws SemanticException {
String tableName = getUnescapedName((ASTNode)ast.getChild(0)); // the first child node is the table name
final int CREATE_TABLE = 0; // regular CREATE TABLE
final int CTLT = 1; // CREATE TABLE LIKE ... (CTLT)
final int CTAS = 2; // CREATE TABLE AS SELECT ... (CTAS)
Hive.g:
tableFileFormat
@init { msgs.push("table file format specification"); }
@after { msgs.pop(); }
:
KW_STORED KW_AS KW_SEQUENCEFILE -> TOK_TBLSEQUENCEFILE
| KW_STORED KW_AS KW_TEXTFILE -> TOK_TBLTEXTFILE
| KW_STORED KW_AS KW_RCFILE -> TOK_TBLRCFILE
| KW_STORED KW_AS KW_INPUTFORMAT inFmt=StringLiteral KW_OUTPUTFORMAT outFmt=StringLiteral (KW_INPUTDRIVER inDriver=StringLiteral KW_OUTPUTDRIVER outDriver=StringLiteral)?
-> ^(TOK_TABLEFILEFORMAT $inFmt $outFmt $inDriver? $outDriver?)
| KW_STORED KW_BY storageHandler=StringLiteral
(KW_WITH KW_SERDEPROPERTIES serdeprops=tableProperties)?
-> ^(TOK_STORAGEHANDLER $storageHandler $serdeprops?)
| KW_STORED KW_AS genericSpec=Identifier
-> ^(TOK_FILEFORMAT_GENERIC $genericSpec)
;
The grammar in Hive.g turns STORED AS SEQUENCEFILE directly into a single TOK_TBLSEQUENCEFILE node.
(1) Handle the file format clause, [STORED AS file_format].
(2) If IF NOT EXISTS is present, ifNotExists = true;
(3) If EXTERNAL is present, the table is external: isExt = true;
(4) LIKE: take the name of the source table, command_type = CTLT; when likeTableName is present, a few syntactically invalid combinations have to be rejected.
(5) If there is a query, the statement is [AS select_statement]: command_type = CTAS; selectStmt = child;
(6) If a column list is present, handle all non-partition columns [(col_name data_type [COMMENT col_comment], ...)]: cols = getColumns(child);
(7) Table comment, [COMMENT table_comment]: comment = unescapeSQLString(child.getChild(0).getText());
(8) If partition columns are present, handle all of them, [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]: partCols = getColumns((ASTNode) child.getChild(0), false);
(9) Handle the bucketing clause, [CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS].
(10) Handle the ROW FORMAT clause; there are four delimiters: fieldDelim, collItemDelim, mapKeyDelim and lineDelim. If lineDelim is neither "\n" nor "10", an exception is thrown. [ROW FORMAT row_format]
(11) Handle the location clause, [LOCATION hdfs_path]: location = unescapeSQLString(child.getChild(0).getText());
(12) Handle table properties, [TBLPROPERTIES (property_name=property_value, ...)] (Note: only available starting with 0.6.0), stored in a Map<String, String> tblProps.
(13) Handle the storage handler / serde clause, STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)] (Note: only available starting with 0.6.0).
(14) If the declared file format is not one of SEQUENCEFILE, RCFILE or TEXTFILE, an exception is thrown.
if (ifNotExists) { // IF NOT EXISTS was specified
  // look the table up by tableName; if it can be found, the table already exists and null is returned
}
CreateTableDesc crtTblDesc = null;
switch (command_type) { // the three kinds of CREATE TABLE statement are handled differently
case CREATE_TABLE: // REGULAR CREATE TABLE DDL
  crtTblDesc = new CreateTableDesc(...);
  // build a DDLWork, wrap it in a DDLTask and add it to rootTasks
case CTLT: // create table like <tbl_name>
  CreateTableLikeDesc crtTblLikeDesc = new CreateTableLikeDesc(...);
  // build a DDLWork, wrap it in a DDLTask and add it to rootTasks
case CTAS: // create table as select
  // first verify that tableName does not exist; throw an exception if it does
  crtTblDesc = new CreateTableDesc(...);
  qb.setTableDesc(crtTblDesc);
  return selectStmt; // CTAS returns the query child node
}
return null; // a plain CREATE_TABLE or CTLT returns null
}
create table tablePartition(s string) partitioned by(pt string);
hive> explain create table tablePartition(s string) partitioned by(pt string);
OK
ABSTRACT SYNTAX TREE:
(TOK_CREATETABLE tablePartition TOK_LIKETABLE (TOK_TABCOLLIST (TOK_TABCOL s TOK_STRING)) (TOK_TABLEPARTCOLS (TOK_TABCOLLIST (TOK_TABCOL pt TOK_STRING))))
STAGE DEPENDENCIES:
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-0
Create Table Operator:
Create Table
columns: s string
if not exists: false
input format: org.apache.hadoop.mapred.TextInputFormat
# buckets: -1
output format: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
partition columns: pt string
name: tablePartition
isExternal: false
private ASTNode analyzeCreateTable(ASTNode ast, QB qb)
throws SemanticException {
String tableName = unescapeIdentifier(ast.getChild(0).getText()); // table name: tablePartition
String likeTableName = null;
List<FieldSchema> cols = new ArrayList<FieldSchema>();
List<FieldSchema> partCols = new ArrayList<FieldSchema>();
List<String> bucketCols = new ArrayList<String>();
List<Order> sortCols = new ArrayList<Order>();
int numBuckets = -1;
String fieldDelim = null;
String fieldEscape = null;
String collItemDelim = null;
String mapKeyDelim = null;
String lineDelim = null;
String comment = null;
String inputFormat = null;
String outputFormat = null;
String location = null;
String serde = null;
String storageHandler = null;
Map<String, String> serdeProps = new HashMap<String, String>();
Map<String, String> tblProps = null;
boolean ifNotExists = false;
boolean isExt = false;
ASTNode selectStmt = null;
final int CREATE_TABLE = 0; // regular CREATE TABLE
final int CTLT = 1; // CREATE TABLE LIKE ... (CTLT)
final int CTAS = 2; // CREATE TABLE AS SELECT ... (CTAS)
int command_type = CREATE_TABLE; // 0
LOG.info("Creating table " + tableName + " position="
+ ast.getCharPositionInLine()); // 11/08/28 05:20:26 INFO parse.SemanticAnalyzer: Creating table tablePartition position=13
int numCh = ast.getChildCount(); // numCh = 4
/*
* Check the 1st-level children and do simple semantic checks: 1) CTLT and
* CTAS should not coexists. 2) CTLT or CTAS should not coexists with column
* list (target table schema). 3) CTAS does not support partitioning (for
* now).
*/
for (int num = 1; num < numCh; num++) {
ASTNode child = (ASTNode) ast.getChild(num); // num=1,child =TOK_LIKETABLE; num=2,child=TOK_TABCOLLIST; num=3,child=TOK_TABLEPARTCOLS
switch (child.getToken().getType()) {
case HiveParser.TOK_IFNOTEXISTS:
ifNotExists = true;
break;
case HiveParser.KW_EXTERNAL:
isExt = true;
break;
case HiveParser.TOK_LIKETABLE:
if (child.getChildCount() > 0) { //
likeTableName = unescapeIdentifier(child.getChild(0).getText());
if (likeTableName != null) {
if (command_type == CTAS) {
throw new SemanticException(ErrorMsg.CTAS_CTLT_COEXISTENCE
.getMsg());
}
if (cols.size() != 0) {
throw new SemanticException(ErrorMsg.CTLT_COLLST_COEXISTENCE
.getMsg());
}
}
command_type = CTLT;
}
break;
case HiveParser.TOK_QUERY: // CTAS
if (command_type == CTLT) {
throw new SemanticException(ErrorMsg.CTAS_CTLT_COEXISTENCE.getMsg());
}
if (cols.size() != 0) {
throw new SemanticException(ErrorMsg.CTAS_COLLST_COEXISTENCE.getMsg());
}
if (partCols.size() != 0 || bucketCols.size() != 0) {
boolean dynPart = HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING);
if (dynPart == false) {
throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
} else {
// TODO: support dynamic partition for CTAS
throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
}
}
if (isExt) {
throw new SemanticException(ErrorMsg.CTAS_EXTTBL_COEXISTENCE.getMsg());
}
command_type = CTAS;
selectStmt = child;
break;
case HiveParser.TOK_TABCOLLIST:
cols = getColumns(child); // see getColumns below; yields [FieldSchema(name:s, type:string, comment:null)]
break;
case HiveParser.TOK_TABLECOMMENT:
comment = unescapeSQLString(child.getChild(0).getText());
break;
case HiveParser.TOK_TABLEPARTCOLS:
partCols = getColumns((ASTNode) child.getChild(0), false); //[FieldSchema(name:pt, type:string, comment:null)]
break;
case HiveParser.TOK_TABLEBUCKETS:
bucketCols = getColumnNames((ASTNode) child.getChild(0));
if (child.getChildCount() == 2) {
numBuckets = (Integer.valueOf(child.getChild(1).getText()))
.intValue();
} else {
sortCols = getColumnNamesOrder((ASTNode) child.getChild(1));
numBuckets = (Integer.valueOf(child.getChild(2).getText()))
.intValue();
}
break;
case HiveParser.TOK_TABLEROWFORMAT:
child = (ASTNode) child.getChild(0);
int numChildRowFormat = child.getChildCount();
for (int numC = 0; numC < numChildRowFormat; numC++) {
ASTNode rowChild = (ASTNode) child.getChild(numC);
switch (rowChild.getToken().getType()) {
case HiveParser.TOK_TABLEROWFORMATFIELD:
fieldDelim = unescapeSQLString(rowChild.getChild(0).getText());
if (rowChild.getChildCount() >= 2) {
fieldEscape = unescapeSQLString(rowChild.getChild(1).getText());
}
break;
case HiveParser.TOK_TABLEROWFORMATCOLLITEMS:
collItemDelim = unescapeSQLString(rowChild.getChild(0).getText());
break;
case HiveParser.TOK_TABLEROWFORMATMAPKEYS:
mapKeyDelim = unescapeSQLString(rowChild.getChild(0).getText());
break;
case HiveParser.TOK_TABLEROWFORMATLINES:
lineDelim = unescapeSQLString(rowChild.getChild(0).getText());
if (!lineDelim.equals("\n") && !lineDelim.equals("10")) {
throw new SemanticException(
ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg());
}
break;
default:
assert false;
}
}
break;
case HiveParser.TOK_TABLESERIALIZER:
child = (ASTNode) child.getChild(0);
serde = unescapeSQLString(child.getChild(0).getText());
if (child.getChildCount() == 2) {
readProps(
(ASTNode) (child.getChild(1).getChild(0)),
serdeProps);
}
break;
case HiveParser.TOK_TBLSEQUENCEFILE:
inputFormat = SEQUENCEFILE_INPUT;
outputFormat = SEQUENCEFILE_OUTPUT;
break;
case HiveParser.TOK_TBLTEXTFILE:
inputFormat = TEXTFILE_INPUT;
outputFormat = TEXTFILE_OUTPUT;
break;
case HiveParser.TOK_TBLRCFILE:
inputFormat = RCFILE_INPUT;
outputFormat = RCFILE_OUTPUT;
serde = COLUMNAR_SERDE;
break;
case HiveParser.TOK_TABLEFILEFORMAT:
inputFormat = unescapeSQLString(child.getChild(0).getText());
outputFormat = unescapeSQLString(child.getChild(1).getText());
break;
case HiveParser.TOK_TABLELOCATION:
location = unescapeSQLString(child.getChild(0).getText());
break;
case HiveParser.TOK_TABLEPROPERTIES:
tblProps = DDLSemanticAnalyzer.getProps((ASTNode) child.getChild(0));
break;
case HiveParser.TOK_STORAGEHANDLER:
storageHandler = unescapeSQLString(child.getChild(0).getText());
if (child.getChildCount() == 2) {
readProps(
(ASTNode) (child.getChild(1).getChild(0)),
serdeProps);
}
break;
default:
assert false;
}
}
if ((command_type == CTAS) && (storageHandler != null)) { // false
throw new SemanticException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg());
}
if ((inputFormat == null) && (storageHandler == null)) { //true
assert outputFormat == null;
if ("SequenceFile".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
inputFormat = SEQUENCEFILE_INPUT;
outputFormat = SEQUENCEFILE_OUTPUT;
} else if ("RCFile".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
inputFormat = RCFILE_INPUT;
outputFormat = RCFILE_OUTPUT;
serde = COLUMNAR_SERDE;
} else {
inputFormat = TEXTFILE_INPUT; // org.apache.hadoop.mapred.TextInputFormat
outputFormat = TEXTFILE_OUTPUT;//org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
}
}
The default file format can be configured:
// in HiveConf: HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile")
// in hive-default.xml:
<property>
<name>hive.default.fileformat</name>
<value>TextFile</value>
<description>Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS <TEXTFILE|SEQUENCEFILE> to override</description>
</property>
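As a quick sanity check, the same default can be read back through HiveConf. A minimal sketch (my own example, assuming the Hive common/exec jars are on the classpath; it uses the HIVEDEFAULTFILEFORMAT variable shown above):
import org.apache.hadoop.hive.conf.HiveConf;
public class DefaultFileFormat {
    public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // the same variable analyzeCreateTable consults when no STORED AS clause is given
        String fmt = conf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT);
        System.out.println("hive.default.fileformat = " + fmt); // "TextFile" unless overridden
    }
}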
// check for existence of table
if (ifNotExists) { // false here: the CREATE statement did not include IF NOT EXISTS
try {
if (null != db.getTable(db.getCurrentDatabase(), tableName, false)) {
return null;
}
} catch (HiveException e) {
e.printStackTrace();
}
}
// Handle different types of CREATE TABLE command
CreateTableDesc crtTblDesc = null;
switch (command_type) { // 0
case CREATE_TABLE: // REGULAR CREATE TABLE DDL
crtTblDesc = new CreateTableDesc(tableName, isExt, cols, partCols,
bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape,
collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat,
outputFormat, location, serde, storageHandler, serdeProps,
tblProps, ifNotExists);
validateCreateTable(crtTblDesc); //
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
crtTblDesc), conf)); // build a DDLWork (its createTblDesc is non-null), wrap it in a DDLTask and add it to rootTasks
break;
case CTLT: // create table like <tbl_name>
CreateTableLikeDesc crtTblLikeDesc = new CreateTableLikeDesc(tableName,
isExt, location, ifNotExists, likeTableName);
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
crtTblLikeDesc), conf));
break;
case CTAS: // create table as select
// check for existence of table. Throw an exception if it exists.
try {
Table tab = db.getTable(db.getCurrentDatabase(), tableName, false);
// do not throw exception if table does not exist
if (tab != null) {
throw new SemanticException(ErrorMsg.TABLE_ALREADY_EXISTS
.getMsg(tableName));
}
} catch (HiveException e) { // may be unable to get meta data
throw new SemanticException(e);
}
crtTblDesc = new CreateTableDesc(tableName, isExt, cols, partCols,
bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape,
collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat,
outputFormat, location, serde, storageHandler, serdeProps,
tblProps, ifNotExists);
qb.setTableDesc(crtTblDesc);
return selectStmt;
default:
assert false; // should never be unknown command type
}
return null; // returns null
}
BaseSemanticAnalyzer:
protected List<FieldSchema> getColumns(ASTNode ast) throws SemanticException { // extract the column information
//ast = TOK_TABCOLLIST
return getColumns(ast, true);
}
protected List<FieldSchema> getColumns(ASTNode ast, boolean lowerCase) throws SemanticException {
List<FieldSchema> colList = new ArrayList<FieldSchema>(); // []
int numCh = ast.getChildCount(); // 1
for (int i = 0; i < numCh; i++) {
FieldSchema col = new FieldSchema();
ASTNode child = (ASTNode) ast.getChild(i); // TOK_TABCOL
String name = child.getChild(0).getText(); // s
if(lowerCase) { // true
name = name.toLowerCase(); // s
}
// child 0 is the name of the column
col.setName(unescapeIdentifier(name)); //FieldSchema(name:s, type:null, comment:null)
// child 1 is the type of the column
ASTNode typeChild = (ASTNode) (child.getChild(1)); // TOK_STRING
col.setType(getTypeStringFromAST(typeChild)); //FieldSchema(name:s, type:string, comment:null)
// child 2 is the optional comment of the column
if (child.getChildCount() == 3) {
col.setComment(unescapeSQLString(child.getChild(2).getText()));
}
colList.add(col); //[FieldSchema(name:s, type:string, comment:null)]
}
return colList; //[FieldSchema(name:s, type:string, comment:null)]
}
BaseSemanticAnalyzer:
protected List<FieldSchema> getColumns(ASTNode ast, boolean lowerCase) throws SemanticException {
List<FieldSchema> colList = new ArrayList<FieldSchema>();
int numCh = ast.getChildCount(); // 1
for (int i = 0; i < numCh; i++) {
FieldSchema col = new FieldSchema();
ASTNode child = (ASTNode) ast.getChild(i); // TOK_TABCOL
String name = child.getChild(0).getText(); // pt
if(lowerCase) { //false
name = name.toLowerCase();
}
// child 0 is the name of the column
col.setName(unescapeIdentifier(name)); // FieldSchema(name:pt, type:null, comment:null)
// child 1 is the type of the column
ASTNode typeChild = (ASTNode) (child.getChild(1)); // TOK_STRING
col.setType(getTypeStringFromAST(typeChild)); // FieldSchema(name:pt, type:string, comment:null)
// child 2 is the optional comment of the column
if (child.getChildCount() == 3) {
col.setComment(unescapeSQLString(child.getChild(2).getText()));
}
colList.add(col); // [FieldSchema(name:pt, type:string, comment:null)]
}
return colList; //[FieldSchema(name:pt, type:string, comment:null)]
}
CreateTableDesc
public CreateTableDesc(String tableName, boolean isExternal,
List<FieldSchema> cols, List<FieldSchema> partCols,
List<String> bucketCols, List<Order> sortCols, int numBuckets,
String fieldDelim, String fieldEscape, String collItemDelim,
String mapKeyDelim, String lineDelim, String comment, String inputFormat,
String outputFormat, String location, String serName,
String storageHandler,
Map<String, String> serdeProps,
Map<String, String> tblProps,
boolean ifNotExists) {
this.tableName = tableName; // tablePartition
this.isExternal = isExternal; // false
this.bucketCols = new ArrayList<String>(bucketCols); // []
this.sortCols = new ArrayList<Order>(sortCols); // []
this.collItemDelim = collItemDelim; // null
this.cols = new ArrayList<FieldSchema>(cols); // [FieldSchema(name:s, type:string, comment:null)]
this.comment = comment; // null
this.fieldDelim = fieldDelim; // null
this.fieldEscape = fieldEscape; // null
this.inputFormat = inputFormat; // org.apache.hadoop.mapred.TextInputFormat
this.outputFormat = outputFormat; // org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
this.lineDelim = lineDelim; // null
this.location = location; // null
this.mapKeyDelim = mapKeyDelim; // null
this.numBuckets = numBuckets; // -1
this.partCols = new ArrayList<FieldSchema>(partCols); // [FieldSchema(name:pt, type:string, comment:null)]
this.serName = serName; // null
this.storageHandler = storageHandler; // null
this.serdeProps = serdeProps; // {}
this.tblProps = tblProps; // null
this.ifNotExists = ifNotExists; // false
}
The information carried by this CreateTableDesc:
(1) table name: tablePartition
(2) whether the table is external: false by default
(3) non-partition column info: cols
(4) inputFormat
(5) outputFormat
(6) partition column info: partCols
(7) ifNotExists: false by default
private void validateCreateTable(CreateTableDesc crtTblDesc)
throws SemanticException {
if ((crtTblDesc.getCols() == null) || (crtTblDesc.getCols().size() == 0)) { // false
// for now make sure that serde exists
if (StringUtils.isEmpty(crtTblDesc.getSerName())
|| !SerDeUtils.shouldGetColsFromSerDe(crtTblDesc.getSerName())) {
throw new SemanticException(ErrorMsg.INVALID_TBL_DDL_SERDE.getMsg());
}
return;
}
if (crtTblDesc.getStorageHandler() == null) {
try {
Class<?> origin = Class.forName(crtTblDesc.getOutputFormat(), true,
JavaUtils.getClassLoader()); //class org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Class<? extends HiveOutputFormat> replaced = HiveFileFormatUtils
.getOutputFormatSubstitute(origin); // org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
if (replaced == null) {
throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE
.getMsg());
}
} catch (ClassNotFoundException e) {
throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE.getMsg());
}
}
List<String> colNames = validateColumnNameUniqueness(crtTblDesc.getCols()); // verify that no two columns share the same name; here [s]
if (crtTblDesc.getBucketCols() != null) { //
// all columns in cluster and sort are valid columns
Iterator<String> bucketCols = crtTblDesc.getBucketCols().iterator();
while (bucketCols.hasNext()) {
String bucketCol = bucketCols.next();
boolean found = false;
Iterator<String> colNamesIter = colNames.iterator();
while (colNamesIter.hasNext()) {
String colName = colNamesIter.next();
if (bucketCol.equalsIgnoreCase(colName)) {
found = true;
break;
}
}
if (!found) {
throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
}
}
}
if (crtTblDesc.getSortCols() != null) {
// all columns in cluster and sort are valid columns
Iterator<Order> sortCols = crtTblDesc.getSortCols().iterator();
while (sortCols.hasNext()) {
String sortCol = sortCols.next().getCol();
boolean found = false;
Iterator<String> colNamesIter = colNames.iterator();
while (colNamesIter.hasNext()) {
String colName = colNamesIter.next();
if (sortCol.equalsIgnoreCase(colName)) {
found = true;
break;
}
}
if (!found) {
throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
}
}
}
if (crtTblDesc.getPartCols() != null) { // partition column names must not overlap with regular column names
// there is no overlap between columns and partitioning columns
Iterator<FieldSchema> partColsIter = crtTblDesc.getPartCols().iterator();
while (partColsIter.hasNext()) {
String partCol = partColsIter.next().getName();
Iterator<String> colNamesIter = colNames.iterator();
while (colNamesIter.hasNext()) {
String colName = unescapeIdentifier(colNamesIter.next());
if (partCol.equalsIgnoreCase(colName)) {
throw new SemanticException(
ErrorMsg.COLUMN_REPEATED_IN_PARTITIONING_COLS.getMsg());
}
}
}
}
}
The resulting DDLWork has a non-null createTblDesc.
The task generated from it is org.apache.hadoop.hive.ql.exec.DDLTask.
(2)execute
Driver:
public int execute() {
// Add root Tasks to runnable
for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
driverCxt.addToRunnable(tsk);
}
while (running.size() != 0 || runnable.peek() != null) {
// Launch upto maxthreads tasks
while (runnable.peek() != null && running.size() < maxthreads) {
Task<? extends Serializable> tsk = runnable.remove();
launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt); // Driver.launchTask
}
}
}
Driver.launchTask
TaskRunner.runSequential
Task.executeTask
DDLTask.execute
  CreateTableDesc crtTbl = work.getCreateTblDesc();
  if (crtTbl != null) {
    return createTable(db, crtTbl);
  }
DDLTask.createTable
Hive.createTable
HiveMetaStoreClient.createTable
HiveMetaStore.HMSHandler.create_table
HiveMetaStore.HMSHandler.create_table_core
ObjectStore.createTable
DDLTask:
/**
* Create a new table.
*
* @param db
* The database in question.
* @param crtTbl
* This is the table we're creating.
* @return Returns 0 when execution succeeds and above 0 if it fails.
* @throws HiveException
* Throws this exception if an unexpected error occurs.
*/
private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException {
// create the table
Table tbl = new Table(db.getCurrentDatabase(), crtTbl.getTableName());
//Hive.java: db is an instance of Hive
public String getCurrentDatabase() {
if (null == currentDatabase) {
currentDatabase = DEFAULT_DATABASE_NAME; // DEFAULT_DATABASE_NAME is "default"
}
return currentDatabase;
}
if (crtTbl.getPartCols() != null) {
tbl.setPartCols(crtTbl.getPartCols()); //[FieldSchema(name:pt, type:string, comment:null)]
}
if (crtTbl.getNumBuckets() != -1) {
tbl.setNumBuckets(crtTbl.getNumBuckets());
}
if (crtTbl.getStorageHandler() != null) {
tbl.setProperty(
org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE,
crtTbl.getStorageHandler());
}
HiveStorageHandler storageHandler = tbl.getStorageHandler(); // null
if (crtTbl.getSerName() != null) { // null
tbl.setSerializationLib(crtTbl.getSerName());
} else {
if (crtTbl.getFieldDelim() != null) { // null false
tbl.setSerdeParam(Constants.FIELD_DELIM, crtTbl.getFieldDelim());
tbl.setSerdeParam(Constants.SERIALIZATION_FORMAT, crtTbl
.getFieldDelim());
}
if (crtTbl.getFieldEscape() != null) { // null false
tbl.setSerdeParam(Constants.ESCAPE_CHAR, crtTbl.getFieldEscape());
}
if (crtTbl.getCollItemDelim() != null) {
tbl
.setSerdeParam(Constants.COLLECTION_DELIM, crtTbl
.getCollItemDelim());
}
if (crtTbl.getMapKeyDelim() != null) {
tbl.setSerdeParam(Constants.MAPKEY_DELIM, crtTbl.getMapKeyDelim());
}
if (crtTbl.getLineDelim() != null) {
tbl.setSerdeParam(Constants.LINE_DELIM, crtTbl.getLineDelim());
}
}
if (crtTbl.getSerdeProps() != null) { // serde properties
Iterator<Entry<String, String>> iter = crtTbl.getSerdeProps().entrySet()
.iterator();
while (iter.hasNext()) {
Entry<String, String> m = iter.next();
tbl.setSerdeParam(m.getKey(), m.getValue());
}
}
if (crtTbl.getTblProps() != null) { // table properties
tbl.getTTable().getParameters().putAll(crtTbl.getTblProps());
}
/*
* We use LazySimpleSerDe by default.
*
* If the user didn't specify a SerDe, and any of the columns are not simple
* types, we will have to use DynamicSerDe instead.
*/
if (crtTbl.getSerName() == null) { // no serde name was specified
if (storageHandler == null) {
LOG.info("Default to LazySimpleSerDe for table " + crtTbl.getTableName()); //11/08/28 06:01:29 INFO exec.DDLTask: Default to LazySimpleSerDe for table tablePartition
tbl.setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
// the default serde is org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
} else {
String serDeClassName = storageHandler.getSerDeClass().getName();
LOG.info("Use StorageHandler-supplied " + serDeClassName
+ " for table " + crtTbl.getTableName());
tbl.setSerializationLib(serDeClassName);
}
} else {
// let's validate that the serde exists
validateSerDe(crtTbl.getSerName());
}
if (crtTbl.getCols() != null) { //[FieldSchema(name:s, type:string, comment:null)]
tbl.setFields(crtTbl.getCols());
}
if (crtTbl.getBucketCols() != null) {
tbl.setBucketCols(crtTbl.getBucketCols());
}
if (crtTbl.getSortCols() != null) {
tbl.setSortCols(crtTbl.getSortCols());
}
if (crtTbl.getComment() != null) {
tbl.setProperty("comment", crtTbl.getComment()); //表的注释
}
if (crtTbl.getLocation() != null) {
tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri());
}
tbl.setInputFormatClass(crtTbl.getInputFormat());
tbl.setOutputFormatClass(crtTbl.getOutputFormat());
tbl.getTTable().getSd().setInputFormat(
tbl.getInputFormatClass().getName()); // org.apache.hadoop.mapred.TextInputFormat
tbl.getTTable().getSd().setOutputFormat(
tbl.getOutputFormatClass().getName());//org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
if (crtTbl.isExternal()) { // if it is an external table
tbl.setProperty("EXTERNAL", "TRUE"); // add the table property EXTERNAL=TRUE
tbl.setTableType(TableType.EXTERNAL_TABLE); // set the table type to EXTERNAL_TABLE
}
// If the sorted columns is a superset of bucketed columns, store this fact.
// It can be later used to
// optimize some group-by queries. Note that, the order does not matter as
// long as it in the first
// 'n' columns where 'n' is the length of the bucketed columns.
if ((tbl.getBucketCols() != null) && (tbl.getSortCols() != null)) {
List<String> bucketCols = tbl.getBucketCols();
List<Order> sortCols = tbl.getSortCols();
if ((sortCols.size() > 0) && (sortCols.size() >= bucketCols.size())) {
boolean found = true;
Iterator<String> iterBucketCols = bucketCols.iterator();
while (iterBucketCols.hasNext()) {
String bucketCol = iterBucketCols.next();
boolean colFound = false;
for (int i = 0; i < bucketCols.size(); i++) {
if (bucketCol.equals(sortCols.get(i).getCol())) {
colFound = true;
break;
}
}
if (colFound == false) {
found = false;
break;
}
}
if (found) {
tbl.setProperty("SORTBUCKETCOLSPREFIX", "TRUE");
}
}
}
int rc = setGenericTableAttributes(tbl); // set generic table attributes
if (rc != 0) {
return rc;
}
// create the table
db.createTable(tbl, crtTbl.getIfNotExists()); //Hive.createTable(Table tbl, boolean ifNotExists)
// add the table privilege to creator
if (SessionState.get() != null) {
String userName = SessionState.get().getUserName();
Authorizer authorizer = SessionState.get().getAuthorizer();
try {
if (userName != null) {
AuthorizeEntry entry =
new AuthorizeEntry(null, tbl, null,
PrivilegeLevel.TABLE_LEVEL.getPrivileges(),
PrivilegeLevel.TABLE_LEVEL);
authorizer.grant(userName, userName, entry);
}
} catch (AuthorizeException e) {
throw new HiveException(e);
}
}
work.getOutputs().add(new WriteEntity(tbl));
return 0; // success
}
Some of the defaults:
Default to LazySimpleSerDe for table tablePartition
sd.serdeInfo.serializationLib is org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
The Table's default inputFormatClass is
class org.apache.hadoop.mapred.TextInputFormat
The Table's default outputFormatClass is
class org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Table
public Table(String databaseName, String tableName) { // default tablePartition
this(getEmptyTable(databaseName, tableName));
}
static org.apache.hadoop.hive.metastore.api.Table
getEmptyTable(String databaseName, String tableName) {
StorageDescriptor sd = new StorageDescriptor();
{
sd.setSerdeInfo(new SerDeInfo());
sd.setNumBuckets(-1);
sd.setBucketCols(new ArrayList<String>());
sd.setCols(new ArrayList<FieldSchema>());
sd.setParameters(new HashMap<String, String>());
sd.setSortCols(new ArrayList<Order>());
sd.getSerdeInfo().setParameters(new HashMap<String, String>());
// We have to use MetadataTypedColumnsetSerDe because LazySimpleSerDe does
// not support a table with no columns.
sd.getSerdeInfo().setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
sd.getSerdeInfo().getParameters().put(Constants.SERIALIZATION_FORMAT, "1"); // added by default
sd.setInputFormat(SequenceFileInputFormat.class.getName()); // the default here is SequenceFile
sd.setOutputFormat(HiveSequenceFileOutputFormat.class.getName());
}
org.apache.hadoop.hive.metastore.api.Table t = new org.apache.hadoop.hive.metastore.api.Table();
{
t.setSd(sd);
t.setPartitionKeys(new ArrayList<FieldSchema>());
t.setParameters(new HashMap<String, String>());
t.setTableType(TableType.MANAGED_TABLE.toString()); // the default table type is MANAGED_TABLE
t.setDbName(databaseName); // default
t.setTableName(tableName); // tablePartition
t.setDbName(databaseName); // redundant: already set above
}
return t;
}
Constants.SERIALIZATION_FORMAT
public static final String SERIALIZATION_FORMAT = "serialization.format"; // defaults to "1"
public enum TableType {
MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW
}
Hive:
public void createTable(Table tbl, boolean ifNotExists) throws HiveException {
try {
if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) {
tbl.setDbName(getCurrentDatabase());
}
if (tbl.getCols().size() == 0) {
tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getTableName(),
tbl.getDeserializer()));
}
tbl.checkValidity(); // validity checks: at least one column; a non-empty table name made of letters, digits and underscores; no two columns with the same name; no column name shared between partition columns and regular columns (see the name-check sketch right after this method)
getMSC().createTable(tbl.getTTable()); // HiveMetaStoreClient
} catch (AlreadyExistsException e) {
if (!ifNotExists) {
throw new HiveException(e);
}
} catch (Exception e) {
throw new HiveException(e);
}
}
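The checkValidity() call above boils down to simple structural checks. Below is a hypothetical, pure-JDK sketch of the kind of name check it (and MetaStoreUtils.validateName, which shows up again in create_table_core) performs; the real implementation may differ in detail:
import java.util.regex.Pattern;
final class NameCheck {
    // assumption: a valid object name is non-empty and made up of letters, digits and '_'
    private static final Pattern VALID_NAME = Pattern.compile("[a-zA-Z0-9_]+");
    static boolean isValidName(String name) {
        return name != null && VALID_NAME.matcher(name).matches();
    }
    public static void main(String[] args) {
        System.out.println(isValidName("tablePartition"));  // true
        System.out.println(isValidName("table-partition")); // false: '-' is not allowed
        System.out.println(isValidName(""));                 // false: empty
    }
}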
HiveMetaStoreClient
public void createTable(Table tbl) throws AlreadyExistsException,
InvalidObjectException, MetaException, NoSuchObjectException, TException {
HiveMetaHook hook = getHook(tbl);
if (hook != null) {
hook.preCreateTable(tbl);
}
boolean success = false;
try {
client.create_table(tbl); // org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler
if (hook != null) {
hook.commitCreateTable(tbl);
}
success = true; // success
} finally {
if (!success && (hook != null)) {
hook.rollbackCreateTable(tbl);
}
}
}
HiveMetaStore.HMSHandler:
public void create_table(final Table tbl) throws AlreadyExistsException,
MetaException, InvalidObjectException {
incrementCounter("create_table");
logStartFunction("create_table: db=" + tbl.getDbName() + " tbl="
+ tbl.getTableName()); // 11/08/28 06:12:02 INFO metastore.HiveMetaStore: 0: create_table: db=default tbl=tablePartition
try {
executeWithRetry(new Command<Boolean>() {
@Override
Boolean run(RawStore ms) throws Exception {
create_table_core(ms, tbl); //
return Boolean.TRUE; // returns true
}
});
} catch (AlreadyExistsException e) {
throw e;
} catch (MetaException e) {
throw e;
} catch (InvalidObjectException e) {
throw e;
} catch (Exception e) {
assert(e instanceof RuntimeException);
throw (RuntimeException)e;
}
}
HiveMetaStore.HMSHandler:
private <T> T executeWithRetry(Command<T> cmd) throws Exception {
T ret = null;
boolean gotNewConnectUrl = false;
boolean reloadConf = HiveConf.getBoolVar(hiveConf,
HiveConf.ConfVars.METASTOREFORCERELOADCONF); // false
if (reloadConf) {
updateConnectionURL(getConf(), null);
}
int retryCount = 0;
Exception caughtException = null;
while(true) {
try {
RawStore ms = getMS(reloadConf || gotNewConnectUrl); // ObjectStore
ret = cmd.run(ms);
break;
} catch (javax.jdo.JDOFatalDataStoreException e) {
caughtException = e;
} catch (javax.jdo.JDODataStoreException e) {
caughtException = e;
}
if (retryCount >= retryLimit) {
throw caughtException;
}
assert(retryInterval >= 0);
retryCount++;
LOG.error(
String.format(
"JDO datastore error. Retrying metastore command " +
"after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit));
Thread.sleep(retryInterval);
// If we have a connection error, the JDO connection URL hook might
// provide us with a new URL to access the datastore.
String lastUrl = getConnectionURL(getConf());
gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl);
}
return ret; // true in this case
}
private void create_table_core(final RawStore ms, final Table tbl)
throws AlreadyExistsException, MetaException, InvalidObjectException {
if (!MetaStoreUtils.validateName(tbl.getTableName())
|| !MetaStoreUtils.validateColNames(tbl.getSd().getCols())
|| (tbl.getPartitionKeys() != null && !MetaStoreUtils
.validateColNames(tbl.getPartitionKeys()))) { // validate the table name, the column names and the partition column names
throw new InvalidObjectException(tbl.getTableName()
+ " is not a valid object name");
}
Path tblPath = null;
boolean success = false, madeDir = false;
try {
ms.openTransaction();
if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
if (tbl.getSd().getLocation() == null
|| tbl.getSd().getLocation().isEmpty()) {
tblPath = wh.getDefaultTablePath(
tbl.getDbName(), tbl.getTableName()); // the default location is the warehouse directory plus the table name: hdfs://localhost:54310/user/hive/warehouse/tablepartition (see the sketch after this method)
} else {
if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
LOG.warn("Location: " + tbl.getSd().getLocation()
+ " specified for non-external table:" + tbl.getTableName());
}
tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
}
tbl.getSd().setLocation(tblPath.toString()); // set the table location: hdfs://localhost:54310/user/hive/warehouse/tablepartition
}
// get_table checks whether database exists, it should be moved here
if (is_table_exists(tbl.getDbName(), tbl.getTableName())) { // throw if the table already exists
throw new AlreadyExistsException("Table " + tbl.getTableName()
+ " already exists");
}
if (tblPath != null) { // the path is not null
if (!wh.isDir(tblPath)) { // does the path exist, and if so is it a directory?
if (!wh.mkdirs(tblPath)) { // the path does not exist, so create it
throw new MetaException(tblPath
+ " is not a directory or unable to create one");
}
madeDir = true; // created successfully
}
}
// set create time
long time = System.currentTimeMillis() / 1000;
tbl.setCreateTime((int) time); // set the table creation time
tbl.putToParameters(Constants.DDL_TIME, Long.toString(time)); // last DDL time
ms.createTable(tbl); // persist to the metastore database
success = ms.commitTransaction(); // whether the transaction committed
} finally {
if (!success) { //
ms.rollbackTransaction();
if (madeDir) {
wh.deleteDir(tblPath, true);
}
}
}
}
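For the default location chosen above, wh.getDefaultTablePath effectively appends the lower-cased table name to the warehouse directory (for a table in the default database). A hypothetical illustration with Hadoop's Path, matching the hdfs://localhost:54310/user/hive/warehouse/tablepartition location in the trace; the real Warehouse class also resolves the warehouse root from hive.metastore.warehouse.dir and handles non-default databases:
import org.apache.hadoop.fs.Path;
final class DefaultTablePathDemo {
    // simplified stand-in for Warehouse.getDefaultTablePath (default database case)
    static Path defaultTablePath(String warehouseDir, String tableName) {
        return new Path(warehouseDir, tableName.toLowerCase());
    }
    public static void main(String[] args) {
        // prints /user/hive/warehouse/tablepartition
        System.out.println(defaultTablePath("/user/hive/warehouse", "tablePartition"));
    }
}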
ObjectStore:
public void createTable(Table tbl) throws InvalidObjectException,
MetaException {
boolean commited = false;
try {
openTransaction();
MTable mtbl = convertToMTable(tbl);
pm.makePersistent(mtbl); // persist to the database, via DataNucleus (JDO)
commited = commitTransaction();
} finally {
if (!commited) {
rollbackTransaction();
}
}
}
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[
[ROW FORMAT row_format] [STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)] (Note: only available starting with 0.6.0)
]
[LOCATION hdfs_path]
[TBLPROPERTIES (property_name=property_value, ...)] (Note: only available starting with 0.6.0)
[AS select_statement] (Note: this feature is only available starting with 0.5.0.)
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
LIKE existing_table_or_view_name
[LOCATION hdfs_path]
create table tablePartition(s string) partitioned by(pt string);
alter table tablePartition add if not exists partition(pt='1');
CliDriver:
CliDriver.main() {
ret = cli.processLine(line);
}
CliDriver:
public int processLine(String line) { // line = create table tablePartition(s string) partitioned by(pt string);
ret = processCmd(command);
}
CliDriver
public int processCmd(String cmd) { // cmd = create table tablePartition(s string) partitioned by(pt string)
CommandProcessor proc = CommandProcessorFactory.get(tokens[0]);
Driver qp = (Driver) proc;
ret = qp.run(cmd).getResponseCode();
}
CommandProcessor proc = CommandProcessorFactory.get(tokens[0]); // tokens[0] = create
创建return new Driver();来处理
Driver
CommandProcessorResponse run(String command) {
// command = create table tablePartition(s string) partitioned by(pt string)
int ret = compile(command);
ret = execute();
}
(1)compile
public int compile(String command) {
return new SemanticAnalyzer(conf);
BaseSemanticAnalyzer sem.analyze(tree, ctx);
SemanticAnalyzer analyzeInternal(ASTNode ast)
SemanticAnalyzer ASTNode analyzeCreateTable(ASTNode ast, QB qb )
plan = new QueryPlan(command, sem);
}
SemanticAnalyzer:
public void analyzeInternal(ASTNode ast) throws SemanticException {
// 带有create的hivesql会执行下面的代码
// analyze create table command
if (ast.getToken().getType() == HiveParser.TOK_CREATETABLE) {
isCreateTable = true;
// if it is not CTAS, we don't need to go further and just return
if ((child = analyzeCreateTable(ast, qb)) == null) {
return;
}
}
}
private ASTNode analyzeCreateTable(ASTNode ast, QB qb)
throws SemanticException {
String tableName = getUnescapedName((ASTNode)ast.getChild(0)); //第一个孩子节点是表名
final int CREATE_TABLE = 0; // regular CREATE TABLE
final int CTLT = 1; // CREATE TABLE LIKE ... (CTLT)
final int CTAS = 2; // CREATE TABLE AS SELECT ... (CTAS)
Hive.g:
tableFileFormat
@init { msgs.push("table file format specification"); }
@after { msgs.pop(); }
:
KW_STORED KW_AS KW_SEQUENCEFILE -> TOK_TBLSEQUENCEFILE
| KW_STORED KW_AS KW_TEXTFILE -> TOK_TBLTEXTFILE
| KW_STORED KW_AS KW_RCFILE -> TOK_TBLRCFILE
| KW_STORED KW_AS KW_INPUTFORMAT inFmt=StringLiteral KW_OUTPUTFORMAT outFmt=StringLiteral (KW_INPUTDRIVER inDriver=StringLiteral KW_OUTPUTDRIVER outDriver=StringLiteral)?
-> ^(TOK_TABLEFILEFORMAT $inFmt $outFmt $inDriver? $outDriver?)
| KW_STORED KW_BY storageHandler=StringLiteral
(KW_WITH KW_SERDEPROPERTIES serdeprops=tableProperties)?
-> ^(TOK_STORAGEHANDLER $storageHandler $serdeprops?)
| KW_STORED KW_AS genericSpec=Identifier
-> ^(TOK_FILEFORMAT_GENERIC $genericSpec)
;
Hive.g里面的词法分析,把STORED AS SEQUENCEFILE直接转化成了一个TOK_TBLSEQUENCEFILE节点。
(1)处理file format信息,[STORED AS file_format]
(2)有IF NOT EXISTS,那么ifNotExists = true;
(3)有EXTERNAL,外部表isExt = true;
(4)LIKE ,获得like的表名,command_type = CTLT; likeTableName存在需要处理一些不符合语法的情况。
(5)如果有查询语句,说明是[AS select_statement],command_type = CTAS;selectStmt = child;
(6)有columns,处理所有的非分区字段信息[(col_name data_type [COMMENT col_comment], ...)],cols = getColumns(child);
(7)表的注释[COMMENT table_comment],comment = unescapeSQLString(child.getChild(0).getText());
(8)有分区字段信息,处理所有的分区字段信息,[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)],partCols = getColumns((ASTNode) child.getChild(0), false);
(9)处理BUCKETS信息,[CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
(10)处理ROW FORMAT信息,有4个fieldDelim、collItemDelim、mapKeyDelim、lineDelim,lineDelim如果不是"\n"或者"10"那么会抛出异常,[ROW FORMAT row_format]
(11)处理location信息,[LOCATION hdfs_path],location = unescapeSQLString(child.getChild(0).getText());
(12)处理表属性信息,[TBLPROPERTIES (property_name=property_value, ...)] (Note: only available starting with 0.6.0),用一个map来存,Map<String, String> tblProps
(13)处理serde信息,STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)] (Note: only available starting with 0.6.0)
(14)如果不是SEQUENCEFILE、RCFILE、TEXTFILE三者之中的一个,那么抛出异常。
if (ifNotExists) { //如果有IF NOT EXISTS
根据tableName在数据库中获取这个表,如果能够获取到,表已存在则返回null.
}
CreateTableDesc crtTblDesc = null;
switch (command_type) { //根据3种不同类型的建表语句执行不同的操作
case CREATE_TABLE: // REGULAR CREATE TABLE DDL
crtTblDesc=new CreateTableDesc(***);
new DDLWork,new DDLTask加入rootTasks
case CTLT: // create table like <tbl_name>
CreateTableLikeDesc crtTblLikeDesc = new CreateTableLikeDesc
new DDLWork,new DDLTask加入rootTasks
case CTAS: // create table as select
先验证表tableName不存在,如果存在则抛出异常。
crtTblDesc = new CreateTableDesc();
qb.setTableDesc(crtTblDesc);
return selectStmt; // CTAS返回查询子节点。
}
return null; //普通的CREATE_TABLE或者CTLT,返回null
}
create table tablePartition(s string) partitioned by(pt string);
hive> explain create table tablePartition(s string) partitioned by(pt string);
OK
ABSTRACT SYNTAX TREE:
(TOK_CREATETABLE tablePartition TOK_LIKETABLE (TOK_TABCOLLIST (TOK_TABCOL s TOK_STRING)) (TOK_TABLEPARTCOLS (TOK_TABCOLLIST (TOK_TABCOL pt TOK_STRING))))
STAGE DEPENDENCIES:
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-0
Create Table Operator:
Create Table
columns: s string
if not exists: false
input format: org.apache.hadoop.mapred.TextInputFormat
# buckets: -1
output format: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
partition columns: pt string
name: tablePartition
isExternal: false
private ASTNode analyzeCreateTable(ASTNode ast, QB qb)
throws SemanticException {
String tableName = unescapeIdentifier(ast.getChild(0).getText()); // 表名tablePartition
String likeTableName = null;
List<FieldSchema> cols = new ArrayList<FieldSchema>();
List<FieldSchema> partCols = new ArrayList<FieldSchema>();
List<String> bucketCols = new ArrayList<String>();
List<Order> sortCols = new ArrayList<Order>();
int numBuckets = -1;
String fieldDelim = null;
String fieldEscape = null;
String collItemDelim = null;
String mapKeyDelim = null;
String lineDelim = null;
String comment = null;
String inputFormat = null;
String outputFormat = null;
String location = null;
String serde = null;
String storageHandler = null;
Map<String, String> serdeProps = new HashMap<String, String>();
Map<String, String> tblProps = null;
boolean ifNotExists = false;
boolean isExt = false;
ASTNode selectStmt = null;
final int CREATE_TABLE = 0; // regular CREATE TABLE
final int CTLT = 1; // CREATE TABLE LIKE ... (CTLT)
final int CTAS = 2; // CREATE TABLE AS SELECT ... (CTAS)
int command_type = CREATE_TABLE; // 0
LOG.info("Creating table " + tableName + " position="
+ ast.getCharPositionInLine()); // 11/08/28 05:20:26 INFO parse.SemanticAnalyzer: Creating table tablePartition position=13
int numCh = ast.getChildCount(); // numCh = 4
/*
* Check the 1st-level children and do simple semantic checks: 1) CTLT and
* CTAS should not coexists. 2) CTLT or CTAS should not coexists with column
* list (target table schema). 3) CTAS does not support partitioning (for
* now).
*/
for (int num = 1; num < numCh; num++) {
ASTNode child = (ASTNode) ast.getChild(num); // num=1,child =TOK_LIKETABLE; num=2,child=TOK_TABCOLLIST; num=3,child=TOK_TABLEPARTCOLS
switch (child.getToken().getType()) {
case HiveParser.TOK_IFNOTEXISTS:
ifNotExists = true;
break;
case HiveParser.KW_EXTERNAL:
isExt = true;
break;
case HiveParser.TOK_LIKETABLE:
if (child.getChildCount() > 0) { //
likeTableName = unescapeIdentifier(child.getChild(0).getText());
if (likeTableName != null) {
if (command_type == CTAS) {
throw new SemanticException(ErrorMsg.CTAS_CTLT_COEXISTENCE
.getMsg());
}
if (cols.size() != 0) {
throw new SemanticException(ErrorMsg.CTLT_COLLST_COEXISTENCE
.getMsg());
}
}
command_type = CTLT;
}
break;
case HiveParser.TOK_QUERY: // CTAS
if (command_type == CTLT) {
throw new SemanticException(ErrorMsg.CTAS_CTLT_COEXISTENCE.getMsg());
}
if (cols.size() != 0) {
throw new SemanticException(ErrorMsg.CTAS_COLLST_COEXISTENCE.getMsg());
}
if (partCols.size() != 0 || bucketCols.size() != 0) {
boolean dynPart = HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING);
if (dynPart == false) {
throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
} else {
// TODO: support dynamic partition for CTAS
throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
}
}
if (isExt) {
throw new SemanticException(ErrorMsg.CTAS_EXTTBL_COEXISTENCE.getMsg());
}
command_type = CTAS;
selectStmt = child;
break;
case HiveParser.TOK_TABCOLLIST:
cols = getColumns(child); //见下面 [FieldSchema(name:s, type:string, comment:null)]
break;
case HiveParser.TOK_TABLECOMMENT:
comment = unescapeSQLString(child.getChild(0).getText());
break;
case HiveParser.TOK_TABLEPARTCOLS:
partCols = getColumns((ASTNode) child.getChild(0), false); //[FieldSchema(name:pt, type:string, comment:null)]
break;
case HiveParser.TOK_TABLEBUCKETS:
bucketCols = getColumnNames((ASTNode) child.getChild(0));
if (child.getChildCount() == 2) {
numBuckets = (Integer.valueOf(child.getChild(1).getText()))
.intValue();
} else {
sortCols = getColumnNamesOrder((ASTNode) child.getChild(1));
numBuckets = (Integer.valueOf(child.getChild(2).getText()))
.intValue();
}
break;
case HiveParser.TOK_TABLEROWFORMAT:
child = (ASTNode) child.getChild(0);
int numChildRowFormat = child.getChildCount();
for (int numC = 0; numC < numChildRowFormat; numC++) {
ASTNode rowChild = (ASTNode) child.getChild(numC);
switch (rowChild.getToken().getType()) {
case HiveParser.TOK_TABLEROWFORMATFIELD:
fieldDelim = unescapeSQLString(rowChild.getChild(0).getText());
if (rowChild.getChildCount() >= 2) {
fieldEscape = unescapeSQLString(rowChild.getChild(1).getText());
}
break;
case HiveParser.TOK_TABLEROWFORMATCOLLITEMS:
collItemDelim = unescapeSQLString(rowChild.getChild(0).getText());
break;
case HiveParser.TOK_TABLEROWFORMATMAPKEYS:
mapKeyDelim = unescapeSQLString(rowChild.getChild(0).getText());
break;
case HiveParser.TOK_TABLEROWFORMATLINES:
lineDelim = unescapeSQLString(rowChild.getChild(0).getText());
if (!lineDelim.equals("\n") && !lineDelim.equals("10")) {
throw new SemanticException(
ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg());
}
break;
default:
assert false;
}
}
break;
case HiveParser.TOK_TABLESERIALIZER:
child = (ASTNode) child.getChild(0);
serde = unescapeSQLString(child.getChild(0).getText());
if (child.getChildCount() == 2) {
readProps(
(ASTNode) (child.getChild(1).getChild(0)),
serdeProps);
}
break;
case HiveParser.TOK_TBLSEQUENCEFILE:
inputFormat = SEQUENCEFILE_INPUT;
outputFormat = SEQUENCEFILE_OUTPUT;
break;
case HiveParser.TOK_TBLTEXTFILE:
inputFormat = TEXTFILE_INPUT;
outputFormat = TEXTFILE_OUTPUT;
break;
case HiveParser.TOK_TBLRCFILE:
inputFormat = RCFILE_INPUT;
outputFormat = RCFILE_OUTPUT;
serde = COLUMNAR_SERDE;
break;
case HiveParser.TOK_TABLEFILEFORMAT:
inputFormat = unescapeSQLString(child.getChild(0).getText());
outputFormat = unescapeSQLString(child.getChild(1).getText());
break;
case HiveParser.TOK_TABLELOCATION:
location = unescapeSQLString(child.getChild(0).getText());
break;
case HiveParser.TOK_TABLEPROPERTIES:
tblProps = DDLSemanticAnalyzer.getProps((ASTNode) child.getChild(0));
break;
case HiveParser.TOK_STORAGEHANDLER:
storageHandler = unescapeSQLString(child.getChild(0).getText());
if (child.getChildCount() == 2) {
readProps(
(ASTNode) (child.getChild(1).getChild(0)),
serdeProps);
}
break;
default:
assert false;
}
}
if ((command_type == CTAS) && (storageHandler != null)) { // false
throw new SemanticException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg());
}
if ((inputFormat == null) && (storageHandler == null)) { //true
assert outputFormat == null;
if ("SequenceFile".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
inputFormat = SEQUENCEFILE_INPUT;
outputFormat = SEQUENCEFILE_OUTPUT;
} else if ("RCFile".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
inputFormat = RCFILE_INPUT;
outputFormat = RCFILE_OUTPUT;
serde = COLUMNAR_SERDE;
} else {
inputFormat = TEXTFILE_INPUT; // org.apache.hadoop.mapred.TextInputFormat
outputFormat = TEXTFILE_OUTPUT;//org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
}
}
默认的文件格式可以通过配置文件设置:
// HiveConf里面 HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"),
// hive-default.xml里面:
<property>
<name>hive.default.fileformat</name>
<value>TextFile</value>
<description>Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS <TEXTFILE|SEQUENCEFILE> to override</description>
</property>
// check for existence of table
if (ifNotExists) { // false,create的时候没有加上IF NOT EXISTS
try {
if (null != db.getTable(db.getCurrentDatabase(), tableName, false)) {
return null;
}
} catch (HiveException e) {
e.printStackTrace();
}
}
// Handle different types of CREATE TABLE command
CreateTableDesc crtTblDesc = null;
switch (command_type) { // 0
case CREATE_TABLE: // REGULAR CREATE TABLE DDL
crtTblDesc = new CreateTableDesc(tableName, isExt, cols, partCols,
bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape,
collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat,
outputFormat, location, serde, storageHandler, serdeProps,
tblProps, ifNotExists);
validateCreateTable(crtTblDesc); //
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
crtTblDesc), conf)); //创建DDLWork,createTblDesc不为空。 创建DDLTask,加入rootTasks。
break;
case CTLT: // create table like <tbl_name>
CreateTableLikeDesc crtTblLikeDesc = new CreateTableLikeDesc(tableName,
isExt, location, ifNotExists, likeTableName);
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
crtTblLikeDesc), conf));
break;
case CTAS: // create table as select
// check for existence of table. Throw an exception if it exists.
try {
Table tab = db.getTable(db.getCurrentDatabase(), tableName, false);
// do not throw exception if table does not exist
if (tab != null) {
throw new SemanticException(ErrorMsg.TABLE_ALREADY_EXISTS
.getMsg(tableName));
}
} catch (HiveException e) { // may be unable to get meta data
throw new SemanticException(e);
}
crtTblDesc = new CreateTableDesc(tableName, isExt, cols, partCols,
bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape,
collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat,
outputFormat, location, serde, storageHandler, serdeProps,
tblProps, ifNotExists);
qb.setTableDesc(crtTblDesc);
return selectStmt;
default:
assert false; // should never be unknown command type
}
return null; //返回null
}
BaseSemanticAnalyzer:
protected List<FieldSchema> getColumns(ASTNode ast) throws SemanticException { //获得columns的信息
//ast = TOK_TABCOLLIST
return getColumns(ast, true);
}
protected List<FieldSchema> getColumns(ASTNode ast, boolean lowerCase) throws SemanticException {
List<FieldSchema> colList = new ArrayList<FieldSchema>(); // []
int numCh = ast.getChildCount(); // 1
for (int i = 0; i < numCh; i++) {
FieldSchema col = new FieldSchema();
ASTNode child = (ASTNode) ast.getChild(i); // TOK_TABCOL
String name = child.getChild(0).getText(); // s
if(lowerCase) { // true
name = name.toLowerCase(); // s
}
// child 0 is the name of the column
col.setName(unescapeIdentifier(name)); //FieldSchema(name:s, type:null, comment:null)
// child 1 is the type of the column
ASTNode typeChild = (ASTNode) (child.getChild(1)); // TOK_STRING
col.setType(getTypeStringFromAST(typeChild)); //FieldSchema(name:s, type:string, comment:null)
// child 2 is the optional comment of the column
if (child.getChildCount() == 3) {
col.setComment(unescapeSQLString(child.getChild(2).getText()));
}
colList.add(col); //[FieldSchema(name:s, type:string, comment:null)]
}
return colList; //[FieldSchema(name:s, type:string, comment:null)]
}
BaseSemanticAnalyzer:
protected List<FieldSchema> getColumns(ASTNode ast, boolean lowerCase) throws SemanticException {
List<FieldSchema> colList = new ArrayList<FieldSchema>();
int numCh = ast.getChildCount(); // 1
for (int i = 0; i < numCh; i++) {
FieldSchema col = new FieldSchema();
ASTNode child = (ASTNode) ast.getChild(i); // TOK_TABCOL
String name = child.getChild(0).getText(); // pt
if(lowerCase) { //false
name = name.toLowerCase();
}
// child 0 is the name of the column
col.setName(unescapeIdentifier(name)); // FieldSchema(name:pt, type:null, comment:null)
// child 1 is the type of the column
ASTNode typeChild = (ASTNode) (child.getChild(1)); // TOK_STRING
col.setType(getTypeStringFromAST(typeChild)); // FieldSchema(name:pt, type:string, comment:null)
// child 2 is the optional comment of the column
if (child.getChildCount() == 3) {
col.setComment(unescapeSQLString(child.getChild(2).getText()));
}
colList.add(col); // [FieldSchema(name:pt, type:string, comment:null)]
}
return colList; //[FieldSchema(name:pt, type:string, comment:null)]
}
CreateTableDesc
public CreateTableDesc(String tableName, boolean isExternal,
List<FieldSchema> cols, List<FieldSchema> partCols,
List<String> bucketCols, List<Order> sortCols, int numBuckets,
String fieldDelim, String fieldEscape, String collItemDelim,
String mapKeyDelim, String lineDelim, String comment, String inputFormat,
String outputFormat, String location, String serName,
String storageHandler,
Map<String, String> serdeProps,
Map<String, String> tblProps,
boolean ifNotExists) {
this.tableName = tableName; // tablePartition
this.isExternal = isExternal; // false
this.bucketCols = new ArrayList<String>(bucketCols); // []
this.sortCols = new ArrayList<Order>(sortCols); // []
this.collItemDelim = collItemDelim; // null
this.cols = new ArrayList<FieldSchema>(cols); // [FieldSchema(name:s, type:string, comment:null)]
this.comment = comment; // null
this.fieldDelim = fieldDelim; // null
this.fieldEscape = fieldEscape; // null
this.inputFormat = inputFormat; // org.apache.hadoop.mapred.TextInputFormat
this.outputFormat = outputFormat; // org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
this.lineDelim = lineDelim; // null
this.location = location; // null
this.mapKeyDelim = mapKeyDelim; // null
this.numBuckets = numBuckets; // -1
this.partCols = new ArrayList<FieldSchema>(partCols); // [FieldSchema(name:pt, type:string, comment:null)]
this.serName = serName; // null
this.storageHandler = storageHandler; // null
this.serdeProps = serdeProps; // {}
this.tblProps = tblProps; // null
this.ifNotExists = ifNotExists; // false
}
CreateTableDesc包含的信息:
(1)表名,tablePartition
(2)是否为外部表,默认是false
(3)非分区字段信息,cols
(4)inputFormat
(5)outputFormat
(6)分区字段信息,partCols
(7)ifNotExists,默认为false
private void validateCreateTable(CreateTableDesc crtTblDesc)
throws SemanticException {
if ((crtTblDesc.getCols() == null) || (crtTblDesc.getCols().size() == 0)) { // false
// for now make sure that serde exists
if (StringUtils.isEmpty(crtTblDesc.getSerName())
|| !SerDeUtils.shouldGetColsFromSerDe(crtTblDesc.getSerName())) {
throw new SemanticException(ErrorMsg.INVALID_TBL_DDL_SERDE.getMsg());
}
return;
}
if (crtTblDesc.getStorageHandler() == null) {
try {
Class<?> origin = Class.forName(crtTblDesc.getOutputFormat(), true,
JavaUtils.getClassLoader()); //class org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Class<? extends HiveOutputFormat> replaced = HiveFileFormatUtils
.getOutputFormatSubstitute(origin); // org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
if (replaced == null) {
throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE
.getMsg());
}
} catch (ClassNotFoundException e) {
throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE.getMsg());
}
}
List<String> colNames = validateColumnNameUniqueness(crtTblDesc.getCols());//验证没有两个column的名字是相同的。 [s]
if (crtTblDesc.getBucketCols() != null) { //
// all columns in cluster and sort are valid columns
Iterator<String> bucketCols = crtTblDesc.getBucketCols().iterator();
while (bucketCols.hasNext()) {
String bucketCol = bucketCols.next();
boolean found = false;
Iterator<String> colNamesIter = colNames.iterator();
while (colNamesIter.hasNext()) {
String colName = colNamesIter.next();
if (bucketCol.equalsIgnoreCase(colName)) {
found = true;
break;
}
}
if (!found) {
throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
}
}
}
if (crtTblDesc.getSortCols() != null) {
// all columns in cluster and sort are valid columns
Iterator<Order> sortCols = crtTblDesc.getSortCols().iterator();
while (sortCols.hasNext()) {
String sortCol = sortCols.next().getCol();
boolean found = false;
Iterator<String> colNamesIter = colNames.iterator();
while (colNamesIter.hasNext()) {
String colName = colNamesIter.next();
if (sortCol.equalsIgnoreCase(colName)) {
found = true;
break;
}
}
if (!found) {
throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
}
}
}
    if (crtTblDesc.getPartCols() != null) { // partition column names must not collide with the regular column names
// there is no overlap between columns and partitioning columns
Iterator<FieldSchema> partColsIter = crtTblDesc.getPartCols().iterator();
while (partColsIter.hasNext()) {
String partCol = partColsIter.next().getName();
Iterator<String> colNamesIter = colNames.iterator();
while (colNamesIter.hasNext()) {
String colName = unescapeIdentifier(colNamesIter.next());
if (partCol.equalsIgnoreCase(colName)) {
throw new SemanticException(
ErrorMsg.COLUMN_REPEATED_IN_PARTITIONING_COLS.getMsg());
}
}
}
}
}
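In short, validateCreateTable adds three checks on top of validateColumnNameUniqueness: every bucket column and every sort column must name an existing column, and no partition column may repeat a regular column. A stand-alone sketch of the same checks (a simplified re-implementation for illustration, not the Hive source):

import java.util.Arrays;
import java.util.List;

public class CreateTableChecks {

  /** Throws IllegalArgumentException when the column lists are inconsistent. */
  static void check(List<String> cols, List<String> bucketCols,
                    List<String> sortCols, List<String> partCols) {
    for (String b : bucketCols) {                 // CLUSTERED BY columns must exist
      if (!containsIgnoreCase(cols, b)) {
        throw new IllegalArgumentException("Invalid bucket column: " + b);
      }
    }
    for (String s : sortCols) {                   // SORTED BY columns must exist
      if (!containsIgnoreCase(cols, s)) {
        throw new IllegalArgumentException("Invalid sort column: " + s);
      }
    }
    for (String p : partCols) {                   // PARTITIONED BY columns must be new names
      if (containsIgnoreCase(cols, p)) {
        throw new IllegalArgumentException("Column repeated in partitioning columns: " + p);
      }
    }
  }

  private static boolean containsIgnoreCase(List<String> list, String name) {
    for (String c : list) {
      if (c.equalsIgnoreCase(name)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // create table tablePartition(s string) partitioned by(pt string) passes:
    check(Arrays.asList("s"), Arrays.<String>asList(), Arrays.<String>asList(), Arrays.asList("pt"));
    // ...whereas "partitioned by(s string)" would throw.
  }
}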
The resulting DDLWork has a non-null createTblDesc, and the task generated from it is an org.apache.hadoop.hive.ql.exec.DDLTask.
(2)execute
Driver:
public int execute() {
// Add root Tasks to runnable
for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
driverCxt.addToRunnable(tsk);
}
while (running.size() != 0 || runnable.peek() != null) {
// Launch upto maxthreads tasks
while (runnable.peek() != null && running.size() < maxthreads) {
Task<? extends Serializable> tsk = runnable.remove();
launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt); // Driver.launchTask
}
}
}
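execute() is essentially a small scheduler: the plan's root tasks go into the runnable queue, up to maxthreads of them are launched at a time, and the loop spins while anything is still queued or running. A stripped-down sketch of the pattern (the real Driver uses TaskRunner threads, a DriverContext, and feeds child tasks back into the queue as parents finish):

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class SchedulerSketch {
  interface Task { void execute(); }

  static void runRootTasks(List<Task> rootTasks, int maxThreads) throws InterruptedException {
    Queue<Task> runnable = new LinkedList<Task>(rootTasks);
    List<Thread> running = new ArrayList<Thread>();

    while (!running.isEmpty() || runnable.peek() != null) {
      // launch up to maxThreads tasks
      while (runnable.peek() != null && running.size() < maxThreads) {
        final Task tsk = runnable.remove();
        Thread t = new Thread(new Runnable() {
          public void run() { tsk.execute(); }
        });
        running.add(t);
        t.start();
      }
      // reap finished tasks (the real Driver polls its TaskRunners and queues their children)
      for (int i = running.size() - 1; i >= 0; i--) {
        if (!running.get(i).isAlive()) {
          running.remove(i);
        }
      }
      Thread.sleep(100);
    }
  }
}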
Driver.launchTask
TaskRunner.runSequential
Task.executeTask
DDLTask.execute
CreateTableDesc crtTbl = work.getCreateTblDesc();
if (crtTbl != null) {
return createTable(db, crtTbl);
}
DDLTask.createTable
Hive.createTable
HiveMetaStoreClient.createTable
HiveMetaStore.createTable
HiveMetaStore.create_table_core
ObjectStore
DDLTask:
/**
* Create a new table.
*
* @param db
* The database in question.
* @param crtTbl
* This is the table we're creating.
* @return Returns 0 when execution succeeds and above 0 if it fails.
* @throws HiveException
* Throws this exception if an unexpected error occurs.
*/
private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException {
// create the table
Table tbl = new Table(db.getCurrentDatabase(), crtTbl.getTableName());
    // Hive.java -- db is an instance of Hive
    public String getCurrentDatabase() {
      if (null == currentDatabase) {
        currentDatabase = DEFAULT_DATABASE_NAME; // DEFAULT_DATABASE_NAME defaults to "default"
}
return currentDatabase;
}
if (crtTbl.getPartCols() != null) {
tbl.setPartCols(crtTbl.getPartCols()); //[FieldSchema(name:pt, type:string, comment:null)]
}
if (crtTbl.getNumBuckets() != -1) {
tbl.setNumBuckets(crtTbl.getNumBuckets());
}
if (crtTbl.getStorageHandler() != null) {
tbl.setProperty(
org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE,
crtTbl.getStorageHandler());
}
HiveStorageHandler storageHandler = tbl.getStorageHandler(); // null
if (crtTbl.getSerName() != null) { // null
tbl.setSerializationLib(crtTbl.getSerName());
} else {
if (crtTbl.getFieldDelim() != null) { // null false
tbl.setSerdeParam(Constants.FIELD_DELIM, crtTbl.getFieldDelim());
tbl.setSerdeParam(Constants.SERIALIZATION_FORMAT, crtTbl
.getFieldDelim());
}
if (crtTbl.getFieldEscape() != null) { // null false
tbl.setSerdeParam(Constants.ESCAPE_CHAR, crtTbl.getFieldEscape());
}
if (crtTbl.getCollItemDelim() != null) {
tbl
.setSerdeParam(Constants.COLLECTION_DELIM, crtTbl
.getCollItemDelim());
}
if (crtTbl.getMapKeyDelim() != null) {
tbl.setSerdeParam(Constants.MAPKEY_DELIM, crtTbl.getMapKeyDelim());
}
if (crtTbl.getLineDelim() != null) {
tbl.setSerdeParam(Constants.LINE_DELIM, crtTbl.getLineDelim());
}
}
    if (crtTbl.getSerdeProps() != null) { // SerDe properties
Iterator<Entry<String, String>> iter = crtTbl.getSerdeProps().entrySet()
.iterator();
while (iter.hasNext()) {
Entry<String, String> m = iter.next();
tbl.setSerdeParam(m.getKey(), m.getValue());
}
}
    if (crtTbl.getTblProps() != null) { // table properties
tbl.getTTable().getParameters().putAll(crtTbl.getTblProps());
}
/*
* We use LazySimpleSerDe by default.
*
* If the user didn't specify a SerDe, and any of the columns are not simple
* types, we will have to use DynamicSerDe instead.
*/
    if (crtTbl.getSerName() == null) { // no SerDe name was specified
      if (storageHandler == null) {
        LOG.info("Default to LazySimpleSerDe for table " + crtTbl.getTableName()); //11/08/28 06:01:29 INFO exec.DDLTask: Default to LazySimpleSerDe for table tablePartition
        tbl.setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
        // the default SerDe is org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
} else {
String serDeClassName = storageHandler.getSerDeClass().getName();
LOG.info("Use StorageHandler-supplied " + serDeClassName
+ " for table " + crtTbl.getTableName());
tbl.setSerializationLib(serDeClassName);
}
} else {
// let's validate that the serde exists
validateSerDe(crtTbl.getSerName());
}
if (crtTbl.getCols() != null) { //[FieldSchema(name:s, type:string, comment:null)]
tbl.setFields(crtTbl.getCols());
}
if (crtTbl.getBucketCols() != null) {
tbl.setBucketCols(crtTbl.getBucketCols());
}
if (crtTbl.getSortCols() != null) {
tbl.setSortCols(crtTbl.getSortCols());
}
if (crtTbl.getComment() != null) {
tbl.setProperty("comment", crtTbl.getComment()); //表的注释
}
if (crtTbl.getLocation() != null) {
tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri());
}
tbl.setInputFormatClass(crtTbl.getInputFormat());
tbl.setOutputFormatClass(crtTbl.getOutputFormat());
tbl.getTTable().getSd().setInputFormat(
tbl.getInputFormatClass().getName()); // org.apache.hadoop.mapred.TextInputFormat
tbl.getTTable().getSd().setOutputFormat(
tbl.getOutputFormatClass().getName());//org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
    if (crtTbl.isExternal()) { // external table
      tbl.setProperty("EXTERNAL", "TRUE"); // add the table property EXTERNAL=TRUE
      tbl.setTableType(TableType.EXTERNAL_TABLE); // and mark the table type as EXTERNAL_TABLE
}
    // If the sorted columns are a superset of the bucketed columns, record this fact.
    // It can later be used to optimize some group-by queries. Note that the order does
    // not matter, as long as the bucketed columns appear within the first 'n' sort
    // columns, where 'n' is the number of bucketed columns.
if ((tbl.getBucketCols() != null) && (tbl.getSortCols() != null)) {
List<String> bucketCols = tbl.getBucketCols();
List<Order> sortCols = tbl.getSortCols();
if ((sortCols.size() > 0) && (sortCols.size() >= bucketCols.size())) {
boolean found = true;
Iterator<String> iterBucketCols = bucketCols.iterator();
while (iterBucketCols.hasNext()) {
String bucketCol = iterBucketCols.next();
boolean colFound = false;
for (int i = 0; i < bucketCols.size(); i++) {
if (bucketCol.equals(sortCols.get(i).getCol())) {
colFound = true;
break;
}
}
if (colFound == false) {
found = false;
break;
}
}
if (found) {
tbl.setProperty("SORTBUCKETCOLSPREFIX", "TRUE");
}
}
}
    int rc = setGenericTableAttributes(tbl); // set generic table attributes
if (rc != 0) {
return rc;
}
// create the table
db.createTable(tbl, crtTbl.getIfNotExists()); //Hive.createTable(Table tbl, boolean ifNotExists)
// add the table privilege to creator
if (SessionState.get() != null) {
String userName = SessionState.get().getUserName();
Authorizer authorizer = SessionState.get().getAuthorizer();
try {
if (userName != null) {
AuthorizeEntry entry =
new AuthorizeEntry(null, tbl, null,
PrivilegeLevel.TABLE_LEVEL.getPrivileges(),
PrivilegeLevel.TABLE_LEVEL);
authorizer.grant(userName, userName, entry);
}
} catch (AuthorizeException e) {
throw new HiveException(e);
}
}
work.getOutputs().add(new WriteEntity(tbl));
    return 0; // success, return 0
}
Some of the defaults involved:
Default to LazySimpleSerDe for table tablePartition
sd.serdeInfo.serializationLib is org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
The Table's default inputFormatClass is
class org.apache.hadoop.mapred.TextInputFormat
The Table's default outputFormatClass is
class org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
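Note the mapping between IgnoreKeyTextOutputFormat and HiveIgnoreKeyTextOutputFormat seen in validateCreateTable above: HiveFileFormatUtils.getOutputFormatSubstitute swaps a plain OutputFormat for its HiveOutputFormat counterpart. Reduced to a lookup table, the idea looks roughly like this (a simplified illustration, not the Hive implementation; the SequenceFile pairing is my assumption by analogy):

import java.util.HashMap;
import java.util.Map;

public class OutputFormatSubstituteSketch {
  // plain OutputFormat class name -> Hive-aware replacement
  private static final Map<String, String> SUBSTITUTES = new HashMap<String, String>();
  static {
    SUBSTITUTES.put("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat",
        "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
    // assumption: SequenceFileOutputFormat is registered the same way
    SUBSTITUTES.put("org.apache.hadoop.mapred.SequenceFileOutputFormat",
        "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat");
  }

  /** Returns the Hive-aware output format, or the original name if no substitute is registered. */
  static String substitute(String outputFormatClassName) {
    String replaced = SUBSTITUTES.get(outputFormatClassName);
    return replaced != null ? replaced : outputFormatClassName;
  }

  public static void main(String[] args) {
    System.out.println(substitute("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"));
    // prints org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  }
}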
Table
public Table(String databaseName, String tableName) { // default tablePartition
this(getEmptyTable(databaseName, tableName));
}
static org.apache.hadoop.hive.metastore.api.Table
getEmptyTable(String databaseName, String tableName) {
StorageDescriptor sd = new StorageDescriptor();
{
sd.setSerdeInfo(new SerDeInfo());
sd.setNumBuckets(-1);
sd.setBucketCols(new ArrayList<String>());
sd.setCols(new ArrayList<FieldSchema>());
sd.setParameters(new HashMap<String, String>());
sd.setSortCols(new ArrayList<Order>());
sd.getSerdeInfo().setParameters(new HashMap<String, String>());
// We have to use MetadataTypedColumnsetSerDe because LazySimpleSerDe does
// not support a table with no columns.
sd.getSerdeInfo().setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
    sd.getSerdeInfo().getParameters().put(Constants.SERIALIZATION_FORMAT, "1"); // added by default
    sd.setInputFormat(SequenceFileInputFormat.class.getName()); // the default is SequenceFile
    sd.setOutputFormat(HiveSequenceFileOutputFormat.class.getName());
}
org.apache.hadoop.hive.metastore.api.Table t = new org.apache.hadoop.hive.metastore.api.Table();
{
t.setSd(sd);
t.setPartitionKeys(new ArrayList<FieldSchema>());
t.setParameters(new HashMap<String, String>());
    t.setTableType(TableType.MANAGED_TABLE.toString()); // the default table type is MANAGED_TABLE
    t.setDbName(databaseName); // default
    t.setTableName(tableName); // tablePartition
    t.setDbName(databaseName); // redundant: dbName was already set above
}
return t;
}
Constants.SERIALIZATION_FORMAT:
public static final String SERIALIZATION_FORMAT = "serialization.format"; // the value stored for it defaults to "1"
public enum TableType {
MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW
}
Hive:
public void createTable(Table tbl, boolean ifNotExists) throws HiveException {
try {
if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) {
tbl.setDbName(getCurrentDatabase());
}
if (tbl.getCols().size() == 0) {
tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getTableName(),
tbl.getDeserializer()));
}
      tbl.checkValidity(); // check the table's validity: at least one column; a non-empty table name made up of letters, digits and underscores; no two columns with the same name; no overlap between partition and non-partition column names
getMSC().createTable(tbl.getTTable()); // HiveMetaStoreClient
} catch (AlreadyExistsException e) {
if (!ifNotExists) {
throw new HiveException(e);
}
} catch (Exception e) {
throw new HiveException(e);
}
}
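A rough idea of what a name check like this looks like (a simplified stand-in for MetaStoreUtils.validateName / part of Table.checkValidity, not the Hive source):

import java.util.regex.Pattern;

public class NameValiditySketch {
  // table and column names: letters, digits and underscores only
  private static final Pattern VALID_NAME = Pattern.compile("\\w+");

  static boolean validateName(String name) {
    return name != null && !name.isEmpty() && VALID_NAME.matcher(name).matches();
  }

  public static void main(String[] args) {
    System.out.println(validateName("tablePartition"));  // true
    System.out.println(validateName("table-partition")); // false: '-' is not allowed
  }
}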
HiveMetaStoreClient
public void createTable(Table tbl) throws AlreadyExistsException,
InvalidObjectException, MetaException, NoSuchObjectException, TException {
HiveMetaHook hook = getHook(tbl);
if (hook != null) {
hook.preCreateTable(tbl);
}
boolean success = false;
try {
client.create_table(tbl); // org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler
if (hook != null) {
hook.commitCreateTable(tbl);
}
      success = true; // success
} finally {
if (!success && (hook != null)) {
hook.rollbackCreateTable(tbl);
}
}
}
HiveMetaStore.HMSHandler:
public void create_table(final Table tbl) throws AlreadyExistsException,
MetaException, InvalidObjectException {
incrementCounter("create_table");
logStartFunction("create_table: db=" + tbl.getDbName() + " tbl="
+ tbl.getTableName()); // 11/08/28 06:12:02 INFO metastore.HiveMetaStore: 0: create_table: db=default tbl=tablePartition
try {
executeWithRetry(new Command<Boolean>() {
@Override
Boolean run(RawStore ms) throws Exception {
          create_table_core(ms, tbl);
          return Boolean.TRUE; // return true
}
});
} catch (AlreadyExistsException e) {
throw e;
} catch (MetaException e) {
throw e;
} catch (InvalidObjectException e) {
throw e;
} catch (Exception e) {
assert(e instanceof RuntimeException);
throw (RuntimeException)e;
}
}
HiveMetaStore.HMSHandler:
private <T> T executeWithRetry(Command<T> cmd) throws Exception {
T ret = null;
boolean gotNewConnectUrl = false;
boolean reloadConf = HiveConf.getBoolVar(hiveConf,
HiveConf.ConfVars.METASTOREFORCERELOADCONF); // false
if (reloadConf) {
updateConnectionURL(getConf(), null);
}
int retryCount = 0;
Exception caughtException = null;
while(true) {
try {
RawStore ms = getMS(reloadConf || gotNewConnectUrl); // ObjectStore
ret = cmd.run(ms);
break;
} catch (javax.jdo.JDOFatalDataStoreException e) {
caughtException = e;
} catch (javax.jdo.JDODataStoreException e) {
caughtException = e;
}
if (retryCount >= retryLimit) {
throw caughtException;
}
assert(retryInterval >= 0);
retryCount++;
LOG.error(
String.format(
"JDO datastore error. Retrying metastore command " +
"after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit));
Thread.sleep(retryInterval);
// If we have a connection error, the JDO connection URL hook might
// provide us with a new URL to access the datastore.
String lastUrl = getConnectionURL(getConf());
gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl);
}
    return ret; // here it returns Boolean.TRUE
}
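executeWithRetry is a generic retry wrapper: run the command against the RawStore, and on a JDO datastore exception retry up to retryLimit times, sleeping retryInterval ms between attempts (and possibly picking up a new connection URL). The same pattern with the metastore specifics stripped out (a generic sketch, not the Hive code):

import java.util.concurrent.Callable;

public class RetrySketch {

  /** Runs cmd, retrying up to retryLimit times on the chosen exception type. */
  static <T> T executeWithRetry(Callable<T> cmd, int retryLimit, long retryIntervalMs)
      throws Exception {
    int retryCount = 0;
    while (true) {
      try {
        return cmd.call();
      } catch (RuntimeException e) {          // stand-in for JDODataStoreException etc.
        if (retryCount >= retryLimit) {
          throw e;                            // give up after retryLimit attempts
        }
        retryCount++;
        Thread.sleep(retryIntervalMs);        // back off before the next attempt
      }
    }
  }

  public static void main(String[] args) throws Exception {
    String result = executeWithRetry(new Callable<String>() {
      public String call() { return "ok"; }
    }, 3, 100L);
    System.out.println(result);
  }
}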
private void create_table_core(final RawStore ms, final Table tbl)
throws AlreadyExistsException, MetaException, InvalidObjectException {
    if (!MetaStoreUtils.validateName(tbl.getTableName())
        || !MetaStoreUtils.validateColNames(tbl.getSd().getCols())
        || (tbl.getPartitionKeys() != null && !MetaStoreUtils
        .validateColNames(tbl.getPartitionKeys()))) { // validate the table name, the non-partition column names and the partition column names
throw new InvalidObjectException(tbl.getTableName()
+ " is not a valid object name");
}
Path tblPath = null;
boolean success = false, madeDir = false;
try {
ms.openTransaction();
if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
if (tbl.getSd().getLocation() == null
|| tbl.getSd().getLocation().isEmpty()) {
          tblPath = wh.getDefaultTablePath(
              tbl.getDbName(), tbl.getTableName()); // the default location is the warehouse directory plus the table name: hdfs://localhost:54310/user/hive/warehouse/tablepartition
} else {
if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
LOG.warn("Location: " + tbl.getSd().getLocation()
+ " specified for non-external table:" + tbl.getTableName());
}
tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
}
        tbl.getSd().setLocation(tblPath.toString()); // record the table location: hdfs://localhost:54310/user/hive/warehouse/tablepartition
}
// get_table checks whether database exists, it should be moved here
      if (is_table_exists(tbl.getDbName(), tbl.getTableName())) { // if the table already exists, throw an exception
throw new AlreadyExistsException("Table " + tbl.getTableName()
+ " already exists");
}
      if (tblPath != null) { // the location is set
        if (!wh.isDir(tblPath)) { // does the path exist, and is it a directory?
          if (!wh.mkdirs(tblPath)) { // it does not exist, so create it
            throw new MetaException(tblPath
                + " is not a directory or unable to create one");
          }
          madeDir = true; // directory created successfully
}
}
// set create time
      long time = System.currentTimeMillis() / 1000;
      tbl.setCreateTime((int) time); // record the table's creation time
      tbl.putToParameters(Constants.DDL_TIME, Long.toString(time)); // and the last DDL time
      ms.createTable(tbl); // persist the table to the metastore database
      success = ms.commitTransaction(); // did the transaction commit successfully?
    } finally {
      if (!success) { // commit failed: roll back and clean up
ms.rollbackTransaction();
if (madeDir) {
wh.deleteDir(tblPath, true);
}
}
}
}
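For this default-database example, the default table location is just the warehouse root (hive.metastore.warehouse.dir) plus the lower-cased table name, which is how hdfs://localhost:54310/user/hive/warehouse/tablepartition comes about. A minimal sketch of that derivation (assuming the warehouse root is passed in; this is not the Warehouse class itself):

import org.apache.hadoop.fs.Path;

public class DefaultTablePathSketch {

  /** Default-database case: warehouse root + lower-cased table name. */
  static Path defaultTablePath(String warehouseDir, String tableName) {
    return new Path(warehouseDir, tableName.toLowerCase());
  }

  public static void main(String[] args) {
    // prints hdfs://localhost:54310/user/hive/warehouse/tablepartition
    System.out.println(defaultTablePath("hdfs://localhost:54310/user/hive/warehouse",
                                        "tablePartition"));
  }
}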
ObjectStore:
public void createTable(Table tbl) throws InvalidObjectException,
MetaException {
boolean commited = false;
try {
openTransaction();
MTable mtbl = convertToMTable(tbl);
      pm.makePersistent(mtbl); // persist to the metastore database through DataNucleus (JDO)
commited = commitTransaction();
} finally {
if (!commited) {
rollbackTransaction();
}
}
}
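ObjectStore hands persistence off to JDO (DataNucleus): the Thrift Table is converted into an MTable model object and passed to PersistenceManager.makePersistent inside a transaction. Stripped of the metastore specifics, the open/commit/rollback dance is the standard JDO transaction pattern (a generic sketch with a hypothetical persistent object, not the ObjectStore code):

import javax.jdo.PersistenceManager;
import javax.jdo.Transaction;

public class JdoPersistSketch {

  /** Persists obj inside a JDO transaction, rolling back on failure. */
  static void persist(PersistenceManager pm, Object obj) {
    Transaction tx = pm.currentTransaction();
    boolean committed = false;
    try {
      tx.begin();
      pm.makePersistent(obj);   // DataNucleus translates this into SQL against the metastore DB
      tx.commit();
      committed = true;
    } finally {
      if (!committed && tx.isActive()) {
        tx.rollback();          // undo the partial write
      }
    }
  }
}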
Comments

#3  yyx1987  (2012-07-27)
    bupt04406 wrote: "That should just be a default value; it gets replaced somewhere else, I don't remember the exact code."
    OK, I'll take another look. Thanks!

#2  bupt04406  (2012-07-26)
    That should just be a default value; it gets replaced somewhere else, I don't remember the exact code.

#1  yyx1987  (2012-07-26)
    (quoting the output-format check in validateCreateTable above)
    Hi, I'd like to ask: does this code mean that Hive's outputformat cannot be customized, and can only be org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat? Thanks.