客户端
1.HTable.put
for (Put put : puts) {
validatePut(put);//验证Put有效,主要是判断kv的长度
writeBuffer.add(put);//写入缓存
currentWriteBufferSize += put.heapSize();//计算缓存容量
}
if (autoFlush || currentWriteBufferSize > writeBufferSize) {
flushCommits();//如果自动Flush或者缓存到达阀值,则执行flush
}
2.HTable.flushCommits
try {
connection.processBatchOfPuts(writeBuffer, tableName, pool);//调用HConnection来提交Put,传入了一个线程池,看来是有异步调用的过程
} finally {
// the write buffer was adjusted by processBatchOfPuts
currentWriteBufferSize = 0;
for (Put aPut : writeBuffer) {
currentWriteBufferSize += aPut.heapSize();//currentWriteBufferSize又重新计算了一遍,看来一批提交不一定会全部提交完
}
}
3.HConnectionManager.HConnectionImplementation.processBatch
第一步:按RegionServer把Put分类,以便于将一批Put按RegionServer批量提交
// step 1: break up into regionserver-sized chunks and build the data structs
Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();//RegionServer和批量Put的映射关系
for (int i = 0; i < workingList.size(); i++) {
Row row = workingList.get(i);
if (row != null) {
HRegionLocation loc = locateRegion(tableName, row.getRow(), true);//定位Put在哪个Region上
HServerAddress address = loc.getServerAddress();//定位Region在哪个RegionServer上
byte[] regionName = loc.getRegionInfo().getRegionName();
MultiAction actions = actionsByServer.get(address);//看该RegionServer上的批量对象创建没有,没有就创建一个
if (actions == null) {
actions = new MultiAction();
actionsByServer.put(address, actions);
}
Action action = new Action(regionName, row, i);//根据Put创建一个响应对象,放到批量操作对象里,什么是响应对象呢,就是Put和返回结果的组合
lastServers[i] = address;
actions.add(regionName, action);
}
}
第二步:往RegionServer上提交请求,各个RegionServer是并发提交的
// step 2: make the requests
Map<HServerAddress,Future<MultiResponse>> futures =
new HashMap<HServerAddress, Future<MultiResponse>>(actionsByServer.size());
for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
}
第三步,等待各RegionServer返回结果,并准备重试
// step 3: collect the failures and successes and prepare for retry
for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures.entrySet()) {
HServerAddress address = responsePerServer.getKey();
try {
Future<MultiResponse> future = responsePerServer.getValue();
MultiResponse resp = future.get();
if (resp == null) {
// Entire server failed
LOG.debug("Failed all for server: " + address + ", removing from cache");
continue;
}
for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
byte[] regionName = e.getKey();
List<Pair<Integer, Object>> regionResults = e.getValue();
for (Pair<Integer, Object> regionResult : regionResults) {
if (regionResult == null) {
// if the first/only record is 'null' the entire region failed.
LOG.debug("Failures for region: " +
Bytes.toStringBinary(regionName) +
", removing from cache");
} else {
// Result might be an Exception, including DNRIOE
results[regionResult.getFirst()] = regionResult.getSecond();
}
}
}
} catch (ExecutionException e) {
LOG.debug("Failed all from " + address, e);
}
}
第四步,识别返回的错误,准备重试
// step 4: identify failures and prep for a retry (if applicable).
// Find failures (i.e. null Result), and add them to the workingList (in
// order), so they can be retried.
retry = false;
workingList.clear();
for (int i = 0; i < results.length; i++) {
// if null (fail) or instanceof Throwable && not instanceof DNRIOE
// then retry that row. else dont.
if (results[i] == null ||
(results[i] instanceof Throwable &&
!(results[i] instanceof DoNotRetryIOException))) {
retry = true;
Row row = list.get(i);
workingList.add(row);
deleteCachedLocation(tableName, row.getRow());
} else {
// add null to workingList, so the order remains consistent with the original list argument.
workingList.add(null);
}
}
由以上四步可以看出,重点在于第二步,继续跟进,看Callable是怎么样call的,有两步,一是创建到RegionServer的连接,二是调用RegionServer上的multi方法,显然这是远程调用的过程。
3.HConnectionManager.HConnectionImplementation.processBatch
return new Callable<MultiResponse>() {
public MultiResponse call() throws IOException {
return getRegionServerWithoutRetries(
new ServerCallable<MultiResponse>(connection, tableName, null) {
public MultiResponse call() throws IOException {
return server.multi(multi);//第二步:远程调用服务端RegionServer的multi方法,返回结果
}
@Override
public void instantiateServer(boolean reload) throws IOException {
server = connection.getHRegionConnection(address);//第一步:根据RegionServer的地址连上RegionServer
}
}
);
}
};
RegionServer服务端
上面客户端调用过程分析完毕,继续跟RegionServer服务端的处理,入口方法就是HRegionServer.multi
1.HRegionServer.multi
这个方法里有些是关于重试、上锁、结果收集的代码,忽略掉,重要的是两步
第一步:根据RegionName取得对应的Region
HRegion region = getRegion(regionName);
第二步:调用region的put方法实际put数据
OperationStatusCode[] codes =
region.put(putsWithLocks.toArray(new Pair[]{}));
2.HRegion.put
这个方法先检查、上锁、doMiniBatchPut、解锁、判断是否需要flush,重要的是doMiniBatchPut这个方法
long addedSize = doMiniBatchPut(batchOp);
3.HRegion.doMiniBatchPut
这个方法分为上锁、更新时间戳、写WAL、写memstore、解锁;重要的是写WAL和写memstore
// ------------------------------------
// STEP 3. Write to WAL
// ----------------------------------
WALEdit walEdit = new WALEdit();//第一步:创建WAL日志对象放内存里
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
Put p = batchOp.operations[i].getFirst();
if (!p.getWriteToWAL()) continue;
addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);//第二步:写put的内容到WAL日志对象里
}
// Append the edit to WAL
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now);//第三步:写WAL日志对象到硬盘上
// ------------------------------------
// STEP 4. Write back to memstore
// ----------------------------------
long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
Put p = batchOp.operations[i].getFirst();
addedSize += applyFamilyMapToMemstore(p.getFamilyMap());//把put放到memstore里
batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
}
分享到:
相关推荐
在本文中,我们将围绕HBase的核心概念、架构以及如何通过源代码学习进行深入探讨。 HBase,作为Apache Hadoop生态系统的一部分,是一个基于列族的分布式数据库,特别适合处理海量结构化数据。它的设计目标是在廉价...
这个源代码包是针对Java开发者的宝贵资源,可以帮助他们深入理解HBase的内部工作原理,优化自己的应用,以及对HBase进行定制化开发。 HBase的核心概念包括: 1. 表(Table):HBase的数据存储在表中,每个表由行键...
VC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 ...
《HBase权威指南》是一本深入探讨分布式大数据存储系统HBase的专业书籍,其源代码的提供为读者提供了更直观的学习材料。HBase是基于Apache Hadoop的非关系型数据库(NoSQL),它在大规模数据存储方面表现卓越,尤其...
通过阅读《HBase权威指南》并结合源代码,开发者能够更好地掌握HBase的核心概念和技术,从而在实际项目中有效地运用HBase解决大数据问题。这些源代码实例对于学习HBase的开发、调试和运维都具有很高的参考价值。
Hbase权威指南 随书源代码 源码包 绝对完整版 maven工程,带pom文件,可以直接作为一个完整工程导入eclipse等ide。
通过阅读《HBase 权威指南》的源代码,开发者不仅可以理解HBase的设计理念,还能学习到如何利用HBase解决实际问题,提升大数据处理的能力。无论是初学者还是经验丰富的开发者,这本书都将是提升HBase技能的宝贵资源...
《深入剖析HBase源代码:hbase-0.98.23》 HBase,作为Apache的一个开源项目,是构建在Hadoop之上的分布式、版本化、列族式的NoSQL数据库,它提供了高可靠性、高性能、可伸缩的数据存储解决方案。本文将基于hbase-...
通过阅读和理解这段代码,可以深入学习如何在实际项目中将HDFS数据导入HBase,这对于大数据平台的开发和运维人员来说是非常有价值的实践。 总的来说,将HDFS上的文件导入HBase是一个涉及数据处理、数据库设计和编程...
hbase操作必备客户端源代码
HBase基本操作 增删改查 java代码 要使用须导入对应的jar包
解压后的`hbase-book-master`包含项目的基本目录结构,如`src/main/java`用于存放Java源代码,`src/main/resources`存储资源配置文件,`pom.xml`是Maven项目对象模型,定义了项目的构建过程和依赖关系。 2. **...
《HBase权威指南》是一本深入探讨分布式列式数据库HBase的专业书籍,其配套源码提供了书中所提及的示例代码和实践案例,方便读者更好地理解和应用HBase。以下将详细解析HBase的相关知识点。 HBase是建立在Apache ...
在Java中操作HBase数据库时,我们经常需要对大量数据进行高效的检索和处理。...提供的源代码应该包含如何创建、应用PageFilter以及处理扫描结果的完整示例,这对于学习和实践HBase的分页查询非常有帮助。
在Java代码中,首先需要导入必要的库,如`org.apache.hadoop.hbase.HBaseConfiguration`和`org.apache.hadoop.hbase.client.Connection`。然后,我们需要创建一个HBase的配置对象,设置Zookeeper的地址,这是HBase...
读书笔记:hbase权威指南学习代码
本文将详细介绍如何使用Java代码实现这一过程,包括样例MySQL表和数据,以及HBase表的创建。 首先,我们需要了解MySQL和HBase的基本概念。MySQL是一种关系型数据库管理系统,它基于ACID(原子性、一致性、隔离性和...
标题中的“WordCount”是...在实际的学习过程中,理解并实践这些知识点对于掌握Hadoop和HBase的集成使用是非常有帮助的。通过阅读和研究相关的博客文章,可以加深对大数据处理和NoSQL数据库应用的理解,提升技能水平。
1. **连接HBase**:在Java代码中,我们使用`HBaseConfiguration.create()`来创建一个配置对象,然后可以设置各种配置参数,如Zookeeper的地址(`HBASE_ZOOKEEPER_QUORUM`),端口(`HBASE_ZOOKEEPER_CLIENT_PORT`)...
5. **其他操作**:除了上述方法,HbaseTemplate还提供了插入(put)、删除(delete)和更新(update)等操作。例如,`put`方法用于向表中写入新的数据,`delete`方法根据行键删除一行,`update`方法则可以更新已有...