- 浏览: 37315 次
- 来自: 杭州
最新评论
HBase rpc 0.94中
例如在client put数据时,会调用htable的flushCommits,再调HConnectionImplementationr的processBatch,再调processBatchCallback
中,在这里异步调用线程,并使用future取得结果,最终执行的是call方法。
其中的异步调用的线程池
HMaster or HRegionServer初始化创建HBaseServer调用HBaseRPC.getServer->
HBase 0.94 HRegionServer默认启动10个handler线程用于处理rpc请求(hbase.regionserver.handler.count),
HBaseServer的Listerner的reader线程接到RPC请求后,会丢到queue中,
其中的reader线程数是由一个线程池决定
1. HBaseServer创建后有几个重要的角色.
1.1 Listener deamon线程,负责接收HMaster,HRegionServer,HBase Client的http请求.
1.2 Responder demon线程,负责将处理完的请求,发送回调用者.
1.3 Connection listener接收到的每个Socket请求都会创建一个Connection 实例.
1.4 Call 每一个客户端的发送的请求由Connection读取到有效数据后都会生成一个Call实例
1.5 LinkedBlockingQueue callQueue 每个由新生成的call都会放入到callQueue这个队列里.
1.6 Handler 从callQueue中取出call,并对call进行反射的调用,生成的结果value,交由responder处理.
1.7 LinkedList Connection.responseQueue ,用来存放已经由handler处理过的属于同一个Connection的call.
HBaseServer->listener = new Listener();->Reader.run->doRunLoop->doRead->Connection.readAndProcess->processData中将rpc请求生成call并加入到queue中。
HBaseServer的下面方法决定了接收RPC请求的queue大小maxQueueLength ,默认是100
当regionserver启动时调用startServiceThreads内会调用HBaseServer的下面方法启动10个hander线程。
handler线程最终会调用到run的call方法,再此方法中会射出相应的方法并最终调用,也就是会调用相应的regionserver的方法。
例如在client put数据时,会调用htable的flushCommits,再调HConnectionImplementationr的processBatch,再调processBatchCallback
中,在这里异步调用线程,并使用future取得结果,最终执行的是call方法。
// step 2: make the requests Map<HRegionLocation, Future<MultiResponse>> futures = new HashMap<HRegionLocation, Future<MultiResponse>>( actionsByServer.size()); for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) { futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName))); } // step 3: collect the failures and successes and prepare for retry for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer : futures.entrySet()) { HRegionLocation loc = responsePerServer.getKey();
private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc, final MultiAction<R> multi, final byte [] tableName) { // TODO: This does not belong in here!!! St.Ack HConnections should // not be dealing in Callables; Callables have HConnections, not other // way around. final HConnection connection = this; return new Callable<MultiResponse>() { public MultiResponse call() throws IOException { ServerCallable<MultiResponse> callable = new ServerCallable<MultiResponse>(connection, tableName, null) { public MultiResponse call() throws IOException { return server.multi(multi); } @Override public void connect(boolean reload) throws IOException { server = connection.getHRegionConnection(loc.getHostname(), loc.getPort()); } }; return callable.withoutRetries(); } }; }
其中的异步调用的线程池
int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? } long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); // Using the "direct handoff" approach, new threads will only be created // if it is necessary and will grow unbounded. This could be bad but in HCM // we only create as many Runnables as there are region servers. It means // it also scales when new region servers are added. this.pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new DaemonThreadFactory()); ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true); this.finishSetup();
HMaster or HRegionServer初始化创建HBaseServer调用HBaseRPC.getServer->
this.rpcServer = HBaseRPC.getServer(this, new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class}, initialIsa.getHostName(), // BindAddress is IP we got for this server. initialIsa.getPort(), numHandlers, 0, // we dont use high priority handlers in master conf.getBoolean("hbase.rpc.verbose", false), conf, 0); // this is a DNC w/o high priority handlers
HBase 0.94 HRegionServer默认启动10个handler线程用于处理rpc请求(hbase.regionserver.handler.count),
this.rpcServer = HBaseRPC.getServer(this, new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class, OnlineRegions.class}, initialIsa.getHostName(), // BindAddress is IP we got for this server. initialIsa.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), conf.getInt("hbase.regionserver.metahandler.count", 10), conf.getBoolean("hbase.rpc.verbose", false), conf, QOS_THRESHOLD); // Set our address. this.isa = this.rpcServer.getListenerAddress(); this.rpcServer.setErrorHandler(this); this.rpcServer.setQosFunction(new QosFunction());
HBaseServer的Listerner的reader线程接到RPC请求后,会丢到queue中,
其中的reader线程数是由一个线程池决定
this.readThreads = conf.getInt( "ipc.server.read.threadpool.size", 10);
public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads, new ThreadFactoryBuilder().setNameFormat( "IPC Reader %d on port " + port).setDaemon(true).build()); for (int i = 0; i < readThreads; ++i) { Reader reader = new Reader(); readers[i] = reader; readPool.execute(reader); } // Register accepts on the server socket with the selector. acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); }
1. HBaseServer创建后有几个重要的角色.
1.1 Listener deamon线程,负责接收HMaster,HRegionServer,HBase Client的http请求.
1.2 Responder demon线程,负责将处理完的请求,发送回调用者.
1.3 Connection listener接收到的每个Socket请求都会创建一个Connection 实例.
1.4 Call 每一个客户端的发送的请求由Connection读取到有效数据后都会生成一个Call实例
1.5 LinkedBlockingQueue callQueue 每个由新生成的call都会放入到callQueue这个队列里.
1.6 Handler 从callQueue中取出call,并对call进行反射的调用,生成的结果value,交由responder处理.
1.7 LinkedList Connection.responseQueue ,用来存放已经由handler处理过的属于同一个Connection的call.
HBaseServer->listener = new Listener();->Reader.run->doRunLoop->doRead->Connection.readAndProcess->processData中将rpc请求生成call并加入到queue中。
Call call = new Call(id, param, this, responder, callSize); callQueueSize.add(callSize);
HBaseServer的下面方法决定了接收RPC请求的queue大小maxQueueLength ,默认是100
String oldMaxQueueSize = this.conf.get("ipc.server.max.queue.size"); if (oldMaxQueueSize == null) { this.maxQueueLength = this.conf.getInt("ipc.server.max.callqueue.length", handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); } else { LOG.warn("ipc.server.max.queue.size was renamed " + "ipc.server.max.callqueue.length, " + "please update your configuration"); this.maxQueueLength = Integer.getInteger(oldMaxQueueSize); } this.maxQueueSize = this.conf.getInt("ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE); this.readThreads = conf.getInt( "ipc.server.read.threadpool.size", 10); this.callQueue = new LinkedBlockingQueue<Call>(maxQueueLength);
当regionserver启动时调用startServiceThreads内会调用HBaseServer的下面方法启动10个hander线程。
public synchronized void startThreads() { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(callQueue, i); handlers[i].start(); } if (priorityHandlerCount > 0) { priorityHandlers = new Handler[priorityHandlerCount]; for (int i = 0 ; i < priorityHandlerCount; i++) { priorityHandlers[i] = new Handler(priorityCallQueue, i); priorityHandlers[i].start(); } } }
handler线程最终会调用到run的call方法,再此方法中会射出相应的方法并最终调用,也就是会调用相应的regionserver的方法。
Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); //Verify protocol version. //Bypass the version check for VersionedProtocol if (!method.getDeclaringClass().equals(VersionedProtocol.class)) { long clientVersion = call.getProtocolVersion(); ProtocolSignature serverInfo = ((VersionedProtocol) instance) .getProtocolSignature(protocol.getCanonicalName(), call .getProtocolVersion(), call.getClientMethodsHash()); long serverVersion = serverInfo.getVersion(); if (serverVersion != clientVersion) { LOG.warn("Version mismatch: client version=" + clientVersion + ", server version=" + serverVersion); throw new RPC.VersionMismatch(protocol.getName(), clientVersion, serverVersion); } } Object impl = null; if (protocol.isAssignableFrom(this.implementation)) { impl = this.instance; } else { throw new HBaseRPC.UnknownProtocolException(protocol); } long startTime = System.currentTimeMillis(); Object[] params = call.getParameters(); Object value = method.invoke(impl, params);
发表评论
-
HBase中Lease创建、失效、及常见问题
2013-11-07 11:52 6366HBase通过租约来控制每个scanner的操作时间。 1 ... -
HBase HFile和Hlog的cleaner执行流程和配置项
2013-06-09 14:50 5858HFile和Hlog是HBase中两大文件存在格式,HFile ... -
HLog代码分析及与HBase replication延时
2013-02-19 19:51 2765在分享replication时,有同事提出replicatio ... -
HBase BlockCache 代码分析
2013-02-04 22:08 18371. Cache 读写 调用逻辑: hmaster.handl ... -
HBase replication 代码分析
2013-01-28 17:23 2967随着HBase的大规模应用,HBase的容灾显得特别的重要。 ... -
HBase大集群
2013-01-28 14:02 1230为什么HBase没有大集群? 因为如果一个HBase集群很大, ... -
HBase MVCC基本原理
2012-11-28 16:21 3263HBase MVCC(Multi Version Consi ... -
flush后split和compact后split
2012-10-11 17:51 1585什么时候split? 当某store所有文件总大小大于某个值时 ... -
HBase keyvalue大小导致OOM
2012-09-27 12:11 1468在HBase上传时,会通过配置参数hbase.client.k ... -
HBase什么时候做minor major compact
2012-10-09 18:11 3795我们都知道compact分为两类,一类叫Minor compa ... -
HBase Put及flush
2012-09-21 10:50 17841. Put及flush a. ...
相关推荐
4.Endpoint:终端是动态 RPC 插件的接口,它的实现代码被安装在服务器端,能够通过 HBase RPC 调用唤醒提供接口。 HBase Coprocessor 的应用场景: 1.二级索引:HBase Coprocessor 可以实现二级索引的创建和维护,...
- Endpoint:作为动态RPC插件的接口,其服务端实现可以被HBase RPC调用触发,提供自定义功能。 【Endpoint服务端实现】 Endpoint在服务端的实现涉及到RPC通信,因此客户端和服务端需定义一致的接口。HBase使用...
- **Endpoint**:动态 RPC 插件接口,实现代码部署在服务器端,通过 HBase RPC 调用触发。 #### 二、Endpoint服务端实现 Endpoint 作为一种特殊的 Coprocessor,允许在服务器端直接处理请求,无需将所有数据返回给...
HBase中的RPC机制与Java的RMI(远程方法调用)不同,它采用了一种轻量级的RPC机制,这种方式不依赖于Java的RMI机制,使得它能够更好地适应分布式环境下的复杂通信需求。在HBase中,RPC机制是其主要的通信手段,它...
在HBase的架构中,Client是用户与系统交互的接口,它通过远程过程调用(RPC)机制与HMaster和HRegionServer通信。对于数据读写操作,Client直接与HRegionServer交互,而对于表管理和元数据操作,Client则与HMaster...
这些天一直奔波于长沙和武汉之间,忙着腾讯的笔试、面试,以至于对hadoopRPC(RemoteProcedureCallProtocol,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。...
1. 全表扫描:在无缓存情况下,Hive on HBase的查询速度远低于Hive on HDFS,因为HBase的全表扫描涉及到大量RPC调用。然而,通过设置`hbase.client.scanner.caching`参数,可以显著提高Hive on HBase的性能,尽管...
3. **RPC机制**:HBase使用远程过程调用(RPC)与RegionServer进行通信,处理数据请求。这个JAR包包含了相关的RPC实现。 4. **行键(RowKey)索引**:HBase是一种列族式数据库,行键是其主要的索引方式。`hbase-...
增加RPC调用数量,通过修改hbase-site.xml中的hbase.regionserver.handler.count属性来实现,以提高并发处理能力。 在HBase的使用中,如果直接将时间戳作为行健,会导致写入单个region时出现热点问题。因为HBase的...
Thrift的核心在于它的序列化机制和RPC(远程过程调用)框架,使得开发者可以轻松地在各种编程语言间构建和消费服务。 在PHP访问HBase时,由于HBase本身是用Java实现的,因此需要一个中间层来桥接PHP和HBase。这就是...
这些客户端封装了HBase的RPC协议,简化了在不同语言环境下的开发工作。以Python的happybase为例,它提供了一种面向对象的方式来操作HBase,使得代码更加简洁易懂。 五、连接管理和安全性 HBase客户端需要配置正确...
HRegionServer与HMaster及客户端之间的通信采用RPC协议,即远程过程调用协议,这是一种用于不同计算机系统间的进程间通信的方式。 8. **HFile中的KeyValue结构** 在HFile数据格式中,KeyValue数据结构的Value...
- **解决方案**:为了实现这一目标,HBase采用了一种新的RPC引擎,即Protobuf RPC引擎来替代原有的Writable RPC引擎。这种改变不仅提高了RPC请求/响应消息的效率,还通过使用Protobuf格式实现了更灵活的数据序列化。...
1. **Thrift接口**:Thrift提供了一种序列化和RPC(远程过程调用)机制,允许开发者定义服务接口,并在多种语言之间实现这些接口。Thrift1是早期版本,虽然现在已更新到Thrift2,但对某些场景仍然适用。它通过生成...
《HBase运维手册》主要涵盖了HBase数据库的运维核心要点,包括Region管理、缓存机制、读写性能、压缩操作、内存使用以及RPC调用等多个方面。以下是对这些知识点的详细解析: 1. **Region管理**: - Region数量:...
4. **RPC通信**:`org.apache.hadoop.hbase.ipc`包下的RpcServer实现了客户端与服务器之间的远程过程调用,处理客户端请求。 5. **版本控制与并发控制**:每个Cell都有时间戳,用于版本控制;`org.apache.hadoop....