浏览 6169 次
锁定老帖子 主题:hadoop的心跳回忆
精华帖 (0) :: 良好帖 (2) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2010-07-11
namenode与datanode之间的通信,jobtracker与tasktracker直接的通信,都是通过“心跳”完成的。 以前看过hadoop心跳原理的源代码,今天再回忆一下,呵呵,所以叫“心跳回忆”。 1、心跳机制 心跳的机制大概是这样的: 1) master启动的时候,会开一个ipc server在那里。 2) slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。 2、找到心跳的代码 拿namenode和datanode来说,在datanode的offerService方法中,每隔3秒向namenode发送心跳的代码: /** * Main loop for the DataNode. Runs until shutdown, * forever calling remote NameNode functions. */ public void offerService() throws Exception { ... // // Now loop for a long time.... // while (shouldRun) { try { long startTime = now(); // // Every so often, send heartbeat or block-report // // 如果到了3秒钟,就向namenode发心跳 if (startTime - lastHeartbeat > heartBeatInterval) { // // All heartbeat messages include following info: // -- Datanode name // -- data transfer port // -- Total capacity // -- Bytes remaining // lastHeartbeat = startTime; DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration, data.getCapacity(), data.getDfsUsed(), data.getRemaining(), xmitsInProgress.get(), getXceiverCount()); // 注意上面这行代码,“发送心跳”竟然就是调用namenode的一个方法?? myMetrics.heartbeats.inc(now() - startTime); //LOG.info("Just sent heartbeat, with name " + localName); // 处理对心跳的返回值(namenode传给datanode的指令) if (!processCommand(cmds)) continue; } // 这里省略很多代码 ... } // while (shouldRun) } // offerService 上面这段代码,如果是单机的程序,没什么值得奇怪的。但是,这是hadoop集群!datanode和namenode在2台不同的机器(或2个JVM)上运行!datanode机器竟然直接调用namenode的方法!这是怎么实现的?难道是传说中的RMI吗?? 下面我们主要就来分析这个方法调用的细节。 3、心跳的底层细节一:datanode怎么获得namenode对象的? 首先,DataNode类中,有一个namenode的成员变量: public class DataNode extends Configured implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable { ... public DatanodeProtocol namenode = null; ... } 下面是NameNode类的定义: public class NameNode implements ClientProtocol, DatanodeProtocol, NamenodeProtocol, FSConstants, RefreshAuthorizationPolicyProtocol { ... } 注意:NameNode实现了DatanodeProtocol接口,DatanodeProtocol接口定义了namenode和datanode之间通信的方法。 那么,DataNode类是怎么获取到NameNode类的引用呢? 在Datanode端,为namenode变量赋值的代码: // connect to name node this.namenode = (DatanodeProtocol) RPC.waitForProxy(DatanodeProtocol.class, DatanodeProtocol.versionID, nameNodeAddr, conf); 在继续去RPC类中追踪: VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, new Invoker(addr, ticket, conf, factory)); 现在,明白了! 1) 对namenode的赋值,并不是真正的new了一个实现了DatanodeProtocol接口的对象,而是获得了一个动态代理!! 2) 上面这段代码中,protocol的类型是DatanodeProtocol.class 3) 对namenode的所有调用,都被委托(delegate)给了Invoker 4、心跳的底层细节二:看看Invoker类 Invoker类是org.apache.hadoop.ipc.RPC类的一个静态内部类: private static class Invoker implements InvocationHandler {在这个类中,看invoke方法: public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ... ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), address, method.getDeclaringClass(), ticket); ... return value.get(); } 所有的方法调用又被delegate给client的call方法了! client是Invoker中的成员变量: private Client client; 所以可以看出:DatanodeProtocol中的每个方法调用,都被包装成一个Invocation对象,再由client.call()调用 5、心跳的底层细节三:Invocation类 Invocation类是org.apache.hadoop.ipc.RPC类的一个静态内部类 没有什么业务逻辑方法,主要作用就是一个VO 6、心跳的底层细节四:client类的call方法 接下来重点看client类的call方法: public Writable call(Writable param, InetSocketAddress addr, Class<?> protocol, UserGroupInformation ticket) throws InterruptedException, IOException { Call call = new Call(param); // 将Invocation转化为Call Connection connection = getConnection(addr, protocol, ticket, call); // 连接远程服务器 connection.sendParam(call); // send the parameter // 将“序列化”后的call发给过去 boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // wait for the result // 等待调用结果 } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } if (interrupted) { // set the interrupt flag now that we are done waiting Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // local exception throw wrapException(addr, call.error); } } else { return call.value; // 返回 } } } 7、现在,一目了然了 datanode向namenode发送heartbeat过程是这样的: a) 在datanode初始化获得namenode的proxy b) 在datanode上,调用namenode proxy的heartbeat方法: namenode.sendHeartbeat(dnRegistration, data.getCapacity(), data.getDfsUsed(), data.getRemaining(), xmitsInProgress.get(), getXceiverCount()); c) 在datanode上的namenode动态代理类将这个调用包装成(或者叫“序列化成”)一个Invocation对象,并调用client.call方法 d) client call方法将Invocation转化为Call对象 e) client 将call发送到真正的namenode服务器 f) namenode接收后,转化成namenode端的Call,并process后,通过Responder发回来! g) datanode接收结果,并将结果转化为DatanodeCommand[] 8、再看动态代理 动态代理:让“只有接口,没事对应的实现类”成为可能,因为具体方法的实现可以委托给另一个类!! 在这个例子中,就datanode而言,DatanodeProtocol接口是没有实现类的! *** THE END *** 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2010-07-12
楼主在做东西,采用hadoop;介绍hadoop的文章一直很少
|
|
返回顶楼 | |
发表时间:2010-07-12
写得很好。
接口模式的远程RPC基本都是如此实现的,譬如hessian。 |
|
返回顶楼 | |
发表时间:2010-10-27
那datanode发送心跳也是走的50010端口吗?我不想让datanode发送心跳给namenode,就是杀死datanode某个节点,该禁掉它哪个端口呀?
|
|
返回顶楼 | |