`
flyfoxs
  • 浏览: 298111 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
社区版块
存档分类
最新评论

【大数据笔记】Hadoop通过动态代理实现RPC

阅读更多
Hadoop所有的跨节点的通信都是通过RPC来通信的, RPC通信是需要创建Stub,一个好的RPC需要通过良好的设计确保了对上层调用的透明性. 我们下面就通过Hadoop里面最常用的心跳(JobTrack和TaskTracker)来研究一下Hadoop的RPC机制.
心跳函数的调用在方法:TaskTracker.transmitHeartBeat();在此方法中会调用下面的代码段.这个方法实际就不一个本地调用,jobClient实际上就是一个动态代理生成的对象,这个对象已经包括了RPC的Stub.
    //
    // Xmit the heartbeat
    //
    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
                                                              justStarted,
                                                              justInited,
                                                              askForNewTask, 
                                                              heartbeatResponseId);
 
也就是 TaskTracker 通过 jobClient(InterTrackerProtocol) 来调用 JobTracker, 而JobClient实现了接口 jobClient(InterTrackerProtocol).
这是一个典型的Java动态代理的应用场景. 同时RPC调用需要跨越网络,现在我们就来看看Hadoop是如何透明的创建一个jobClient实例.
 
//这个是对jobClient的定义
InterTrackerProtocol jobClient;


//下面可以看出jobClient和JobTracker 实现了相同的接口
interface InterTrackerProtocol extends VersionedProtocol;


public class JobTracker implements MRConstants, InterTrackerProtocol,
    JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,
    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,
    JobTrackerMXBean {
  
//下面是jobClient初始化的代码
this.jobClient = (InterTrackerProtocol)
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        return RPC.waitForProxy(InterTrackerProtocol.class,
            InterTrackerProtocol.versionID,
            jobTrackAddr, fConf);
      }
    });
 
下图就是来自于对TaskTracker.initialize()的跟踪, 从该调用栈就可以看出TaskTracker是如何通过动态代理创建出一个RPC 客户端, 却不需要调用者提供任何信息.
2点注意:
1)RPC客户端如何得到端口号,服务器地址 
下面的代码已经添加了注释,可以看到,RPC客户端如何得到服务器地址,端口号,并生成stub,形成动态代理.
//下面是Java动态代理调用处, RPC.getProxy()
//参数addr已经封装了端口号,和地址, 这个地址就是TaskTracker里面的成员变量jobTrackAddr一级一级传递下来,传递路径就是上图所示.
    VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));


//最后一个参数Invoker是RPC的一个内部类,它实现了接口InvocationHandler,下面是他的构造函数
private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
    private boolean isClosed = false;
//在构造函数中,客户端和服务器建立连接
    public Invoker(Class<? extends VersionedProtocol> protocol,
        InetSocketAddress address, UserGroupInformation ticket,
        Configuration conf, SocketFactory factory,
        int rpcTimeout) throws IOException {
      this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
          ticket, rpcTimeout, conf);
      this.client = CLIENTS.getClient(conf, factory);
    }
//调用时,直接把准备好的参数通过连接发给服务器并同步获取返回值
    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }
      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }
   
    /* close the IPC client that's responsible for this invoker's RPCs */
    synchronized private void close() {
      if (!isClosed) {
        isClosed = true;
        CLIENTS.stopClient(client);
      }
    }
  }
  
2)服务器采用的是NIO,客户端采用的是BIO
客户端的调用就是BIO,不是NIO.调用方式在上文已经通过jobClient.heartbeat()举例做了相应的分析.
Listener将以独立的线程启动,并以异步(NIO)方式监听指定端口.下面的代码已经添加了注释,可以看出Listener如何监听连接,并开启对应的Reader来异步处理对应的数据.
Listener服务开启于JobTracker,因为要面对大量的TaskTracker连接,所以使用Server使用NIO来处理,这也是NIO典型的应用场景.
//Listener是  org.apache.hadoop.ipc.Server的内部类(实际上是Server的父类的内部类)

//下面是Listener唯一的构造函数
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);
      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      for (int i = 0; i < readThreads; i++) {
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        //并不会真正的开始处理数据,Reader在没有被Linster初始化之前,run()会一直处于自旋等待
        readPool.execute(reader);
      }
      // Register accepts on the server socket with the selector.
      //绑定selector和连接初始化事件,连接初始化时,在doAccept里面先对Reader初始化,然后Reader开始处理数据.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }
 
 
 参考:http://blog.csdn.net/jiangwei0910410003/article/details/21155911
  • 大小: 45.1 KB
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics