RegionServer端put数据流程分析:
client端通过MultiServerCallable.call调用rs的rpc的multi方法。
regionServer实例ClientProtos.ClientService.BlockingInterface接口。
public MultiResponse multi(finalRpcControllerrpcc, final MultiRequest request)
throws ServiceException {
// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
// It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
CellScannercellScanner = controller != null? controller.cellScanner(): null;
if (controller != null) controller.setCellScanner(null);
List<CellScannable> cellsToReturn = null;
MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
得到当前提交的数据,数据按region,list的方式传入过来。
for (RegionAction regionAction : request.getRegionActionList()) {
this.requestCount.add(regionAction.getActionCount());
regionActionResultBuilder.clear();
HRegion region;
try {
从当前regionserver中的onlnieRegions中得到请求的region.
1.从onlineRegions中取出HRegion实例,如果不为空,按如下流程走,否则:执行到5
2.如果onlineRegions列表中不包含此region,从movedRegions列表中拿到region,region的moved超时是2分钟,
如果movedRegions列表中能拿到此region,同时move时间超时,并从movedRegions列表中移出引region返回null,
否则返回正在moved的region,如果movedRegions中返回的region不为null,throw RegionMovedException
3.从regionsInTransitionInRS中获取此region,如果能拿到,同时拿到的值为true,表示region还在做opening操作。
Throw RegionOpeningException
4.如果以上得到的值都为null,表示此server中没有此region, throw NotServingRegionException
此时基本上只有一个可能,region在做split.或者move到其它server(刚完成move,client请求时不在此server)
5.如果1中拿到region,表示正常,region在此server中。
region = getRegion(regionAction.getRegion());
} catch (IOException e) {
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
continue; // For this region it's a failure.
}
检查是否是原子操作:
if (regionAction.hasAtomic() && regionAction.getAtomic()) {
// How does this call happen? It may need some work to play well w/ the surroundings.
// Need to return an item per Action along w/ Action index. TODO.
Try {
执行原子操作:保证一个region中所有的action的操作的mvcc的值相同,如果有一个操作失败,整体rollback
mutateRows(region, regionAction.getActionList(), cellScanner);
} catch (IOException e) {
// As it's atomic, we may expect it's a global failure.
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
}
} else {
// doNonAtomicRegionMutation manages the exception internally
执行非原子操作:见doNonAtomicRegionMutation流程分析
cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
regionActionResultBuilder, cellsToReturn);
}
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
}
// Load the controller with the Cells to return.
if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
如果client端传入有数据返回的压缩编码方式,把要返回的数据添加到cellsToReturn列表中。
同时response要返回的result只返回result的大小
如果是批量的get/append/increment操作时,建议是把codec的配置设置上。
此时表示是get/append/increment的请求,生成一个CellScanner实例,此实时是查询得到的所有Result
clent通过此CellScanner来获取数据。
controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
}
returnresponseBuilder.build();
}
doNonAtomicRegionMutation处理流程分析:
用于处理非原子性的put/delete/get操作。
privateList<CellScannable> doNonAtomicRegionMutation(final HRegion region,
final RegionAction actions, finalCellScannercellScanner,
final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn) {
// Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
// one at a time, we instead pass them in batch. Be aware that the corresponding
// ResultOrException instance that matches each Put or Delete is then added down in the
// doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
List<ClientProtos.Action> mutations = null;
对request中指定region中所有的action进行迭代
for (ClientProtos.Action action: actions.getActionList()) {
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
try {
Result r = null;
如果此action是get操作,直接执行get
if (action.hasGet()) {
Get get = ProtobufUtil.toGet(action.getGet());
r = region.get(get);
} elseif (action.hasMutation()) {
MutationTypetype = action.getMutation().getMutateType();
if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
!mutations.isEmpty()) {
// Flush out any Puts or Deletes already collected.
doBatchOp(builder, region, mutations, cellScanner);
mutations.clear();
}
switch (type) {
如果kv中存在原来的数据,把新kv的数据添加到old kv的后面,kv的大小等于oldkvsize+newkvsize
如果是新的kv,直接当成一列进行存储
caseAPPEND:
r = append(region, action.getMutation(), cellScanner);
break;
值自动增加。
caseINCREMENT:
r = increment(region, action.getMutation(), cellScanner);
break;
如果操作是put/delete,把所有的操作action集合在一起,
casePUT:
caseDELETE:
// Collect the individual mutations and apply in a batch
if (mutations == null) {
mutations = newArrayList<ClientProtos.Action>(actions.getActionCount());
}
mutations.add(action);
break;
default:
thrownewDoNotRetryIOException("Unsupported mutate type: " + type.name());
}
} else {
thrownewHBaseIOException("Unexpected Action type");
}
如果是get/append/increment操作,每一次都会得到一个result,把result添加到返回的response中。
if (r != null) {
ClientProtos.Result pbResult = null;
如果client端传入有数据返回的压缩编码方式,把要返回的数据添加到cellsToReturn列表中。
同时response要返回的result只返回result的大小
如果是批量的get/append/increment操作时,建议是把codec的配置设置上。
if (isClientCellBlockSupport()) {
pbResult = ProtobufUtil.toResultNoData(r);
// Hard to guess the size here. Just make a rough guess.
if (cellsToReturn == null) cellsToReturn = newArrayList<CellScannable>();
cellsToReturn.add(r);
} else {
如果没有配置codec时,把result的所有数据写入到response中,当成client request的响应信息,
这样如果批量的值比较大时,可能会影响到响应的速度。
pbResult = ProtobufUtil.toResult(r);
}
resultOrExceptionBuilder =
ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
}
// Could get to here and there was no result and no exception. Presumes we added
// a Put or Delete to the collecting Mutations List for adding later. In this
// case the corresponding ResultOrException instance for the Put or Delete will be added
// down in the doBatchOp method call rather than up here.
} catch (IOException ie) {
resultOrExceptionBuilder = ResultOrException.newBuilder().
setException(ResponseConverter.buildException(ie));
}
if (resultOrExceptionBuilder != null) {
// Propagate index.
把第一个子的响应信息添加到集合列表中,等待执行完成统一进行响应。
resultOrExceptionBuilder.setIndex(action.getIndex());
builder.addResultOrException(resultOrExceptionBuilder.build());
}
}
// Finish up any outstanding mutations
针对put/delete操作,执行批量操作。
if (mutations != null && !mutations.isEmpty()) {
doBatchOp(builder, region, mutations, cellScanner);
}
returncellsToReturn;
}
针对put/delete操作的批量操作处理方法
protected void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
finalList<ClientProtos.Action> mutations, finalCellScannercells) {
Mutation[] mArray = newMutation[mutations.size()];
longbefore = EnvironmentEdgeManager.currentTimeMillis();
booleanbatchContainsPuts = false, batchContainsDelete = false;
try {
inti = 0;
迭代要操作的所有action,生成put/delete的mutation实例
for (ClientProtos.Action action: mutations) {
MutationProto m = action.getMutation();
Mutationmutation;
if (m.getMutateType() == MutationType.PUT) {
mutation = ProtobufUtil.toPut(m, cells);
batchContainsPuts = true;
} else {
mutation = ProtobufUtil.toDelete(m, cells);
batchContainsDelete = true;
}
mArray[i++] = mutation;
}
requestCount.add(mutations.size());
如果不是metatable的写入,也就是用户表写入,
if (!region.getRegionInfo().isMetaTable()) {
检查并等待全局flush的完成。
1.检查全局的flush是否超过hbase.regionserver.global.memstore.upperLimit配置的值,默认是0.4
如果当前rs中所有的memstore的size总和超过了此值,强制进行flsuh,并等待flush完成。线程wait,
等待MemStoreFlusher.flushRegion去notify此线程的等待。
2.检查全局的flush是否超过hbase.regionserver.global.memstore.lowerLimit配置,默认为0.35
如果rs中所有的memstore的size总和超过了此值,发起flush请求,不等待flush完成,执行下面流程
cacheFlusher.reclaimMemStoreMemory();
}
通过HRegion执行更新操作。
OperationStatus codes[] = region.batchMutate(mArray, false);
处理更新后每一条数据是否成功的信息,并添加到response中。
for (i = 0; i < codes.length; i++) {
intindex = mutations.get(i).getIndex();
Exception e = null;
switch (codes[i].getOperationStatusCode()) {
caseBAD_FAMILY:
出现这种情况表示要更新的action中指定的cf不存在或cf的name为null,
e = newNoSuchColumnFamilyException(codes[i].getExceptionMsg());
builder.addResultOrException(getResultOrException(e, index));
break;
caseSANITY_CHECK_FAILURE:
出现这种情况表示要更新的action中有kv的timestamp的值超出了当前rs中的时间,
通过hbase.hregion.keyvalue.timestamp.slop.millisecs配置可超出的时间范围,默认为不控制
e = newFailedSanityCheckException(codes[i].getExceptionMsg());
builder.addResultOrException(getResultOrException(e, index));
break;
default:
e = newDoNotRetryIOException(codes[i].getExceptionMsg());
builder.addResultOrException(getResultOrException(e, index));
break;
caseSUCCESS:
正常结束
builder.addResultOrException(getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
break;
}
}
} catch (IOException ie) {
for (inti = 0; i < mutations.size(); i++) {
builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
}
}
更新监控数据
longafter = EnvironmentEdgeManager.currentTimeMillis();
if (batchContainsPuts) {
metricsRegionServer.updatePut(after - before);
}
if (batchContainsDelete) {
metricsRegionServer.updateDelete(after - before);
}
}
Hregion.batchMutate:
OperationStatus[] batchMutate(Mutation[] mutations, boolean isReplay)
throws IOException {
初始化批量执行处理程序,把每一个action的status设置为OperationStatus.NOT_RUN
BatchOperationInProgress<Mutation> batchOp =
newBatchOperationInProgress<Mutation>(mutations);
booleaninitialized = false;
如果处理还没有结束,一直迭代,
Hregion.BatchOperationInProgress.nextIndexToProcess==要处理的action个数表示完成处理
while (!batchOp.isDone()) {
if (!isReplay) {
checkReadOnly();
}
checkResources();
longnewSize;
if (isReplay) {
startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
} else {
startRegionOperation(Operation.BATCH_MUTATE);
}
try {
if (!initialized) {
if (!isReplay) {
region请求加一,
this.writeRequestsCount.increment();
迭代每一个action,执行cp的prePut/preDelete操作。
doPreMutationHook(batchOp);
}
initialized = true;
}
执行更新操作,
检查cf是否合法,如果是put,检查put传入的cf是否在table中存在,不存在此action的status为BAD_FAMILY
检查put的timestamp是否合法,需要在rs当前时间的一个合理范围内。
不在范围内此action.status为SANITY_CHECK_FAILURE
检查delete的cf是否合法。
如果是put的action,更新put的所有kv中timestamp为当前的rs时间
如果是delete的action,通过get先读取每一个cf中的数据,检查是否需要执行删除,
删除的timestamp为Long.MAX_VALUE
在以上检查完成后,如果部分检查不合法的action,它们的状态为非OperationStatusCode.NOT_RUN状态
把所有现在是NOT_RUN状态的action添加到对应的cf的memstore中。
每一个store中kv的MvccVersion的值为mvcc中memstoreWrite的值(region max seqid + 1)
根据更新的action中所有的store(cf),分别调用region中不同的store的add(HStore.add)方法添加到memstore中,
一个action中不同的store中所有的kv的mvccversion的都相同。
把更新过的每一个action的status的状态设置为OperationStatus.SUCCESS
写入wal日志,通过append方式添加日志,
日志的flush通过hbase.regionserver.optionallogflushinterval进行配置,默认为1*1000ms,-1表示实时更新
或可以通过table定义的时候设置DURABILITY属性,可设置为SYNC_WAL/FSYNC_WAL表示实时更新日志
得到当此添加的所的kv的总大小。
longaddedSize = doMiniBatchMutation(batchOp, isReplay);
把当前更新的size添加到rs中的全局memstore的大小,atomicGlobalMemstoreSize
把当前更新的size添加到当前region的memstore中。memstoreSize
newSize = this.addAndGetGlobalMemstoreSize(addedSize);
} finally {
closeRegionOperation();
}
检查当前region中memstore的大小是否超过hbase.hregion.memstore.flush.size配置的大小,默认1024*1024*128L
如果需要flush,通过MemStoreFlusher.requestFlush(HRegion)发起flush请求
if (isFlushSize(newSize)) {
requestFlush();
}
}
returnbatchOp.retCodeDetails;
}
相关推荐
Put操作是HBase中的一种基本操作,用于将数据写入到RegionServer中。通过设置Put操作的 AutoFlush 为false,可以提高写入性能。例如,在某个Region Server集群上,新建立一个LZO压缩表,测试Put和Get的性能,可以...
此外,HBase与MapReduce结合,可以进行批量处理和分析任务。 5. 高可用性:HBase提供了主备RegionServer机制,当主RegionServer故障时,备用RegionServer能够无缝接管。同时,数据的多版本策略也提高了系统的容错性...
HBase是Apache软件基金会的一个开源项目,它是基于Google的Bigtable设计思想...通过分析HBase 1.3.1的源码,开发者可以深入理解HBase的工作原理,从而更好地优化应用、解决性能问题,甚至进行功能扩展和定制化开发。
HBase提供了丰富的监控工具,如JMX、HBase Master UI和RegionServer UI,可以帮助管理员监控集群状态、排查问题。同时,日志分析也是故障排查的重要手段。 总结,Linux环境下的HBase 2.3.2客户端为开发者提供了强大...
在本文中,我们将深入探讨如何使用Scala API操作HBase数据库。HBase是一个分布式、面向列的NoSQL数据库,它构建于Hadoop之上,...同时,熟悉HBase的RegionServer和Master节点的工作原理也有助于优化你的应用程序性能。
这个“hbase-2.4.17-bin”安装包提供了HBase的最新稳定版本2.4.17,适用于大数据处理和分析场景。下面将详细介绍HBase的核心概念、安装步骤以及配置和管理。 一、HBase核心概念 1. 表(Table):HBase中的表是由行...
5. **启动 HBase**:通过命令行启动 HBase 的 Master 和 RegionServer。 **3.3 HBase Shell 操作** - **连接 HBase Shell**:使用 `hbase shell` 命令连接到 HBase Shell。 - **创建表**:使用 `create 'tablename...
HBase 0.98.12.1能够与YARN无缝集成,使得HBase Master和RegionServer可以作为YARN的应用程序运行,提高了资源利用率和系统弹性。 四、安装与配置 1. 下载:首先,从Apache官网下载HBase 0.98.12.1的源码或二进制...
使用HBase的Master和RegionServer提供的监控指标,可以监控HBase集群的健康状态和性能。通过调整配置参数,如region大小、MemStore大小等,可以优化HBase的性能。 总之,HBase编程实践涵盖了从基础概念到高级应用...
- **实时数据分析**:HBase常用于实时分析,如日志分析、用户行为追踪等。 - **物联网(IoT)**:在物联网场景下,HBase能快速存储和查询大量设备产生的数据。 - **搜索引擎索引**:构建搜索索引,实现快速查询。 ...
在Java中,我们通过HBase客户端API与HBase交互,这些API包括了`HBaseAdmin`、`HTable`、`Put`、`Get`、`Scan`等核心类。 1. **连接HBase**:在Java代码中,我们使用`HBaseConfiguration.create()`来创建一个配置...
4. 错误处理:HbaseClient具有良好的错误恢复机制,当服务器端出现问题时,客户端会自动重试,保证数据操作的可靠性。 5. 客户端缓存:为了提高性能,HbaseClient使用了缓存策略,如Cell缓存和RowCache。Cell缓存将...
- **Get和Put操作**:客户端通过HTable接口的get和put方法与HBase交互,发送请求到RegionServer。 - **Scanning**:用于批量获取数据,支持过滤器,优化数据检索效率。 - **Compaction**:定期合并region中的...
- **Master**:HBase主服务器,负责RegionServer的负载均衡、Region的分配和元数据管理。 3. **HBase API**: - **HBase客户端**:提供了Java API和命令行接口(HBase Shell),用于与HBase交互,包括创建表、...
2. HBase服务器端:包括RegionServer和Master,它们负责处理客户端请求和集群管理。 3. Region分裂:当Region变得过大时,HBase会自动将其分裂成两个新的Region。 4. Compaction:定期合并小的HFile以减少Region中...
1. **HBase客户端库**:这是与HBase交互的基础,包含了HBase的API,如`org.apache.hadoop.hbase.client.Connection`和`org.apache.hadoop.hbase.client.Table`等,用于创建连接、打开表、执行Get、Put、Scan等操作。...
10. **扩展性**:随着数据的增长,可以通过增加RegionServer节点来横向扩展HBase集群,提高处理能力和存储容量。 11. **优化策略**:包括合理设置Region大小、预分区表、选择合适的Column Family、启用BlockCache等...
HBase的协处理器机制允许在RegionServer端执行自定义逻辑,从而提高数据处理效率和安全性。协处理器分为两种类型:`Observer`和`Coprocessor`。Observer可以监听并干预表的各种操作,比如在数据写入时进行验证,或者...
hbase-daemon.sh start regionserver ``` 8. **验证启动**: 使用`jps`命令查看当前运行的Java进程,你应该能看到HMaster和HRegionServer的进程。 9. **测试HBase**: 启动HBase shell进行一些基本操作,如...