- 浏览: 348417 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
lvyuan1234:
你好,你那个sample.txt文件可以分享给我吗
hive insert overwrite into -
107x:
不错,谢谢!
hive 表的一些默认值 -
on_way_:
赞
Hadoop相关书籍 -
bupt04406:
dengkanghua 写道出来这个问题该怎么解决?hbase ...
Unexpected state导致HMaster abort -
dengkanghua:
出来这个问题该怎么解决?hbase master启动不起来。
Unexpected state导致HMaster abort
CliDriver
初始化过程
CliDriver.main 是 Cli 的入口
(1) 解析(Parse)args,放入cmdLine,处理 –hiveconf var=val 用于增加或者覆盖hive/hadoop配置,设置到System的属性中。
(2) 配置log4j,加载hive-log4j.properties里的配置信息。
(3)创建一个HiveConf,设置hiveJar= hive-exec-0.6.0.jar ,初始化加载hive-default.xml、 hive-site.xml。
(4) 创建一个CliSessionState(SessionState)
(5) 处理-S, -e, -f, -h,-i等信息,保存在SessionState中。如果是-h,打印提示信息,并退出。
(6) –hiveconf var=val 设置的属性设置到HiveConf中。
(7) ShimLoader,load HadoopShims
(8) CliSessionState设置到SessionState中,创建一个hive_job_log_ xxx文件(用于记录Hive的一些操作信息)保存到SessionState的hiveHist 。
(9) 创建CliDriver.
(10)在接受hivesql命令前,执行一些初始化命令,这些命令存在文件中,文件可以通过-i选项设置,如果没有设置就去查找是否有$HIVE_HOME/bin/.hiverc和System.getProperty("user.home")/.hiverc两个文件,如果有就执行这两个文件中的命令。
(11) 如果是–e,执行命令并退出,如果是-f,执行文件中的命令并退出。
(12)创建ConsoleReader,读取用户输入,遇到“;”为一个完整的命令,执行该命令(CliDriver.processLine ),接着读取处理用户的输入。用户输入的命令记录在user.home/.hivehistory文件中。
读取用户输入hivesql,处理运行过程
CliDriver.processLine 去掉命令末尾的;,
CliDriver.processCmd
Split命令,分析第一个单词:
(1)如果是quit或者exit,不区分大小写,退出。
(2)source,执行文件中的HiveQL
(3)!,执行命令,如!ls,列出当前目录的文件信息。
(4)list,列出jar/file/archive。
(5)如果是其他,则生成调用相应的CommandProcessor处理。
CommandProcessor
CommandProcessorFactory
(1)set SetProcessor,设置修改参数,设置到SessionState的HiveConf里。
(2)dfs DfsProcessor,使用hadoop的 FsShell运行hadoop的命令。
(3)add AddResourceProcessor 添加到SessionState的resource_map里,运行提交job的时候会写入 Hadoop的Distributed Cache。
(4)delete DeleteResourceProcessor从SessionState的resource_map里删除。
(5)其他 Driver
Driver
Driver.run(String command) // 处理一条命令
{
int ret = compile(command); // 分析命令,生成Task。
ret = execute(); // 运行Task。
}
(1)词法分析,生成AST树,ParseDriver完成。
(2)分析AST树,AST拆分成查询子块,信息记录在QB,这个QB在下面几个阶段都需要用到,SemanticAnalyzer.doPhase1完成。
(3)从metastore中获取表的信息,SemanticAnalyzer.getMetaData完成。
(4)生成逻辑执行计划,SemanticAnalyzer.genPlan完成。
(5)优化逻辑执行计划,Optimizer完成,ParseContext作为上下文信息进行传递。
(6)生成物理执行计划,SemanticAnalyzer.genMapRedTasks完成。
(7)物理计划优化,PhysicalOptimizer完成,PhysicalContext作为上下文信息进行传递。
(8)执行生成的物理计划,获得结果。
(1)~(7)在Driver的compile中完成。
(8)在Driver的execute中完成,在执行阶段一个一个Task运行,不会改变物理计划。
整个Hive代码架构还不够清晰,传递的上下文信息比较臃肿,比较难理解。
Driver.compile
Driver.compile(String command) // 处理一条命令
{
(1) Context
ctx = new Context(conf); // private Context ctx; Driver的一个字段变量
(2) Parser(antlr):HiveQL->AbstractSyntaxTree(AST)
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
(3) SemanticAnalyzer
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
// Do semantic analysis and plan generation
sem.analyze(tree, ctx);
// 说明:如果有SEMANTIC_ANALYZER_HOOK("hive.semantic.analyzer.hook",null)这个hook,那么会在sem.analyze(tree, ctx);前执行hook.preAnalyze(hookCtx, tree);在sem.analyze(tree, ctx);后执行hook.postAnalyze(hookCtx, sem.getRootTasks(), sem.getFetchTask()); 这里的hook有多个
(4) QueryPlan
plan = new QueryPlan(command, sem);
(5) Schema
schema = getSchema(sem, conf); // / get the output schema
}
Parser是:
使用antlr,语法规则是 Hive.g
ql/src/java目录下面的:org.apache.hadoop.hive.ql. ParseDriver
SemanticAnalyzerFactory/SemanticAnalyzer
多种SemanticAnalyzer:
(1)ExplainSemanticAnalyzer(会调用SemanticAnalyzer获得相应信息)
explain 某条HiveSQL时调用
EXPLAIN [EXTENDED] query
(2)LoadSemanticAnalyzer
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
(3)DDLSemanticAnalyzer
SHOW TABLES、DROP TABLE、DESC TABLE等时
(4)FunctionSemanticAnalyzer
CREATE/DROP FUNCTION
(5)SemanticAnalyzer
select 等
(6)其他SemanticAnalyzer,Hive-0.6、0.7只有上面5种,trunk里面针对新功能新特性添加了相应的SemanticAnalyzer
Driver.execute
Driver.execute() // 运行命令生成的Task(一个或多个)
{
(1) Get all the pre execution hooks and execute them.
(2) 把root Tasks 加到 runnable队列
(3) 运行该SQL产生的Task
while (running.size() != 0 || runnable.peek() != null) { //task running队列不为空,或者runnable不为空。
while (runnable.peek() != null && running.size() < maxthreads) {//runnable队列不为空
Task<? extends Serializable> tsk = runnable.remove();//删除runnable队列头的task
launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt); //运行Task,如果打开了并发提交会通过新的线程去运行Task,否则就是主线程运行Task,直到Task运行完毕,把Task对应的TaskResult和TaskRunner加入running队列
}
//从running队列中获取一个运行完的Task
TaskResult tskRes = pollTasks(running.keySet());
TaskRunner tskRun = running.remove(tskRes);
Task<? extends Serializable> tsk = tskRun.getTask();
int exitVal = tskRes.getExitVal(); //task完成的状态
if (exitVal != 0) { //Task失败
获得task的backupTask
有backup,把backup加入到runnable队列,没有就需要返回return 9;,表示HiveSQL运行失败。而不是System.exit(9);
}
// 把task的ChildTasks加入到runnable队列。
}
(4) Get all the post execution hooks and execute them.
}
Driver.launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
Map<TaskResult, TaskRunner> running, String jobname, int jobs, DriverContext cxt){
tsk.initialize(conf, plan, cxt); // Task初始化
TaskResult tskRes = new TaskResult(); // task信息:是否成功执行,是否运行
TaskRunner tskRun = new TaskRunner(tsk, tskRes);
// Launch Task
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) { //并发提交打开并且这个是MR task,在另一个线程中执行。
// Launch it in the parallel mode, as a separate thread only for MR tasks
tskRun.start();
} else {
tskRun.runSequential(); // 主线程执行
}
running.put(tskRes, tskRun); // 放入running队列
}
Task:
(1) ConditionalTask
(2) CopyTask
(3) DDLTask
(4) ExecDriver
(5) MapRedTask
(6) ExplainTask
(7) FetchTask
(8) FunctionTask
(9) MapredLocalTask
(10) MoveTask
核心之一:SemanticAnalyzer
ql/src/java目录下面的: org.apache.hadoop.hive.ql.SemanticAnalyzer
SemanticAnalyzer. analyzeInternal(ASTNode ast)
{
// analyze create table command
if (ast.getToken().getType() == HiveParser.TOK_CREATETABLE) { //带有create
isCreateTable = true;
// if it is not CTAS, we don't need to go further and just return
if ((child = analyzeCreateTable(ast, qb)) == null) { // create-table-as-select 返回查询子树
regular create-table or create-table-like statements 返回null
return;
}
}
doPhase1(child, qb, initPhase1Ctx());//分析AST树
getMetaData(qb); //从数据库中获得表的信息
Operator sinkOp = genPlan(qb);// AST-〉operator trees
Optimizer optm = new Optimizer();
pCtx = optm.optimize();// 优化 operator trees -〉operator trees
// At this point we have the complete operator tree
// from which we want to find the reduce operator
genMapRedTasks(qb); // operator trees-〉MapReduce Tasks
}
Hive原理分析:
(1) 从HQL语句到AST的转化过程是很机械,使用ANTLR,根据Hive.g的语法分析规则,生成AST。
(2) 从AST转化到QB,再到DAG图不是那么很容易明白,所以需要理清楚一下。
AST-〉QB就是把AST里面的一些信息和子查询分析出来,如所有涉及的表和表的别名(如果这条HQL查询语句中没有为表取别名,那么取别名为表名)的对应关系保存到QB的aliasToTabs,目标表(目标表即输出的table)的子AST。where子句,select子句,join子句,等一些子查询的AST分析出来,保存起来。
QBMetaData是查询相关的元数据信息,如所有源表(源表即从哪些表取得输入数据)到该表的Table关联。表的Table用来记录Table有哪些字段,各个字段的类型,表的分隔符等等信息。目标表名(存放输出结果的表)到表的Table的关联,目标表可以有多个,因为输出可能是写入多个表。
QB-〉DAG图的转化过程。
从QB生成operator,从生成的QB中的子查询生成Operator并保存记录它们之间的父子关系,还可能插入一些operator,这些operator是一些必要的辅助功能。
后面需要对这个DAG图,即operator图进行拆分,生成一些mapreduce作业(job),如有一个map阶段可能有多个operator,完成这些operator的功能,如某个Job的map执行多个operator,TableScanOperator是第一个operator,从读取一个表的数据开始(一条一条记录,record),在接着可能就是跟据where生成的operator(FilterOperator),过滤哪些不符合规则的记录(record,key/value),在接着是执行根据select生成的Select Operator(该operator选择仅需要的字段,过滤无关的字段,从而减少中间数据),最后是一个Reduce Output Operator,该operator完成map的输出,生成中间key和value。
作业的reduce也是可以执行多个operator的。
从QB生成的Operator里面有父子关系,生成mapreduce时,会对这个具有父子关系的operator图进行切分,生成一个个阶段,有些阶段是mapreduce作业,这些作业执行多个operator的功能。
ReduceSinkOperator是map的最后一个Operator,因为该operator需要生成一个map的输出,即输出key和输出value。
生成Operator树的过程:SemanticAnalyzer.genPlan(QB qb)
(1) 子查询必须有一个别名即alias,遍历所有的子查询,出现多个子查询在Join时出现,join两边的表都是来自子查询。
(2) 遍历所有的源表,出现多个在join时出现。
(3)处理join,在on条件中的过滤条件会推到join前即ReduceSinkOperator前,如果是一个join,那么先生成两个ReduceSinkOperator,然后再生成JoinOperator,join这两个表。
下面的是在SemanticAnalyzer.genBodyPlan(QB qb, Operator input) 里面完成。
(4) optimizeMultiGroupBy
(4.1)optimizeMultiGroupBy可以优化时走的路径跟下面的不相同。
(4.2) 对每个select进行处理,多个select出现在Multi-Group-By Inserts、Multi Table/File Inserts、Dynamic-partition Insert等情况下。multi_insert.q
SemanticAnalyzer.doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1) {
switch (ast.getToken().getType()) {
case HiveParser.TOK_SELECTDI:
case HiveParser.TOK_SELECT:
(1) 在QBParseInfo里保存select查询子节点,Map<String, ASTNode> destToSelExpr;
(2)有hint,在QBParseInfo保存hints子节点,ASTNode hints;
(3)处理ast子树的聚合函数,HiveParser.TOK_FUNCTION、TOK_FUNCTIONDI、TOK_FUNCTIONSTAR,
(4)处理select中column别名Map<ASTNode, String> exprToColumnAlias;
(5)保存聚合函数,QBParseInfo的LinkedHashMap<String, LinkedHashMap<String, ASTNode>> destToAggregationExprs;
(6)TOK_FUNCTIONDI,抽取保存distinct聚合函数,HashMap<String, List<ASTNode>> destToDistinctFuncExprs;
case HiveParser.TOK_WHERE:
在QBParseInfo里保存where查询子节点,HashMap<String, ASTNode> destToWhereExpr;
case HiveParser.TOK_DESTINATION:
在QBParseInfo里面保存目标地址子节点信息,HashMap<String, ASTNode> nameToDest;
case HiveParser.TOK_FROM:
只有一个子节点,有四种子节点
(1) 一种是表,数据来源于一个表。processTable,处理别名,没有别名表名就是别名。
(2)一种是子查询,数据来源于子查询,processSubQuery,子查询必须要有个别名,子查询可能是单独的一个query或者是两个query的union。子查询也是递归调用doPhase1来完成相关分析。
(3)一种是视图,数据来源于一个视图,processLateralView
(4)一种是Join,数据来源于几个表的join,processJoin,join子节点的孩子节点是两个或者三个,孩子节点可以是表、子查询、join子节点,保存join子查询ASTNode joinExpr;
case HiveParser.TOK_CLUSTERBY:
在QBParseInfo里保存cluster by查询子节点,HashMap<String, ASTNode> destToClusterby;
case HiveParser.TOK_DISTRIBUTEBY:
在QBParseInfo里保存distribute by查询子节点,有distribute by的时候不能有cluster by和order by,HashMap<String, ASTNode> destToDistributeby;
case HiveParser.TOK_SORTBY:
在QBParseInfo里保存sort by查询子节点,有sort by的时候不能有cluster by和order by,HashMap<String, ASTNode> destToSortby;
case HiveParser.TOK_ORDERBY:
在QBParseInfo里保存order by查询子节点,有order by的时候不能有cluster by,HashMap<String, ASTNode> destToOrderby;
case HiveParser.TOK_GROUPBY:
在QBParseInfo里保存group by查询子节点,HashMap<String, ASTNode> destToGroupby;
case HiveParser.TOK_LIMIT:
在QBParseInfo里保存limit查询子节点,HashMap<String, Integer> destToLimit;
case HiveParser.TOK_UNION:
}
if (!skipRecursion) {
// Iterate over the rest of the children
int child_count = ast.getChildCount();
for (int child_pos = 0; child_pos < child_count; ++child_pos) {
// Recurse
doPhase1((ASTNode) ast.getChild(child_pos), qb, ctx_1); //递归处理各个孩子节点
}
}
}
SemanticAnalyzer.getMetaData(QB qb) {
(1)从数据库中获取表的信息,这些表是记录在QB的HashMap<String, String> aliasToTabs;中
表的别名和对应的org.apache.hadoop.hive.ql.metadata.Table保存记录在QB的QBMetaData qbm;的HashMap<String, Table> aliasToTable;中。
(2)如果有子查询,递归调用getMetaData(QB qb)从数据库获取表的信息
(3)获取目的表的信息
目的子节点存储在QBParseInfo的HashMap<String, ASTNode> nameToDest;
目的节点有2种:(3.1)目的是表,表分分区表和非分区表(3.2)目的是本地目录或者hdfs目录,获得设置一个中间临时目录
}
SemanticAnalyzer.genPlan(QB qb){
(1)处理子查询,生成子查询的operator tree
(2)遍历source tables,记录保存在QB的HashMap<String, String> aliasToTabs;里面,对每个源表生成一个TableScanOperator,保存到SemanticAnalyzer的HashMap<TableScanOperator, Table> topToTable;里
(3)处理视图
(4)处理join
(5)genBodyPlan,生成剩下的operator tree.
}
SemanticAnalyzer.genBodyPlan(QB qb, Operator input) {
(1)multi-group by优化
(2)遍历所有的destination tables,保存记录在QBParseInfo的Map<String, ASTNode> destToSelExpr;里,从select获得。
(2.1)有where语句生成FilterOperator,从QBParseInfo.destToWhereExpr里查询
(2.2)有group by或者聚合函数,根据相关配置生成相应operator tree.
(2.3)生成SelectOperator,选取相应字段,来自select语句
(2.4)有cluster by 或者distribute by或者order by或者sort by生成相应的ReduceSinkOperator和ExtractOperator,如果是order by设置reduce数为1
(2.5)分两种情况,qbp是子查询与qbp不是子查询
(2.5.1)是子查询
(2.5.2)不是子查询
有limit,生成相应的LimitOperator,这里需要分情况,是否需要两个MR
如果需要进行类型转换则生成相应的SelectOperator,生成FileSinkOperator
}
Optimizer.optimize() {
}
SemanticAnalyzer.genMapRedTasks(QB qb) {
}
核心之二:MapRedTask
TaskRunner:
public void runSequential() {
int exitVal = -101;
try {
exitVal = tsk.executeTask(); //运行Task.executeTask()
} catch (Throwable t) {
t.printStackTrace();
}
result.setExitVal(exitVal);
}
Task:
public int executeTask() {
int retval = execute(driverContext); //各个子类实现该方法
}
protected abstract int execute(DriverContext driverContext);
这里介绍MapRedTask这个Task.
MapRedTask:
public int execute(DriverContext driverContext) {
(1) setNumberOfReducers(); // estimate number of reducers 推测reduce个数
(2) if (!ctx.isLocalOnlyExecutionMode() &&
conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) { //HiveConf.ConfVars.HADOOPJT不是local,并且LOCALMODEAUTO("hive.exec.mode.local.auto", true)打开
//hive.exec.mode.local.auto用于小job自动转换为本地运行,should hive determine whether to run in local mode automatically
判断job能否本地运行,目前的判断条件是:(一)输入数据小于等于128M (二)map数小于等于4 (三) reduce数小于等于1,这3个条件都满足,该任务就在本地运行。
}
(3)计算得到 runningViaChild
runningViaChild =
"local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT)) ||
conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
//如果是本地运行或者通过子进程提交作业,runningViaChild为true
(3.1) 如果runningViaChild为false,super.execute(driverContext); ExecDriver.execute完成task。
(3.2) 如果runningViaChild为true,通过子进程完成
executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
子进程的入口main函数是ExecDriver.main()
}
核心之三:ExecDriver
/home/tianzhao/apache/hive-0.6.0/build/hadoopcore/hadoop-0.19.1/bin/hadoop jar /home/tianzhao/apache/hive-0.6.0/build/ql/hive-exec-0.6.0.jar org.apache.hadoop.hive.ql.exec.ExecDriver -plan /tmp/hive-tianzhao/hive_2011-05-31_09-30-02_222_4000721282829102058/plan5577504731701425227.xml -jobconf datanucleus.connectionPoolingType=DBCP
使用hadoop jar hive-exec-0.6.0.jar org.apache.hadoop.hive.ql.exec.ExecDriver提交job给hadoop,作业的信息诸如operator等信息序列化到了 -plan plan5577504731701425227.xml里面。
ExecMapper、ExecReducer在configure(JobConf job)运行的时候会反序列化出来。
hive提交给hadoop的MapReduce作业,map阶段运行ExecMapper,reduce阶段运行ExecReducer。
add jar/add file/add archive,这些archive、jar和文件会写入 Distributed Cache里面。在MapTask和ReduceTask运行的时候读取调用。 写入Distributed Cache参考ExecDriver。
ExecDriver使用JobClient提交Job后,定期查看Job的进展情况,Job完成后,调用operator的jobClose()方法。
hive.exec.plan
org.apache.hadoop.hive.ql.exec.Utilities.setMapRedWork() 会设置plan的ID
ExecDriver.execute(DriverContext driverContext) {
(1) 创建ScratchDir目录
(2) 设置mapper类,reducer类等等
job.setMapperClass(ExecMapper.class);
(3) 如果有MapredLocalWork,并且不是localMode,那么上传文件到HDFS,该文件加入DistributedCache。场景用于auto map join,auto map join会产生两个Task:MapredLocalTask+MapRedTask。 MapredLocalTask将小表的数据从hdfs fetch下来,put到一个HashTable,写入到本地的一个文件中。在MapRedTask中把本地的这个文件写入hdfs,add到DistributedCache,就是当前的部分。这里的小表写入HashTable合并相同的key,只需要在client端做一次,在map端只需要读取使用即可。加入DistributedCache是为了一个TaskTracker多次运行MapTask使用到一个文件时不需要多次下载,只需一次下载即可。
(4)MapredWork写入hdfs,副本数设置为10,加入DistributedCache
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
(5)创建JobClient
JobClient jc = new JobClient(job);
(6)运行PrejobHooks
runPreJobHooks(); // Call the Pre-job hooks' list
(7)提交job
orig_rj = rj = jc.submitJob(job);
(8)定时检测job时候运行完成
private void progress(ExecDriverTaskHandle th) throws IOException {
while (!rj.isComplete()) {
Thread.sleep(pullInterval); // pullInterval默认是1000L,HIVECOUNTERSPULLINTERVAL("hive.exec.counters.pull.interval", 1000L),通过hive.exec.counters.pull.interval可以设置
updateCounters(th); //获取更新进度信息
String report = " " + getId() + " map = " + mapProgress + "%, reduce = " + reduceProgress
+ "%"; // 打印这行进度信息
}
// while循环外,job已经结束
runPostJobHooks(rj); //运行PostJobHooks
}
(9)清理操作
(10)运行Operator的jobClose方法
for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
op.jobClose(job, success, feedBack);
}
work.getReducer().jobClose(job, success, feedBack);
(11)return (returnVal); 返回
}
ExecDriver.main( )
两个地方调用
(1)MapRedTask :
在本地执行或者通过子进程提交两种方式下会调用。
ExecDriver.main(String[] args) {
} else {
MapredWork plan = Utilities.deserializeMapRedWork(pathData, conf);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
ret = ed.execute(new DriverContext());
}
}
(2)MapredLocalTask :
启动子进程执行
ExecDriver.main(String[] args) {
if (localtask) {
memoryMXBean = ManagementFactory.getMemoryMXBean();
MapredLocalWork plan = Utilities.deserializeMapRedLocalWork(pathData, conf);
MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
ret = ed.executeFromChildJVM(new DriverContext()); // 从hdfs上面获取小表数据,写到HashTable中,然后dump到本地的一个文件。
}
}
......
初始化过程
CliDriver.main 是 Cli 的入口
(1) 解析(Parse)args,放入cmdLine,处理 –hiveconf var=val 用于增加或者覆盖hive/hadoop配置,设置到System的属性中。
(2) 配置log4j,加载hive-log4j.properties里的配置信息。
(3)创建一个HiveConf,设置hiveJar= hive-exec-0.6.0.jar ,初始化加载hive-default.xml、 hive-site.xml。
(4) 创建一个CliSessionState(SessionState)
(5) 处理-S, -e, -f, -h,-i等信息,保存在SessionState中。如果是-h,打印提示信息,并退出。
(6) –hiveconf var=val 设置的属性设置到HiveConf中。
(7) ShimLoader,load HadoopShims
(8) CliSessionState设置到SessionState中,创建一个hive_job_log_ xxx文件(用于记录Hive的一些操作信息)保存到SessionState的hiveHist 。
(9) 创建CliDriver.
(10)在接受hivesql命令前,执行一些初始化命令,这些命令存在文件中,文件可以通过-i选项设置,如果没有设置就去查找是否有$HIVE_HOME/bin/.hiverc和System.getProperty("user.home")/.hiverc两个文件,如果有就执行这两个文件中的命令。
(11) 如果是–e,执行命令并退出,如果是-f,执行文件中的命令并退出。
(12)创建ConsoleReader,读取用户输入,遇到“;”为一个完整的命令,执行该命令(CliDriver.processLine ),接着读取处理用户的输入。用户输入的命令记录在user.home/.hivehistory文件中。
读取用户输入hivesql,处理运行过程
CliDriver.processLine 去掉命令末尾的;,
CliDriver.processCmd
Split命令,分析第一个单词:
(1)如果是quit或者exit,不区分大小写,退出。
(2)source,执行文件中的HiveQL
(3)!,执行命令,如!ls,列出当前目录的文件信息。
(4)list,列出jar/file/archive。
(5)如果是其他,则生成调用相应的CommandProcessor处理。
CommandProcessor
CommandProcessorFactory
(1)set SetProcessor,设置修改参数,设置到SessionState的HiveConf里。
(2)dfs DfsProcessor,使用hadoop的 FsShell运行hadoop的命令。
(3)add AddResourceProcessor 添加到SessionState的resource_map里,运行提交job的时候会写入 Hadoop的Distributed Cache。
(4)delete DeleteResourceProcessor从SessionState的resource_map里删除。
(5)其他 Driver
Driver
Driver.run(String command) // 处理一条命令
{
int ret = compile(command); // 分析命令,生成Task。
ret = execute(); // 运行Task。
}
(1)词法分析,生成AST树,ParseDriver完成。
(2)分析AST树,AST拆分成查询子块,信息记录在QB,这个QB在下面几个阶段都需要用到,SemanticAnalyzer.doPhase1完成。
(3)从metastore中获取表的信息,SemanticAnalyzer.getMetaData完成。
(4)生成逻辑执行计划,SemanticAnalyzer.genPlan完成。
(5)优化逻辑执行计划,Optimizer完成,ParseContext作为上下文信息进行传递。
(6)生成物理执行计划,SemanticAnalyzer.genMapRedTasks完成。
(7)物理计划优化,PhysicalOptimizer完成,PhysicalContext作为上下文信息进行传递。
(8)执行生成的物理计划,获得结果。
(1)~(7)在Driver的compile中完成。
(8)在Driver的execute中完成,在执行阶段一个一个Task运行,不会改变物理计划。
整个Hive代码架构还不够清晰,传递的上下文信息比较臃肿,比较难理解。
Driver.compile
Driver.compile(String command) // 处理一条命令
{
(1) Context
ctx = new Context(conf); // private Context ctx; Driver的一个字段变量
(2) Parser(antlr):HiveQL->AbstractSyntaxTree(AST)
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
(3) SemanticAnalyzer
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
// Do semantic analysis and plan generation
sem.analyze(tree, ctx);
// 说明:如果有SEMANTIC_ANALYZER_HOOK("hive.semantic.analyzer.hook",null)这个hook,那么会在sem.analyze(tree, ctx);前执行hook.preAnalyze(hookCtx, tree);在sem.analyze(tree, ctx);后执行hook.postAnalyze(hookCtx, sem.getRootTasks(), sem.getFetchTask()); 这里的hook有多个
(4) QueryPlan
plan = new QueryPlan(command, sem);
(5) Schema
schema = getSchema(sem, conf); // / get the output schema
}
Parser是:
使用antlr,语法规则是 Hive.g
ql/src/java目录下面的:org.apache.hadoop.hive.ql. ParseDriver
SemanticAnalyzerFactory/SemanticAnalyzer
多种SemanticAnalyzer:
(1)ExplainSemanticAnalyzer(会调用SemanticAnalyzer获得相应信息)
explain 某条HiveSQL时调用
EXPLAIN [EXTENDED] query
(2)LoadSemanticAnalyzer
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
(3)DDLSemanticAnalyzer
SHOW TABLES、DROP TABLE、DESC TABLE等时
(4)FunctionSemanticAnalyzer
CREATE/DROP FUNCTION
(5)SemanticAnalyzer
select 等
(6)其他SemanticAnalyzer,Hive-0.6、0.7只有上面5种,trunk里面针对新功能新特性添加了相应的SemanticAnalyzer
Driver.execute
Driver.execute() // 运行命令生成的Task(一个或多个)
{
(1) Get all the pre execution hooks and execute them.
(2) 把root Tasks 加到 runnable队列
(3) 运行该SQL产生的Task
while (running.size() != 0 || runnable.peek() != null) { //task running队列不为空,或者runnable不为空。
while (runnable.peek() != null && running.size() < maxthreads) {//runnable队列不为空
Task<? extends Serializable> tsk = runnable.remove();//删除runnable队列头的task
launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt); //运行Task,如果打开了并发提交会通过新的线程去运行Task,否则就是主线程运行Task,直到Task运行完毕,把Task对应的TaskResult和TaskRunner加入running队列
}
//从running队列中获取一个运行完的Task
TaskResult tskRes = pollTasks(running.keySet());
TaskRunner tskRun = running.remove(tskRes);
Task<? extends Serializable> tsk = tskRun.getTask();
int exitVal = tskRes.getExitVal(); //task完成的状态
if (exitVal != 0) { //Task失败
获得task的backupTask
有backup,把backup加入到runnable队列,没有就需要返回return 9;,表示HiveSQL运行失败。而不是System.exit(9);
}
// 把task的ChildTasks加入到runnable队列。
}
(4) Get all the post execution hooks and execute them.
}
Driver.launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
Map<TaskResult, TaskRunner> running, String jobname, int jobs, DriverContext cxt){
tsk.initialize(conf, plan, cxt); // Task初始化
TaskResult tskRes = new TaskResult(); // task信息:是否成功执行,是否运行
TaskRunner tskRun = new TaskRunner(tsk, tskRes);
// Launch Task
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) { //并发提交打开并且这个是MR task,在另一个线程中执行。
// Launch it in the parallel mode, as a separate thread only for MR tasks
tskRun.start();
} else {
tskRun.runSequential(); // 主线程执行
}
running.put(tskRes, tskRun); // 放入running队列
}
Task:
(1) ConditionalTask
(2) CopyTask
(3) DDLTask
(4) ExecDriver
(5) MapRedTask
(6) ExplainTask
(7) FetchTask
(8) FunctionTask
(9) MapredLocalTask
(10) MoveTask
核心之一:SemanticAnalyzer
ql/src/java目录下面的: org.apache.hadoop.hive.ql.SemanticAnalyzer
SemanticAnalyzer. analyzeInternal(ASTNode ast)
{
// analyze create table command
if (ast.getToken().getType() == HiveParser.TOK_CREATETABLE) { //带有create
isCreateTable = true;
// if it is not CTAS, we don't need to go further and just return
if ((child = analyzeCreateTable(ast, qb)) == null) { // create-table-as-select 返回查询子树
regular create-table or create-table-like statements 返回null
return;
}
}
doPhase1(child, qb, initPhase1Ctx());//分析AST树
getMetaData(qb); //从数据库中获得表的信息
Operator sinkOp = genPlan(qb);// AST-〉operator trees
Optimizer optm = new Optimizer();
pCtx = optm.optimize();// 优化 operator trees -〉operator trees
// At this point we have the complete operator tree
// from which we want to find the reduce operator
genMapRedTasks(qb); // operator trees-〉MapReduce Tasks
}
Hive原理分析:
(1) 从HQL语句到AST的转化过程是很机械,使用ANTLR,根据Hive.g的语法分析规则,生成AST。
(2) 从AST转化到QB,再到DAG图不是那么很容易明白,所以需要理清楚一下。
AST-〉QB就是把AST里面的一些信息和子查询分析出来,如所有涉及的表和表的别名(如果这条HQL查询语句中没有为表取别名,那么取别名为表名)的对应关系保存到QB的aliasToTabs,目标表(目标表即输出的table)的子AST。where子句,select子句,join子句,等一些子查询的AST分析出来,保存起来。
QBMetaData是查询相关的元数据信息,如所有源表(源表即从哪些表取得输入数据)到该表的Table关联。表的Table用来记录Table有哪些字段,各个字段的类型,表的分隔符等等信息。目标表名(存放输出结果的表)到表的Table的关联,目标表可以有多个,因为输出可能是写入多个表。
QB-〉DAG图的转化过程。
从QB生成operator,从生成的QB中的子查询生成Operator并保存记录它们之间的父子关系,还可能插入一些operator,这些operator是一些必要的辅助功能。
后面需要对这个DAG图,即operator图进行拆分,生成一些mapreduce作业(job),如有一个map阶段可能有多个operator,完成这些operator的功能,如某个Job的map执行多个operator,TableScanOperator是第一个operator,从读取一个表的数据开始(一条一条记录,record),在接着可能就是跟据where生成的operator(FilterOperator),过滤哪些不符合规则的记录(record,key/value),在接着是执行根据select生成的Select Operator(该operator选择仅需要的字段,过滤无关的字段,从而减少中间数据),最后是一个Reduce Output Operator,该operator完成map的输出,生成中间key和value。
作业的reduce也是可以执行多个operator的。
从QB生成的Operator里面有父子关系,生成mapreduce时,会对这个具有父子关系的operator图进行切分,生成一个个阶段,有些阶段是mapreduce作业,这些作业执行多个operator的功能。
ReduceSinkOperator是map的最后一个Operator,因为该operator需要生成一个map的输出,即输出key和输出value。
生成Operator树的过程:SemanticAnalyzer.genPlan(QB qb)
(1) 子查询必须有一个别名即alias,遍历所有的子查询,出现多个子查询在Join时出现,join两边的表都是来自子查询。
(2) 遍历所有的源表,出现多个在join时出现。
(3)处理join,在on条件中的过滤条件会推到join前即ReduceSinkOperator前,如果是一个join,那么先生成两个ReduceSinkOperator,然后再生成JoinOperator,join这两个表。
下面的是在SemanticAnalyzer.genBodyPlan(QB qb, Operator input) 里面完成。
(4) optimizeMultiGroupBy
(4.1)optimizeMultiGroupBy可以优化时走的路径跟下面的不相同。
(4.2) 对每个select进行处理,多个select出现在Multi-Group-By Inserts、Multi Table/File Inserts、Dynamic-partition Insert等情况下。multi_insert.q
SemanticAnalyzer.doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1) {
switch (ast.getToken().getType()) {
case HiveParser.TOK_SELECTDI:
case HiveParser.TOK_SELECT:
(1) 在QBParseInfo里保存select查询子节点,Map<String, ASTNode> destToSelExpr;
(2)有hint,在QBParseInfo保存hints子节点,ASTNode hints;
(3)处理ast子树的聚合函数,HiveParser.TOK_FUNCTION、TOK_FUNCTIONDI、TOK_FUNCTIONSTAR,
(4)处理select中column别名Map<ASTNode, String> exprToColumnAlias;
(5)保存聚合函数,QBParseInfo的LinkedHashMap<String, LinkedHashMap<String, ASTNode>> destToAggregationExprs;
(6)TOK_FUNCTIONDI,抽取保存distinct聚合函数,HashMap<String, List<ASTNode>> destToDistinctFuncExprs;
case HiveParser.TOK_WHERE:
在QBParseInfo里保存where查询子节点,HashMap<String, ASTNode> destToWhereExpr;
case HiveParser.TOK_DESTINATION:
在QBParseInfo里面保存目标地址子节点信息,HashMap<String, ASTNode> nameToDest;
case HiveParser.TOK_FROM:
只有一个子节点,有四种子节点
(1) 一种是表,数据来源于一个表。processTable,处理别名,没有别名表名就是别名。
(2)一种是子查询,数据来源于子查询,processSubQuery,子查询必须要有个别名,子查询可能是单独的一个query或者是两个query的union。子查询也是递归调用doPhase1来完成相关分析。
(3)一种是视图,数据来源于一个视图,processLateralView
(4)一种是Join,数据来源于几个表的join,processJoin,join子节点的孩子节点是两个或者三个,孩子节点可以是表、子查询、join子节点,保存join子查询ASTNode joinExpr;
case HiveParser.TOK_CLUSTERBY:
在QBParseInfo里保存cluster by查询子节点,HashMap<String, ASTNode> destToClusterby;
case HiveParser.TOK_DISTRIBUTEBY:
在QBParseInfo里保存distribute by查询子节点,有distribute by的时候不能有cluster by和order by,HashMap<String, ASTNode> destToDistributeby;
case HiveParser.TOK_SORTBY:
在QBParseInfo里保存sort by查询子节点,有sort by的时候不能有cluster by和order by,HashMap<String, ASTNode> destToSortby;
case HiveParser.TOK_ORDERBY:
在QBParseInfo里保存order by查询子节点,有order by的时候不能有cluster by,HashMap<String, ASTNode> destToOrderby;
case HiveParser.TOK_GROUPBY:
在QBParseInfo里保存group by查询子节点,HashMap<String, ASTNode> destToGroupby;
case HiveParser.TOK_LIMIT:
在QBParseInfo里保存limit查询子节点,HashMap<String, Integer> destToLimit;
case HiveParser.TOK_UNION:
}
if (!skipRecursion) {
// Iterate over the rest of the children
int child_count = ast.getChildCount();
for (int child_pos = 0; child_pos < child_count; ++child_pos) {
// Recurse
doPhase1((ASTNode) ast.getChild(child_pos), qb, ctx_1); //递归处理各个孩子节点
}
}
}
SemanticAnalyzer.getMetaData(QB qb) {
(1)从数据库中获取表的信息,这些表是记录在QB的HashMap<String, String> aliasToTabs;中
表的别名和对应的org.apache.hadoop.hive.ql.metadata.Table保存记录在QB的QBMetaData qbm;的HashMap<String, Table> aliasToTable;中。
(2)如果有子查询,递归调用getMetaData(QB qb)从数据库获取表的信息
(3)获取目的表的信息
目的子节点存储在QBParseInfo的HashMap<String, ASTNode> nameToDest;
目的节点有2种:(3.1)目的是表,表分分区表和非分区表(3.2)目的是本地目录或者hdfs目录,获得设置一个中间临时目录
}
SemanticAnalyzer.genPlan(QB qb){
(1)处理子查询,生成子查询的operator tree
(2)遍历source tables,记录保存在QB的HashMap<String, String> aliasToTabs;里面,对每个源表生成一个TableScanOperator,保存到SemanticAnalyzer的HashMap<TableScanOperator, Table> topToTable;里
(3)处理视图
(4)处理join
(5)genBodyPlan,生成剩下的operator tree.
}
SemanticAnalyzer.genBodyPlan(QB qb, Operator input) {
(1)multi-group by优化
(2)遍历所有的destination tables,保存记录在QBParseInfo的Map<String, ASTNode> destToSelExpr;里,从select获得。
(2.1)有where语句生成FilterOperator,从QBParseInfo.destToWhereExpr里查询
(2.2)有group by或者聚合函数,根据相关配置生成相应operator tree.
(2.3)生成SelectOperator,选取相应字段,来自select语句
(2.4)有cluster by 或者distribute by或者order by或者sort by生成相应的ReduceSinkOperator和ExtractOperator,如果是order by设置reduce数为1
(2.5)分两种情况,qbp是子查询与qbp不是子查询
(2.5.1)是子查询
(2.5.2)不是子查询
有limit,生成相应的LimitOperator,这里需要分情况,是否需要两个MR
如果需要进行类型转换则生成相应的SelectOperator,生成FileSinkOperator
}
Optimizer.optimize() {
}
SemanticAnalyzer.genMapRedTasks(QB qb) {
}
核心之二:MapRedTask
TaskRunner:
public void runSequential() {
int exitVal = -101;
try {
exitVal = tsk.executeTask(); //运行Task.executeTask()
} catch (Throwable t) {
t.printStackTrace();
}
result.setExitVal(exitVal);
}
Task:
public int executeTask() {
int retval = execute(driverContext); //各个子类实现该方法
}
protected abstract int execute(DriverContext driverContext);
这里介绍MapRedTask这个Task.
MapRedTask:
public int execute(DriverContext driverContext) {
(1) setNumberOfReducers(); // estimate number of reducers 推测reduce个数
(2) if (!ctx.isLocalOnlyExecutionMode() &&
conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) { //HiveConf.ConfVars.HADOOPJT不是local,并且LOCALMODEAUTO("hive.exec.mode.local.auto", true)打开
//hive.exec.mode.local.auto用于小job自动转换为本地运行,should hive determine whether to run in local mode automatically
判断job能否本地运行,目前的判断条件是:(一)输入数据小于等于128M (二)map数小于等于4 (三) reduce数小于等于1,这3个条件都满足,该任务就在本地运行。
}
(3)计算得到 runningViaChild
runningViaChild =
"local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT)) ||
conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
//如果是本地运行或者通过子进程提交作业,runningViaChild为true
(3.1) 如果runningViaChild为false,super.execute(driverContext); ExecDriver.execute完成task。
(3.2) 如果runningViaChild为true,通过子进程完成
executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
子进程的入口main函数是ExecDriver.main()
}
核心之三:ExecDriver
/home/tianzhao/apache/hive-0.6.0/build/hadoopcore/hadoop-0.19.1/bin/hadoop jar /home/tianzhao/apache/hive-0.6.0/build/ql/hive-exec-0.6.0.jar org.apache.hadoop.hive.ql.exec.ExecDriver -plan /tmp/hive-tianzhao/hive_2011-05-31_09-30-02_222_4000721282829102058/plan5577504731701425227.xml -jobconf datanucleus.connectionPoolingType=DBCP
使用hadoop jar hive-exec-0.6.0.jar org.apache.hadoop.hive.ql.exec.ExecDriver提交job给hadoop,作业的信息诸如operator等信息序列化到了 -plan plan5577504731701425227.xml里面。
ExecMapper、ExecReducer在configure(JobConf job)运行的时候会反序列化出来。
hive提交给hadoop的MapReduce作业,map阶段运行ExecMapper,reduce阶段运行ExecReducer。
add jar/add file/add archive,这些archive、jar和文件会写入 Distributed Cache里面。在MapTask和ReduceTask运行的时候读取调用。 写入Distributed Cache参考ExecDriver。
ExecDriver使用JobClient提交Job后,定期查看Job的进展情况,Job完成后,调用operator的jobClose()方法。
hive.exec.plan
org.apache.hadoop.hive.ql.exec.Utilities.setMapRedWork() 会设置plan的ID
ExecDriver.execute(DriverContext driverContext) {
(1) 创建ScratchDir目录
(2) 设置mapper类,reducer类等等
job.setMapperClass(ExecMapper.class);
(3) 如果有MapredLocalWork,并且不是localMode,那么上传文件到HDFS,该文件加入DistributedCache。场景用于auto map join,auto map join会产生两个Task:MapredLocalTask+MapRedTask。 MapredLocalTask将小表的数据从hdfs fetch下来,put到一个HashTable,写入到本地的一个文件中。在MapRedTask中把本地的这个文件写入hdfs,add到DistributedCache,就是当前的部分。这里的小表写入HashTable合并相同的key,只需要在client端做一次,在map端只需要读取使用即可。加入DistributedCache是为了一个TaskTracker多次运行MapTask使用到一个文件时不需要多次下载,只需一次下载即可。
(4)MapredWork写入hdfs,副本数设置为10,加入DistributedCache
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
(5)创建JobClient
JobClient jc = new JobClient(job);
(6)运行PrejobHooks
runPreJobHooks(); // Call the Pre-job hooks' list
(7)提交job
orig_rj = rj = jc.submitJob(job);
(8)定时检测job时候运行完成
private void progress(ExecDriverTaskHandle th) throws IOException {
while (!rj.isComplete()) {
Thread.sleep(pullInterval); // pullInterval默认是1000L,HIVECOUNTERSPULLINTERVAL("hive.exec.counters.pull.interval", 1000L),通过hive.exec.counters.pull.interval可以设置
updateCounters(th); //获取更新进度信息
String report = " " + getId() + " map = " + mapProgress + "%, reduce = " + reduceProgress
+ "%"; // 打印这行进度信息
}
// while循环外,job已经结束
runPostJobHooks(rj); //运行PostJobHooks
}
(9)清理操作
(10)运行Operator的jobClose方法
for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
op.jobClose(job, success, feedBack);
}
work.getReducer().jobClose(job, success, feedBack);
(11)return (returnVal); 返回
}
ExecDriver.main( )
两个地方调用
(1)MapRedTask :
在本地执行或者通过子进程提交两种方式下会调用。
ExecDriver.main(String[] args) {
} else {
MapredWork plan = Utilities.deserializeMapRedWork(pathData, conf);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
ret = ed.execute(new DriverContext());
}
}
(2)MapredLocalTask :
启动子进程执行
ExecDriver.main(String[] args) {
if (localtask) {
memoryMXBean = ManagementFactory.getMemoryMXBean();
MapredLocalWork plan = Utilities.deserializeMapRedLocalWork(pathData, conf);
MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
ret = ed.executeFromChildJVM(new DriverContext()); // 从hdfs上面获取小表数据,写到HashTable中,然后dump到本地的一个文件。
}
}
......
发表评论
-
hive rename table name
2013-09-18 14:28 2601hive rename tablename hive re ... -
hive的distribute by如何partition long型的数据
2013-08-20 10:15 2481有用户问:hive的distribute by分桶是怎么分 ... -
hive like vs rlike vs regexp
2013-04-11 18:53 11216like vs rlike vs regexp r ... -
hive sql where条件很简单,但是太多
2012-07-18 15:51 8744insert overwrite table aaaa ... -
insert into时(string->bigint)自动类型转换
2012-06-14 12:30 8282原表src: hive> desc src; ... -
通过复合结构来优化udf的调用
2012-05-11 14:07 1211select split("accba&quo ... -
RegexSerDe
2012-03-14 09:58 1552官方示例在: https://cwiki.apache.or ... -
Hive 的 OutputCommitter
2012-01-30 19:44 1821Hive 的 OutputCommitter publi ... -
hive LATERAL VIEW 行转列
2011-11-09 14:49 5455drop table lateralview; create ... -
hive complex type
2011-11-08 19:56 1372数据: 1,100|3,20|2,70|5,100 建表: ... -
hive转义字符
2011-10-25 16:41 6245CREATE TABLE escape (id STRING, ... -
hive 两个不同类型的columns进行比较
2011-09-19 13:46 3040select case when "ab1234&q ... -
lateral view
2011-09-18 04:04 0lateral view与udtf相关 -
udf 中获得 FileSystem
2011-09-14 10:28 0在udf中获得FileSystem,需要获得知道fs.defa ... -
hive union mapjoin
2011-09-09 16:29 0union union.q union2.q ... -
hive eclipse
2011-09-08 17:42 0eclipse-templates$ vi .classpat ... -
hive join filter
2011-09-07 23:05 0join16.q.out hive.optimize.ppd ... -
hive limit
2011-09-07 21:02 0limit 关键字: input4_limit.q.out ... -
hive convertMapJoin MapJoinProcessor
2011-09-06 21:17 0join25.q join26 ... -
hive hive.merge.mapfiles hive.merge.mapredfiles
2011-09-06 19:14 0HiveConf: HIVEMERGEMAPFILES ...
相关推荐
4. **ExecDriver.execute()**:在非单独JVM执行的情况下,执行流程进入ExecDriver。这个类负责执行Task,包括设置MapReduce作业的参数、提交作业到Hadoop集群以及等待作业完成。 - **准备工作**:ExecDriver会进行...
Apache Hive 是一个基于 Hadoop 的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的 SQL 查询功能,可以将 SQL 语句转换为 MapReduce 任务进行运行。其优点是学习成本低,可以通过类 SQL 语句...
接着,通过 bin 目录下的 `schematool` 工具初始化元数据,然后就可以使用 `hive` 命令启动 Hive CLI 或者 `beeline` 连接 JDBC 服务执行 SQL 查询了。 Hive 支持多种数据处理操作,如创建表、加载数据、查询、聚合...
1. **设置Hive环境**: 安装Hive、配置Hive-site.xml、初始化元数据存储。 2. **数据加载与导出**: 使用LOAD DATA命令加载数据,使用EXPORT/IMPORT操作迁移数据。 3. **性能优化**: 通过分区、桶、压缩和选择合适的...
2. **初始化日志**:通过`SessionState.initHiveLog4j()`初始化日志相关的类,确保在Hive其他核心类初始化前完成。 3. **创建SessionState**:为当前会话创建一个`SessionState`实例,并传入一个`HiveConf`对象。 4....
2. **减少 Job 数**:尽量合并多个小任务为一个大任务,减少 Job 的初始化时间,提高整体效率。 3. **设置合理的 MapReduce Task 数**:根据集群资源和数据量调整 Map 和 Reduce 的数量,避免过多的小任务导致的...
- **初始化Zookeeper ID**:在三台服务器上创建`myid`文件,并在文件中写入ID(分别是1,2,3)。 - **配置zoo.cfg**:在三台Zookeeper服务器上配置`/etc/zookeeper/conf/zoo.cfg`文件,设置集群的server参数。 - **...
7. **scripts** - 可能包含初始化脚本或其他管理脚本,用于设置和管理Hive环境。 8. **metastore** - 有关元数据存储的信息,如数据库连接和表定义。 9. **webapps** - 如果Hive提供了Web界面,这部分可能包含相关的...
4. 初始化Metastore:运行Hive的schematool工具,初始化元数据存储。 5. 启动Hive:使用hive命令启动Hive CLI,或者通过HiveServer2启动服务供其他客户端连接。 总结,Apache Hive 0.13.0 是一个强大且功能丰富的...
4. 初始化元数据:执行 `schematool -initSchema -dbType derby` 或对应数据库的命令,初始化 Hive 元数据。 四、启动 Hive 1. 启动 Hadoop:确保 HDFS 和 YARN 正在运行。 2. 启动 Hive:使用 `hive` 命令启动 ...
1. **Hadoop的启动成本**:Hadoop的MapReduce框架在启动时存在较高的初始化成本,因此减少作业数量并增大单次任务的数据处理量是提高效率的关键。 2. **分区与排序的重要性**:Hadoop的核心能力在于并行处理大量数据...
接下来,需要对MySQL进行初始化设置。这包括设置root账号密码、配置MySQL服务开机自启动等。设置root账号密码的命令如下: ```bash sudo mysql_secure_installation ``` 此命令会提示您设置root密码,并根据提示...
2. **对jobs数量较多的任务运行效率较低**:当一个任务被分解成多个较小的任务(jobs)时,每一个小任务都需要经过初始化阶段,这会消耗额外的时间,导致整体效率下降。 3. **对`SUM`、`COUNT`等聚合操作较为友好**...
但需要注意,如果Map任务运行时间已经很短,增加Map数量可能会导致初始化开销增加,总体性能反而下降。 接着是Reduce阶段的优化。在Hive中,Reduce任务主要负责聚合和写入数据。与Map不同,Reduce任务的个数可以...
- 使用`bin/hive`脚本启动Hive的交互式Shell,或者通过`schematool`命令初始化元数据。 9. **自定义Hive**: - 如果你对Hive进行了定制,例如添加新的UDF(用户自定义函数)或者改进现有功能,那么你需要重新编译...
- **默认元数据库**:Hive 默认使用 Derby 数据库作为元数据库,可以通过执行 `schematool -initSchema -dbType derby` 来初始化。 - **注意事项**:确保 Hive 的安装目录中有 Derby 数据库的相关文件。 ##### 7. ...
初始化Hive:首次启动Hive时,它会自动创建必要的表和元数据。 在实际应用中,理解Hive的这些基本概念和操作对于有效管理和查询大数据至关重要。此外,还需要掌握HQL(Hive Query Language)的语法,如SELECT、...
2. 创建并初始化MySQL的Hive元数据库,用于存储Hive的表定义、分区信息等。 3. 将Hive添加到系统的PATH环境变量中,以便在任何地方都能运行Hive命令。 4. 根据需要,可能还需要配置Hadoop的`core-site.xml`和`hdfs-...
- **元数据的管理**:包括初始化、备份和恢复等操作。 #### 三、DML操作(数据操纵语言) **3.1 向数据表内加载文件** - **LOAD DATA INPATH**:将文件系统中的文件加载到表中。 - **INSERT INTO TABLE**:将数据...
【Hive安装与部署】 Hive是基于Hadoop的数据仓库工具,可以将结构化的...在实际工作中,Hive常用于数据仓库、报表生成和ETL(提取、转换、加载)流程。掌握Hive的安装、配置和基本操作对于理解和处理大数据至关重要。