`
zhangjun5965
  • 浏览: 12316 次
社区版块
存档分类
最新评论

hadoop源码解析之hdfs写数据全流程分析---客户端处理

阅读更多

 

DFSOutputStream介绍

DFSOutputStream概况介绍

这一节我们介绍hdfs写数据过程中,客户端的处理部分。客户端的处理主要是用到了DFSOutputStream对象,从名字我们可以看出,这个是对dfs文件系统输出流的一个封装,接下来我们先来详细了解一下用到的几个重要的类和其中的变量。

DFSOutputStream的主要功能在类的注释中其实已经说的很清楚了,大家先看下,英文不好,翻译的可能不太好。

/****************************************************************
 * DFSOutputStream从字节流创建文件
 * DFSOutputStream creates files from a stream of bytes.
 *
 * 客户端写的数据DFSOutputStream临时缓存了起来。数据被分解了一个个的数据包(DFSPacket),
 * 每个DFSPacket一般是64K大小,一个DFSPacket又包含了若干个块(chunks),每个chunk一般是512k并且
 * 有一个对应的校验和。
 * The client application writes data that is cached internally by
 * this stream. Data is broken up into packets, each packet is
 * typically 64K in size. A packet comprises of chunks. Each chunk
 * is typically 512 bytes and has an associated checksum with it.
 *
 * 当一个客户端程序写的的数据填充慢了当前的数据包的时候(DFSPacket类型的变量currentPacket),
 * 就会被有顺序的放入dataQueue队列中。DataStreamer线程从dataQueue中获取数据包(packets),
 * 发送该数据包给数据管道(pipeline)中的第一个datanode, 然后把该数据包从dataQueue中移除,添加到ackQueue。
 * ResponseProcessor会从各个datanode中接收ack确认消息。
 * 当对于一个DFSPacket的成功的ack确认消息被所有的datanode接收到了,ResponseProcessor将其从ackQueue列表中移除  
 * When a client application fills up the currentPacket, it is
 * enqueued into dataQueue.  The DataStreamer thread picks up
 * packets from the dataQueue, sends it to the first datanode in
 * the pipeline and moves it from the dataQueue to the ackQueue.
 * The ResponseProcessor receives acks from the datanodes. When an
 * successful ack for a packet is received from all datanodes, the
 * ResponseProcessor removes the corresponding packet from the
 * ackQueue.
 *
 *
 * 在有错误发生的时候,所有的未完成的数据包从ackQueue队列移除,一个新的不包含损坏的datanode的管道将会被建立,
 * DataStreamer线程将重新开始从dataQueue获取数据包发送。
 * In case of error, all outstanding packets and moved from
 * ackQueue. A new pipeline is setup by eliminating the bad
 * datanode from the original pipeline. The DataStreamer now
 * starts sending packets from the dataQueue.
****************************************************************/
@InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer
    implements Syncable, CanSetDropBehind { }

DFSOutputStream重要的变量

最重要的两个队列,dataQueue和ackQueue,这两个队列都是典型的生产者、消费者模式,对于dataQueue来说,生产者是客户端,消费者是DataStreamer,对于ackQueue来说,生产者是DataStreamer,消费者是ResponseProcessor

/**
   * dataQueue和ackQueue是两个非常重要的变量,他们是存储了DFSPacket对象的链表。
   * dataQueue列表用于存储待发送的数据包,客户端写入的数据,先临时存到这个队列里。
   * ackQueue是回复队列,从datanode收到回复消息之后,存到这里队列里。
   * 
   */
  // both dataQueue and ackQueue are protected by dataQueue lock
  private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();
  private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();
  private DFSPacket currentPacket = null;//当前正在处理的数据包
  private DataStreamer streamer;
  private long currentSeqno = 0;
  private long lastQueuedSeqno = -1;
  private long lastAckedSeqno = -1;
  private long bytesCurBlock = 0; // bytes written in current block 当前的数据块有多少个字节
  private int packetSize = 0; // write packet size, not including the header.
  private int chunksPerPacket = 0;

数据处理线程类DataStreamer

DataStreamer是用于处理数据的核心类,我们看下注释中的解释

/**
   *  DataStreamer负责往管道中的datanodes发送数据包, 从namenode中获取块的位置信息和blockid,然后开始
   *  将数据包发送到datanode的管道。
   *  每个包都有一个序列号。
   *  当所有的数据包都发送完毕并且都接收到回复消息之后,DataStreamer关闭当前的block
   * The DataStreamer class is responsible for sending data packets to the
   * datanodes in the pipeline. It retrieves a new blockid and block locations
   * from the namenode, and starts streaming packets to the pipeline of
   * Datanodes. Every packet has a sequence number associated with
   * it. When all the packets for a block are sent out and acks for each
   * if them are received, the DataStreamer closes the current block.
   */
  class DataStreamer extends Daemon {
      
    private volatile boolean streamerClosed = false;
    private volatile ExtendedBlock block; // its length is number of bytes acked
    private Token<BlockTokenIdentifier> accessToken;
    private DataOutputStream blockStream;//发送数据的输出流
    private DataInputStream blockReplyStream;//输入流,即接收ack消息的流
    private ResponseProcessor response = null;
    private volatile DatanodeInfo[] nodes = null; // list of targets for current block 将要发送的datanode的集合
    private volatile StorageType[] storageTypes = null;
    private volatile String[] storageIDs = null;
      
    ......................  
      
  }

响应处理类ResponseProcessor

ResponseProcessor是DataStreamer的子类,用于处理接收到的ack数据

//处理从datanode返回的相应信息,当相应到达的时候,将DFSPacket从ackQueue移除
    // Processes responses from the datanodes.  A packet is removed
    // from the ackQueue when its response arrives.
    //
    private class ResponseProcessor extends Daemon {}

处理流程

客户端发数据到dataQueue

创建文件之后返回一个FSDataOutputStream对象,调用write方法写数据,最终调用了org.apache.hadoop.fs.FSOutputSummer.write(byte[], int, int);

write调用write1()方法循环写入len长度的数据,当写满一个数据块的时候,调用抽象方法writeChunk来写入数据,具体的实现则是org.apache.hadoop.hdfs.DFSOutputStream类中的同名方法,

具体的写入是在writeChunkImpl方法中,具体的代码如下:

private synchronized void writeChunkImpl(byte[] b, int offset, int len,
          byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();
    checkClosed();

    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }

    if (currentPacket == null) {
      currentPacket = createPacket(packetSize, chunksPerPacket, 
          bytesCurBlock, currentSeqno++, false);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.getSeqno() +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }

    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.incNumChunks();
    bytesCurBlock += len;

    // If packet is full, enqueue it for transmission
    //当一个DFSPacket写满了,则调用waitAndQueueCurrentPacket将其加入
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.getSeqno() +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }
      waitAndQueueCurrentPacket();

      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        resetChecksumBufSize();
      }

      if (!appendChunk) {
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);
      }
      //
      // if encountering a block boundary, send an empty packet to 
      // indicate the end of block and reset bytesCurBlock.
      //
      if (bytesCurBlock == blockSize) {
        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
        currentPacket.setSyncBlock(shouldSyncBlock);
        waitAndQueueCurrentPacket();
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
  }

当packet满了的时候,调用waitAndQueueCurrentPacket方法,将数据包放入dataQueue队列中,waitAndQueueCurrentPacket方法开始的时候会进行packet的大小的判断,当dataQueue和ackQueue的值大于writeMaxPackets(默认80)时候,就等地,直到有足够的空间.

private void waitAndQueueCurrentPacket() throws IOException {
    synchronized (dataQueue) {
      try {
      // If queue is full, then wait till we have enough space
        boolean firstWait = true;
        try {
         //当大小不够的时候就wait
          while (!isClosed() && dataQueue.size() + ackQueue.size() >
              dfsClient.getConf().writeMaxPackets) {
                    ..................
            try {
              dataQueue.wait();
            } catch (InterruptedException e) {
                ..............
            }
          }
        } finally {
         ...............
        }
        checkClosed();
        //入队列
        queueCurrentPacket();
      } catch (ClosedChannelException e) {
      }
    }
  }

最后调用了queueCurrentPacket方法,将packet真正的放入了队列中

private void queueCurrentPacket() {
    synchronized (dataQueue) {
      if (currentPacket == null) return;
      currentPacket.addTraceParent(Trace.currentSpan());
      dataQueue.addLast(currentPacket);//将数据包放到了队列的尾部
      lastQueuedSeqno = currentPacket.getSeqno();
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
      }
      currentPacket = null;//当前packet置空,用于下一个数据包的写入
      dataQueue.notifyAll();//唤醒所有在dataQueue上的线程去处理
    }
  }

最终通过方法queueCurrentPacket将DFSPacket写入dataQueue,即dataQueue.addLast(currentPacket);

并通过dataQueue.notifyAll();唤醒dataQueue上面等待的所有线程来处理数据

private void queueCurrentPacket() {
    synchronized (dataQueue) {
      if (currentPacket == null) return;
      currentPacket.addTraceParent(Trace.currentSpan());
      dataQueue.addLast(currentPacket);
      lastQueuedSeqno = currentPacket.getSeqno();
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
      }
      currentPacket = null;
      dataQueue.notifyAll();
    }
  }

DataStreamer处理dataQueue中的数据

DataStreamer处理发送数据的核心逻辑在run方法中。

处理错误

在开始的时候,首先判断是否有错误

具体的处理方法是private的processDatanodeError方法,如果发现了错误,就讲ack队列里的packet全部放回dataQueue中,然后创建一个新的流重新发送数据。

创建输出数据流,发送数据

通过nextBlockOutputStream()方法建立到datanode的输出流。

向namenode申请数据块

locateFollowingBlock方法申请数据块,具体的代码是
dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes);

dfsClient拿到namenode的代理,然后通过addBlock方法来申请新的数据块,addBlock方法申请数据块的时候还会提交上一个块,也就是参数中的block,即上一个数据块。
excludedNodes参数表示了申请数据块的时候需要排除的datanode列表,
favoredNodes参数表示了优先选择的datanode列表。

连接到第一个datanode

成功申请了数据块之后,会返回一个LocatedBlock对象,里面包含了datanode的相关信息。

然后通过createBlockOutputStream方法连接到第一个datanode,具体就是new了一个DataOutputStream对象来连接到datanode。 然后构造了一个Sender对象,来向DataNode发送操作码是80的写block的输出流, 发送到datanode的数据,datanode通过DataXceiver接收处理

new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
      dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
      nodes.length, block.getNumBytes(), bytesSent, newGS,
      checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
    (targetPinnings == null ? false :targetPinnings[0]), targetPinnings);

申请block,然后建立到datanode的连接,是在一个do while循环中做的,如果失败了会尝试重新连接,默认三次。

建立管道

nextBlockOutputStream方法成功的返回了datanode的信息之后,setPipeline方法建立到datanode的管道信息,这个方法比较简单,就是用申请到的datanode给相应的变量赋值。

private void setPipeline(LocatedBlock lb) {
      setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
    }
    private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
        String[] storageIDs) {
      this.nodes = nodes;
      this.storageTypes = storageTypes;
      this.storageIDs = storageIDs;
    }

初始化数据流

initDataStreaming方法主要就是根据datanode列表建立ResponseProcessor对象,并且调动start方法启动,并将状态设置为DATA_STREAMING

/**
     * Initialize for data streaming
     */
    private void initDataStreaming() {
      this.setName("DataStreamer for file " + src +
          " block " + block);
      response = new ResponseProcessor(nodes);
      response.start();
      stage = BlockConstructionStage.DATA_STREAMING;
    }

发送数据包

一切准备就绪之后,从dataQueue头部拿出一个packet,放入ackQueue的尾部,并且唤醒在dataQueue上等待的所有线程,通过 one.writeTo(blockStream);发送数据包。

// send the packet
          Span span = null;
          synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
              span = scope.detach();
              one.setTraceSpan(span);
              dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
            }
          }

          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DataStreamer block " + block +
                " sending packet " + one);
          }

          // write out data to remote datanode
          TraceScope writeScope = Trace.startSpan("writeTo", span);
          try {
            one.writeTo(blockStream);
            blockStream.flush();   
          } catch (IOException e) {
            // HDFS-3398 treat primary DN is down since client is unable to 
            // write to primary DN. If a failed or restarting node has already
            // been recorded by the responder, the following call will have no 
            // effect. Pipeline recovery can handle only one node error at a
            // time. If the primary node fails again during the recovery, it
            // will be taken out then.
            tryMarkPrimaryDatanodeFailed();
            throw e;
          } finally {
            writeScope.close();
          }

关闭数据流

当dataQueue中的所有数据块都发送完毕,并且确保都收到ack消息之后,客户端的写入操作就结束了,调用endBlock方法来关闭相应的流,

// Is this block full?
          if (one.isLastPacketInBlock()) {
            // wait for the close packet has been acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError && 
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                dataQueue.wait(1000);// wait for acks to arrive from datanodes
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }

            endBlock();
          }

关闭响应,关闭数据流,将管道置空,状态变成PIPELINE_SETUP_CREATE

private void endBlock() {
      if(DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Closing old block " + block);
      }
      this.setName("DataStreamer for file " + src);
      closeResponder();
      closeStream();
      setPipeline(null, null, null);
      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
    }

ResponseProcessor处理回复消息

这块逻辑相对比较简单

@Override
      public void run() {

        setName("ResponseProcessor for block " + block);
        PipelineAck ack = new PipelineAck();

        TraceScope scope = NullScope.INSTANCE;
        while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
          // process responses from datanodes.
          try {
            //从ack队列里读取packet
            // read an ack from the pipeline
            long begin = Time.monotonicNow();
            ack.readFields(blockReplyStream);
             ..............

                
            //一切都处理成功之后,将其从ack队列中删除
            synchronized (dataQueue) {
              scope = Trace.continueSpan(one.getTraceSpan());
              one.setTraceSpan(null);
              lastAckedSeqno = seqno;
              pipelineRecoveryCount = 0;
              ackQueue.removeFirst();
              dataQueue.notifyAll();

              one.releaseBuffer(byteArrayManager);
            }
          } catch (Exception e) {
          //如果遇到了异常,并没有立即处理,而是放到了一个AtomicReference类型的对象中,
            if (!responderClosed) {
              if (e instanceof IOException) {
                setLastException((IOException)e);
              }
                ............
            }
          } finally {
            scope.close();
          }
        }
      }
分享到:
评论

相关推荐

    Hadoop_2.X_HDFS源码剖析_带索引书签目录_徐鹏

    《Hadoop_2.X_HDFS源码剖析》是由徐鹏编著的一本深入解析Hadoop 2.x版本中HDFS(Hadoop Distributed File System)源码的专业书籍。这本书旨在帮助读者理解HDFS的核心机制,提升在分布式存储系统方面的专业技能。 ...

    Hadoop源码分析 完整版 共55章

    - **HDFS组成部分**:HDFS由NameNode、DataNode和Client组成,分别负责元数据管理、数据块存储以及客户端请求处理。 #### 五、深入分析MapReduce - **MapReduce章节**:MapReduce部分共14章,这部分将详细介绍...

    hadoop 源码解析-DataNode

    Hadoop 源码解析 - DataNode Hadoop 作为一个大数据处理框架,其核心组件之一是分布式文件系统(HDFS),而 DataNode 是 HDFS 中的重要组件之一。DataNode 负责存储和管理数据块,提供数据访问服务。本文将对 ...

    Hadoop学习总结和源码分析

    “Hadoop学习总结之三:Map-Reduce入门.doc”介绍了MapReduce编程模型,它是Hadoop处理数据的主要计算框架。Map阶段将输入数据分割成键值对,通过映射函数进行初步处理;Reduce阶段则将Map的输出聚合,通过化简函数...

    hadoop源码阅读总结

    ### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...

    HDFS源码解析

    通过阅读和理解Hadoop源码分析-HDFS部分.pdf文档,我们可以更深入地理解这些组件的工作原理,掌握HDFS在处理大数据时的内部机制。这对于我们优化HDFS的性能,解决实际问题,以及开发相关的分布式应用都具有重要的...

    hadoop NameNode 源码解析

    Hadoop 的 NameNode 是 Hadoop 分布式文件系统(HDFS)的核心组件之一,负责管理文件系统的 namespace 和数据块的存储位置。在本文中,我们将深入探讨 Hadoop NameNode 的源码,了解其启动过程、配置加载、RPC ...

    hadoop-src源代码

    三、HDFS源码解析 1. `hadoop-hdfs`:此模块是HDFS的实现,包括NameNode、DataNode和Client等组件。NameNode负责元数据管理,DataNode存储实际数据,Client则为应用程序提供接口。源码中可以深入了解HDFS的Block、 ...

    Hadoop源码的入门解析

    Hadoop是一个开源的分布式计算框架,源于Apache Lucene项目,主要负责大规模数据的分布式存储和处理。它由几个核心组件构成,包括...对于分布式计算和云计算领域的专业人士来说,掌握Hadoop源码解析是一项重要的技能。

    hadoop源码.zip

    本篇文章将围绕"Hadoop源码"这一主题,深度探讨HDFS的底层实现机制,帮助读者从源码层面理解其工作原理。 一、HDFS概述 HDFS是基于Google发表的GFS论文设计的分布式文件系统,旨在处理和存储大量数据。它采用了主从...

    java操作hadoop之mapreduce分析年气象数据最低温度实战源码

    通过这个实战项目,学习者不仅可以掌握Java操作Hadoop MapReduce的基本方法,还能深入了解大数据处理流程,以及如何从海量气象数据中提取有价值的信息。此外,对于提升数据处理能力和分布式计算的理解也大有裨益。...

    hadoop权威指南4和源码

    6. **Hadoop源码分析**:通过阅读Hadoop源码,可以深入了解其内部机制,例如NameNode如何管理文件系统元数据、DataNode如何进行数据块的读写、MapReduce的作业调度算法等。这对于优化Hadoop集群性能、开发自定义插件...

    hadoop-2.6.5-src

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心设计是解决大数据处理的问题。Hadoop 2.6.5是Hadoop发展...对于想要从事大数据处理和分析的工程师来说,深入理解Hadoop源码是提升技术水平的必经之路。

    Hadoop源码分析PDF(高清版)

    《Hadoop源码分析》是一本深度探讨Hadoop核心组件HDFS和MapReduce的书籍,提供了高清版的PDF格式供读者学习。这本书共分为55章,其中41章专门致力于HDFS(Hadoop分布式文件系统)的解析,剩余14章则详细剖析了...

    hadoop-2.5.0-cdh5.3.6-src.tar.gz

    《Hadoop 2.5.0-cdh5.3.6 源码解析与应用探索》 Hadoop,作为大数据处理领域的核心组件,一直以来都备受关注。本篇将深入探讨Hadoop 2.5.0-cdh5.3.6版本的源码,解析其设计理念、架构以及主要功能,旨在帮助读者...

    Hadoop云计算和云存储源码实现解析

    《Hadoop云计算和云存储源码实现解析》是针对大数据技术初学者及进阶者的一份宝贵资料,它深入探讨了Hadoop在云计算和云存储领域的应用与源码解析。Hadoop作为开源的大数据处理框架,是理解大数据处理机制的关键。本...

    hadoop-3.4.0-src.tar.gz

    《Hadoop 3.4.0源码解析与深度探讨》 Hadoop,作为大数据处理领域中的核心组件,是Apache基金会开源的一个分布式计算框架。它以其高效、可扩展的特性,成为了海量数据处理的首选工具。本文将围绕Hadoop 3.4.0的源码...

    hadoop 1.2.1核心源码

    8. **Hadoop源码分析**:通过对这些源码的学习,我们可以理解Hadoop内部的工作流程,如数据分块、副本策略、任务调度、错误恢复等。这对于开发和优化Hadoop应用程序、调试集群问题以及理解分布式系统原理非常有价值...

    hadoop源码分析

    下面,我们将深入探讨Hadoop的核心组件,包括HDFS(Hadoop Distributed File System)和MapReduce,并结合源码解析其运作机制。 1. HDFS:分布式文件系统 - 数据块(Block):HDFS将大文件切分为多个数据块进行...

    Hadoop_HDFS安装和管理.pdf

    - **Namenode**:作为HDFS的核心组件之一,Namenode负责管理文件系统的命名空间以及客户端的文件访问元数据。文档提到使用一台主机作为主Namenode (`ost2`),并通过热备方案配置了另一台从Namenode (`ost3`)。 - *...

Global site tag (gtag.js) - Google Analytics