Hbase的RPC主要由HBaseRPC、RpcEngine、HBaseClient、HBaseServer、VersionedProtocol 5个概念组成。
1、HBaseRPC是hbase RPC的实现类,核心方法:
1)、RpcEngine getProtocolEngine():返回RpcEngine对象
2)、<T extends VersionedProtocol> T waitForProxy():调用RpcEngine的getProxy()方法,返回一个远程代理对象,比如:第一次访问HRegionServer时需要执行该方法,设置代理后,会缓存该对象到HConnectionImplementation中。
2、RpcEngine接口,其实现类:WritableRpcEngine,核心方法:
1)、VersionedProtocol getProxy():返回代理对象,HRegionServer和HMaster均是VersionedProtocol的实现类,即返回的对象可代理执行HRegionServer和HMaster的方法;
2)、Object[] call():调用程序接口,最终是经过HBaseClient的内部类Connection通过socket方式完成;
3)、RpcServer getServer():返回RpcServer的实现类,有一个抽象实现: HBaseServer和HBaseServer的子类:WritableRpcEngine.Server。
4)、stopProxy()
3、HBaseClient:RPC的client端实现,最核心的方法是call(),通过该方法可执行服务端的方法,该类中有一个重要的内部类:HBaseClient.Connection,该类封装了socket,具体原理就是把要执行的方法通过socket告诉服务端,服务端通过HBaseServer类从socket中读出client端的调用方法,然后执行相关类的相应方法,结果再通过socket传回。
4、HBaseServer:RPC的server端实现,HBaseServer有两个重要的内部类,一个是HBaseServer.Connection,另一个是Handler类,这里的Connection从socket中读出call方法并放入callQueue队列中,Handler类从该队列中取出call方法(比如:scan查询时执行的一次next(),该方法会执行到服务端HRegionServer的next(),这里next就是call方法)并执行,结果通过socket输出给client端,Handler是Thread的子类,在RS启动时就会创建所有的Handle线程,然后一直执行,具体的handler线程数可以通过配置项hbase.regionserver.handler.count配置,默认是10。
5、VersionedProtocol,该接口的类图如下:
可进行RPC调用的类必须是该接口的实现类,hbase client、 RS、Master相互之间的访问总结为:
- HBase Client 通过HMasterInterface接口访问HMasterServer,通过HRegionInterface接口访问HRegionServer;
- HRegionServer通过HMasterRegionInterface接口访问HMasterServer;
- HMaster通过HRegionInterface接口访问HRegionServer,在访问RS时Master就是HBase Client的角色。
以scan查询为例画时序图,通过时序图详细理解HBase的RPC调用过程
关于HBase的RPC一些知识补充如下:
1、HBaseClient缓存了HBaseClient.Connection,默认一个client应用与每个RS均只有一个socket链接,可以通过以下两个配置项修改:
1)、hbase.client.ipc.pool.type:默认为RoundRobin,共三种,如下:
// PoolMap类的createPool方法,在HBaseClient缓存connection时会调用,从pool中获取connection时,如果缓存的数量没有达到poolMaxSize,则会返回null,从而创建新的connection对象 protected Pool<V> createPool() { switch (poolType) { case Reusable: return new ReusablePool<V>(poolMaxSize); //复用的池 case RoundRobin: return new RoundRobinPool<V>(poolMaxSize); //轮询 case ThreadLocal: return new ThreadLocalPool<V>(); //本地线程 } return null; } //PoolMap.RoundRobinPool类的get方法,缓存的数量没有达到poolMaxSize,则会返回null,这时会创建新的connection对象,不同的poolType有不同的实现,比如:ReusablePool的get方法是:return poll(); public R get() { if (size() < maxSize) { return null; } nextResource %= size(); R resource = get(nextResource++); return resource; } |
2)、hbase.client.ipc.pool.size:socket链接池大小,默认为1
2、详解HBaseClient的getConnection()方法:
//HBaseClient的getConnection方法,默认一个regionserver和一个master均只会建立一个socket链接,可以通过修改hbase.client.ipc.pool.size(默认值为1)增加socket链接数,可参看本文档上面几行的内容 protected Connection getConnection(InetSocketAddress addr, Class<? extends VersionedProtocol> protocol, User ticket, int rpcTimeout, Call call) throws IOException, InterruptedException { if (!running.get()) { throw new IOException("The client is stopped"); } Connection connection; //一个regionserver对应一个ConnectionId ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); synchronized (connections) { //connections是PoolMap类的实例,如果connections中remoteId对应的链接数量小于hbase.client.ipc.pool.size的配置值则会返回null connection = connections.get(remoteId); if (connection == null) { connection = createConnection(remoteId); //一个regionserver对应一个ConnectionId connections.put(remoteId, connection); } } connection.addCall(call);
//如果没有socket链接则建立socket链接 connection.setupIOstreams(); return connection; }
//ConnectionId的equals方法 public boolean equals(Object obj) { if (obj instanceof ConnectionId) { ConnectionId id = (ConnectionId) obj; return address.equals(id.address) && protocol == id.protocol && ((ticket != null && ticket.equals(id.ticket)) || (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout; } return false; }
//HBaseClient内部类:Connection的setupIOstreams方法,在getConnection()中被调用 protected synchronized void setupIOstreams() throws IOException, InterruptedException { //如果有可用的socket对象则直接返回 if (socket != null || shouldCloseConnection.get()) { return; }
if (failedServers.isFailedServer(remoteId.getAddress())) { IOException e = new FailedServerException( "This server is in the failed servers list: " + remoteId.getAddress()); markClosed(e); close(); throw e; }
try { setupConnection(); this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(NetUtils.getInputStream(socket)))); this.out = new DataOutputStream (new BufferedOutputStream(NetUtils.getOutputStream(socket))); writeHeader();
// update last activity time touch();
// start the receiver thread after the socket connection has been set up start(); } catch (Throwable t) { failedServers.addToFailedServers(remoteId.address); IOException e; if (t instanceof IOException) { e = (IOException)t; } else { e = new IOException("Could not set up IO Streams", t); } markClosed(e); close();
throw e; } } //HBaseClient内部类:Connection的setupConnection方法,在setupIOstreams ()中被调用 //在这里创建socket对象 protected synchronized void setupConnection() throws IOException { short ioFailures = 0; short timeoutFailures = 0; while (true) { try { this.socket = socketFactory.createSocket(); this.socket.setTcpNoDelay(tcpNoDelay); this.socket.setKeepAlive(tcpKeepAlive); // connection time out is 20s NetUtils.connect(this.socket, remoteId.getAddress(), getSocketTimeout(conf)); if (remoteId.rpcTimeout > 0) { pingInterval = remoteId.rpcTimeout; // overwrite pingInterval } this.socket.setSoTimeout(pingInterval); return; } catch (SocketTimeoutException toe) { /* The max number of retries is 45, * which amounts to 20s*45 = 15 minutes retries. */ handleConnectionFailure(timeoutFailures++, maxRetries, toe); } catch (IOException ie) { handleConnectionFailure(ioFailures++, maxRetries, ie); } } } |
3、Hbase有三个链接类:
org.apache.hadoop.hbase.client.HConnection
org.apache.hadoop.hbase.ipc.HBaseClient.Connection
org.apache.hadoop.hbase.ipc.HBaseServer.Connection
l HConnection接口
实现类:HConnectionImplementation,该链接是client与hbase集群这个层面的链接对象,一个集群的一个client就一个该链接对象,在该对象持有
1) RpcEngine对象
2) ZooKeeperWatcher 对象
3) master的RPC代理对象:HMasterInterface
4) regionserver的RPC代理对象:HRegionInterface(一个rs对应一个HRegionInterface代理对象)
5) 缓存的region的location信息
6) 线程池batchPool,batchPool用于HTable的如下方法:
其中批量get查询api:get(List<Get> gets)会调用batch方法,而单条查询get不会,只要是可能涉及多个regionserver的操作均会使用多线程处理,像批量get、批量delete、put,scan一次只能查询一个RS,因此虽然功能上是批量查询数据,但是不需要线程池。
有两个地方均可能创建该批量操作的线程池(注意是线程池不是连接池),分别是HTablePool和HConnectionImplementation,如果通过HTablePool获取HTable对象则采用HTablePool的线程池,如果采用HConnectionImplementation获取HTable对象,则采用HConnectionImplementation的线程池,这个取决于hbase client程序的用法,HTablePool已经是不推荐的方式,0.94.12的版本推荐通过HConnection获取HTable。
// HTable的默认线程池,HTablePool每次创建HTable时均会创建一个直接提交的线程池(采用SynchronousQueue队列),该线程池的特点是不会缓存任务,有任务会直接执行,缺点是:如果并发大会导致同时存活大量的线程,优点是不会缓存任务,从而不会存在任务堆积过多导致jvm内存暴涨,不过开启过多的线程也会导致大量的消耗内存 private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = 1; // is there a better default? } long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-table")); pool.allowCoreThreadTimeOut(true); return pool; }
// HConnectionImplementation的默认线程池 private ExecutorService getBatchPool() { if (batchPool == null) { // shared HTable thread executor not yet initialized synchronized (this) { if (batchPool == null) { int maxThreads = conf.getInt("hbase.hconnection.threads.max", Integer.MAX_VALUE); if (maxThreads == 0) { maxThreads = Runtime.getRuntime().availableProcessors(); } long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); this.batchPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxThreads, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-connection-shared-executor")); ((ThreadPoolExecutor) this.batchPool).allowCoreThreadTimeOut(true); } this.cleanupPool = true; } } return this.batchPool; } |
由源码可以看出:
两个线程池均是采用的直接提交线程池,唯一区别是创建ThreadPoolExecutor对象时指定的corePoolSize不一样,HTablePool是1,HConnection是Runtime.getRuntime().availableProcessors()。
l HBaseClient.Connection和HBaseServer.Connection
这两个Connection链接类是对socket的封装,HBaseClient.Connection类的实例数默认等于1个Master+ RS个数,可以通过hbase.client.ipc.pool.size配置,默认为1,如果是2则client与Master和每个RS均有两个Connection类的实例,也可理解为有2个socket链接,也可配置hbase.client.ipc.pool.type(默认为轮询)修改socket连接池的类型。
总之:HConnection缓存HMasterInterface和HRegionInterface的RPC代理对象,HMasterInterface和HRegionInterface的RPC代理对象最终均是通过Connection类建立的socket与服务端通信, master和每个RS均只对应一个RPC代理对象,每个RPC代理对象默认对应一个Connection对象,一个Connection对象持有一个socket链接。
4、关于参数hbase.regionserver.lease.period:RS的租凭期,RS的租凭设计用于当client端持有RS资源的场景,主要用于scan操作,RS会为client端保留scanner对象,以便多次交互,默认60秒,客户端必须在该时间内向RS发送心跳信息,否则RS认为client是deaded,超过该时间请求RS时,RS会抛出异常,对于scan操作一次next读取相当于一次心跳(参看:Leases类),在client端用该时间作为scan查询时每次next()的超时时间。
5、关于hbase.rpc.timeout配置:每次RPC的超时时间,默认为60000,如果没有超时则等待1s后再重试,直到超时或者重试成功,起三个作用:
1) Socket读数据的超时时间。如果超过超时值,将引发 java.net.SocketTimeoutException具体解释请参考:java.net.Socket.setSoTimeout()的API
2) 控制整个HBaseRPC.waitForProxy()方法的超时时间,在该方法中RPC远程执行HRegionServer的getProtocolVersion()方法,检查client和server端的协议版本,这个过程的总时间不能超过该配置时间,在RPC的过程中涉及建立socket链接和socket通信,因此该时间应该大于或者等于建立链接(ipc.socket.timeout)的时间(socket 通信时间=整体时间),如果小于建立链接的时间,则多导致无用的建立链接等待,就算建立成功但是也会因为整体超时后抛出异常。
3) 在定位root表所在的regionserver地址时,会从zk中获取root表的regionserver信息,该过程的超时时间由该参数控制,当zk不可用等原因时,会返回null,这时就会阻塞直到超时,这个设计应该是有问题,阻塞没有任何意义。
HBaseRPC有setRpcTimeout()方式,在一些场景会通过该方法修改rpc超时时间,通过HBaseRPC的如下方法可以看出,hbase最终采用的时间是编程方式制定的超时时间与hbase.rpc.timeout配置的时间中的最小时间:
//HBaseRPC public static void setRpcTimeout(int rpcTimeout) { HBaseRPC.rpcTimeout.set(rpcTimeout); } //HBaseRPC public static int getRpcTimeout(int defaultTimeout) {// defaultTimeout即hbase.rpc.timeout配置的时间 return Math.min(defaultTimeout, HBaseRPC.rpcTimeout.get()); } //WritableRpcEngine public <T extends VersionedProtocol> T getProxy( Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout) throws IOException { T proxy = (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, new Invoker(client, protocol, addr, User.getCurrent(), conf, HBaseRPC.getRpcTimeout(rpcTimeout))); }
|
通过编程方式设置rpc超时时间的操作只来自ServerCallable类的beforeCall()方法:
public void beforeCall() { this.startTime = EnvironmentEdgeManager.currentTimeMillis(); int remaining = (int)(callTimeout - (this.startTime - this.globalStartTime)); if (remaining < MIN_RPC_TIMEOUT) { remaining = MIN_RPC_TIMEOUT; } HBaseRPC.setRpcTimeout(remaining); } |
scan、get、delete等上十个与server交互的操作类均是通过ServerCallable实现。
6、HBase中client与server的socket连接是通过hadoop的org.apache.hadoop.net.NetUtils类实现,在hadoop的1.2.1版本中NetUtils类是基于nio实现socket通信的。
7、关于ipc.socket.timeout:默认20s,该参数控制java.nio.channels.Selector的select(timeout)超时时间,如果20s通道没有就绪则抛出超时异常,也即是socket建立连接的超时时间,读数据的超时时间由hbase.rpc.timeout配置。
8、HBaseClient的call方法
public Writable call(Writable param, InetSocketAddress addr, Class<? extends VersionedProtocol> protocol, User ticket, int rpcTimeout) { Call call = new Call(param); Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call); connection.sendParam(call); // send the parameter while (!call.done) { try { //如果远端调用没有执行完,则会每隔1s钟检查一次,可见是比较低效的。 call.wait(1000); } catch (InterruptedException ignored) { interrupted = true; } } |
相关推荐
### HBase开启审计日志详解 #### 一、概述 HBase是一款分布式列式存储系统,基于Google的Bigtable论文实现。它具有高可靠性、高性能、面向列、可伸缩的特点,非常适合处理海量数据。在大数据领域,HBase被广泛用于...
《HbaseClient详解》 HbaseClient是Apache HBase的核心组件之一,它是客户端与HBase分布式数据库进行交互的桥梁。本文将深入探讨HbaseClient的工作原理、主要功能以及使用技巧,帮助读者更好地理解和掌握HBase的...
《HBase客户端详解》 HBase,作为Apache Hadoop生态系统中的一个分布式、版本化、基于列族的NoSQL数据库,提供了高效的数据存储和查询能力。HBase客户端是连接HBase服务器并与之交互的重要工具,它使得开发者能够在...
### HBase技术详解 #### 一、HBase概述 **HBase**,全称为Hadoop Database,是一款构建在Hadoop之上、面向列的分布式数据库系统。它具备高可靠性、高性能和可扩展性等特点,能够在成本相对低廉的硬件设备上构建大...
### HBase集群搭建详解 HBase是基于Hadoop的一个分布式、可扩展的列式存储系统,主要用于处理海量数据。为了确保系统的稳定性和高效性,在实际部署时通常会采用集群模式进行搭建。本文将从以下几个方面对HBase集群...
### HBase性能调优策略详解 #### 一、服务端参数配置优化 针对HBase的服务端性能调优,可以通过合理调整一系列关键参数来提升系统的稳定性和效率。下面将详细介绍几个核心参数及其调优方法。 ##### 1. `hbase....
### HBase简介及安装知识点详解 #### 一、HBase概述 **HBase** 是一个构建在 **Hadoop** 分布式文件系统 (HDFS) 之上的分布式、可扩展的大规模数据存储系统。它是针对大数据量场景设计的,特别适用于需要实时读写...
【HBase运维工具详解】 HBase,作为一款流行的NoSQL分布式数据库,因其复杂的设计和流程,对于缺乏大数据经验的运维人员来说,管理起来具有一定的挑战。为了帮助开发人员和运维人员更好地管理和维护HBase集群,这里...
### HBase源码分析——关键知识点详解 #### 一、HBase性能测试总结与环境配置 **测试环境:** - **硬件配置:** - 1台客户端机器 - 5台RegionServer服务器 - 1台Master服务器 - 3台Zookeeper服务器 - **软件...
客户端首先通过Get()方法获取HDFS文件系统的实例,然后DistributedFileSystem利用RPC(远程过程调用)远程调用NameNode来决定文件数据块的位置信息。客户端通过FSDataInputStream实例的Read()方法读取数据,...
### Hadoop默认端口清单详解 #### Hadoop概述 Hadoop是一个开源软件框架,用于分布式存储和处理大型数据集。其核心组件包括HDFS(Hadoop Distributed File System)、MapReduce和YARN(Yet Another Resource ...
- **Hadoop底层IPC原理和RPC**:探讨Hadoop内部通信机制,包括RPC(远程过程调用)的实现细节。 - **Hadoop底层googleProtoBuf的协议分析**:分析Google Protobuf在Hadoop中的应用情况。 #### 四、分布式数据库...
- **Hadoop底层IPC原理和RPC**:讲解Hadoop内部的远程过程调用机制。 - **Hadoop的底层googleProtoBuf的协议分析**:分析Google Protobuf协议在Hadoop中的应用。 #### 四、分布式数据库与数据仓库 - **Hbase**:...
【Java框架】SpringBoot整合分布式Dubbo+Zookeeper的知识点详解 1. 分布式系统概念 分布式系统是由多台独立的计算机组成的集合,这些计算机对用户来说表现为单一系统。它们协同工作,提供单一服务或多个功能,使得...
【大数据相关软件详解】 大数据分析涉及多个层面,包括数据采集、数据存储、数据分析以及数据可视化。以下是对描述中提到的一些关键大数据软件的详细介绍: 1. **Hadoop**:Hadoop是一个开源的分布式计算框架,...
### OpenTSDB 知识点详解 #### 一、OpenTSDB是什么? OpenTSDB是一种分布式、可扩展的时间序列数据库,其设计目的是为了处理大量来自各种监控系统的数据,例如网络设备、操作系统或应用程序的监控数据。它基于...
### Hadoop基本概念详解 #### 一、Hadoop概述 Hadoop是一个开源软件框架,用于分布式存储和处理大型数据集。它最初由Apache基金会开发,旨在解决海量数据处理的问题。Hadoop的核心组件包括HDFS(Hadoop ...
- Hive是在Hadoop之上的数据仓库工具,允许用户使用类似SQL的查询语言HQL对存储在Hadoop的数据进行数据提取、转换和加载(ETL),简化了大数据处理流程。 7. **MapReduce** - MapReduce是一种分布式数据处理框架...
Source支持多种类型,如Avro、Thrift、Console(控制台)、RPC(Thrift-RPC)、Text(文件)、Tail(UNIX tail,跟踪文件变化)、Syslog(系统日志,支持TCP和UDP)以及Exec(命令执行)。Source接收到数据后,以...