Hadoop所有的跨节点的通信都是通过RPC来通信的, RPC通信是需要创建Stub,一个好的RPC需要通过良好的设计确保了对上层调用的透明性. 我们下面就通过Hadoop里面最常用的心跳(JobTrack和TaskTracker)来研究一下Hadoop的RPC机制.
心跳函数的调用在方法:TaskTracker.transmitHeartBeat();在此方法中会调用下面的代码段.这个方法实际就不一个本地调用,jobClient实际上就是一个动态代理生成的对象,这个对象已经包括了RPC的Stub.
// // Xmit the heartbeat // HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId);
也就是 TaskTracker 通过 jobClient(InterTrackerProtocol) 来调用 JobTracker, 而JobClient实现了接口 jobClient(InterTrackerProtocol).
这是一个典型的Java动态代理的应用场景. 同时RPC调用需要跨越网络,现在我们就来看看Hadoop是如何透明的创建一个jobClient实例.
//这个是对jobClient的定义 InterTrackerProtocol jobClient; //下面可以看出jobClient和JobTracker 实现了相同的接口 interface InterTrackerProtocol extends VersionedProtocol; public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol, RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JobTrackerMXBean {
//下面是jobClient初始化的代码 this.jobClient = (InterTrackerProtocol) UserGroupInformation.getLoginUser().doAs( new PrivilegedExceptionAction<Object>() { public Object run() throws IOException { return RPC.waitForProxy(InterTrackerProtocol.class, InterTrackerProtocol.versionID, jobTrackAddr, fConf); } });
下图就是来自于对TaskTracker.initialize()的跟踪, 从该调用栈就可以看出TaskTracker是如何通过动态代理创建出一个RPC 客户端, 却不需要调用者提供任何信息.
2点注意:
1)RPC客户端如何得到端口号,服务器地址
下面的代码已经添加了注释,可以看到,RPC客户端如何得到服务器地址,端口号,并生成stub,形成动态代理.
//下面是Java动态代理调用处, RPC.getProxy() //参数addr已经封装了端口号,和地址, 这个地址就是TaskTracker里面的成员变量jobTrackAddr一级一级传递下来,传递路径就是上图所示. VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); //最后一个参数Invoker是RPC的一个内部类,它实现了接口InvocationHandler,下面是他的构造函数 private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; //在构造函数中,客户端和服务器建立连接 public Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf); this.client = CLIENTS.getClient(conf, factory); } //调用时,直接把准备好的参数通过连接发给服务器并同步获取返回值 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } /* close the IPC client that's responsible for this invoker's RPCs */ synchronized private void close() { if (!isClosed) { isClosed = true; CLIENTS.stopClient(client); } } }
2)服务器采用的是NIO,客户端采用的是BIO
客户端的调用就是BIO,不是NIO.调用方式在上文已经通过jobClient.heartbeat()举例做了相应的分析.
Listener将以独立的线程启动,并以异步(NIO)方式监听指定端口.下面的代码已经添加了注释,可以看出Listener如何监听连接,并开启对应的Reader来异步处理对应的数据.
Listener服务开启于JobTracker,因为要面对大量的TaskTracker连接,所以使用Server使用NIO来处理,这也是NIO典型的应用场景.
//Listener是 org.apache.hadoop.ipc.Server的内部类(实际上是Server的父类的内部类) //下面是Listener唯一的构造函数 public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; //并不会真正的开始处理数据,Reader在没有被Linster初始化之前,run()会一直处于自旋等待 readPool.execute(reader); } // Register accepts on the server socket with the selector. //绑定selector和连接初始化事件,连接初始化时,在doAccept里面先对Reader初始化,然后Reader开始处理数据. acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); }
参考:http://blog.csdn.net/jiangwei0910410003/article/details/21155911
相关推荐
大巴塔大数据Hbase是数据库,Hive是数据仓库hadoop2.2.0分布式服务器1、准备Linux环境2、CentOS卸载OpenJDK并安装Sun JDK3、安装Hadoop4、修改windows系统的hosts文件5、配置ssh免登陆HDFS分布式文件系统Hadoop 全...
Java大数据学习笔记主要涵盖了一系列与Java编程和大数据技术相关的主题,这些主题对于现代软件开发,尤其是数据密集型应用至关重要。以下是对每个主题的详细解释: 1. **SSH**(Secure Shell):SSH是一种网络协议...
8. RPC:Remote Procedure Call,远程过程调用,是分布式系统中实现服务间通信的关键技术,如Dubbo、gRPC等。 9. 设计模式:作为软件开发的最佳实践,设计模式对于理解和编写可维护、可扩展的大数据系统至关重要。 ...
【标题】:“安装笔记:hadoop+hbase+sqoop2+phoenix+kerberos” 【描述】:在本文中,我们将探讨如何在两台云主机(实际环境可能需要三台或更多)上安装Hadoop、HBase、Sqoop2、Phoenix以及Kerberos的详细过程,...
《IT学习资料》-Java 大数据学习笔记.zip是一个涵盖了多方面IT技术的综合学习资源,特别是针对Java和大数据这两个热门领域。这份压缩包中包含了各种教程、笔记和参考资料,帮助学习者深入理解和掌握相关技能。 1. *...
Job提交执行过程详解黑斑羚impala集群搭建任务&资源调度相关Airflow 实战总结HadoopYarn架构实现解析Yarn-Federation源码串读Hadoop&Yarn Rpc源码剖析MR任务在Hadoop子系统中状态流转Hadoop Pipes Ping Timeout问题...
在其公开的大数据解决方案中,主要涵盖了Hadoop集群服务模式、跨机房方案、以及ODPS(Open Data Processing Service,开放式数据处理服务)的介绍。下面,我们将详细介绍这些知识点: 首先,Hadoop集群服务模式是...
在Java世界中,分布式应用是指由多个独立组件通过网络通信协同工作的系统。这种架构模式常用于构建大规模、高可用性、可扩展的系统。本笔记将深入探讨Java分布式应用的核心概念、技术和实践。 1. 分布式Java应用...
Hadoop的生态还包括诸如Avro(跨语言的序列化和RPC系统)、Zookeeper(分布式协调服务)和HBase(NoSQL数据库)等组件。这些组件协同工作,为大数据处理提供了强大的支撑。 在Java笔记中提到的common组件是Hadoop的...
- SparkStreaming:处理实时数据流,通过微批处理实现低延迟的流计算。 - MLlib:包含多种机器学习算法,简化了机器学习应用的开发。 - GraphX:是一个分布式图计算框架,适合处理图数据和执行图算法。 - BlinkDB...