`
hongs_yang
  • 浏览: 61316 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

hbase put 流程分析client端

阅读更多

数据写入(Put)处理流程分析:

Put通过生成一个HTable实例,并调用其put方法时,的执行流程,此部分分析分为clientregionserver两个部分,

client:

Htable.put-->doPut,如果是put一个list时,会迭代调用doPut

privatevoiddoPut(Put put) throws InterruptedIOException,

RetriesExhaustedWithDetailsException {

检查上次提交是否出错,如果上次提交有错误,把这次的put添加到writeAsyncBuffer列表中。

并执行flush操作,等待flush完成操作

if (ap.hasError()){

writeAsyncBuffer.add(put);

backgroundFlushCommits(true);

}

put的内容进行检查:

1.检查put中是否指定cf,如果一个都没有,检查不合法

2.检查put中所有的kv中每一个kv的大小是否超过hbase.client.keyvalue.maxsize配置的值,默认-1表示不限制大小

validatePut(put);

把当前put的所有kv的大小,包含类定义大小加入到currentWriteBufferSize中,

此属性用来检查当前tablebuffer中的put大小是否超出了指定的buffersize

currentWriteBufferSize += put.heapSize();

把这次的put添加到writeAsyncBuffer列表中。

writeAsyncBuffer.add(put);

如果当前buffer中的put总大小超过了指定的table可存储的buffer大小时,进行flush,不等待flush完成操作。

flush过程中有可能writeAsyncBuffer的数据清空后submit出现错误,会把错误的put重新放入到此列表中。

while (currentWriteBufferSize > writeBufferSize) {

backgroundFlushCommits(false);

}

}

hbase.client.max.total.tasks,default=100

privatevoidbackgroundFlushCommits(booleansynchronous) throws

InterruptedIOException, RetriesExhaustedWithDetailsException {

 

try {

do {

writeAsyncBuffer列表中的数据通过RPC调用regionservermulti提交数据

提交过程中出现错误会throw InterruptedIOException

1.迭代并取出writeAsyncBuffer中的每一个put实例,与meta region所在的server创建rpc连接,

并从meta表中得到当前迭代的put对象row所在的HRegionLocation,

如果regionlocation获取失败,设置haserror=true,

在获取HRegionLocation时,先在cache中看是否能找到此regionLocation,

如果不能找到先得到meta regionlocation

生成meta rsrpc连接 ClientProtos.ClientService.BlockingInterface接口实现(HRegionServer)

通过client.getmeta region中得到当前putrow对应的region info,生成HRegionLocation,并添加到cache

2.通过hbase.client.max.perregion.tasks配置单个regionclient的并发数,默认为1

3.通过hbase.client.max.perserver.tasks配置单个regionserverclient的并发数,默认为2

4.通过hbase.client.max.total.tasks配置所有regionserver最大的并发连接任务个数,默认为100

检查任务数是否超过总可执行的任务数是通过tasksSent-tasksDone得到。

5.现在得到所有要put的数据对应的HRegionLocation列表,把每一个regionLocation对应要put的数据生成到

一个map<HRegionLocation,List<Row>>的集合列表中。

生成一个Action<Row>时,会相应的移出HTable.writeAsyncBuffer中对应的put实例。

在同一个regionserver中的所有region,它们的HRegionLocationequals都相同。

6.生成HConnectionManager.ServerErrorTracker实例,此实例

a.通过hbase.client.retries.number配置server的重试次数,默认为31

b.通过hbase.client.pause配置每次重试的间隔时间,默认100,每重试一次,超时时间会有相应延长.

7.针对每一个要提交的regionLocation(每一个region可能包含多个region),

a.tasksSent的值加一,表示总任务数加一,

b.regionLocation对应的regionservertaskCounterPerServer的值加一,表示此server的总任务加一

c.得到regionLocation中所有的region,把每一个region的任务数加一,taskCounterPerRegion值加一。

8.针对每一个regionLocation(每一个regionserver)创建一个rpc连接,并开始一个线程。

通过MultiServerCallable.call方法调用regionserver.multi方法添加数据。

9.每一个线程都提交rs后,等待rs的响应,如果提交失败,进行重试,直接timeout或重试次数到一定次数。

timeoutHConnectionManager.ServerErrorTracker实例生成时生成,具体请查看源代码。

每重试一次,timeout都会有相应的延长

10.根据每一个提交过去数据对应的region,把每一个regiontaskCounterPerRegion的值减一

taskCounterPerServer的值减一

tasksDone的值加一,表示完成一个任务,并把tasksDone进行notify.

Notify的目的是通知其它putsubmit任务结束等待,

submit方法最开始会检查是否等待,如果是tasksDonewait。具体请参见AsyncProcess.submit源代码。

 

ap.submit(writeAsyncBuffer, true);

} while (synchronous && !writeAsyncBuffer.isEmpty());

如果传入的参数是true,表示需要等待rpc调用结束,flushCommitsput中上一次提交error时此参数为true

等到tasksSent的值减去tasksDone的值等于0,tasksSent表示提交的任务数,tasksDone表示完成的任务数

if (synchronous) {

ap.waitUntilDone();

}

部分数据提交失败,也就是可能同时提交给两个regionserver,有一个成功,一个失败。

if (ap.hasError()) {

LOG.debug(tableName + ": One or more of the operations have failed -" +

" waiting for all operation in progress to finish (successfully or not)");

while (!writeAsyncBuffer.isEmpty()) {

ap.submit(writeAsyncBuffer, true);

}

等到tasksSent的值减去tasksDone的值等于0,tasksSent表示提交的任务数,tasksDone表示完成的任务数

ap.waitUntilDone();

如果有部分数据提交失败,同时没有设置清空失败的数据时,把数据重新添加到writeAsyncBuffer列表中

if (!clearBufferOnFail) {

// if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the

// write buffer. This is a questionable feature kept here for backward compatibility

writeAsyncBuffer.addAll(ap.getFailedOperations());

}

RetriesExhaustedWithDetailsException e = ap.getErrors();

ap.clearErrors();

throwe;

}

} finally {

清空当前 currentWriteBufferSize的大小,如果有数据没有提交成功,

重新把未提交的数据的大小计算起来添加到 currentWriteBufferSize中。

 

currentWriteBufferSize = 0;

for (Rowmut : writeAsyncBuffer) {

if (mutinstanceofMutation) {

currentWriteBufferSize += ((Mutation) mut).heapSize();

}

}

}

}

0
1
分享到:
评论

相关推荐

    最新版linux hbase-2.3.2-client-bin.tar.gz

    在Linux环境下,我们可以使用hbase-2.3.2-client.jar来实现这一目标。这个客户端库包含了连接HBase集群、执行Get、Put、Scan等操作所需的API。 1. 连接集群:首先,我们需要配置HBase的连接参数,如Zookeeper地址。...

    [原创]HbaseClient

    4. 错误处理:HbaseClient具有良好的错误恢复机制,当服务器端出现问题时,客户端会自动重试,保证数据操作的可靠性。 5. 客户端缓存:为了提高性能,HbaseClient使用了缓存策略,如Cell缓存和RowCache。Cell缓存将...

    hbase的java client实例

    HBase基于Google的Bigtable设计,适用于大数据分析和实时查询。 Java API是与HBase交互的主要方式之一,它允许开发者创建、修改和查询表。以下是一些基本操作: 1. **连接HBase**:使用`Configuration`类配置HBase...

    HBaseClient:HBase客户端数据管理软件

    HBaseClient HBase客户端数据管理软件 概要说明 类似PL/SQL,是一个HBase数据库的客户端数据管理软件。是免费开源的软件。 基于XJava,使用xml配置文件绘制可视化界面。 可视化界面操作 表 表的定义、编辑、删除; ...

    hbase-client

    HBaseClient_1.6.1_64.exe可能是针对Windows系统的HBase客户端安装程序,便于用户在Windows环境下快速部署和使用。 二、HBase Shell HBase Shell是HBase自带的一个基于JLine的命令行工具,提供了与HBase交互的命令...

    HBase源码分析

    总的来说,HBase的源码分析涉及到客户端与服务器的交互、RPC通信机制、数据存储流程以及系统架构等多个层面。理解这些核心机制对于优化HBase性能、排查问题以及进行二次开发都至关重要。通过对HBase源码的深入学习,...

    hbase- java开发连接工具类

    import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.conf.Configuration; public class HBaseJavaConnectExample { public static void main...

    scala API 操作hbase表

    import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.client.Get import org.apache.hadoop.hbase.util.Bytes ``` 2. 配置HBase连接: 创建一个`Configuration`对象并加载HBase的配置...

    java链接hbase数据示例代码

    import org.apache.hadoop.hbase.client.Put; Put put = new Put(Bytes.toBytes("row_key")); // 替换为你要更新的行键 put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"), Bytes.toBytes("new_value...

    基于spring boot 的spring-boot-starter-hbase自动注解实现

    例如,可以使用`HbaseTemplate`的`put`方法插入数据,`get`方法读取数据,`delete`方法删除数据,以及`scanner`方法进行扫描操作。 以下是一些使用`HbaseTemplate`的关键知识点: 1. **配置HBase**: 在`...

    HBaseClient-Download:HBaseClient 编译后的版本库

    编译后的 HBaseClient 包含了所有必要的类和库,可以无缝集成到Java项目中,简化了开发流程。 在 HBaseClient-Download-master 压缩包中,我们可以期待找到以下内容: 1. **源代码**:可能包含 HBaseClient 的源...

    java代码将mysql表数据导入HBase表

    import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import java.sql.*; public class MySQLToHBase { public static void ...

    hbase 的java代码 集合 hbase 0.96

    1. **连接HBase**:在Java代码中,我们使用`HBaseConfiguration.create()`来创建一个配置对象,然后可以设置各种配置参数,如Zookeeper的地址(`HBASE_ZOOKEEPER_QUORUM`),端口(`HBASE_ZOOKEEPER_CLIENT_PORT`)...

    golang connect hbase thrift2

    func put(client *hbase.Hbase, table string, rowKey []byte, family string, qualifier []byte, value []byte) error { pair := &hbase.TPut{ Row: rowKey, Families: []*hbase.TColumnValue{{Family: family...

    Java操作Hbase进行建表、删表以及对数据进行增删改查

    import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org....

    Hbase笔记 —— 利用JavaAPI的方式操作Hbase数据库(往hbase的表中批量插入数据).pdf

    &lt;artifactId&gt;hbase-client &lt;version&gt;1.4.6 ``` 接下来,我们将在Java程序中创建一个配置对象,通过它连接到HBase。这通常涉及设置ZooKeeper的地址,因为HBase的元数据存储在ZooKeeper中: ```java ...

    python thrift2 connect hbase

    client = hbase.Client(protocol) # 打开连接 transport.open() # put操作 row_key = 'row1' column_family = 'cf' qualifier = 'q' value = 'val1' mutation = Mutation(column=Text(column_family), value=Text...

    HBase使用的jar包

    1. **HBase客户端库**:这是与HBase交互的基础,包含了HBase的API,如`org.apache.hadoop.hbase.client.Connection`和`org.apache.hadoop.hbase.client.Table`等,用于创建连接、打开表、执行Get、Put、Scan等操作。...

    Hbase的JavaAPI

    这通常通过`HBaseConfiguration.create()`方法创建一个配置对象,然后设置相关配置,如Zookeeper地址(`HBASE_ZOOKEEPER_QUORUM`),端口(`HBASE_ZOOKEEPER_CLIENT_PORT`)等。接着,使用`ConnectionFactory.create...

Global site tag (gtag.js) - Google Analytics