
Flume source code analysis - channel

 

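A Channel acts as a pipe: the source writes data into the channel and the sink takes data out of it. There are three kinds of channel: memory, file, and JDBC. The memory channel is the fastest, but the events it holds are lost if the agent or the machine crashes. The file channel persists events to disk, so they survive a crash. The JDBC channel is the slowest. In general, the file channel is the one to choose.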

 

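A Source calls ChannelProcessor's processEvent method to handle each log event. For every channel the event is routed to, it obtains a Transaction, calls tx.begin(), puts the event into the channel, and then calls tx.commit(). If an exception is thrown, it calls the transaction's rollback() method. In either case tx.close() is called in the finally block to close the transaction. Required and optional channels differ in how failures are handled: a failure on a required channel is rethrown (as a ChannelException, or as the original Error), while a failure on an optional channel is only logged unless it is an Error.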

 public void processEvent(Event event) {

    event = interceptorChain.intercept(event);
    if (event == null) {
      return;
    }

    // Process required channels
    List<Channel> requiredChannels = selector.getRequiredChannels(event);
    for (Channel reqChannel : requiredChannels) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        reqChannel.put(event);

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " +
              reqChannel, t);
          throw (Error) t;
        } else {
          throw new ChannelException("Unable to put event on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }

    // Process optional channels
    List<Channel> optionalChannels = selector.getOptionalChannels(event);
    for (Channel optChannel : optionalChannels) {
      Transaction tx = null;
      try {
        tx = optChannel.getTransaction();
        tx.begin();

        optChannel.put(event);

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put event on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }
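To make the difference between required and optional channels concrete, here is a small self-contained model of the same loop. `ToyChannel`, `ToyTransaction` and `ToyChannelProcessor` are hypothetical stand-ins, not Flume's `Channel`, `Transaction` and `ChannelProcessor` classes. Only the begin/put/commit, rollback-on-failure, close-in-finally pattern is taken from the code above. Unlike the quoted Flume code, the toy guards the optional-channel rollback with a null check, and it catches `RuntimeException` instead of `Throwable`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a channel transaction (not the real Flume API).
interface ToyTransaction {
  void begin();
  void commit();
  void rollback();
  void close();
}

// Hypothetical channel: events staged in a transaction become visible only on commit.
class ToyChannel {
  final List<String> committed = new ArrayList<>();
  private final List<String> staged = new ArrayList<>();
  private final boolean failOnPut;

  ToyChannel(boolean failOnPut) { this.failOnPut = failOnPut; }

  ToyTransaction getTransaction() {
    return new ToyTransaction() {
      public void begin() { staged.clear(); }
      public void commit() { committed.addAll(staged); staged.clear(); }
      public void rollback() { staged.clear(); }
      public void close() { }
    };
  }

  void put(String event) {
    if (failOnPut) throw new IllegalStateException("channel full");
    staged.add(event);
  }
}

class ToyChannelProcessor {
  // Mirrors processEvent: a failure on a required channel is rethrown,
  // a failure on an optional channel is only logged.
  static void processEvent(String event, List<ToyChannel> required, List<ToyChannel> optional) {
    for (ToyChannel ch : required) {
      ToyTransaction tx = ch.getTransaction();
      try {
        tx.begin();
        ch.put(event);
        tx.commit();
      } catch (RuntimeException e) {
        tx.rollback();
        throw new IllegalStateException("Unable to put event on required channel", e);
      } finally {
        tx.close();
      }
    }
    for (ToyChannel ch : optional) {
      ToyTransaction tx = null;
      try {
        tx = ch.getTransaction();
        tx.begin();
        ch.put(event);
        tx.commit();
      } catch (RuntimeException e) {
        if (tx != null) tx.rollback(); // getTransaction() itself may have failed
        System.err.println("Unable to put event on optional channel: " + e.getMessage());
      } finally {
        if (tx != null) tx.close();
      }
    }
  }
}
```

With a healthy required channel and a failing optional channel the event is still delivered; with a failing required channel the caller (the Source) sees an exception.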

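Let's first look at the corresponding methods in MemoryChannel. On the source side, MemoryChannel first places the Event in putList. When commit is called, the events are moved from putList into queue; on rollback, putList is simply cleared. On the sink side, take() moves an Event from queue into takeList. On commit, takeList is cleared; on rollback, the events in takeList are moved back to the head of queue.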

   

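 // createTransaction() creates a new MemoryTransaction
 @Override
  protected BasicTransactionSemantics createTransaction() {
    return new MemoryTransaction(transCapacity, channelCounter);
  }

  // Simplified excerpt of MemoryTransaction: fields, constructor, counters,
  // list-capacity checks and most capacity bookkeeping are elided,
  // and exception messages are shortened
  private class MemoryTransaction extends BasicTransactionSemantics {

    @Override
    protected void doPut(Event event) throws InterruptedException {
      // Number of byte-capacity slots this event occupies
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);

      if (bytesRemaining.tryAcquire(eventByteSize, keepAlive, TimeUnit.SECONDS)) {
        // Stage the event; sinks cannot see it until commit
        if (!putList.offer(event)) {
          throw new ChannelException("putList is full");
        }
        putByteCounter += eventByteSize;
      } else {
        throw new ChannelException("byte capacity of the channel is exhausted");
      }
    }

    @Override
    protected Event doTake() throws InterruptedException {
      // Wait up to keepAlive seconds for a committed event; give up if none
      if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
      }
      Event event;
      synchronized(queueLock) {
        event = queue.poll();
      }
      // Remember the event so that rollback can return it to the queue
      takeList.put(event);
      return event;
    }

    @Override
    protected void doCommit() throws InterruptedException {
      int puts = putList.size();
      synchronized(queueLock) {
        // Publish the staged events to the shared queue, in order
        while(!putList.isEmpty()) {
          queue.offer(putList.removeFirst());
        }
        putList.clear();
        // Taken events are now consumed for good
        takeList.clear();
      }
      // Tell takers that 'puts' new events are available
      queueStored.release(puts);
    }

    @Override
    protected void doRollback() {
      int takes = takeList.size();
      synchronized(queueLock) {
        // Put taken events back at the head of the queue, preserving their order
        while(!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast());
        }
        // Staged puts are discarded
        putList.clear();
      }
      // Return the byte capacity reserved by the discarded puts
      bytesRemaining.release(putByteCounter);
      putByteCounter = 0;
      // The returned events can be taken again
      queueStored.release(takes);
    }
  }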
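The putList/takeList/queue bookkeeping described above can be reproduced without Flume at all. The sketch below is a hypothetical, single-threaded `ToyMemoryChannel` with no capacity or byte accounting. It keeps only the three collections and the commit/rollback rules, so the effect of each call on `queue` is easy to check.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of MemoryChannel's transaction semantics (hypothetical class,
// single-threaded, no capacity or byte accounting).
class ToyMemoryChannel {
  final Deque<String> queue = new ArrayDeque<>();

  class Tx {
    private final Deque<String> putList = new ArrayDeque<>();
    private final Deque<String> takeList = new ArrayDeque<>();

    void put(String event) {
      putList.addLast(event);                    // staged, invisible to takers until commit
    }

    String take() {
      String event = queue.pollFirst();          // null when the queue is empty
      if (event != null) {
        takeList.addLast(event);                 // remembered so rollback can restore it
      }
      return event;
    }

    void commit() {
      while (!putList.isEmpty()) {
        queue.addLast(putList.removeFirst());    // publish puts in order
      }
      takeList.clear();                          // takes are now final
    }

    void rollback() {
      while (!takeList.isEmpty()) {
        queue.addFirst(takeList.removeLast());   // restore takes at the head, original order
      }
      putList.clear();                           // discard uncommitted puts
    }
  }

  Tx newTransaction() { return new Tx(); }
}
```

Because rollback pairs `removeLast()` with `addFirst()`, the returned events end up at the head of the queue in their original order, which is the same trick `doRollback` uses above.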

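FileChannel stores data in files on disk. Its createTransaction method creates a FileBackedTransaction object. On the source side, doPut calls log.put, which wraps the event and the transaction ID into a ByteBuffer, writes it to a data file, and returns the fileID and offset of the record (a FlumeEventPointer). doTake calls log.take, which wraps the transaction ID, fileID and offset into a ByteBuffer and writes it to the log file. rollback wraps the transaction ID into a ByteBuffer and writes it to the log file. commit wraps the transaction ID and the TransactionEventRecord.Type (put or take) into a ByteBuffer and writes it to the file. The log files live under dataDirs and are named with the prefix "log-". Note that putList, takeList and the queue hold FlumeEventPointers rather than the events themselves; doTake reads the event back with log.get(ptr).

In short, the file channel works like the memory channel, except that it keeps the events in files and records every operation in the log. http://www.tuicool.com/articles/B3UbYn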
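The central idea of log.put, appending a serialized record and returning a (fileID, offset) pointer instead of keeping the event in memory, can be illustrated with a small sketch. `ToyLogFile` and `ToyPointer` are hypothetical. The record layout (a type byte, a long transaction ID, an int length, then the body) is an assumption made for this example and is not Flume's actual on-disk format. An in-memory `ByteArrayOutputStream` stands in for the data file.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical pointer: which data file, and where in it, the record starts.
final class ToyPointer {
  final int fileId;
  final int offset;
  ToyPointer(int fileId, int offset) { this.fileId = fileId; this.offset = offset; }
}

// Hypothetical append-only data file, kept in memory for the example.
final class ToyLogFile {
  static final byte TYPE_PUT = 1; // assumed record-type tag
  private final int fileId;
  private final ByteArrayOutputStream data = new ByteArrayOutputStream();

  ToyLogFile(int fileId) { this.fileId = fileId; }

  // Encode [type][transactionId][bodyLength][body], append it, return its location.
  ToyPointer put(long transactionId, String body) {
    byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(1 + 8 + 4 + bytes.length);
    buf.put(TYPE_PUT).putLong(transactionId).putInt(bytes.length).put(bytes);
    int offset = data.size();                    // record starts at the current end of file
    data.write(buf.array(), 0, buf.position());
    return new ToyPointer(fileId, offset);
  }

  // Decode the body of the put record that starts at ptr.offset.
  String get(ToyPointer ptr) {
    ByteBuffer buf = ByteBuffer.wrap(data.toByteArray());
    buf.position(ptr.offset);
    buf.get();                                   // skip record type
    buf.getLong();                               // skip transaction id
    byte[] bytes = new byte[buf.getInt()];
    buf.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```

Only the small pointer needs to travel through putList and the queue; the body is read back from the file when a sink takes it.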

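@Override
  protected BasicTransactionSemantics createTransaction() {
    // New transaction with a fresh ID from TransactionIDOracle
    FileBackedTransaction trans = new FileBackedTransaction(log, TransactionIDOracle.next(),
        transactionCapacity, keepAlive, queueRemaining, getName(),
        channelCounter);
    // Remember the transaction for the current thread
    transactions.set(trans);
    return trans;
  }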

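  @Override
    protected void doPut(Event event) throws InterruptedException {
      try {
        // Append the event, tagged with this transaction's ID, to the data file;
        // the returned pointer records the fileID and offset
        FlumeEventPointer ptr = log.put(transactionID, event);
        // Only the pointer is staged in putList, not the event body
        Preconditions.checkState(putList.offer(ptr), "putList offer failed "
          + channelNameDescriptor);
      } catch (IOException e) {
        throw new ChannelException("Put failed due to IO error "
          + channelNameDescriptor, e);
      }
    }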

    @Override
    protected Event doTake() throws InterruptedException {
      /*
       * 1. Take an event which is in the queue.
       * 2. If getting that event does not throw NoopRecordException,
       *    then return it.
       * 3. Else try to retrieve the next event from the queue
       * 4. Repeat 2 and 3 until queue is empty or an event is returned.
       */

      while (true) {
        FlumeEventPointer ptr = queue.removeHead(transactionID);
        if (ptr == null) {
          return null;
        }
        try {
          // first add to takeList so that if write to disk
          // fails rollback actually does its work
          Preconditions.checkState(takeList.offer(ptr),
            "takeList offer failed "
              + channelNameDescriptor);
          log.take(transactionID, ptr); // write take to disk
          Event event = log.get(ptr);   // read the event body back from the data file
          return event;
        } catch (IOException e) {
          throw new ChannelException("Take failed due to IO error "
            + channelNameDescriptor, e);
        } catch (NoopRecordException e) {
          // The record was replaced by a no-op; drop it and try the next event
          takeList.remove(ptr);
        }
      }
    }
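    @Override
    protected void doCommit() throws InterruptedException {
      int puts = putList.size();
      int takes = takeList.size();
      try {
        if (puts > 0) {
          // Write a commit record for the puts, then make the
          // committed event pointers visible to sinks
          log.commitPut(transactionID);
          synchronized (queue) {
            while (!putList.isEmpty()) {
              queue.addTail(putList.removeFirst());
            }
          }
        } else if (takes > 0) {
          // Write a commit record for the takes; the taken pointers
          // are now gone from the queue for good
          log.commitTake(transactionID);
          queue.completeTransaction(transactionID);
        }
      } catch (IOException e) {
        throw new ChannelException("Commit failed due to IO error "
          + channelNameDescriptor, e);
      }
      putList.clear();
      takeList.clear();
    }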
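    @Override
    protected void doRollback() throws InterruptedException {
      int takes = takeList.size();
      try {
        if(takes > 0) {
          synchronized (queue) {
            // Put taken pointers back at the head of the queue, in order
            while (!takeList.isEmpty()) {
              Preconditions.checkState(queue.addHead(takeList.removeLast()),
                  "Queue add failed, this shouldn't be able to happen "
                      + channelNameDescriptor);
            }
          }
        }
        // Staged puts are discarded; their records stay in the log
        // but are never committed
        putList.clear();
        takeList.clear();
        queue.completeTransaction(transactionID);
        // Write a rollback record for this transaction
        log.rollback(transactionID);
      } catch (IOException e) {
        throw new ChannelException("Rollback failed due to IO error "
            + channelNameDescriptor, e);
      }
    }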
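Putting the pieces together, a FileBackedTransaction writes a log record for every operation while the queue only ever holds pointers. The hypothetical `ToyFileChannel` below models that with a `List<String>` as the write-ahead log and a counter as the pointer. The record strings are made up for readability and are not Flume's record format. As in doTake above, the pointer is added to takeList before the take record is written.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a file-backed transaction: every operation is appended to a
// write-ahead log (here a List<String>), while the queue holds only pointers.
// Class names and record strings are hypothetical.
class ToyFileChannel {
  final List<String> wal = new ArrayList<>();
  final Deque<Long> queue = new ArrayDeque<>();   // committed event pointers
  private long nextPointer = 0;

  Tx newTransaction(long id) { return new Tx(id); }

  class Tx {
    final long id;
    private final Deque<Long> putList = new ArrayDeque<>();
    private final Deque<Long> takeList = new ArrayDeque<>();

    Tx(long id) { this.id = id; }

    void put(String event) {
      long ptr = nextPointer++;                    // stands in for (fileID, offset)
      wal.add("PUT tx=" + id + " ptr=" + ptr + " body=" + event);
      putList.addLast(ptr);
    }

    Long take() {
      Long ptr = queue.pollFirst();
      if (ptr == null) return null;
      takeList.addLast(ptr);                       // staged before the take is logged
      wal.add("TAKE tx=" + id + " ptr=" + ptr);
      return ptr;
    }

    void commit() {
      if (!putList.isEmpty()) {
        wal.add("COMMIT tx=" + id + " type=PUT");
        while (!putList.isEmpty()) queue.addLast(putList.removeFirst());
      } else if (!takeList.isEmpty()) {
        wal.add("COMMIT tx=" + id + " type=TAKE");
      }
      putList.clear();
      takeList.clear();
    }

    void rollback() {
      while (!takeList.isEmpty()) queue.addFirst(takeList.removeLast());
      putList.clear();
      wal.add("ROLLBACK tx=" + id);
    }
  }
}
```

After a committed put followed by a rolled-back take, the log contains all four records, and the queue is back to holding the single committed pointer.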

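 Periodically (every checkpointInterval milliseconds, 30 seconds by default), the File Channel writes a checkpoint: the in-memory queue is saved to the checkpoint file under checkpointDir, and each log file records the position up to which it has been checkpointed.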


 

class FlumeEventQueue {
  synchronized boolean checkpoint(boolean force) {
    if (!elements.syncRequired() && !force) {
      LOG.debug("Checkpoint not required");
      return false;
    }

    // Start checkpoint: mark it incomplete so a crash midway can be detected
    elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);

    // Write the header: the position of the queue head and the queue size
    updateHeaders();

    // For every active log file, encode its file ID together with the number
    // of queued events stored in that file, and write them into the buffer
    List<Long> fileIdAndCountEncoded = new ArrayList<Long>();
    for (Integer fileId : fileIDCounts.keySet()) {
      Integer count = fileIDCounts.get(fileId).get();
      long value = encodeActiveLogCounter(fileId, count);
      fileIdAndCountEncoded.add(value);
    }

    int emptySlots = MAX_ACTIVE_LOGS - fileIdAndCountEncoded.size();
    for (int i = 0; i < emptySlots; i++)  {
      fileIdAndCountEncoded.add(0L);
    }
    for (int i = 0; i < MAX_ACTIVE_LOGS; i++) {
      elementsBuffer.put(i + INDEX_ACTIVE_LOG, fileIdAndCountEncoded.get(i));
    }
    // Flush the queue entries held in elements into elementsBuffer; each
    // entry encodes a fileID and the event's offset within that file
    elements.sync();

    // Finish checkpoint
    elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
    // Force the memory-mapped buffer's contents to the storage device
    mappedBuffer.force();

    return true;
  }
}
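Two ideas in checkpoint() are worth isolating: packing a file ID and a 32-bit number into a single long slot, and bracketing the write with an INCOMPLETE/COMPLETE marker so that a checkpoint torn by a crash can be recognized. The sketch below assumes, as the name encodeActiveLogCounter suggests, that the file ID goes in the high 32 bits and the count in the low 32 bits. The `ToyCheckpoint` class, the slot layout and the marker values are all hypothetical, and a plain `long[]` stands in for the memory-mapped elementsBuffer.

```java
// Toy checkpoint over a long[] (stands in for the memory-mapped elementsBuffer).
// Slot layout, marker values and names are hypothetical.
final class ToyCheckpoint {
  static final int INDEX_MARKER = 0;
  static final long INCOMPLETE = 1L;   // hypothetical marker values;
  static final long COMPLETE = 2L;     // 0 means "never written"
  final long[] slots;

  ToyCheckpoint(int size) { slots = new long[size]; }

  // Pack a 32-bit file id (high half) and a 32-bit count (low half) into one long.
  static long encode(int fileId, int count) {
    return ((long) fileId << 32) | (count & 0xFFFFFFFFL);
  }
  static int decodeFileId(long v) { return (int) (v >>> 32); }
  static int decodeCount(long v) { return (int) v; }

  // Write INCOMPLETE first and COMPLETE last, so a crash midway leaves INCOMPLETE.
  // 'crashAfter' simulates a crash after that many entries (-1 = no crash).
  void write(long[] entries, int crashAfter) {
    slots[INDEX_MARKER] = INCOMPLETE;
    for (int i = 0; i < entries.length; i++) {
      if (i == crashAfter) return;               // simulated crash
      slots[1 + i] = entries[i];
    }
    slots[INDEX_MARKER] = COMPLETE;
  }

  boolean isUsable() { return slots[INDEX_MARKER] == COMPLETE; }
}
```

In the real code, the final mappedBuffer.force() is what makes the COMPLETE marker durable; the toy only shows the ordering.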
    
class LogFile {
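   // Record the current write position and the checkpoint timestamp at
   // checkpointPositionMarker, then restore the write position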
   synchronized void markCheckpoint(long timestamp) throws IOException {
      long currentPosition = writeFileChannel.position();
      writeFileHandle.seek(checkpointPositionMarker);
      writeFileHandle.writeLong(currentPosition);
      writeFileHandle.writeLong(timestamp);
      writeFileChannel.position(currentPosition);
      LOG.info("Noted checkpoint for file: " + file + ", id: " + fileID
          + ", checkpoint position: " + currentPosition);
    }
}
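markCheckpoint can be reproduced with plain `java.io`/`java.nio`: remember the channel position, seek to a reserved header slot, write the position and timestamp there, and move back. This works because a `RandomAccessFile` and the `FileChannel` obtained from it share one file pointer. The `ToyLogWriter` class and the 16-byte header at offset 0 are assumptions for this example and do not reflect Flume's real log-file header layout.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

// Sketch of the markCheckpoint idea: a reserved header slot holds the write
// position reached at the last checkpoint plus its timestamp.
// The 16-byte header at offset 0 is an assumption for this example.
final class ToyLogWriter implements AutoCloseable {
  static final long CHECKPOINT_POSITION_MARKER = 0L; // hypothetical header offset
  private final RandomAccessFile handle;
  private final FileChannel channel;

  ToyLogWriter(File file) throws IOException {
    handle = new RandomAccessFile(file, "rw");
    channel = handle.getChannel();                 // shares the file pointer with handle
    handle.writeLong(0L);                          // reserve: checkpoint position
    handle.writeLong(0L);                          // reserve: checkpoint timestamp
  }

  void append(byte[] record) throws IOException {
    handle.write(record);
  }

  void markCheckpoint(long timestamp) throws IOException {
    long currentPosition = channel.position();
    handle.seek(CHECKPOINT_POSITION_MARKER);
    handle.writeLong(currentPosition);
    handle.writeLong(timestamp);
    channel.position(currentPosition);             // resume appending at the end
  }

  static long[] readCheckpoint(File file) throws IOException {
    try (RandomAccessFile in = new RandomAccessFile(file, "r")) {
      in.seek(CHECKPOINT_POSITION_MARKER);
      return new long[] {in.readLong(), in.readLong()};
    }
  }

  @Override
  public void close() throws IOException {
    handle.close();
  }
}
```

On replay, a reader can take the stored position as the point up to which this file's records are already reflected in the checkpoint.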
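    After a crash, Flume replays on startup to restore the channel to its pre-crash state: replay() loads the queue from the last checkpoint and then replays the log files on top of it.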
 void replay() throws IOException {
    Preconditions.checkState(!open, "Cannot replay after Log has been opened");

    checkpointWriterLock.lock();

    try {
      /*
       * First we are going to look through the data directories
       * and find all log files. We will store the highest file id
       * (at the end of the filename) we find and use that when we
       * create additional log files.
       *
       * Also store up the list of files so we can replay them later.
       */
      LOGGER.info("Replay started");
      nextFileID.set(0);
      List<File> dataFiles = Lists.newArrayList();
      for (File logDir : logDirs) {
        for (File file : LogUtils.getLogs(logDir)) {
          int id = LogUtils.getIDForFile(file);
          dataFiles.add(file);
          nextFileID.set(Math.max(nextFileID.get(), id));
          idLogFileMap.put(id, new LogFile.RandomReader(new File(logDir, PREFIX
              + id)));
        }
      }
      LOGGER.info("Found NextFileID " + nextFileID +
          ", from " + Arrays.toString(logDirs));

      /*
       * sort the data files by file id so we can replay them by file id
       * which should approximately give us sequential events
       */
      LogUtils.sort(dataFiles);

      /*
       * Read the checkpoint (in memory queue) from one of two alternating
       * locations. We will read the last one written to disk.
       */
      queue = new FlumeEventQueue(queueCapacity,
                        new File(checkpointDir, "checkpoint"), channelName);

      long ts = queue.getTimestamp();
      LOGGER.info("Last Checkpoint " + new Date(ts) +
          ", queue depth = " + queue.getSize());

      /*
       * We now have everything we need to actually replay the log files
       * the queue, the timestamp the queue was written to disk, and
       * the list of data files.
       */
      ReplayHandler replayHandler = new ReplayHandler(queue);
      replayHandler.replayLog(dataFiles);

      for (int index = 0; index < logDirs.length; index++) {
        LOGGER.info("Rolling " + logDirs[index]);
        roll(index);
      }

      /*
       * Now that we have replayed, write the current queue to disk
       */
      writeCheckpoint(true);

      open = true;
    } catch (Exception ex) {
      LOGGER.error("Failed to initialize Log on " + channelNameDescriptor, ex);
      if (ex instanceof IOException) {
        throw (IOException) ex;
      }
      Throwables.propagate(ex);
    } finally {
      checkpointWriterLock.unlock();
    }
  }
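The essence of ReplayHandler.replayLog can be sketched as follows. Start from the checkpointed queue, scan the log records in order, buffer each transaction's puts and takes, and apply them only when a commit for that transaction appears. Transactions that rolled back or never committed are dropped. `ToyReplay`, its record type and the rule that only committed transactions are applied are a simplified model under these assumptions, not Flume's actual replay code (which, for example, also uses the checkpoint position to skip records).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy replay with hypothetical types. The records are assumed to be those
// written after the checkpoint, in file-id order.
final class ToyReplay {
  enum Kind { PUT, TAKE, COMMIT, ROLLBACK }

  record Rec(Kind kind, long txId, long ptr) { }

  static Deque<Long> replay(List<Long> checkpointQueue, List<Rec> log) {
    Deque<Long> queue = new ArrayDeque<>(checkpointQueue);
    Map<Long, List<Rec>> pending = new HashMap<>();   // uncommitted ops per transaction
    for (Rec r : log) {
      switch (r.kind()) {
        case PUT, TAKE -> pending.computeIfAbsent(r.txId(), k -> new ArrayList<>()).add(r);
        case ROLLBACK -> pending.remove(r.txId());    // discard the whole transaction
        case COMMIT -> {
          for (Rec op : pending.getOrDefault(r.txId(), List.of())) {
            if (op.kind() == Kind.PUT) {
              queue.addLast(op.ptr());                // committed put: event is queued
            } else {
              queue.remove(op.ptr());                 // committed take: event is gone
            }
          }
          pending.remove(r.txId());
        }
      }
    }
    return queue;   // transactions left in 'pending' never committed and are ignored
  }
}
```

For example, starting from a checkpointed queue of pointers 5 and 6, a committed put of 10, a committed take of 5, a rolled-back put of 11 and an uncommitted put of 12 replay to the queue [6, 10].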

 
