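The flush process, implemented by MemStoreFlusher
1. Check whether a global flush is needed. If so, pick the region with the largest memstore on the current RegionServer and flush it.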
    if (isAboveLowWaterMark()) {
      ....
      // Pick the region with the largest memstore and flush it
      if (!flushOneForGlobalPressure()) {
        // Wasn't able to flush any region, but we're above low water mark
        // This is unlikely to happen, but might happen when closing the
        // entire server - another thread is flushing regions. We'll just
        // sleep a little bit to avoid spinning, and then pretend that
        // we flushed one, so anyone blocked will check again
        lock.lock();
        try {
          Thread.sleep(1000);
          flushOccurred.signalAll();
        } finally {
          lock.unlock();
        }
      }
      // Done. Requeue another token so the memory limit is checked again.
      // Enqueue another one of these tokens so we'll wake up again
      wakeupFlushThread();
    }
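The selection in step 1 can be sketched roughly as follows. This is a simplified illustration, not the HBase implementation: `RegionSketch`, `GlobalPressureSketch`, and `pickRegionToFlush` are hypothetical names, and the sketch assumes the only criterion is memstore size, which may not match everything `flushOneForGlobalPressure()` takes into account.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a region; not the HBase HRegion class.
final class RegionSketch {
    final String name;
    final long memstoreBytes;

    RegionSketch(String name, long memstoreBytes) {
        this.name = name;
        this.memstoreBytes = memstoreBytes;
    }
}

final class GlobalPressureSketch {
    // Returns the region with the largest memstore when the total memstore
    // size has reached the low-water mark, or empty when no flush is needed.
    static Optional<RegionSketch> pickRegionToFlush(List<RegionSketch> regions,
                                                    long lowWaterMarkBytes) {
        long total = 0;
        for (RegionSketch r : regions) {
            total += r.memstoreBytes;
        }
        if (total < lowWaterMarkBytes) {
            return Optional.empty();
        }
        return regions.stream()
                .max(Comparator.comparingLong((RegionSketch r) -> r.memstoreBytes));
    }
}
```

Flushing the largest memstore first frees the most memory per flush, which is why a single pass followed by a re-check is usually enough.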
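2. The region flush begins.
3. Check whether the number of store files under the region exceeds the limit (7 by default). If it does, requeue the flush request and retry later.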
    if (!fqe.region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
        ......
        // Put back on the queue. Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
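The delayed requeue in step 3 can be illustrated with `java.util.concurrent.DelayQueue`. The sketch below is an assumption-laden simplification: `FlushEntrySketch` is a hypothetical class, and only the `requeue(delay)` idea is taken from the snippet above.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical flush-queue entry; it becomes visible to DelayQueue.poll()
// only after its delay has elapsed.
final class FlushEntrySketch implements Delayed {
    final String regionName;
    private long whenToExpireMs;
    private int requeueCount = 0;

    FlushEntrySketch(String regionName) {
        this.regionName = regionName;
        this.whenToExpireMs = System.currentTimeMillis(); // ready immediately
    }

    // Pushes the expiry time into the future; call this before re-adding
    // the entry to the queue, never while it is still queued.
    FlushEntrySketch requeue(long delayMs) {
        this.whenToExpireMs = System.currentTimeMillis() + delayMs;
        this.requeueCount++;
        return this;
    }

    int getRequeueCount() {
        return requeueCount;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(whenToExpireMs - System.currentTimeMillis(),
                TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                other.getDelay(TimeUnit.MILLISECONDS));
    }
}
```

With this shape, a flush thread that polls the queue simply does not see the postponed request until the delay has passed, which gives compaction time to reduce the number of store files.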
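4. Acquire the region's read lock and block writes (in HRegion, updates are blocked by taking the write lock on updatesLock while the snapshot is prepared).
5. Start a new MVCC transaction that represents the flush operation.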
    w = mvcc.beginMemstoreInsert();
    mvcc.advanceMemstore(w);
6. Obtain the log sequence ID.
7. Take a snapshot by swapping the kvset reference.
    void snapshot() {
      this.lock.writeLock().lock();
      try {
        // If snapshot currently has entries, then flusher failed or didn't call
        // cleanup. Log a warning.
        if (!this.snapshot.isEmpty()) {
          LOG.warn("Snapshot called again without clearing previous. " +
              "Doing nothing. Another ongoing flush or did we fail last attempt?");
        } else {
          if (!this.kvset.isEmpty()) {
            // Swap the references
            this.snapshot = this.kvset;
            this.kvset = new KeyValueSkipListSet(this.comparator);
            this.snapshotTimeRangeTracker = this.timeRangeTracker;
            this.timeRangeTracker = new TimeRangeTracker();
            // Reset heap to not include any keys
            this.size.set(DEEP_OVERHEAD);
            // Reset allocator so we get a fresh buffer for the new memstore
            if (allocator != null) {
              this.allocator = new MemStoreLAB(conf);
            }
          }
        }
      } finally {
        this.lock.writeLock().unlock();
      }
    }
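The reference swap is cheap because no data is copied: the active set becomes the snapshot and a fresh empty set takes its place. A minimal, self-contained sketch of the same idea follows. `MemStoreSketch` and its methods are hypothetical, and it stores plain strings instead of KeyValue objects.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class MemStoreSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile ConcurrentSkipListMap<String, String> active =
            new ConcurrentSkipListMap<>();
    private volatile ConcurrentSkipListMap<String, String> snapshot =
            new ConcurrentSkipListMap<>();

    // Writers share the read lock, so they only exclude the swap itself.
    void put(String key, String value) {
        lock.readLock().lock();
        try {
            active.put(key, value);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Returns true if a new snapshot was taken. Does nothing when the
    // previous snapshot has not been cleared or there is nothing to flush.
    boolean snapshot() {
        lock.writeLock().lock();
        try {
            if (!snapshot.isEmpty() || active.isEmpty()) {
                return false;
            }
            snapshot = active;                      // swap references
            active = new ConcurrentSkipListMap<>(); // fresh set for new writes
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Called once the snapshot has been written out.
    void clearSnapshot() {
        lock.writeLock().lock();
        try {
            snapshot = new ConcurrentSkipListMap<>();
        } finally {
            lock.writeLock().unlock();
        }
    }

    int activeSize() { return active.size(); }

    int snapshotSize() { return snapshot.size(); }
}
```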
8. Wait in MVCC until all earlier transactions have completed.
    mvcc.waitForRead(w);
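Steps 5 and 8 work together: the flush registers its own write entry, marks it complete right away, and later waits until the read point reaches it, which can only happen once every earlier write has completed. The sketch below shows that mechanism under simplifying assumptions. `MvccSketch` is a hypothetical class, not HBase's MultiVersionConsistencyControl, and the method names are illustrative.

```java
import java.util.ArrayDeque;

final class MvccSketch {
    static final class WriteEntry {
        final long writeNumber;
        boolean completed;

        WriteEntry(long writeNumber) {
            this.writeNumber = writeNumber;
        }
    }

    private final ArrayDeque<WriteEntry> pending = new ArrayDeque<>();
    private long writePoint = 0;
    private long readPoint = 0;

    // Registers a new write and assigns it the next write number.
    synchronized WriteEntry begin() {
        WriteEntry e = new WriteEntry(++writePoint);
        pending.addLast(e);
        return e;
    }

    // Marks a write complete; the read point advances only over a
    // contiguous run of completed entries at the head of the queue.
    synchronized void complete(WriteEntry e) {
        e.completed = true;
        while (!pending.isEmpty() && pending.peekFirst().completed) {
            readPoint = pending.removeFirst().writeNumber;
        }
        notifyAll();
    }

    synchronized long getReadPoint() {
        return readPoint;
    }

    // Blocks until the read point has reached the given entry.
    synchronized void waitForRead(WriteEntry e) {
        try {
            while (readPoint < e.writeNumber) {
                wait();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        }
    }
}
```

In this model, completing the flush entry alone does not move the read point if an earlier put is still in flight, so `waitForRead` on the flush entry acts as a barrier for all writes that started before the snapshot.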
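9. Flush the in-memory KV data to an HFile, as follows:
10. Create a temporary directory and generate a file name from a UUID.
11. Iterate over the in-memory KVs with a StoreScanner, appending each one to the writer's cache.
12. Finally, the writer flushes to the HFile.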
    private Path internalFlushCache(final SortedSet<KeyValue> set,
        final long logCacheFlushId,
        TimeRangeTracker snapshotTimeRangeTracker,
        AtomicLong flushedSize,
        MonitoredTask status) throws IOException {
      StoreFile.Writer writer;
      // Find the smallest read point across all the Scanners.
      long smallestReadPoint = region.getSmallestReadPoint();
      long flushed = 0;
      Path pathName;
      // Don't flush if there are no entries.
      if (set.size() == 0) {
        return null;
      }
      // Scan the KV set. Note that the in-memory KVs are sorted: by row key
      // first, then by family and qualifier, then by timestamp.
      Scan scan = new Scan();
      scan.setMaxVersions(scanInfo.getMaxVersions());
      // Use a store scanner to find which rows to flush.
      // Note that we need to retain deletes, hence
      // treat this as a minor compaction.
      InternalScanner scanner = new StoreScanner(this, scan, Collections
          .singletonList(new CollectionBackedScanner(set, this.comparator)),
          ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
          HConstants.OLDEST_TIMESTAMP);
      try {
        // TODO: We can fail in the below block before we complete adding this
        // flush to list of store files. Add cleanup of anything put on filesystem
        // if we fail.
        synchronized (flushLock) {
          status.setStatus("Flushing " + this + ": creating writer");
          // A. Write the map out to the disk
          writer = createWriterInTmp(set.size());
          writer.setTimeRangeTracker(snapshotTimeRangeTracker);
          // Path of the file under the temporary directory
          pathName = writer.getPath();
          try {
            List<KeyValue> kvs = new ArrayList<KeyValue>();
            boolean hasMore;
            do {
              hasMore = scanner.next(kvs);
              if (!kvs.isEmpty()) {
                for (KeyValue kv : kvs) {
                  // If we know that this KV is going to be included always, then let us
                  // set its memstoreTS to 0. This will help us save space when writing to disk.
                  if (kv.getMemstoreTS() <= smallestReadPoint) {
                    // let us not change the original KV. It could be in the memstore
                    // changing its memstoreTS could affect other threads/scanners.
                    kv = kv.shallowCopy();
                    kv.setMemstoreTS(0);
                  }
                  // Append the KV to the writer's cache
                  writer.append(kv);
                  flushed += this.memstore.heapSizeChange(kv, true);
                }
                kvs.clear();
              }
            } while (hasMore);
          } finally {
            // Write out the log sequence number that corresponds to this output
            // hfile. The hfile is current up to and including logCacheFlushId.
            status.setStatus("Flushing " + this + ": appending metadata");
            writer.appendMetadata(logCacheFlushId, false);
            status.setStatus("Flushing " + this + ": closing flushed file");
            // Closing the writer flushes the file to HDFS
            writer.close();
          }
        }
      }
      ......
    }
13. Move the store file from the tmp directory to its final directory and add it to the store file list.
14. Clear the snapshot.
    private boolean updateStorefiles(final StoreFile sf,
        final SortedSet<KeyValue> set) throws IOException {
      this.lock.writeLock().lock();
      try {
        // Add the new file to the store file list
        ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
        newList.add(sf);
        storefiles = sortAndClone(newList);
        // Release the snapshot's memory
        this.memstore.clearSnapshot(set);
      } finally {
        // We need the lock, as long as we are updating the storefiles
        // or changing the memstore. Let us release it before calling
        // notifyChangeReadersObservers. See HBASE-4485 for a possible
        // deadlock scenario that could have happened if continue to hold
        // the lock.
        this.lock.writeLock().unlock();
      }
      // Tell listeners of the change in readers.
      notifyChangedReadersObservers();
      return needsCompaction();
    }
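Steps 10 to 13 follow a common pattern: write the complete file under a temporary directory, then move it into place so readers never observe a half-written file. The sketch below shows the pattern on a local file system with `java.nio.file`. It is an illustration only: `TmpThenMoveSketch`, the `.tmp` directory name, and the `key=value` line format are assumptions, and HBase itself works against HDFS with its own writer.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.SortedMap;
import java.util.UUID;

final class TmpThenMoveSketch {
    // Writes the sorted snapshot to <storeDir>/.tmp/<uuid>, then moves the
    // finished file to <storeDir>/<uuid> and returns the final path.
    static Path flushSnapshot(SortedMap<String, String> snapshot, Path storeDir) {
        try {
            Path tmpDir = Files.createDirectories(storeDir.resolve(".tmp"));
            String fileName = UUID.randomUUID().toString().replace("-", "");
            Path tmpFile = tmpDir.resolve(fileName);
            try (BufferedWriter writer =
                     Files.newBufferedWriter(tmpFile, StandardCharsets.UTF_8)) {
                // Entries are already sorted, so they are written in key order.
                for (Map.Entry<String, String> kv : snapshot.entrySet()) {
                    writer.write(kv.getKey() + "=" + kv.getValue());
                    writer.newLine();
                }
            }
            // The file becomes visible in the store directory in one step.
            return Files.move(tmpFile, storeDir.resolve(fileName),
                    StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the process dies before the move, only the temporary directory contains leftovers, which is the cleanup concern the TODO comment in the flush code alludes to.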
15. Update the global and per-region memstore size counters.
    public long addAndGetGlobalMemstoreSize(long memStoreSize) {
      if (this.rsAccounting != null) {
        rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
      }
      return this.memstoreSize.getAndAdd(memStoreSize);
    }
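The accounting amounts to two atomic counters updated with the same delta: one shared by the whole server and one per region. A flush passes a negative delta. The sketch below is a hypothetical simplification (`MemstoreAccountingSketch` is not an HBase class). Note that the quoted method calls `getAndAdd`, which returns the value from before the update, whereas this sketch returns the updated value.

```java
import java.util.concurrent.atomic.AtomicLong;

final class MemstoreAccountingSketch {
    private final AtomicLong globalSize;                  // shared by all regions
    private final AtomicLong regionSize = new AtomicLong();

    MemstoreAccountingSketch(AtomicLong globalSize) {
        this.globalSize = globalSize;
    }

    // Applies the delta to both counters and returns the region's new size.
    // Writes pass a positive delta; a completed flush passes a negative one.
    long add(long deltaBytes) {
        globalSize.addAndGet(deltaBytes);
        return regionSize.addAndGet(deltaBytes);
    }

    long regionBytes() {
        return regionSize.get();
    }
}
```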
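16. After a successful flush, append a flush record to the HLog. Edits with a txid lower than this flush's are now obsolete, because their data is already persisted in the HFile.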
    public void completeCacheFlush(final byte [] encodedRegionName,
        final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
        throws IOException {
      long start = System.currentTimeMillis();
      try {
        if (this.closed) {
          return;
        }
        long txid = 0;
        synchronized (updateLock) {
          // The edit that records the flush
          WALEdit edit = completeCacheFlushLogEdit();
          HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
              System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
          logSyncerThread.append(new Entry(key, edit));
          txid = this.unflushedEntries.incrementAndGet();
          this.numEntries.incrementAndGet();
        }
        // sync txn to file system
        // Write it to HDFS
        this.sync(txid);
      } finally {
        // updateLock not needed for removing snapshot's entry
        // Cleaning up of lastSeqWritten is in the finally clause because we
        // don't want to confuse getOldestOutstandingSeqNum()
        this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
        this.cacheFlushLock.unlock();
      }
      long took = System.currentTimeMillis() - start;
      doWALTime.inc(took);
    }
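The cleanup of `lastSeqWritten` in the `finally` block is what lets old log entries be discarded: once a region's entry is removed, the oldest outstanding sequence number across all regions can move forward. A rough sketch of that bookkeeping follows. `WalTrackingSketch` is hypothetical, and it assumes no writes arrive for a region while that region is being flushed, which is a simplification of how the snapshot name is used above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

final class WalTrackingSketch {
    // For each region, the sequence number of its oldest edit that has not
    // yet been flushed to a store file.
    private final Map<String, Long> oldestUnflushedSeq = new HashMap<>();
    private long nextSeq = 1;

    // Appends an edit for the region and returns its sequence number.
    synchronized long append(String region) {
        long seq = nextSeq++;
        oldestUnflushedSeq.putIfAbsent(region, seq);
        return seq;
    }

    // After a successful flush, the region has no unflushed edits left.
    synchronized void completeCacheFlush(String region) {
        oldestUnflushedSeq.remove(region);
    }

    // Log entries below this sequence number are no longer needed for recovery.
    synchronized OptionalLong oldestOutstandingSeq() {
        return oldestUnflushedSeq.values().stream()
                .mapToLong(Long::longValue)
                .min();
    }
}
```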
17. Wake up the client-request threads that were waiting.
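The wake-up relies on a lock and a condition, the same `lock` and `flushOccurred` pair that appears in the code for step 1. The following sketch shows how blocked writers could wait on such a condition and be released after a flush. `FlushGateSketch`, its high-water-mark check, and the timeout are assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class FlushGateSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flushOccurred = lock.newCondition();
    private final long highWaterMark;
    private long globalBytes;

    FlushGateSketch(long highWaterMark, long initialBytes) {
        this.highWaterMark = highWaterMark;
        this.globalBytes = initialBytes;
    }

    // Blocks a writer while memory is at or above the high-water mark.
    // Returns true if the writer may proceed, false on timeout or interrupt.
    boolean awaitRoom(long timeoutMs) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (globalBytes >= highWaterMark) {
                if (remainingNanos <= 0) {
                    return false;
                }
                remainingNanos = flushOccurred.awaitNanos(remainingNanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Called by the flusher after a flush: record the freed memory and wake
    // every waiting writer so each one re-checks the limit.
    void flushed(long freedBytes) {
        lock.lock();
        try {
            globalBytes -= freedBytes;
            flushOccurred.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Using `signalAll` rather than `signal` means every waiter re-evaluates the condition, which matches the comment in step 1 about making "anyone blocked" check again.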