`

HBase 写入数据Region路由机制

 
阅读更多
HBase put一条数据 Region 路由规则
1.客户端put接口
org.apache.hadoop.hbase.client.HTableInterface.put(Put put)
   org.apache.hadoop.hbase.client.HTable.put
   public void put(final Put put) throws IOException {
    //缓存数据
 doPut(put);
    if (autoFlush) {
   //提交数据刷写到磁盘请求
      flushCommits();
    }
  }
 
2.提交写请求
org.apache.hadoop.hbase.client.HTable.flushCommits
 public void flushCommits() throws IOException {
    try {
      Object[] results = new Object[writeBuffer.size()];
      try {
  //提交数据刷下请求
        this.connection.processBatch(writeBuffer, tableName, pool, results);
      } catch (InterruptedException e) {
        throw new IOException(e);
      } finally {
      ...
    } finally {
      if (clearBufferOnFail) {
        writeBuffer.clear();//清理客户端缓存数据
        currentWriteBufferSize = 0;
      } else {
        // 计算客户端缓存数据大小
        currentWriteBufferSize = 0;
        for (Put aPut : writeBuffer) {
          currentWriteBufferSize += aPut.heapSize();
        }
      }
    }
  }
 
3.处理批量写
org.apache.hadoop.hbase.client.HConnection.processBatch()
                              .HConnectionImplementation.processBatch()
    public void processBatch(List<? extends Row> list,
        final byte[] tableName,
        ExecutorService pool,
        Object[] results) throws IOException, InterruptedException {
      // This belongs in HTable!!! Not in here. St.Ack
      // results must be the same size as list
      if (results.length != list.size()) {
        throw new IllegalArgumentException("argument results must be the same size as argument list");
      }
      //处理批量写
      processBatchCallback(list, tableName, pool, results, null);
    }
 //这个方法非常重要,在这里定义了,哪些rowkey对应的数据应该存放到那个Region上,然后将相应的数据提交到Region对应的RegionServer上
    public <R> void processBatchCallback(
        List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException {
        ...
        // step 1: 分解Regionserver块并构建对应的数据结构
  //HRegionLocation这个对象至关重要,他定义了Region相关的信息HRegionInfo,RegionServer Name、port
        Map<HRegionLocation, MultiAction<R>> actionsByServer =
          new HashMap<HRegionLocation, MultiAction<R>>();
        for (int i = 0; i < workingList.size(); i++) {
          Row row = workingList.get(i);
          if (row != null) {
   //下面这句是整个put数据路由的核心,将提交的数据根据row分类到不同的Region上
            HRegionLocation loc = locateRegion(tableName, row.getRow());
            byte[] regionName = loc.getRegionInfo().getRegionName();
            MultiAction<R> actions = actionsByServer.get(loc);
            if (actions == null) {
              actions = new MultiAction<R>();
              actionsByServer.put(loc, actions);
            }
            Action<R> action = new Action<R>(row, i);
            lastServers[i] = loc;
            actions.add(regionName, action);
          }
        }
        // step 2: 提交请求到相应的RegionServer上去处理
        Map<HRegionLocation, Future<MultiResponse>> futures =
            new HashMap<HRegionLocation, Future<MultiResponse>>(
                actionsByServer.size());
        for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
        }
        // step 3:收集写入成功是失败的返回结果
        ...
        // step 4: 对写入失败的数据进行重试.
        ...
    }
 
 
 
 public HRegionLocation relocateRegion(final byte [] tableName,
        final byte [] row)
    throws IOException{
   ...
      return locateRegion(tableName, row, false, true);
    }
 
4.处理批量写
org.apache.hadoop.hbase.client.HConnectionImplementation.locateRegion()
    private HRegionLocation locateRegion(final byte [] tableName,
      final byte [] row, boolean useCache, boolean retry)
    throws IOException {
   ...
   //确保ZK是正常的
      ensureZookeeperTrackers();
      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
        ...
      } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
        ...
      } else {//用户表的数据插入是调用下面的这个操作
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
            useCache, userRegionLock, retry);
      }
    }
    private HRegionLocation locateRegionInMeta(final byte [] parentTable,
      final byte [] tableName, final byte [] row, boolean useCache,
      Object regionLockObject, boolean retry)
    throws IOException {
      HRegionLocation location;
      //如果客户端保存的缓存,从缓存中直接查询
      if (useCache) {
        location = getCachedLocation(tableName, row);
        if (location != null) {
          return location;
        }
      }
   //以下是如果客户端没有ZK缓存,从ZOOKEEPER -> -ROOT- -> .META.将这些数据缓存到客户端,然后再去从.META.表中数去判定row应该路由到那个Region上
     ...
    }
 
  HRegionLocation getCachedLocation(final byte [] tableName,
        final byte [] row) {
      SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
        getTableLocations(tableName);
  ....
      //判断row对应的rowkey应该在哪个Region上,应该将请求发给哪个RegionServer
      byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
      if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
          KeyValue.getRowComparator(tableName).compareRows(
              endKey, 0, endKey.length, row, 0, row.length) > 0) {
        return possibleRegion;
      }
      return null;
    }
 
5.下面的事情就是RegionServer去将数据写道Memstore,StoreFile,HFile了
分享到:
评论

相关推荐

    kettle集群搭建以及使用kettle将mysql数据转换为Hbase数据

    通过本文的介绍,我们了解了Kettle集群的基本概念、搭建步骤以及如何使用Kettle将MySQL数据转换为HBase数据的过程。Kettle作为一款强大的数据集成工具,在企业级数据处理中扮演着重要的角色,尤其是在大数据时代背景...

    spark读取hbase数据,并使用spark sql保存到mysql

    使用spark读取hbase中的数据,并插入到mysql中

    hbase数据可视化系统

    使用HBase的Compaction和Split机制,保持Region的平衡;并考虑使用二级索引提高查询效率。 六、总结 通过SpringBoot搭建的HBase可视化系统,使得非技术人员也能便捷地管理和操作HBase,降低了使用门槛,提高了工作...

    java操作Hbase之从Hbase中读取数据写入hdfs中源码

    在Java编程环境中,操作HBase并将其数据写入HDFS(Hadoop Distributed File System)是一项常见的任务,特别是在大数据处理和分析的场景下。本篇将详细介绍如何使用Java API实现这一功能,以及涉及到的关键技术和...

    hbase备份和数据恢复

    - HLog备份:HBase的日志文件(HLog)记录了所有写入操作,定期备份HLog可以防止数据丢失。 - Region Server的HDFS数据备份:通过复制Region Server上的HDFS数据到其他存储位置,实现数据备份。 3. 数据恢复: -...

    hbase海量数据的全量导入方法

    2. **使用HFileOutputFormat**:HBase提供了HFileOutputFormat类,可以将数据直接写入HFile格式,跳过HBase的内部流程,从而提高数据导入效率。这种方法适用于离线大批量数据导入场景。 3. **并行导入**:利用...

    Hbase同步数据到Solr的方案

    HBase 和 Solr 都是大数据处理中的关键组件。HBase 是一个分布式的、面向列的NoSQL数据库,适合存储大规模结构化数据。而Solr 是一个流行的全文搜索引擎,提供高效的全文检索、命中高亮、拼写检查等特性。将HBase的...

    HDFS读文件并写入Hbase

    从HDFS中读文件,用groupby进行sort,然后写入Hbase中

    hbase regions数据切割.docx

    在HBase这个分布式列式数据库中,Region是其核心的数据存储和管理单元,它负责存储表中的行数据。随着数据量的增长,一个Region可能会变得过大,导致读写性能下降。这时,就需要对Region进行数据切割(Split),以...

    基于数据冗余的HBase合并机制研究_熊安萍

    基于数据冗余的HBase合并机制研究_HBase列式数据库的所有操作均以追加数据的方式写入,导致其合并机制占用资源过多,影响系统读性能。

    Kafka集成Spark Streaming并写入数据到HBase

    **Kafka、Spark Streaming与HBase的集成**...6. **数据写入HBase**:将处理后的数据转换为Put操作,然后使用`hbaseContext.bulkPut`将数据批量写入HBase表。 **参考链接** 对于更详细的实现步骤,可以参考以下链接: ...

    HBase海量数据存储实战视频教程

    从HBase的集群搭建、HBaseshell操作、java编程、架构、原理、涉及的数据结构,并且结合陌陌海量消息存储案例来讲解实战HBase 课程亮点 1,知识体系完备,从小白到大神各阶段读者均能学有所获。 2,生动形象,化繁为...

    Hbase的region合并与拆分

    1、region 拆分机制 region中存储的是大量的rowkey数据 ,当region中的数据条数过多的时候,直接影响查询效率.当region过大的时候.hbase会拆分region , 这也是Hbase的一个优点 . HBase的region split策略一共有以下几...

    hbase和hadoop数据块损坏处理

    HBase 和 Hadoop 数据块损坏处理 HBase 和 Hadoop 数据块损坏是非常常见的问题,可能会导致数据丢失、集群崩溃等严重后果。因此,了解如何处理 HBase 和 Hadoop 数据块损坏是非常重要的。本文将介绍 HBase 和 ...

    hbase读取数据过程

    HBASE的一个读取数据流程的解析,清晰的画出整个过程,十分有利于理解

    HBASERegion数量增多问题描述及解决方案.docx

    【HBASERegion数量增多问题描述及解决方案】 在HBase分布式数据库中,Region是表数据的基本存储单元,它将表的数据按照ROWKEY的范围进行分割。随着数据的增长,一个Region会分裂成两个,以此来确保数据的均衡分布。...

    浅谈HBASE数据结构设计.pdf

    - Region:HBase将表水平切分为多个Region,每个Region负责一部分行数据的存储,这些Region分布在不同的RegionServer上。随着表数据的增长,Region可以被拆分和移动。 - RegionServer:RegionServer负责管理多个...

    行业分类-设备装置-一种应用于HBASE数据库的数据写入方法及系统.zip

    标题中的“行业分类-设备装置-一种应用于HBASE数据库的数据写入方法及系统”...通过阅读这份文档,读者可以深入理解HBase的数据写入机制,并学习如何在实际项目中有效地运用这些知识,以提升大数据处理的效率和可靠性。

    python 连接hbase 打印数据

    python 连接hbase 打印数据。hbase 的一些源数据未转化

Global site tag (gtag.js) - Google Analytics