- 浏览: 112038 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
土豆蛋儿:
我想读取一个外部文件,以什么方式好了? 文件内容经常编辑
flume 自定义source -
土豆蛋儿:
大神,您好。
flume 自定义source
图片地址 :http://hi.csdn.net/attachment/201107/29/0_1311922740tXqK.gif
CliDriver可以说是hive的入口,对应上图中的UI部分。大家看它的结构就可以明白了,main()函数!对!你猜的没错就是从main()开始。
下图是类结构,总共有五个关键的函数。
这个类可以说是用户和hive交互的平台,你可以把它认为是hive客户端。总共有4个key函数:
下图是这个CliDriver类在整个Hive执行过程中的作用的地位。
如图,hive执行流程_按正常步骤走:
1.—CliDriver.classz中main()开始,初始化Hive环境变量,获取客户端提供的string或者file。
2 —将其代码送入processLine(cmd),这步主要是读入cmd:‘;’之前的所有字符串都读入(不做任何检查),之后的会忽略。读完后,传入processCmd()处理
3 —调用processCmd(cmd),分情况处理
//– 读入cmd,并分情况处理,总共分为以下五种情况,根据命令的开头字符串来确定用什么方法处理。
// 1.set.. 设置operator参数,hive环境参数
// 2.quit or exit — 退出Hive环境
// 3.! 开头
// 4.dfs 开头 交给FsShell处理
// 5.hivesql 正常hivesql执行语句,我们最关心的是这里。语句交给了、、Hive真正的核心引擎 Driver。返回ret = Driver.run(cmd);
4.—不同情况不同处理方法。我们关心的第五种情况:正常的HiveSQL如何处理?其实是进入driver.class里面run(),
//读入hivesql ,词法分析,语法分析,直到执行结束
//1.ParseDriver 返回 词法树 CommonTree
//2.BaseSemanticAnalyzer sem.analyze(tree, ctx);//语义解释,生成执行计划
5.—。。。etc
今天的主题是hive的入口,我们只聊前三步。
现在我们细化主要函数,看hive实际是怎么处理的。(如果你只想了解hive工作流程或原理,不想拘泥于细节,可以跳过下面的细节,如果你想修改源码,做优化,可以继续往下看)
下面是hive入口 涉及的一些关键类和关键函数。
——————————-类CliDriver —
由于这个类,可以说贯彻Hive的整个流程架构,所以我聊的比较细。
————————————————main()
主函数是CliDriver类的main函数,然后走run函数,再做了一些初始化和检测后,再调用processLine,再调用processCmd。processLocalCmd则调用了Driver类的run函数和runExcute函数。
直到:
while ((line = reader.readLine(curPrompt + "> ")) != null) {
表示重复请求读入 SQL>
1,cli/src/java CliDriver.main是主函数。
[java] view plaincopy
public static void main(String[] args) throws Exception {
int ret = run(args);
System.exit(ret);
}
2,进入run函数
public static int run(String[] args) throws Exception {
OptionsProcessor oproc = new OptionsProcessor();
//(1) 解析(Parse)args,放入cmdLine,处理 –hiveconf var=val 用于增加或者覆盖hive/hadoop配置,设置到System.Properties中。
if (!oproc.process_stage1(args)) {
return 1;
}
//(2) 配置log4j,加载hive-log4j.properties里的配置信息,日志的初始化在其他任何核心类初始化前。
boolean logInitFailed = false;
String logInitDetailMessage;
try {
logInitDetailMessage = LogUtils.initHiveLog4j();
} catch (LogInitializationException e) {
logInitFailed = true;
logInitDetailMessage = e.getMessage();
}
//(3) 创建一个CliSessionState(SessionState)
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
ss.in = System.in;
try {
ss.out = new PrintStream(System.out, true, "UTF-8");
ss.err = new PrintStream(System.err, true, "UTF-8");
} catch (UnsupportedEncodingException e) {
return 3;
}
//(4) 处理-S, -e, -f, -h,-i等信息,保存在SessionState中。如果是-h,打印提示信息,并退出。
if (!oproc.process_stage2(ss)) {
return 2;
}
//(5) 如果不是-S,就是说不是静默状态,就输出一些提示信息,表示初始化好了。
if (!ss.getIsSilent()) {
if (logInitFailed) {
System.err.println(logInitDetailMessage);
} else {
//(5)输出一些信息:12/07/05 16:52:34 INFO SessionState:
SessionState.getConsole().printInfo(logInitDetailMessage);
}
}
//(6)创建一个HiveConf,通过命令行设置的参数配置所有属性。
HiveConf conf = ss.getConf();
for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) {
conf.set((String) item.getKey(), (String) item.getValue());
}
//(7)启动CliSessionState ss。
SessionState.start(ss);
// (8)连接到 Hive Server
if (ss.getHost() != null) {
ss.connect();
if (ss.isRemoteMode()) {
prompt = "[" + ss.host + ':' + ss.port + "] " + prompt;
char[] spaces = new char[prompt.length()];
Arrays.fill(spaces, ' ');
prompt2 = new String(spaces);
}
}
//(9) ShimLoader,load HadoopShims
// CLI remote mode is a thin client: only load auxJars in local mode
if (!ss.isRemoteMode() && !ShimLoader.getHadoopShims().usesJobShell()) {
// hadoop-20 and above - we need to augment classpath using hiveconf
// components
// see also: code in ExecDriver.java
ClassLoader loader = conf.getClassLoader();
//(9)设置hiveJar= hive-exec-0.6.0.jar ,初始化加载hive-default.xml、 hive-site.xml。
String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
if (StringUtils.isNotBlank(auxJars)) {
loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","));
}
conf.setClassLoader(loader);
Thread.currentThread().setContextClassLoader(loader);
}
//(10) 创建CliDriver.
CliDriver cli = new CliDrive();
cli.setHiveVariables(oproc.getHiveVariables());
//(10)在接受hivesql命令前,执行一些初始化命令,这些命令存在文件中,文件可以通过-i选项设置,如果没有设置就去查找是否有$HIVE_HOME/bin/.hiverc和System.getProperty("user.home")/.hiverc两个文件,如果有就执行这两个文件中的命令。
// Execute -i init files (always in silent mode)
cli.processInitFiles(ss);
//(10) 如果是–e,执行命令并退出,如果是-f,执行文件中的命令并退出。
if (ss.execString != null) {
return cli.processLine(ss.execString);
}
try {
if (ss.fileName != null) {
return cli.processFile(ss.fileName);
}
} catch (FileNotFoundException e) {
System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
return 3;
}
//(11)创建ConsoleReader,读取用户输入,遇到“;”为一个完整的命令,执行该命令(CliDriver.processLine ),接着读取处理用户的输入。用户输入的命令记录在user.home/.hivehistory文件中。
ConsoleReader reader = new ConsoleReader();
reader.setBellEnabled(false);
// reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true)));
reader.addCompletor(getCommandCompletor());
String line;
final String HISTORYFILE = ".hivehistory";
String historyFile = System.getProperty("user.home") + File.separator + HISTORYFILE;
reader.setHistory(new History(new File(historyFile)));
int ret = 0;
String prefix = "";
String curDB = getFormattedDb(conf, ss);
String curPrompt = prompt + curDB;
String dbSpaces = spacesForString(curDB);
while ((line = reader.readLine(curPrompt + "> ")) != null) {
if (!prefix.equals("")) {
prefix += '\n';
}
if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
line = prefix + line;
ret = cli.processLine(line, true);
prefix = "";
curDB = getFormattedDb(conf, ss);
curPrompt = prompt + curDB;
dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
} else {
prefix = prefix + line;
curPrompt = prompt2 + dbSpaces;
continue;
}
}
ss.close();
return ret;
3,主要是调用了 processLine。
ProcessLine又调用了 processCmd。
CliDriver.processLine 去掉命令末尾的;,
public int processLine(String line, boolean allowInterupting) {
SignalHandler oldSignal = null;
Signal interupSignal = null;
//(1)整理允许中断 ctrl+C
if (allowInterupting) {
// Remember all threads that were running at the time we started line processing.
// Hook up the custom Ctrl+C handler while processing this line
interupSignal = new Signal("INT");
oldSignal = Signal.handle(interupSignal, new SignalHandler() {
private final Thread cliThread = Thread.currentThread();
private boolean interruptRequested;
@Override
public void handle(Signal signal) {
boolean initialRequest = !interruptRequested;
interruptRequested = true;
// Kill the VM on second ctrl+c
if (!initialRequest) {
console.printInfo("Exiting the JVM");
System.exit(127);
}
// Interrupt the CLI thread to stop the current statement and return
// to prompt
console.printInfo("Interrupting... Be patient, this might take some time.");
console.printInfo("Press Ctrl+C again to kill JVM");
// First, kill any running MR jobs
HadoopJobExecHelper.killRunningJobs();
HiveInterruptUtils.interrupt();
this.cliThread.interrupt();
}
});
}
try {
int lastRet = 0, ret = 0;
String command = "";
//(2)循环处理每一个以分号结尾的语句。
for (String oneCmd : line.split(";")) {
if (StringUtils.endsWith(oneCmd, "\\")) {
command += StringUtils.chop(oneCmd) + ";";
continue;
} else {
command += oneCmd;
}
if (StringUtils.isBlank(command)) {
continue;
}
//(3)执行处理命令
ret = processCmd(command);
//(4)清除query State的状态。wipe cli query state
SessionState ss = SessionState.get();
ss.setCommandType(null);
command = "";
lastRet = ret;
boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
if (ret != 0 && !ignoreErrors) {
CommandProcessorFactory.clean((HiveConf) conf);
return ret;
}
}
CommandProcessorFactory.clean((HiveConf) conf);
return lastRet;
} finally {
// Once we are done processing the line, restore the old handler
if (oldSignal != null && interupSignal != null) {
Signal.handle(interupSignal, oldSignal);
}
}
}
4,processCmd
CliDriver.processCmd
Split命令,分析第一个单词:
(1)如果是quit或者exit,退出。
(2)source,执行文件中的HiveQL
(3)!,执行命令,如!ls,列出当前目录的文件信息。
(4)list,列出jar/file/archive。
(5)如果是其他,则生成调用相应的CommandProcessor处理。
[java] view plaincopy
public int processCmd(String cmd) {
CliSessionState ss = (CliSessionState) SessionState.get();
String cmd_trimmed = cmd.trim();
String[] tokens = tokenizeCmd(cmd_trimmed);
int ret = 0;
//(1)如果是quit或者exit,退出。
if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
// if we have come this far - either the previous commands
// are all successful or this is command line. in either case
// this counts as a successful run
ss.close();
System.exit(0);
//(2)source,执行文件中的HiveQL
} else if (tokens[0].equalsIgnoreCase("source")) {
String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
File sourceFile = new File(cmd_1);
if (! sourceFile.isFile()){
console.printError("File: "+ cmd_1 + " is not a file.");
ret = 1;
} else {
try {
this.processFile(cmd_1);
} catch (IOException e) {
console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
}//(3)!,执行命令,如!ls,列出当前目录的文件信息。
} else if (cmd_trimmed.startsWith("!")) {
String shell_cmd = cmd_trimmed.substring(1);
shell_cmd = new VariableSubstitution().substitute(ss.getConf(), shell_cmd);
// shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
try {
Process executor = Runtime.getRuntime().exec(shell_cmd);
StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out);
StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err);
outPrinter.start();
errPrinter.start();
ret = executor.waitFor();
if (ret != 0) {
console.printError("Command failed with exit code = " + ret);
}
} catch (Exception e) {
console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
//(4)list,列出jar/file/archive。
} else if (tokens[0].toLowerCase().equals("list")) {
SessionState.ResourceType t;
if (tokens.length < 2 || (t = SessionState.find_resource_type(tokens[1])) == null) {
console.printError("Usage: list ["
+ StringUtils.join(SessionState.ResourceType.values(), "|") + "] [<value> [<value>]*]");
ret = 1;
} else {
List<String> filter = null;
if (tokens.length >= 3) {
System.arraycopy(tokens, 2, tokens, 0, tokens.length - 2);
filter = Arrays.asList(tokens);
}
Set<String> s = ss.list_resource(t, filter);
if (s != null && !s.isEmpty()) {
ss.out.println(StringUtils.join(s, "\n"));
}
}//(5)如果是其他,则生成调用相应的CommandProcessor处理。//如果是远端
} else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server
HiveClient client = ss.getClient();
PrintStream out = ss.out;
PrintStream err = ss.err;
try {
client.execute(cmd_trimmed);
List<String> results;
do {
results = client.fetchN(LINES_TO_FETCH);
for (String line : results) {
out.println(line);
}
} while (results.size() == LINES_TO_FETCH);
} catch (HiveServerException e) {
ret = e.getErrorCode();
if (ret != 0) { // OK if ret == 0 -- reached the EOF
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
ret = e.getErrorCode();
err.println("[Hive Error]: " + errMsg);
}
} catch (TException e) {
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
ret = -10002;
err.println("[Thrift Error]: " + errMsg);
} finally {
try {
client.clean();
} catch (TException e) {
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
+ errMsg);
}
}//如果是本地
} else { // local mode
CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf) conf);
ret = processLocalCmd(cmd, proc, ss);
}
return ret;
}
5,processLoacalCmd
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
int tryCount = 0;
boolean needRetry;
int ret = 0;
do {
try {
needRetry = false;
if (proc != null) {
if (proc instanceof Driver) {
Driver qp = (Driver) proc;
PrintStream out = ss.out;
long start = System.currentTimeMillis();
if (ss.getIsVerbose()) {
out.println(cmd);
}
qp.setTryCount(tryCount);
ret = qp.run(cmd).getResponseCode();
if (ret != 0) {
qp.close();
return ret;
}
ArrayList<String> res = new ArrayList<String>();
printHeader(qp, out);
try {
while (qp.getResults(res)) {
for (String r : res) {
out.println(r);
}
res.clear();
if (out.checkError()) {
break;
}
}
} catch (IOException e) {
console.printError("Failed with exception " + e.getClass().getName() + ":"
+ e.getMessage(), "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
int cret = qp.close();
if (ret == 0) {
ret = cret;
}
long end = System.currentTimeMillis();
if (end > start) {
double timeTaken = (end - start) / 1000.0;
console.printInfo("Time taken: " + timeTaken + " seconds", null);
}
} else {
String firstToken = tokenizeCmd(cmd.trim())[0];
String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());
if (ss.getIsVerbose()) {
ss.out.println(firstToken + " " + cmd_1);
}
ret = proc.run(cmd_1).getResponseCode();
}
}
} catch (CommandNeedRetryException e) {
console.printInfo("Retry query with a different approach...");
tryCount++;
needRetry = true;
}
} while (needRetry);
return ret;
}
6,Driver 类 的run 方法。
Driver
Driver.run(String command) // 处理一条命令
{
int ret =compile(command); // 分析命令,生成Task。
ret = execute(); // 运行Task。
}
Driver.compile
Driver.compile(String command) // 处理一条命令
{
(1) Parser(antlr):HiveQL->AbstractSyntaxTree(AST)
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
(2) SemanticAnalyzer
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
// Do semantic analysis and plan generation
sem.analyze(tree, ctx);
}
7,plan生成位置
可以通过跟踪到Driver.java文件的Line 663可知其路径为:
/tmp/hive-gexing111/hive_2012-07-09_10-37-27_511_5073252372102100766/test2.py
如果系统自己的plan:
/tmp/hive-gexing111/hive_2012-07-09_12-45-55_479_6444298560478274273/-local-10002/plan.xml
8,Debug show tables "ge*";
在hive/ metastore/src/java /com.aliyun.apsara.odps.metastore.ots /OTSObjectStore.java
中的Gettables;
9,配置文件位置
hive/build/dist/conf/hive-site.xml
设置为改写版本:
<property>
<name>com.aliyun.odps.mode</name>
<value>true</value>
</property>
<property>
<name>native.hive.mode</name>
<value>false</value>
</property>
http://blog.csdn.net/wf1982/article/details/6644330
http://blog.csdn.net/gexiaobaohelloworld/article/details/7719163
CliDriver可以说是hive的入口,对应上图中的UI部分。大家看它的结构就可以明白了,main()函数!对!你猜的没错就是从main()开始。
下图是类结构,总共有五个关键的函数。
这个类可以说是用户和hive交互的平台,你可以把它认为是hive客户端。总共有4个key函数:
下图是这个CliDriver类在整个Hive执行过程中的作用的地位。
如图,hive执行流程_按正常步骤走:
1.—CliDriver.classz中main()开始,初始化Hive环境变量,获取客户端提供的string或者file。
2 —将其代码送入processLine(cmd),这步主要是读入cmd:‘;’之前的所有字符串都读入(不做任何检查),之后的会忽略。读完后,传入processCmd()处理
3 —调用processCmd(cmd),分情况处理
//– 读入cmd,并分情况处理,总共分为以下五种情况,根据命令的开头字符串来确定用什么方法处理。
// 1.set.. 设置operator参数,hive环境参数
// 2.quit or exit — 退出Hive环境
// 3.! 开头
// 4.dfs 开头 交给FsShell处理
// 5.hivesql 正常hivesql执行语句,我们最关心的是这里。语句交给了、、Hive真正的核心引擎 Driver。返回ret = Driver.run(cmd);
4.—不同情况不同处理方法。我们关心的第五种情况:正常的HiveSQL如何处理?其实是进入driver.class里面run(),
//读入hivesql ,词法分析,语法分析,直到执行结束
//1.ParseDriver 返回 词法树 CommonTree
//2.BaseSemanticAnalyzer sem.analyze(tree, ctx);//语义解释,生成执行计划
5.—。。。etc
今天的主题是hive的入口,我们只聊前三步。
现在我们细化主要函数,看hive实际是怎么处理的。(如果你只想了解hive工作流程或原理,不想拘泥于细节,可以跳过下面的细节,如果你想修改源码,做优化,可以继续往下看)
下面是hive入口 涉及的一些关键类和关键函数。
——————————-类CliDriver —
由于这个类,可以说贯彻Hive的整个流程架构,所以我聊的比较细。
————————————————main()
主函数是CliDriver类的main函数,然后走run函数,再做了一些初始化和检测后,再调用processLine,再调用processCmd。processLocalCmd则调用了Driver类的run函数和runExcute函数。
直到:
while ((line = reader.readLine(curPrompt + "> ")) != null) {
表示重复请求读入 SQL>
1,cli/src/java CliDriver.main是主函数。
[java] view plaincopy
public static void main(String[] args) throws Exception {
int ret = run(args);
System.exit(ret);
}
2,进入run函数
public static int run(String[] args) throws Exception {
OptionsProcessor oproc = new OptionsProcessor();
//(1) 解析(Parse)args,放入cmdLine,处理 –hiveconf var=val 用于增加或者覆盖hive/hadoop配置,设置到System.Properties中。
if (!oproc.process_stage1(args)) {
return 1;
}
//(2) 配置log4j,加载hive-log4j.properties里的配置信息,日志的初始化在其他任何核心类初始化前。
boolean logInitFailed = false;
String logInitDetailMessage;
try {
logInitDetailMessage = LogUtils.initHiveLog4j();
} catch (LogInitializationException e) {
logInitFailed = true;
logInitDetailMessage = e.getMessage();
}
//(3) 创建一个CliSessionState(SessionState)
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
ss.in = System.in;
try {
ss.out = new PrintStream(System.out, true, "UTF-8");
ss.err = new PrintStream(System.err, true, "UTF-8");
} catch (UnsupportedEncodingException e) {
return 3;
}
//(4) 处理-S, -e, -f, -h,-i等信息,保存在SessionState中。如果是-h,打印提示信息,并退出。
if (!oproc.process_stage2(ss)) {
return 2;
}
//(5) 如果不是-S,就是说不是静默状态,就输出一些提示信息,表示初始化好了。
if (!ss.getIsSilent()) {
if (logInitFailed) {
System.err.println(logInitDetailMessage);
} else {
//(5)输出一些信息:12/07/05 16:52:34 INFO SessionState:
SessionState.getConsole().printInfo(logInitDetailMessage);
}
}
//(6)创建一个HiveConf,通过命令行设置的参数配置所有属性。
HiveConf conf = ss.getConf();
for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) {
conf.set((String) item.getKey(), (String) item.getValue());
}
//(7)启动CliSessionState ss。
SessionState.start(ss);
// (8)连接到 Hive Server
if (ss.getHost() != null) {
ss.connect();
if (ss.isRemoteMode()) {
prompt = "[" + ss.host + ':' + ss.port + "] " + prompt;
char[] spaces = new char[prompt.length()];
Arrays.fill(spaces, ' ');
prompt2 = new String(spaces);
}
}
//(9) ShimLoader,load HadoopShims
// CLI remote mode is a thin client: only load auxJars in local mode
if (!ss.isRemoteMode() && !ShimLoader.getHadoopShims().usesJobShell()) {
// hadoop-20 and above - we need to augment classpath using hiveconf
// components
// see also: code in ExecDriver.java
ClassLoader loader = conf.getClassLoader();
//(9)设置hiveJar= hive-exec-0.6.0.jar ,初始化加载hive-default.xml、 hive-site.xml。
String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
if (StringUtils.isNotBlank(auxJars)) {
loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","));
}
conf.setClassLoader(loader);
Thread.currentThread().setContextClassLoader(loader);
}
//(10) 创建CliDriver.
CliDriver cli = new CliDrive();
cli.setHiveVariables(oproc.getHiveVariables());
//(10)在接受hivesql命令前,执行一些初始化命令,这些命令存在文件中,文件可以通过-i选项设置,如果没有设置就去查找是否有$HIVE_HOME/bin/.hiverc和System.getProperty("user.home")/.hiverc两个文件,如果有就执行这两个文件中的命令。
// Execute -i init files (always in silent mode)
cli.processInitFiles(ss);
//(10) 如果是–e,执行命令并退出,如果是-f,执行文件中的命令并退出。
if (ss.execString != null) {
return cli.processLine(ss.execString);
}
try {
if (ss.fileName != null) {
return cli.processFile(ss.fileName);
}
} catch (FileNotFoundException e) {
System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
return 3;
}
//(11)创建ConsoleReader,读取用户输入,遇到“;”为一个完整的命令,执行该命令(CliDriver.processLine ),接着读取处理用户的输入。用户输入的命令记录在user.home/.hivehistory文件中。
ConsoleReader reader = new ConsoleReader();
reader.setBellEnabled(false);
// reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true)));
reader.addCompletor(getCommandCompletor());
String line;
final String HISTORYFILE = ".hivehistory";
String historyFile = System.getProperty("user.home") + File.separator + HISTORYFILE;
reader.setHistory(new History(new File(historyFile)));
int ret = 0;
String prefix = "";
String curDB = getFormattedDb(conf, ss);
String curPrompt = prompt + curDB;
String dbSpaces = spacesForString(curDB);
while ((line = reader.readLine(curPrompt + "> ")) != null) {
if (!prefix.equals("")) {
prefix += '\n';
}
if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
line = prefix + line;
ret = cli.processLine(line, true);
prefix = "";
curDB = getFormattedDb(conf, ss);
curPrompt = prompt + curDB;
dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
} else {
prefix = prefix + line;
curPrompt = prompt2 + dbSpaces;
continue;
}
}
ss.close();
return ret;
3,主要是调用了 processLine。
ProcessLine又调用了 processCmd。
CliDriver.processLine 去掉命令末尾的;,
public int processLine(String line, boolean allowInterupting) {
SignalHandler oldSignal = null;
Signal interupSignal = null;
//(1)整理允许中断 ctrl+C
if (allowInterupting) {
// Remember all threads that were running at the time we started line processing.
// Hook up the custom Ctrl+C handler while processing this line
interupSignal = new Signal("INT");
oldSignal = Signal.handle(interupSignal, new SignalHandler() {
private final Thread cliThread = Thread.currentThread();
private boolean interruptRequested;
@Override
public void handle(Signal signal) {
boolean initialRequest = !interruptRequested;
interruptRequested = true;
// Kill the VM on second ctrl+c
if (!initialRequest) {
console.printInfo("Exiting the JVM");
System.exit(127);
}
// Interrupt the CLI thread to stop the current statement and return
// to prompt
console.printInfo("Interrupting... Be patient, this might take some time.");
console.printInfo("Press Ctrl+C again to kill JVM");
// First, kill any running MR jobs
HadoopJobExecHelper.killRunningJobs();
HiveInterruptUtils.interrupt();
this.cliThread.interrupt();
}
});
}
try {
int lastRet = 0, ret = 0;
String command = "";
//(2)循环处理每一个以分号结尾的语句。
for (String oneCmd : line.split(";")) {
if (StringUtils.endsWith(oneCmd, "\\")) {
command += StringUtils.chop(oneCmd) + ";";
continue;
} else {
command += oneCmd;
}
if (StringUtils.isBlank(command)) {
continue;
}
//(3)执行处理命令
ret = processCmd(command);
//(4)清除query State的状态。wipe cli query state
SessionState ss = SessionState.get();
ss.setCommandType(null);
command = "";
lastRet = ret;
boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
if (ret != 0 && !ignoreErrors) {
CommandProcessorFactory.clean((HiveConf) conf);
return ret;
}
}
CommandProcessorFactory.clean((HiveConf) conf);
return lastRet;
} finally {
// Once we are done processing the line, restore the old handler
if (oldSignal != null && interupSignal != null) {
Signal.handle(interupSignal, oldSignal);
}
}
}
4,processCmd
CliDriver.processCmd
Split命令,分析第一个单词:
(1)如果是quit或者exit,退出。
(2)source,执行文件中的HiveQL
(3)!,执行命令,如!ls,列出当前目录的文件信息。
(4)list,列出jar/file/archive。
(5)如果是其他,则生成调用相应的CommandProcessor处理。
[java] view plaincopy
public int processCmd(String cmd) {
CliSessionState ss = (CliSessionState) SessionState.get();
String cmd_trimmed = cmd.trim();
String[] tokens = tokenizeCmd(cmd_trimmed);
int ret = 0;
//(1)如果是quit或者exit,退出。
if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
// if we have come this far - either the previous commands
// are all successful or this is command line. in either case
// this counts as a successful run
ss.close();
System.exit(0);
//(2)source,执行文件中的HiveQL
} else if (tokens[0].equalsIgnoreCase("source")) {
String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
File sourceFile = new File(cmd_1);
if (! sourceFile.isFile()){
console.printError("File: "+ cmd_1 + " is not a file.");
ret = 1;
} else {
try {
this.processFile(cmd_1);
} catch (IOException e) {
console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
}//(3)!,执行命令,如!ls,列出当前目录的文件信息。
} else if (cmd_trimmed.startsWith("!")) {
String shell_cmd = cmd_trimmed.substring(1);
shell_cmd = new VariableSubstitution().substitute(ss.getConf(), shell_cmd);
// shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
try {
Process executor = Runtime.getRuntime().exec(shell_cmd);
StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out);
StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err);
outPrinter.start();
errPrinter.start();
ret = executor.waitFor();
if (ret != 0) {
console.printError("Command failed with exit code = " + ret);
}
} catch (Exception e) {
console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
//(4)list,列出jar/file/archive。
} else if (tokens[0].toLowerCase().equals("list")) {
SessionState.ResourceType t;
if (tokens.length < 2 || (t = SessionState.find_resource_type(tokens[1])) == null) {
console.printError("Usage: list ["
+ StringUtils.join(SessionState.ResourceType.values(), "|") + "] [<value> [<value>]*]");
ret = 1;
} else {
List<String> filter = null;
if (tokens.length >= 3) {
System.arraycopy(tokens, 2, tokens, 0, tokens.length - 2);
filter = Arrays.asList(tokens);
}
Set<String> s = ss.list_resource(t, filter);
if (s != null && !s.isEmpty()) {
ss.out.println(StringUtils.join(s, "\n"));
}
}//(5)如果是其他,则生成调用相应的CommandProcessor处理。//如果是远端
} else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server
HiveClient client = ss.getClient();
PrintStream out = ss.out;
PrintStream err = ss.err;
try {
client.execute(cmd_trimmed);
List<String> results;
do {
results = client.fetchN(LINES_TO_FETCH);
for (String line : results) {
out.println(line);
}
} while (results.size() == LINES_TO_FETCH);
} catch (HiveServerException e) {
ret = e.getErrorCode();
if (ret != 0) { // OK if ret == 0 -- reached the EOF
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
ret = e.getErrorCode();
err.println("[Hive Error]: " + errMsg);
}
} catch (TException e) {
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
ret = -10002;
err.println("[Thrift Error]: " + errMsg);
} finally {
try {
client.clean();
} catch (TException e) {
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
+ errMsg);
}
}//如果是本地
} else { // local mode
CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf) conf);
ret = processLocalCmd(cmd, proc, ss);
}
return ret;
}
5,processLoacalCmd
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
int tryCount = 0;
boolean needRetry;
int ret = 0;
do {
try {
needRetry = false;
if (proc != null) {
if (proc instanceof Driver) {
Driver qp = (Driver) proc;
PrintStream out = ss.out;
long start = System.currentTimeMillis();
if (ss.getIsVerbose()) {
out.println(cmd);
}
qp.setTryCount(tryCount);
ret = qp.run(cmd).getResponseCode();
if (ret != 0) {
qp.close();
return ret;
}
ArrayList<String> res = new ArrayList<String>();
printHeader(qp, out);
try {
while (qp.getResults(res)) {
for (String r : res) {
out.println(r);
}
res.clear();
if (out.checkError()) {
break;
}
}
} catch (IOException e) {
console.printError("Failed with exception " + e.getClass().getName() + ":"
+ e.getMessage(), "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
int cret = qp.close();
if (ret == 0) {
ret = cret;
}
long end = System.currentTimeMillis();
if (end > start) {
double timeTaken = (end - start) / 1000.0;
console.printInfo("Time taken: " + timeTaken + " seconds", null);
}
} else {
String firstToken = tokenizeCmd(cmd.trim())[0];
String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());
if (ss.getIsVerbose()) {
ss.out.println(firstToken + " " + cmd_1);
}
ret = proc.run(cmd_1).getResponseCode();
}
}
} catch (CommandNeedRetryException e) {
console.printInfo("Retry query with a different approach...");
tryCount++;
needRetry = true;
}
} while (needRetry);
return ret;
}
6,Driver 类 的run 方法。
Driver
Driver.run(String command) // 处理一条命令
{
int ret =compile(command); // 分析命令,生成Task。
ret = execute(); // 运行Task。
}
Driver.compile
Driver.compile(String command) // 处理一条命令
{
(1) Parser(antlr):HiveQL->AbstractSyntaxTree(AST)
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
(2) SemanticAnalyzer
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
// Do semantic analysis and plan generation
sem.analyze(tree, ctx);
}
7,plan生成位置
可以通过跟踪到Driver.java文件的Line 663可知其路径为:
/tmp/hive-gexing111/hive_2012-07-09_10-37-27_511_5073252372102100766/test2.py
如果系统自己的plan:
/tmp/hive-gexing111/hive_2012-07-09_12-45-55_479_6444298560478274273/-local-10002/plan.xml
8,Debug show tables "ge*";
在hive/ metastore/src/java /com.aliyun.apsara.odps.metastore.ots /OTSObjectStore.java
中的Gettables;
9,配置文件位置
hive/build/dist/conf/hive-site.xml
设置为改写版本:
<property>
<name>com.aliyun.odps.mode</name>
<value>true</value>
</property>
<property>
<name>native.hive.mode</name>
<value>false</value>
</property>
http://blog.csdn.net/wf1982/article/details/6644330
http://blog.csdn.net/gexiaobaohelloworld/article/details/7719163
发表评论
-
hive + hbase
2015-01-04 10:42 773环境配置: hadoop-2.0.0-cdh4.3.0 (4 ... -
hive 数据倾斜
2014-08-27 09:03 687链接:http://www.alidata.org/archi ... -
hive 分通总结
2014-08-27 08:42 576总结分析: 1. 定义了桶,但要生成桶的数据,只能是由其他表 ... -
深入了解Hive Index具体实现
2014-08-25 08:51 739索引是标准的数据库技术,hive 0.7版本之后支持索引。hi ... -
explain hive index
2014-08-24 16:44 1148设置索引: 使用聚合索引优化groupby操作 hive> ... -
Hive 中内部表与外部表的区别与创建方法
2014-08-15 17:11 763分类: Hive 2013-12-07 11:56 ... -
hive map和reduce的控制
2014-08-15 16:14 625一、 控制hive任务中的map数: 1. 通 ... -
hive 压缩策略
2014-08-15 15:16 1769Hive使用的是Hadoop的文件 ... -
hive 在mysql中创建备用数据库
2014-08-15 09:21 881修改hive-site.xml <property> ... -
HIVE 窗口及分析函数
2014-08-11 16:21 1189HIVE 窗口及分析函数 使 ... -
hive 内置函数
2014-08-11 09:06 30701.sort_array(): sort_array(arra ... -
hive lateral view
2014-08-09 14:59 2026通过Lateral view可以方便的将UDTF得到的行转列的 ... -
hive数据的导出
2014-07-28 21:53 445在本博客的《Hive几种数据导入方式》文章中,谈到了Hive中 ... -
hive udaf
2014-07-25 16:11 755package com.lwz.udaf; import o ... -
hive自定义InputFormat
2014-07-25 09:13 862自定义分隔符 package com.lwz.inputf; ... -
HiveServer2连接ZooKeeper出现Too many connections问题的解决
2014-07-24 08:49 1768HiveServer2连接ZooKeeper出现Too man ... -
hive 常用命令
2014-07-17 22:22 6961.hive通过外部设置参数传入脚本中: hiv ... -
CouderaHadoop中hive的Hook扩展
2014-07-16 21:18 3338最近在做关于CDH4.3.0的hive封装,其中遇到了很多问题 ... -
利用SemanticAnalyzerHook回过滤不加分区条件的Hive查询
2014-07-16 16:43 1468我们Hadoop集群中将近百分之80的作业是通过Hive来提交 ... -
hive 的常用命令
2014-07-16 10:07 0设置、查看hive当前的角色: set sys ...
相关推荐
Hive on Spark 源码分析是指将 Hive 默认的执行引擎 MapReduce 换成 Spark 或者 Tez,以满足实际场景中的需求。本文将对 Hive on Spark 的源码进行深入分析,涵盖其基本原理、运行模式、Hive 解析 HQL、Spark 上下文...
至于Hive程序的入口,可以通过$HIVE_HOME/bin/hive脚本进入客户端执行HQL语句,或者使用$HIVE_HOME/bin/hive-e命令行方式直接执行HQL语句,还可以通过$HIVE_HOME/bin/hive-f指定执行一个包含HQL语句的文件。...
在源码层面,`org.apache.spark.sql.hive`包下包含了许多与Hive相关的类和接口,如`HiveExternalCatalog`用于与Hive Metastore交互,`HiveShim`处理Hive版本之间的差异,`HiveContext`是Spark与Hive交互的主要入口,...
接着上一篇来说执行入口的分析,CliDriver最终将用户指令command提交给了Driver的run方法(针对常用查询语句而言),在这里用户的command将会被编译,优化并生成MapReduce任务进行执行。所以Driver也是Hive的核心,...
2. **阅读源码**:从主入口点开始,逐步深入到各个组件的实现。理解RDD的生命周期、DAGScheduler如何工作、TaskScheduler如何将任务分配给Executor等关键概念。 3. **调试和实验**:在本地模式下运行Spark,设置...
源码根目录下的`pom.xml`文件是项目构建的入口,通过执行特定的Maven命令,你可以生成可执行的Dremio服务和客户端。 9. **社区与贡献** Dremio作为开源项目,其发展离不开社区的支持。源码中包含的`CONTRIBUTING....
通常在Java开发中,"main"目录用于存放应用的主入口点、配置文件和其他核心模块,这表明这个目录可能包含了整个后端项目的源代码结构。 **详细知识点** 1. **大数据基础**:BI项目通常涉及大量数据处理,因此理解...
`mvnw`和`mvnw.cmd`是Maven wrapper的可执行文件,允许开发者在没有全局安装Maven的情况下运行Maven命令。`module.iml`是IntelliJ IDEA项目模块文件,用于IDE识别和管理项目结构。 `README.md`和`HELP.md`通常包含...
- 读取数据:可以从多种数据源(如HDFS、Cassandra、Hive等)加载数据到DataFrame,例如`spark.read.csv()`。 - 写入数据:将处理后的结果保存回磁盘或数据仓库,如`df.write.parquet()`。 5. PySpark数据处理: ...
如果已存在Hive Metastore中的表,可以通过`SparkSession.sql()`执行SQL语句,将其结果转换为DataFrame。 ```python df = spark.sql("SELECT * FROM my_table") ``` 4. **DataFrame API操作**: DataFrame...
4. **Gateway**:作为对外的统一入口,处理用户请求,转发到对应的EngineConn执行。 5. **Common**:包含通用的工具类和配置,如日志、网络通信等。 通过阅读和分析源码,开发者不仅可以了解Linkis的工作原理,还...
这为用户提供了方便,特别是对于那些不熟悉源码编译或者没有编译环境的用户来说,这是一个非常实用的发布形式。 Yanagishima的特性包括: 1. **多查询引擎支持**:Yanagishima可以与多个查询引擎如Hive、Presto和...
标题中的"Spark"指的是Apache Spark,一个开源的大数据处理框架,它被设计用于高效地执行大规模数据处理任务。Spark提供了一种快速、通用且可扩展的数据处理方式,支持批处理、交互式查询、流处理和机器学习等多种...
Hadoop是Apache软件基金会开发...总之,“hadoop-3.1.1-src.tar.gz”是一个宝贵的资源,涵盖了Hadoop的核心技术、设计理念以及实现细节,对于开发者、研究者和数据工程师来说,是深入了解和掌握大数据处理的重要入口。
本章节可能涵盖了如何配置和创建SparkSession,这是PySpark中执行所有操作的基础入口点。 3. **DataFrame和Dataset**:DataFrame是Spark SQL的核心概念,它是分布式的、带列名的二维表,支持各种标准SQL操作。...
通过使用由Linkis提供的REST / WS / JDBC之类的标准接口,上层应用程序可以轻松访问诸如MySQL / Spark / Hive / Presto / Flink等底层引擎,并实现用户资源(如统一变量)的互通,脚本,UDF,函数和资源文件同时...
这个DEMO是Hadoop初学者或开发者学习平台的基础,它提供了快速理解Hadoop工作原理和实际操作的入口。 1. **Hadoop核心组件** Hadoop由两个主要组件构成:HDFS(Hadoop Distributed File System)和MapReduce。HDFS...
1. **创建SparkContext**:这是Spark应用程序的入口点,负责与集群资源管理器(如YARN或Mesos)交互,启动工作进程。 2. **定义RDD**:从数据源创建RDD,可以是HDFS、HBase或其他存储系统。 3. **转换**:对RDD应用...
5. **SparkSession**:作为 Spark SQL 和 DataFrame 的入口点,简化了不同组件之间的交互。 在使用 PySpark 处理大数据时,通常会经历以下步骤: 1. **设置 SparkConf**:创建 SparkConf 对象来配置 Spark 应用...
内存管理方面,源码中定义了几个变量来跟踪系统的内存状态,如`total_mem`、`used_mem`和`max_used_mem`,它们分别表示总内存、当前使用内存和最大使用内存。`rt_memory_info`函数用于获取这些信息,这样可以实时...