`
iwinit
  • 浏览: 454617 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

[HBase]Flush

阅读更多

Flush过程,对应MemStoreFlusher

1.是否需要做global flush,有则取当前rs最大的region进行flush

	if (isAboveLowWaterMark()) {
            ....
		//获取memstore最大的region进行flush
            if (!flushOneForGlobalPressure()) {
              // Wasn't able to flush any region, but we're above low water mark
              // This is unlikely to happen, but might happen when closing the
              // entire server - another thread is flushing regions. We'll just
              // sleep a little bit to avoid spinning, and then pretend that
              // we flushed one, so anyone blocked will check again
              lock.lock();
              try {
                Thread.sleep(1000);
                flushOccurred.signalAll();
              } finally {
                lock.unlock();
              }
            }
		//完了,再requeue一个任务,再次check是否超过内存限制
            // Enqueue another one of these tokens so we'll wake up again
            wakeupFlushThread();

2.region flush开始

3.检查region下的store file是否超过限制,默认7个,超过则重试(requeue)

if (!fqe.region.getRegionInfo().isMetaRegion() &&
        isTooManyStoreFiles(region)) {
      ......

        // Put back on the queue.  Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }

 4.拿Region的读锁,阻塞写

5.MVCC里增加一个事务,代表flush操作

 w = mvcc.beginMemstoreInsert();
      mvcc.advanceMemstore(w);

 6.拿Log sequence id

7.take snapshot,kvList引用切换

 void snapshot() {
    this.lock.writeLock().lock();
    try {
      // If snapshot currently has entries, then flusher failed or didn't call
      // cleanup.  Log a warning.
      if (!this.snapshot.isEmpty()) {
        LOG.warn("Snapshot called again without clearing previous. " +
          "Doing nothing. Another ongoing flush or did we fail last attempt?");
      } else {
        if (!this.kvset.isEmpty()) {
	//引用切换
          this.snapshot = this.kvset;
          this.kvset = new KeyValueSkipListSet(this.comparator);
          this.snapshotTimeRangeTracker = this.timeRangeTracker;
          this.timeRangeTracker = new TimeRangeTracker();
          // Reset heap to not include any keys
          this.size.set(DEEP_OVERHEAD);
          // Reset allocator so we get a fresh buffer for the new memstore
          if (allocator != null) {
            this.allocator = new MemStoreLAB(conf);
          }
        }
      }
    } finally {
      this.lock.writeLock().unlock();
    }
  }

 8.MVCC等待之前的事务完成

mvcc.waitForRead(w);

 9.将内存中的kv数据flush到hfile,流程如下

10.生成一个临时目录,使用UUID生成一个文件名

11.使用StoreScanner遍历内存中的kv数据,循环append入write cache

12.writer最终flush到hfile

private Path internalFlushCache(final SortedSet<KeyValue> set,
      final long logCacheFlushId,
      TimeRangeTracker snapshotTimeRangeTracker,
      AtomicLong flushedSize,
      MonitoredTask status)
      throws IOException {
    StoreFile.Writer writer;
    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = region.getSmallestReadPoint();
    long flushed = 0;
    Path pathName;
    // Don't flush if there are no entries.
    if (set.size() == 0) {
      return null;
    }
	//scan方式扫描KVlist数据,注意内存中的kv数据是有序的,先rowkey排序,再按family和qualifier,再按Timestamp
    Scan scan = new Scan();
    scan.setMaxVersions(scanInfo.getMaxVersions());
    // Use a store scanner to find which rows to flush.
    // Note that we need to retain deletes, hence
    // treat this as a minor compaction.
    InternalScanner scanner = new StoreScanner(this, scan, Collections
        .singletonList(new CollectionBackedScanner(set, this.comparator)),
        ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
        HConstants.OLDEST_TIMESTAMP);
    try {
      // TODO:  We can fail in the below block before we complete adding this
      // flush to list of store files.  Add cleanup of anything put on filesystem
      // if we fail.
      synchronized (flushLock) {
        status.setStatus("Flushing " + this + ": creating writer");
        // A. Write the map out to the disk
        writer = createWriterInTmp(set.size());
        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
	//临时目录
        pathName = writer.getPath();
        try {
          List<KeyValue> kvs = new ArrayList<KeyValue>();
          boolean hasMore;
          do {
            hasMore = scanner.next(kvs);
            if (!kvs.isEmpty()) {
              for (KeyValue kv : kvs) {
                // If we know that this KV is going to be included always, then let us
                // set its memstoreTS to 0. This will help us save space when writing to disk.
                if (kv.getMemstoreTS() <= smallestReadPoint) {
                  // let us not change the original KV. It could be in the memstore
                  // changing its memstoreTS could affect other threads/scanners.
                  kv = kv.shallowCopy();
                  kv.setMemstoreTS(0);
                }
		//往cache中写数据
                writer.append(kv);
                flushed += this.memstore.heapSizeChange(kv, true);
              }
              kvs.clear();
            }
          } while (hasMore);
        } finally {
          // Write out the log sequence number that corresponds to this output
          // hfile.  The hfile is current up to and including logCacheFlushId.
          status.setStatus("Flushing " + this + ": appending metadata");
          writer.appendMetadata(logCacheFlushId, false);
          status.setStatus("Flushing " + this + ": closing flushed file");
	//flush到HDFS
          writer.close();
        }
      }
    }
......
  }

13.将store file从tmp下move到正式目录,并添加到Store file列表

14.清理snapshot

private boolean updateStorefiles(final StoreFile sf,
                                   final SortedSet<KeyValue> set)
  throws IOException {
    this.lock.writeLock().lock();
    try {
	//添加到storeFile中
      ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
      newList.add(sf);
      storefiles = sortAndClone(newList);
	//释放内存
      this.memstore.clearSnapshot(set);
    } finally {
      // We need the lock, as long as we are updating the storefiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if continue to hold
      // the lock.
      this.lock.writeLock().unlock();
    }

    // Tell listeners of the change in readers.
    notifyChangedReadersObservers();

    return needsCompaction();
  }

 15.修改global和memstore的内存大小

  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
    if (this.rsAccounting != null) {
      rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
    }  
    return this.memstoreSize.getAndAdd(memStoreSize);
  }

16.flush成功后,HLog增加一条flush信息,小于该flush txid的事务已经失效了

  public void completeCacheFlush(final byte [] encodedRegionName,
      final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
  throws IOException {
    long start = System.currentTimeMillis();
    try {
      if (this.closed) {
        return;
      }
      long txid = 0;
      synchronized (updateLock) {
	//flush的事务数据
        WALEdit edit = completeCacheFlushLogEdit();
        HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
            System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
        logSyncerThread.append(new Entry(key, edit));
        txid = this.unflushedEntries.incrementAndGet();
        this.numEntries.incrementAndGet();
      }
      // sync txn to file system
	//写入HDFS
      this.sync(txid);

    } finally {
      // updateLock not needed for removing snapshot's entry
      // Cleaning up of lastSeqWritten is in the finally clause because we
      // don't want to confuse getOldestOutstandingSeqNum()
      this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
      this.cacheFlushLock.unlock();
    }
    long took = System.currentTimeMillis() - start;
    doWALTime.inc(took);
  }

 17.唤醒等待的业务线程

分享到:
评论

相关推荐

    hbase-1.2.6-bin.tar.gz

    4. **实时查询**:HBase支持实时的读写操作,这得益于其内存中的数据模型和即时的数据flush机制。 5. **索引与查询优化**:虽然HBase不是关系型数据库,但仍然提供了索引功能,例如,可以使用Secondary Index进行...

    hbase资料_hbase-default.xml.zip

    `flush_compact.xml`可能是一个自定义的配置文件,用于调整HBase的刷写和合并策略。 最后,`hive-hbase-handler-1.2.1.jar`是Hive与HBase交互的连接器,它使得用户可以在Hive SQL中直接查询HBase表。这个连接器允许...

    HBASE具体操作指令

    修改表属性,如MAX_FILESIZE、READONLY、MEMSTORE_FLUSHSIZE、DEFERRED_LOG_FLUSH等: hbase&gt; alter 't1', MAX_FILESIZE =&gt; '134217728' 添加表coprocessor: hbase&gt; alter 't1', 'coprocessor' =&gt; 'hdfs:///foo....

    HBase查询的深入研究

    《深入探讨HBase查询机制》 HBase,作为一款分布式列式存储系统,以其高效、可扩展的特性在大数据领域广泛应用。本文将深入探讨HBase的查询机制,以帮助我们理解其背后的运作原理。 首先,我们需要了解HBase的查询...

    深入学习hbase原理资料整理

    在HBase的生命周期中,当Region的MemStore达到一定大小后,会触发flush操作,此时内存中的数据将被写入HLog,随后形成一个新的HFile存储在HDFS上。这个过程确保了即使在RegionServer宕机的情况下,数据也不会丢失,...

    HBase性能深度分析

    伴随数据写入,内存中的数据达到由“hbase.hregion.memstore.flush.size”参数控制的阈值(默认64MB)时,会被写入到region文件中。当region文件大小达到由“hbase.hregion.max.filesize”参数决定的上限(默认256MB...

    大数据HBASE考题材料

    - 当MemStore达到一定阈值时,会触发Flush操作,将数据写入StoreFile。 - 随着StoreFile的增多,会触发Compaction操作,将多个StoreFile合并成一个更大的StoreFile,以减少文件的数量并提高查询效率。 - 这个过程...

    Hadoop2.7.1+Hbase1.2.1集群环境搭建(7)hbase 性能优化

    3. **HBase配置调整**:例如增大`hbase.hregion.max.filesize`以控制Region大小,调整`hbase.regionserver.handler.count`以增加处理线程数,或者优化`hbase.hregion.memstore.flush.size`以平衡内存和磁盘IO。...

    HBASE基础应用的介绍

    当MemStore达到一定大小时,数据会被flush到磁盘上的StoreFile。 2. **数据读取流程**:读取数据时,会从最新的StoreFile开始查找,然后是旧的StoreFile,最后是WAL,以确保数据的一致性。 3. **Compaction过程**:...

    某大数据公司内部Hbase性能测试详细设计文档及用例

    通过调用HTable.setAutoFlush(false)方法可以将HTable写客户端的自动flush关闭,这样可以批量写入数据到HBase,而不是有一条put就执行一次更新,只有当put填满客户端写缓存时,才实际向HBase服务端发起写请求。...

    HBase源码分析

    当数据写入HTable时,如果autoFlush设置为false,写操作会被缓存在客户端,直到缓冲区满或者手动触发flush操作。当达到writeBufferSize阈值时,客户端会通过RPC将这些Put操作发送给对应的Region Server。 Region ...

    HBase参数修改 PDF 下载

    6. `hbase.hregion.memstore.flush.size`: 内存存储单元(MemStore)的刷新阈值,达到该值时会触发数据写入磁盘。 7. `hbase.regionserver.global.memstore.lowerLimit` 和 `hbase.regionserver.global.memstore....

    HBase官网文档解读.pdf

    - MemStore大小和flush策略:调整这些参数可以影响写入性能。 - Compaction策略:合理配置Compaction可以提高数据读取效率。 7. HBase的安全设置 HBase提供了安全机制,支持认证和授权,可以与Kerberos等安全协议...

    HBase配置文件若干配置.zip

    `hbase.hregion.memstore.flush.size`决定了每个Region内存缓冲区的刷新阈值,当达到此大小时,Region会触发flush操作,将数据写入磁盘。 `hbase.regionserver.handler.count`是RegionServer处理请求的线程数,增加...

    Hbase

    ### HBase架构与运作机制详解 #### 一、HBase概览 HBase是一个分布式、可扩展的大数据存储系统,其设计目标是为了提供高可靠性、高性能的数据存储服务。HBase基于Google的Bigtable论文实现,运行于Hadoop之上,...

    大数据技术之HBase的面试题.zip

    - 写入时,数据首先被发送到MemStore,当达到一定阈值后,会触发Flush操作,将数据写入HFile。 - 读取时,通过行键定位到对应的Region,然后从HFile和MemStore中查找数据。 5. **HBase的Region分裂**: - 当...

    HBase配置项说明及调优建议.zip

    `hbase.hregion.memstore.flush.size`定义了触发MemStore刷新到磁盘的大小。 4. **HDFS相关配置**:HBase依赖于HDFS存储数据。`hbase.fs.defaultFS`指定HDFS的默认文件系统。`dfs.blocksize`设置HDFS的块大小,通常...

    HBase文档

    - **hbase.hregion.memstore.flush.size**:设置MemStore刷新到磁盘的阈值大小。 **5.2 性能调优** 为了提高HBase的性能,可以考虑以下方面: - **操作系统层面**:优化操作系统设置,如内存管理、文件系统缓存等...

    java链接及操作hbase实例代码

    同时,HBase的Region分布、表分区策略、以及Compaction和Flush机制也是理解HBase性能调优的重要知识点。 这个名为“Hbasetest”的压缩包文件很可能包含了上述操作的完整示例代码,你可以通过查看和运行代码来加深对...

Global site tag (gtag.js) - Google Analytics