bupt04406

hive JoinOperator

    Blog category: Hive
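(1) startGroup: clears every table's RowContainer.
(2) processOp: uses the tag to add the row to the RowContainer of the table it came from.
(3) endGroup: joins the rows held in the RowContainers and forwards the results.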
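The three-step lifecycle can be pictured with a toy model. This is a minimal sketch under simplifying assumptions, not Hive code: `ToyJoinOperator` is a hypothetical class, rows are plain strings, there are exactly two tables (tags 0 and 1), plain `ArrayList`s stand in for RowContainers, and only an inner join is modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of JoinOperator's per-key lifecycle for a two-table
// inner join. Names and types are illustrative, not Hive's.
class ToyJoinOperator {
    // storage.get(tag) buffers the rows of one table for the current join key
    private final List<List<String>> storage = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    ToyJoinOperator() {
        storage.add(new ArrayList<>()); // tag 0: left table
        storage.add(new ArrayList<>()); // tag 1: right table
    }

    // (1) startGroup: a new key begins, so clear every table's buffer
    void startGroup() {
        for (List<String> rows : storage) {
            rows.clear();
        }
    }

    // (2) processOp: the tag says which table the row came from
    void processOp(String row, int tag) {
        storage.get(tag).add(row);
    }

    // (3) endGroup: join the buffered rows (every left row with every right row)
    void endGroup() {
        for (String left : storage.get(0)) {
            for (String right : storage.get(1)) {
                output.add(left + "," + right);
            }
        }
    }

    public static void main(String[] args) {
        ToyJoinOperator op = new ToyJoinOperator();
        op.startGroup();          // key k1 begins
        op.processOp("a1", 0);
        op.processOp("a2", 0);
        op.processOp("b1", 1);
        op.endGroup();
        System.out.println(op.output); // [a1,b1, a2,b1]
    }
}
```

A key that has rows in only one of the two tables produces no output here, which is the inner-join behaviour.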

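When a row is added to a RowContainer, it first checks whether the number of rows held in memory has already reached blockSize (25000 by default). If it has, the full in-memory block is written to a temporary file before the new row is stored. Either way the new row goes into the in-memory array (currentWriteBlock). As a result, a container's rows may be split between the file and memory, or they may all be in memory.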
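Here is a small worked example of the spill rule. It assumes rows only ever enter through add() and that nothing else triggers a spill. `SpillMath` and its methods are hypothetical helpers for illustration. The check happens before the new row is stored, so the k-th spill happens on the (k·blockSize + 1)-th row.

```java
// Hypothetical helper mirroring the rule above: add() spills the in-memory
// block only when it is already full and another row arrives.
class SpillMath {
    // rows that end up in the temp file after n calls to add()
    static long spilledRows(long n, int blockSize) {
        if (n <= 0) {
            return 0;
        }
        // one full block is spilled on add number blockSize+1, 2*blockSize+1, ...
        return ((n - 1) / blockSize) * blockSize;
    }

    // rows still held in currentWriteBlock after n calls to add()
    static long inMemoryRows(long n, int blockSize) {
        return n - spilledRows(n, blockSize);
    }

    public static void main(String[] args) {
        int blockSize = 25000; // default block size mentioned above
        for (long n : new long[] {10, 25000, 25001, 60000}) {
            System.out.println(n + " rows -> file: " + spilledRows(n, blockSize)
                    + ", memory: " + inMemoryRows(n, blockSize));
        }
    }
}
```

With blockSize = 25000 the results are as follows:
- 10 rows stay in memory.
- Exactly 25000 rows also stay entirely in memory, because the block is full but not yet spilled.
- 25001 rows leave 25000 in the file and 1 in memory.
- 60000 rows leave 50000 in the file and 10000 in memory.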

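Reading:
(1) first
If any rows have been spilled to the file, read from the file and return its first row. Otherwise read from memory and return the first in-memory row.
(2) next
While unread rows remain in the file, keep reading from the file. Once the file is exhausted, read the rows held in memory.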
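The read order can be sketched with a hypothetical `TwoPhaseCursor`. In this sketch, one list stands in for the rows spilled to the temp file and another stands in for currentWriteBlock. It keeps RowContainer's first()/next() calling convention, including the error when next() is called before first(). It does not model block-by-block reading from the file.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of RowContainer's read order: spilled rows first, then
// the rows still in memory. The "file" is simulated with a List.
class TwoPhaseCursor<T> {
    private final List<T> fileRows;   // stands in for the temp file
    private final List<T> memoryRows; // stands in for currentWriteBlock
    private int pos;
    private boolean firstCalled;

    TwoPhaseCursor(List<T> fileRows, List<T> memoryRows) {
        this.fileRows = fileRows;
        this.memoryRows = memoryRows;
    }

    // first(): restart from the beginning; null if the container is empty
    T first() {
        firstCalled = true;
        pos = 0;
        return next();
    }

    // next(): file rows while any remain, then in-memory rows, then null
    T next() {
        if (!firstCalled) {
            throw new IllegalStateException("Call first() then call next().");
        }
        int total = fileRows.size() + memoryRows.size();
        if (pos >= total) {
            return null;
        }
        int i = pos++;
        return i < fileRows.size() ? fileRows.get(i) : memoryRows.get(i - fileRows.size());
    }

    public static void main(String[] args) {
        TwoPhaseCursor<String> c = new TwoPhaseCursor<>(
                Arrays.asList("f1", "f2"), Arrays.asList("m1"));
        for (String v = c.first(); v != null; v = c.next()) {
            System.out.println(v); // f1, f2, then m1
        }
    }
}
```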

public class JoinOperator extends CommonJoinOperator<JoinDesc> implements
    Serializable {

    protected void initializeOp(Configuration hconf) throws HiveException {
        super.initializeOp(hconf); // CommonJoinOperator initialization
    }

}

JoinOperator:
startGroup
  @Override
  public void startGroup() throws HiveException {
    LOG.trace("Join: Starting new group");
    newGroupStarted = true; // a new key begins
    for (AbstractRowContainer<ArrayList<Object>> alw : storage.values()) {
      alw.clear(); // clear each table's RowContainer
    }
  }
endGroup
  /**
   * Forward a record of join results.
   *
   * @throws HiveException
   */
  @Override
  public void endGroup() throws HiveException {
    // if this is a skew key, we need to handle it in a separate map reduce job.
    if (handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0) {
      try {
        skewJoinKeyContext.endGroup();
      } catch (IOException e) {
        LOG.error(e.getMessage(), e);
        throw new HiveException(e);
      }
      return;
    } else {
      checkAndGenObject();
    }
  }
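checkAndGenObject() itself is not shown here. For the plain inner-join case, its effect for one key amounts to the cartesian product of the rows buffered for each table. The following simplified recursive sketch works under that assumption. `InnerJoinProduct` is hypothetical, and outer joins, join filters and the dummy-row case are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: inner-join output for one key as the cartesian product
// of every table's buffered rows. Outer joins and join filters are ignored.
class InnerJoinProduct {
    static List<List<String>> join(List<List<String>> storage) {
        List<List<String>> out = new ArrayList<>();
        // inner join: if any table has no row for this key, nothing is emitted
        for (List<String> rows : storage) {
            if (rows.isEmpty()) {
                return out;
            }
        }
        gen(storage, 0, new ArrayList<>(), out);
        return out;
    }

    // pick one row from table `alias`, then recurse into the next table
    private static void gen(List<List<String>> storage, int alias,
                            List<String> current, List<List<String>> out) {
        if (alias == storage.size()) {
            out.add(new ArrayList<>(current)); // one complete joined row
            return;
        }
        for (String row : storage.get(alias)) {
            current.add(row);
            gen(storage, alias + 1, current, out);
            current.remove(current.size() - 1); // backtrack
        }
    }
}
```

For three tables holding 2, 1 and 2 rows under the same key, this yields 2 × 1 × 2 = 4 joined rows.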




JoinOperator:
processOp
  public void processOp(Object row, int tag) throws HiveException {
    try {

      // get alias
      alias = (byte) tag; // which table the row comes from

      if ((lastAlias == null) || (!lastAlias.equals(alias))) {
        nextSz = joinEmitInterval; // 1000 by default
      }

      ArrayList<Object> nr = JoinUtil.computeValues(row, joinValues.get(alias),
          joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
          joinFilterObjectInspectors.get(alias), noOuterJoin); // nr: the computed value columns

      if (handleSkewJoin) {
        skewJoinKeyContext.handleSkew(tag);
      }

      // number of rows for the key in the given table
      int sz = storage.get(alias).size(); // rows currently buffered for this table
      StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
      StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
          .toString());
      Object keyObject = soi.getStructFieldData(row, sf);

      // Are we consuming too much memory
      if (alias == numAliases - 1 && !(handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0)) {
        if (sz == joinEmitInterval) {
          // The input is sorted by alias, so if we are already in the last join
          // operand,
          // we can emit some results now.
          // Note this has to be done before adding the current row to the
          // storage,
          // to preserve the correctness for outer joins.
          checkAndGenObject(); // emit partial results first
          storage.get(alias).clear(); // then clear this table's RowContainer
        }
      } else {
        if (sz == nextSz) {
          // Output a warning if we reached at least 1000 rows for a join
          // operand
          // We won't output a warning for the last join operand since the size
          // will never go to joinEmitInterval.
          LOG.warn("table " + alias + " has " + sz + " rows for join key "
              + keyObject);
          nextSz = getNextSize(nextSz);
        }
      }

      // Add the value to the vector
      storage.get(alias).add(nr); // add to this table's RowContainer
      // if join-key is null, process each row in different group.
      if (SerDeUtils.hasAnyNullObject(keyObject, sf.getFieldObjectInspector())) {
        endGroup();
        startGroup();
      }
    } catch (Exception e) {
      e.printStackTrace();
      throw new HiveException(e);
    }
  }
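The joinEmitInterval check can be modelled for two tables. Reduce input is sorted by tag, so by the time rows of the last table arrive, all rows of the earlier tables for this key are already buffered. That means the last table's buffer can be joined, emitted and cleared early. `EarlyEmitJoin` is a hypothetical sketch that covers an inner join only. It keeps processOp's ordering of checking the size before adding the current row.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-table model of processOp's memory bound (inner join only).
// Rows arrive sorted by tag: all tag-0 rows of a key come before tag-1 rows.
class EarlyEmitJoin {
    private final int joinEmitInterval; // Hive's default is 1000
    private final List<String> left = new ArrayList<>();
    private final List<String> right = new ArrayList<>(); // the last table
    final List<String> output = new ArrayList<>();
    int maxRightBuffered = 0;

    EarlyEmitJoin(int joinEmitInterval) {
        this.joinEmitInterval = joinEmitInterval;
    }

    void startGroup() {
        left.clear();
        right.clear();
    }

    void processOp(String row, int tag) {
        if (tag == 1) {
            // checked before adding the current row, as in processOp
            if (right.size() == joinEmitInterval) {
                emitCrossProduct(); // emit partial results...
                right.clear();      // ...and drop the right rows already joined
            }
            right.add(row);
            maxRightBuffered = Math.max(maxRightBuffered, right.size());
        } else {
            left.add(row);
        }
    }

    void endGroup() {
        emitCrossProduct();
    }

    private void emitCrossProduct() {
        for (String l : left) {
            for (String r : right) {
                output.add(l + "," + r);
            }
        }
    }
}
```

With joinEmitInterval = 2, one left row and five right rows still produce all five joined rows. At most two right rows are ever buffered at once.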

package org.apache.hadoop.hive.ql.exec.persistence;
public class RowContainer<Row extends List<Object>> extends AbstractRowContainer<Row>{

add
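add() writes rows to disk only as whole blocks. The number of rows written to the file is always a multiple of blockSize, and any remainder stays in currentWriteBlock. A block that is exactly full is not spilled until the next row arrives.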
  @Override
  public void add(Row t) throws HiveException {
    if (this.tblDesc != null) {
      if (addCursor >= blockSize) { // block is full (blockSize defaults to 25000): spill it to the tmp file
        spillBlock(currentWriteBlock, addCursor);
        addCursor = 0;
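        if (numFlushedBlocks == 1) {
          // after the first spill, allocate a separate write block so the original
          // array (firstReadBlockPointer) can be reused as the read buffer
          currentWriteBlock = (Row[]) new ArrayList[blockSize];
        }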
      }
      currentWriteBlock[addCursor++] = t; // store in currentWriteBlock, then advance addCursor
    } else if (t != null) {
      // the tableDesc will be null in the case that all columns in that table
      // is not used. we use a dummy row to denote all rows in that table, and
      // the dummy row is added by caller.
      this.dummyRow = t;
    }
    ++size;  // total row count (file + memory)
  }

}
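The add() logic can be mirrored in memory to check the whole-block invariant. In this hypothetical `SpillingBuffer`, a list stands in for the temp file and serialization is skipped. The sketch does not model the tblDesc == null dummy-row branch, and it does not model the fresh write-block allocation after the first spill.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of RowContainer.add(): a fixed-size write
// block plus a simulated "temp file" (a list of spilled rows).
class SpillingBuffer<T> {
    private final int blockSize;
    private final Object[] currentWriteBlock;
    private int addCursor = 0;
    private int numFlushedBlocks = 0;
    private long size = 0;
    final List<T> spilled = new ArrayList<>(); // stands in for the temp file

    SpillingBuffer(int blockSize) {
        this.blockSize = blockSize;
        this.currentWriteBlock = new Object[blockSize];
    }

    @SuppressWarnings("unchecked")
    void add(T row) {
        if (addCursor >= blockSize) {
            // block is full: write all of it out (always exactly blockSize rows)
            for (int i = 0; i < addCursor; i++) {
                spilled.add((T) currentWriteBlock[i]);
            }
            numFlushedBlocks++;
            addCursor = 0;
        }
        currentWriteBlock[addCursor++] = row; // the new row always stays in memory
        size++;
    }

    int inMemory() { return addCursor; }
    long size() { return size; }
    int numFlushedBlocks() { return numFlushedBlocks; }
}
```

With blockSize = 3, adding 7 rows leaves 6 rows (two blocks) in the simulated file and 1 row in memory.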

spillBlock
  private void spillBlock(Row[] block, int length) throws HiveException {
    try {
      if (tmpFile == null) { // first spill: create the temp file

        String suffix = ".tmp";
        if (this.keyObject != null) {
          suffix = "." + this.keyObject.toString() + suffix;
        }

        while (true) { // create a temp directory, retrying until it succeeds
          parentFile = File.createTempFile("hive-rowcontainer", "");
          boolean success = parentFile.delete() && parentFile.mkdir();
          if (success) {
            break;
          }
          LOG.debug("retry creating tmp row-container directory...");
        }

        tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
        LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
        // Delete the temp file if the JVM terminates normally through Hadoop job
        // kill command.
        // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
        parentFile.deleteOnExit();
        tmpFile.deleteOnExit();

        // rFile = new RandomAccessFile(tmpFile, "rw");
        HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
        tempOutPath = new Path(tmpFile.toString());
        JobConf localJc = getLocalFSJobConfClone(jc);
        rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs, hiveOutputFormat, serde
            .getSerializedClass(), false, tblDesc.getProperties(), tempOutPath);
      } else if (rw == null) {
        throw new HiveException("RowContainer has already been closed for writing.");
      }

      row.clear();
      row.add(null);
      row.add(null);

      if (this.keyObject != null) {
        row.set(1, this.keyObject);
        for (int i = 0; i < length; ++i) {
          Row currentValRow = block[i];
          row.set(0, currentValRow);
          Writable outVal = serde.serialize(row, standardOI);
          rw.write(outVal);
        }
      } else {
        for (int i = 0; i < length; ++i) {  // write each row of the block to the file
          Row currentValRow = block[i];
          Writable outVal = serde.serialize(currentValRow, standardOI);
          rw.write(outVal);
        }
      }

      if (block == this.currentWriteBlock) {  // the in-memory write block was just spilled
        this.addCursor = 0;  // reset the write cursor
      }

      this.numFlushedBlocks++;
    } catch (Exception e) {
      clear();
      LOG.error(e.toString(), e);
      throw new HiveException(e);
    }
  }
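The temp-directory trick in spillBlock() uses only java.io.File, so it can be tried in isolation. In the sketch below, `TempSpillDir` is a hypothetical wrapper. Unlike the original `while (true)` loop, it gives up after a bounded number of attempts. On Java 7+, `java.nio.file.Files.createTempDirectory` creates a unique directory in one call and avoids the delete-then-mkdir race.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of spillBlock()'s temp directory and temp file setup.
class TempSpillDir {
    // Reserve a unique name with createTempFile(), then replace that file with a
    // directory of the same name. Bounded retries here; Hive's loop is unbounded.
    static File createTempDir(String prefix) {
        for (int attempt = 0; attempt < 10; attempt++) {
            try {
                File dir = File.createTempFile(prefix, "");
                // another process may take the name between delete() and mkdir()
                if (dir.delete() && dir.mkdir()) {
                    return dir;
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        throw new IllegalStateException("could not create temp directory");
    }

    static File createSpillFile(File parent, String suffix) {
        try {
            File tmp = File.createTempFile("RowContainer", suffix, parent);
            // deleteOnExit() runs in reverse registration order, so the file is
            // removed before its (by then empty) parent directory
            parent.deleteOnExit();
            tmp.deleteOnExit();
            return tmp;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

As the original comment notes, deleteOnExit() does nothing if the JVM is killed with `kill -9`.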

Usage:
  for (v = rowContainer.first(); v != null; v = rowContainer.next()) {
    // do anything with v
  }

first
  public Row first() throws HiveException {
    if (size == 0) {  // size is incremented on every add()
      return null;
    }

    try {
      firstCalled = true;
      // when we reach here, we must have some data already (because size >0).
      // We need to see if there are any data flushed into file system. If not,
      // we can
      // directly read from the current write block. Otherwise, we need to read
      // from the beginning of the underlying file.
      this.itrCursor = 0;
      closeWriter();  // close the RecordWriter
      closeReader();  // close the RecordReader

      if (tblDesc == null) {
        this.itrCursor++;
        return dummyRow;
      }

      this.currentReadBlock = this.firstReadBlockPointer;
      if (this.numFlushedBlocks == 0) { // nothing spilled: read straight from memory
        this.readBlockSize = this.addCursor; // number of rows to read
        this.currentReadBlock = this.currentWriteBlock; // read from the write block
      } else {
        JobConf localJc = getLocalFSJobConfClone(jc);
        if (inputSplits == null) {
          if (this.inputFormat == null) {
            inputFormat = (InputFormat<WritableComparable, Writable>) ReflectionUtils.newInstance(
                tblDesc.getInputFileFormatClass(), localJc);
          }

          HiveConf.setVar(localJc, HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
              org.apache.hadoop.util.StringUtils.escapeString(parentFile.getAbsolutePath()));
          inputSplits = inputFormat.getSplits(localJc, 1);
          acutalSplitNum = inputSplits.length;
        }
        currentSplitPointer = 0;
        rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], localJc, Reporter.NULL);
        currentSplitPointer++;

        nextBlock();
      }
      // we are guaranteed that we can get data here (since 'size' is not zero)
      Row ret = currentReadBlock[itrCursor++]; // return the first row
      removeKeys(ret);
      return ret;
    } catch (Exception e) {
      throw new HiveException(e);
    }

  }

next
  public Row next() throws HiveException {

    if (!firstCalled) { // first() has not been called yet
      throw new RuntimeException("Call first() then call next().");
    }

    if (size == 0) {
      return null;
    }

    if (tblDesc == null) {
      if (this.itrCursor < size) {
        this.itrCursor++;
        return dummyRow;
      }
      return null;
    }

    Row ret;
    if (itrCursor < this.readBlockSize) { // rows remain in currentReadBlock
      ret = this.currentReadBlock[itrCursor++];
      removeKeys(ret);
      return ret;
    } else { // currentReadBlock is exhausted
      nextBlock(); // load the next block from the file
      if (this.readBlockSize == 0) { // the file has been read completely
        if (currentWriteBlock != null && currentReadBlock != currentWriteBlock) { // in-memory rows not read yet
          this.itrCursor = 0;
          this.readBlockSize = this.addCursor;
          this.firstReadBlockPointer = this.currentReadBlock;
          currentReadBlock = currentWriteBlock;
        } else { // in-memory rows have already been read
          return null;
        }
      }
      return next();
    }
  }

nextBlock
  private boolean nextBlock() throws HiveException {
    itrCursor = 0;  // reset
    this.readBlockSize = 0;  // reset
    if (this.numFlushedBlocks == 0) {  // nothing was spilled to the file: return false
      return false;
    }

    try {
      if (val == null) {
        val = serde.getSerializedClass().newInstance();
      }
      boolean nextSplit = true;
      int i = 0;

      if (rr != null) {
        Object key = rr.createKey();
        while (i < this.currentReadBlock.length && rr.next(key, val)) { // deserialize rows into currentReadBlock
          nextSplit = false;
          this.currentReadBlock[i++] = (Row) ObjectInspectorUtils.copyToStandardObject(serde
              .deserialize(val), serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
        }
      }

      if (nextSplit && this.currentSplitPointer < this.acutalSplitNum) {
        JobConf localJc = getLocalFSJobConfClone(jc);
        // open record reader to read next split
        rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
            Reporter.NULL);
        currentSplitPointer++;
        return nextBlock();
      }

      this.readBlockSize = i;  // number of rows read
      return this.readBlockSize > 0;
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
      try {
        this.clear();
      } catch (HiveException e1) {
        LOG.error(e.getMessage(), e);
      }
      throw new HiveException(e);
    }
  }
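nextBlock() can be modelled with plain lists as splits and an Iterator as the record reader. This hypothetical `SplitBlockReader` follows the same control flow. A block is filled from the current split only. If that split yields no rows at all, the next split is opened and the method recurses. One consequence is that a block can come back partially filled at a split boundary. Unlike RowContainer.first(), the sketch opens the first split lazily on the first call.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical model of nextBlock(): fill a fixed-size read buffer from the
// current split; if the current split yields nothing, open the next split.
class SplitBlockReader<T> {
    private final List<List<T>> splits;  // each inner list stands in for a split
    private final Object[] currentReadBlock;
    private int currentSplitPointer = 0;
    private Iterator<T> rr;              // stands in for the RecordReader
    int readBlockSize = 0;

    SplitBlockReader(List<List<T>> splits, int blockSize) {
        this.splits = splits;
        this.currentReadBlock = new Object[blockSize];
    }

    boolean nextBlock() {
        readBlockSize = 0;
        boolean nextSplit = true;
        int i = 0;
        if (rr != null) {
            // read at most one block's worth of rows from the current split
            while (i < currentReadBlock.length && rr.hasNext()) {
                nextSplit = false;
                currentReadBlock[i++] = rr.next();
            }
        }
        if (nextSplit && currentSplitPointer < splits.size()) {
            // current split exhausted (or none open yet): open the next one
            rr = splits.get(currentSplitPointer++).iterator();
            return nextBlock();
        }
        readBlockSize = i;
        return readBlockSize > 0;
    }

    @SuppressWarnings("unchecked")
    T get(int i) {
        return (T) currentReadBlock[i];
    }
}
```

For splits [1, 2, 3] and [4] with a block size of 2, successive calls return blocks of 2, 1 and 1 rows, then false.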