Hadoop所有的跨节点的通信都是通过RPC来通信的, RPC通信是需要创建Stub,一个好的RPC需要通过良好的设计确保了对上层调用的透明性. 我们下面就通过Hadoop里面最常用的心跳(JobTrack和TaskTracker)来研究一下Hadoop的RPC机制.
心跳函数的调用在方法:TaskTracker.transmitHeartBeat();在此方法中会调用下面的代码段.这个方法实际就不一个本地调用,jobClient实际上就是一个动态代理生成的对象,这个对象已经包括了RPC的Stub.
// // Xmit the heartbeat // HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId);
也就是 TaskTracker 通过 jobClient(InterTrackerProtocol) 来调用 JobTracker, 而JobClient实现了接口 jobClient(InterTrackerProtocol).
这是一个典型的Java动态代理的应用场景. 同时RPC调用需要跨越网络,现在我们就来看看Hadoop是如何透明的创建一个jobClient实例.
//这个是对jobClient的定义 InterTrackerProtocol jobClient; //下面可以看出jobClient和JobTracker 实现了相同的接口 interface InterTrackerProtocol extends VersionedProtocol; public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol, RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JobTrackerMXBean {
//下面是jobClient初始化的代码 this.jobClient = (InterTrackerProtocol) UserGroupInformation.getLoginUser().doAs( new PrivilegedExceptionAction<Object>() { public Object run() throws IOException { return RPC.waitForProxy(InterTrackerProtocol.class, InterTrackerProtocol.versionID, jobTrackAddr, fConf); } });
下图就是来自于对TaskTracker.initialize()的跟踪, 从该调用栈就可以看出TaskTracker是如何通过动态代理创建出一个RPC 客户端, 却不需要调用者提供任何信息.
2点注意:
1)RPC客户端如何得到端口号,服务器地址
下面的代码已经添加了注释,可以看到,RPC客户端如何得到服务器地址,端口号,并生成stub,形成动态代理.
//下面是Java动态代理调用处, RPC.getProxy() //参数addr已经封装了端口号,和地址, 这个地址就是TaskTracker里面的成员变量jobTrackAddr一级一级传递下来,传递路径就是上图所示. VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); //最后一个参数Invoker是RPC的一个内部类,它实现了接口InvocationHandler,下面是他的构造函数 private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; //在构造函数中,客户端和服务器建立连接 public Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf); this.client = CLIENTS.getClient(conf, factory); } //调用时,直接把准备好的参数通过连接发给服务器并同步获取返回值 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } /* close the IPC client that's responsible for this invoker's RPCs */ synchronized private void close() { if (!isClosed) { isClosed = true; CLIENTS.stopClient(client); } } }
2)服务器采用的是NIO,客户端采用的是BIO
客户端的调用就是BIO,不是NIO.调用方式在上文已经通过jobClient.heartbeat()举例做了相应的分析.
Listener将以独立的线程启动,并以异步(NIO)方式监听指定端口.下面的代码已经添加了注释,可以看出Listener如何监听连接,并开启对应的Reader来异步处理对应的数据.
Listener服务开启于JobTracker,因为要面对大量的TaskTracker连接,所以使用Server使用NIO来处理,这也是NIO典型的应用场景.
//Listener是 org.apache.hadoop.ipc.Server的内部类(实际上是Server的父类的内部类) //下面是Listener唯一的构造函数 public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; //并不会真正的开始处理数据,Reader在没有被Linster初始化之前,run()会一直处于自旋等待 readPool.execute(reader); } // Register accepts on the server socket with the selector. //绑定selector和连接初始化事件,连接初始化时,在doAccept里面先对Reader初始化,然后Reader开始处理数据. acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); }
参考:http://blog.csdn.net/jiangwei0910410003/article/details/21155911
相关推荐
大数据之Hadoop学习教程+笔记合计_超详细完整.zip
大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK.......
本系统利用大数据技术,合理的为用户做出推荐,推荐的结果可靠程度很高,这就是我的优势所在,因为它和一般的推荐系统的推荐算法不太一样,我的推荐算法是利用Hadoop技术写的,我们可以利用Hadoop集群的高吞吐量,一...
03_Hadoop_概论_大数据的特点.mp4 04_Hadoop_概论_大数据的应用场景.mp4 06_Hadoop_概论_未来工作内容.mp4 07_Hadoop_入门_课程介绍.mp4 11_Hadoop_入门_Hadoop优势.mp4 13_Hadoop_入门_HDFS概述.mp4 14_Hadoop_入门...
在大数据的世界里,Hadoop是不可或缺的一个核心组件,它为海量数据处理提供了高效、可靠的解决方案。本主题将深入探讨Hadoop在数据分析中的应用及其生态系统的关键技术。 首先,我们需要理解“大数据”的概念。...
在本套内部Hadoop系列培训资料中,我们将深入探讨大数据技术的核心——Hadoop及其生态系统,包括Spark、Hive、Storm、Hbase和Sqoop等关键组件。这些工具和框架共同构建了大数据解决方案的基础。 首先,Hadoop是...
(2)修改 module、software 文件夹的所有者 (1)查询是否安装 java 软件: (2)如果安装的版本低于 1.7,卸载该 jdk: (1)先获
技术领域:大数据领域Hadoop技能学习 技术关键词:大数据、Hadoop 内容:大数据小白晋升之路学习必备 用途:学习
大数据整理hadoop/hive
大数据之hadoop,spart全套全技术栈视频课程,包含spark,hadoop,storm,kafka,mllib等组件的安装,编程等,依次从基础,进阶直到实际实践。
大数据专业Hadoop开发技术课程的实践教学探索是一项针对高等教育大数据专业学生培养的重要研究,旨在解决当前大数据专业教学中的困境,特别是在实践教学方面的挑战。本课程的实践教学内容和方法涉及Java、Linux、...
大数据Hadoop视频教程大数据Hadoop视频教程大数据Hadoop视频教程
大数据与Hadoop是当前信息技术领域的核心概念,它们共同推动了数据处理能力的革命。大数据是指数据量巨大、复杂度高、处理速度快的数据集合,这些数据无法通过传统数据库管理系统或常规数据处理方式有效地处理。...
自己在大数据培训班学习整理的笔记,比较详细,适合新手学习,我感觉还是挺有帮助的,希望可以帮助到你
Hadoop是大数据技术中最重要的框架之一,是学习大数据必备的第一课,在Hadoop平台之上,可以更容易地开发和运行其他处理大规模数据的框架。尚硅谷Hadoop视频教程再次重磅升级!以企业实际生产环境为背景,增加了更...
"大数据Hadoop平台监控、预警及自动化" 大数据Hadoop平台监控、预警及自动化是大数据Hadoop平台的关键组件之一。它旨在提供实时的监控、预警和自动化功能,以确保Hadoop集群的稳定运行和高效运作。 监控是一种实时...
Hadoop大数据资料集锦Hadoop大数据资料集锦Hadoop大数据资料集锦Hadoop大数据资料集锦