- 浏览: 347373 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
lvyuan1234:
你好,你那个sample.txt文件可以分享给我吗
hive insert overwrite into -
107x:
不错,谢谢!
hive 表的一些默认值 -
on_way_:
赞
Hadoop相关书籍 -
bupt04406:
dengkanghua 写道出来这个问题该怎么解决?hbase ...
Unexpected state导致HMaster abort -
dengkanghua:
出来这个问题该怎么解决?hbase master启动不起来。
Unexpected state导致HMaster abort
CREATE TABLE records (year STRING, temperature INT, quality INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
hive> explain
> LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
OK
ABSTRACT SYNTAX TREE:
(TOK_LOAD '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' (TOK_TAB records) LOCAL OVERWRITE)
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-1 depends on stages: Stage-0
STAGE PLANS:
Stage: Stage-0
Copy
source: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
destination: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_00-10-28_984_2803781885868135028/-ext-10000
Stage: Stage-1
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: records
Time taken: 0.333 seconds
(TOK_LOAD
'/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'
(TOK_TAB records)
LOCAL
OVERWRITE
)
hive> explain extended
> LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
OK
ABSTRACT SYNTAX TREE:
(TOK_LOAD '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' (TOK_TAB records) LOCAL OVERWRITE)
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-1 depends on stages: Stage-0
STAGE PLANS:
Stage: Stage-0
Copy
source: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
destination: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10000
Stage: Stage-1
Move Operator
tables:
replace: true
source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10000
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns year,temperature,quality
columns.types string:int:int
field.delim
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/records
name records
serialization.ddl struct records { string year, i32 temperature, i32 quality}
serialization.format
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1313975965
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: records
tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10001
CliDriver:
CliDriver.main() {
ret = cli.processLine(line);
}
CliDriver:
public int processLine(String line) {
ret = processCmd(command);
}
CliDriver:
public int processCmd(String cmd) {
CommandProcessor proc = CommandProcessorFactory.get(tokens[0]);
Driver qp = (Driver) proc;
ret = qp.run(cmd).getResponseCode();
}
Driver:
public CommandProcessorResponse run(String command) {
// command = LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records
int ret = compile(command);
ret = execute();
}
Driver:
public int compile(String command) {
ctx = new Context(conf); //
SemanticAnalyzerFactory sfactory = new SemanticAnalyzerFactory(conf);
BaseSemanticAnalyzer sem = sfactory.get(tree); // return new LoadSemanticAnalyzer(conf);
sem.analyze(tree, ctx);
}
public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
}
/**
* Create a Context with a given executionId. ExecutionId, together with
* user name and conf, will determine the temporary directory locations.
*/
public Context(Configuration conf, String executionId) throws IOException {
this.conf = conf;
this.executionId = executionId; //hive_2011-08-21_00-02-22_445_7799135143086468923
// non-local tmp location is configurable. however it is the same across
// all external file systems
nonLocalScratchPath =
new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR),
executionId); // /tmp/hive-tianzhao/hive_2011-08-21_00-02-22_445_7799135143086468923
// local tmp location is not configurable for now
localScratchDir = System.getProperty("java.io.tmpdir")
+ Path.SEPARATOR + System.getProperty("user.name") + Path.SEPARATOR
+ executionId; // /tmp/tianzhao/hive_2011-08-21_00-02-22_445_7799135143086468923
}
public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
public void analyzeInternal(ASTNode ast) throws SemanticException {
isLocal = false;
isOverWrite = false;
Tree fromTree = ast.getChild(0); // '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'
Tree tableTree = ast.getChild(1); // TOK_TAB
if (ast.getChildCount() == 4) { // true
isLocal = true; //
isOverWrite = true; //
}
if (ast.getChildCount() == 3) {
if (ast.getChild(2).getText().toLowerCase().equals("local")) {
isLocal = true;
} else {
isOverWrite = true;
}
}
// initialize load path
URI fromURI;
try {
String fromPath = stripQuotes(fromTree.getText()); ///home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
fromURI = initializeFromURI(fromPath); // file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
} catch (IOException e) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
.getMessage()), e);
} catch (URISyntaxException e) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
.getMessage()), e);
}
// initialize destination table/partition
tableSpec ts = new tableSpec(db, conf, (ASTNode) tableTree); //
Table tbl = ts.tableHandle;
if (tbl.isView()) {
throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
}
if (tbl.isNonNative()) {
throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
}
genAuthorizeEntry(db.getCurrentDatabase(), tbl.getTableName(), null, Privilege.INSERT_PRIV);
URI toURI = (ts.partHandle != null) ? ts.partHandle.getDataLocation()
: ts.tableHandle.getDataLocation(); // hdfs://localhost:54310/user/hive/warehouse/records
List<FieldSchema> parts = ts.tableHandle.getPartitionKeys();
if (isOverWrite && (parts != null && parts.size() > 0)
&& (ts.partSpec == null || ts.partSpec.size() == 0)) {
throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
}
// make sure the arguments make sense
applyConstraints(fromURI, toURI, fromTree, isLocal);
Task<? extends Serializable> rTask = null;
// create copy work
if (isLocal) { //true
// if the local keyword is specified - we will always make a copy. this
// might seem redundant in the case
// that the hive warehouse is also located in the local file system - but
// that's just a test case.
String copyURIStr = ctx.getExternalTmpFileURI(toURI); //hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
URI copyURI = URI.create(copyURIStr);
rTask = TaskFactory.get(new CopyWork(fromURI.toString(), copyURIStr),
conf);
fromURI = copyURI;
}
// create final load/move work
String loadTmpPath = ctx.getExternalTmpFileURI(toURI); // hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
Map<String, String> partSpec = ts.getPartSpec();
if (partSpec == null) {
partSpec = new LinkedHashMap<String, String>();
}
LoadTableDesc loadTableWork = new LoadTableDesc(fromURI.toString(),
loadTmpPath, Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
if (rTask != null) {
rTask.addDependentTask(TaskFactory.get(new MoveWork(getInputs(),
getOutputs(), loadTableWork, null, true), conf));
} else {
rTask = TaskFactory.get(new MoveWork(getInputs(), getOutputs(),
loadTableWork, null, true), conf);
}
rootTasks.add(rTask);
}
private URI initializeFromURI(String fromPath) throws IOException,
URISyntaxException {
URI fromURI = new Path(fromPath).toUri(); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
String fromScheme = fromURI.getScheme(); // null
String fromAuthority = fromURI.getAuthority(); // null
String path = fromURI.getPath(); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
// generate absolute path relative to current directory or hdfs home
// directory
if (!path.startsWith("/")) { //不是绝对路径,执行下面的操作
if (isLocal) {
path = new Path(System.getProperty("user.dir"), path).toString();
} else {
path = new Path(new Path("/user/" + System.getProperty("user.name")),
path).toString();
}
}
// set correct scheme and authority
if (StringUtils.isEmpty(fromScheme)) { // true
if (isLocal) { //true
// file for local
fromScheme = "file"; //
} else {
// use default values from fs.default.name
URI defaultURI = FileSystem.get(conf).getUri();
fromScheme = defaultURI.getScheme();
fromAuthority = defaultURI.getAuthority();
}
}
// if scheme is specified but not authority then use the default authority
if (fromScheme.equals("hdfs") && StringUtils.isEmpty(fromAuthority)) {
URI defaultURI = FileSystem.get(conf).getUri();
fromAuthority = defaultURI.getAuthority();
}
LOG.debug(fromScheme + "@" + fromAuthority + "@" + path);
return new URI(fromScheme, fromAuthority, path, null, null); //file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
}
public tableSpec(Hive db, HiveConf conf, ASTNode ast)
throws SemanticException {
assert (ast.getToken().getType() == HiveParser.TOK_TAB);
int childIndex = 0;
numDynParts = 0;
try {
// get table metadata
tableName = unescapeIdentifier(ast.getChild(0).getText()); // records
boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); // false
if (testMode) {
tableName = conf.getVar(HiveConf.ConfVars.HIVETESTMODEPREFIX)
+ tableName;
}
tableHandle = db.getTable(tableName);
} catch (InvalidTableException ite) {
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(ast
.getChild(0)), ite);
} catch (HiveException e) {
throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg(ast
.getChild(childIndex), e.getMessage()), e);
}
// get partition metadata if partition specified
if (ast.getChildCount() == 2) { // false
childIndex = 1;
ASTNode partspec = (ASTNode) ast.getChild(1);
// partSpec is a mapping from partition column name to its value.
partSpec = new LinkedHashMap<String, String>(partspec.getChildCount());
for (int i = 0; i < partspec.getChildCount(); ++i) {
ASTNode partspec_val = (ASTNode) partspec.getChild(i);
String val = null;
if (partspec_val.getChildCount() < 2) { // DP in the form of T partition (ds, hr)
++numDynParts;
} else { // in the form of T partition (ds="2010-03-03")
val = stripQuotes(partspec_val.getChild(1).getText());
}
partSpec.put(unescapeIdentifier(partspec_val.getChild(0).getText().toLowerCase()), val);
}
// check if the partition spec is valid
if (numDynParts > 0) {
List<FieldSchema> parts = tableHandle.getPartitionKeys();
int numStaPart = parts.size() - numDynParts;
if (numStaPart == 0 &&
conf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
}
for (FieldSchema fs: parts) {
if (partSpec.get(fs.getName().toLowerCase()) == null) {
if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
throw new SemanticException(
ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg(ast.getChild(childIndex)));
}
break;
} else {
--numStaPart;
}
}
partHandle = null;
} else {
try {
// this doesn't create partition. partition is created in MoveTask
partHandle = new Partition(tableHandle, partSpec, null);
} catch (HiveException e) {
throw new SemanticException(
ErrorMsg.INVALID_PARTITION.getMsg(ast.getChild(childIndex)));
}
}
}
}
Driver.execute() {
}
CopyTask.execute()
public int execute(DriverContext driverContext) {
FileSystem dstFs = null;
Path toPath = null;
try {
Path fromPath = new Path(work.getFromPath()); //file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
toPath = new Path(work.getToPath()); //hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
console.printInfo("Copying data from " + fromPath.toString(), " to "
+ toPath.toString());
FileSystem srcFs = fromPath.getFileSystem(conf); //org.apache.hadoop.fs.LocalFileSystem@1c9e4d2
dstFs = toPath.getFileSystem(conf);
FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
if (srcs == null || srcs.length == 0) {
console.printError("No files matching path: " + fromPath.toString());
errorMessage = "No files matching path: " + fromPath.toString();
return 3;
}
if (!dstFs.mkdirs(toPath)) {
console
.printError("Cannot make target directory: " + toPath.toString());
errorMessage = "Cannot make target directory: " + toPath.toString();
return 2;
}
for (FileStatus oneSrc : srcs) {
LOG.debug("Copying file: " + oneSrc.getPath().toString());
if (!FileUtil.copy(srcFs, oneSrc.getPath(), dstFs, toPath, false, // delete
// source
true, // overwrite destination
conf)) {
console.printError("Failed to copy: '" + oneSrc.getPath().toString()
+ "to: '" + toPath.toString() + "'");
errorMessage = "Failed to copy: '" + oneSrc.getPath().toString()
+ "to: '" + toPath.toString() + "'";
return 1;
}
}
return 0;
} catch (Exception e) {
console.printError("Failed with exception " + e.getMessage(), "\n"
+ StringUtils.stringifyException(e));
errorMessage = "Failed with exception " + e.getMessage()+ "\n"
+ StringUtils.stringifyException(e);
return (1);
}
}
MoveTask:
public int execute(DriverContext driverContext) {
try {
// Do any hive related operations like moving tables and files
// to appropriate locations
LoadFileDesc lfd = work.getLoadFileWork(); // null
if (lfd != null) {
Path targetPath = new Path(lfd.getTargetDir());
Path sourcePath = new Path(lfd.getSourceDir());
FileSystem fs = sourcePath.getFileSystem(conf);
if (lfd.getIsDfsDir()) {
// Just do a rename on the URIs, they belong to the same FS
String mesg = "Moving data to: " + lfd.getTargetDir();
String mesg_detail = " from " + lfd.getSourceDir();
console.printInfo(mesg, mesg_detail);
// delete the output directory if it already exists
fs.delete(targetPath, true);
// if source exists, rename. Otherwise, create a empty directory
if (fs.exists(sourcePath)) {
if (!fs.rename(sourcePath, targetPath)) {
throw new HiveException("Unable to rename: " + sourcePath
+ " to: " + targetPath);
}
} else if (!fs.mkdirs(targetPath)) {
throw new HiveException("Unable to make directory: " + targetPath);
}
} else {
// This is a local file
String mesg = "Copying data to local directory " + lfd.getTargetDir();
String mesg_detail = " from " + lfd.getSourceDir();
console.printInfo(mesg, mesg_detail);
// delete the existing dest directory
LocalFileSystem dstFs = FileSystem.getLocal(conf);
if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
console.printInfo(mesg, mesg_detail);
// if source exists, rename. Otherwise, create a empty directory
if (fs.exists(sourcePath)) {
fs.copyToLocalFile(sourcePath, targetPath);
} else {
if (!dstFs.mkdirs(targetPath)) {
throw new HiveException("Unable to make local directory: "
+ targetPath);
}
}
} else {
throw new AccessControlException(
"Unable to delete the existing destination directory: "
+ targetPath);
}
}
}
// Next we do this for tables and partitions
LoadTableDesc tbd = work.getLoadTableWork();
if (tbd != null) {
StringBuilder mesg = new StringBuilder("Loading data to table ")
.append( tbd.getTable().getTableName());
if (tbd.getPartitionSpec().size() > 0) {
mesg.append(" partition (");
Map<String, String> partSpec = tbd.getPartitionSpec();
for (String key: partSpec.keySet()) {
mesg.append(key).append('=').append(partSpec.get(key)).append(", ");
}
mesg.setLength(mesg.length()-2);
mesg.append(')');
}
String mesg_detail = " from " + tbd.getSourceDir();
console.printInfo(mesg.toString(), mesg_detail); //11/08/21 21:58:44 INFO exec.MoveTask: Loading data to table records from hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
Table table = db.getTable(db.getCurrentDatabase(), tbd
.getTable().getTableName());
if (work.getCheckFileFormat()) {
// Get all files from the src directory
FileStatus[] dirs;
ArrayList<FileStatus> files;
FileSystem fs;
try {
fs = FileSystem.get(table.getDataLocation(), conf);
dirs = fs.globStatus(new Path(tbd.getSourceDir()));
files = new ArrayList<FileStatus>();
for (int i = 0; (dirs != null && i < dirs.length); i++) {
files.addAll(Arrays.asList(fs.listStatus(dirs[i].getPath())));
// We only check one file, so exit the loop when we have at least
// one.
if (files.size() > 0) {
break;
}
}
} catch (IOException e) {
throw new HiveException(
"addFiles: filesystem error in check phase", e);
}
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
// Check if the file format of the file matches that of the table.
boolean flag = HiveFileFormatUtils.checkInputFormat(
fs, conf, tbd.getTable().getInputFileFormatClass(), files);
if (!flag) {
throw new HiveException(
"Wrong file format. Please check the file's format.");
}
}
}
// Create a data container
DataContainer dc = null;
if (tbd.getPartitionSpec().size() == 0) {
dc = new DataContainer(table.getTTable());
db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable()
.getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir())); // 替换。
if (work.getOutputs() != null) {
work.getOutputs().add(new WriteEntity(table));
}
} else {
LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
// deal with dynamic partitions
DynamicPartitionCtx dpCtx = tbd.getDPCtx();
if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
// load the list of DP partitions and return the list of partition specs
ArrayList<LinkedHashMap<String, String>> dp =
db.loadDynamicPartitions(
new Path(tbd.getSourceDir()),
tbd.getTable().getTableName(),
tbd.getPartitionSpec(),
tbd.getReplace(),
new Path(tbd.getTmpDir()),
dpCtx.getNumDPCols());
// for each partition spec, get the partition
// and put it to WriteEntity for post-exec hook
for (LinkedHashMap<String, String> partSpec: dp) {
Partition partn = db.getPartition(table, partSpec, false);
WriteEntity enty = new WriteEntity(partn);
if (work.getOutputs() != null) {
work.getOutputs().add(enty);
}
// Need to update the queryPlan's output as well so that post-exec hook get executed.
// This is only needed for dynamic partitioning since for SP the the WriteEntity is
// constructed at compile time and the queryPlan already contains that.
// For DP, WriteEntity creation is deferred at this stage so we need to update
// queryPlan here.
if (queryPlan.getOutputs() == null) {
queryPlan.setOutputs(new HashSet<WriteEntity>());
}
queryPlan.getOutputs().add(enty);
// update columnar lineage for each partition
dc = new DataContainer(table.getTTable(), partn.getTPartition());
if (SessionState.get() != null) {
SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
table.getCols());
}
console.printInfo("\tLoading partition " + partSpec);
}
dc = null; // reset data container to prevent it being added again.
} else { // static partitions
db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()));
Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
dc = new DataContainer(table.getTTable(), partn.getTPartition());
// add this partition to post-execution hook
if (work.getOutputs() != null) {
work.getOutputs().add(new WriteEntity(partn));
}
}
}
if (SessionState.get() != null && dc != null) {
SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
table.getCols());
}
}
return 0;
} catch (Exception e) {
console.printError("Failed with exception " + e.getMessage(), "\n"
+ StringUtils.stringifyException(e));
errorMessage = "Failed with exception " + e.getMessage() + "\n"
+ StringUtils.stringifyException(e);
return (1);
}
}
Hive.java:
public void loadTable(Path loadPath, String tableName, boolean replace,
Path tmpDirPath) throws HiveException {
Table tbl = getTable(tableName); //records
if (replace) { //true
tbl.replaceFiles(loadPath, tmpDirPath);
// loadPath=hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
// tmpDirPath=hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
} else {
tbl.copyFiles(loadPath);
}
}
Table:
protected void replaceFiles(Path srcf, Path tmpd) throws HiveException {
FileSystem fs;
try {
fs = FileSystem.get(getDataLocation(), Hive.get().getConf());
Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), fs, tmpd);
} catch (IOException e) {
throw new HiveException("addFiles: filesystem error in check phase", e);
}
}
getDataLocation() // hdfs://localhost:54310/user/hive/warehouse/records
static protected void replaceFiles(Path srcf, Path destf, FileSystem fs,
Path tmppath) throws HiveException {
FileStatus[] srcs;
try {
srcs = fs.globStatus(srcf); // srcf = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
} catch (IOException e) {
throw new HiveException("addFiles: filesystem error in check phase", e);
}
if (srcs == null) {
LOG.info("No sources specified to move: " + srcf);
return;
// srcs = new FileStatus[0]; Why is this needed?
}
checkPaths(fs, srcs, destf, true); // destf = /user/hive/warehouse/records
try {
fs.mkdirs(tmppath); // tmppath = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
for (FileStatus src : srcs) {
FileStatus[] items = fs.listStatus(src.getPath());
for (int j = 0; j < items.length; j++) {
if (!fs.rename(items[j].getPath(), new Path(tmppath, items[j]
.getPath().getName()))) {
//
// public boolean rename(Path src, Path dst) throws IOException {
// return dfs.rename(getPathName(src), getPathName(dst));
// }
// src = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000/sample.txt
// dst = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001/sample.txt
throw new HiveException("Error moving: " + items[j].getPath()
+ " into: " + tmppath);
}
}
}
// point of no return
boolean b = fs.delete(destf, true); // destf = /user/hive/warehouse/records
LOG.debug("Deleting:" + destf.toString() + ",Status:" + b);
// create the parent directory otherwise rename can fail if the parent
// doesn't exist
if (!fs.mkdirs(destf.getParent())) {
throw new HiveException("Unable to create destination directory: "
+ destf.getParent().toString());
}
b = fs.rename(tmppath, destf);
// tmppath = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
// destf = /user/hive/warehouse/records
if (!b) {
throw new HiveException("Unable to move results from " + tmppath
+ " to destination directory: " + destf.getParent().toString());
}
LOG.debug("Renaming:" + tmppath.toString() + ",Status:" + b);
} catch (IOException e) {
throw new HiveException("replaceFiles: error while moving files from "
+ tmppath + " to " + destf + "!!!", e);
}
// In case of error, we should leave the temporary data there, so
// that user can recover the data if necessary.
}
static private void checkPaths(FileSystem fs, FileStatus[] srcs, Path destf,
boolean replace) throws HiveException {
try {
for (FileStatus src : srcs) {
FileStatus[] items = fs.listStatus(src.getPath());
for (FileStatus item : items) {
if (Utilities.isTempPath(item)) {
// This check is redundant because temp files are removed by
// execution layer before
// calling loadTable/Partition. But leaving it in just in case.
fs.delete(item.getPath(), true);
continue;
}
if (item.isDir()) {
throw new HiveException("checkPaths: " + src.getPath()
+ " has nested directory" + item.getPath());
}
Path tmpDest = new Path(destf, item.getPath().getName()); ///user/hive/warehouse/records/sample.txt
if (!replace && fs.exists(tmpDest)) { // replace = true
throw new HiveException("checkPaths: " + tmpDest
+ " already exists");
}
}
}
} catch (IOException e) {
throw new HiveException("checkPaths: filesystem error in check phase", e);
}
}
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
hive> explain
> LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
OK
ABSTRACT SYNTAX TREE:
(TOK_LOAD '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' (TOK_TAB records) LOCAL OVERWRITE)
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-1 depends on stages: Stage-0
STAGE PLANS:
Stage: Stage-0
Copy
source: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
destination: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_00-10-28_984_2803781885868135028/-ext-10000
Stage: Stage-1
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: records
Time taken: 0.333 seconds
(TOK_LOAD
'/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'
(TOK_TAB records)
LOCAL
OVERWRITE
)
hive> explain extended
> LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records;
OK
ABSTRACT SYNTAX TREE:
(TOK_LOAD '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' (TOK_TAB records) LOCAL OVERWRITE)
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-1 depends on stages: Stage-0
STAGE PLANS:
Stage: Stage-0
Copy
source: file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
destination: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10000
Stage: Stage-1
Move Operator
tables:
replace: true
source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10000
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns year,temperature,quality
columns.types string:int:int
field.delim
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/records
name records
serialization.ddl struct records { string year, i32 temperature, i32 quality}
serialization.format
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1313975965
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: records
tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-23-29_636_1841974897600272533/-ext-10001
CliDriver:
CliDriver.main() {
ret = cli.processLine(line);
}
CliDriver:
public int processLine(String line) {
ret = processCmd(command);
}
CliDriver:
public int processCmd(String cmd) {
CommandProcessor proc = CommandProcessorFactory.get(tokens[0]);
Driver qp = (Driver) proc;
ret = qp.run(cmd).getResponseCode();
}
Driver:
public CommandProcessorResponse run(String command) {
// command = LOAD DATA LOCAL INPATH '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt' OVERWRITE INTO TABLE records
int ret = compile(command);
ret = execute();
}
Driver:
public int compile(String command) {
ctx = new Context(conf); //
SemanticAnalyzerFactory sfactory = new SemanticAnalyzerFactory(conf);
BaseSemanticAnalyzer sem = sfactory.get(tree); // return new LoadSemanticAnalyzer(conf);
sem.analyze(tree, ctx);
}
public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
}
/**
* Create a Context with a given executionId. ExecutionId, together with
* user name and conf, will determine the temporary directory locations.
*/
public Context(Configuration conf, String executionId) throws IOException {
this.conf = conf;
this.executionId = executionId; //hive_2011-08-21_00-02-22_445_7799135143086468923
// non-local tmp location is configurable. however it is the same across
// all external file systems
nonLocalScratchPath =
new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR),
executionId); // /tmp/hive-tianzhao/hive_2011-08-21_00-02-22_445_7799135143086468923
// local tmp location is not configurable for now
localScratchDir = System.getProperty("java.io.tmpdir")
+ Path.SEPARATOR + System.getProperty("user.name") + Path.SEPARATOR
+ executionId; // /tmp/tianzhao/hive_2011-08-21_00-02-22_445_7799135143086468923
}
public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
public void analyzeInternal(ASTNode ast) throws SemanticException {
isLocal = false;
isOverWrite = false;
Tree fromTree = ast.getChild(0); // '/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt'
Tree tableTree = ast.getChild(1); // TOK_TAB
if (ast.getChildCount() == 4) { // true
isLocal = true; //
isOverWrite = true; //
}
if (ast.getChildCount() == 3) {
if (ast.getChild(2).getText().toLowerCase().equals("local")) {
isLocal = true;
} else {
isOverWrite = true;
}
}
// initialize load path
URI fromURI;
try {
String fromPath = stripQuotes(fromTree.getText()); ///home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
fromURI = initializeFromURI(fromPath); // file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
} catch (IOException e) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
.getMessage()), e);
} catch (URISyntaxException e) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
.getMessage()), e);
}
// initialize destination table/partition
tableSpec ts = new tableSpec(db, conf, (ASTNode) tableTree); //
Table tbl = ts.tableHandle;
if (tbl.isView()) {
throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
}
if (tbl.isNonNative()) {
throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
}
genAuthorizeEntry(db.getCurrentDatabase(), tbl.getTableName(), null, Privilege.INSERT_PRIV);
URI toURI = (ts.partHandle != null) ? ts.partHandle.getDataLocation()
: ts.tableHandle.getDataLocation(); // hdfs://localhost:54310/user/hive/warehouse/records
List<FieldSchema> parts = ts.tableHandle.getPartitionKeys();
if (isOverWrite && (parts != null && parts.size() > 0)
&& (ts.partSpec == null || ts.partSpec.size() == 0)) {
throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
}
// make sure the arguments make sense
applyConstraints(fromURI, toURI, fromTree, isLocal);
Task<? extends Serializable> rTask = null;
// create copy work
if (isLocal) { //true
// if the local keyword is specified - we will always make a copy. this
// might seem redundant in the case
// that the hive warehouse is also located in the local file system - but
// that's just a test case.
String copyURIStr = ctx.getExternalTmpFileURI(toURI); //hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
URI copyURI = URI.create(copyURIStr);
rTask = TaskFactory.get(new CopyWork(fromURI.toString(), copyURIStr),
conf);
fromURI = copyURI;
}
// create final load/move work
String loadTmpPath = ctx.getExternalTmpFileURI(toURI); // hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
Map<String, String> partSpec = ts.getPartSpec();
if (partSpec == null) {
partSpec = new LinkedHashMap<String, String>();
}
LoadTableDesc loadTableWork = new LoadTableDesc(fromURI.toString(),
loadTmpPath, Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
if (rTask != null) {
rTask.addDependentTask(TaskFactory.get(new MoveWork(getInputs(),
getOutputs(), loadTableWork, null, true), conf));
} else {
rTask = TaskFactory.get(new MoveWork(getInputs(), getOutputs(),
loadTableWork, null, true), conf);
}
rootTasks.add(rTask);
}
private URI initializeFromURI(String fromPath) throws IOException,
URISyntaxException {
URI fromURI = new Path(fromPath).toUri(); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
String fromScheme = fromURI.getScheme(); // null
String fromAuthority = fromURI.getAuthority(); // null
String path = fromURI.getPath(); // /home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
// generate absolute path relative to current directory or hdfs home
// directory
if (!path.startsWith("/")) { //不是绝对路径,执行下面的操作
if (isLocal) {
path = new Path(System.getProperty("user.dir"), path).toString();
} else {
path = new Path(new Path("/user/" + System.getProperty("user.name")),
path).toString();
}
}
// set correct scheme and authority
if (StringUtils.isEmpty(fromScheme)) { // true
if (isLocal) { //true
// file for local
fromScheme = "file"; //
} else {
// use default values from fs.default.name
URI defaultURI = FileSystem.get(conf).getUri();
fromScheme = defaultURI.getScheme();
fromAuthority = defaultURI.getAuthority();
}
}
// if scheme is specified but not authority then use the default authority
if (fromScheme.equals("hdfs") && StringUtils.isEmpty(fromAuthority)) {
URI defaultURI = FileSystem.get(conf).getUri();
fromAuthority = defaultURI.getAuthority();
}
LOG.debug(fromScheme + "@" + fromAuthority + "@" + path);
return new URI(fromScheme, fromAuthority, path, null, null); //file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
}
public tableSpec(Hive db, HiveConf conf, ASTNode ast)
throws SemanticException {
assert (ast.getToken().getType() == HiveParser.TOK_TAB);
int childIndex = 0;
numDynParts = 0;
try {
// get table metadata
tableName = unescapeIdentifier(ast.getChild(0).getText()); // records
boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); // false
if (testMode) {
tableName = conf.getVar(HiveConf.ConfVars.HIVETESTMODEPREFIX)
+ tableName;
}
tableHandle = db.getTable(tableName);
} catch (InvalidTableException ite) {
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(ast
.getChild(0)), ite);
} catch (HiveException e) {
throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg(ast
.getChild(childIndex), e.getMessage()), e);
}
// get partition metadata if partition specified
if (ast.getChildCount() == 2) { // false
childIndex = 1;
ASTNode partspec = (ASTNode) ast.getChild(1);
// partSpec is a mapping from partition column name to its value.
partSpec = new LinkedHashMap<String, String>(partspec.getChildCount());
for (int i = 0; i < partspec.getChildCount(); ++i) {
ASTNode partspec_val = (ASTNode) partspec.getChild(i);
String val = null;
if (partspec_val.getChildCount() < 2) { // DP in the form of T partition (ds, hr)
++numDynParts;
} else { // in the form of T partition (ds="2010-03-03")
val = stripQuotes(partspec_val.getChild(1).getText());
}
partSpec.put(unescapeIdentifier(partspec_val.getChild(0).getText().toLowerCase()), val);
}
// check if the partition spec is valid
if (numDynParts > 0) {
List<FieldSchema> parts = tableHandle.getPartitionKeys();
int numStaPart = parts.size() - numDynParts;
if (numStaPart == 0 &&
conf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
}
for (FieldSchema fs: parts) {
if (partSpec.get(fs.getName().toLowerCase()) == null) {
if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
throw new SemanticException(
ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg(ast.getChild(childIndex)));
}
break;
} else {
--numStaPart;
}
}
partHandle = null;
} else {
try {
// this doesn't create partition. partition is created in MoveTask
partHandle = new Partition(tableHandle, partSpec, null);
} catch (HiveException e) {
throw new SemanticException(
ErrorMsg.INVALID_PARTITION.getMsg(ast.getChild(childIndex)));
}
}
}
}
Driver.execute() {
}
CopyTask.execute()
public int execute(DriverContext driverContext) {
FileSystem dstFs = null;
Path toPath = null;
try {
Path fromPath = new Path(work.getFromPath()); //file:/home/tianzhao/book/hadoop-book/input/ncdc/micro-tab/sample.txt
toPath = new Path(work.getToPath()); //hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
console.printInfo("Copying data from " + fromPath.toString(), " to "
+ toPath.toString());
FileSystem srcFs = fromPath.getFileSystem(conf); //org.apache.hadoop.fs.LocalFileSystem@1c9e4d2
dstFs = toPath.getFileSystem(conf);
FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
if (srcs == null || srcs.length == 0) {
console.printError("No files matching path: " + fromPath.toString());
errorMessage = "No files matching path: " + fromPath.toString();
return 3;
}
if (!dstFs.mkdirs(toPath)) {
console
.printError("Cannot make target directory: " + toPath.toString());
errorMessage = "Cannot make target directory: " + toPath.toString();
return 2;
}
for (FileStatus oneSrc : srcs) {
LOG.debug("Copying file: " + oneSrc.getPath().toString());
if (!FileUtil.copy(srcFs, oneSrc.getPath(), dstFs, toPath, false, // delete
// source
true, // overwrite destination
conf)) {
console.printError("Failed to copy: '" + oneSrc.getPath().toString()
+ "to: '" + toPath.toString() + "'");
errorMessage = "Failed to copy: '" + oneSrc.getPath().toString()
+ "to: '" + toPath.toString() + "'";
return 1;
}
}
return 0;
} catch (Exception e) {
console.printError("Failed with exception " + e.getMessage(), "\n"
+ StringUtils.stringifyException(e));
errorMessage = "Failed with exception " + e.getMessage()+ "\n"
+ StringUtils.stringifyException(e);
return (1);
}
}
MoveTask:
public int execute(DriverContext driverContext) {
try {
// Do any hive related operations like moving tables and files
// to appropriate locations
LoadFileDesc lfd = work.getLoadFileWork(); // null
if (lfd != null) {
Path targetPath = new Path(lfd.getTargetDir());
Path sourcePath = new Path(lfd.getSourceDir());
FileSystem fs = sourcePath.getFileSystem(conf);
if (lfd.getIsDfsDir()) {
// Just do a rename on the URIs, they belong to the same FS
String mesg = "Moving data to: " + lfd.getTargetDir();
String mesg_detail = " from " + lfd.getSourceDir();
console.printInfo(mesg, mesg_detail);
// delete the output directory if it already exists
fs.delete(targetPath, true);
// if source exists, rename. Otherwise, create a empty directory
if (fs.exists(sourcePath)) {
if (!fs.rename(sourcePath, targetPath)) {
throw new HiveException("Unable to rename: " + sourcePath
+ " to: " + targetPath);
}
} else if (!fs.mkdirs(targetPath)) {
throw new HiveException("Unable to make directory: " + targetPath);
}
} else {
// This is a local file
String mesg = "Copying data to local directory " + lfd.getTargetDir();
String mesg_detail = " from " + lfd.getSourceDir();
console.printInfo(mesg, mesg_detail);
// delete the existing dest directory
LocalFileSystem dstFs = FileSystem.getLocal(conf);
if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
console.printInfo(mesg, mesg_detail);
// if source exists, rename. Otherwise, create a empty directory
if (fs.exists(sourcePath)) {
fs.copyToLocalFile(sourcePath, targetPath);
} else {
if (!dstFs.mkdirs(targetPath)) {
throw new HiveException("Unable to make local directory: "
+ targetPath);
}
}
} else {
throw new AccessControlException(
"Unable to delete the existing destination directory: "
+ targetPath);
}
}
}
// Next we do this for tables and partitions
LoadTableDesc tbd = work.getLoadTableWork();
if (tbd != null) {
StringBuilder mesg = new StringBuilder("Loading data to table ")
.append( tbd.getTable().getTableName());
if (tbd.getPartitionSpec().size() > 0) {
mesg.append(" partition (");
Map<String, String> partSpec = tbd.getPartitionSpec();
for (String key: partSpec.keySet()) {
mesg.append(key).append('=').append(partSpec.get(key)).append(", ");
}
mesg.setLength(mesg.length()-2);
mesg.append(')');
}
String mesg_detail = " from " + tbd.getSourceDir();
console.printInfo(mesg.toString(), mesg_detail); //11/08/21 21:58:44 INFO exec.MoveTask: Loading data to table records from hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
Table table = db.getTable(db.getCurrentDatabase(), tbd
.getTable().getTableName());
if (work.getCheckFileFormat()) {
// Get all files from the src directory
FileStatus[] dirs;
ArrayList<FileStatus> files;
FileSystem fs;
try {
fs = FileSystem.get(table.getDataLocation(), conf);
dirs = fs.globStatus(new Path(tbd.getSourceDir()));
files = new ArrayList<FileStatus>();
for (int i = 0; (dirs != null && i < dirs.length); i++) {
files.addAll(Arrays.asList(fs.listStatus(dirs[i].getPath())));
// We only check one file, so exit the loop when we have at least
// one.
if (files.size() > 0) {
break;
}
}
} catch (IOException e) {
throw new HiveException(
"addFiles: filesystem error in check phase", e);
}
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
// Check if the file format of the file matches that of the table.
boolean flag = HiveFileFormatUtils.checkInputFormat(
fs, conf, tbd.getTable().getInputFileFormatClass(), files);
if (!flag) {
throw new HiveException(
"Wrong file format. Please check the file's format.");
}
}
}
// Create a data container
DataContainer dc = null;
if (tbd.getPartitionSpec().size() == 0) {
dc = new DataContainer(table.getTTable());
db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable()
.getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir())); // 替换。
if (work.getOutputs() != null) {
work.getOutputs().add(new WriteEntity(table));
}
} else {
LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
// deal with dynamic partitions
DynamicPartitionCtx dpCtx = tbd.getDPCtx();
if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
// load the list of DP partitions and return the list of partition specs
ArrayList<LinkedHashMap<String, String>> dp =
db.loadDynamicPartitions(
new Path(tbd.getSourceDir()),
tbd.getTable().getTableName(),
tbd.getPartitionSpec(),
tbd.getReplace(),
new Path(tbd.getTmpDir()),
dpCtx.getNumDPCols());
// for each partition spec, get the partition
// and put it to WriteEntity for post-exec hook
for (LinkedHashMap<String, String> partSpec: dp) {
Partition partn = db.getPartition(table, partSpec, false);
WriteEntity enty = new WriteEntity(partn);
if (work.getOutputs() != null) {
work.getOutputs().add(enty);
}
// Need to update the queryPlan's output as well so that post-exec hook get executed.
// This is only needed for dynamic partitioning since for SP the the WriteEntity is
// constructed at compile time and the queryPlan already contains that.
// For DP, WriteEntity creation is deferred at this stage so we need to update
// queryPlan here.
if (queryPlan.getOutputs() == null) {
queryPlan.setOutputs(new HashSet<WriteEntity>());
}
queryPlan.getOutputs().add(enty);
// update columnar lineage for each partition
dc = new DataContainer(table.getTTable(), partn.getTPartition());
if (SessionState.get() != null) {
SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
table.getCols());
}
console.printInfo("\tLoading partition " + partSpec);
}
dc = null; // reset data container to prevent it being added again.
} else { // static partitions
db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()));
Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
dc = new DataContainer(table.getTTable(), partn.getTPartition());
// add this partition to post-execution hook
if (work.getOutputs() != null) {
work.getOutputs().add(new WriteEntity(partn));
}
}
}
if (SessionState.get() != null && dc != null) {
SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
table.getCols());
}
}
return 0;
} catch (Exception e) {
console.printError("Failed with exception " + e.getMessage(), "\n"
+ StringUtils.stringifyException(e));
errorMessage = "Failed with exception " + e.getMessage() + "\n"
+ StringUtils.stringifyException(e);
return (1);
}
}
Hive.java:
public void loadTable(Path loadPath, String tableName, boolean replace,
Path tmpDirPath) throws HiveException {
Table tbl = getTable(tableName); //records
if (replace) { //true
tbl.replaceFiles(loadPath, tmpDirPath);
// loadPath=hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
// tmpDirPath=hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
} else {
tbl.copyFiles(loadPath);
}
}
Table:
protected void replaceFiles(Path srcf, Path tmpd) throws HiveException {
FileSystem fs;
try {
fs = FileSystem.get(getDataLocation(), Hive.get().getConf());
Hive.replaceFiles(srcf, new Path(getDataLocation().getPath()), fs, tmpd);
} catch (IOException e) {
throw new HiveException("addFiles: filesystem error in check phase", e);
}
}
getDataLocation() // hdfs://localhost:54310/user/hive/warehouse/records
static protected void replaceFiles(Path srcf, Path destf, FileSystem fs,
Path tmppath) throws HiveException {
FileStatus[] srcs;
try {
srcs = fs.globStatus(srcf); // srcf = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000
} catch (IOException e) {
throw new HiveException("addFiles: filesystem error in check phase", e);
}
if (srcs == null) {
LOG.info("No sources specified to move: " + srcf);
return;
// srcs = new FileStatus[0]; Why is this needed?
}
checkPaths(fs, srcs, destf, true); // destf = /user/hive/warehouse/records
try {
fs.mkdirs(tmppath); // tmppath = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
for (FileStatus src : srcs) {
FileStatus[] items = fs.listStatus(src.getPath());
for (int j = 0; j < items.length; j++) {
if (!fs.rename(items[j].getPath(), new Path(tmppath, items[j]
.getPath().getName()))) {
//
// public boolean rename(Path src, Path dst) throws IOException {
// return dfs.rename(getPathName(src), getPathName(dst));
// }
// src = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10000/sample.txt
// dst = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001/sample.txt
throw new HiveException("Error moving: " + items[j].getPath()
+ " into: " + tmppath);
}
}
}
// point of no return
boolean b = fs.delete(destf, true); // destf = /user/hive/warehouse/records
LOG.debug("Deleting:" + destf.toString() + ",Status:" + b);
// create the parent directory otherwise rename can fail if the parent
// doesn't exist
if (!fs.mkdirs(destf.getParent())) {
throw new HiveException("Unable to create destination directory: "
+ destf.getParent().toString());
}
b = fs.rename(tmppath, destf);
// tmppath = hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-08-21_18-32-25_911_8382925475653721697/-ext-10001
// destf = /user/hive/warehouse/records
if (!b) {
throw new HiveException("Unable to move results from " + tmppath
+ " to destination directory: " + destf.getParent().toString());
}
LOG.debug("Renaming:" + tmppath.toString() + ",Status:" + b);
} catch (IOException e) {
throw new HiveException("replaceFiles: error while moving files from "
+ tmppath + " to " + destf + "!!!", e);
}
// In case of error, we should leave the temporary data there, so
// that user can recover the data if necessary.
}
static private void checkPaths(FileSystem fs, FileStatus[] srcs, Path destf,
boolean replace) throws HiveException {
try {
for (FileStatus src : srcs) {
FileStatus[] items = fs.listStatus(src.getPath());
for (FileStatus item : items) {
if (Utilities.isTempPath(item)) {
// This check is redundant because temp files are removed by
// execution layer before
// calling loadTable/Partition. But leaving it in just in case.
fs.delete(item.getPath(), true);
continue;
}
if (item.isDir()) {
throw new HiveException("checkPaths: " + src.getPath()
+ " has nested directory" + item.getPath());
}
Path tmpDest = new Path(destf, item.getPath().getName()); ///user/hive/warehouse/records/sample.txt
if (!replace && fs.exists(tmpDest)) { // replace = true
throw new HiveException("checkPaths: " + tmpDest
+ " already exists");
}
}
}
} catch (IOException e) {
throw new HiveException("checkPaths: filesystem error in check phase", e);
}
}
发表评论
-
hive rename table name
2013-09-18 14:28 2590hive rename tablename hive re ... -
hive的distribute by如何partition long型的数据
2013-08-20 10:15 2468有用户问:hive的distribute by分桶是怎么分 ... -
hive like vs rlike vs regexp
2013-04-11 18:53 11205like vs rlike vs regexp r ... -
hive sql where条件很简单,但是太多
2012-07-18 15:51 8728insert overwrite table aaaa ... -
insert into时(string->bigint)自动类型转换
2012-06-14 12:30 8273原表src: hive> desc src; ... -
通过复合结构来优化udf的调用
2012-05-11 14:07 1205select split("accba&quo ... -
RegexSerDe
2012-03-14 09:58 1542官方示例在: https://cwiki.apache.or ... -
Hive 的 OutputCommitter
2012-01-30 19:44 1812Hive 的 OutputCommitter publi ... -
hive LATERAL VIEW 行转列
2011-11-09 14:49 5437drop table lateralview; create ... -
hive complex type
2011-11-08 19:56 1354数据: 1,100|3,20|2,70|5,100 建表: ... -
hive转义字符
2011-10-25 16:41 6235CREATE TABLE escape (id STRING, ... -
hive 两个不同类型的columns进行比较
2011-09-19 13:46 3029select case when "ab1234&q ... -
lateral view
2011-09-18 04:04 0lateral view与udtf相关 -
udf 中获得 FileSystem
2011-09-14 10:28 0在udf中获得FileSystem,需要获得知道fs.defa ... -
hive union mapjoin
2011-09-09 16:29 0union union.q union2.q ... -
hive eclipse
2011-09-08 17:42 0eclipse-templates$ vi .classpat ... -
hive join filter
2011-09-07 23:05 0join16.q.out hive.optimize.ppd ... -
hive limit
2011-09-07 21:02 0limit 关键字: input4_limit.q.out ... -
hive convertMapJoin MapJoinProcessor
2011-09-06 21:17 0join25.q join26 ... -
hive hive.merge.mapfiles hive.merge.mapredfiles
2011-09-06 19:14 0HiveConf: HIVEMERGEMAPFILES ...
相关推荐
insert into db_name.table_name_1 ( col_1,col2,col3 ) with temp_table_1 as ( select id,col_2 from db_name.table_name_2 where id = condatition ), temp_table_2 as ( select id,col_3 from db_name....
hive> LOAD DATA LOCAL INPATH './examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15'); ``` ##### 加载 HDFS 数据 ```sql hive> LOAD DATA INPATH '/user/myname/kv2.txt' ...
1. 从文件加载数据:使用`load data local inpath '/xxx/xxx/dim_cube.txt' overwrite into table dim.dim_cube_config;`语句可以从文件加载数据到表中。 2. 从查询语句给表插入数据:使用`insert overwrite table ...
`LOAD DATA`用于将文件直接加载到指定的分区,而`INSERT OVERWRITE`则可以用于插入单行或多行数据。 ```sql -- 使用LOAD DATA命令 LOAD DATA LOCAL INPATH '/path/to/file' INTO TABLE employee PARTITION (year=...
- **INSERT OVERWRITE**: 当需要将 Hive 表中的数据导出到指定目录时,可以使用 `INSERT OVERWRITE DIRECTORY` 语法。 ```sql INSERT OVERWRITE DIRECTORY '/user/hive/output' SELECT * FROM db_0309.emp; ``` ...
LOAD DATA LOCAL INPATH './examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15'); ``` 还可以加载 DFS 数据,同时给定分区信息,例如: ``` LOAD DATA INPATH '/user/myname/kv2.txt...
Hive 支持 INSERT INTO 和 INSERT OVERWRITE 两种方式。INSERT INTO 向表中追加数据,如果表中已有数据,新数据将添加到原有数据之后。而 INSERT OVERWRITE 则会覆盖原有数据,当指定某个分区时,只替换指定分区的...
在Flink SQL中,可以使用`INSERT INTO`或`INSERT OVERWRITE`语句来实现。对于非分区表,可以直接插入数据;对于分区表,可以指定静态分区或动态分区进行写入。 例如,使用批处理模式向非分区表写入数据: ```sql ...
5. 数据导出:当需要将Hive中的结果导出到文件时,可以使用INSERT OVERWRITE命令。例如,将查询结果写入一个新的CSV文件: ```sql INSERT OVERWRITE LOCAL DIRECTORY '/path/to/output' SELECT * FROM my_table ...
2. **加载数据同时指定分区信息**: `LOAD DATA LOCAL INPATH './examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');` - 加载数据时指定分区值,确保数据被正确地分配到对应的分区...
* INSERT OVERWRITE TABLE..SELECT:`FROM records2 INSERT OVERWRITE TABLE records SELECT *;` * EXPLAIN 查询:`EXPLAIN SELECT sales.*, things.* FROM sales JOIN things ON (sales.id = things.id);` Hive...
load data local inpath '/usr/local/hive/dividends.csv' overwrite into table dividends_unpartitioned; ``` 4. **利用Hive自动分区插入数据**: 对`dividends_unpartitioned`表执行查询并将其结果插入到`...
- **插入数据**:INSERT INTO/OVERWRITE TABLE用于向表中添加数据,可一次性插入整行或整个文件。 - **查询数据**:SELECT语句用于检索数据,支持各种聚合函数、连接操作和子查询。 - **更新和删除数据**:Hive不...
使用`INSERT INTO`或`INSERT OVERWRITE`来向表中添加数据。前者追加数据,后者覆盖原有数据。 9. **更新和删除数据**: Hive 0.14版本开始支持`UPDATE`和`DELETE`操作,但这些操作通常在支持ACID特性的表上进行,...
因此,在设计Hive数据模型时,应尽可能减少UPDATE和DELETE操作,优先考虑INSERT和REPLACE INTO等方法来维护数据。 在实际应用中,Hive更新数据通常涉及以下步骤: 1. **创建一个临时表**:用于存放更新后的数据。 2...
- 数据加载与删除:除了 `LOAD DATA`,还可以使用 `INSERT INTO` 或 `INSERT OVERWRITE` 语句来加载或替换数据。 - 表操作:ALTER TABLE 改变表结构,DROP TABLE 删除表,TRUNCATE TABLE 清空表。 5. HiveQL 执行...
- 使用`insert overwrite table tablename [partition(partcol1=val1, partcol2=val2)] select_statement from from_statement`命令,其中`select_statement`是从其他表中选取数据的SQL语句,`from_statement`是源...
INSERT OVERWRITE DIRECTORY '/path/to/export' SELECT * FROM employees; ``` Hive的优势在于其可扩展性、容错性和对大规模数据的处理能力。然而,由于其基于MapReduce,因此对于实时查询和低延迟操作可能不太...
- **将数据插入到Hive表**:可以使用`INSERT INTO TABLE table_name [PARTITION (partition_spec)] SELECT ...`命令将数据从查询结果插入到Hive表中。 - **将查询结果写入文件系统**:通过`INSERT OVERWRITE ...