`
flyfoxs
  • 浏览: 298526 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
社区版块
存档分类
最新评论

【大数据笔记】Hadoop通过动态代理实现RPC

阅读更多
Hadoop所有的跨节点的通信都是通过RPC来通信的, RPC通信是需要创建Stub,一个好的RPC需要通过良好的设计确保了对上层调用的透明性. 我们下面就通过Hadoop里面最常用的心跳(JobTrack和TaskTracker)来研究一下Hadoop的RPC机制.
心跳函数的调用在方法:TaskTracker.transmitHeartBeat();在此方法中会调用下面的代码段.这个方法实际就不一个本地调用,jobClient实际上就是一个动态代理生成的对象,这个对象已经包括了RPC的Stub.
    //
    // Xmit the heartbeat
    //
    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
                                                              justStarted,
                                                              justInited,
                                                              askForNewTask, 
                                                              heartbeatResponseId);
 
也就是 TaskTracker 通过 jobClient(InterTrackerProtocol) 来调用 JobTracker, 而JobClient实现了接口 jobClient(InterTrackerProtocol).
这是一个典型的Java动态代理的应用场景. 同时RPC调用需要跨越网络,现在我们就来看看Hadoop是如何透明的创建一个jobClient实例.
 
//这个是对jobClient的定义
InterTrackerProtocol jobClient;


//下面可以看出jobClient和JobTracker 实现了相同的接口
interface InterTrackerProtocol extends VersionedProtocol;


public class JobTracker implements MRConstants, InterTrackerProtocol,
    JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,
    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,
    JobTrackerMXBean {
  
//下面是jobClient初始化的代码
this.jobClient = (InterTrackerProtocol)
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        return RPC.waitForProxy(InterTrackerProtocol.class,
            InterTrackerProtocol.versionID,
            jobTrackAddr, fConf);
      }
    });
 
下图就是来自于对TaskTracker.initialize()的跟踪, 从该调用栈就可以看出TaskTracker是如何通过动态代理创建出一个RPC 客户端, 却不需要调用者提供任何信息.
2点注意:
1)RPC客户端如何得到端口号,服务器地址 
下面的代码已经添加了注释,可以看到,RPC客户端如何得到服务器地址,端口号,并生成stub,形成动态代理.
//下面是Java动态代理调用处, RPC.getProxy()
//参数addr已经封装了端口号,和地址, 这个地址就是TaskTracker里面的成员变量jobTrackAddr一级一级传递下来,传递路径就是上图所示.
    VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));


//最后一个参数Invoker是RPC的一个内部类,它实现了接口InvocationHandler,下面是他的构造函数
private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
    private boolean isClosed = false;
//在构造函数中,客户端和服务器建立连接
    public Invoker(Class<? extends VersionedProtocol> protocol,
        InetSocketAddress address, UserGroupInformation ticket,
        Configuration conf, SocketFactory factory,
        int rpcTimeout) throws IOException {
      this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
          ticket, rpcTimeout, conf);
      this.client = CLIENTS.getClient(conf, factory);
    }
//调用时,直接把准备好的参数通过连接发给服务器并同步获取返回值
    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }
      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }
   
    /* close the IPC client that's responsible for this invoker's RPCs */
    synchronized private void close() {
      if (!isClosed) {
        isClosed = true;
        CLIENTS.stopClient(client);
      }
    }
  }
  
2)服务器采用的是NIO,客户端采用的是BIO
客户端的调用就是BIO,不是NIO.调用方式在上文已经通过jobClient.heartbeat()举例做了相应的分析.
Listener将以独立的线程启动,并以异步(NIO)方式监听指定端口.下面的代码已经添加了注释,可以看出Listener如何监听连接,并开启对应的Reader来异步处理对应的数据.
Listener服务开启于JobTracker,因为要面对大量的TaskTracker连接,所以使用Server使用NIO来处理,这也是NIO典型的应用场景.
//Listener是  org.apache.hadoop.ipc.Server的内部类(实际上是Server的父类的内部类)

//下面是Listener唯一的构造函数
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);
      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      for (int i = 0; i < readThreads; i++) {
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        //并不会真正的开始处理数据,Reader在没有被Linster初始化之前,run()会一直处于自旋等待
        readPool.execute(reader);
      }
      // Register accepts on the server socket with the selector.
      //绑定selector和连接初始化事件,连接初始化时,在doAccept里面先对Reader初始化,然后Reader开始处理数据.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }
 
 
 参考:http://blog.csdn.net/jiangwei0910410003/article/details/21155911
  • 大小: 45.1 KB
0
0
分享到:
评论

相关推荐

    大数据学习笔记.zip

    大巴塔大数据Hbase是数据库,Hive是数据仓库hadoop2.2.0分布式服务器1、准备Linux环境2、CentOS卸载OpenJDK并安装Sun JDK3、安装Hadoop4、修改windows系统的hosts文件5、配置ssh免登陆HDFS分布式文件系统Hadoop 全...

    《java学习》-Java 大数据学习笔记.zip

    Java大数据学习笔记主要涵盖了一系列与Java编程和大数据技术相关的主题,这些主题对于现代软件开发,尤其是数据密集型应用至关重要。以下是对每个主题的详细解释: 1. **SSH**(Secure Shell):SSH是一种网络协议...

    《IT学习资料3》-Java 大数据学习笔记.zip

    8. RPC:Remote Procedure Call,远程过程调用,是分布式系统中实现服务间通信的关键技术,如Dubbo、gRPC等。 9. 设计模式:作为软件开发的最佳实践,设计模式对于理解和编写可维护、可扩展的大数据系统至关重要。 ...

    安装笔记:hadoop+hbase+sqoop2+phoenix+kerberos

    【标题】:“安装笔记:hadoop+hbase+sqoop2+phoenix+kerberos” 【描述】:在本文中,我们将探讨如何在两台云主机(实际环境可能需要三台或更多)上安装Hadoop、HBase、Sqoop2、Phoenix以及Kerberos的详细过程,...

    《IT学习资料》-Java 大数据学习笔记.zip

    《IT学习资料》-Java 大数据学习笔记.zip是一个涵盖了多方面IT技术的综合学习资源,特别是针对Java和大数据这两个热门领域。这份压缩包中包含了各种教程、笔记和参考资料,帮助学习者深入理解和掌握相关技能。 1. *...

    大数据&&分布式系统学习过程中一些经验总结.zip

    Job提交执行过程详解黑斑羚impala集群搭建任务&资源调度相关Airflow 实战总结HadoopYarn架构实现解析Yarn-Federation源码串读Hadoop&Yarn Rpc源码剖析MR任务在Hadoop子系统中状态流转Hadoop Pipes Ping Timeout问题...

    阿里巴巴大数据解决方案.pdf

    在其公开的大数据解决方案中,主要涵盖了Hadoop集群服务模式、跨机房方案、以及ODPS(Open Data Processing Service,开放式数据处理服务)的介绍。下面,我们将详细介绍这些知识点: 首先,Hadoop集群服务模式是...

    Java分布式应用学习笔记

    在Java世界中,分布式应用是指由多个独立组件通过网络通信协同工作的系统。这种架构模式常用于构建大规模、高可用性、可扩展的系统。本笔记将深入探讨Java分布式应用的核心概念、技术和实践。 1. 分布式Java应用...

    个人java笔记

    Hadoop的生态还包括诸如Avro(跨语言的序列化和RPC系统)、Zookeeper(分布式协调服务)和HBase(NoSQL数据库)等组件。这些组件协同工作,为大数据处理提供了强大的支撑。 在Java笔记中提到的common组件是Hadoop的...

    Spark学习笔记(一)Spark初识【特性、组成、应用】

    - SparkStreaming:处理实时数据流,通过微批处理实现低延迟的流计算。 - MLlib:包含多种机器学习算法,简化了机器学习应用的开发。 - GraphX:是一个分布式图计算框架,适合处理图数据和执行图算法。 - BlinkDB...

Global site tag (gtag.js) - Google Analytics