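Chapter 6: Xiao Zhu's Notes on Hadoop Source Code Analysis - IPC Analysis
Section 3: Analysis of the Server Class
The server starts a Listener thread. When a connection request arrives, the Listener accepts the connection and then watches it for read events. When data arrives, parsing the client's message is delegated to the Connection, which wraps the request in a Call object and puts it on a queue for a Handler to process. The server also starts a configured number of Handler threads, which execute the method calls requested by clients and return the results to them.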
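(1) The NIO reactor pattern
(a) One thread accepts all connections (using one Selector).
(b) A group of threads reads data from the established connections (several Selectors; the number of threads is usually about the number of CPU cores).
(c) A thread pool processes the requests (its size can be set according to business needs).
(d) One thread handles the write operations for all connections (one Selector).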
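(2) Main flow of the RPC Server
As a service provider, the RPC Server consists of two parts: receiving Calls and handling Calls.
Receiving Calls: the server accepts call requests from the RPC Client, wraps each one in a Call object and puts it on the Call queue. This part is done by the Listener thread (together with its Reader threads). The steps are:
(a) The Listener watches for data sent by the RPC Client.
(b) When data is available, it calls the Connection's readAndProcess method.
(c) The Connection processes the data as it receives it. Once a complete Call packet has arrived, it builds a Call object and pushes it onto the Call queue; the Handler threads process all Calls in that queue.
Handling Calls: the server processes every call request in the Call queue. This part is done by the Handler threads. The steps are:
(a) A Handler thread waits on the Call queue; when the queue is not empty, it takes a Call from it in FIFO order.
(b) It passes the Call to RPC.Server.
(c) With the help of the JDK's Method class (reflection), the target method is invoked; the target method is implemented by the concrete business logic.
(d) The response is returned. Server.Handler sends the response to the RPC Client in an asynchronous, non-blocking way; any data that could not be sent is left to Server.Responder.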
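The hand-off between the receiving side and the Handler threads is a producer/consumer arrangement around a bounded blocking queue. The following is a minimal sketch of that idea only, not Hadoop's code: MiniCall, serve and the "echo" business logic are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

final class CallQueueSketch {
  /** Hypothetical stand-in for Server.Call: an id plus a parameter. */
  static final class MiniCall {
    final int id;
    final String param;
    MiniCall(int id, String param) { this.id = id; this.param = param; }
  }

  /** Starts handlerCount handler threads, queues the calls and returns the responses. */
  static List<String> serve(List<MiniCall> calls, int handlerCount) throws InterruptedException {
    BlockingQueue<MiniCall> callQueue = new LinkedBlockingQueue<>(100); // bounded queue
    List<String> responses = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(calls.size());

    for (int i = 0; i < handlerCount; i++) {
      Thread handler = new Thread(() -> {
        try {
          while (true) {
            MiniCall call = callQueue.take();                    // blocks while the queue is empty
            responses.add("#" + call.id + ":echo " + call.param); // placeholder business logic
            done.countDown();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();                    // stop when interrupted
        }
      }, "handler-" + i);
      handler.setDaemon(true); // daemon threads, so the JVM can exit while they block in take()
      handler.start();
    }

    for (MiniCall call : calls) {
      callQueue.put(call); // the receiving side; blocks if the queue is full
    }
    done.await();          // wait until every call has been handled
    return new ArrayList<>(responses);
  }

  public static void main(String[] args) throws InterruptedException {
    List<MiniCall> calls = new ArrayList<>();
    calls.add(new MiniCall(1, "hello"));
    calls.add(new MiniCall(2, "world"));
    System.out.println(serve(calls, 2)); // order of the two responses may vary
  }
}
```

Calls leave the queue in FIFO order, but with more than one handler the order in which responses are produced is not fixed, which is why each response carries the call id.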
(3) Structure of the Server class
/**
 * An abstract IPC service. IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value. A service runs on a
 * port and is defined by a parameter class and a value class.
 *
 * @see Client
 */
public abstract class Server {
  private final boolean authorize;
  private boolean isSecurityEnabled;

  /**
   * The first four bytes of Hadoop RPC connections
   */
  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());

  // 1 : Introduce ping and server does not throw away RPCs
  // 3 : Introduce the protocol into the RPC connection header
  // 4 : Introduced SASL security layer
  public static final byte CURRENT_VERSION = 4;

  /**
   * How many calls/handler are allowed in the queue.
   */
  private static final int IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
  private static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY = "ipc.server.handler.queue.size";

  /**
   * Initial and max size of response buffer
   */
  static int INITIAL_RESP_BUF_SIZE = 10240;
  static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY = "ipc.server.max.response.size";
  static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1024 * 1024;

  public static final Log LOG = LogFactory.getLog(Server.class);
  private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." + Server.class.getName());
  private static final String AUTH_FAILED_FOR = "Auth failed for ";
  private static final String AUTH_SUCCESSFULL_FOR = "Auth successfull for ";

  private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
  private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String, Class<?>>();

  private String bindAddress; // address the server binds to
  private int port; // port we listen on
  private int handlerCount; // number of handler threads
  private int readThreads; // number of read threads
  private Class<? extends Writable> paramClass; // class of call parameters; must implement Writable
  private int maxIdleTime; // the maximum idle time after which a client may be disconnected
  private int thresholdIdleConnections; // the number of idle connections after which we will start cleaning up idle connections
  int maxConnectionsToNuke; // the max number of connections to nuke during a cleanup
  protected RpcInstrumentation rpcMetrics; // maintains RPC statistics
  private Configuration conf;
  private SecretManager<TokenIdentifier> secretManager;
  private int maxQueueSize; // maximum size of the call queue
  private final int maxRespSize;
  private int socketSendBufferSize; // size of the socket send buffer
  private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  volatile private boolean running = true; // true while server runs
  private BlockingQueue<Call> callQueue; // queued calls
  private List<Connection> connectionList =
      Collections.synchronizedList(new LinkedList<Connection>()); // maintain a list of client connections
  private Listener listener = null; // thread that listens on the server socket and creates jobs for the Handler threads
  private Responder responder = null; // thread that sends RPC responses back to the clients
  private int numConnections = 0;
  private Handler[] handlers = null; // array of Handler threads

  /**
   * This is set to Call object before Handler invokes an RPC and reset after
   * the call returns.
   */
  private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
  ........
}
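The Server class here is abstract. Its only abstract member is the call method, which the Handler code below invokes in its three-argument form:
public abstract Writable call(Class<?> protocol, Writable param, long receiveTime) throws IOException;
It is implemented by RPC.Server.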
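(a) Call
Stores a request sent by a client; the request is placed in a BlockingQueue.
/**
 * A call queued for handling.
 *
 * The entity class for calls that the server keeps in its queue.
 */
private static class Call {
  private int id; // ID of the client's call
  private Writable param; // parameter passed by the client's call
  private Connection connection; // connection to the client
  private long timestamp; // time the call was received; reset when an unfinished response is handed to the Responder
  private ByteBuffer response; // byte buffer holding the response to the client
  .......
}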
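(b) Listener
The listener class, which listens for requests from clients. Listener also has a nested class, Listener.Reader: when the listener detects a client request, it lets a Reader read it. The Listener is mainly responsible for listening on the socket and establishing Connections. It also monitors readable events on the client sockets and notifies the Connection to process the data (processData). Once a complete request packet has been received, it is wrapped in a Call object (containing the Connection, the parameters read from the network stream and the method to invoke) and put on the queue.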
/**
 * Listens on the socket. Creates jobs for the handler threads.
 *
 * Once the Listener thread is started, the server keeps waiting for client
 * connections. The fields and the constructor are shown here; the run() loop
 * is shown in step 2 below.
 */
private class Listener extends Thread {
  private ServerSocketChannel acceptChannel = null; // the accept channel
  private Selector selector = null; // the selector that we use for the server
  private Reader[] readers = null;
  private int currentReader = 0;
  private InetSocketAddress address; // the address we bind at
  private Random rand = new Random();
  private long lastCleanupRunTime = 0; // the last time when a cleanup connection (for idle connections) ran
  private long cleanupInterval = 10000; // the minimum interval between two cleanup runs
  private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
  private ExecutorService readPool;

  public Listener() throws IOException {
    address = new InetSocketAddress(bindAddress, port); // socket address built from bindAddress and port
    // Create a new server socket and set to non blocking mode
    acceptChannel = ServerSocketChannel.open();
    acceptChannel.configureBlocking(false);
    // Bind the server socket to the local host and port
    bind(acceptChannel.socket(), address, backlogLength);
    port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
    // create a selector; a selector multiplexes the channels registered with it
    selector = Selector.open();
    readers = new Reader[readThreads];
    readPool = Executors.newFixedThreadPool(readThreads);
    // start several reader threads so that the server does not respond slowly when many requests arrive
    for (int i = 0; i < readThreads; i++) {
      Selector readSelector = Selector.open();
      Reader reader = new Reader(readSelector);
      readers[i] = reader;
      readPool.execute(reader);
    }
    // Register accepts on the server socket with the selector.
    acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
    this.setName("IPC Server listener on " + port); // name of the listener thread
    this.setDaemon(true); // run as a daemon thread
  }
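(c) Responder
The class that responds to RPC requests. When a request has been processed, the Responder sends the result to the requesting client. It keeps checking the response queues for finished calls and, if there are any, returns their results to the client.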
// Sends responses of RPC back to clients.
// As run() shows, response data is handled while the server is running, and there are two cases:
// 1. A response that could not be sent within the time limit is treated as expired.
//    The server stops serving it and purges it (doPurge closes the connection).
// 2. If the call registered on a selected channel is valid and the channel is writable,
//    the response data is written to the channel for the client to read.
private class Responder extends Thread {
  private Selector writeSelector;
  private int pending; // connections waiting to register
  final static int PURGE_INTERVAL = 900000; // 15mins

  Responder() throws IOException {
    this.setName("IPC Server Responder");
    this.setDaemon(true);
    writeSelector = Selector.open(); // create a selector
    pending = 0;
  }

  @Override
  public void run() {
    LOG.info(getName() + ": starting");
    SERVER.set(Server.this);
    long lastPurgeTime = 0; // last check for old calls.
    while (running) {
      try {
        waitPending(); // If a channel is being registered, wait.
        writeSelector.select(PURGE_INTERVAL); // block for at most PURGE_INTERVAL
        Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
        while (iter.hasNext()) { // iterate over the keys selected by writeSelector
          SelectionKey key = iter.next();
          iter.remove();
          try {
            if (key.isValid() && key.isWritable()) {
              doAsyncWrite(key); // asynchronous write: send the response data to the channel
            }
          } catch (IOException e) {
            LOG.info(getName() + ": doAsyncWrite threw exception " + e);
          }
        }
        long now = System.currentTimeMillis();
        if (now < lastPurgeTime + PURGE_INTERVAL) {
          continue;
        }
        lastPurgeTime = now;
        //
        // If there were some calls that have not been sent out for a
        // long time, discard them.
        //
        LOG.debug("Checking for old call responses.");
        ArrayList<Call> calls;
        // get the list of channels from list of keys.
        // Calls that have not been sent within the limit (lastPurgeTime + PURGE_INTERVAL)
        // are treated as expired and purged.
        synchronized (writeSelector.keys()) {
          calls = new ArrayList<Call>(writeSelector.keys().size());
          iter = writeSelector.keys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            Call call = (Call) key.attachment();
            if (call != null && key.channel() == call.connection.channel) {
              calls.add(call);
            }
          }
        }
        for (Call call : calls) {
          try {
            doPurge(call, now);
          } catch (IOException e) {
            LOG.warn("Error in purging old calls " + e);
          }
        }
      } catch (OutOfMemoryError e) {
        //
        // we can run out of memory if we have too many threads
        // log the event and sleep for a minute and give
        // some thread(s) a chance to finish
        //
        LOG.warn("Out of Memory in server select", e);
        try {
          Thread.sleep(60000);
        } catch (Exception ie) {
        }
      } catch (Exception e) {
        LOG.warn("Exception in Responder " + StringUtils.stringifyException(e));
      }
    }
    LOG.info("Stopping " + this.getName());
  }

  // When a channel becomes writable, the response data is written asynchronously:
  private void doAsyncWrite(SelectionKey key) throws IOException {
    Call call = (Call) key.attachment();
    if (call == null) {
      return;
    }
    if (key.channel() != call.connection.channel) {
      throw new IOException("doAsyncWrite: bad channel");
    }
    synchronized (call.connection.responseQueue) {
      if (processResponse(call.connection.responseQueue, false)) { // send the queued responses of this connection
        try {
          key.interestOps(0);
        } catch (CancelledKeyException e) {
          /*
           * The Listener/reader might have closed the socket. We
           * don't explicitly cancel the key, so not sure if this
           * will ever fire. This warning could be removed.
           */
          LOG.warn("Exception while changing ops : " + e);
        }
      }
    }
  }

  //
  // Remove calls that have been pending in the responseQueue
  // for a long time.
  //
  /**
   * If a response has been waiting in the queue for longer than PURGE_INTERVAL,
   * close its connection.
   */
  private void doPurge(Call call, long now) throws IOException {
    LinkedList<Call> responseQueue = call.connection.responseQueue;
    synchronized (responseQueue) {
      Iterator<Call> iter = responseQueue.listIterator(0);
      while (iter.hasNext()) {
        call = iter.next();
        if (now > call.timestamp + PURGE_INTERVAL) {
          closeConnection(call.connection);
          break;
        }
      }
    }
  }

  // Processes one response. Returns true if there are no more pending
  // data for this channel.
  //
  private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
    boolean error = true;
    boolean done = false; // there is more data for this channel.
    int numElements = 0;
    Call call = null;
    try {
      synchronized (responseQueue) {
        //
        // If there are no items for this channel, then we are done
        //
        numElements = responseQueue.size();
        if (numElements == 0) {
          error = false;
          return true; // no more data for this channel.
        }
        //
        // Extract the first call
        //
        call = responseQueue.removeFirst();
        SocketChannel channel = call.connection.channel; // the channel this call belongs to
        if (LOG.isDebugEnabled()) {
          LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection);
        }
        //
        // Send as much data as we can in the non-blocking fashion
        //
        int numBytes = channelWrite(channel, call.response); // write the response held in call.response
        if (numBytes < 0) {
          // a negative count means the write failed; error is still true here,
          // so the finally block closes the connection
          return true;
        }
        if (!call.response.hasRemaining()) {
          // nothing is left in call.response: the whole response has been written to the channel
          call.connection.decRpcCount(); // one fewer outstanding RPC on this connection
          if (numElements == 1) { // last call on this connection
            done = true; // no more data for this channel
          } else {
            done = false; // more calls are pending
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection
                + " Wrote " + numBytes + " bytes.");
          }
        } else {
          //
          // If we were unable to write the entire response out, then
          // insert in Selector queue.
          //
          call.connection.responseQueue.addFirst(call);
          if (inHandler) { // called from a Handler thread: hand the rest over to the Responder
            call.timestamp = System.currentTimeMillis(); // reset the timestamp; doPurge measures the wait from here
            incPending(); // tell the Responder that a registration is in progress
            try {
              // Wakeup the thread blocked on select, only then can the call
              // to channel.register() complete.
              writeSelector.wakeup();
              channel.register(writeSelector, SelectionKey.OP_WRITE, call); // register for OP_WRITE, attaching the call
            } catch (ClosedChannelException e) {
              // Its ok. channel might be closed else where.
              done = true;
            } finally {
              decPending(); // registration finished, whether it succeeded or the channel was closed
            }
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection
                + " Wrote partial " + numBytes + " bytes.");
          }
        }
        error = false; // everything went off well
      }
    } finally {
      if (error && call != null) {
        LOG.warn(getName() + ", call " + call + ": output error");
        done = true; // error. no more data for this channel.
        closeConnection(call.connection);
      }
    }
    return done;
  }

  //
  // Enqueue a response from the application.
  //
  void doRespond(Call call) throws IOException {
    synchronized (call.connection.responseQueue) {
      call.connection.responseQueue.addLast(call); // queue the finished call for sending
      if (call.connection.responseQueue.size() == 1) {
        processResponse(call.connection.responseQueue, true); // nothing else is queued: send it right away
      }
    }
  }

  private synchronized void incPending() { // call waiting to be enqueued.
    pending++;
  }

  private synchronized void decPending() { // call done enqueueing.
    pending--;
    notify();
  }

  private synchronized void waitPending() throws InterruptedException {
    while (pending > 0) {
      wait();
    }
  }
}
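(d) Connection
The connection class; the actual logic for reading client requests lives here. A Connection represents the connection to one client. It reads the client's calls and puts them on a blocking queue, from which the Handlers take and process them.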
/**
 * Reads calls from a connection and queues them for handling.
 *
 * This class is the server-side abstraction of one connection. It reads the
 * calls sent by the client and adds the resulting Server.Call instances to
 * the queue of calls waiting to be processed.
 */
public class Connection {
  private boolean rpcHeaderRead = false; // if initial rpc header is read
  private boolean headerRead = false; // if the connection header that follows version is read.
  private SocketChannel channel;
  private ByteBuffer data;
  private ByteBuffer dataLengthBuffer;
  private LinkedList<Call> responseQueue;
  private volatile int rpcCount = 0; // number of outstanding rpcs
  private long lastContact;
  private int dataLength; // length of the data
  private Socket socket;
  // Cache the remote host & port info so that even if the socket is
  // disconnected, we can say where it used to connect to.
  private String hostAddress;
  private int remotePort;
  private InetAddress addr;
  ConnectionHeader header = new ConnectionHeader(); // connection header
  Class<?> protocol; // protocol class
  boolean useSasl;
  SaslServer saslServer;
  private AuthMethod authMethod;
  private boolean saslContextEstablished;
  private boolean skipInitialSaslHandshake;
  private ByteBuffer rpcHeaderBuffer;
  private ByteBuffer unwrappedData;
  private ByteBuffer unwrappedDataLengthBuffer;
  UserGroupInformation user = null;
  public UserGroupInformation attemptingUser = null; // user name before auth

  // Fake 'call' for failed authorization response
  private final int AUTHROIZATION_FAILED_CALLID = -1;
  private final Call authFailedCall = new Call(AUTHROIZATION_FAILED_CALLID, null, this);
  private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
  // Fake 'call' for SASL context setup
  private static final int SASL_CALLID = -33;
  private final Call saslCall = new Call(SASL_CALLID, null, this);
  private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
  private boolean useWrap = false;

  public Connection(SelectionKey key, SocketChannel channel, long lastContact) {
    this.channel = channel; // socket channel
    this.lastContact = lastContact; // time of the last contact
    this.data = null;
    this.dataLengthBuffer = ByteBuffer.allocate(4);
    this.unwrappedData = null;
    this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
    this.socket = channel.socket(); // the Socket associated with the channel
    this.addr = socket.getInetAddress(); // remote address of the socket
    if (addr == null) {
      this.hostAddress = "*Unknown*";
    } else {
      this.hostAddress = addr.getHostAddress();
    }
    this.remotePort = socket.getPort(); // remote port of the connection
    this.responseQueue = new LinkedList<Call>(); // responses waiting to be sent on this connection
    if (socketSendBufferSize != 0) {
      try {
        socket.setSendBufferSize(socketSendBufferSize); // set the socket send buffer size
      } catch (IOException e) {
        LOG.warn("Connection: unable to set socket send buffer size to " + socketSendBufferSize);
      }
    }
  }
  .......
}
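(e) Handler
The class that processes requests. It loops, blocking on callQueue to take Call objects, and processes each one. It takes the call information from the call queue, invokes the real object through reflection, obtains the result, and then puts the call on the response queue.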
/** Handles queued calls. */
private class Handler extends Thread {
  public Handler(int instanceNumber) {
    this.setDaemon(true); // run as a daemon thread
    this.setName("IPC Server handler " + instanceNumber + " on " + port);
  }

  @Override
  public void run() {
    LOG.info(getName() + ": starting");
    SERVER.set(Server.this); // make this server available through the SERVER thread-local
    ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); // buffer for the response
    while (running) {
      try {
        final Call call = callQueue.take(); // pop the queue; maybe blocked here
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": has #" + call.id + " from " + call.connection);
        String errorClass = null;
        String error = null;
        Writable value = null;
        CurCall.set(call); // remember the dequeued call in this thread's CurCall
        try {
          // Make the call as the user via Subject.doAs, thus associating
          // the call with the Subject.
          // call() is abstract in ipc.Server; it is implemented in RPC.Server.
          if (call.connection.user == null) {
            value = call(call.connection.protocol, call.param, call.timestamp);
          } else {
            // run the IPC call as the user associated with the call's Server.Connection
            value = call.connection.user.doAs(new PrivilegedExceptionAction<Writable>() {
              @Override
              public Writable run() throws Exception {
                // make the call
                return call(call.connection.protocol, call.param, call.timestamp);
              }
            });
          }
        } catch (Throwable e) {
          LOG.info(getName() + ", call " + call + ": error: " + e, e);
          errorClass = e.getClass().getName();
          error = StringUtils.stringifyException(e);
        }
        CurCall.set(null);
        synchronized (call.connection.responseQueue) {
          // setupResponse() needs to be sync'ed together with
          // responder.doResponse() since setupResponse may use
          // SASL to encrypt response data and SASL enforces
          // its own message ordering.
          setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error);
          // Discard the large buf and reset it back to
          // smaller size to freeup heap
          if (buf.size() > maxRespSize) {
            LOG.warn("Large response size " + buf.size() + " for call " + call.toString());
            buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
          }
          responder.doRespond(call);
        }
      } catch (InterruptedException e) {
        if (running) { // unexpected -- log it
          LOG.info(getName() + " caught: " + StringUtils.stringifyException(e));
        }
      } catch (Exception e) {
        LOG.info(getName() + " caught: " + StringUtils.stringifyException(e));
      }
    }
    LOG.info(getName() + ": exiting");
  }
}
(4) Walkthrough of one complete request on the Server
Demo example:
/**
 * Description: RPCserver test<br>
 *
 * Copyright: Copyright (c) 2013 <br>
 * Company: www.renren.com
 *
 * @author zhuhui{hui.zhu@renren-inc.com} 2013-5-17
 * @version 1.0
 */
public class RPCserver {
  /**
   * @param args
   */
  public static void main(String[] args) {
    Server server;
    try {
      server = RPC.getServer(new HelloProtocalImp(), "127.0.0.1", 9813, 6, true, new Configuration());
      server.start();
      try {
        server.join();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
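Step 1: Startup
server = RPC.getServer(new HelloProtocalImp(), "127.0.0.1", 9813, 6, true, new Configuration());
server.start();
(1) Initialization
getServer() is a factory method that creates a Server object. The object it actually creates is an instance of RPC.Server, and the constructor of the base class initializes the listener and the responder: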
protected Server(String bindAddress, int port, Class<? extends Writable> paramClass, int handlerCount,
    Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
    throws IOException {
  this.bindAddress = bindAddress;
  this.conf = conf;
  this.port = port;
  this.paramClass = paramClass;
  this.handlerCount = handlerCount;
  this.socketSendBufferSize = 0;
  this.maxQueueSize = handlerCount * conf.getInt(
      IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
  this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
      IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
  this.readThreads = conf.getInt(
      IPC_SERVER_RPC_READ_THREADS_KEY, IPC_SERVER_RPC_READ_THREADS_DEFAULT);
  this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
  this.maxIdleTime = 2 * conf.getInt("ipc.client.connection.maxidletime", 1000);
  this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
  this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
  this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
  this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
  this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();

  // Start the listener here and let it bind to the port
  listener = new Listener();
  this.port = listener.getAddress().getPort();
  this.rpcMetrics = RpcInstrumentation.create(serverName, this.port);
  this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);

  // Create the responder here
  responder = new Responder();

  if (isSecurityEnabled) {
    SaslRpcServer.init(conf);
  }
}
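(2) Starting the listener, the responder and the handlers
public synchronized void start() {
  responder.start();
  listener.start();
  handlers = new Handler[handlerCount];
  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();
  }
}
After start() the responder, listener and handler threads are all blocked: the first two in their Selector's select() method, the handlers in callQueue.take(). All of them are waiting for client requests. The Responder's select uses a timeout of 15 minutes (PURGE_INTERVAL). The Listener has also started its Reader threads, which are blocked as well.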
Step 2: The Listener receives data
(1) run() starts and blocks in selector.select(), waiting for the SelectionKey.OP_ACCEPT event that was registered at the end of the constructor:
// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port); // name of the listener thread
this.setDaemon(true); // run as a daemon thread
The loop of run() is:
while (running) {
  SelectionKey key = null;
  try {
    selector.select(); // select the keys whose channels are ready for I/O
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {
      key = iter.next();
      iter.remove();
      try {
        if (key.isValid()) {
          if (key.isAcceptable()) // the key's channel is ready to accept a new socket connection
            doAccept(key); // accept the connections on the channel associated with this key
        }
      } catch (IOException e) {
      }
      key = null;
    }
  } catch (OutOfMemoryError e) {
    // we can run out of memory if we have too many threads
    // log the event and sleep for a minute and give
    // some thread(s) a chance to finish
    LOG.warn("Out of Memory in server select", e);
    closeCurrentConnection(key, e);
    cleanupConnections(true);
    try {
      Thread.sleep(60000);
    } catch (Exception ie) {
    }
  } catch (Exception e) {
    closeCurrentConnection(key, e);
  }
  cleanupConnections(false);
}
(2) Wake up a reader and accept the connections arriving from clients on the server socket channel associated with the key
/**
 * Accepts the client connections arriving on the server socket channel
 * associated with the key.
 */
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
  Connection c = null;
  ServerSocketChannel server = (ServerSocketChannel) key.channel(); // the server socket channel
  SocketChannel channel;
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    Reader reader = getReader();
    try {
      reader.startAdd(); // wakes up readSelector and sets adding to true
      SelectionKey readKey = reader.registerChannel(channel); // register the channel for read events; returns the key
      c = new Connection(readKey, channel, System.currentTimeMillis()); // create the connection
      readKey.attach(c); // attach the Connection to the read key
      synchronized (connectionList) {
        connectionList.add(numConnections, c); // add it to the server's list of connections
        numConnections++;
      }
      if (LOG.isDebugEnabled())
        LOG.debug("Server connection from " + c.toString() + "; # active connections: " + numConnections
            + "; # queued calls: " + callQueue.size());
    } finally {
      // Sets adding to false and wakes a reader with notify(). The Reader threads
      // started in the Listener constructor wait with wait() in the meantime;
      // their source is omitted here for brevity.
      reader.finishAdd();
    }
  }
}

// Registers the channel with readSelector for read events (OP_READ becomes the interest set) and returns the key.
public synchronized SelectionKey registerChannel(SocketChannel channel) throws IOException {
  return channel.register(readSelector, SelectionKey.OP_READ);
}
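The listing calls getReader() without showing it. One plausible way to spread new connections over the Reader threads is plain round-robin over the readers array using the currentReader field. The sketch below assumes that behavior; ReaderPicker and the string stand-ins for Reader objects are hypothetical and not taken from the Hadoop source.

```java
final class ReaderPicker {
  private final String[] readers; // stand-ins for the Reader objects
  private int currentReader = 0;

  ReaderPicker(int readThreads) {
    readers = new String[readThreads];
    for (int i = 0; i < readThreads; i++) {
      readers[i] = "reader-" + i;
    }
  }

  /** Advances the cursor and returns the next reader, wrapping around at the end of the array. */
  synchronized String getReader() {
    currentReader = (currentReader + 1) % readers.length;
    return readers[currentReader];
  }

  public static void main(String[] args) {
    ReaderPicker picker = new ReaderPicker(3);
    for (int i = 0; i < 4; i++) {
      System.out.println(picker.getReader()); // reader-1, reader-2, reader-0, reader-1
    }
  }
}
```

With round-robin, each Reader's Selector ends up watching roughly the same number of connections, whatever the order in which clients connect.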
(3) Reading the data: readAndProcess >> processOneRpc >> processData
// Reads the data of a remote procedure call from the SocketChannel of a Server.Connection
// and adds the resulting call to callQueue, handing it over to the Handler threads.
// This is the core method for receiving call data; it shows how data is read from the
// SocketChannel. Of the methods it leads to, processData is listed after this method.
// A Server.Connection is the link to a client and communicates with the client's calls, so a
// connection also defines a close operation, which closes the SocketChannel of the client socket.
public int readAndProcess() throws IOException, InterruptedException {
  while (true) {
    /*
     * Read at most one RPC. If the header is not read completely
     * yet then iterate until we read first RPC or until there is no
     * data left.
     */
    int count = -1;
    // read bytes from the channel into dataLengthBuffer
    if (dataLengthBuffer.remaining() > 0) {
      count = channelRead(channel, dataLengthBuffer);
      // -1 means the channel has reached end of stream; otherwise the 4 bytes
      // may not be complete yet. In both cases return the count.
      if (count < 0 || dataLengthBuffer.remaining() > 0)
        return count;
    }
    // the version information has not been read yet
    if (!rpcHeaderRead) {
      // Every connection is expected to send the header.
      if (rpcHeaderBuffer == null) {
        rpcHeaderBuffer = ByteBuffer.allocate(2);
      }
      count = channelRead(channel, rpcHeaderBuffer);
      if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
        return count;
      }
      int version = rpcHeaderBuffer.get(0);
      byte[] method = new byte[] { rpcHeaderBuffer.get(1) };
      authMethod = AuthMethod.read(new DataInputStream(new ByteArrayInputStream(method)));
      dataLengthBuffer.flip(); // flip dataLengthBuffer for reading
      // return -1 if the header or the version does not match
      // (HEADER = ByteBuffer.wrap("hrpc".getBytes()), CURRENT_VERSION = 4)
      if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
        // Warning is ok since this is not supposed to happen.
        LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort
            + " got version " + version + " expected version " + CURRENT_VERSION);
        return -1;
      }
      dataLengthBuffer.clear();
      if (authMethod == null) {
        throw new IOException("Unable to read authentication method");
      }
      if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
        AccessControlException ae = new AccessControlException("Authentication is required");
        setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
            ae.getClass().getName(), ae.getMessage());
        responder.doRespond(authFailedCall);
        throw ae;
      }
      if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
        doSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
        authMethod = AuthMethod.SIMPLE;
        // client has already sent the initial Sasl message and we
        // should ignore it. Both client and server should fall back
        // to simple auth from now on.
        skipInitialSaslHandshake = true;
      }
      if (authMethod != AuthMethod.SIMPLE) {
        useSasl = true;
      }
      // The version information has been read successfully. dataLengthBuffer was cleared
      // above for reuse; now set rpcHeaderRead to true.
      rpcHeaderBuffer = null;
      rpcHeaderRead = true;
      continue;
    }
    if (data == null) {
      dataLengthBuffer.flip();
      // read the data length so that the data buffer can be allocated
      dataLength = dataLengthBuffer.getInt();
      // a ping from the client carries no data: clear dataLengthBuffer and return
      if (dataLength == Client.PING_CALL_ID) {
        if (!useWrap) { // covers the !useSasl too
          dataLengthBuffer.clear();
          return 0; // ping message
        }
      }
      if (dataLength < 0) {
        LOG.warn("Unexpected data length " + dataLength + "!! from " + getHostAddress());
      }
      data = ByteBuffer.allocate(dataLength); // allocate the data buffer that will receive the call parameters
    }
    // read bytes from the channel into the data buffer
    count = channelRead(channel, data);
    // the data buffer has been filled as expected
    if (data.remaining() == 0) {
      dataLengthBuffer.clear();
      data.flip(); // flip the data buffer for reading
      if (skipInitialSaslHandshake) {
        data = null;
        skipInitialSaslHandshake = false;
        continue;
      }
      boolean isHeaderRead = headerRead;
      if (useSasl) {
        saslReadAndProcess(data.array());
      } else {
        processOneRpc(data.array());
      }
      data = null;
      if (!isHeaderRead) {
        continue;
      }
    }
    return count;
  }
}
private void processData(byte[] buf) throws IOException, InterruptedException {
  DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
  int id = dis.readInt(); // try to read an id
  if (LOG.isDebugEnabled())
    LOG.debug(" got #" + id);
  Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param
  param.readFields(dis); // this is the Invocation sent by the client; it holds the method name and the parameters
  Call call = new Call(id, param, this); // wrap it in a Call
  callQueue.put(call); // put the call on callQueue
  incRpcCount(); // increment the count of outstanding RPC requests
}
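The two-stage read in readAndProcess (first a 4-byte length into dataLengthBuffer, then exactly that many bytes into data, which begins with the call id) can be sketched in isolation. The following is a simplified illustration, not Hadoop's code: FrameDecoder, feed and encode are hypothetical names, the payload is a plain UTF-8 string instead of a serialized Writable, and every frame body is assumed to be at least 4 bytes long.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FrameDecoder {
  private final ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
  private ByteBuffer data;                      // allocated once the length is known
  final List<String> calls = new ArrayList<>(); // decoded calls as "id:payload"

  /** Feeds one fragment of bytes; every frame completed by it is appended to calls. */
  void feed(ByteBuffer in) {
    while (in.hasRemaining()) {
      if (data == null) {
        // Stage 1: collect the 4-byte length prefix, possibly across fragments.
        while (lengthBuffer.hasRemaining() && in.hasRemaining()) {
          lengthBuffer.put(in.get());
        }
        if (lengthBuffer.hasRemaining()) {
          return; // length still incomplete, wait for more bytes
        }
        lengthBuffer.flip();
        data = ByteBuffer.allocate(lengthBuffer.getInt());
        lengthBuffer.clear(); // ready for the next frame
      }
      // Stage 2: collect the body.
      while (data.hasRemaining() && in.hasRemaining()) {
        data.put(in.get());
      }
      if (data.hasRemaining()) {
        return; // body still incomplete
      }
      data.flip();
      int id = data.getInt(); // the body starts with the call id
      String payload = StandardCharsets.UTF_8.decode(data).toString();
      calls.add(id + ":" + payload);
      data = null;
    }
  }

  /** Builds one frame: length, then id, then the payload bytes. */
  static ByteBuffer encode(int id, String payload) {
    byte[] body = payload.getBytes(StandardCharsets.UTF_8);
    ByteBuffer out = ByteBuffer.allocate(4 + 4 + body.length);
    out.putInt(4 + body.length).putInt(id).put(body);
    out.flip();
    return out;
  }

  public static void main(String[] args) {
    byte[] frame = encode(7, "hello").array();
    FrameDecoder decoder = new FrameDecoder();
    decoder.feed(ByteBuffer.wrap(frame, 0, 6));                // length prefix plus 2 bytes of the body
    decoder.feed(ByteBuffer.wrap(frame, 6, frame.length - 6)); // the rest of the body
    System.out.println(decoder.calls); // prints [7:hello]
  }
}
```

Keeping the length buffer and the data buffer as fields is what lets a non-blocking reader return in the middle of a frame and resume when the next read event fires.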
Step 3: The Handler runs the business logic
(1) Take the next Call to process from callQueue
final Call call = callQueue.take(); // pop the queue; maybe blocked here
(2) Invoke the method by reflection, run the business logic, and return the data by calling responder.doRespond(call);
public Writable call(Class<?> protocol, Writable param, long receivedTime) throws IOException {
  …...........
  Invocation call = (Invocation) param;
  // look up the Method by reflection from the method name and the parameter types
  Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses());
  method.setAccessible(true); // suppress Java language access checks when the Method is used, which makes invocation cheaper
  // invoke it: instance is the object the method is called on, the second argument holds the call parameters
  Object value = method.invoke(instance, call.getParameters());
  …...........
}
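The reflective dispatch used by call() can be tried on its own with nothing but the JDK. The sketch below assumes a hypothetical protocol interface Hello and implementation HelloImpl (stand-ins for the demo's protocol classes, which are not shown in this section); only Class.getMethod, Method.setAccessible and Method.invoke are real APIs.

```java
import java.lang.reflect.Method;

final class ReflectiveDispatch {
  /** Hypothetical protocol interface. */
  interface Hello {
    String hello(String name);
  }

  /** Hypothetical implementation holding the business logic. */
  static final class HelloImpl implements Hello {
    @Override
    public String hello(String name) {
      return "hello " + name;
    }
  }

  /** Looks up methodName on the protocol interface and invokes it on instance. */
  static Object call(Class<?> protocol, Object instance, String methodName,
                     Class<?>[] parameterClasses, Object[] parameters) throws Exception {
    Method method = protocol.getMethod(methodName, parameterClasses);
    method.setAccessible(true); // skip Java language access checks for this Method object
    return method.invoke(instance, parameters);
  }

  public static void main(String[] args) throws Exception {
    Object value = call(Hello.class, new HelloImpl(), "hello",
        new Class<?>[] { String.class }, new Object[] { "hadoop" });
    System.out.println(value); // prints hello hadoop
  }
}
```

The method name and parameter types are the only things the server needs from the request to find the target method, which is why the client-side Invocation carries exactly those plus the argument values.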
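Step 4: The Responder returns the data
This thread class sends RPC responses to the clients. As its run() method shows, response data is handled while the server is running, and there are two cases. In the first case, a response that has not been sent within the time limit is treated as expired; the server stops serving it and purges it (doPurge closes the connection). In the second case, the call registered on a selected channel is valid and the channel is writable, so the response data is written directly to the channel for the client to read.
a: When a Handler submits a result to the Responder, it first runs the Responder's send logic in its own thread. To guarantee that each buffer is written out contiguously, the Handler checks the size of connection.responseQueue after adding its buffer. If the size is greater than 1, an earlier buffer has not been completely sent, so the Handler does not send anything and leaves the work to the Responder. In the Handler's send path, a buffer that is sent completely is removed from connection.responseQueue. A buffer that is not sent completely stays at the head of the queue (connection.responseQueue[0]), and the Connection's channel is registered for the OP_WRITE event on the Responder's Selector.
b: Data submitted by Handlers can therefore accumulate in responseQueue, and the Responder sends it. A Connection whose buffer was not completely sent is registered for OP_WRITE on the Responder's Selector. The Responder loops over the Connections that are writable; once everything in a Connection's responseQueue has been written, it cancels that Connection's OP_WRITE interest. Connections that stay unwritable for a long time are closed.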
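The division of labor described in a and b can be modeled without any sockets. The sketch below is an illustration under stated assumptions, not the Hadoop implementation: ResponseQueueSketch is a hypothetical class, the channel is simulated by a StringBuilder that accepts at most maxBytesPerWrite bytes per write, and a boolean flag stands in for the OP_WRITE registration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

final class ResponseQueueSketch {
  private final Deque<ByteBuffer> responseQueue = new ArrayDeque<>();
  private final StringBuilder wire = new StringBuilder(); // what the simulated client has received
  private final int maxBytesPerWrite;                     // simulates a socket buffer that fills up
  boolean writeInterestRegistered = false;                // stands in for the OP_WRITE registration

  ResponseQueueSketch(int maxBytesPerWrite) {
    this.maxBytesPerWrite = maxBytesPerWrite;
  }

  /** Simulated non-blocking write: accepts at most maxBytesPerWrite bytes per call. */
  private void channelWrite(ByteBuffer buf) {
    int n = Math.min(maxBytesPerWrite, buf.remaining());
    for (int i = 0; i < n; i++) {
      wire.append((char) buf.get());
    }
  }

  /** Handler side: queue the response and send it inline only if nothing else is queued. */
  synchronized void doRespond(String response) {
    responseQueue.addLast(ByteBuffer.wrap(response.getBytes(StandardCharsets.US_ASCII)));
    if (responseQueue.size() == 1) {
      processResponse(true);
    }
  }

  /** Writes the head of the queue; returns true when nothing is left to send. */
  synchronized boolean processResponse(boolean inHandler) {
    if (responseQueue.isEmpty()) {
      return true;
    }
    ByteBuffer buf = responseQueue.removeFirst();
    channelWrite(buf);
    if (!buf.hasRemaining()) {
      return responseQueue.isEmpty(); // done only if this was the last response
    }
    responseQueue.addFirst(buf);      // partial write: keep the rest at the head
    if (inHandler) {
      writeInterestRegistered = true; // hand the rest over to the Responder
    }
    return false;
  }

  /** Responder side: called whenever the simulated channel is writable. */
  synchronized void onWritable() {
    if (processResponse(false)) {
      writeInterestRegistered = false; // like key.interestOps(0)
    }
  }

  String wire() {
    return wire.toString();
  }

  public static void main(String[] args) {
    ResponseQueueSketch conn = new ResponseQueueSketch(4);
    conn.doRespond("AAAAAA"); // 4 bytes go out inline, 2 remain queued
    conn.doRespond("BB");     // queue size is 2, so nothing is sent inline
    conn.onWritable();        // sends the remaining "AA"
    conn.onWritable();        // sends "BB" and clears the write interest
    System.out.println(conn.wire()); // prints AAAAAABB
  }
}
```

Because a Handler only writes when its response is alone in the queue, the bytes of two responses on the same connection can never interleave, which is the ordering guarantee paragraph a describes.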
(1) Initialization
Responder() throws IOException {
  this.setName("IPC Server Responder");
  this.setDaemon(true);
  writeSelector = Selector.open(); // create a selector
  pending = 0;
}
(2)发送数据processResponse
/**
 * Processes the response data of the calls queued on one channel.
 * Returns true if the channel is idle (nothing more to send on it).
 */
private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
    boolean error = true;
    boolean done = false; // there is more data for this channel (still waiting to be written)
    int numElements = 0;
    Call call = null;
    try {
        synchronized (responseQueue) {
            //
            // If there are no items for this channel, then we are done
            //
            numElements = responseQueue.size();
            if (numElements == 0) {
                error = false;
                return true; // no more data for this channel.
            }
            //
            // Extract the first call
            //
            call = responseQueue.removeFirst();
            SocketChannel channel = call.connection.channel; // the channel this call belongs to
            if (LOG.isDebugEnabled()) {
                LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection);
            }
            //
            // Send as much data as we can in the non-blocking fashion
            //
            // Write the response (held in the call.response byte buffer) to the channel.
            int numBytes = channelWrite(channel, call.response);
            if (numBytes < 0) { // a negative byte count means nothing could be written: return
                return true;
            }
            if (!call.response.hasRemaining()) {
                // call.response is drained: the whole response has been written to the channel
                call.connection.decRpcCount(); // one fewer outstanding RPC on this connection
                if (numElements == 1) { // the last call has been fully processed
                    done = true; // no more data for this channel
                } else {
                    done = false; // other calls are still waiting to be sent on this channel
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection
                            + " Wrote " + numBytes + " bytes.");
                }
            } else {
                //
                // If we were unable to write the entire response out, then
                // insert in Selector queue.
                //
                // call.response still holds unwritten bytes, so the call goes back
                // to the head of the response queue.
                call.connection.responseQueue.addFirst(call);
                if (inHandler) { // called from a Handler thread: hand the remainder to the Responder
                    call.timestamp = System.currentTimeMillis(); // timestamp the call
                    incPending(); // mark a registration as pending so the Responder waits before select()
                    try {
                        // Wakeup the thread blocked on select, only then can the call
                        // to channel.register() complete.
                        writeSelector.wakeup(); // wake the thread blocked on writeSelector
                        channel.register(writeSelector, SelectionKey.OP_WRITE, call); // register the channel for OP_WRITE, with the call attached
                    } catch (ClosedChannelException e) {
                        // Its ok. channel might be closed else where.
                        done = true;
                    } finally {
                        decPending(); // registration succeeded or the channel was closed: clear the pending mark either way
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection
                            + " Wrote partial " + numBytes + " bytes.");
                }
            }
            error = false; // everything went off well
        }
    } finally {
        if (error && call != null) {
            LOG.warn(getName() + ", call " + call + ": output error");
            done = true; // error. no more data for this channel.
            closeConnection(call.connection);
        }
    }
    return done;
}
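The queue handling in processResponse can be tried in isolation. The following is a minimal sketch, not the Hadoop implementation: `ResponseQueueSketch`, `PendingResponse`, and `ThrottledChannel` are hypothetical names introduced here, the channel is a test double that accepts only a few bytes per write, and the Selector registration, RPC counting, and error handling are left out. It shows only the rule that a partially written response goes back to the head of the queue, and that the method reports "done" only once the last queued response is fully written.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Deque;

class ResponseQueueSketch {

    /** Hypothetical stand-in for Server.Call: just an id and its response bytes. */
    static final class PendingResponse {
        final int id;
        final ByteBuffer response;

        PendingResponse(int id, byte[] bytes) {
            this.id = id;
            this.response = ByteBuffer.wrap(bytes);
        }
    }

    /** Test double for a non-blocking socket: accepts at most `limit` bytes per write. */
    static final class ThrottledChannel implements WritableByteChannel {
        private final int limit;
        private final ByteArrayOutputStream sink = new ByteArrayOutputStream();

        ThrottledChannel(int limit) {
            this.limit = limit;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }

        int bytesWritten() {
            return sink.size();
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    /** Sends the first queued response; returns true when the channel has nothing left to send. */
    static boolean processResponse(Deque<PendingResponse> queue, WritableByteChannel channel)
            throws IOException {
        synchronized (queue) {
            int numElements = queue.size();
            if (numElements == 0) {
                return true; // nothing queued for this channel
            }
            PendingResponse call = queue.removeFirst();
            int numBytes = channel.write(call.response); // may write only part of the buffer
            if (numBytes < 0) {
                return true;
            }
            if (!call.response.hasRemaining()) {
                return numElements == 1; // done only if this was the last queued response
            }
            queue.addFirst(call); // partial write: retry this response first next time
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Deque<PendingResponse> queue = new ArrayDeque<>();
        queue.add(new PendingResponse(1, new byte[10]));
        queue.add(new PendingResponse(2, new byte[4]));
        ThrottledChannel channel = new ThrottledChannel(6);
        boolean done = false;
        while (!done) {
            done = processResponse(queue, channel);
            System.out.println("done=" + done + ", queued=" + queue.size()
                    + ", written=" + channel.bytesWritten());
        }
    }
}
```

In the real method the partial-write branch additionally registers the channel with writeSelector for OP_WRITE when it is called from a Handler, so that the Responder thread finishes the write later instead of the caller looping as the demo `main` does.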
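(3) Data that could not be sent is handed over to Responder's run()
run() calls doAsyncWrite to write the remaining data, and purges calls that have waited too long to be sent.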
public void run() {
    LOG.info(getName() + ": starting");
    SERVER.set(Server.this);
    long lastPurgeTime = 0; // last check for old calls.

    while (running) {
        try {
            waitPending(); // If a channel is being registered, wait.
            writeSelector.select(PURGE_INTERVAL); // block for at most PURGE_INTERVAL milliseconds
            Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
            while (iter.hasNext()) { // iterate over the keys selected by writeSelector
                SelectionKey key = iter.next();
                iter.remove();
                try {
                    if (key.isValid() && key.isWritable()) {
                        doAsyncWrite(key); // asynchronous write: send the call's response data to the channel
                    }
                } catch (IOException e) {
                    LOG.info(getName() + ": doAsyncWrite threw exception " + e);
                }
            }
            long now = System.currentTimeMillis();
            if (now < lastPurgeTime + PURGE_INTERVAL) {
                continue;
            }
            lastPurgeTime = now;
            //
            // If there were some calls that have not been sent out for a
            // long time, discard them.
            //
            LOG.debug("Checking for old call responses.");
            ArrayList<Call> calls;

            // get the list of channels from list of keys.
            // The purge check runs at most once per PURGE_INTERVAL; calls that have
            // gone unsent for too long are treated as expired and purged.
            synchronized (writeSelector.keys()) {
                calls = new ArrayList<Call>(writeSelector.keys().size());
                iter = writeSelector.keys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    Call call = (Call) key.attachment();
                    if (call != null && key.channel() == call.connection.channel) {
                        calls.add(call);
                    }
                }
            }

            for (Call call : calls) {
                try {
                    doPurge(call, now);
                } catch (IOException e) {
                    LOG.warn("Error in purging old calls " + e);
                }
            }
        } catch (OutOfMemoryError e) {
            //
            // we can run out of memory if we have too many threads
            // log the event and sleep for a minute and give
            // some thread(s) a chance to finish
            //
            LOG.warn("Out of Memory in server select", e);
            try {
                Thread.sleep(60000);
            } catch (Exception ie) {
            }
        } catch (Exception e) {
            LOG.warn("Exception in Responder " + StringUtils.stringifyException(e));
        }
    }
    LOG.info("Stopping " + this.getName());
}
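The purge timing in run() can be separated from the Selector machinery. The sketch below is illustrative only: `PurgeSketch` and `PendingCall` are hypothetical names, the interval is passed in as a parameter instead of using Hadoop's PURGE_INTERVAL constant, and the expiry test `now > timestamp + interval` is an assumption about what doPurge checks, since doPurge itself is not shown in this section. It demonstrates two things: the check runs at most once per interval, and only calls older than the interval are selected for purging.

```java
import java.util.ArrayList;
import java.util.List;

class PurgeSketch {

    /** Hypothetical stand-in for Server.Call: an id plus the time it was queued for the Responder. */
    static final class PendingCall {
        final int id;
        final long timestamp;

        PendingCall(int id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    private final long purgeIntervalMs;
    private long lastPurgeTime = 0; // last check for old calls

    PurgeSketch(long purgeIntervalMs) {
        this.purgeIntervalMs = purgeIntervalMs;
    }

    /** Mirrors the guard in run(): skip the purge pass unless a full interval has elapsed. */
    boolean purgeDue(long now) {
        if (now < lastPurgeTime + purgeIntervalMs) {
            return false;
        }
        lastPurgeTime = now;
        return true;
    }

    /** Assumed expiry rule: a call is stale once it has waited longer than the interval. */
    List<PendingCall> expired(List<PendingCall> pending, long now) {
        List<PendingCall> stale = new ArrayList<>();
        for (PendingCall call : pending) {
            if (now > call.timestamp + purgeIntervalMs) {
                stale.add(call);
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        PurgeSketch purger = new PurgeSketch(1000);
        List<PendingCall> pending = new ArrayList<>();
        pending.add(new PendingCall(1, 0));
        pending.add(new PendingCall(2, 1500));
        long now = 2000;
        if (purger.purgeDue(now)) {
            for (PendingCall call : purger.expired(pending, now)) {
                System.out.println("would purge call #" + call.id);
            }
        }
    }
}
```

Taking `now` as an argument rather than reading the clock inside the methods keeps the timing logic deterministic and easy to test, which is the main reason for structuring the sketch this way.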
[Figure: sequence diagram]
[Figure: class diagram]