`
iwinit
  • 浏览: 454017 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

[HBase]Write Path

阅读更多

HBase的批量put操作主要步骤

1.同个region的put视为同一批操作

2.对批量操作按rowkey进行字节排序

Collections.sort(actionsForRegion);

 3.检查region server的全局内存是否超过阀值,如超过则唤醒flush线程进行flush操作

 

public void reclaimMemStoreMemory() {
	//如果超过高水位,默认为堆内存的0.4,阻塞rpc线程直到内存减少到预期
    if (isAboveHighWaterMark()) {
      lock.lock();
      try {
        boolean blocked = false;
        long startTime = 0;
        while (isAboveHighWaterMark() && !server.isStopped()) {
          .....
	  //给flush线程提交一个task
          wakeupFlushThread();
          try {
            // we should be able to wait forever, but we've seen a bug where
            // we miss a notify, so put a 5 second bound on it at least.
            flushOccurred.await(5, TimeUnit.SECONDS);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
          }
        }
        ....
      } finally {
        lock.unlock();
      }
    } 
	//如果超过低水位,默认为堆内存的0.35,给flush线程提交一个task,不阻塞线程
    else if (isAboveLowWaterMark()) {
      wakeupFlushThread();
    }
  }
 

 

4.检查这个region的memstore内存大小是否超过限制,超过则唤醒flush线程对该region进行flush,异步操作

 

  private void checkResources()
      throws RegionTooBusyException, InterruptedIOException {

   .....
    boolean blocked = false;
    long startTime = 0;
    //当前region内存大小超过blockingMemStoreSize,默认为memstoreFlushSize的2被,memstoreFlushSize默认128M
    while (this.memstoreSize.get() > this.blockingMemStoreSize) {
	//给flush线程发个请求
      requestFlush();
     。。。。。
      blocked = true;
	//等待一段时间,10s
      synchronized(this) {
        try {
          wait(Math.min(timeToWait, threadWakeFrequency));
        } catch (InterruptedException ie) {
          final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
          if (totalTime > 0) {
            this.updatesBlockedMs.add(totalTime);
          }
          LOG.info("Interrupted while waiting to unblock updates for region "
            + this + " '" + Thread.currentThread().getName() + "'");
          InterruptedIOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        }
      }
    }
 ......
  }

 

5.拿行锁,如果拿不到锁,则不处理

 

  private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
      throws IOException {
	//检查row的范围是否在这个region里
    checkRow(row, "row lock");
    startRegionOperation();
    try {
      HashedBytes rowKey = new HashedBytes(row);
      //行锁是一个Latch,释放的时候Latch减1,等待线程就会被唤醒
      CountDownLatch rowLatch = new CountDownLatch(1);

      // loop until we acquire the row lock (unless !waitForLock)
      while (true) {
	//put一把
        CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
	//如果锁不存在,则认为拿到锁
        if (existingLatch == null) {
          break;
        } 
	//已经有锁了,则等待锁释放或超时
	else {
          // row already locked
          if (!waitForLock) {
            return null;
          }
          try {
            if (!existingLatch.await(this.rowLockWaitDuration,
                            TimeUnit.MILLISECONDS)) {
              throw new IOException("Timed out on getting lock for row="
                  + Bytes.toStringBinary(row));
            }
          } catch (InterruptedException ie) {
            // Empty
          }
        }
      }

      // loop until we generate an unused lock id
	//锁id是一个原子递增的整数
      while (true) {
        Integer lockId = lockIdGenerator.incrementAndGet();
        HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
        if (existingRowKey == null) {
          return lockId;
        } else {
          // lockId already in use, jump generator to a new spot
          lockIdGenerator.set(rand.nextInt());
        }
      }
    } finally {
      closeRegionOperation();
    }
  }
 6.修改KeyValue的timestamp为当前时间

 

7.拿mvcc的写事务id

    public WriteEntry beginMemstoreInsert() {
    synchronized (writeQueue) {
	//事务id是一个原子递增的long
      long nextWriteNumber = ++memstoreWrite;
        //entry用来存这个事务的状态,是否已完成
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      return e;
    }
  }
 8.写入memstore的内存kv列表

 

 

    private long internalAdd(final KeyValue toAdd) {
    //堆内存加了多少
    long s = heapSizeChange(toAdd, this.kvset.add(toAdd));
    timeRangeTracker.includeTimestamp(toAdd);
    this.size.addAndGet(s);
    return s;
  }
 9.写Hlog,但不flush,仍在内存
    private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
      final long now, HTableDescriptor htd, boolean doSync)
    throws IOException {
      ......
      long txid = 0;
      synchronized (this.updateLock) {
	//log的序列号
        long seqNum = obtainSeqNum();
        // The 'lastSeqWritten' map holds the sequence number of the oldest
        // write for each region (i.e. the first edit added to the particular
        // memstore). . When the cache is flushed, the entry for the
        // region being flushed is removed if the sequence number of the flush
        // is greater than or equal to the value in lastSeqWritten.
        // Use encoded name.  Its shorter, guaranteed unique and a subset of
        // actual  name.
        byte [] encodedRegionName = info.getEncodedNameAsBytes();
	//region第一个修改的事务id,flush时所有大于等于该值的entry都会被写入文件
        this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
        HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
        doWrite(info, logKey, edits, htd);
        this.numEntries.incrementAndGet();
	//事务id,代表第几条log
        txid = this.unflushedEntries.incrementAndGet();
        if (htd.isDeferredLogFlush()) {
          lastDeferredTxid = txid;
        }
      }
      // Sync if catalog region, and if not then check if that table supports
      // deferred log flushing
      if (doSync && 
          (info.isMetaRegion() ||
          !htd.isDeferredLogFlush())) {
        // sync txn to file system
        this.sync(txid);
      }
      return txid;
    }
 
    写log的cache
    // appends new writes to the pendingWrites. It is better to keep it in
    // our own queue rather than writing it to the HDFS output stream because
    // HDFSOutputStream.writeChunk is not lightweight at all.
    synchronized void append(Entry e) throws IOException {
      pendingWrites.add(e);
    }
 10.释放行锁
     public void releaseRowLock(final Integer lockId) {
    if (lockId == null) return; // null lock id, do nothing
    //先删除lock id
    HashedBytes rowKey = lockIds.remove(lockId);
    if (rowKey == null) {
      LOG.warn("Release unknown lockId: " + lockId);
      return;
    }
    //再删除lock
    CountDownLatch rowLatch = lockedRows.remove(rowKey);
    if (rowLatch == null) {
      LOG.error("Releases row not locked, lockId: " + lockId + " row: "
          + rowKey);
      return;
    }
    //lock释放
    rowLatch.countDown();
  }
 11.flush Hlog到HDFS

 

 

  // sync all transactions upto the specified txid
  private void syncer(long txid) throws IOException {
    Writer tempWriter;
    synchronized (this.updateLock) {
      if (this.closed) return;
      tempWriter = this.writer; // guaranteed non-null
    }
    // if the transaction that we are interested in is already 
    // synced, then return immediately.
    //当前flush到第一个日志了,有可能已经被其他rpc线程flush掉了
    if (txid <= this.syncedTillHere) {
      return;
    }
    try {
      long doneUpto;
      long now = System.currentTimeMillis();
      // First flush all the pending writes to HDFS. Then 
      // issue the sync to HDFS. If sync is successful, then update
      // syncedTillHere to indicate that transactions till this
      // number has been successfully synced.
      synchronized (flushLock) {
        if (txid <= this.syncedTillHere) {
          return;
        }
        doneUpto = this.unflushedEntries.get();
	//当前所有cache的log
        List<Entry> pending = logSyncerThread.getPendingWrites();
        try {
		//写,但没sync到HDFS
          logSyncerThread.hlogFlush(tempWriter, pending);
        } catch(IOException io) {
          synchronized (this.updateLock) {
            // HBASE-4387, HBASE-5623, retry with updateLock held
            tempWriter = this.writer;
            logSyncerThread.hlogFlush(tempWriter, pending);
          }
        }
      }
      // another thread might have sync'ed avoid double-sync'ing
      if (txid <= this.syncedTillHere) {
        return;
      }
      try {
	//sync到HDFS,写失败重试一次
        tempWriter.sync();
      } catch(IOException io) {
        synchronized (this.updateLock) {
          // HBASE-4387, HBASE-5623, retry with updateLock held
          tempWriter = this.writer;
          tempWriter.sync();
        }
      }
      //当前已sync的日志
      this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);

     ......
    } catch (IOException e) {
      LOG.fatal("Could not sync. Requesting close of hlog", e);
      //回滚。
      requestLogRoll();
      throw e;
    }
  }

 

 @Override
  public void append(HLog.Entry entry) throws IOException {
    entry.setCompressionContext(compressionContext);
    try {
      //SequenceFile写入
      this.writer.append(entry.getKey(), entry.getEdit());
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

 12.修改mvcc的读事务id

  public void completeMemstoreInsert(WriteEntry e) {
    //递增读事务id
    advanceMemstore(e);
    //等待之前的请求全部完成
    waitForRead(e);
  }

 

   boolean advanceMemstore(WriteEntry e) {
    synchronized (writeQueue) {
	//事务结束
      e.markCompleted();

      long nextReadValue = -1;
      boolean ranOnce=false;
      //遍历队列,拿到最近已完成的事务id,如果中间有一个请求还未完成,则可能拿到的事务id比当前事务小
      while (!writeQueue.isEmpty()) {
        ranOnce=true;
        WriteEntry queueFirst = writeQueue.getFirst();

        if (nextReadValue > 0) {
          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
                + nextReadValue + " next: " + queueFirst.getWriteNumber());
          }
        }

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
          writeQueue.removeFirst();
        } else {
          break;
        }
      }

      if (!ranOnce) {
        throw new RuntimeException("never was a first");
      }

	//修改读事务的id,所有小于该id的事务都已完成,对read可见
      if (nextReadValue > 0) {
        synchronized (readWaiters) {
          memstoreRead = nextReadValue;
          readWaiters.notifyAll();
        }
      }
      if (memstoreRead >= e.getWriteNumber()) {
        return true;
      }
      return false;
    }
  }

 

  /**
   * Wait for the global readPoint to advance upto
   * the specified transaction number.
   */
  public void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    synchronized (readWaiters) {
      //如果前面请求还未处理完,则等待它们结束
      while (memstoreRead < e.getWriteNumber()) {
        try {
          readWaiters.wait(0);
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
        }
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
  }

 13.检查memstore的内存大小是否超过memstoreFlushSize,是则请求flush,异步

14.返回结果,如果put操作没拿到行锁,则结果是null

分享到:
评论

相关推荐

    hbase-1.1.5-bin.tar

    2. **环境配置**:解压`hbase-1.1.5-bin.tar`后,你需要将HBase的bin目录添加到系统的PATH环境变量中,以便在命令行中直接运行HBase命令。 3. **配置文件**:HBase的主要配置文件是`conf/hbase-site.xml`。在这里,...

    hbase-1.4.10-bin.tar.gz

    df.write.format("org.apache.spark.sql.hbase") \ .options(table="my_table", columnFamily="cf") \ .option("rowkey", "key_column") \ .save() ``` 至此,你已经在Linux环境中成功搭建了HBase,并准备好了与...

    HBase在风控系统应用和高可用实践.pdf

    Availability Problems-- Phoenix Write Path Availability Problems-- Fail Recovery Availability Problems-- Fail Recovery Dead Lock Availability Problems-- Phoenix Index Write Error Handler Availability ...

    hbase bucket cache

    - `hbase.bucketcache.persistent.path` 设置为 `"file:/disk1/hbase/cache.meta"`,表示 Bucket Cache 元数据的存储路径,用于重启时恢复缓存状态。 #### 工作流程 1. **首次使用流程**: - 当第一次请求一个块...

    sqoop的原理及概念

    5. 写入范围,以便读取 DataDrivenDBInputFormat.write(DataOutput output)。 6. 读取以上 2)写入的范围 DataDrivenDBInputFormat.readFields(DataInput input)。 7. 创建 RecordReader 从数据库中读取数据。 8. ...

    boss直聘spark指标分析程序设计

    result.write.format("parquet").save("path/to/save") ``` 在Boss直聘的Spark指标分析程序中,可能还会涉及到数据源的实时性,如使用Spark Streaming处理实时数据流,或者与外部数据库如Hive、HBase集成。此外,...

    数据库优化最佳实践.pptx

    2. 系统篇:关注底层系统层面的优化,如IO调度算法(如CFQ、DEADLINE、NOOP),RAID策略(write back与write through,以及BBU的使用),网卡多队列技术以提高网络处理能力,以及CPU软中断、中断分布等。这些系统...

    离线数据处理练习表数据

    5. **结果导出**:将处理后的数据写回HDFS或其他存储系统,如`df.write.format("parquet").save("hdfs://path/to/output")`。 在实际项目中,gy_pub.sql和ds_pub.sql可能还会涉及到更复杂的操作,如窗口函数、...

    flume 简介安装使用案例(将log4j数据写到hdfs中)

    2. **Sinks**:数据的去向,通常是数据存储系统,如 HDFS、HBase 或 Kafka。 3. **Channels**:临时存储数据的媒介,确保数据在传输过程中的可靠性。 Flume 的工作流程是线性的,数据从 Source 流入 Channel,然后...

    Flume安装包、安装文档

    a1.sinks.k1.hdfs.writeFormat = Text # Use a channel that buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the ...

    zookeeper相关安装包.rar

    - **HBase**:HBase依赖ZooKeeper进行RegionServer的负载均衡和主 Region 的选举。 在实际使用中,理解并熟练掌握ZooKeeper的这些核心概念和技术,将有助于构建和维护高效稳定的分布式系统。同时,ZooInspector...

    spark部署和基础代码的编写

    Spark是Apache软件基金会下的一个开源大数据处理...df.write.parquet("output/path") ``` 以上就是Spark的基础知识,包括其部署、基本概念和代码编写。通过深入学习和实践,你将能够熟练运用Spark处理各种大数据任务。

    flume-ng-1.6.0-cdh5.16.2.tar.gz

    sink则将接收到的数据发送到目标位置,如HDFS、HBase或Kafka等。 在配置Flume时,用户需要创建一个或多个配置文件,定义agent(代理),每个agent包含源、通道和sink的组合。例如,一个简单的配置可能如下: ```...

    hdfswriter.zip

    DataX是阿里巴巴集团开源的一款跨平台、高性能的数据同步工具,能够实现包括MySQL、Oracle、SQLServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS等数据源之间的数据迁移。...

    apache-flume.tar.gz

    Sink 负责将数据从通道中取出并发送到目的地,如 HDFS、HBase、Elasticsearch 或简单的文本文件。 在配置 Flume 时,你需要定义一个或多个Agent。Agent 是 Flume 的基本工作单元,它包含至少一个源、一个或多个通道...

    大数据技术之Flume笔记

    3. **Sinks**: 数据通过Sinks传递到最终目的地,可以是另一个Flume Agent,也可以是HDFS、Kafka、HBase等。选择哪种Sink取决于你的数据处理需求。 **三、Flume配置** Flume配置采用基于Java的Properties格式,易于...

    大数据采集技术-Flume监控端口实验手册.pdf

    Flume 支持多种数据源,如网络套接字、文件系统、应用程序接口等,能够灵活地将数据传输到各种存储系统,如 HDFS、HBase 或其他日志管理系统。通过构建可配置的、容错的、高可用的数据管道,Flume 提供了一种有效...

    Hadoop实战

    - **HBase**:提供基于 HDFS 的 NoSQL 数据库服务。 - **ZooKeeper**:提供分布式协调服务,解决分布式环境中的一致性问题。 #### 七、Hadoop优化与调优 1. **HDFS优化**: - 增加数据块大小:默认情况下,HDFS ...

    Apache-Spark:使用Apache Spark SQL操纵三个数据集

    sqlResult.write().format("parquet").save("output_path"); ``` 总的来说,Apache Spark SQL通过提供对结构化数据的强大支持,极大地简化了大数据处理中的分析任务。在Java项目中,开发人员可以利用其丰富的API和...

    DataX数据同步手册.docx

    这些系统可能包括传统的关系型数据库如 MySQL、Oracle,以及大数据平台中的 HDFS、Hive、ODPS、HBase 和 FTP 等。通过DataX,可以将原本复杂的多对多的数据同步网络简化为以 DataX 为中心的星型结构,从而大大简化了...

Global site tag (gtag.js) - Google Analytics