`
hongs_yang
  • 浏览: 61293 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

hbase put 流程分析regionserver端

阅读更多

RegionServerput数据流程分析:

 

client端通过MultiServerCallable.call调用rsrpcmulti方法。

 

regionServer实例ClientProtos.ClientService.BlockingInterface接口。

 

 

 

 

 

public MultiResponse multi(finalRpcControllerrpcc, final MultiRequest request)

 

throws ServiceException {

 

 

 

// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.

 

// It is also the conduit via which we pass back data.

 

PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;

 

CellScannercellScanner = controller != null? controller.cellScanner(): null;

 

if (controller != null) controller.setCellScanner(null);

 

List<CellScannable> cellsToReturn = null;

 

MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();

 

RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();

 

 

 

 

 

得到当前提交的数据,数据按region,list的方式传入过来。

 

for (RegionAction regionAction : request.getRegionActionList()) {

 

this.requestCount.add(regionAction.getActionCount());

 

regionActionResultBuilder.clear();

 

HRegion region;

 

try {

 

从当前regionserver中的onlnieRegions中得到请求的region.

 

1.onlineRegions中取出HRegion实例,如果不为空,按如下流程走,否则:执行到5

 

2.如果onlineRegions列表中不包含此region,movedRegions列表中拿到region,regionmoved超时是2分钟,

 

如果movedRegions列表中能拿到此region,同时move时间超时,并从movedRegions列表中移出引region返回null,

 

否则返回正在movedregion,如果movedRegions中返回的region不为null,throw RegionMovedException

 

3.regionsInTransitionInRS中获取此region,如果能拿到,同时拿到的值为true,表示region还在做opening操作。

 

Throw RegionOpeningException

 

4.如果以上得到的值都为null,表示此server中没有此region, throw NotServingRegionException

 

此时基本上只有一个可能,region在做split.或者move到其它server(刚完成move,client求时不在此server)

 

5.如果1中拿到region,表示正常,region在此server中。

 

 

 

region = getRegion(regionAction.getRegion());

 

} catch (IOException e) {

 

regionActionResultBuilder.setException(ResponseConverter.buildException(e));

 

responseBuilder.addRegionActionResult(regionActionResultBuilder.build());

 

continue; // For this region it's a failure.

 

}

 

检查是否是原子操作:

 

if (regionAction.hasAtomic() && regionAction.getAtomic()) {

 

// How does this call happen? It may need some work to play well w/ the surroundings.

 

// Need to return an item per Action along w/ Action index. TODO.

 

Try {

 

执行原子操作:保证一个region中所有的action的操作的mvcc的值相同,如果有一个操作失败,整体rollback

 

mutateRows(region, regionAction.getActionList(), cellScanner);

 

} catch (IOException e) {

 

// As it's atomic, we may expect it's a global failure.

 

regionActionResultBuilder.setException(ResponseConverter.buildException(e));

 

}

 

} else {

 

// doNonAtomicRegionMutation manages the exception internally

 

执行非原子操作:doNonAtomicRegionMutation流程分析

 

cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,

 

regionActionResultBuilder, cellsToReturn);

 

}

 

responseBuilder.addRegionActionResult(regionActionResultBuilder.build());

 

}

 

// Load the controller with the Cells to return.

 

if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {

 

如果client端传入有数据返回的压缩编码方式,把要返回的数据添加到cellsToReturn列表中。

 

同时response要返回的result只返回result的大小

 

如果是批量的get/append/increment操作时,建议是把codec的配置设置上。

 

此时表示是get/append/increment的请求,生成一个CellScanner实例,此实时是查询得到的所有Result

 

clent通过此CellScanner来获取数据。

 

controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));

 

}

 

returnresponseBuilder.build();

 

}

 

 

 

doNonAtomicRegionMutation处理流程分析:

 

用于处理非原子性的put/delete/get操作。

 

 

 

privateList<CellScannable> doNonAtomicRegionMutation(final HRegion region,

 

final RegionAction actions, finalCellScannercellScanner,

 

final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn) {

 

// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do

 

// one at a time, we instead pass them in batch. Be aware that the corresponding

 

// ResultOrException instance that matches each Put or Delete is then added down in the

 

// doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched

 

List<ClientProtos.Action> mutations = null;

 

request中指定region中所有的action进行迭代

 

for (ClientProtos.Action action: actions.getActionList()) {

 

ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;

 

try {

 

Result r = null;

 

如果此actionget操作,直接执行get

 

if (action.hasGet()) {

 

Get get = ProtobufUtil.toGet(action.getGet());

 

r = region.get(get);

 

} elseif (action.hasMutation()) {

 

MutationTypetype = action.getMutation().getMutateType();

 

if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&

 

!mutations.isEmpty()) {

 

// Flush out any Puts or Deletes already collected.

 

doBatchOp(builder, region, mutations, cellScanner);

 

mutations.clear();

 

}

 

switch (type) {

 

如果kv中存在原来的数据,把新kv的数据添加到old kv的后面,kv的大小等于oldkvsize+newkvsize

 

如果是新的kv,直接当成一列进行存储

 

caseAPPEND:

 

r = append(region, action.getMutation(), cellScanner);

 

break;

 

值自动增加。

 

caseINCREMENT:

 

r = increment(region, action.getMutation(), cellScanner);

 

break;

 

如果操作是put/delete,把所有的操作action集合在一起,

 

casePUT:

 

caseDELETE:

 

// Collect the individual mutations and apply in a batch

 

if (mutations == null) {

 

mutations = newArrayList<ClientProtos.Action>(actions.getActionCount());

 

}

 

mutations.add(action);

 

break;

 

default:

 

thrownewDoNotRetryIOException("Unsupported mutate type: " + type.name());

 

}

 

} else {

 

thrownewHBaseIOException("Unexpected Action type");

 

}

 

如果是get/append/increment操作,每一次都会得到一个result,result添加到返回的response中。

 

if (r != null) {

 

ClientProtos.Result pbResult = null;

 

如果client端传入有数据返回的压缩编码方式,把要返回的数据添加到cellsToReturn列表中。

 

同时response要返回的result只返回result的大小

 

如果是批量的get/append/increment操作时,建议是把codec的配置设置上。

 

if (isClientCellBlockSupport()) {

 

pbResult = ProtobufUtil.toResultNoData(r);

 

// Hard to guess the size here. Just make a rough guess.

 

if (cellsToReturn == null) cellsToReturn = newArrayList<CellScannable>();

 

cellsToReturn.add(r);

 

} else {

 

如果没有配置codec时,把result的所有数据写入到response中,当成client request的响应信息,

 

这样如果批量的值比较大时,可能会影响到响应的速度。

 

pbResult = ProtobufUtil.toResult(r);

 

}

 

resultOrExceptionBuilder =

 

ClientProtos.ResultOrException.newBuilder().setResult(pbResult);

 

}

 

// Could get to here and there was no result and no exception. Presumes we added

 

// a Put or Delete to the collecting Mutations List for adding later. In this

 

// case the corresponding ResultOrException instance for the Put or Delete will be added

 

// down in the doBatchOp method call rather than up here.

 

} catch (IOException ie) {

 

resultOrExceptionBuilder = ResultOrException.newBuilder().

 

setException(ResponseConverter.buildException(ie));

 

}

 

if (resultOrExceptionBuilder != null) {

 

// Propagate index.

 

把第一个子的响应信息添加到集合列表中,等待执行完成统一进行响应。

 

resultOrExceptionBuilder.setIndex(action.getIndex());

 

builder.addResultOrException(resultOrExceptionBuilder.build());

 

}

 

}

 

// Finish up any outstanding mutations

 

针对put/delete操作,执行批量操作。

 

if (mutations != null && !mutations.isEmpty()) {

 

doBatchOp(builder, region, mutations, cellScanner);

 

}

 

returncellsToReturn;

 

}

 

针对put/delete操作的批量操作处理方法

 

protected void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,

 

finalList<ClientProtos.Action> mutations, finalCellScannercells) {

 

Mutation[] mArray = newMutation[mutations.size()];

 

longbefore = EnvironmentEdgeManager.currentTimeMillis();

 

booleanbatchContainsPuts = false, batchContainsDelete = false;

 

try {

 

inti = 0;

 

迭代要操作的所有action,生成put/deletemutation实例

 

for (ClientProtos.Action action: mutations) {

 

MutationProto m = action.getMutation();

 

Mutationmutation;

 

if (m.getMutateType() == MutationType.PUT) {

 

mutation = ProtobufUtil.toPut(m, cells);

 

batchContainsPuts = true;

 

} else {

 

mutation = ProtobufUtil.toDelete(m, cells);

 

batchContainsDelete = true;

 

}

 

mArray[i++] = mutation;

 

}

 

 

 

requestCount.add(mutations.size());

 

如果不是metatable的写入,也就是用户表写入,

 

if (!region.getRegionInfo().isMetaTable()) {

 

检查并等待全局flush的完成。

 

1.检查全局的flush是否超过hbase.regionserver.global.memstore.upperLimit配置的值,默认是0.4

 

如果当前rs中所有的memstoresize总和超过了此值,强制进行flsuh,并等待flush完成。线程wait,

 

等待MemStoreFlusher.flushRegionnotify此线程的等待。

 

2.检查全局的flush是否超过hbase.regionserver.global.memstore.lowerLimit配置,默认为0.35

 

如果rs中所有的memstoresize总和超过了此值,发起flush请求,不等待flush完成,执行下面流程

 

cacheFlusher.reclaimMemStoreMemory();

 

}

 

通过HRegion执行更新操作。

 

OperationStatus codes[] = region.batchMutate(mArray, false);

 

处理更新后每一条数据是否成功的信息,并添加到response中。

 

for (i = 0; i < codes.length; i++) {

 

intindex = mutations.get(i).getIndex();

 

Exception e = null;

 

switch (codes[i].getOperationStatusCode()) {

 

caseBAD_FAMILY:

 

出现这种情况表示要更新的action中指定的cf不存在或cfnamenull,

 

e = newNoSuchColumnFamilyException(codes[i].getExceptionMsg());

 

builder.addResultOrException(getResultOrException(e, index));

 

break;

 

 

 

caseSANITY_CHECK_FAILURE:

 

出现这种情况表示要更新的action中有kvtimestamp的值超出了当前rs中的时间,

 

通过hbase.hregion.keyvalue.timestamp.slop.millisecs配置可超出的时间范围,默认为不控制

 

e = newFailedSanityCheckException(codes[i].getExceptionMsg());

 

builder.addResultOrException(getResultOrException(e, index));

 

break;

 

 

 

default:

 

e = newDoNotRetryIOException(codes[i].getExceptionMsg());

 

builder.addResultOrException(getResultOrException(e, index));

 

break;

 

 

 

caseSUCCESS:

 

正常结束

 

builder.addResultOrException(getResultOrException(ClientProtos.Result.getDefaultInstance(), index));

 

break;

 

}

 

}

 

} catch (IOException ie) {

 

for (inti = 0; i < mutations.size(); i++) {

 

builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));

 

}

 

}

 

更新监控数据

 

longafter = EnvironmentEdgeManager.currentTimeMillis();

 

if (batchContainsPuts) {

 

metricsRegionServer.updatePut(after - before);

 

}

 

if (batchContainsDelete) {

 

metricsRegionServer.updateDelete(after - before);

 

}

 

}

 

 

 

Hregion.batchMutate

 

 

 

OperationStatus[] batchMutate(Mutation[] mutations, boolean isReplay)

 

throws IOException {

 

初始化批量执行处理程序,把每一个actionstatus设置为OperationStatus.NOT_RUN

 

BatchOperationInProgress<Mutation> batchOp =

 

newBatchOperationInProgress<Mutation>(mutations);

 

 

 

booleaninitialized = false;

 

如果处理还没有结束,一直迭代,

 

Hregion.BatchOperationInProgress.nextIndexToProcess==要处理的action个数表示完成处理

 

while (!batchOp.isDone()) {

 

if (!isReplay) {

 

checkReadOnly();

 

}

 

checkResources();

 

 

 

longnewSize;

 

if (isReplay) {

 

startRegionOperation(Operation.REPLAY_BATCH_MUTATE);

 

} else {

 

startRegionOperation(Operation.BATCH_MUTATE);

 

}

 

 

 

try {

 

if (!initialized) {

 

if (!isReplay) {

 

region请求加一,

 

this.writeRequestsCount.increment();

 

迭代每一个action,执行cpprePut/preDelete操作。

 

doPreMutationHook(batchOp);

 

}

 

initialized = true;

 

}

 

执行更新操作,

 

检查cf是否合法,如果是put,检查put传入的cf是否在table中存在,不存在此actionstatusBAD_FAMILY

 

检查puttimestamp是否合法,需要在rs当前时间的一个合理范围内。

 

不在范围内此action.statusSANITY_CHECK_FAILURE

 

检查deletecf是否合法。

 

如果是putaction,更新put的所有kvtimestamp为当前的rs时间

 

如果是deleteaction,通过get先读取每一个cf中的数据,检查是否需要执行删除,

 

删除的timestampLong.MAX_VALUE

 

在以上检查完成后,如果部分检查不合法的action,它们的状态为非OperationStatusCode.NOT_RUN状态

 

把所有现在是NOT_RUN状态的action添加到对应的cfmemstore中。

 

每一个storekvMvccVersion的值为mvccmemstoreWrite的值(region max seqid + 1)

 

根据更新的action中所有的store(cf),分别调用region中不同的storeadd(HStore.add)方法添加到memstore中,

 

一个action中不同的store中所有的kvmvccversion的都相同。

 

把更新过的每一个actionstatus的状态设置为OperationStatus.SUCCESS

 

写入wal日志,通过append方式添加日志,

 

日志的flush通过hbase.regionserver.optionallogflushinterval进行配置,默认为1*1000ms,-1表示实时更新

 

或可以通过table定义的时候设置DURABILITY属性,可设置为SYNC_WAL/FSYNC_WAL表示实时更新日志

 

得到当此添加的所的kv的总大小。

 

 

 

longaddedSize = doMiniBatchMutation(batchOp, isReplay);

 

 

 

把当前更新的size添加到rs中的全局memstore的大小,atomicGlobalMemstoreSize

 

把当前更新的size添加到当前regionmemstore中。memstoreSize

 

newSize = this.addAndGetGlobalMemstoreSize(addedSize);

 

} finally {

 

closeRegionOperation();

 

}

 

检查当前regionmemstore的大小是否超过hbase.hregion.memstore.flush.size配置的大小,默认1024*1024*128L

 

如果需要flush,通过MemStoreFlusher.requestFlush(HRegion)发起flush请求

 

if (isFlushSize(newSize)) {

 

requestFlush();

 

}

 

}

 

returnbatchOp.retCodeDetails;

 

}

 

 

 

0
1
分享到:
评论
2 楼 hongs_yang 2014-04-15  
可以有一些意见发表的,后期我会慢慢发布一些hadoop/yarn的分析情况。
1 楼 qindongliang1922 2014-04-15  
厉害

相关推荐

    HBase的性能优化

    Put操作是HBase中的一种基本操作,用于将数据写入到RegionServer中。通过设置Put操作的 AutoFlush 为false,可以提高写入性能。例如,在某个Region Server集群上,新建立一个LZO压缩表,测试Put和Get的性能,可以...

    HBase权威指南

    此外,HBase与MapReduce结合,可以进行批量处理和分析任务。 5. 高可用性:HBase提供了主备RegionServer机制,当主RegionServer故障时,备用RegionServer能够无缝接管。同时,数据的多版本策略也提高了系统的容错性...

    Hbase1.3.1源码

    HBase是Apache软件基金会的一个开源项目,它是基于Google的Bigtable设计思想...通过分析HBase 1.3.1的源码,开发者可以深入理解HBase的工作原理,从而更好地优化应用、解决性能问题,甚至进行功能扩展和定制化开发。

    最新版linux hbase-2.3.2-client-bin.tar.gz

    HBase提供了丰富的监控工具,如JMX、HBase Master UI和RegionServer UI,可以帮助管理员监控集群状态、排查问题。同时,日志分析也是故障排查的重要手段。 总结,Linux环境下的HBase 2.3.2客户端为开发者提供了强大...

    scala API 操作hbase表

    在本文中,我们将深入探讨如何使用Scala API操作HBase数据库。HBase是一个分布式、面向列的NoSQL数据库,它构建于Hadoop之上,...同时,熟悉HBase的RegionServer和Master节点的工作原理也有助于优化你的应用程序性能。

    hbase-2.4.17-bin 安装包

    这个“hbase-2.4.17-bin”安装包提供了HBase的最新稳定版本2.4.17,适用于大数据处理和分析场景。下面将详细介绍HBase的核心概念、安装步骤以及配置和管理。 一、HBase核心概念 1. 表(Table):HBase中的表是由行...

    hbase指南 英文

    5. **启动 HBase**:通过命令行启动 HBase 的 Master 和 RegionServer。 **3.3 HBase Shell 操作** - **连接 HBase Shell**:使用 `hbase shell` 命令连接到 HBase Shell。 - **创建表**:使用 `create 'tablename...

    hbase-0.98.12.1-hadoop2-bin.tar.gz

    HBase 0.98.12.1能够与YARN无缝集成,使得HBase Master和RegionServer可以作为YARN的应用程序运行,提高了资源利用率和系统弹性。 四、安装与配置 1. 下载:首先,从Apache官网下载HBase 0.98.12.1的源码或二进制...

    HBase编程实践

    使用HBase的Master和RegionServer提供的监控指标,可以监控HBase集群的健康状态和性能。通过调整配置参数,如region大小、MemStore大小等,可以优化HBase的性能。 总之,HBase编程实践涵盖了从基础概念到高级应用...

    HBase大数据.zip

    - **实时数据分析**:HBase常用于实时分析,如日志分析、用户行为追踪等。 - **物联网(IoT)**:在物联网场景下,HBase能快速存储和查询大量设备产生的数据。 - **搜索引擎索引**:构建搜索索引,实现快速查询。 ...

    hbase 的java代码 集合 hbase 0.96

    在Java中,我们通过HBase客户端API与HBase交互,这些API包括了`HBaseAdmin`、`HTable`、`Put`、`Get`、`Scan`等核心类。 1. **连接HBase**:在Java代码中,我们使用`HBaseConfiguration.create()`来创建一个配置...

    [原创]HbaseClient

    4. 错误处理:HbaseClient具有良好的错误恢复机制,当服务器端出现问题时,客户端会自动重试,保证数据操作的可靠性。 5. 客户端缓存:为了提高性能,HbaseClient使用了缓存策略,如Cell缓存和RowCache。Cell缓存将...

    hbase-0.98.6.1-src.zip

    - **Get和Put操作**:客户端通过HTable接口的get和put方法与HBase交互,发送请求到RegionServer。 - **Scanning**:用于批量获取数据,支持过滤器,优化数据检索效率。 - **Compaction**:定期合并region中的...

    hbase权威指南源码

    - **Master**:HBase主服务器,负责RegionServer的负载均衡、Region的分配和元数据管理。 3. **HBase API**: - **HBase客户端**:提供了Java API和命令行接口(HBase Shell),用于与HBase交互,包括创建表、...

    hbase0.94java源代码

    2. HBase服务器端:包括RegionServer和Master,它们负责处理客户端请求和集群管理。 3. Region分裂:当Region变得过大时,HBase会自动将其分裂成两个新的Region。 4. Compaction:定期合并小的HFile以减少Region中...

    HBase使用的jar包

    1. **HBase客户端库**:这是与HBase交互的基础,包含了HBase的API,如`org.apache.hadoop.hbase.client.Connection`和`org.apache.hadoop.hbase.client.Table`等,用于创建连接、打开表、执行Get、Put、Scan等操作。...

    hbase 数据库

    10. **扩展性**:随着数据的增长,可以通过增加RegionServer节点来横向扩展HBase集群,提高处理能力和存储容量。 11. **优化策略**:包括合理设置Region大小、预分区表、选择合适的Column Family、启用BlockCache等...

    hbase的java client实例

    HBase的协处理器机制允许在RegionServer端执行自定义逻辑,从而提高数据处理效率和安全性。协处理器分为两种类型:`Observer`和`Coprocessor`。Observer可以监听并干预表的各种操作,比如在数据写入时进行验证,或者...

    在Ubuntu安装配置hbase

    hbase-daemon.sh start regionserver ``` 8. **验证启动**: 使用`jps`命令查看当前运行的Java进程,你应该能看到HMaster和HRegionServer的进程。 9. **测试HBase**: 启动HBase shell进行一些基本操作,如...

Global site tag (gtag.js) - Google Analytics