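Main responsibilities of the HBase RPC client:
1. Obtain a proxy class through a JDK dynamic proxy.
2. Invoke the corresponding service method. The logic is wrapped in Invoker: create a connection, send the packet, and wait for the server's response. The default timeout is 60 s.
3. Timeouts are implemented with wait() plus a checking loop.
[Figure: class diagram of the client-side RPC classes]
The 0.94 implementation is as follows.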
HBaseRPC getProxy entry point
public static VersionedProtocol getProxy(
    Class<? extends VersionedProtocol> protocol,
    long clientVersion, InetSocketAddress addr, User ticket,
    Configuration conf, SocketFactory factory, int rpcTimeout)
    throws IOException {
  // obtain the proxy
  VersionedProtocol proxy =
      getProtocolEngine(protocol, conf)
          .getProxy(protocol, clientVersion, addr, ticket, conf, factory,
              Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
  // issue an RPC call to check the protocol version
  long serverVersion = proxy.getProtocolVersion(protocol.getName(),
      clientVersion);
  if (serverVersion == clientVersion) {
    return proxy;
  }
  throw new VersionMismatch(protocol.getName(), clientVersion,
      serverVersion);
}

public VersionedProtocol getProxy(
    Class<? extends VersionedProtocol> protocol, long clientVersion,
    InetSocketAddress addr, User ticket,
    Configuration conf, SocketFactory factory, int rpcTimeout)
    throws IOException {
  // JDK dynamic proxy
  VersionedProtocol proxy =
      (VersionedProtocol) Proxy.newProxyInstance(
          protocol.getClassLoader(), new Class[] { protocol },
          new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
  if (proxy instanceof VersionedProtocol) {
    long serverVersion = ((VersionedProtocol) proxy)
        .getProtocolVersion(protocol.getName(), clientVersion);
    if (serverVersion != clientVersion) {
      throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
          serverVersion);
    }
  }
  return proxy;
}
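To make the dynamic-proxy step concrete, here is a minimal sketch that is independent of HBase. EchoProtocol, StubInvoker and ProxySketch are hypothetical names invented for illustration. The handler only formats a string where the real Invoker would serialize the call and send it to the server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical protocol interface standing in for a VersionedProtocol subtype.
interface EchoProtocol {
    String echo(String message);
}

// Hypothetical handler; a real Invoker would forward the call over the network.
class StubInvoker implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // Stand-in for client.call(new Invocation(method, args), ...).
        return method.getName() + ":" + args[0];
    }
}

class ProxySketch {
    // Every method called on the returned object is routed to StubInvoker.invoke().
    static EchoProtocol newProxy() {
        return (EchoProtocol) Proxy.newProxyInstance(
                EchoProtocol.class.getClassLoader(),
                new Class<?>[] { EchoProtocol.class },
                new StubInvoker());
    }

    public static void main(String[] args) {
        System.out.println(newProxy().echo("ping")); // prints "echo:ping"
    }
}
```

The caller only ever sees the interface, which is why the client can treat a remote region server like a local object.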
Issuing the call: Invoker
public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
  final boolean logDebug = LOG.isDebugEnabled();
  long startTime = 0;
  if (logDebug) {
    startTime = System.currentTimeMillis();
  }
  // make the call
  HbaseObjectWritable value = (HbaseObjectWritable)
      client.call(new Invocation(method, args), address,
          protocol, ticket, rpcTimeout);
  if (logDebug) {
    // FIGURE HOW TO TURN THIS OFF!
    long callTime = System.currentTimeMillis() - startTime;
    LOG.debug("Call: " + method.getName() + " " + callTime);
  }
  return value.get();
}
Building the Invocation
public Invocation(Method method, Object[] parameters) {
  this.methodName = method.getName();
  this.parameterClasses = method.getParameterTypes();
  this.parameters = parameters;
  if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
    // VersionedProtocol is exempted from version check.
    clientVersion = 0;
    clientMethodsHash = 0;
  } else {
    try {
      Field versionField = method.getDeclaringClass().getField("VERSION");
      versionField.setAccessible(true);
      this.clientVersion = versionField.getLong(method.getDeclaringClass());
    } catch (NoSuchFieldException ex) {
      throw new RuntimeException("The " + method.getDeclaringClass(), ex);
    } catch (IllegalAccessException ex) {
      throw new RuntimeException(ex);
    }
    this.clientMethodsHash = ProtocolSignature.getFingerprint(method
        .getDeclaringClass().getMethods());
  }
}
The call flow
public Writable call(Writable param, InetSocketAddress addr,
    Class<? extends VersionedProtocol> protocol,
    User ticket, int rpcTimeout)
    throws InterruptedException, IOException {
  // a Call represents one request and has a unique id
  Call call = new Call(param);
  // get the connection; this creates it and initializes I/O if needed
  Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
  // send the request
  connection.sendParam(call); // send the parameter
  boolean interrupted = false;
  //noinspection SynchronizationOnLocalVariableOrMethodParameter
  // the caller thread waits here for the server's response
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait(); // wait for the result
      } catch (InterruptedException ignored) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }
    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }
    // if there was an error, throw it
    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      }
      // local exception
      throw wrapException(addr, call.error);
    }
    // otherwise return the result
    return call.value;
  }
}
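The wait loop above is the standard guarded-wait idiom. A minimal sketch of the same hand-off between a caller thread and a reader thread follows. PendingCall and WaitSketch are hypothetical names, and a String payload stands in for the Writable result.

```java
// Hypothetical stand-in for Call: holds the result and a completion flag.
class PendingCall {
    private boolean done;
    private String value;

    // Called by the reader thread once the response has been decoded.
    synchronized void setValue(String v) {
        value = v;
        done = true;
        notifyAll(); // wake the caller blocked in await()
    }

    // Called by the caller thread; the loop guards against spurious wakeups.
    synchronized String await() {
        boolean interrupted = false;
        while (!done) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true; // remember it, keep waiting for the result
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the flag afterwards
        }
        return value;
    }
}

class WaitSketch {
    // Starts a "reader" thread that completes the call, then blocks until it does.
    static String roundTrip(String payload) {
        PendingCall call = new PendingCall();
        Thread reader = new Thread(() -> call.setValue(payload));
        reader.start();
        return call.await();
    }
}
```

Checking the flag inside the synchronized block means a response that arrives before the caller starts waiting is not lost.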
Call initialization
protected Call(Writable param) {
  this.param = param;
  this.startTime = System.currentTimeMillis();
  // unique id
  synchronized (HBaseClient.this) {
    this.id = counter++;
  }
}
getConnection
protected Connection getConnection(InetSocketAddress addr,
    Class<? extends VersionedProtocol> protocol,
    User ticket, int rpcTimeout, Call call)
    throws IOException, InterruptedException {
  if (!running.get()) {
    // the client is stopped
    throw new IOException("The client is stopped");
  }
  Connection connection;
  /* we could avoid this allocation for each RPC by having a
   * connectionsId object and with set() method. We need to manage the
   * refs for keys in HashMap properly. For now its ok.
   */
  // unique key for each connection; different keys get different connections
  ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
  do {
    synchronized (connections) {
      connection = connections.get(remoteId);
      if (connection == null) {
        connection = new Connection(remoteId);
        connections.put(remoteId, connection);
      }
    }
    // addCall registers the request in the connection's internal map so that
    // the server's response can be matched to it later
  } while (!connection.addCall(call));

  //we don't invoke the method below inside "synchronized (connections)"
  //block above. The reason for that is if the server happens to be slow,
  //it will take longer to establish a connection and that will slow the
  //entire system down.
  // create the connection and initialize I/O
  connection.setupIOstreams();
  return connection;
}
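The lookup above works only if the key class defines value equality. The sketch below shows that idea in isolation, under simplifying assumptions: ConnKey and ConnCache are hypothetical names, the key carries only the address and the RPC timeout (the ConnectionId in the text also takes the protocol and the user), and a plain Object stands in for the Connection.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified key with value-based equals/hashCode.
final class ConnKey {
    final InetSocketAddress addr;
    final int rpcTimeout;

    ConnKey(String host, int port, int rpcTimeout) {
        this.addr = InetSocketAddress.createUnresolved(host, port); // no DNS lookup
        this.rpcTimeout = rpcTimeout;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ConnKey)) return false;
        ConnKey other = (ConnKey) o;
        return addr.equals(other.addr) && rpcTimeout == other.rpcTimeout;
    }

    @Override
    public int hashCode() {
        return Objects.hash(addr, rpcTimeout);
    }
}

class ConnCache {
    private final Map<ConnKey, Object> connections = new HashMap<>();

    // Returns the cached connection for the key, creating it on first use.
    Object get(ConnKey key) {
        synchronized (connections) {
            Object conn = connections.get(key);
            if (conn == null) {
                conn = new Object(); // stand-in for new Connection(remoteId)
                connections.put(key, conn);
            }
            return conn;
        }
    }
}
```

Two calls to the same server with the same timeout share one connection, while a different timeout produces a different key and therefore a separate connection.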
Setting up a new connection
protected synchronized void setupIOstreams()
    throws IOException, InterruptedException {
  if (socket != null || shouldCloseConnection.get()) {
    return;
  }
  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to " + remoteId);
    }
    // create the connection
    setupConnection();
    // initialize I/O
    this.in = new DataInputStream(new BufferedInputStream
        (new PingInputStream(NetUtils.getInputStream(socket))));
    this.out = new DataOutputStream
        (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
    // write the header; at this point it is still sitting in the buffer
    writeHeader();
    // update last activity time
    touch();
    // start the read thread, which waits for READ readiness and reads
    // the data sent back by the server
    // start the receiver thread after the socket connection has been set up
    start();
  } catch (IOException e) {
    markClosed(e);
    close();
    throw e;
  }
}
Creating the connection
protected synchronized void setupConnection() throws IOException {
  short ioFailures = 0;
  short timeoutFailures = 0;
  while (true) {
    try {
      // the default socketFactory is StandardSocketFactory
      this.socket = socketFactory.createSocket();
      this.socket.setTcpNoDelay(tcpNoDelay);
      this.socket.setKeepAlive(tcpKeepAlive);
      // connection time out is 20s
      // connect, retrying until it succeeds or the retries are exhausted
      NetUtils.connect(this.socket, remoteId.getAddress(),
          getSocketTimeout(conf));
      if (remoteId.rpcTimeout > 0) {
        pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
      }
      this.socket.setSoTimeout(pingInterval);
      return;
    } catch (SocketTimeoutException toe) {
      /* The max number of retries is 45,
       * which amounts to 20s*45 = 15 minutes retries.
       */
      handleConnectionFailure(timeoutFailures++, maxRetries, toe);
    } catch (IOException ie) {
      handleConnectionFailure(ioFailures++, maxRetries, ie);
    }
  }
}
StandardSocketFactory creates the socket
public Socket createSocket() throws IOException {
  /*
   * NOTE: This returns an NIO socket so that it has an associated
   * SocketChannel. As of now, this unfortunately makes streams returned
   * by Socket.getInputStream() and Socket.getOutputStream() unusable
   * (because a blocking read on input stream blocks write on output stream
   * and vice versa).
   *
   * So users of these socket factories should use
   * NetUtils.getInputStream(socket) and
   * NetUtils.getOutputStream(socket) instead.
   *
   * A solution for hiding from this from user is to write a
   * 'FilterSocket' on the lines of FilterInputStream and extend it by
   * overriding getInputStream() and getOutputStream().
   */
  // open a channel
  return SocketChannel.open().socket();
}
The connect process
public static void connect(Socket socket,
    SocketAddress endpoint,
    int timeout) throws IOException {
  if (socket == null || endpoint == null || timeout < 0) {
    throw new IllegalArgumentException("Illegal argument for connect()");
  }
  SocketChannel ch = socket.getChannel();
  if (ch == null) {
    // let the default implementation handle it.
    socket.connect(endpoint, timeout);
  } else {
    SocketIOWithTimeout.connect(ch, endpoint, timeout);
  }
  ......
}

static void connect(SocketChannel channel,
    SocketAddress endpoint, int timeout) throws IOException {
  // use NIO
  boolean blockingOn = channel.isBlocking();
  if (blockingOn) {
    channel.configureBlocking(false);
  }
  // attempt to connect
  try {
    if (channel.connect(endpoint)) {
      return;
    }
    // not connected yet: use a selector to wait for the CONNECT event
    long timeoutLeft = timeout;
    long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout) : 0;
    while (true) {
      // we might have to call finishConnect() more than once
      // for some channels (with user level protocols)
      // is CONNECT ready?
      int ret = selector.select((SelectableChannel) channel,
          SelectionKey.OP_CONNECT, timeoutLeft);
      // if it is ready, check whether the channel has finished connecting
      if (ret > 0 && channel.finishConnect()) {
        return;
      }
      // if it is not ready, or the timeout has elapsed, throw
      if (ret == 0 ||
          (timeout > 0 &&
              (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
        throw new SocketTimeoutException(
            timeoutExceptionString(channel, timeout,
                SelectionKey.OP_CONNECT));
      }
    }
  } catch (IOException e) {
    // javadoc for SocketChannel.connect() says channel should be closed.
    try {
      channel.close();
    } catch (IOException ignored) {}
    throw e;
  } finally {
    if (blockingOn && channel.isOpen()) {
      channel.configureBlocking(true);
    }
  }
}
Next, the select logic in SelectorPool
int select(SelectableChannel channel, int ops, long timeout)
    throws IOException {
  // take a selector from the pool; SelectorPool keeps a list of providers,
  // and each provider has its own queue of selectors
  SelectorInfo info = get(channel);
  SelectionKey key = null;
  int ret = 0;
  try {
    while (true) {
      long start = (timeout == 0) ? 0 : System.currentTimeMillis();
      // register interest in the requested ops (CONNECT in the case above)
      key = channel.register(info.selector, ops);
      // blocking select, up to the timeout
      ret = info.selector.select(timeout);
      if (ret != 0) {
        return ret;
      }
      /* Sometimes select() returns 0 much before timeout for
       * unknown reasons. So select again if required.
       */
      // if the timeout has elapsed return 0, otherwise select again
      if (timeout > 0) {
        timeout -= System.currentTimeMillis() - start;
        if (timeout <= 0) {
          return 0;
        }
      }
      if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedIOException("Interruped while waiting for " +
            "IO on channel " + channel + ". " + timeout +
            " millis timeout left.");
      }
    }
  } finally {
    // when done, clean up the key
    if (key != null) {
      key.cancel();
    }
    //clear the canceled key.
    try {
      info.selector.selectNow();
    } catch (IOException e) {
      LOG.info("Unexpected Exception while clearing selector : " +
          StringUtils.stringifyException(e));
      // don't put the selector back.
      info.close();
      return ret;
    }
    release(info);
  }
}
Once connected, initialize I/O
public SocketInputStream(ReadableByteChannel channel, long timeout)
    throws IOException {
  SocketIOWithTimeout.checkChannelValidity(channel);
  reader = new Reader(channel, timeout);
}

SocketIOWithTimeout(SelectableChannel channel, long timeout)
    throws IOException {
  checkChannelValidity(channel);
  this.channel = channel;
  this.timeout = timeout;
  // Set non-blocking
  channel.configureBlocking(false);
}

public SocketOutputStream(WritableByteChannel channel, long timeout)
    throws IOException {
  SocketIOWithTimeout.checkChannelValidity(channel);
  writer = new Writer(channel, timeout);
}
Writing the header
private void writeHeader() throws IOException {
  // fixed 4 bytes
  out.write(HBaseServer.HEADER.array());
  // version, 1 byte
  out.write(HBaseServer.CURRENT_VERSION);
  //When there are more fields we can have ConnectionHeader Writable.
  DataOutputBuffer buf = new DataOutputBuffer();
  // serialize the header to bytes
  header.write(buf);
  // write the length (4 bytes), then the content
  int bufLen = buf.getLength();
  out.writeInt(bufLen);
  out.write(buf.getData(), 0, bufLen);
}
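The byte layout produced by writeHeader can be sketched with a plain ByteBuffer, which is big-endian by default like DataOutputStream. HeaderSketch is a hypothetical name, and the MAGIC and VERSION values are placeholders chosen for illustration. The real values come from HBaseServer.HEADER and HBaseServer.CURRENT_VERSION and are not shown in the text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class HeaderSketch {
    // Placeholder values (assumptions), not taken from the HBase source.
    static final byte[] MAGIC = "hrpc".getBytes(StandardCharsets.US_ASCII); // 4 bytes
    static final byte VERSION = 3;                                          // 1 byte

    // Layout: magic (4) | version (1) | header length (4) | header bytes (n)
    static byte[] frame(byte[] serializedHeader) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 1 + 4 + serializedHeader.length);
        buf.put(MAGIC);
        buf.put(VERSION);
        buf.putInt(serializedHeader.length);
        buf.put(serializedHeader);
        return buf.array();
    }
}
```

A three-byte serialized header therefore yields a twelve-byte preamble, with the length field occupying bytes 5 through 8.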
Then the I/O read thread (the Connection itself) starts
public void run() {
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": starting, having connections "
        + connections.size());
  try {
    // if there are no pending requests, wait for a while
    while (waitForWork()) { //wait here for work - read or close connection
      receiveResponse();
    }
  } catch (Throwable t) {
    LOG.warn("Unexpected exception receiving call responses", t);
    markClosed(new IOException("Unexpected exception receiving call responses", t));
  }
  close();
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": stopped, remaining connections "
        + connections.size());
}
Reading the response
protected void receiveResponse() {
  if (shouldCloseConnection.get()) {
    return;
  }
  // update last activity time
  touch();
  try {
    // See HBaseServer.Call.setResponse for where we write out the response.
    // It writes the call.id (int), a flag byte, then optionally the length
    // of the response (int) followed by data.

    // Read the call id.
    int id = in.readInt();
    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " got value #" + id);
    // look up the pending request in the map
    Call call = calls.get(id);
    // success or failure
    // Read the flag byte
    byte flag = in.readByte();
    boolean isError = ResponseFlag.isError(flag);
    if (ResponseFlag.isLength(flag)) {
      // Currently length if present is unused.
      in.readInt();
    }
    // RPC state
    int state = in.readInt(); // Read the state. Currently unused.
    // on error, read the error information and wrap it in an exception
    if (isError) {
      if (call != null) {
        //noinspection ThrowableInstanceNeverThrown
        call.setException(new RemoteException(WritableUtils.readString(in),
            WritableUtils.readString(in)));
      }
    } else {
      // deserialize into an HbaseObjectWritable, matching the server side
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in); // read value
      // it's possible that this call may have been cleaned up due to a RPC
      // timeout, so check if it still exists before setting the value.
      // the call has not been removed yet, so set its return value
      if (call != null) {
        call.setValue(value);
      }
    }
    // done: remove the call from the map
    calls.remove(id);
  } catch (IOException e) {
    if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
      // Clean up open calls but don't treat this as a fatal condition,
      // since we expect certain responses to not make it by the specified
      // {@link ConnectionId#rpcTimeout}.
      closeException = e;
    } else {
      // Since the server did not respond within the default ping interval
      // time, treat this as a fatal condition and close this connection
      markClosed(e);
    }
  } catch (Exception e) {
    markClosed(new IOException(e.getMessage(), e));
  } finally {
    // check for timed-out calls and handle them
    if (remoteId.rpcTimeout > 0) {
      cleanupCalls(remoteId.rpcTimeout);
    }
  }
}
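The fixed part of a response (call id, flag byte, optional length, state) can be decoded as in the sketch below. ResponseSketch is a hypothetical name, and the two bit masks are assumed values chosen for illustration. The real masks are defined in ResponseFlag, which the text does not show.

```java
import java.nio.ByteBuffer;

class ResponseSketch {
    // Assumed bit values for illustration only.
    static final byte ERROR_BIT = 0x1;
    static final byte LENGTH_BIT = 0x2;

    // Returns {callId, isError ? 1 : 0, state} and leaves buf positioned at the payload.
    static int[] readPrefix(ByteBuffer buf) {
        int id = buf.getInt();          // which pending call this answers
        byte flag = buf.get();          // error and length indicators
        if ((flag & LENGTH_BIT) != 0) {
            buf.getInt();               // length is read but not used
        }
        int state = buf.getInt();       // read but not used by the client
        return new int[] { id, (flag & ERROR_BIT) != 0 ? 1 : 0, state };
    }
}
```

Because the id comes first, the reader thread can find the waiting call before it knows whether the payload is a value or an error.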
Checking for and handling timed-out calls
protected void cleanupCalls(long rpcTimeout) {
  Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
  while (itor.hasNext()) {
    Call c = itor.next().getValue();
    long waitTime = System.currentTimeMillis() - c.getStartTime();
    // timed out: hand a timeout exception back to the caller
    if (waitTime >= rpcTimeout) {
      if (this.closeException == null) {
        // There may be no exception in the case that there are many calls
        // being multiplexed over this connection and these are succeeding
        // fine while this Call object is taking a long time to finish
        // over on the server; e.g. I just asked the regionserver to bulk
        // open 3k regions or its a big fat multiput into a heavily-loaded
        // server (Perhaps this only happens at the extremes?)
        this.closeException = new CallTimeoutException("Call id=" + c.id +
            ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout);
      }
      c.setException(this.closeException);
      synchronized (c) {
        c.notifyAll();
      }
      itor.remove();
    } else {
      // calls is a sorted map keyed by the integer call id, so iteration is
      // in ascending id order. If the oldest call (smallest id) has not
      // timed out, none of the later ones have either, so stop here.
      break;
    }
  }
  // adjust SO_TIMEOUT
  try {
    if (!calls.isEmpty()) {
      Call firstCall = calls.get(calls.firstKey());
      long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
      if (maxWaitTime < rpcTimeout) {
        rpcTimeout -= maxWaitTime;
      }
    }
    if (!shouldCloseConnection.get()) {
      closeException = null;
      if (socket != null) {
        socket.setSoTimeout((int) rpcTimeout);
      }
    }
  } catch (SocketException e) {
    LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
  }
}
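The early break relies on ids and start times growing together. The sketch below isolates that sweep, assuming a TreeMap from call id to start time and an explicit now parameter so the behaviour is deterministic. TimeoutSweep and expire are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TimeoutSweep {
    // startTimes maps call id -> start time in millis; TreeMap iterates ids in ascending order.
    // Removes every call that has waited at least rpcTimeout and returns the removed ids.
    static List<Integer> expire(TreeMap<Integer, Long> startTimes, long now, long rpcTimeout) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Long>> it = startTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> entry = it.next();
            if (now - entry.getValue() < rpcTimeout) {
                break; // oldest remaining call is still live, so newer ones are too
            }
            expired.add(entry.getKey());
            it.remove();
        }
        return expired;
    }
}
```

With calls started at 0, 500 and 900 ms, a sweep at 1000 ms with a 600 ms timeout removes only the first call and stops at the second without inspecting the third.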
Reader/Writer I/O
int doIO(ByteBuffer buf, int ops) throws IOException {
  /* For now only one thread is allowed. If user want to read or write
   * from multiple threads, multiple streams could be created. In that
   * case multiple threads work as well as underlying channel supports it.
   */
  if (!buf.hasRemaining()) {
    throw new IllegalArgumentException("Buffer has no data left.");
    //or should we just return 0?
  }
  while (buf.hasRemaining()) {
    if (closed) {
      return -1;
    }
    // the reader/writer operation runs here; it is essentially a
    // read/write on the channel
    try {
      int n = performIO(buf);
      // some data was processed, so return; this may be only part of the
      // data, and the caller is responsible for completeness
      if (n != 0) {
        // successful io or an error.
        return n;
      }
    } catch (IOException e) {
      if (!channel.isOpen()) {
        closed = true;
      }
      throw e;
    }
    //now wait for socket to be ready.
    // nothing was processed, so wait for the READ/WRITE event
    int count = 0;
    try {
      count = selector.select(channel, ops, timeout);
    } catch (IOException e) { //unexpected IOException.
      closed = true;
      throw e;
    }
    // count == 0 means the wait timed out; otherwise loop and retry the I/O
    if (count == 0) {
      throw new SocketTimeoutException(timeoutExceptionString(channel,
          timeout, ops));
    }
    // otherwise the socket should be ready for io.
  }
  return 0; // does not reach here.
}
After the I/O read thread has started, the caller thread begins writing data
protected void sendParam(Call call) {
  if (shouldCloseConnection.get()) {
    return;
  }
  // For serializing the data to be written.
  final DataOutputBuffer d = new DataOutputBuffer();
  try {
    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " sending #" + call.id);
    d.writeInt(0xdeadbeef); // placeholder for data length
    // request id, 4 bytes
    d.writeInt(call.id);
    // serialize the parameter
    call.param.write(d);
    // the data to write
    byte[] data = d.getData();
    int dataLength = d.getLength();
    // fill in the placeholder
    Bytes.putInt(data, 0, dataLength - 4);
    //noinspection SynchronizeOnNonFinalField
    // write and flush
    synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
      out.write(data, 0, dataLength);
      out.flush();
    }
  } catch (IOException e) {
    markClosed(e);
  } finally {
    //the buffer is just an in-memory buffer, but it is still polite to
    // close early
    IOUtils.closeStream(d);
  }
}
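The placeholder trick in sendParam (write a dummy length, serialize, then patch the real length into the first four bytes) can be sketched with a ByteBuffer. FrameSketch and encode are hypothetical names, and a raw byte array stands in for the serialized parameter.

```java
import java.nio.ByteBuffer;

class FrameSketch {
    // Layout: length (4) | call id (4) | parameter bytes (n); length excludes its own 4 bytes.
    static byte[] encode(int callId, byte[] param) {
        ByteBuffer buf = ByteBuffer.allocate(8 + param.length);
        buf.putInt(0xdeadbeef);            // placeholder for the data length
        buf.putInt(callId);                // request id, 4 bytes
        buf.put(param);                    // stand-in for call.param.write(d)
        buf.putInt(0, buf.position() - 4); // back-fill the real length at offset 0
        return buf.array();
    }
}
```

Writing the length last avoids serializing the parameter twice just to learn its size.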
SocketOutputStream write
public void write(byte[] b, int off, int len) throws IOException {
  ByteBuffer buf = ByteBuffer.wrap(b, off, len);
  // keep writing until everything is written or an exception is thrown
  while (buf.hasRemaining()) {
    try {
      if (write(buf) < 0) {
        throw new IOException("The stream is closed");
      }
    } catch (IOException e) {
      /* Unlike read, write can not inform user of partial writes.
       * So will close this if there was a partial write.
       */
      if (buf.capacity() > buf.remaining()) {
        writer.close();
      }
      throw e;
    }
  }
}
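After sending the request, the caller thread enters a waiting state. The read thread waits for data from the server, deserializes the response, stores it in the Call object, and wakes the caller thread so that it can continue.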