`
zy19982004
  • 浏览: 662054 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
F6f66edc-1c1a-3859-b76b-a22e740b7aa7
Hadoop学习
浏览量:251992
社区版块
存档分类
最新评论

Hadoop学习十五:Hadoop-Hdfs DataXceiverServer源码概述

 
阅读更多

一.DataXceiverServer类图

 

二.DataXceiverServer

  1.  Server used for receiving/sending a block of data.This is created to listen for requests from clients or other DataNodes.  This small server does not use the Hadoop IPC mechanism.DataXceiverServer用于接收和发送block数据,它监听着client或者其它DataNode的请求。DataXceiverServer没有采用RPC机制。DataXceiverServer是流式机制,而RPC是命令式接口。
  2. DataXceiverServer每接收到一个请求,就会创建一个DataXceiver来处理该请求。
    class DataXceiverServer implements Runnable, FSConstants {
      public static final Log LOG = DataNode.LOG;
      
      ServerSocket ss;	
      DataNode datanode;
      // Record all sockets opend for data transfer
      Map<Socket, Socket> childSockets = Collections.synchronizedMap(new HashMap<Socket, Socket>());
      
      static final int MAX_XCEIVER_COUNT = 256;		//默认值是256,每个node最多可以起多少个DataXceiver,如果太多的话可能会导致内存不足
    
      BlockBalanceThrottler balanceThrottler;
      
      DataXceiverServer(ServerSocket ss, Configuration conf, DataNode datanode) {
        this.ss = ss;
        this.datanode = datanode;
      }
    
      public void run() {
        while (datanode.shouldRun) {
            Socket s = ss.accept();	//接受client请求
            s.setTcpNoDelay(true);
            //实例化DataXceiver,需要Socket和DataNode
            new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this)).start();
        }
      }
    }

三.DataXceiver类图

  1.  DataXceiver处理Client或DataNode的五种请求(DataTransferProtocol接口定义)。
      public static final byte OP_WRITE_BLOCK = (byte) 80;
      public static final byte OP_READ_BLOCK = (byte) 81;
      /**
       * @deprecated As of version 15, OP_READ_METADATA is no longer supported
       */
      @Deprecated public static final byte OP_READ_METADATA = (byte) 82;
      public static final byte OP_REPLACE_BLOCK = (byte) 83;
      public static final byte OP_COPY_BLOCK = (byte) 84;
      public static final byte OP_BLOCK_CHECKSUM = (byte) 85;
     
    class DataXceiver implements Runnable, FSConstants {
    
    	Socket s;
    	final String remoteAddress; // address of remote side
    	final String localAddress; // local address of this daemon
    	DataNode datanode;
    	DataXceiverServer dataXceiverServer;
    
    	public DataXceiver(Socket s, DataNode datanode,
    			DataXceiverServer dataXceiverServer) {
    
    		this.s = s;
    		this.datanode = datanode;
    		this.dataXceiverServer = dataXceiverServer;
    		dataXceiverServer.childSockets.put(s, s);
    		remoteAddress = s.getRemoteSocketAddress().toString();
    		localAddress = s.getLocalSocketAddress().toString();
    	}
    
    	public void run() {
    		DataInputStream in = null;
    		in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE));
    		short version = in.readShort();	if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
            throw new IOException( "Version Mismatch" );
          }	//version
    		boolean local = s.getInetAddress().equals(s.getLocalAddress());	//本地请求
    		byte op = in.readByte();		//op
    		switch (op) {
    		//读数据块
    		case DataTransferProtocol.OP_READ_BLOCK:
    			readBlock(in);
    			datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);
    			if (local)
    				datanode.myMetrics.incrReadsFromLocalClient();
    			else
    				datanode.myMetrics.incrReadsFromRemoteClient();
    			break;
    		//写数据块
    		case DataTransferProtocol.OP_WRITE_BLOCK:
    			writeBlock(in);
    			datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime);
    			if (local)
    				datanode.myMetrics.incrWritesFromLocalClient();
    			else
    				datanode.myMetrics.incrWritesFromRemoteClient();
    			break;
    		//替换数据块
    		case DataTransferProtocol.OP_REPLACE_BLOCK: 
    			replaceBlock(in);
    			datanode.myMetrics
    					.addReplaceBlockOp(DataNode.now() - startTime);
    			break;
    		//拷贝数据块
    		case DataTransferProtocol.OP_COPY_BLOCK:
    			// for balancing purpose; send to a proxy source
    			copyBlock(in);
    			datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime);
    			break;
    		//读取数据块校验码
    		case DataTransferProtocol.OP_BLOCK_CHECKSUM: // get the checksum of
    														// a block
    			getBlockChecksum(in);
    			datanode.myMetrics.addBlockChecksumOp(DataNode.now()
    					- startTime);
    			break;
    		default:
    			throw new IOException("Unknown opcode " + op
    					+ " in data stream");
    		}
    		dataXceiverServer.childSockets.remove(s);
    		}
    	}
     
1
1
分享到:
评论

相关推荐

    hadoop最新版本3.1.1全量jar包

    hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar hadoop-yarn-client-3.1.1.jar hadoop-client-api-3.1.1.jar hadoop-hdfs-client-3.1.1.jar hadoop-mapreduce-client-jobclient...

    flink-shaded-hadoop-2-uber-2.6.5-10.0.zip

    1. **数据读写**:Flink通过Hadoop的InputFormat和OutputFormat接口,可以读取和写入Hadoop支持的各种数据源,如HDFS、HBase等。这使得Flink可以方便地访问Hadoop生态系统中的存储系统,进行大规模的数据处理。 2. ...

    Hadoop 2.X HDFS源码剖析-高清-完整目录-2016年3月

    Hadoop 2.X HDFS源码剖析-高清-完整目录-2016年3月,分享给所有需要的人!

    hadoop-common-2.6.0-bin-master

    标题中的“hadoop-common-2.6.0-bin-master”指的是Hadoop Common的2.6.0版本的源码编译后的二进制主目录,这个目录包含了运行Hadoop所需的各种基础工具和库。 在Windows 10环境下,由于操作系统本身的特性和Linux...

    hadoop-lzo-0.4.21-SNAPSHOT jars

    而Hadoop-LZO则是针对Hadoop优化的一种数据压缩库,旨在提高HDFS(Hadoop Distributed File System)上的数据压缩效率和读写性能。这个压缩库是由Groupon开发并维护的,它实现了LZO(Lempel-Ziv-Oberhumer)压缩算法...

    java操作Hadoop源码之HDFS Java API操作-上传文件

    在Java编程环境中,Hadoop分布式文件系统(HDFS)提供了丰富的Java API,使得开发者能够方便地与HDFS进行交互,包括文件的上传、下载、读写等操作。本篇文章将详细探讨如何使用HDFS Java API来实现文件上传的功能。 ...

    hadoop-core-0.20.2 源码 hadoop-2.5.1-src.tar.gz 源码 hadoop 源码

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce。这两个部分是Hadoop的核心基石,为大数据处理提供了基础架构。这里我们将深入探讨...

    hadoop-2.7.4-with-centos-6.7.tar.gz

    在压缩包内的“hadoop-2.7.4”文件夹中,通常包含了Hadoop的所有源码、编译后的可执行文件、配置文件以及文档等。用户可以通过解压来获取完整的Hadoop环境,然后按照官方文档或相关教程进行启动和使用。 总的来说,...

    hadoop-2.2.0-src.tar

    hadoop源码2.2.0 Apache Hadoop 2.2.0 is the GA release of Apache Hadoop 2.x. Users are encouraged to immediately move to 2.2.0 since this release is significantly more stable and is guaranteed to ...

    编译hadoophadoop-3.2.2-src源码

    编译hadoophadoop-3.2.2-src的源码

    hadoop-2.8.1源码

    《深入剖析Hadoop 2.8.1源码:分布式系统的智慧结晶》 Hadoop,作为开源的大数据处理框架,自2006年诞生以来,一直是大数据领域的重要支柱。其2.8.1版本是Hadoop发展的一个关键节点,为用户提供了更稳定、高效的...

    java操作Hadoop源码之HDFS Java API操作-创建目录

    在大数据处理领域,Hadoop是不可或缺的关键组件,其分布式文件系统(HDFS)为海量数据提供了可靠的存储解决方案。本文将详细讲解如何使用Java API来操作HDFS,特别是创建目录的功能。我们将探讨Hadoop的环境配置、...

    hadoop源码分析-HDFS&MapReduce

    **HDFS源码分析** 1. **NameNode与DataNode**:HDFS的核心组件包括NameNode和DataNode。NameNode作为元数据管理节点,存储文件系统的命名空间信息和文件的块映射信息。DataNode则是数据存储节点,负责存储实际的...

    hadoop-2.5.0-cdh5.3.6-src.tar.gz

    《Hadoop 2.5.0-cdh5.3.6 源码解析与应用探索》 Hadoop,作为大数据处理领域的核心组件,一直以来都备受关注。本篇将深入探讨Hadoop 2.5.0-cdh5.3.6版本的源码,解析其设计理念、架构以及主要功能,旨在帮助读者...

    零基础学习Hadoop3.0从入门到源码

    │ ├─视频-零基础学习Hadoop3.0-HDFS从入门到源码 │ │ │ 00--课程内容大纲和学习目标.mp4 │ │ │ 01--大数据课程导论--大数据概念.mp4 │ │ │ 02--大数据课程导论--大数据特点(5V特征).mp4 │...

    hadoop的winutils.exe及hadoop.dll文件

    我的报错:Could not locate Hadoop executable: E:\big_data\hadoop-3.3.0\bin\winutils.ex hadoop的winutils.exe及hadoop.dll文件,可以用于hadoop3.3. 下载好直接将两个文件复制到我们hadoop的bin目录下就行了

    hadoop 2.5.2 源码

    总结来说,Hadoop 2.5.2源码提供了一个深入学习和研究分布式计算的机会。通过分析源码,开发者不仅能理解Hadoop的内部工作机制,还能根据实际需求进行定制和优化,提升大数据处理的效率和可靠性。无论是对Hadoop感...

Global site tag (gtag.js) - Google Analytics