看了http://blog.csdn.net/TREND_CDC_SPN/archive/2010/05/04/5557626.aspx 这篇文章后才较仔细的看了下HBase的RowLock这一块,非常感谢趋势科技中国研发中心的热心者.
HBase(0.89.20100726)中的RowLock使用及实现
HBase只实现了基于row-key的锁机制。
1.客户端代码
RowLock rl = table.lockRow ("test".getBytes());
Put p = new Put(rowkey, rl );
....处理
table.unlockRow (rl);
//这里应该放在try{..}finally{..}中
2.服务器端是由HRegionServer来实现加锁的过程
a. public long lockRow(byte [] regionName, byte [] row) throws IOException 为实现的方法
核心代码为:
HRegion region = getRegion(regionName);
Integer r = region.obtainRowLock(row);
long lockId = addRowLock(r,region);
return lockId;
b. 调用的HRegion --> public Integer obtainRowLock(final byte [] row) throws IOException
--> private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
throws IOException 方法
核心代码为
private final Set<byte[]> lockedRows =
new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
....
synchronized (lockedRows) {
while (lockedRows.contains(row)) {//如果要锁定的行已经处于锁定的状态
if (!waitForLock) { //如果不等待,那么直接返回
return null;
}
try {
lockedRows.wait(); //线程等待
} catch (InterruptedException ie) {
// Empty
}
}
// generate a new lockid. Attempt to insert the new [lockid, row].
// if this lockid already exists in the map then revert and retry
// We could have first done a lockIds.get, and if it does not exist only
// then do a lockIds.put, but the hope is that the lockIds.put will
// mostly return null the first time itself because there won't be
// too many lockId collisions.
byte [] prev = null;
Integer lockId = null;
do {
lockId = new Integer(lockIdGenerator++);
prev = lockIds.put(lockId, row);
if (prev != null) { //如果lockId之前已经存在,那么还原成之前的状态。
lockIds.put(lockId, prev); // revert old value
lockIdGenerator = rand.nextInt(); // generate new start point
}
} while (prev != null); //直到产生一个随机的值,这个值之前没有对应任何的row
lockedRows.add(row);
lockedRows.notifyAll(); //这里为会要notifyAll(),原因不明
return lockId;
}
c. HRegionServer中的protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException方法
{
long lockId = -1L;
lockId = rand.nextLong(); //这里产生一个随机的值
String lockName = String.valueOf(lockId);
rowlocks.put(lockName, r);//直接把这个值放到rowlocks(ConcurrentHashMap<String, Integer>)中应该会有冲突的存在。
//如果有冲突的话,那么会不会导致操作B的releaseLock()也把之前操作A的锁也释放掉了?
this.leases.
createLease(lockName, new RowLockListener(lockName, region));//这里是通过leases来注册一个定时的操作
return lockId;
}
实际上这里返回的是由HRegionServer产生的lockId,这里一直不是很清楚为什么HRegionServer来生成一个随机数(而且还可能有冲突,要解决冲突还要使用锁来解决竞争的问题),如果只是因为只通过HRegion产生的lockId不能保证在HRegionServer范围内的唯一性,再者传递给leases要有一个名称来对应Leases对象,那么使用Region与Region产生的lockId来作为唯一键不是更好?
d. org.apache.hadoop.hbase.Leases.createLease(String leaseName, LeaseListener listener) throws LeaseStillHeldException
Lease lease = new Lease(leaseName, listener,
System.currentTimeMillis() + leasePeriod);//Lease实现了Delayed接口,leasePeriod就指明这个定时的时间是在多少ms后
//可以通过hbase.regionserver.lease.period配置,默认为60ms
synchronized (leaseQueue) {
if (leases.containsKey(leaseName)) {
throw new LeaseStillHeldException(leaseName);//这里如果名称冲突会抛出异常
}
leases.put(leaseName, lease);
leaseQueue.add(lease);
}
//Leases extends Thread 每间隔leaseCheckFrequency (由hbase.server.thread.wakefrequency定义)扫描一次,调用的细节如下:
if (lease.getListener() == null) {
LOG.error("lease listener is null for lease " + lease.getLeaseName());
} else {
lease.getListener().leaseExpired();//调用listener的方法,在本例中指RowLockListener
}
synchronized (leaseQueue) {
leases.remove(lease.getLeaseName());
}
e. org.apache.hadoop.hbase.regionserver.HRegionServer.RowLockListener 的具体实现为
public void leaseExpired() {
Integer r = rowlocks.remove(this.lockName);//从映射中取那个lockName对应的region中的int lock值,
if(r != null) {
//这个值还有不为空的时候?也许unlockRow(byte [] regionName, long lockId)方法调用时
//rowlocks.remove(lockName);方法调用后还没来的及调用leases.cancelLease(lockName);时,leases开始了扫描
region.releaseRowLock(r);//如果这个值不为空,释放锁.
}
}
以上只是获得锁的部分代码.
3.获得锁后的操作中,比如put,get操作。
为了确保锁的持有者对row的锁定,HRegionServer的
public Result get(byte [] regionName, Get get) throws IOException
public void put(final byte [] regionName, final Put put) 方法都调用了
Integer org.apache.hadoop.hbase.regionserver.HRegionServer.getLockFromId(long lockId) 方法
{
String lockName = String.valueOf(lockId);
Integer rl = rowlocks.get(lockName);
if (rl == null) {
throw new IOException("Invalid row lock");
}
this.leases.renewLease(lockName);//这里把定时重新刷新一下
}
只是HRegion对get()操作忽略了lock锁,HRegion的put操作(包括批量)都调用了
Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException方法
{
Integer lid = null;
if (lockid == null) {
lid = internalObtainRowLock(row, waitForLock);//如果这个操作之前没有先去锁定row,那么这次操作中要先去获得一个锁.
//这个锁会在操作完成后在finally中清除掉
} else {
if (!isRowLocked(lockid)) {//如果之前获得的锁已经过期了,或者这个锁由客户端自己随意写的,那么这里要抛出异常.
throw new IOException("Invalid row lock");
}
lid = lockid;//这里返回用户的锁id,这个锁在本次操作中不会被清除,清除操作只发生在客户端释放锁,
//或者由HRegionServer中注册的RowLockListener因为超时来释放锁.
}
return lid;
}
分享到:
相关推荐
HBase 2.x之RIT问题解决 HBase 2.x中的Region-In-Transition(RIT)机制是一种Region状态变迁机制,例如merge、split、assign、unassign等操作。在RIT过程中,可能会出现异常情况,从而导致Region的状态一直保持在...
《HBase 2.0.0.3.0.0.0-1634 在 Ambari 2.7.x 下的编译与使用详解》 HBase,全称Apache HBase,是一款构建在Hadoop文件系统之上的分布式、版本化、列族式存储系统,主要用于处理大规模数据。它提供了高度可靠性和高...
* hbase.master.logcleaner.plugins:org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner, org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner,这个参数指定了预写日志的清理插件。...
3. **强一致性**:HBase使用主键(Row Key)确保数据的一致性,并通过Zookeeper实现分布式协调,保证数据的准确性和可用性。 4. **可扩展性**:HBase是水平扩展的,可以通过添加更多的服务器来增加存储和处理能力,...
第八章(Hadoop大数据处理实战)HBase实战.pdf第八章(Hadoop大数据处理实战)HBase实战.pdf第八章(Hadoop大数据处理实战)HBase实战.pdf第八章(Hadoop大数据处理实战)HBase实战.pdf第八章(Hadoop大数据处理实战)HBase...
hadoop-2.7.5,hbase1.2.6.zookeeper.3.3.6完全分布式安装教程 hadoop-2.7.5,hbase1.2.6.zookeeper.3.3.6完全分布式安装教程 hadoop-2.7.5,hbase1.2.6.zookeeper.3.3.6完全分布式安装教程
2. **表和列族**:HBase的表是稀疏、多维度的排序映射,由行键(Row Key)、列族(Column Family)、时间戳和值组成。列族是预定义的数据存储组,可以在创建表时指定,例如`cf1`和`cf2`。 3. **行键和分区**:行键...
7. **刷写与合并策略**:`hbase.hregion.flushsize`设定Region刷写到HDFS的阈值,`hbase.hregion.majorcompaction.interval`和`hbase.hregion.minorcompaction.interval`分别是大、小合并的间隔时间,影响数据的紧凑...
hbase培训.ppt
将hadoop-3.3.5与hbase-1.4.5放到 Environment 目录 2.配置hadoop 2.1配置环境变量 HADOOP_HOME=D:\Environment\hadoop-3.3.5 添加到path %HADOOP_HOME%\bin 2.2 配置 winutils.exe 将winutils.exe移动到...
搭建pinpoint需要的hbase初始化脚本hbase-create.hbase
这个“hbase.tar.gz”压缩包可能是预配置好的HBase环境,用户下载后可以直接解压使用,无需繁琐的配置步骤。下面将详细介绍HBase的核心概念、工作原理以及如何部署和使用。 一、HBase核心概念 1. 表(Table):...
hbase操作.txt
15.4. Publishing a new version of hbase.apache.org 15.5. 测试 15.6. Maven Build Commands 15.7. Getting Involved 15.8. 开发 15.9. 提交补丁 A. FAQ B. hbck In Depth B.1. Running hbck to identify ...
在本文档中,我们关注的问题是关于HBase中的SYSTEM.STATS表导致磁盘空间爆满的情况。SYSTEM.STATS表在Phoenix中扮演着至关重要的角色,它存储了所有表格的统计信息,如索引统计、列族信息等,这些信息对于优化查询...
HbaseAPI.java
在IT行业中,尤其是在大数据处理领域,HBase是一个广泛使用的分布式、高性能、列式存储的NoSQL数据库。HBase是建立在Hadoop文件系统(HDFS)之上,为处理大规模数据提供了一个高效的数据存储解决方案。而Spring Data...
HbaseUtil.java
ambari-2.7.5 编译过程中四个大包下载很慢,所以需要提前下载,包含:hbase-2.0.2.3.1.4.0-315-bin.tar.gz ,hadoop-3.1.1.3.1.4.0-315.tar.gz , grafana-6.4.2.linux-amd64.tar.gz ,phoenix-5.0.0.3.1.4.0-315....