数据写入(Put)处理流程分析:
Put通过生成一个HTable实例,并调用其put方法时,的执行流程,此部分分析分为client与regionserver两个部分,
client端:
Htable.put-->doPut,如果是put一个list时,会迭代调用doPut
privatevoiddoPut(Put put) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
检查上次提交是否出错,如果上次提交有错误,把这次的put添加到writeAsyncBuffer列表中。
并执行flush操作,等待flush完成操作
if (ap.hasError()){
writeAsyncBuffer.add(put);
backgroundFlushCommits(true);
}
对put的内容进行检查:
1.检查put中是否指定cf,如果一个都没有,检查不合法
2.检查put中所有的kv中每一个kv的大小是否超过hbase.client.keyvalue.maxsize配置的值,默认-1表示不限制大小
validatePut(put);
把当前put的所有kv的大小,包含类定义大小加入到currentWriteBufferSize中,
此属性用来检查当前table的buffer中的put大小是否超出了指定的buffersize
currentWriteBufferSize += put.heapSize();
把这次的put添加到writeAsyncBuffer列表中。
writeAsyncBuffer.add(put);
如果当前buffer中的put总大小超过了指定的table可存储的buffer大小时,进行flush,不等待flush完成操作。
在flush过程中有可能writeAsyncBuffer的数据清空后submit出现错误,会把错误的put重新放入到此列表中。
while (currentWriteBufferSize > writeBufferSize) {
backgroundFlushCommits(false);
}
}
hbase.client.max.total.tasks,default=100
privatevoidbackgroundFlushCommits(booleansynchronous) throws
InterruptedIOException, RetriesExhaustedWithDetailsException {
try {
do {
把writeAsyncBuffer列表中的数据通过RPC调用regionserver的multi提交数据
提交过程中出现错误会throw InterruptedIOException
1.迭代并取出writeAsyncBuffer中的每一个put实例,与meta region所在的server创建rpc连接,
并从meta表中得到当前迭代的put对象row所在的HRegionLocation,
如果regionlocation获取失败,设置haserror=true,
在获取HRegionLocation时,先在cache中看是否能找到此regionLocation,
如果不能找到先得到meta regionlocation,
生成meta rs的rpc连接 ClientProtos.ClientService.BlockingInterface接口实现(HRegionServer)
通过client.get从meta region中得到当前put的row对应的region info,生成HRegionLocation,并添加到cache
2.通过hbase.client.max.perregion.tasks配置单个region在client的并发数,默认为1
3.通过hbase.client.max.perserver.tasks配置单个regionserver在client的并发数,默认为2
4.通过hbase.client.max.total.tasks配置所有regionserver最大的并发连接任务个数,默认为100
检查任务数是否超过总可执行的任务数是通过tasksSent-tasksDone得到。
5.现在得到所有要put的数据对应的HRegionLocation列表,把每一个regionLocation对应要put的数据生成到
一个map<HRegionLocation,List<Row>>的集合列表中。
生成一个Action<Row>时,会相应的移出HTable.writeAsyncBuffer中对应的put实例。
在同一个regionserver中的所有region,它们的HRegionLocation的equals都相同。
6.生成HConnectionManager.ServerErrorTracker实例,此实例
a.通过hbase.client.retries.number配置server的重试次数,默认为31次
b.通过hbase.client.pause配置每次重试的间隔时间,默认100,每重试一次,超时时间会有相应延长.
7.针对每一个要提交的regionLocation(每一个region可能包含多个region),
a.把tasksSent的值加一,表示总任务数加一,
b.把regionLocation对应的regionserver,taskCounterPerServer的值加一,表示此server的总任务加一
c.得到regionLocation中所有的region,把每一个region的任务数加一,taskCounterPerRegion值加一。
8.针对每一个regionLocation(每一个regionserver)创建一个rpc连接,并开始一个线程。
通过MultiServerCallable.call方法调用regionserver.multi方法添加数据。
9.每一个线程都提交rs后,等待rs的响应,如果提交失败,进行重试,直接timeout或重试次数到一定次数。
timeout在HConnectionManager.ServerErrorTracker实例生成时生成,具体请查看源代码。
每重试一次,timeout都会有相应的延长
10.根据每一个提交过去数据对应的region,把每一个region的taskCounterPerRegion的值减一
把 taskCounterPerServer的值减一
把tasksDone的值加一,表示完成一个任务,并把tasksDone进行notify.
Notify的目的是通知其它put的submit任务结束等待,
submit方法最开始会检查是否等待,如果是tasksDone会wait。具体请参见AsyncProcess.submit源代码。
ap.submit(writeAsyncBuffer, true);
} while (synchronous && !writeAsyncBuffer.isEmpty());
如果传入的参数是true,表示需要等待rpc调用结束,flushCommits或put中上一次提交error时此参数为true
等到tasksSent的值减去tasksDone的值等于0,tasksSent表示提交的任务数,tasksDone表示完成的任务数
if (synchronous) {
ap.waitUntilDone();
}
部分数据提交失败,也就是可能同时提交给两个regionserver,有一个成功,一个失败。
if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -" +
" waiting for all operation in progress to finish (successfully or not)");
while (!writeAsyncBuffer.isEmpty()) {
ap.submit(writeAsyncBuffer, true);
}
等到tasksSent的值减去tasksDone的值等于0,tasksSent表示提交的任务数,tasksDone表示完成的任务数
ap.waitUntilDone();
如果有部分数据提交失败,同时没有设置清空失败的数据时,把数据重新添加到writeAsyncBuffer列表中
if (!clearBufferOnFail) {
// if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
// write buffer. This is a questionable feature kept here for backward compatibility
writeAsyncBuffer.addAll(ap.getFailedOperations());
}
RetriesExhaustedWithDetailsException e = ap.getErrors();
ap.clearErrors();
throwe;
}
} finally {
清空当前 currentWriteBufferSize的大小,如果有数据没有提交成功,
重新把未提交的数据的大小计算起来添加到 currentWriteBufferSize中。
currentWriteBufferSize = 0;
for (Rowmut : writeAsyncBuffer) {
if (mutinstanceofMutation) {
currentWriteBufferSize += ((Mutation) mut).heapSize();
}
}
}
}
相关推荐
在Linux环境下,我们可以使用hbase-2.3.2-client.jar来实现这一目标。这个客户端库包含了连接HBase集群、执行Get、Put、Scan等操作所需的API。 1. 连接集群:首先,我们需要配置HBase的连接参数,如Zookeeper地址。...
4. 错误处理:HbaseClient具有良好的错误恢复机制,当服务器端出现问题时,客户端会自动重试,保证数据操作的可靠性。 5. 客户端缓存:为了提高性能,HbaseClient使用了缓存策略,如Cell缓存和RowCache。Cell缓存将...
HBase基于Google的Bigtable设计,适用于大数据分析和实时查询。 Java API是与HBase交互的主要方式之一,它允许开发者创建、修改和查询表。以下是一些基本操作: 1. **连接HBase**:使用`Configuration`类配置HBase...
HBaseClient HBase客户端数据管理软件 概要说明 类似PL/SQL,是一个HBase数据库的客户端数据管理软件。是免费开源的软件。 基于XJava,使用xml配置文件绘制可视化界面。 可视化界面操作 表 表的定义、编辑、删除; ...
HBaseClient_1.6.1_64.exe可能是针对Windows系统的HBase客户端安装程序,便于用户在Windows环境下快速部署和使用。 二、HBase Shell HBase Shell是HBase自带的一个基于JLine的命令行工具,提供了与HBase交互的命令...
总的来说,HBase的源码分析涉及到客户端与服务器的交互、RPC通信机制、数据存储流程以及系统架构等多个层面。理解这些核心机制对于优化HBase性能、排查问题以及进行二次开发都至关重要。通过对HBase源码的深入学习,...
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.conf.Configuration; public class HBaseJavaConnectExample { public static void main...
import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.client.Get import org.apache.hadoop.hbase.util.Bytes ``` 2. 配置HBase连接: 创建一个`Configuration`对象并加载HBase的配置...
编译后的 HBaseClient 包含了所有必要的类和库,可以无缝集成到Java项目中,简化了开发流程。 在 HBaseClient-Download-master 压缩包中,我们可以期待找到以下内容: 1. **源代码**:可能包含 HBaseClient 的源...
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import java.sql.*; public class MySQLToHBase { public static void ...
1. **连接HBase**:在Java代码中,我们使用`HBaseConfiguration.create()`来创建一个配置对象,然后可以设置各种配置参数,如Zookeeper的地址(`HBASE_ZOOKEEPER_QUORUM`),端口(`HBASE_ZOOKEEPER_CLIENT_PORT`)...
func put(client *hbase.Hbase, table string, rowKey []byte, family string, qualifier []byte, value []byte) error { pair := &hbase.TPut{ Row: rowKey, Families: []*hbase.TColumnValue{{Family: family...
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org....
import org.apache.hadoop.hbase.client.Put; Put put = new Put(Bytes.toBytes("row_key")); // 替换为你要更新的行键 put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"), Bytes.toBytes("new_value...
<artifactId>hbase-client <version>1.4.6 ``` 接下来,我们将在Java程序中创建一个配置对象,通过它连接到HBase。这通常涉及设置ZooKeeper的地址,因为HBase的元数据存储在ZooKeeper中: ```java ...
client = hbase.Client(protocol) # 打开连接 transport.open() # put操作 row_key = 'row1' column_family = 'cf' qualifier = 'q' value = 'val1' mutation = Mutation(column=Text(column_family), value=Text...
1. **HBase客户端库**:这是与HBase交互的基础,包含了HBase的API,如`org.apache.hadoop.hbase.client.Connection`和`org.apache.hadoop.hbase.client.Table`等,用于创建连接、打开表、执行Get、Put、Scan等操作。...
这通常通过`HBaseConfiguration.create()`方法创建一个配置对象,然后设置相关配置,如Zookeeper地址(`HBASE_ZOOKEEPER_QUORUM`),端口(`HBASE_ZOOKEEPER_CLIENT_PORT`)等。接着,使用`ConnectionFactory.create...
在Java API中,我们主要通过`org.apache.hadoop.hbase.client.Connection`和`org.apache.hadoop.hbase.client.Table`这两个核心类来进行交互。 1. **连接HBase**: 使用`ConnectionFactory.createConnection...