一.DataXceiverServer类图
二.DataXceiverServer
- Server used for receiving/sending a block of data.This is created to listen for requests from clients or other DataNodes. This small server does not use the Hadoop IPC mechanism.DataXceiverServer用于接收和发送block数据,它监听着client或者其它DataNode的请求。DataXceiverServer没有采用RPC机制。DataXceiverServer是流式机制,而RPC是命令式接口。
- DataXceiverServer每接收到一个请求,就会创建一个DataXceiver来处理该请求。
class DataXceiverServer implements Runnable, FSConstants { public static final Log LOG = DataNode.LOG; ServerSocket ss; DataNode datanode; // Record all sockets opend for data transfer Map<Socket, Socket> childSockets = Collections.synchronizedMap(new HashMap<Socket, Socket>()); static final int MAX_XCEIVER_COUNT = 256; //默认值是256,每个node最多可以起多少个DataXceiver,如果太多的话可能会导致内存不足 BlockBalanceThrottler balanceThrottler; DataXceiverServer(ServerSocket ss, Configuration conf, DataNode datanode) { this.ss = ss; this.datanode = datanode; } public void run() { while (datanode.shouldRun) { Socket s = ss.accept(); //接受client请求 s.setTcpNoDelay(true); //实例化DataXceiver,需要Socket和DataNode new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this)).start(); } } }
三.DataXceiver类图
- DataXceiver处理Client或DataNode的五种请求(DataTransferProtocol接口定义)。
public static final byte OP_WRITE_BLOCK = (byte) 80; public static final byte OP_READ_BLOCK = (byte) 81; /** * @deprecated As of version 15, OP_READ_METADATA is no longer supported */ @Deprecated public static final byte OP_READ_METADATA = (byte) 82; public static final byte OP_REPLACE_BLOCK = (byte) 83; public static final byte OP_COPY_BLOCK = (byte) 84; public static final byte OP_BLOCK_CHECKSUM = (byte) 85;
class DataXceiver implements Runnable, FSConstants { Socket s; final String remoteAddress; // address of remote side final String localAddress; // local address of this daemon DataNode datanode; DataXceiverServer dataXceiverServer; public DataXceiver(Socket s, DataNode datanode, DataXceiverServer dataXceiverServer) { this.s = s; this.datanode = datanode; this.dataXceiverServer = dataXceiverServer; dataXceiverServer.childSockets.put(s, s); remoteAddress = s.getRemoteSocketAddress().toString(); localAddress = s.getLocalSocketAddress().toString(); } public void run() { DataInputStream in = null; in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE)); short version = in.readShort(); if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) { throw new IOException( "Version Mismatch" ); } //version boolean local = s.getInetAddress().equals(s.getLocalAddress()); //本地请求 byte op = in.readByte(); //op switch (op) { //读数据块 case DataTransferProtocol.OP_READ_BLOCK: readBlock(in); datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime); if (local) datanode.myMetrics.incrReadsFromLocalClient(); else datanode.myMetrics.incrReadsFromRemoteClient(); break; //写数据块 case DataTransferProtocol.OP_WRITE_BLOCK: writeBlock(in); datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime); if (local) datanode.myMetrics.incrWritesFromLocalClient(); else datanode.myMetrics.incrWritesFromRemoteClient(); break; //替换数据块 case DataTransferProtocol.OP_REPLACE_BLOCK: replaceBlock(in); datanode.myMetrics .addReplaceBlockOp(DataNode.now() - startTime); break; //拷贝数据块 case DataTransferProtocol.OP_COPY_BLOCK: // for balancing purpose; send to a proxy source copyBlock(in); datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime); break; //读取数据块校验码 case DataTransferProtocol.OP_BLOCK_CHECKSUM: // get the checksum of // a block getBlockChecksum(in); datanode.myMetrics.addBlockChecksumOp(DataNode.now() - startTime); break; default: throw new IOException("Unknown opcode " + op + " in data stream"); } dataXceiverServer.childSockets.remove(s); } }
相关推荐
hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar hadoop-yarn-client-3.1.1.jar hadoop-client-api-3.1.1.jar hadoop-hdfs-client-3.1.1.jar hadoop-mapreduce-client-jobclient...
1. **数据读写**:Flink通过Hadoop的InputFormat和OutputFormat接口,可以读取和写入Hadoop支持的各种数据源,如HDFS、HBase等。这使得Flink可以方便地访问Hadoop生态系统中的存储系统,进行大规模的数据处理。 2. ...
Hadoop 2.X HDFS源码剖析-高清-完整目录-2016年3月,分享给所有需要的人!
标题中的“hadoop-common-2.6.0-bin-master”指的是Hadoop Common的2.6.0版本的源码编译后的二进制主目录,这个目录包含了运行Hadoop所需的各种基础工具和库。 在Windows 10环境下,由于操作系统本身的特性和Linux...
而Hadoop-LZO则是针对Hadoop优化的一种数据压缩库,旨在提高HDFS(Hadoop Distributed File System)上的数据压缩效率和读写性能。这个压缩库是由Groupon开发并维护的,它实现了LZO(Lempel-Ziv-Oberhumer)压缩算法...
在Java编程环境中,Hadoop分布式文件系统(HDFS)提供了丰富的Java API,使得开发者能够方便地与HDFS进行交互,包括文件的上传、下载、读写等操作。本篇文章将详细探讨如何使用HDFS Java API来实现文件上传的功能。 ...
Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce。这两个部分是Hadoop的核心基石,为大数据处理提供了基础架构。这里我们将深入探讨...
在压缩包内的“hadoop-2.7.4”文件夹中,通常包含了Hadoop的所有源码、编译后的可执行文件、配置文件以及文档等。用户可以通过解压来获取完整的Hadoop环境,然后按照官方文档或相关教程进行启动和使用。 总的来说,...
hadoop源码2.2.0 Apache Hadoop 2.2.0 is the GA release of Apache Hadoop 2.x. Users are encouraged to immediately move to 2.2.0 since this release is significantly more stable and is guaranteed to ...
编译hadoophadoop-3.2.2-src的源码
《深入剖析Hadoop 2.8.1源码:分布式系统的智慧结晶》 Hadoop,作为开源的大数据处理框架,自2006年诞生以来,一直是大数据领域的重要支柱。其2.8.1版本是Hadoop发展的一个关键节点,为用户提供了更稳定、高效的...
在大数据处理领域,Hadoop是不可或缺的关键组件,其分布式文件系统(HDFS)为海量数据提供了可靠的存储解决方案。本文将详细讲解如何使用Java API来操作HDFS,特别是创建目录的功能。我们将探讨Hadoop的环境配置、...
**HDFS源码分析** 1. **NameNode与DataNode**:HDFS的核心组件包括NameNode和DataNode。NameNode作为元数据管理节点,存储文件系统的命名空间信息和文件的块映射信息。DataNode则是数据存储节点,负责存储实际的...
《Hadoop 2.5.0-cdh5.3.6 源码解析与应用探索》 Hadoop,作为大数据处理领域的核心组件,一直以来都备受关注。本篇将深入探讨Hadoop 2.5.0-cdh5.3.6版本的源码,解析其设计理念、架构以及主要功能,旨在帮助读者...
│ ├─视频-零基础学习Hadoop3.0-HDFS从入门到源码 │ │ │ 00--课程内容大纲和学习目标.mp4 │ │ │ 01--大数据课程导论--大数据概念.mp4 │ │ │ 02--大数据课程导论--大数据特点(5V特征).mp4 │...
我的报错:Could not locate Hadoop executable: E:\big_data\hadoop-3.3.0\bin\winutils.ex hadoop的winutils.exe及hadoop.dll文件,可以用于hadoop3.3. 下载好直接将两个文件复制到我们hadoop的bin目录下就行了
总结来说,Hadoop 2.5.2源码提供了一个深入学习和研究分布式计算的机会。通过分析源码,开发者不仅能理解Hadoop的内部工作机制,还能根据实际需求进行定制和优化,提升大数据处理的效率和可靠性。无论是对Hadoop感...