- 浏览: 348799 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
lvyuan1234:
你好,你那个sample.txt文件可以分享给我吗
hive insert overwrite into -
107x:
不错,谢谢!
hive 表的一些默认值 -
on_way_:
赞
Hadoop相关书籍 -
bupt04406:
dengkanghua 写道出来这个问题该怎么解决?hbase ...
Unexpected state导致HMaster abort -
dengkanghua:
出来这个问题该怎么解决?hbase master启动不起来。
Unexpected state导致HMaster abort
(1)startGroup:清空各个表的RowContainer
(2)processOp:根据tag,把row add到表对应的RowContainer中。
(3)endGroup:RowContainer中的数据进行join并输出。
RowContainer添加数据时:内存中的数据条数是否达到了blockSize(默认是25000)个,如果有,则写入一个临时文件,如果没有放入内存的数组(currentWriteBlock)中。所以数据可能文件中有,内存中也有,也可能是只有内存中有。
读取时:
(1)first
如果文件中有,则读取文件中的数据,返回第一条,如果没有就读取内存中的值,返回第一条。
(2)next
如果文件中有,则读取文件中的数据,没有就读取内存中的数据。
public class JoinOperator extends CommonJoinOperator<JoinDesc> implements
Serializable {
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf); // CommonJoinOperator初始化
}
}
JoinOperator:
startGroup
@Override
public void startGroup() throws HiveException {
LOG.trace("Join: Starting new group");
newGroupStarted = true; //一个新的key
for (AbstractRowContainer<ArrayList<Object>> alw : storage.values()) {
alw.clear(); // RowContainer清空。
}
}
endGroup
/**
* Forward a record of join results.
*
* @throws HiveException
*/
@Override
public void endGroup() throws HiveException {
// if this is a skew key, we need to handle it in a separate map reduce job.
if (handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0) {
try {
skewJoinKeyContext.endGroup();
} catch (IOException e) {
LOG.error(e.getMessage(), e);
throw new HiveException(e);
}
return;
} else {
checkAndGenObject();
}
}
JoinOperator :
public void processOp(Object row, int tag) throws HiveException {
try {
// get alias
alias = (byte) tag; //从那个表来的数据。
if ((lastAlias == null) || (!lastAlias.equals(alias))) {
nextSz = joinEmitInterval; //默认为1000
}
ArrayList<Object> nr = JoinUtil.computeValues(row, joinValues.get(alias),
joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
joinFilterObjectInspectors.get(alias), noOuterJoin); // nr计算出来的value.
if (handleSkewJoin) {
skewJoinKeyContext.handleSkew(tag);
}
// number of rows for the key in the given table
int sz = storage.get(alias).size(); //表中当前有多少条记录
StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
.toString());
Object keyObject = soi.getStructFieldData(row, sf);
// Are we consuming too much memory
if (alias == numAliases - 1 && !(handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0)) {
if (sz == joinEmitInterval) {
// The input is sorted by alias, so if we are already in the last join
// operand,
// we can emit some results now.
// Note this has to be done before adding the current row to the
// storage,
// to preserve the correctness for outer joins.
checkAndGenObject(); //先输出部分结果
storage.get(alias).clear(); //清空本table对应的RowContainer中的数据
}
} else {
if (sz == nextSz) {
// Output a warning if we reached at least 1000 rows for a join
// operand
// We won't output a warning for the last join operand since the size
// will never goes to joinEmitInterval.
LOG.warn("table " + alias + " has " + sz + " rows for join key "
+ keyObject);
nextSz = getNextSize(nextSz);
}
}
// Add the value to the vector
storage.get(alias).add(nr); //添加到RowContainer中。
// if join-key is null, process each row in different group.
if (SerDeUtils.hasAnyNullObject(keyObject, sf.getFieldObjectInspector())) {
endGroup();
startGroup();
}
} catch (Exception e) {
e.printStackTrace();
throw new HiveException(e);
}
}
package org.apache.hadoop.hive.ql.exec.persistence:
public class RowContainer<Row extends List<Object>> extends AbstractRowContainer<Row>{
add
写入磁盘的row数目一定是blockSize的整数倍,否则不会写入文件,而是存在currentWriteBlock中。
@Override
public void add(Row t) throws HiveException {
if (this.tblDesc != null) {
if (addCursor >= blockSize) { // spill the current block to tmp file addCursor大于等于blockSize,blockSize默认是25000,数组中已经满了,写入磁盘的文件中
spillBlock(currentWriteBlock, addCursor);
addCursor = 0;
if (numFlushedBlocks == 1) {
currentWriteBlock = (Row[]) new ArrayList[blockSize];
}
}
currentWriteBlock[addCursor++] = t; //写到currentWriteBlock里,addCursor++
} else if (t != null) {
// the tableDesc will be null in the case that all columns in that table
// is not used. we use a dummy row to denote all rows in that table, and
// the dummy row is added by caller.
this.dummyRow = t;
}
++size; //个数
}
}
spillBlock
private void spillBlock(Row[] block, int length) throws HiveException {
try {
if (tmpFile == null) { //临时文件
String suffix = ".tmp";
if (this.keyObject != null) {
suffix = "." + this.keyObject.toString() + suffix;
}
while (true) { //创建临时目录
parentFile = File.createTempFile("hive-rowcontainer", "");
boolean success = parentFile.delete() && parentFile.mkdir();
if (success) {
break;
}
LOG.debug("retry creating tmp row-container directory...");
}
tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
// Delete the temp file if the JVM terminate normally through Hadoop job
// kill command.
// Caveat: it won't be deleted if JVM is killed by 'kill -9'.
parentFile.deleteOnExit();
tmpFile.deleteOnExit();
// rFile = new RandomAccessFile(tmpFile, "rw");
HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
tempOutPath = new Path(tmpFile.toString());
JobConf localJc = getLocalFSJobConfClone(jc);
rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs, hiveOutputFormat, serde
.getSerializedClass(), false, tblDesc.getProperties(), tempOutPath);
} else if (rw == null) {
throw new HiveException("RowContainer has already been closed for writing.");
}
row.clear();
row.add(null);
row.add(null);
if (this.keyObject != null) {
row.set(1, this.keyObject);
for (int i = 0; i < length; ++i) {
Row currentValRow = block[i];
row.set(0, currentValRow);
Writable outVal = serde.serialize(row, standardOI);
rw.write(outVal);
}
} else {
for (int i = 0; i < length; ++i) { //把数组中的row写入文件中。
Row currentValRow = block[i];
Writable outVal = serde.serialize(currentValRow, standardOI);
rw.write(outVal); //写入
}
}
if (block == this.currentWriteBlock) { //如果相等,currentWriteBlock已经写入文件
this.addCursor = 0; //置位。
}
this.numFlushedBlocks++;
} catch (Exception e) {
clear();
LOG.error(e.toString(), e);
throw new HiveException(e);
}
}
使用:
* for ( v = rowContainer.first(); v != null; v = rowContainer.next()) {
* // do anything with v
* }
*
first
public Row first() throws HiveException {
if (size == 0) { //每次add时,size会加1
return null;
}
try {
firstCalled = true; //
// when we reach here, we must have some data already (because size >0).
// We need to see if there are any data flushed into file system. If not,
// we can
// directly read from the current write block. Otherwise, we need to read
// from the beginning of the underlying file.
this.itrCursor = 0;
closeWriter(); //关闭RecordWriter
closeReader(); //关闭RecordReader
if (tblDesc == null) {
this.itrCursor++;
return dummyRow;
}
this.currentReadBlock = this.firstReadBlockPointer;
if (this.numFlushedBlocks == 0) { //如果
this.readBlockSize = this.addCursor; //需要读取的长度
this.currentReadBlock = this.currentWriteBlock; // 赋值
} else {
JobConf localJc = getLocalFSJobConfClone(jc);
if (inputSplits == null) {
if (this.inputFormat == null) {
inputFormat = (InputFormat<WritableComparable, Writable>) ReflectionUtils.newInstance(
tblDesc.getInputFileFormatClass(), localJc);
}
HiveConf.setVar(localJc, HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
org.apache.hadoop.util.StringUtils.escapeString(parentFile.getAbsolutePath()));
inputSplits = inputFormat.getSplits(localJc, 1);
acutalSplitNum = inputSplits.length;
}
currentSplitPointer = 0;
rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], localJc, Reporter.NULL);
currentSplitPointer++;
nextBlock();
}
// we are guaranteed that we can get data here (since 'size' is not zero)
Row ret = currentReadBlock[itrCursor++]; //返回地一行数据
removeKeys(ret);
return ret;
} catch (Exception e) {
throw new HiveException(e);
}
}
next
public Row next() throws HiveException {
if (!firstCalled) { //如果first还没有被调用
throw new RuntimeException("Call first() then call next().");
}
if (size == 0) {
return null;
}
if (tblDesc == null) {
if (this.itrCursor < size) {
this.itrCursor++;
return dummyRow;
}
return null;
}
Row ret;
if (itrCursor < this.readBlockSize) { //currentReadBlock中的数据还没有读取完
ret = this.currentReadBlock[itrCursor++];
removeKeys(ret);
return ret;
} else { //currentReadBlock中的数据读取完了。
nextBlock(); //读取写一个block
if (this.readBlockSize == 0) { //文件中的数据已经读取完了
if (currentWriteBlock != null && currentReadBlock != currentWriteBlock) { //currentWriteBlock中的数据没有读取过
this.itrCursor = 0;
this.readBlockSize = this.addCursor;
this.firstReadBlockPointer = this.currentReadBlock;
currentReadBlock = currentWriteBlock;
} else { // currentWriteBlock中的数据已经读取过了
return null;
}
}
return next();
}
}
nextBlock
private boolean nextBlock() throws HiveException {
itrCursor = 0; //复位
this.readBlockSize = 0; //复位
if (this.numFlushedBlocks == 0) { //没有数据写入文件,返回false
return false;
}
try {
if (val == null) {
val = serde.getSerializedClass().newInstance();
}
boolean nextSplit = true;
int i = 0;
if (rr != null) {
Object key = rr.createKey();
while (i < this.currentReadBlock.length && rr.next(key, val)) { //读取出来放入currentReadBlock中
nextSplit = false;
this.currentReadBlock[i++] = (Row) ObjectInspectorUtils.copyToStandardObject(serde
.deserialize(val), serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
}
}
if (nextSplit && this.currentSplitPointer < this.acutalSplitNum) {
JobConf localJc = getLocalFSJobConfClone(jc);
// open record reader to read next split
rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
Reporter.NULL);
currentSplitPointer++;
return nextBlock();
}
this.readBlockSize = i; //读取的长度
return this.readBlockSize > 0;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
try {
this.clear();
} catch (HiveException e1) {
LOG.error(e.getMessage(), e);
}
throw new HiveException(e);
}
}
(2)processOp:根据tag,把row add到表对应的RowContainer中。
(3)endGroup:RowContainer中的数据进行join并输出。
RowContainer添加数据时:内存中的数据条数是否达到了blockSize(默认是25000)个,如果有,则写入一个临时文件,如果没有放入内存的数组(currentWriteBlock)中。所以数据可能文件中有,内存中也有,也可能是只有内存中有。
读取时:
(1)first
如果文件中有,则读取文件中的数据,返回第一条,如果没有就读取内存中的值,返回第一条。
(2)next
如果文件中有,则读取文件中的数据,没有就读取内存中的数据。
public class JoinOperator extends CommonJoinOperator<JoinDesc> implements
Serializable {
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf); // CommonJoinOperator初始化
}
}
JoinOperator:
startGroup
@Override
public void startGroup() throws HiveException {
LOG.trace("Join: Starting new group");
newGroupStarted = true; //一个新的key
for (AbstractRowContainer<ArrayList<Object>> alw : storage.values()) {
alw.clear(); // RowContainer清空。
}
}
endGroup
/**
* Forward a record of join results.
*
* @throws HiveException
*/
@Override
public void endGroup() throws HiveException {
// if this is a skew key, we need to handle it in a separate map reduce job.
if (handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0) {
try {
skewJoinKeyContext.endGroup();
} catch (IOException e) {
LOG.error(e.getMessage(), e);
throw new HiveException(e);
}
return;
} else {
checkAndGenObject();
}
}
JoinOperator :
public void processOp(Object row, int tag) throws HiveException {
try {
// get alias
alias = (byte) tag; //从那个表来的数据。
if ((lastAlias == null) || (!lastAlias.equals(alias))) {
nextSz = joinEmitInterval; //默认为1000
}
ArrayList<Object> nr = JoinUtil.computeValues(row, joinValues.get(alias),
joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
joinFilterObjectInspectors.get(alias), noOuterJoin); // nr计算出来的value.
if (handleSkewJoin) {
skewJoinKeyContext.handleSkew(tag);
}
// number of rows for the key in the given table
int sz = storage.get(alias).size(); //表中当前有多少条记录
StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
.toString());
Object keyObject = soi.getStructFieldData(row, sf);
// Are we consuming too much memory
if (alias == numAliases - 1 && !(handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0)) {
if (sz == joinEmitInterval) {
// The input is sorted by alias, so if we are already in the last join
// operand,
// we can emit some results now.
// Note this has to be done before adding the current row to the
// storage,
// to preserve the correctness for outer joins.
checkAndGenObject(); //先输出部分结果
storage.get(alias).clear(); //清空本table对应的RowContainer中的数据
}
} else {
if (sz == nextSz) {
// Output a warning if we reached at least 1000 rows for a join
// operand
// We won't output a warning for the last join operand since the size
// will never goes to joinEmitInterval.
LOG.warn("table " + alias + " has " + sz + " rows for join key "
+ keyObject);
nextSz = getNextSize(nextSz);
}
}
// Add the value to the vector
storage.get(alias).add(nr); //添加到RowContainer中。
// if join-key is null, process each row in different group.
if (SerDeUtils.hasAnyNullObject(keyObject, sf.getFieldObjectInspector())) {
endGroup();
startGroup();
}
} catch (Exception e) {
e.printStackTrace();
throw new HiveException(e);
}
}
package org.apache.hadoop.hive.ql.exec.persistence:
public class RowContainer<Row extends List<Object>> extends AbstractRowContainer<Row>{
add
写入磁盘的row数目一定是blockSize的整数倍,否则不会写入文件,而是存在currentWriteBlock中。
@Override
public void add(Row t) throws HiveException {
if (this.tblDesc != null) {
if (addCursor >= blockSize) { // spill the current block to tmp file addCursor大于等于blockSize,blockSize默认是25000,数组中已经满了,写入磁盘的文件中
spillBlock(currentWriteBlock, addCursor);
addCursor = 0;
if (numFlushedBlocks == 1) {
currentWriteBlock = (Row[]) new ArrayList[blockSize];
}
}
currentWriteBlock[addCursor++] = t; //写到currentWriteBlock里,addCursor++
} else if (t != null) {
// the tableDesc will be null in the case that all columns in that table
// is not used. we use a dummy row to denote all rows in that table, and
// the dummy row is added by caller.
this.dummyRow = t;
}
++size; //个数
}
}
spillBlock
private void spillBlock(Row[] block, int length) throws HiveException {
try {
if (tmpFile == null) { //临时文件
String suffix = ".tmp";
if (this.keyObject != null) {
suffix = "." + this.keyObject.toString() + suffix;
}
while (true) { //创建临时目录
parentFile = File.createTempFile("hive-rowcontainer", "");
boolean success = parentFile.delete() && parentFile.mkdir();
if (success) {
break;
}
LOG.debug("retry creating tmp row-container directory...");
}
tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
// Delete the temp file if the JVM terminate normally through Hadoop job
// kill command.
// Caveat: it won't be deleted if JVM is killed by 'kill -9'.
parentFile.deleteOnExit();
tmpFile.deleteOnExit();
// rFile = new RandomAccessFile(tmpFile, "rw");
HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
tempOutPath = new Path(tmpFile.toString());
JobConf localJc = getLocalFSJobConfClone(jc);
rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs, hiveOutputFormat, serde
.getSerializedClass(), false, tblDesc.getProperties(), tempOutPath);
} else if (rw == null) {
throw new HiveException("RowContainer has already been closed for writing.");
}
row.clear();
row.add(null);
row.add(null);
if (this.keyObject != null) {
row.set(1, this.keyObject);
for (int i = 0; i < length; ++i) {
Row currentValRow = block[i];
row.set(0, currentValRow);
Writable outVal = serde.serialize(row, standardOI);
rw.write(outVal);
}
} else {
for (int i = 0; i < length; ++i) { //把数组中的row写入文件中。
Row currentValRow = block[i];
Writable outVal = serde.serialize(currentValRow, standardOI);
rw.write(outVal); //写入
}
}
if (block == this.currentWriteBlock) { //如果相等,currentWriteBlock已经写入文件
this.addCursor = 0; //置位。
}
this.numFlushedBlocks++;
} catch (Exception e) {
clear();
LOG.error(e.toString(), e);
throw new HiveException(e);
}
}
使用:
* for ( v = rowContainer.first(); v != null; v = rowContainer.next()) {
* // do anything with v
* }
*
first
public Row first() throws HiveException {
if (size == 0) { //每次add时,size会加1
return null;
}
try {
firstCalled = true; //
// when we reach here, we must have some data already (because size >0).
// We need to see if there are any data flushed into file system. If not,
// we can
// directly read from the current write block. Otherwise, we need to read
// from the beginning of the underlying file.
this.itrCursor = 0;
closeWriter(); //关闭RecordWriter
closeReader(); //关闭RecordReader
if (tblDesc == null) {
this.itrCursor++;
return dummyRow;
}
this.currentReadBlock = this.firstReadBlockPointer;
if (this.numFlushedBlocks == 0) { //如果
this.readBlockSize = this.addCursor; //需要读取的长度
this.currentReadBlock = this.currentWriteBlock; // 赋值
} else {
JobConf localJc = getLocalFSJobConfClone(jc);
if (inputSplits == null) {
if (this.inputFormat == null) {
inputFormat = (InputFormat<WritableComparable, Writable>) ReflectionUtils.newInstance(
tblDesc.getInputFileFormatClass(), localJc);
}
HiveConf.setVar(localJc, HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
org.apache.hadoop.util.StringUtils.escapeString(parentFile.getAbsolutePath()));
inputSplits = inputFormat.getSplits(localJc, 1);
acutalSplitNum = inputSplits.length;
}
currentSplitPointer = 0;
rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], localJc, Reporter.NULL);
currentSplitPointer++;
nextBlock();
}
// we are guaranteed that we can get data here (since 'size' is not zero)
Row ret = currentReadBlock[itrCursor++]; //返回地一行数据
removeKeys(ret);
return ret;
} catch (Exception e) {
throw new HiveException(e);
}
}
next
public Row next() throws HiveException {
if (!firstCalled) { //如果first还没有被调用
throw new RuntimeException("Call first() then call next().");
}
if (size == 0) {
return null;
}
if (tblDesc == null) {
if (this.itrCursor < size) {
this.itrCursor++;
return dummyRow;
}
return null;
}
Row ret;
if (itrCursor < this.readBlockSize) { //currentReadBlock中的数据还没有读取完
ret = this.currentReadBlock[itrCursor++];
removeKeys(ret);
return ret;
} else { //currentReadBlock中的数据读取完了。
nextBlock(); //读取写一个block
if (this.readBlockSize == 0) { //文件中的数据已经读取完了
if (currentWriteBlock != null && currentReadBlock != currentWriteBlock) { //currentWriteBlock中的数据没有读取过
this.itrCursor = 0;
this.readBlockSize = this.addCursor;
this.firstReadBlockPointer = this.currentReadBlock;
currentReadBlock = currentWriteBlock;
} else { // currentWriteBlock中的数据已经读取过了
return null;
}
}
return next();
}
}
nextBlock
private boolean nextBlock() throws HiveException {
itrCursor = 0; //复位
this.readBlockSize = 0; //复位
if (this.numFlushedBlocks == 0) { //没有数据写入文件,返回false
return false;
}
try {
if (val == null) {
val = serde.getSerializedClass().newInstance();
}
boolean nextSplit = true;
int i = 0;
if (rr != null) {
Object key = rr.createKey();
while (i < this.currentReadBlock.length && rr.next(key, val)) { //读取出来放入currentReadBlock中
nextSplit = false;
this.currentReadBlock[i++] = (Row) ObjectInspectorUtils.copyToStandardObject(serde
.deserialize(val), serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
}
}
if (nextSplit && this.currentSplitPointer < this.acutalSplitNum) {
JobConf localJc = getLocalFSJobConfClone(jc);
// open record reader to read next split
rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
Reporter.NULL);
currentSplitPointer++;
return nextBlock();
}
this.readBlockSize = i; //读取的长度
return this.readBlockSize > 0;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
try {
this.clear();
} catch (HiveException e1) {
LOG.error(e.getMessage(), e);
}
throw new HiveException(e);
}
}
发表评论
-
hive rename table name
2013-09-18 14:28 2604hive rename tablename hive re ... -
hive的distribute by如何partition long型的数据
2013-08-20 10:15 2484有用户问:hive的distribute by分桶是怎么分 ... -
hive like vs rlike vs regexp
2013-04-11 18:53 11219like vs rlike vs regexp r ... -
hive sql where条件很简单,但是太多
2012-07-18 15:51 8749insert overwrite table aaaa ... -
insert into时(string->bigint)自动类型转换
2012-06-14 12:30 8287原表src: hive> desc src; ... -
通过复合结构来优化udf的调用
2012-05-11 14:07 1216select split("accba&quo ... -
RegexSerDe
2012-03-14 09:58 1553官方示例在: https://cwiki.apache.or ... -
Hive 的 OutputCommitter
2012-01-30 19:44 1822Hive 的 OutputCommitter publi ... -
hive LATERAL VIEW 行转列
2011-11-09 14:49 5457drop table lateralview; create ... -
hive complex type
2011-11-08 19:56 1374数据: 1,100|3,20|2,70|5,100 建表: ... -
hive转义字符
2011-10-25 16:41 6250CREATE TABLE escape (id STRING, ... -
hive 两个不同类型的columns进行比较
2011-09-19 13:46 3046select case when "ab1234&q ... -
lateral view
2011-09-18 04:04 0lateral view与udtf相关 -
udf 中获得 FileSystem
2011-09-14 10:28 0在udf中获得FileSystem,需要获得知道fs.defa ... -
hive union mapjoin
2011-09-09 16:29 0union union.q union2.q ... -
hive eclipse
2011-09-08 17:42 0eclipse-templates$ vi .classpat ... -
hive join filter
2011-09-07 23:05 0join16.q.out hive.optimize.ppd ... -
hive limit
2011-09-07 21:02 0limit 关键字: input4_limit.q.out ... -
hive convertMapJoin MapJoinProcessor
2011-09-06 21:17 0join25.q join26 ... -
hive hive.merge.mapfiles hive.merge.mapredfiles
2011-09-06 19:14 0HiveConf: HIVEMERGEMAPFILES ...
相关推荐
- **操作符**:Hive执行的基本单位,每个操作符对应一个特定的功能,例如TableScanOperator负责从表中读取数据,JoinOperator负责连接两个数据集。 - **执行计划**:操作符按照一定的顺序组合而成的计划,描述了Hive...
- **Join的实现原理**:在Hive中,Join操作通常通过MapReduce来实现。例如,在一个内连接中,Map阶段将不同表的数据分别标记,然后在Reduce阶段,依据标记判断并合并来自不同表的记录。这种方法确保了JOIN操作的...
- **JoinOperator**:用于连接两张或多张表的数据。 - **SelectOperator**:用于选择输出列。 - **FileSinkOperator**:用于建立结果数据并输出到文件。 - **FilterOperator**:用于过滤输入数据。 - **...
在深入理解Hive如何将SQL语句转换为MapReduce任务之前,我们需要首先了解MapReduce是如何实现基本的SQL操作,比如Join、Group By以及Distinct等。 ##### 1. Join的实现原理 在Hive中,实现两个表的Join操作通常...
5. **连接运算符(Join Operator)**:将多个表的数据合并。 6. **排序运算符(Sort Operator)**:对数据进行排序。 7. **分桶运算符(Bucket Operator)**:根据指定列进行数据分区,用于提高查询效率。 这些算子...
例如,`TableScanOperator` 用于读取表数据,`FilterOperator` 过滤数据,`JoinOperator` 实现数据联接,`GroupByOperator` 进行分组聚合,`ReduceSinkOperator` 准备数据传输到 Reducer,`SelectOperator` 选择输出...
#### 四、Hive大表Join小表的优化方法 1. **小表前置**: - **方法**:将小表放在Join操作的前面。 - **效果**:Hive能够将小表缓存到内存中,从而提高Join操作的效率。 #### 五、Hive中各种Join类型 1. **内...
- **Operator Log**:展示了每个操作符的执行顺序及其相关信息。 - **Plan Tree**:以树状结构展示整个查询计划,便于理解各个部分之间的关系。 ##### 2. explain 的使用场景 - **案例一:Join 语句会过滤 null 的...
3. **执行引擎**:深入`exec`模块,研究`Task`和`Operator`类,理解Hive如何将物理计划转化为MapReduce任务。 4. **SerDe实现**:查看不同类型的SerDe实现,如`LazySimpleSerDe`,理解数据如何在Hive和HDFS之间转换...
- Hive 支持多种 Join 类型,包括 Inner Join、Left Outer Join、Right Outer Join、Full Outer Join 等。 - 每种 Join 类型都有不同的应用场景和性能特征。 - **Hive 的炸裂函数:** - Hive 的炸裂函数主要用于...