- 浏览: 348774 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
lvyuan1234:
你好,你那个sample.txt文件可以分享给我吗
hive insert overwrite into -
107x:
不错,谢谢!
hive 表的一些默认值 -
on_way_:
赞
Hadoop相关书籍 -
bupt04406:
dengkanghua 写道出来这个问题该怎么解决?hbase ...
Unexpected state导致HMaster abort -
dengkanghua:
出来这个问题该怎么解决?hbase master启动不起来。
Unexpected state导致HMaster abort
旧的mapjoin实现,0.6及其以前的版本
新的mapjoin实现,0.7版本,HIVE-1641、HIVE-1754 将小表加载到分布式缓存
automapjoin依赖与新的mapjoin实现
https://issues.apache.org/jira/browse/HIVE-1642
Hiveconf:
hive.auto.convert.join
SemanticAnalyzer.genMapRedTasks(QB qb){
PhysicalContext physicalContext = new PhysicalContext(conf,
getParseContext(), ctx, rootTasks, fetchTask);
PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
physicalContext, conf);
physicalOptimizer.optimize();
}
PhysicalOptimizer:
private void initialize(HiveConf hiveConf) {
resolvers = new ArrayList<PhysicalPlanResolver>();
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
resolvers.add(new SkewJoinResolver());
}
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
resolvers.add(new CommonJoinResolver());
}
resolvers.add(new MapJoinResolver());
}
MapJoinResolver:
遍历所有的task,如果这个task有一个MapredLocalWork,那么创建一个MapredLocalTask运行这
个MapredLocalWork,前后task的依赖关系也相应的建立起来。
public class MapJoinResolver implements PhysicalPlanResolver {
class LocalMapJoinTaskDispatcher implements Dispatcher {
if(localwork != null) {
private void processCurrentTask(Task<? extends Serializable> currTask,
ConditionalTask conditionalTask) throws SemanticException {
// get the context info and set up the shared tmp URI
Context ctx = physicalContext.getContext();
String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId());
localwork.setTmpFileURI(tmpFileURI);
String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId());
mapredWork.setTmpHDFSFileURI(hdfsTmpURI);
}
}
}
HashTableSinkOperator:
public void closeOp(boolean abort) throws HiveException {
// get tmp file URI
String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
// get the tmp URI path; it will be a hdfs path if not local mode
String tmpURIPath = Utilities.generatePath(tmpURI, tag, bigBucketFileName);
}
MapJoinOperator:
private void loadHashTable() throws HiveException {
}
CommonJoinResolver 是auto map join的入口。
CommonJoinResolver.resolve(PhysicalContext pctx){
CommonJoinTaskDispatcher
}
CommonJoinTaskDispatcher.dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs){
}
HiveConf
//small table file size
HIVESMALLTABLESFILESIZE("hive.smalltable.filesize",25000000L), //25M
ConditionalTask
MapJoinOperator
将每个包含 Map Join 的 Stage 替换并拆分成两个 Stage:
第一个 Stage 是一个包含 HashTableSinkOperator 的 MapredLocalTask,将小表生成 Hash Table 文件,并分发到 Distributed Cache。
第二个 Stage 是一个包含 MapJoinOperator 的 MapredTask,将小表(Hash Table)从Distributed Cache中加载进来,并执行实际的 Map Join 操作。
原先的一个MapredTask -> MapredLocalTask+MapredTask
MapredLocalTask :
MapredLocalTask.execute(){
起子进程运行ExecDriver.main() {}
}
子进程运行
ExecDriver.main(){
MapredLocalTask.executeFromChildJVM(){
运行TableScanOperator+HashTableSinkOperator
HashTableSinkOperator读取小表创建HashTable,写入本地的一个文件。
//HashTableSinkOperator的processOp把数据put到HashMapWrapper。
//HashTableSinkOperator的closeOp把HashMapWrappert Dump到本地的一个文件
}
}
子进程MapredLocalTask里:
java.lang.Exception
at org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.processOp(HashTableSinkOperator.java:318)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:55)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.startForward(MapredLocalTask.java:330)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.executeFromChildJVM(MapredLocalTask.java:277)
at org.apache.hadoop.hive.ql.exec.ExecDriver.main(ExecDriver.java:1054)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
子进程MapredLocalTask里:
java.lang.Exception
at org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.closeOp(HashTableSinkOperator.java:438)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:558)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.startForward(MapredLocalTask.java:326)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.executeFromChildJVM(MapredLocalTask.java:277)
at org.apache.hadoop.hive.ql.exec.ExecDriver.main(ExecDriver.java:1054)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
MapredTask:
调用return super.execute(driverContext);
try { //把小表存放在本地的文件中,写入hdfs
// propagate the file to distributed cache
MapredLocalWork localwork = work.getMapLocalWork();
if (localwork != null) {
boolean localMode = HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local");
if (!localMode) {
Path localPath = new Path(localwork.getTmpFileURI());
Path hdfsPath = new Path(work.getTmpHDFSFileURI());
FileSystem hdfs = hdfsPath.getFileSystem(job);
FileSystem localFS = localPath.getFileSystem(job);
FileStatus[] hashtableFiles = localFS.listStatus(localPath);
for(int i =0; i<hashtableFiles.length;i++){
FileStatus file = hashtableFiles[i];
Path path = file.getPath();
String fileName = path.getName();
String hdfsFile = hdfsPath + Path.SEPARATOR + fileName;
LOG.info("Upload 1 HashTable from" +path+" to: "+hdfsFile);
Path hdfsFilePath = new Path(hdfsFile);
hdfs.copyFromLocalFile(path, hdfsFilePath);
short replication = (short) job.getInt("mapred.submit.replication", 10);
hdfs.setReplication(hdfsFilePath, replication);
}
FileStatus[] hashtableRemoteFiles = hdfs.listStatus(hdfsPath);
for(int i =0; i<hashtableRemoteFiles.length;i++){
FileStatus file = hashtableRemoteFiles[i];
Path path = file.getPath();
DistributedCache.addCacheFile(path.toUri(), job);
LOG.info("add 1 hashtable file to distributed cache: " + path.toUri());
}
}
}
提交任务给JobTracker
Map阶段:
MapTask 运行 MapJoinOperator,MapJoinOperator在processOp的时候会loadHashTable
MapTask执行MapJoinOperator时读取小表的栈:
java.io.IOException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys.readExternal(MapJoinDoubleKeys.java:117)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at java.util.HashMap.readObject(HashMap.java:1029)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper.initilizePersistentHash(HashMapWrapper.java:127)
at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTable(MapJoinOperator.java:193)
at org.apache.hadoop.hive.ql.exec.MapJoinOperator.processOp(MapJoinOperator.java:225)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.CommonJoinOperator.genAllOneUniqueJoinObject(CommonJoinOperator.java:732)
at org.apache.hadoop.hive.ql.exec.CommonJoinOperator.checkAndGenObject(CommonJoinOperator.java:819)
at org.apache.hadoop.hive.ql.exec.MapJoinOperator.processOp(MapJoinOperator.java:265)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:55)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:402)
at org.apache.hadoop.hive.ql.exec.ExecMapper.map(ExecMapper.java:141)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
at org.apache.hadoop.mapred.Child.main(Child.java:156)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.RangeCheck(ArrayList.java:547)
at java.util.ArrayList.get(ArrayList.java:322)
at org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys.readExternal(MapJoinDoubleKeys.java:113)
... 50 more
初始化:
2011-09-01 05:59:00 Starting to launch local task to process map join; maximum memory = 372834304
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper.<init>(HashMapWrapper.java:83)
at org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.initializeOp(HashTableSinkOperator.java:264)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.Operator.initializeOp(Operator.java:375)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.initializeOperators(MapredLocalTask.java:429)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.executeFromChildJVM(MapredLocalTask.java:267)
at org.apache.hadoop.hive.ql.exec.ExecDriver.main(ExecDriver.java:1054)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask
新的mapjoin实现,0.7版本,HIVE-1641、HIVE-1754 将小表加载到分布式缓存
automapjoin依赖与新的mapjoin实现
https://issues.apache.org/jira/browse/HIVE-1642
Hiveconf:
hive.auto.convert.join
SemanticAnalyzer.genMapRedTasks(QB qb){
PhysicalContext physicalContext = new PhysicalContext(conf,
getParseContext(), ctx, rootTasks, fetchTask);
PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
physicalContext, conf);
physicalOptimizer.optimize();
}
PhysicalOptimizer:
private void initialize(HiveConf hiveConf) {
resolvers = new ArrayList<PhysicalPlanResolver>();
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
resolvers.add(new SkewJoinResolver());
}
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
resolvers.add(new CommonJoinResolver());
}
resolvers.add(new MapJoinResolver());
}
MapJoinResolver:
遍历所有的task,如果这个task有一个MapredLocalWork,那么创建一个MapredLocalTask运行这
个MapredLocalWork,前后task的依赖关系也相应的建立起来。
public class MapJoinResolver implements PhysicalPlanResolver {
class LocalMapJoinTaskDispatcher implements Dispatcher {
if(localwork != null) {
private void processCurrentTask(Task<? extends Serializable> currTask,
ConditionalTask conditionalTask) throws SemanticException {
// get the context info and set up the shared tmp URI
Context ctx = physicalContext.getContext();
String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId());
localwork.setTmpFileURI(tmpFileURI);
String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId());
mapredWork.setTmpHDFSFileURI(hdfsTmpURI);
}
}
}
HashTableSinkOperator:
public void closeOp(boolean abort) throws HiveException {
// get tmp file URI
String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
// get the tmp URI path; it will be a hdfs path if not local mode
String tmpURIPath = Utilities.generatePath(tmpURI, tag, bigBucketFileName);
}
MapJoinOperator:
private void loadHashTable() throws HiveException {
}
CommonJoinResolver 是auto map join的入口。
CommonJoinResolver.resolve(PhysicalContext pctx){
CommonJoinTaskDispatcher
}
CommonJoinTaskDispatcher.dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs){
}
HiveConf
//small table file size
HIVESMALLTABLESFILESIZE("hive.smalltable.filesize",25000000L), //25M
ConditionalTask
MapJoinOperator
将每个包含 Map Join 的 Stage 替换并拆分成两个 Stage:
第一个 Stage 是一个包含 HashTableSinkOperator 的 MapredLocalTask,将小表生成 Hash Table 文件,并分发到 Distributed Cache。
第二个 Stage 是一个包含 MapJoinOperator 的 MapredTask,将小表(Hash Table)从Distributed Cache中加载进来,并执行实际的 Map Join 操作。
原先的一个MapredTask -> MapredLocalTask+MapredTask
MapredLocalTask :
MapredLocalTask.execute(){
起子进程运行ExecDriver.main() {}
}
子进程运行
ExecDriver.main(){
MapredLocalTask.executeFromChildJVM(){
运行TableScanOperator+HashTableSinkOperator
HashTableSinkOperator读取小表创建HashTable,写入本地的一个文件。
//HashTableSinkOperator的processOp把数据put到HashMapWrapper。
//HashTableSinkOperator的closeOp把HashMapWrappert Dump到本地的一个文件
}
}
子进程MapredLocalTask里:
java.lang.Exception
at org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.processOp(HashTableSinkOperator.java:318)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:55)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.startForward(MapredLocalTask.java:330)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.executeFromChildJVM(MapredLocalTask.java:277)
at org.apache.hadoop.hive.ql.exec.ExecDriver.main(ExecDriver.java:1054)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
子进程MapredLocalTask里:
java.lang.Exception
at org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.closeOp(HashTableSinkOperator.java:438)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:558)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.startForward(MapredLocalTask.java:326)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.executeFromChildJVM(MapredLocalTask.java:277)
at org.apache.hadoop.hive.ql.exec.ExecDriver.main(ExecDriver.java:1054)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
MapredTask:
调用return super.execute(driverContext);
try { //把小表存放在本地的文件中,写入hdfs
// propagate the file to distributed cache
MapredLocalWork localwork = work.getMapLocalWork();
if (localwork != null) {
boolean localMode = HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local");
if (!localMode) {
Path localPath = new Path(localwork.getTmpFileURI());
Path hdfsPath = new Path(work.getTmpHDFSFileURI());
FileSystem hdfs = hdfsPath.getFileSystem(job);
FileSystem localFS = localPath.getFileSystem(job);
FileStatus[] hashtableFiles = localFS.listStatus(localPath);
for(int i =0; i<hashtableFiles.length;i++){
FileStatus file = hashtableFiles[i];
Path path = file.getPath();
String fileName = path.getName();
String hdfsFile = hdfsPath + Path.SEPARATOR + fileName;
LOG.info("Upload 1 HashTable from" +path+" to: "+hdfsFile);
Path hdfsFilePath = new Path(hdfsFile);
hdfs.copyFromLocalFile(path, hdfsFilePath);
short replication = (short) job.getInt("mapred.submit.replication", 10);
hdfs.setReplication(hdfsFilePath, replication);
}
FileStatus[] hashtableRemoteFiles = hdfs.listStatus(hdfsPath);
for(int i =0; i<hashtableRemoteFiles.length;i++){
FileStatus file = hashtableRemoteFiles[i];
Path path = file.getPath();
DistributedCache.addCacheFile(path.toUri(), job);
LOG.info("add 1 hashtable file to distributed cache: " + path.toUri());
}
}
}
提交任务给JobTracker
Map阶段:
MapTask 运行 MapJoinOperator,MapJoinOperator在processOp的时候会loadHashTable
MapTask执行MapJoinOperator时读取小表的栈:
java.io.IOException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys.readExternal(MapJoinDoubleKeys.java:117)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at java.util.HashMap.readObject(HashMap.java:1029)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper.initilizePersistentHash(HashMapWrapper.java:127)
at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTable(MapJoinOperator.java:193)
at org.apache.hadoop.hive.ql.exec.MapJoinOperator.processOp(MapJoinOperator.java:225)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.CommonJoinOperator.genAllOneUniqueJoinObject(CommonJoinOperator.java:732)
at org.apache.hadoop.hive.ql.exec.CommonJoinOperator.checkAndGenObject(CommonJoinOperator.java:819)
at org.apache.hadoop.hive.ql.exec.MapJoinOperator.processOp(MapJoinOperator.java:265)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:87)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:55)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:472)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:715)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:402)
at org.apache.hadoop.hive.ql.exec.ExecMapper.map(ExecMapper.java:141)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
at org.apache.hadoop.mapred.Child.main(Child.java:156)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.RangeCheck(ArrayList.java:547)
at java.util.ArrayList.get(ArrayList.java:322)
at org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys.readExternal(MapJoinDoubleKeys.java:113)
... 50 more
初始化:
2011-09-01 05:59:00 Starting to launch local task to process map join; maximum memory = 372834304
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper.<init>(HashMapWrapper.java:83)
at org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.initializeOp(HashTableSinkOperator.java:264)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:73)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:434)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:390)
at org.apache.hadoop.hive.ql.exec.Operator.initializeOp(Operator.java:375)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:358)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.initializeOperators(MapredLocalTask.java:429)
at org.apache.hadoop.hive.ql.exec.MapredLocalTask.executeFromChildJVM(MapredLocalTask.java:267)
at org.apache.hadoop.hive.ql.exec.ExecDriver.main(ExecDriver.java:1054)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask
发表评论
-
hive rename table name
2013-09-18 14:28 2603hive rename tablename hive re ... -
hive的distribute by如何partition long型的数据
2013-08-20 10:15 2483有用户问:hive的distribute by分桶是怎么分 ... -
hive like vs rlike vs regexp
2013-04-11 18:53 11219like vs rlike vs regexp r ... -
hive sql where条件很简单,但是太多
2012-07-18 15:51 8748insert overwrite table aaaa ... -
insert into时(string->bigint)自动类型转换
2012-06-14 12:30 8287原表src: hive> desc src; ... -
通过复合结构来优化udf的调用
2012-05-11 14:07 1216select split("accba&quo ... -
RegexSerDe
2012-03-14 09:58 1553官方示例在: https://cwiki.apache.or ... -
Hive 的 OutputCommitter
2012-01-30 19:44 1822Hive 的 OutputCommitter publi ... -
hive LATERAL VIEW 行转列
2011-11-09 14:49 5457drop table lateralview; create ... -
hive complex type
2011-11-08 19:56 1374数据: 1,100|3,20|2,70|5,100 建表: ... -
hive转义字符
2011-10-25 16:41 6249CREATE TABLE escape (id STRING, ... -
hive 两个不同类型的columns进行比较
2011-09-19 13:46 3044select case when "ab1234&q ... -
lateral view
2011-09-18 04:04 0lateral view与udtf相关 -
udf 中获得 FileSystem
2011-09-14 10:28 0在udf中获得FileSystem,需要获得知道fs.defa ... -
hive union mapjoin
2011-09-09 16:29 0union union.q union2.q ... -
hive eclipse
2011-09-08 17:42 0eclipse-templates$ vi .classpat ... -
hive join filter
2011-09-07 23:05 0join16.q.out hive.optimize.ppd ... -
hive limit
2011-09-07 21:02 0limit 关键字: input4_limit.q.out ... -
hive convertMapJoin MapJoinProcessor
2011-09-06 21:17 0join25.q join26 ... -
hive hive.merge.mapfiles hive.merge.mapredfiles
2011-09-06 19:14 0HiveConf: HIVEMERGEMAPFILES ...
相关推荐
如果设置为true,则Hive将自动将Reduce端的Common Join转化为Map Join,默认值为false。 5. hive.mapred.local.mem 该参数决定了Mapper/Reducer在本地模式的最大内存量,以字节为单位,0为不限制。如果设置为0,则...
* 使用MapJoin:set hive.auto.convert.join = true; * 设置小表刷入内存中的大小:set hive.mapjoin.smalltable.filesize = 2500000; * 设置最大Reduce个数:set hive.exec.reducers.max=200; * 设置Reduce个数:...
当遇到此运行时异常时,可以尝试禁用向量化执行或者混合Grace Hash Join,设置`hive.vectorized.execution.enabled`为false,以及`hive.mapjoin.hybridgrace.hashtable`和`hive.vectorized.execution.reduce....
对于Map阶段的内存溢出,尤其是由于MapJoin引起的,应设置`hive.auto.convert.join = false`,将MapJoin转换为Reduce端的Common Join。此外,可以调整以下参数以减少内存压力: - `hive.exec.reducers.bytes.per....
- `set hive.mapjoin.smalltable.size=2500000;` 设置小表大小阈值,小于该阈值的表会被加载到内存中用于Map Join。 - `set hive.auto.convert.join.noconditionaltask.size=512000000;` 设置无需条件任务的Map ...
- 其他参数:如hive.exec.compress.output控制是否压缩输出,hive.auto.convert.join决定自动转换内连接的条件。 通过以上内容的学习,你将能熟练掌握Hive在大数据处理中的运用,理解其数仓设计原则,编写高效的...
- **实现**:通过`SET hive.auto.convert.join.noconditionaltask=true`等配置来启用自动转换功能。 ##### 2. 小文件优化 - **问题**:大量小文件的存在不仅会导致Map任务增多,还可能导致数据倾斜等问题。 - **...
left join (select pid, booth_type, booth_area, latest_journal_sd, lates -- 其他字段省略 from some_other_table) b on a.pid = b.pid; ``` #### 总结 通过以上步骤,可以有效地解决Hive与Elasticsearch之间的...
4. **hive.auto.convert.join** - **含义**:控制是否根据输入小表的大小,自动将Reduce端的Common Join转换为Map Join,从而提高大表关联小表的速度。 - **默认值**:`false` - **建议设置**:可根据具体场景...
set hive.auto.convert.join=false;”。 五、内存溢出报错异常 在Hive中,内存溢出报错是另一个常见的异常。当container执行需要的内存超出了物理内存限制时,Hive将抛出一个Container异常,错误信息为“Container...
开启本地模式的设置是`hive.exec.mode.local.auto=true`,并可通过`hive.exec.mode.local.auto.inputbytes.max`和`hive.exec.mode.local.auto.input.files.max`设定阈值,以决定何时使用本地模式。通过合理设置这些...
本文来自于cnblogs,赘述了在工作中总结Hive的常用优化手段和在工作中使用Hive出现的问题。下面开始本篇文章的优化介绍:继续《那些年使用Hive踩过的坑》一文中的剩余部分.首先,我们来看看Hadoop的计算框架特性,在此...
在Hive中,通过在查询语句中添加`/*+MAPJOIN(table)*/`来指定较小的表作为map阶段的输入。对于10G的tableA和100G的tableB,将tableA设置为MapJoin的参数是合理的优化,即A选项正确。 10. HMaster在HBase中负责...
例如,`hive.exec.local.scratchdir` 设定本地临时目录,`hive.auto.convert.join` 控制是否自动转换内连接为MapJoin操作。正确配置这些参数可以显著提高查询效率。 YARN(Yet Another Resource Negotiator)是...
10. **优化查询效率**:在大型关联查询中,通过使用MapJoin(选项A和B)或者分区(选项C)可以优化效率。根据id字段进行分区比根据name字段更有效,因为id通常具有更好的分布特性。选项D不是最佳实践。 11. **...