`
zy19982004
  • 浏览: 662060 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
F6f66edc-1c1a-3859-b76b-a22e740b7aa7
Hadoop学习
浏览量:251996
社区版块
存档分类
最新评论

Hadoop学习九:Hadoop-hdfs RPC源码 Server

 
阅读更多

一.Server类图

 

二.详细描述

  1. Call:server端的Call对象,对应着client的一个Call对象,两者id相同。同client Call一样,server Call封装了每次方法调用的参数信息和调用结果。
      //server端的Call对象,对应着client的一个Call对象,两者id相同
      private static class Call {
        private int id;                               // cleint Call的id
        private Writable param;                // client Call传过来的参数,实际上就是client Call的param,实际上也是RPC.Invocation
        private Connection connection;      //server到client的连接
        private ByteBuffer response;         //server Call的结果,类似client Call的value
    
        public Call(int id, Writable param, Connection connection) { 
          this.id = id;
          this.param = param;
          this.connection = connection;
          this.response = null;
        }
    
        public void setResponse(ByteBuffer response) {
          this.response = response;
        }
      }
  2. Connection:client向server发送消息时,server端nio接受后会创建一个到client的连接,用来向client发送消息。
      public class Connection {
        private boolean rpcHeaderRead = false; // if initial rpc header is read
        private boolean headerRead = false;  //if the connection header that
    
        private SocketChannel channel;
        private ByteBuffer data;
        private LinkedList<Call> responseQueue;//需要发往client端的Call
        private Socket socket;
        
        //初始化server Connction
        //channel  client的SocketChannel
        public Connection(SelectionKey key, SocketChannel channel, 
          this.channel = channel;
          this.socket = channel.socket(); 
          this.responseQueue = new LinkedList<Call>();
        }   
     
  3. Listener:监听client发送过来的消息,分发给Reader线程处理,最终目的是把消息封装成Server call对象,放入callQueue。
    //监听线程,NIO Reactor模式,读取client发送过来的消息
      private class Listener extends Thread {
        
        private ServerSocketChannel acceptChannel = null; //the accept channel
        private Selector selector = null; //the selector that we use for the server
        private Reader[] readers = null;
        private InetSocketAddress address; //the address we bind at
        private ExecutorService readPool; 
       
        public Listener() throws IOException {
          address = new InetSocketAddress(bindAddress, port);
          acceptChannel = ServerSocketChannel.open();
          acceptChannel.configureBlocking(false);
    
          // bindserver socket到本机ip+指定port 50020
          bind(acceptChannel.socket(), address, backlogLength);
          // create a selector;
          selector= Selector.open();
          //线程池执行所有消息
          readers = new Reader[readThreads];
          readPool = Executors.newFixedThreadPool(readThreads);
          for (int i = 0; i < readThreads; i++) {
            Selector readSelector = Selector.open();
            Reader reader = new Reader(readSelector);
            readers[i] = reader;
            readPool.execute(reader);
          }
          acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
        
        //3.1.创建Reader线程读取消息,最终目的把消息封装成Server call对象,放入callQueue
        private class Reader implements Runnable {
    	      private Selector readSelector = null;
    	
    	      Reader(Selector readSelector) {
    	        this.readSelector = readSelector;
    	      }
    	      public void run() {
    	    	  nio方式处理消息{
    	    		  doRead(key);
    	    	  }
    	      }
          } 
  4. Handler:从callQueue里面取Call,调用call方法(实际调用RPC.Server的call方法,真正执行远程命令的地方),远程命令执行完后,决定是否立刻向client发送执行命令的结果(Server call的response)。
      //4.从callQueue里面取Call,调用call方法(实际调用RPC.Server的call方法)
      private class Handler extends Thread {
        @Override
        public void run() {
          while (running) {
        	  //4.1 从callQueue里面取Call,调用call方法
              final Call call = callQueue.take(); // pop the queue; maybe blocked here
              CurCall.set(call);
               value = call(call.connection.protocol, call.param, call.timestamp);
              CurCall.set(null);
               //4.2 为Server call赋值
                setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR,  value, errorClass, error);
                //4.3处理赋值完后的call
                responder.doRespond(call);
          }
        }
    
      }
  5. Responder:异步向client发送Server call的结果。
    //5.异步发送Server call的response到client端
      private class Responder extends Thread {
        private Selector writeSelector;
        
        Responder() throws IOException {
          writeSelector = Selector.open(); // create a selector
        }
    
        @Override
        public void run() {
          while (running) {
              Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
              while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                 //5.异步写
                  if (key.isValid() && key.isWritable()) {
                      doAsyncWrite(key);
                  }
                } 
          }
        }
        
        //4.3将赋值完的server call放入responder队列里
        void doRespond(Call call) throws IOException {
        	synchronized (call.connection.responseQueue) {
        		call.connection.responseQueue.addLast(call);
        		//如果真有一个call,同步发送
        		if (call.connection.responseQueue.size() == 1) {
        			processResponse(call.connection.responseQueue, true);
        		}
        		//如果多个call,交给Responder线程异步发送
        	}
        }
    
        //5异步发送
        private void doAsyncWrite(SelectionKey key) throws IOException {
          Call call = (Call)key.attachment();
    	  processResponse(call.connection.responseQueue, false);
        } 
        //向client写消息
        private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
        	Call call = responseQueue.removeFirst();
        	SocketChannel channel = call.connection.channel;
        	channelWrite(channel, call.response);
        }
    
      }
  6.  Server:IPC Server。
    public abstract class Server {
      
      private BlockingQueue<Call> callQueue; // queued calls
    
      private Listener listener = null;
      private Responder responder = null;
      private int numConnections = 0;
      private Handler[] handlers = null;
      
      //1.初始化一个IPC server,指定RPC服务器地址和端口
      protected Server(String bindAddress, int port,   Invocation.class, ...) throws IOException {
        this.bindAddress = bindAddress;
        this.port = port;
        this.paramClass = paramClass;
        //创建listener线程
        listener = new Listener();
        this.port = listener.getAddress().getPort();    
        //创建responder线程
        responder = new Responder();
      }
      
    
      //2.启动IPC server
      public synchronized void start() {
    	//启动istener线程
        responder.start();
        //启动responder线程
        listener.start();
        //创建Handler线程,启动Handler线程
        handlers = new Handler[handlerCount];
        
        for (int i = 0; i < handlerCount; i++) {
          handlers[i] = new Handler(i);
          handlers[i].start();
        }
      }
      
     //3.client第一次发送消息到server时触发doAccept
      public void run() {
      	nio方式接受消息{
      		doAccept(key);
      	}
      }
    
        //3.doAccept
        void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
          Connection c = null;
          ServerSocketChannel server = (ServerSocketChannel) key.channel();
          SocketChannel channel;
          while ((channel = server.accept()) != null) {
        	  channel.configureBlocking(false);
        	  channel.socket().setTcpNoDelay(tcpNoDelay);
        	//3.1.创建Reader线程读取消息
          	  Reader reader = getReader();
          	  reader.startAdd();
          	  SelectionKey readKey = reader.registerChannel(channel);
          	  //3.2得根据client的SocketChannel创建一个server Connection,传递给Handler线程
          	  c = new Connection(readKey, channel, System.currentTimeMillis());
          	  readKey.attach(c);
          }
        }
    
        //3.1Reader线程读取消息
        void doRead(SelectionKey key) throws InterruptedException {
          Connection c = (Connection)key.attachment();
          c.readAndProcess();
        }   
    
      }
        
        //3.1读取channel
        public int readAndProcess() throws IOException, InterruptedException {
              int count = channelRead(channel, rpcHeaderBuffer);
              count =  channelRead(channel, data);
              processOneRpc(data.array());
              return count;
        }
    
        private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
        	processData(buf);
        }
        
        //3.1根据client channel内容(方法参数)创建一个server Call,放入callQueue
        private void processData(byte[] buf) throws  IOException, InterruptedException {
          DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
          int id = dis.readInt();                    // try to read an id
            
          Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param
          param.readFields(dis);        
            
          Call call = new Call(id, param, this);
          callQueue.put(call);              // queue the call; maybe blocked here
        }
    
      }
      
      //4.1
      public abstract Writable call(Class<?> protocol,  Writable param, long receiveTime , throws IOException;
      
      //4.2 为server call赋值(response)
      private void setupResponse(ByteArrayOutputStream response,  Call call, Status status, 
                                 Writable rv, String errorClass, String error)  throws IOException {
        call.setResponse(ByteBuffer.wrap(response.toByteArray()));
      }
    
    }
    
     

三.关于Socket和ServerSocket的bind方法

 

分享到:
评论

相关推荐

    hadoop段海涛老师八天实战视频

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...

    hadoop-2.5.2:1.HDFS源码分析,代码注释参考自《 Hadoop2.x HDFS源码剖析》

    本文将重点探讨HDFS的源码分析,基于《Hadoop2.x HDFS源码剖析》这本书中的参考注释。首先,我们来看HDFS的核心组件——NameNode和DataNode。 1. NameNode:作为HDFS的元数据管理节点,NameNode负责维护文件系统的...

    Hadoop 培训课程(2)HDFS

    Hadoop 培训课程(2)HDFS 分布式文件系统与HDFS HDFS体系结构与基本概念*** HDFS的shell操作*** java接口及常用api*** ---------------------------加深拓展---------------------- RPC调用** HDFS的分布式存储架构的...

    《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理 》的源代码

    《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》是一本深入探讨Hadoop核心组件的书籍,其源代码提供了对Hadoop内部工作原理的直观理解。这本书主要关注两个关键部分:Hadoop Common和HDFS...

    hdfs源码分析整理

    在分布式文件系统中,HDFS(Hadoop Distributed File System)扮演着核心角色,而HDFS的源码分析则是深入了解HDFS架构和实现机理的关键。本文将对HDFS源码进行详细的分析和整理,涵盖了HDFS的目录结构、对象序列化、...

    HDFS源码剖析带书签目录高清.zip

    《Hadoop 2.X HDFS源码剖析》以Hadoop 2.6.0源码为基础,深入剖析了HDFS 2.X中各个模块的实现细节,包括RPC框架实现、Namenode实现、Datanode实现以及HDFS客户端实现等。《Hadoop 2.X HDFS源码剖析》一共有5章,其中...

    hadoop源码阅读总结

    ### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...

    hadoop-2.8.1-src.tar.gz

    2. **hadoop-hdfs**: Hadoop分布式文件系统(HDFS)的源码,负责数据的分布式存储。HDFS的设计目标是高度容错性和高吞吐量的数据访问。它将大文件分割成块,并在多台机器上复制,以确保数据的可靠性。 3. **hadoop-...

    hdfs源码.zip

    1.2.1 Hadoop RPC接口 4 1.2.2 流式接口 20 1.3 HDFS主要流程 22 1.3.1 HDFS客户端读流程 22 1.3.2 HDFS客户端写流程 24 1.3.3 HDFS客户端追加写流程 25 1.3.4 Datanode启动、心跳以及执行名字节点指令...

    HDFS源码解析

    《HDFS源码解析——揭示分布式文件系统的内在...总之,HDFS源码解析是一次探索分布式存储奥秘的旅程,通过对源码的深入学习,我们可以了解到如何构建一个健壮、高效的分布式文件系统,从而更好地适应大数据时代的需求。

    Hadoop源码分析HDFS数据流

    Hadoop 源码分析 HDFS 数据流 Hadoop 的 HDFS(Hadoop Distributed File System)是 Hadoop 项目中最核心的组件之一,它提供了高可靠、高-performance 的分布式文件系统。HDFS 的核心组件包括 Namenode、Datanode、...

    hadoop-2.6.5:编译好的hadoop2.6.5,主要用于研究HDFS2

    总之,Hadoop 2.6.5版本对于HDFS2的研究提供了丰富的资源和工具,无论是对于初学者还是资深开发者,都是一个极佳的学习和探索平台。通过这个版本,我们可以深入了解分布式存储系统的设计理念,提升处理大数据问题的...

    hadoop源码.zip

    总结,Hadoop HDFS源码的学习是一项深入理解大数据存储技术的重要任务。通过源码,我们可以更清晰地看到HDFS是如何在分布式环境下实现高可用性和容错性的,这对于提升开发和运维技能,以及解决实际问题具有重大意义...

    Hadoop2.7.3源码Eclipse工程

    【标题】"Hadoop2.7.3源码Eclipse工程"揭示了这个压缩包包含的是Hadoop 2.7.3版本的源代码,并且是为Eclipse IDE准备的项目工程,便于开发者在Eclipse环境中进行源码级别的学习、调试和开发。 【描述】中的信息说明...

    rpc架构与hadoop分享

    ### RPC架构概述 RPC(Remote Procedure Call Protocol,...通过学习Dubbo和Hadoop的相关知识,不仅可以深入了解分布式系统的架构原理和技术细节,还能够在实际工作中更好地应对大规模数据处理和微服务架构的挑战。

    hadoop2.9.x源码编译工具包

    在Hadoop中,protobuf用于定义数据交换格式,特别是HDFS和MapReduce中的RPC通信。要编译Hadoop源码,你需要安装protobuf的编译器,并确保其在PATH环境变量中。Hadoop源码中包含protobuf的配置,Maven会自动调用...

Global site tag (gtag.js) - Google Analytics