
hbase-mvcc principle

 

 

Part I---MVCC concepts

  In most databases, ACID semantics are guaranteed, and the same is true of HBase. But those databases guarantee them over the whole database object, while HBase guarantees them per row (hence HBase is said to provide row-based transactions). In HBase, the ACID semantics are addressed as follows:

  Atomicity: all cell operations belonging to a transaction either finish completely or fail completely (see the Put sketch after this list).

  Consistency: also known as integrity; the state of HBase is transformed from one valid state to another (e.g., in a bank transfer, the total amount stays constant across the withdrawal and the deposit; a row will not disappear during an update; etc.).

  Isolation: every transaction has its own data space, i.e., it does not interfere with others while processing. HBase's scan operations provide two isolation levels: READ_COMMITTED and READ_UNCOMMITTED.

  The three semantics above are, in general, achieved by the edit log.

  Durability: the data resulting from a successful transaction must be persistent, i.e., it will be read back by later readers before any further transactions (usually implemented with a lock).
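  To make the row-based atomicity concrete, here is a minimal sketch using the classic (0.94-era) client API that matches the code quoted later in this article; the table name "account" and the column names are made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class RowAtomicityExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "account"); // hypothetical table
    Put p = new Put(Bytes.toBytes("row-1"));
    // both cells belong to the same row, so HBase commits them atomically:
    // a concurrent reader sees either both new values or neither
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("balance"), Bytes.toBytes("100"));
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("updated"), Bytes.toBytes("true"));
    table.put(p);
    table.close();
  }
}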

  MVCC (multiversion concurrency/consistency control) is a method used for concurrency control, so you can read the name as:

  multi-version is the means to achieve the goal, concurrency control.

  There is a class named MultiVersionConsistencyControl.java in HBase that achieves this, along with a related class, IsolationLevel.java. In fact, MultiVersionConsistencyControl.java is used to:

  1. implement the isolation semantics (reads during writes) within one component or across several (e.g., the WAL and the memstore);

  2. avoid read waits during writes by leaving reads unblocked (they would be blocked if a plain lock were used).

 

  --Without isolation, reads would conflict with writes, as illustrated below:

  [figure omitted: reads blocking on concurrent writes when isolation is absent]

Part II---HBase's MVCC implementation

 

  Here is the structure of MultiVersionConsistencyControl.java:

class MultiVersionConsistencyControl {
  volatile long memstoreRead;  // the current max readable write number (seq no)
  volatile long memstoreWrite; // the current max assigned write number (seq no)
  Object readWaiters;          // monitor used to wait until an entry becomes readable
  LinkedList<WriteEntry> writeQueue; // FIFO list used to coordinate read-write consistency
  .......
  // advance the write number and remember it in a WriteEntry
  public WriteEntry beginMemstoreInsert() {...}
  // the transaction is complete: wait for this entry to become readable;
  // it invokes both advanceMemstore() and waitForRead().
  void completeMemstoreInsert(WriteEntry entry) {...}
  // remove the leading completed entries and update memstoreRead
  boolean advanceMemstore(WriteEntry e) {...}
  // block until the given entry has become readable
  void waitForRead(WriteEntry e) {...}

  ......

}
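 A minimal usage sketch of this class (the surrounding code is illustrative; only the mvcc calls themselves are the real API): a writer allocates a write number, applies its edits, then completes the entry; a reader only ever observes fully completed write numbers.

MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();

// writer side (e.g., inside a Put):
MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
// ... tag the new KeyValues with w.getWriteNumber() and add them to the memstore ...
mvcc.completeMemstoreInsert(w); // waits until every earlier write completes, then advances the read point

// reader side (e.g., when opening a scanner):
long readPoint = mvcc.memstoreReadPoint(); // only edits with write number <= readPoint are visible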

 

 Actually, there is only one mvcc instance per region across the whole HBase codebase; it lives in HRegion.mvcc.

 ------Here is the Puts/Deletes processing model (a combined locking sketch follows this list):

1. acquire a read lock on this.lock

  // then append the mutations to the WAL:

  a. acquire a read lock on updateLock

  b. beginMemstoreInsert()

  c. add the Puts to the memstore

  d. append to the WAL without sync

  e. release the updateLock read lock acquired in step a

  f. sync the WAL (synchronized via flushLock)

  g. completeMemstoreInsert()

 

2. compute the new global memstore size (but there is a bug in addAndGetGlobalMemstoreSize())

3. release the read lock acquired in step 1.

4. request a memstore flush if the new memstore size exceeds the configured flush size:

 a. add a FlushRegionEntry to the flushQueue, so the MemStoreFlusher will pick it up periodically

 // below is the actual memstore flush:

  1) enter flushcache()

  2) acquire a read lock on this.lock

  3) acquire a write lock on this.updateLock

  4) beginMemstoreInsert() then advanceMemstore()  // mark this write number

  5) generate a new log seq number via the mvcc instance

  6) take snapshots of all stores

  7) release the write lock

  8) mvcc.waitForRead()  // wait for all write entries less than the current one to finish, so every edit (Put) is synced to the WAL before we flush the memstore; this keeps the data in the HFile consistent with the WAL



 b. flush the memstore snapshot to an HFile

 c. append a FLUSHCACHE-COMPLETE marker to the WAL

 d. finally, release the read lock
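Putting the write path and the flush path together, here is a hedged pseudocode sketch of the locking order described above (lock, updateLock, and mvcc stand for the HRegion fields; applyToMemstore(), appendToWalNoSync(), syncWal(), takeStoreSnapshots(), and flushSnapshotsToHFiles() are hypothetical helpers standing in for the real logic):

// write path (Puts/Deletes), simplified
lock.readLock().lock();                      // step 1
try {
  updateLock.readLock().lock();              // step a: many writers may hold this together
  MultiVersionConsistencyControl.WriteEntry w;
  try {
    w = mvcc.beginMemstoreInsert();          // step b
    applyToMemstore(puts, w);                // step c
    appendToWalNoSync(puts);                 // step d
  } finally {
    updateLock.readLock().unlock();          // step e
  }
  syncWal();                                 // step f
  mvcc.completeMemstoreInsert(w);            // step g
} finally {
  lock.readLock().unlock();                  // step 3
}

// flush path, simplified: the write lock on updateLock excludes all writers
lock.readLock().lock();                      // step 2)
try {
  MultiVersionConsistencyControl.WriteEntry w;
  updateLock.writeLock().lock();             // step 3): blocks step a above
  try {
    w = mvcc.beginMemstoreInsert();          // step 4)
    mvcc.advanceMemstore(w);
    takeStoreSnapshots();                    // step 6)
  } finally {
    updateLock.writeLock().unlock();         // step 7)
  }
  mvcc.waitForRead(w);                       // step 8): earlier writes are now in the WAL
  flushSnapshotsToHFiles();                  // step b
} finally {
  lock.readLock().unlock();                  // step d
}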

 

ReadWriteLock model:

precondition       later reads    later writes
read lock held     o (allowed)    x (blocked)
write lock held    x (blocked)    x (blocked)

So with this ReadWriteLock we avoid the delay a plain exclusive Lock would impose: concurrent reads proceed together in read-read mode.
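That table can be demonstrated with a short, self-contained java.util.concurrent example (plain JDK code, not HBase code): the two readers overlap, while the writer waits for both and excludes everyone else.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
  static final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

  public static void main(String[] args) throws InterruptedException {
    Runnable reader = new Runnable() {
      public void run() {
        rwl.readLock().lock(); // read-read mode: both readers hold this at once
        try {
          System.out.println(Thread.currentThread().getName() + " reading");
          Thread.sleep(100);
        } catch (InterruptedException ignored) {
        } finally {
          rwl.readLock().unlock();
        }
      }
    };
    Runnable writer = new Runnable() {
      public void run() {
        rwl.writeLock().lock(); // exclusive: blocks later readers and writers
        try {
          System.out.println(Thread.currentThread().getName() + " writing");
        } finally {
          rwl.writeLock().unlock();
        }
      }
    };
    Thread r1 = new Thread(reader, "reader-1");
    Thread r2 = new Thread(reader, "reader-2"); // runs concurrently with reader-1
    Thread w = new Thread(writer, "writer-1");  // waits until both readers release
    r1.start();
    r2.start();
    w.start();
    r1.join();
    r2.join();
    w.join();
  }
}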

 

Through this model with mvcc, we can see:

1. an abstract locking model: lock.readLock { updateLock.readLock/writeLock { memstore-related logic... } }

2. adding KVs to the memstore (step 1.c) is blocked while flushing (taking snapshots); but concurrent threads adding KVs to the memstore do not exclude each other.

 

 

How is MultiVersion implemented?

1. use mvcc as the version keeper

2. data versions in the memstore (sketched below). todo
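Conceptually (a sketch of the "todo" above, with illustrative names, not the real KeyValue API): each KV added to the memstore carries the write number of its transaction, and a scanner opened at some read point skips every KV tagged above that point.

// illustrative model of read-point filtering in the memstore
class TaggedKV {
  final byte[] key;
  final byte[] value;
  final long writeNumber; // mvcc write number assigned at beginMemstoreInsert()

  TaggedKV(byte[] key, byte[] value, long writeNumber) {
    this.key = key;
    this.value = value;
    this.writeNumber = writeNumber;
  }

  // a scanner opened at readPoint must not see in-flight or later writes
  boolean visibleAt(long readPoint) {
    return writeNumber <= readPoint;
  }
}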

 

 

Part III---Practice in HBase

  I have changed some code to demonstrate how mvcc operates, as shown below:

---MultiVersionConsistencyControl.java

boolean advanceMemstore(WriteEntry e) { // returns true if the specified entry is readable
    synchronized (writeQueue) {
      e.markCompleted(); // mark this entry completed first; the loop below may then remove it

      long nextReadValue = -1;
      boolean ranOnce = false; // the same write entry may be visited by this loop many times:
      while (!writeQueue.isEmpty()) { // iterate and remove leading entries until an uncompleted one is found
        ranOnce = true;
        WriteEntry queueFirst = writeQueue.getFirst();

        if (nextReadValue > 0) {
          if (nextReadValue+1 != queueFirst.getWriteNumber()) {	 
            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
                + nextReadValue + " next: " + queueFirst.getWriteNumber());
          }
        }
        System.out.println(Thread.currentThread().getId() + "-" + System.nanoTime() + "-last w id " + writeQueue.getLast().writeNumber +"first " + queueFirst.writeNumber + ",size " + writeQueue.size()); //TODO
        if (queueFirst.isCompleted()) {  
          nextReadValue = queueFirst.getWriteNumber(); // let readers see up to this version
          System.out.println(Thread.currentThread().getId() + "-" + System.nanoTime() + "-*complete w id " + queueFirst.getWriteNumber() + ",passed id " + e.getWriteNumber()); //TODO
          writeQueue.removeFirst();	//remove first element now
        } else {  
        	System.out.println(Thread.currentThread().getId() + "-" + System.nanoTime() + "-queue size " + writeQueue.size() + ",passed id " + e.getWriteNumber() + 
        			",get first id " + queueFirst.getWriteNumber() ); //TODO test 
          break;
        }
      }//while

      if (!ranOnce) { // the writeQueue was already empty
        throw new RuntimeException("never was a first");
      }

      if (nextReadValue > 0) { 
        synchronized (readWaiters) {
          memstoreRead = nextReadValue; // advance the readable version: at least one leading write entry completed
          readWaiters.notifyAll(); // wake threads blocked in waitForRead()
        }
      }
      if (memstoreRead >= e.getWriteNumber()) { 
        return true;
      }
      return false;
    }//sync writeQueue
  }
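 For completeness, the waitForRead() referenced by readWaiters.notifyAll() above is the waiting side of that handshake; here is a hedged reconstruction from the class contract, not the verbatim source:

void waitForRead(WriteEntry e) { // block until the given entry becomes readable
    boolean interrupted = false;
    synchronized (readWaiters) {
      while (memstoreRead < e.getWriteNumber()) {
        try {
          readWaiters.wait(0); // woken by readWaiters.notifyAll() in advanceMemstore()
        } catch (InterruptedException ie) {
          interrupted = true; // keep waiting; restore the flag afterwards
        }
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
  }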

 

 ---TestMultiVersionConsistencyControl.java

static class Writer implements Runnable {
    final AtomicBoolean finished;
    final MultiVersionConsistencyControl mvcc;
    final AtomicBoolean status;

    Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) {
      this.finished = finished;
      this.mvcc = mvcc;
      this.status = status;
    }

    private Random rnd = new Random();
    public boolean failed = false;

    public void run() {
      while (!finished.get()) { 
        MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert();
        // System.out.println("Begin write: " + e.getWriteNumber());
        // 10 usec - 500usec (including 0)
        int sleepTime = rnd.nextInt(500);
        // 500 * 1000 = 500,000ns = 500 usec
        // 1 * 100 = 100ns = 1usec
        try {
          if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
        } catch (InterruptedException e1) {
        }
        try {
          mvcc.completeMemstoreInsert(e); 
          System.out.println(Thread.currentThread().getId() + "-" + System.nanoTime() + "-done insert");
        } catch (RuntimeException ex) {
          // got failure
          System.out.println(ex.toString());
          ex.printStackTrace();
          status.set(false);
          return;
          // Report failure if possible.
        }
      }
    }
  }

  public void testParallelism() throws Exception {
    final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();

    final AtomicBoolean finished = new AtomicBoolean(false);

    // fail flag for the reader thread
    final AtomicBoolean readerFailed = new AtomicBoolean(false);
    final AtomicLong failedAt = new AtomicLong();
    
    Runnable reader = new Runnable() {
      public void run() { // loop to check that the read point advances monotonically
        long prev = mvcc.memstoreReadPoint();
        while (!finished.get()) {
          long newPrev = mvcc.memstoreReadPoint(); // sample the read point as fast as possible, hence no wait/sleep
          System.out.println(Thread.currentThread().getId() + "-" + System.nanoTime() + "-cur read point " + newPrev);
          if (newPrev < prev) {
            // serious problem.
            System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
            readerFailed.set(true);
            // might as well give up
            failedAt.set(newPrev);
            return;
          }
          prev = newPrev; // remember the latest point so the monotonicity check is meaningful
        }
      }
      }
    };

    // writer thread parallelism.
    int n = 20;
    n = 3; // reduced to 3 writer threads for this demonstration
    Thread[] writers = new Thread[n];
    AtomicBoolean[] statuses = new AtomicBoolean[n];
    Thread readThread = new Thread(reader);

    for (int i = 0; i < n; ++i) {
      statuses[i] = new AtomicBoolean(true);
      writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
      writers[i].start();
    }
    readThread.start();

    try {
      int millis = 10 * 1000;
      millis = 20; // shortened from 10s for this demonstration
      Thread.sleep(millis);
    } catch (InterruptedException ex) {
    }

    finished.set(true); // flag all threads to finish

    readThread.join(); // wait for this thread to complete; same for the writers below
    for (int i = 0; i < n; ++i) {
      writers[i].join();
    }

    // check failure.
    assertFalse(readerFailed.get());
    for (int i = 0; i < n; ++i) {
      assertTrue(statuses[i].get()); // no writer flipped its status to false
    }

  }

 

 And some of the log output will look like this:

 

9-1408679151040267000-cur read point 0

9-1408679151040587000-cur read point 0

9-1408679151040637000-cur read point 0

9-1408679151040683000-cur read point 0

10-1408679151040686000-last w id 3first 1,size 3

10-1408679151040821000-*complete w id 1,passed id 1

10-1408679151040889000-last w id 3first 2,size 2

10-1408679151040937000-queue size 2,passed id 1,get first id 2

9-1408679151040736000-cur read point 0

11-1408679151041070000-last w id 3first 2,size 2

11-1408679151041201000-*complete w id 2,passed id 2

11-1408679151041256000-last w id 3first 3,size 1

11-1408679151041303000-queue size 1,passed id 2,get first id 3

11-1408679151041394000-done insert

12-1408679151041447000-last w id 3first 3,size 1

12-1408679151041512000-*complete w id 3,passed id 3

12-1408679151041603000-done insert

10-1408679151041018000-done insert

9-1408679151041142000-cur read point 1

9-1408679151041753000-cur read point 3

….

9-1408679151045943000-cur read point 3

12-1408679151045954000-last w id 6first 4,size 3

12-1408679151046069000-*complete w id 4,passed id 4

12-1408679151046141000-last w id 6first 5,size 2

12-1408679151046280000-queue size 2,passed id 4,get first id 5

12-1408679151046352000-done insert

9-1408679151045983000-cur read point 3

9-1408679151046515000-cur read point 4

9-1408679151046570000-cur read point 4

….

9-1408679151048488000-cur read point 4

9-1408679151048543000-cur read point 4

10-1408679151046449000-last w id 7first 5,size 3

10-1408679151048692000-queue size 3,passed id 6,get first id 5

9-1408679151048598000-cur read point 4

9-1408679151048818000-cur read point 4

9-1408679151048856000-cur read point 4

….

9-1408679151050356000-cur read point 4

9-1408679151050648000-cur read point 4

9-1408679151050800000-cur read point 4

11-1408679151049531000-last w id 7first 5,size 3

11-1408679151050973000-*complete w id 5,passed id 5

11-1408679151051028000-last w id 7first 6,size 2

11-1408679151051090000-*complete w id 6,passed id 5

11-1408679151051142000-last w id 7first 7,size 1

11-1408679151051197000-queue size 1,passed id 5,get first id 7

9-1408679151050924000-cur read point 4

12-1408679151051306000-last w id 7first 7,size 1

10-1408679151051286000-done insert

11-1408679151051263000-done insert

12-1408679151051393000-*complete w id 7,passed id 7

12-1408679151051524000-done insert

9-1408679151051346000-cur read point 6

9-1408679151051638000-cur read point 7

9-1408679151051710000-cur read point 7

9-1408679151057924000-cur read point 7

9-1408679151057971000-cur read point 7

11-1408679151052653000-last w id 10first 8,size 3

9-1408679151058023000-cur read point 7

9-1408679151058150000-cur read point 7

9-1408679151058190000-cur read point 7

9-1408679151058234000-cur read point 7

9-1408679151058272000-cur read point 7

9-1408679151058333000-cur read point 7

9-1408679151058387000-cur read point 7

9-1408679151058442000-cur read point 7

9-1408679151058500000-cur read point 7

11-1408679151058085000-queue size 3,passed id 9,get first id 8

9-1408679151058560000-cur read point 7

9-1408679151058760000-cur read point 7

9-1408679151058817000-cur read point 7

9-1408679151058867000-cur read point 7

9-1408679151058916000-cur read point 7

……

9-1408679151060011000-cur read point 7

10-1408679151058692000-last w id 10first 8,size 3

10-1408679151060162000-queue size 3,passed id 10,get first id 8

9-1408679151060094000-cur read point 7

9-1408679151060317000-cur read point 7

9-1408679151060373000-cur read point 7

9-1408679151060428000-cur read point 7

9-1408679151060482000-cur read point 7

9-1408679151060536000-cur read point 7

9-1408679151060590000-cur read point 7

12-1408679151060256000-last w id 10first 8,size 3

12-1408679151060769000-*complete w id 8,passed id 8

12-1408679151060837000-last w id 10first 9,size 2

12-1408679151060896000-*complete w id 9,passed id 8

12-1408679151060953000-last w id 10first 10,size 1

12-1408679151061008000-*complete w id 10,passed id 8

12-1408679151061069000-done insert

10-1408679151061164000-done insert

9-1408679151060653000-cur read point 7

11-1408679151061233000-done insert

 

 

 So from these logs we can draw some conclusions:

1. the read point always increases monotonically -- readers see the most recently committed data;

2. the max memstoreRead becomes readable only once its write entry is done, which means reads are READ_COMMITTED -- isolation;

3. the writeQueue in mvcc is sized by its callers, the write threads;

4. across reads and writes, operations are not guaranteed to run sequentially -- they are concurrent.

 

ref:

hbase CRUD--Put operation (server side)

hbase CRUD--Read (query) operations (server side)

 

Apache HBase Internals: Locking and Multiversion Concurrency Control

http://hbase.apache.org/acid-semantics.html

 

The ACID properties that database transactions must have

http://en.wikipedia.org/wiki/Multiversion_concurrency_control

http://dev.mysql.com/doc/refman/5.1/en/innodb-multi-versioning.html

http://en.wikipedia.org/wiki/Concurrency_control
