`

hbase increment代码

阅读更多

hbase increase    

 

increase代码

1.将数据封装为increment对象

2.从increment对象中封装get

3.封装新kv

4.对HRegion下的Storm做upsert或add操作

5.查看是否需要flush并添加队列

6.返回kvs

 

HRegion代码,如下

 

  /**
   * Perform one or more increment operations on a row.
   * @param increment
   * @return new keyvalues after increment
   * @throws IOException
   */
  public Result increment(Increment increment, long nonceGroup, long nonce)
  throws IOException {
    byte [] row = increment.getRow();
    checkRow(row, "increment");
    TimeRange tr = increment.getTimeRange();
    boolean flush = false;
    Durability durability = getEffectiveDurability(increment.getDurability());
    boolean writeToWAL = durability != Durability.SKIP_WAL;
    WALEdit walEdits = null;
    List<Cell> allKVs = new ArrayList<Cell>(increment.size());
    Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();

    long size = 0;
    long txid = 0;

    checkReadOnly();
    checkResources();
    // Lock row
    startRegionOperation(Operation.INCREMENT);
    this.writeRequestsCount.increment();
    WriteEntry w = null;
    try {
      RowLock rowLock = getRowLock(row);
      try {
        lock(this.updatesLock.readLock());
        // wait for all prior MVCC transactions to finish - while we hold the row lock
        // (so that we are guaranteed to see the latest state)
        mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
        // now start my own transaction
        w = mvcc.beginMemstoreInsert();
        try {
          long now = EnvironmentEdgeManager.currentTimeMillis();
          // Process each family
          for (Map.Entry<byte [], List<Cell>> family:
              increment.getFamilyCellMap().entrySet()) {

            Store store = stores.get(family.getKey());
            List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());

            // Sort the cells so that they match the order that they
            // appear in the Get results. Otherwise, we won't be able to
            // find the existing values if the cells are not specified
            // in order by the client since cells are in an array list.
            Collections.sort(family.getValue(), store.getComparator());
            // Get previous values for all columns in this family
            // 从increment封装get请求
            Get get = new Get(row);
            for (Cell cell: family.getValue()) {
              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
              get.addColumn(family.getKey(), kv.getQualifier());
            }
            get.setTimeRange(tr.getMin(), tr.getMax());
            List<Cell> results = get(get, false);//获得此increase的row

            // Iterate the input columns and update existing values if they were
            // found, otherwise add new column initialized to the increment amount
            int idx = 0;
            for (Cell kv: family.getValue()) {
              long amount = Bytes.toLong(CellUtil.cloneValue(kv));//获得当前value值
              boolean noWriteBack = (amount == 0);

              Cell c = null;
              if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
                c = results.get(idx);
                if(c.getValueLength() == Bytes.SIZEOF_LONG) {
                  amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
                } else {
                  // throw DoNotRetryIOException instead of IllegalArgumentException
                  throw new org.apache.hadoop.hbase.DoNotRetryIOException(
                      "Attempted to increment field that isn't 64 bits wide");
                }
                idx++;
              }

              // Append new incremented KeyValue to list
              byte[] q = CellUtil.cloneQualifier(kv);
              byte[] val = Bytes.toBytes(amount);
              int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
              int incCellTagsLen = kv.getTagsLength();
              KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
                  KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
              System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length);
              System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(),
                  family.getKey().length);
              System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length);
              // copy in the value
              System.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length);
              // copy tags
              if (oldCellTagsLen > 0) {
                System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(),
                    newKV.getTagsOffset(), oldCellTagsLen);
              }
              if (incCellTagsLen > 0) {
                System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(),
                    newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
              }
              newKV.setMvccVersion(w.getWriteNumber());
              // Give coprocessors a chance to update the new cell
              if (coprocessorHost != null) {
                newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
                    RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
              }
              allKVs.add(newKV);

              if (!noWriteBack) {
                kvs.add(newKV);

                // Prepare WAL updates
                if (writeToWAL) {
                  if (walEdits == null) {
                    walEdits = new WALEdit();
                  }
                  walEdits.add(newKV);
                }
              }
            }

            //store the kvs to the temporary memstore before writing HLog
            if (!kvs.isEmpty()) {
              tempMemstore.put(store, kvs);
            }
          }

          // Actually write to WAL now
          if (walEdits != null && !walEdits.isEmpty()) {
            if (writeToWAL) {
              // Using default cluster id, as this can only happen in the orginating
              // cluster. A slave cluster receives the final value (not the delta)
              // as a Put.
              txid = this.log.appendNoSync(this.getRegionInfo(),
                  this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
                  EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
                  true, nonceGroup, nonce);
            } else {
              recordMutationWithoutWal(increment.getFamilyCellMap());
            }
          }
          //Actually write to Memstore now
          if (!tempMemstore.isEmpty()) {//更新hbase kv
            for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
              Store store = entry.getKey();
              if (store.getFamily().getMaxVersions() == 1) {
                // upsert if VERSIONS for this CF == 1
                size += store.upsert(entry.getValue(), getSmallestReadPoint());
              } else {
                // otherwise keep older versions around
                for (Cell cell : entry.getValue()) {
                  KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                  size += store.add(kv);
                }
              }
            }
            size = this.addAndGetGlobalMemstoreSize(size);
            flush = isFlushSize(size);
          }
        } finally {
          this.updatesLock.readLock().unlock();
        }
      } finally {
        rowLock.release();
      }
      if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
        // sync the transaction log outside the rowlock
        syncOrDefer(txid, durability);
      }
    } finally {
      if (w != null) {
        mvcc.completeMemstoreInsert(w);
      }
      closeRegionOperation(Operation.INCREMENT);
      if (this.metricsRegion != null) {
        this.metricsRegion.updateIncrement();
      }
    }

    if (flush) {
      // Request a cache flush.  Do it outside update lock.
      requestFlush();
    }

    return Result.create(allKVs);
  }

 

在一些情况下,如increment压力过大时,会出现下列错误,startNonceOperation方法:

 

 regionserver.ServerNonceManager: Conflict detected by nonce

 

一个mutation里边有多个相同的nonce的操作,如increment,这样会产生此日志,影响相应速度

 

 

 

分享到:
评论

相关推荐

    HBase源代码 hbase-0.98.23

    《深入剖析HBase源代码:hbase-0.98.23》 HBase,作为Apache的一个开源项目,是构建在Hadoop之上的分布式、版本化、列族式的NoSQL数据库,它提供了高可靠性、高性能、可伸缩的数据存储解决方案。本文将基于hbase-...

    经过测试,总结出可运行成功的C#For HBase示例代码

    标题"经过测试,总结出可运行成功的C# For HBase示例代码"表明,这里包含的是一系列已经经过验证的C#代码片段,它们能够成功地与HBase进行交互,执行常见的数据操作。这些示例代码对于初学者和有经验的开发者都极具...

    WordCount,HBase MR样例代码

    “HBase MR样例代码”则指的是使用Hadoop的MapReduce框架来操作HBase,HBase是一个基于Google的Bigtable论文设计的开源NoSQL数据库,运行在Hadoop之上。HBase提供高吞吐量的数据读写能力,适合存储非结构化和半结构...

    VC代码 hbase1.0 (实用代码源).rar

    VC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 ...

    HBase基本操作 Java代码

    HBase基本操作 增删改查 java代码 要使用须导入对应的jar包

    java链接及操作hbase实例代码

    这个名为“Hbasetest”的压缩包文件很可能包含了上述操作的完整示例代码,你可以通过查看和运行代码来加深对Java操作HBase的理解。务必注意,根据你的HBase集群配置,可能需要调整代码中的连接参数,以确保正确连接...

    hbase权威指南.源代码

    《HBase权威指南》是一本深入探讨分布式大数据存储系统HBase的专业书籍,其源代码的提供为读者提供了更直观的学习材料。HBase是基于Apache Hadoop的非关系型数据库(NoSQL),它在大规模数据存储方面表现卓越,尤其...

    中国移动storm练习项目hbase代码

    【中国移动storm练习项目hbase代码】是一个以HBase数据库为核心的应用实践项目,主要涉及实时数据处理框架Apache Storm和大数据存储系统HBase的结合使用。在这个项目中,开发者可能需要掌握如何利用Storm处理实时...

    Hbase权威指南 随书源代码 源码包 绝对完整版

    Hbase权威指南 随书源代码 源码包 绝对完整版 maven工程,带pom文件,可以直接作为一个完整工程导入eclipse等ide。

    spark streamming消费kafka数据存入hbase示例代码

    这个示例代码是用 Scala 编写的,用于演示如何使用 Spark Streaming 消费来自 Kafka 的数据,并将这些数据存储到 HBase 数据库中。Kafka 是一个分布式流处理平台,而 HBase 是一个基于 Hadoop 的非关系型数据库,...

    hbase权威指南源代码下载

    在本文中,我们将围绕HBase的核心概念、架构以及如何通过源代码学习进行深入探讨。 HBase,作为Apache Hadoop生态系统的一部分,是一个基于列族的分布式数据库,特别适合处理海量结构化数据。它的设计目标是在廉价...

    利用Hbase的Coprocessor实现的增量式Apriori算法

    增量式的Apriori算法,有点像分布式的Apriori,因为我们可以把已挖掘的事务集和新增的事务集看作两个互相独立的数据集,挖掘新增的事务集,获取所有新增频繁集,然后与已有的频繁集做并集,对于两边都同时频繁的项集...

    hbase 的java代码 集合 hbase 0.96

    1. **连接HBase**:在Java代码中,我们使用`HBaseConfiguration.create()`来创建一个配置对象,然后可以设置各种配置参数,如Zookeeper的地址(`HBASE_ZOOKEEPER_QUORUM`),端口(`HBASE_ZOOKEEPER_CLIENT_PORT`)...

    java代码将mysql表数据导入HBase表

    本文将详细介绍如何使用Java代码实现这一过程,包括样例MySQL表和数据,以及HBase表的创建。 首先,我们需要了解MySQL和HBase的基本概念。MySQL是一种关系型数据库管理系统,它基于ACID(原子性、一致性、隔离性和...

    使用Java API连接虚拟机HBase并进行数据库操作,Java源代码

    在Java代码中,首先需要导入必要的库,如`org.apache.hadoop.hbase.HBaseConfiguration`和`org.apache.hadoop.hbase.client.Connection`。然后,我们需要创建一个HBase的配置对象,设置Zookeeper的地址,这是HBase...

    《HBase权威指南》示例代码

    这本书的示例代码提供了丰富的实践案例,帮助读者更好地理解和应用HBase的核心概念和技术。 在HBase中,数据被组织成表格形式,由行键(Row Key)、列族(Column Family)、列限定符(Column Qualifier)和时间戳...

    hbase0.94java源代码

    这个源代码包是针对Java开发者的宝贵资源,可以帮助他们深入理解HBase的内部工作原理,优化自己的应用,以及对HBase进行定制化开发。 HBase的核心概念包括: 1. 表(Table):HBase的数据存储在表中,每个表由行键...

    java解决hive快速导数据到Hbase代码

    这个项目可能包括了Hive和HBase的连接代码、数据预处理逻辑、MapReduce作业的配置以及加载HFiles的Java代码。通过阅读和理解这个项目的源码,你可以更好地掌握如何在实际项目中实现Hive到HBase的数据快速导入。 ...

    HbaseTemplate 操作hbase

    在IT行业中,尤其是在大数据处理领域,HBase是一个广泛使用的分布式、高性能、列式存储的NoSQL数据库。...在实际项目中,结合Spring的依赖注入和配置管理,能够有效地提升代码的可维护性和可扩展性。

Global site tag (gtag.js) - Google Analytics