`
zhangjun5965
  • 浏览: 12327 次
社区版块
存档分类
最新评论

hadoop源码解析之RPC分析

阅读更多

 

前言

因为hadoop底层各种通讯都用的是rpc,如client和namenode、client和datanode、namanode和datanode等。所以首先学习了一下hadoop rpc的内部实现,拜读了一下hadoop的源码


准备工作

首先下载hadoop的最新稳定版源码(目前是2.7.3),编译hadoop源码,因为hadoop的底层序列号用的是google的 protobuf,所以需要把这些proto文件编译成java文件,方便debug调试。如果比较懒的话,其实用maven把相关jar和源码包下载下来也行。

Hadoop的rpc并没有采用现成的rpc框架,如thrift等,而是采用jdk自带的库完全自己写了一套,更加轻量级,更加可控。

用到的主要的技术是java NIO、网络编程、反射和动态代理,如果对这几块不太熟悉的话,建议先找些资料看看相关的东西

#Hadoop rpc实现流程 Hadoop rpc框架位于hadoop源码的hadoop-commn项目里,就像我们学习任何语言先学习hello world一样,我们先来一个最简单的程序,这个程序是从hadoop源码test目录里找到的,testRPC.java,我们运行其中的main方法。 (我这在main方法简单改动,new了个Configuration()对象,当参数传进来)

这里写图片描述

定义接口

首先要定义一个接口协议,所有的接口都要继承VersionedProtocol

public interface TestProtocol extends VersionedProtocol {
		public static final long versionID = 1L;

		String echo(String value) throws IOException;
}

实现接口

要实现这个接口

public static class TestImpl implements TestProtocol {
		@Override
		public long getProtocolVersion(String protocol, long clientVersion) {
			return TestProtocol.versionID;
		}

		@Override
		public ProtocolSignature getProtocolSignature(String protocol,
				long clientVersion, int hashcode) {
			return new ProtocolSignature(TestProtocol.versionID, null);
		}
		@Override
		public String echo(String value) throws IOException {
			return value;
		}
}

启动一个server

Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
				.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
				.build();
server.start();

构建一个client的代理

TestProtocol proxy = RPC.getProxy(TestProtocol.class,TestProtocol.versionID,addr, conf);

执行相应的方法。

String stringResult = proxy.echo("hello hadoop rpc");
		System.out.println(stringResult);

重点内容

Server底层实现

内部类介绍

server类是org.apache.hadoop.ipc.Server,里面包含几个重要的内部类 这里写图片描述

内部类介绍

Call

将一个rpc请求需要的东西封装到Call对象里

private final int callId;             // the client's call id 客户端id
private final int retryCount;        // the retry count of the call 重试次数
private final Writable rpcRequest;    // Serialized Rpc request from client 序列号的请求
private final Connection connection;  // connection to client
private long timestamp;               // time received when response is null
                   // time served when response is not null
private ByteBuffer rpcResponse;       // the response for this call
private final RPC.RpcKind rpcKind;
private final byte[] clientId;
private final Span traceSpan; // the tracing span on the server side

Connection。

客户端与服务器通信的一些信息在这个里面

Handler

用于处理接受到rpc请求

Listener

用于监听rpc请求。

Reader

用于读取Listener接受到的请求

Responder

用于将rpc请求返回客户端

Server的启动

服务器的构造是通过静态方法RPC.Builder(conf).build()创建的,通过跟踪代码我们发现他最后调用了Server的构造方法

protected Server(String bindAddress, int port,Class<? extends Writable> rpcRequestClass, int handlerCount,int numReaders, int queueSizePerHandler, Configuration conf,String serverName, SecretManager<? extends TokenIdentifier> secretManager,String portRangeConfig) throws IOException {
…………………………………..
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),maxQueueSize, prefix, conf);
…………………………………………
    
    // Start the listener here and let it bind to the port
    listener = new Listener();
    this.port = listener.getAddress().getPort();    
………………………………………………..
    // Create the responder here
    responder = new Responder();
…………………………………………….
  }

我们看到我们上面提到的两个内部类listener和responder都是在这里创建的,之后调用start方法启动服务。

public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }

接收请求

从Listener的构造方法中我们看到服务器监听了SelectionKey.OP_ACCEPT,他只是监听是否有请求过来,而不做处理,这样为了提高并发。 同时启动了一些Reader线程,这些线程是用来从channel读取数据的。

public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      //监听OP_ACCEPT事件
      **acceptChannel.register(selector, SelectionKey.OP_ACCEPT);**
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

Reader线程读取数据

通过Listener的run方法我们看到如果一旦接受到请求,然后就让reader去处理

Connection c = connectionManager.register(channel);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
            IOUtils.cleanup(null, channel);
          }
          continue;
        }
        key.attach(c);  // so closeCurrentConnection can get the object
        **reader.addConnection(c);**

跟踪Reader的run方法,我们看到最后将读取的信息封装成了一个Call对象put到callQueue中

Call call = new Call(header.getCallId(), header.getRetryCount(),
          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
          header.getClientId().toByteArray(), traceSpan);

      callQueue.put(call);              // queue the call; maybe blocked here

Handler线程处理请求

final Call call = callQueue.take(); // pop the queue; maybe blocked here
          if (LOG.isDebugEnabled()) {
            LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
          }
          if (!call.connection.channel.isOpen()) {
            LOG.info(Thread.currentThread().getName() + ": skipped " + call);
            continue;
          }

最后调用了RpcInvoker的call方法最终通过反射来执行相应的方法

Method method =
              protocolImpl.protocolClass.getMethod(call.getMethodName(),
              call.getParameterClasses());
          method.setAccessible(true);
          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
          Object value = 
              method.invoke(protocolImpl.protocolImpl, call.getParameters());
          if (server.verbose) log("Return: "+value);
          return new ObjectWritable(method.getReturnType(), value);

客户端实现

获取代理

通过RPC的静态方法getProxy获取代理

TestProtocol proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
					addr, conf);

在这里是通过java的动态代理来获取代理。通过跟踪代码我们找到了这里

public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                         InetSocketAddress addr, UserGroupInformation ticket,
                         Configuration conf, SocketFactory factory,
                         int rpcTimeout, RetryPolicy connectionRetryPolicy,
                         AtomicBoolean fallbackToSimpleAuth)
    throws IOException {    

    if (connectionRetryPolicy != null) {
      throw new UnsupportedOperationException(
          "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
    }

    T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
            factory, rpcTimeout, fallbackToSimpleAuth));
    return new ProtocolProxy<T>(protocol, proxy, true);
  }

发送请求。

在Invoker的构造方法里,我们看到在这里新建了一个org.apache.hadoop.ipc.Client对象,在invoke方法里调用了client里面的call方法,最终调用connection.sendRpcRequest(call); 来发送rpc请求

final Call call = createCall(rpcKind, rpcRequest);
    Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);
    try {
      connection.sendRpcRequest(call);                 // send the rpc request
    } catch (RejectedExecutionException e) {
      throw new IOException("connection has been closed", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.warn("interrupted waiting to send rpc request to server", e);
      throw new IOException(e);
    }

在sendRpcRequest方法里,可以看到使用了基于tcp的socket通讯,将数据发送了服务器端。

synchronized (Connection.this.out) {
                if (shouldCloseConnection.get()) {
                  return;
                }
                
                if (LOG.isDebugEnabled())
                  LOG.debug(getName() + " sending #" + call.id);
         
                byte[] data = d.getData();
                int totalLength = d.getLength();
                out.writeInt(totalLength); // Total Length
                out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
                out.flush();
              }

总结

比较仓促,写的比较简陋,后续有时间还会继续跟进补充。

分享到:
评论

相关推荐

    Hadoop RPC机制分析

    本文将深入探讨Hadoop的RPC机制,解析其工作原理,并结合源码分析其内部实现。 一、RPC简介 RPC是一种让程序能够调用运行在其他地址空间(通常在另一台机器上)的程序的方法。在Hadoop中,RPC被广泛用于NameNode、...

    hadoop NameNode 源码解析

    Hadoop NameNode 源码解析 ...本文对 Hadoop NameNode 的源码进行了深入分析,了解了其启动过程、配置加载、RPC 服务端创建、 Namenode 对象初始化等关键步骤,为读者提供了一个详细的 Hadoop NameNode 源码解析。

    hadoop源码阅读总结

    ### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...

    hadoop 源码分析 文档

    通过深入分析Hadoop的源码,我们可以理解其内部工作原理,优化性能,或者开发新的功能和扩展。这对于任何希望利用Hadoop进行大数据处理的开发者来说都是至关重要的。Hadoop的源码分析文档提供了宝贵的参考资料,有助...

    rpc架构与hadoop分享

    ### Hadoop架构设计与源码分析 #### Hadoop简介 Hadoop是一个能够对大量数据进行分布式处理的软件框架,由Apache基金会开发。它能够提供高可靠性、高扩展性以及高效的数据处理能力,被广泛应用于大数据处理领域。 ...

    hadoop 源码分析全

    ### Hadoop源码分析知识点详解 #### 一、Hadoop及其核心技术背景 Hadoop作为一款开源的分布式计算框架,其核心思想来源于Google发布的几篇重要论文。这些论文详细阐述了Google构建其分布式计算平台的关键技术和...

    hadoop源码分析

    《深入剖析Hadoop源码:理解其核心流程与机制》 Hadoop,作为大数据处理领域的重要框架,其源码分析对于开发者来说具有极高的价值。本文将深入探讨Hadoop的核心组件,包括Configuration、JobClient、JobConf以及...

    hadoop源码.zip

    本篇文章将围绕"Hadoop源码"这一主题,深度探讨HDFS的底层实现机制,帮助读者从源码层面理解其工作原理。 一、HDFS概述 HDFS是基于Google发表的GFS论文设计的分布式文件系统,旨在处理和存储大量数据。它采用了主从...

    [HBase]源码级强力分析hadoop的RPC机制

    这些天一直奔波于长沙和武汉之间,忙着腾讯的笔试、面试,以至于对hadoopRPC(RemoteProcedureCallProtocol,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。...

    Hadoop源代码分析(三)

    在深入分析Hadoop源代码的过程中,我们关注到其在数据通信和序列化方面的重要机制。Hadoop并没有简单地采用Java自带的序列化方法,而是设计了一套自有的系统,这主要是为了提高性能、可扩展性和跨平台兼容性。在...

    最近很火的大数据Hadoop之Hbase0.99.2最新版源码

    《深入解析Hadoop之HBase 0.99.2源码分析》 在当今的信息化社会,大数据处理已经成为企业核心竞争力的关键要素。Hadoop作为开源大数据处理框架的领头羊,其生态中的HBase更是备受关注。HBase是基于Google Bigtable...

    HDFS源码解析

    通过阅读和理解Hadoop源码分析-HDFS部分.pdf文档,我们可以更深入地理解这些组件的工作原理,掌握HDFS在处理大数据时的内部机制。这对于我们优化HDFS的性能,解决实际问题,以及开发相关的分布式应用都具有重要的...

    修改hadoop中的io写的,远程调用对象的东西。

    7. **源码分析与工具**: 标签中的"源码"和"工具"提示我们,可能需要查看Hadoop的源代码来理解其内部工作原理,并利用Hadoop提供的工具(如Hadoop命令行工具、Hadoop配置工具等)进行调试和测试。 8. **案例研究**...

    大数据&&分布式系统学习过程中一些经验总结.zip

    Broadcast【Spark源码分析】Job提交执行过程详解黑斑羚impala集群搭建任务&资源调度相关Airflow 实战总结HadoopYarn架构实现解析Yarn-Federation源码串读Hadoop&Yarn Rpc源码剖析MR任务在Hadoop子系统中状态流转...

    hadoop学习资料地址

    - 提供了一份Hadoop源码包的功能分析表,帮助理解各组件的作用。 8. **BJZhanghao的博客**:`http://www.cnblogs.com/bjzhanghao/archive/2008/11/12/1325113.html` - 分享了关于Hadoop架构设计的文章,有助于...

    hadoop-2.6.0-src:原始解析

    总而言之,Hadoop 2.6.0的源码解析是一次深度的技术探索之旅,它让我们洞悉大数据处理背后的奥秘,理解分布式系统的运行机制,领略开源软件的魅力。对于任何想要深入理解和使用Hadoop的人来说,这都是一份不可多得的...

    HBase源码分析

    HBase源码分析揭示了HBase在RPC通信机制方面的一些关键技术点,这包括了角色分配、通信信道建立、通信接口协议定义、对象序列化、传输控制和会话管理,以及在传输过程中可能出现的错误处理和重试机制。 HBase中的...

    《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理 》的源代码

    《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》是一本深入探讨Hadoop核心组件的书籍,其源代码提供了对Hadoop内部工作原理的直观理解。这本书主要关注两个关键部分:Hadoop Common和HDFS...

Global site tag (gtag.js) - Google Analytics