
Flume Source Code Analysis: Sink

 

A Sink takes events from its Channel and delivers them to the destination, which may be an external system such as HDFS or HBase, or the next Flume agent in a tiered topology. Each call to process() runs inside a channel Transaction: events are taken from the channel, written out, and the transaction is committed on success or rolled back on failure; the returned Status (READY or BACKOFF) tells the SinkRunner whether to call process() again immediately or to back off. The sinks examined below all follow this pattern.
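Before looking at the concrete sinks, here is a minimal sketch of that contract. It is an illustrative example, not a class in Flume: MyDiscardingSink and its behavior are made up, but the AbstractSink/Transaction APIs it uses are the ones the real sinks below build on.

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

// Minimal sketch of a custom sink: one event per transaction, delivery is a no-op.
public class MyDiscardingSink extends AbstractSink implements Configurable {

  @Override
  public void configure(Context context) {
    // sink-specific properties from the agent configuration would be read here
  }

  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    try {
      txn.begin();
      Event event = channel.take();
      if (event == null) {
        // empty channel: commit the empty transaction and ask the runner to back off
        txn.commit();
        return Status.BACKOFF;
      }
      // a real sink would write event.getBody() to its destination here
      txn.commit();
      return Status.READY;
    } catch (Throwable t) {
      txn.rollback();
      if (t instanceof Error) {
        throw (Error) t;
      }
      throw new EventDeliveryException("Failed to deliver event", t);
    } finally {
      txn.close();
    }
  }
}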

 

/**
*
* A simple sink which reads events from a channel and writes them to HBase.
* This Sink uses an asynchronous API internally and is likely to
* perform better.
* The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt>
* encountered in the classpath. This sink supports batch reading of
* events from the channel, and writing them to Hbase, to minimize the number
* of flushes on the hbase tables. To use this sink, it has to be configured
* with certain mandatory parameters:<p>*/
public class AsyncHBaseSink extends AbstractSink implements Configurable {
 @Override
  public Status process() throws EventDeliveryException {
    /*
     * Reference to the boolean representing failure of the current transaction.
     * Since each txn gets a new boolean, failure of one txn will not affect
     * the next even if errbacks for the current txn get called while
     * the next one is being processed.
     *
     */
    if(!open){
      throw new EventDeliveryException("Sink was never opened. " +
          "Please fix the configuration.");
    }
    AtomicBoolean txnFail = new AtomicBoolean(false);
    AtomicInteger callbacksReceived = new AtomicInteger(0);
    AtomicInteger callbacksExpected = new AtomicInteger(0);
    final Lock lock = new ReentrantLock();
    final Condition condition = lock.newCondition();
    /*
     * Callbacks can be reused per transaction, since they share the same
     * locks and conditions.
     */
    Callback<Object, Object> putSuccessCallback =
            new SuccessCallback<Object, Object>(
            lock, callbacksReceived, condition);
    Callback<Object, Object> putFailureCallback =
            new FailureCallback<Object, Object>(
            lock, callbacksReceived, txnFail, condition);

    Callback<Long, Long> incrementSuccessCallback =
            new SuccessCallback<Long, Long>(
            lock, callbacksReceived, condition);
    Callback<Long, Long> incrementFailureCallback =
            new FailureCallback<Long, Long>(
            lock, callbacksReceived, txnFail, condition);

    Status status = Status.READY;
    Channel channel = getChannel();
    int i = 0;
    try {
      txn = channel.getTransaction();
      txn.begin();
      for (; i < batchSize; i++) {
        Event event = channel.take();
        if (event == null) {
          status = Status.BACKOFF;
          if (i == 0) {
            sinkCounter.incrementBatchEmptyCount();
          } else {
            sinkCounter.incrementBatchUnderflowCount();
          }
          break;
        } else {
          serializer.setEvent(event);
          List<PutRequest> actions = serializer.getActions();
          List<AtomicIncrementRequest> increments = serializer.getIncrements();
          callbacksExpected.addAndGet(actions.size() + increments.size());

          for (PutRequest action : actions) {
            client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
          }
          for (AtomicIncrementRequest increment : increments) {
            client.atomicIncrement(increment).addCallbacks(
                    incrementSuccessCallback, incrementFailureCallback);
          }
        }
      }
    } catch (Throwable e) {
      this.handleTransactionFailure(txn);
      this.checkIfChannelExceptionAndThrow(e);
    }
    if (i == batchSize) {
      sinkCounter.incrementBatchCompleteCount();
    }
    sinkCounter.addToEventDrainAttemptCount(i);

    lock.lock();
    try {
      // Wait until every expected HBase callback has arrived, the transaction is flagged as failed, or the wait times out
      while ((callbacksReceived.get() < callbacksExpected.get())
              && !txnFail.get()) {
        try {
          if(!condition.await(timeout, TimeUnit.MILLISECONDS)){
            txnFail.set(true);
            logger.warn("HBase callbacks timed out. "
                    + "Transaction will be rolled back.");
          }
        } catch (Exception ex) {
          logger.error("Exception while waiting for callbacks from HBase.");
          this.handleTransactionFailure(txn);
          Throwables.propagate(ex);
        }
      }
    } finally {
      lock.unlock();
    }

    /*
     * At this point, either the txn has failed
     * or all callbacks received and txn is successful.
     *
     * This need not be in the monitor, since all callbacks for this txn
     * have been received. So txnFail will not be modified any more(even if
     * it is, it is set from true to true only - false happens only
     * in the next process call).
     *
     */
    if (txnFail.get()) {
      this.handleTransactionFailure(txn);
      throw new EventDeliveryException("Could not write events to Hbase. " +
          "Transaction failed, and rolled back.");
    } else {
      try{
        txn.commit();
        txn.close();
        sinkCounter.addToEventDrainSuccessCount(i);
      } catch (Throwable e) {
        this.handleTransactionFailure(txn);
        this.checkIfChannelExceptionAndThrow(e);
      }
    }

    return status;
  }
}
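The heart of AsyncHBaseSink is the bookkeeping around callbacksExpected, callbacksReceived and the Condition: every put/increment handed to the asynchbase client is expected to answer exactly once, and process() blocks until all answers are in, a failure is flagged, or the wait times out. The SuccessCallback/FailureCallback classes are nested inside AsyncHBaseSink and not shown in the excerpt above; the sketch below is an illustrative reconstruction of that idiom, not the exact Flume code.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import com.stumbleupon.async.Callback;

// Success path: count the answer and wake the process() thread so it can
// re-check callbacksReceived against callbacksExpected.
class CountingSuccessCallback<R, T> implements Callback<R, T> {
  private final Lock lock;
  private final AtomicInteger callbacksReceived;
  private final Condition condition;

  CountingSuccessCallback(Lock lock, AtomicInteger received, Condition condition) {
    this.lock = lock;
    this.callbacksReceived = received;
    this.condition = condition;
  }

  @Override
  public R call(T arg) throws Exception {
    lock.lock();
    try {
      callbacksReceived.incrementAndGet();
      condition.signal();
    } finally {
      lock.unlock();
    }
    return null;
  }
}

// Failure path: additionally flip the per-transaction failure flag, which by
// itself is enough to break the wait loop in process() and force a rollback.
class CountingFailureCallback<R, T> implements Callback<R, T> {
  private final Lock lock;
  private final AtomicInteger callbacksReceived;
  private final AtomicBoolean txnFail;
  private final Condition condition;

  CountingFailureCallback(Lock lock, AtomicInteger received,
      AtomicBoolean txnFail, Condition condition) {
    this.lock = lock;
    this.callbacksReceived = received;
    this.txnFail = txnFail;
    this.condition = condition;
  }

  @Override
  public R call(T arg) throws Exception {
    lock.lock();
    try {
      txnFail.set(true);
      callbacksReceived.incrementAndGet();
      condition.signal();
    } finally {
      lock.unlock();
    }
    return null;
  }
}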

 

   

/**
 * <p>
 * A {@link Sink} implementation that can send events to an RPC server (such as
 * Flume's {@link AvroSource}).
 * </p>
 * <p>
 * This sink forms one half of Flume's tiered collection support. Events sent to
 * this sink are transported over the network to the hostname / port pair using
 * the RPC implementation encapsulated in {@link RpcClient}.
 * The destination is an instance of Flume's {@link AvroSource}, which
 * allows Flume agents to forward to other Flume agents, forming a tiered
 * collection infrastructure. Of course, nothing prevents one from using this
 * sink to speak to other custom built infrastructure that implements the same
 * RPC protocol.
 * </p>
 * <p>
 * Events are taken from the configured {@link Channel} in batches of the
 * configured <tt>batch-size</tt>. The batch size has no theoretical limits
 * although all events in the batch <b>must</b> fit in memory. Generally, larger
 * batches are far more efficient, but introduce a slight delay (measured in
 * millis) in delivery. The batch behavior is such that underruns (i.e. batches
 * smaller than the configured batch size) are possible. This is a compromise
 * made to maintain low latency of event delivery. If the channel returns a null
 * event, meaning it is empty, the batch is immediately sent, regardless of
 * size. Batch underruns are tracked in the metrics. Empty batches do not incur
 * an RPC roundtrip.*/
public class AvroSink extends AbstractSink implements Configurable {
 @Override
  public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();

    try {
      transaction.begin();

      verifyConnection();

      List<Event> batch = Lists.newLinkedList();

      for (int i = 0; i < client.getBatchSize(); i++) {
        Event event = channel.take();

        if (event == null) {
          break;
        }

        batch.add(event);
      }

      int size = batch.size();
      int batchSize = client.getBatchSize();

      if (size == 0) {
        sinkCounter.incrementBatchEmptyCount();
        status = Status.BACKOFF;
      } else {
        if (size < batchSize) {
          sinkCounter.incrementBatchUnderflowCount();
        } else {
          sinkCounter.incrementBatchCompleteCount();
        }
        sinkCounter.addToEventDrainAttemptCount(size);
        client.appendBatch(batch);
      }

      transaction.commit();
      sinkCounter.addToEventDrainSuccessCount(size);

    } catch (Throwable t) {
      transaction.rollback();
      if (t instanceof Error) {
        throw (Error) t;
      } else if (t instanceof ChannelException) {
        logger.error("Avro Sink " + getName() + ": Unable to get event from" +
            " channel " + channel.getName() + ". Exception follows.", t);
        status = Status.BACKOFF;
      } else {
        destroyConnection();
        throw new EventDeliveryException("Failed to send events", t);
      }
    } finally {
      transaction.close();
    }

    return status;
  }
}
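AvroSink hands the actual network work to an RpcClient. The same client API from the Flume SDK can be used by an application to push events into a remote AvroSource directly; a minimal sketch follows, where the hostname and port are placeholders for the remote agent's AvroSource.

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

// Sends a single event to a remote AvroSource using the same RpcClient
// abstraction that AvroSink uses internally.
public class AvroRpcClientExample {
  public static void main(String[] args) throws EventDeliveryException {
    RpcClient client = RpcClientFactory.getDefaultInstance("collector-host", 41414);
    try {
      Event event = EventBuilder.withBody("hello flume", Charset.forName("UTF-8"));
      client.append(event); // appendBatch(List<Event>) batches like AvroSink does
    } finally {
      client.close();
    }
  }
}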

 

/**
 *
 * A simple sink which reads events from a channel and writes them to HBase.
 * The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt>
 * encountered in the classpath. This sink supports batch reading of
 * events from the channel, and writing them to Hbase, to minimize the number
 * of flushes on the hbase tables. To use this sink, it has to be configured
 * with certain mandatory parameters:<p>
 * <tt>table: </tt> The name of the table in Hbase to write to. <p>
 * <tt>columnFamily: </tt> The column family in Hbase to write to.<p>
 * This sink will commit each transaction if the table's write buffer size is
 * reached or if the number of events in the current transaction reaches the
 * batch size, whichever comes first.<p>
 * Other optional parameters are:<p>
 * <tt>serializer:</tt> A class implementing {@link HBaseEventSerializer}.
 *  An instance of
 * this class will be used to write out events to hbase.<p>
 * <tt>serializer.*:</tt> Passed in the configure() method to serializer
 * as an object of {@link org.apache.flume.Context}.<p>
 * <tt>batchSize: </tt>This is the batch size used by the client. This is the
 * maximum number of events the sink will commit per transaction. The default
 * batch size is 100 events.
 * <p>*/
public class HBaseSink extends AbstractSink implements Configurable {
 @Override
  public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    List<Row> actions = new LinkedList<Row>();
    List<Increment> incs = new LinkedList<Increment>();
    txn.begin();
    for(long i = 0; i < batchSize; i++) {
      Event event = channel.take();
      if(event == null){
        status = Status.BACKOFF;
        counterGroup.incrementAndGet("channel.underflow");
        break;
      } else {
        serializer.initialize(event, columnFamily);
        actions.addAll(serializer.getActions());
        incs.addAll(serializer.getIncrements());
      }
    }
    putEventsAndCommit(actions, incs, txn);
    return status;
  }

  private void putEventsAndCommit(List<Row> actions, List<Increment> incs,
      Transaction txn) throws EventDeliveryException {
    try {
      table.batch(actions);
      for(Increment i: incs){
        table.increment(i);
      }
      txn.commit();
      counterGroup.incrementAndGet("transaction.success");
    } catch (Throwable e) {
      try{
        txn.rollback();
      } catch (Exception e2) {
        logger.error("Exception in rollback. Rollback might not have been" +
            "successful." , e2);
      }
      counterGroup.incrementAndGet("transaction.rollback");
      logger.error("Failed to commit transaction." +
          "Transaction rolled back.", e);
      if(e instanceof Error || e instanceof RuntimeException){
        logger.error("Failed to commit transaction." +
            "Transaction rolled back.", e);
        Throwables.propagate(e);
      } else {
        logger.error("Failed to commit transaction." +
            "Transaction rolled back.", e);
        throw new EventDeliveryException("Failed to commit transaction." +
            "Transaction rolled back.", e);
      }
    } finally {
      txn.close();
    }
  }
}
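Both HBase sinks delegate the mapping from a Flume event to HBase mutations to a serializer (the serializer and serializer.* parameters mentioned in the Javadoc above). The sketch below mirrors the calls HBaseSink makes on its serializer (configure, initialize, getActions, getIncrements, close) rather than spelling out the full HbaseEventSerializer interface; the class name, row keys and column names are made up for illustration, and it is written against the old byte[]-based HBase client API used by this Flume version.

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

// Hypothetical serializer sketch: one Put that stores the event body in a
// "payload" column, plus one Increment that counts events in a counter row.
public class BodyColumnSerializerSketch {
  private byte[] columnFamily;
  private Event currentEvent;

  public void configure(Context context) {
    // serializer.* properties from the sink configuration arrive here
  }

  public void initialize(Event event, byte[] columnFamily) {
    this.currentEvent = event;
    this.columnFamily = columnFamily;
  }

  public List<Row> getActions() {
    List<Row> actions = new ArrayList<Row>();
    byte[] rowKey = String.valueOf(System.currentTimeMillis()).getBytes();
    Put put = new Put(rowKey);
    put.add(columnFamily, "payload".getBytes(), currentEvent.getBody());
    actions.add(put);
    return actions;
  }

  public List<Increment> getIncrements() {
    List<Increment> incs = new ArrayList<Increment>();
    Increment inc = new Increment("eventCounter".getBytes());
    inc.addColumn(columnFamily, "count".getBytes(), 1L);
    incs.add(inc);
    return incs;
  }

  public void close() {
    // nothing to release in this sketch
  }
}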

 

public class HDFSEventSink extends AbstractSink implements Configurable {
 /**
   * Pull events out of the channel and send them to HDFS - take at most
   * txnEventMax events (the maximum number of events to hold in the channel for
   * a given transaction) - find the corresponding bucket for each event and
   * ensure its file is open - extract the payload and append it to the HDFS file <br />
   * WARNING: NOT THREAD SAFE
   */
  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    List<BucketWriter> writers = Lists.newArrayList();
    transaction.begin();
    try {
      Event event = null;
      int txnEventCount = 0;
      for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
        event = channel.take();
        if (event == null) {
          break;
        }

        // reconstruct the path name by substituting place holders
        String realPath = BucketPath.escapeString(path, event.getHeaders(),
            needRounding, roundUnit, roundValue);
        BucketWriter bucketWriter = sfWriters.get(realPath);

        // we haven't seen this file yet, so open it and cache the handle
        if (bucketWriter == null) {
          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
          FlumeFormatter formatter = HDFSFormatterFactory
              .getFormatter(writeFormat);

          bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
              batchSize, context, realPath, codeC, compType, hdfsWriter,
              formatter, timedRollerPool, proxyTicket, sinkCounter);

          sfWriters.put(realPath, bucketWriter);
        }

        // track the buckets getting written in this transaction
        if (!writers.contains(bucketWriter)) {
          writers.add(bucketWriter);
        }

        // Write the data to HDFS
        append(bucketWriter, event);
      }

      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == txnEventMax) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }

      // flush all pending buckets before committing the transaction
      for (BucketWriter bucketWriter : writers) {
        if (!bucketWriter.isBatchComplete()) {
          flush(bucketWriter);
        }
      }

      transaction.commit();

      if (txnEventCount > 0) {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
      }

      if(event == null) {
        return Status.BACKOFF;
      }
      return Status.READY;
    } catch (IOException eIO) {
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      transaction.close();
    }
  }
}
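The bucketing in HDFSEventSink hinges on BucketPath.escapeString(), which expands date escapes such as %Y/%m/%d (derived from the event's timestamp header) and %{header} placeholders into a concrete path, so each event lands in the right file handle in sfWriters. A small illustrative sketch of that substitution follows; the path and header values are made up.

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.formatter.output.BucketPath;

// Shows the placeholder substitution HDFSEventSink relies on when mapping an
// event to its bucket path (rounding disabled, as when hdfs.round is false).
public class BucketPathExample {
  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
    headers.put("host", "web-01");

    String path = "/flume/events/%Y/%m/%d/%{host}";
    String realPath = BucketPath.escapeString(path, headers, false, 0, 0);

    System.out.println(realPath); // e.g. /flume/events/2012/09/30/web-01
  }
}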

    

// IRC (Internet Relay Chat) is a text-based chat protocol; this sink forwards each event as a message line to an IRC channel.
public class IRCSink extends AbstractSink implements Configurable {
 @Override
    public Status process() throws EventDeliveryException {
        Status status = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();

        try {
            transaction.begin();
            createConnection();

            Event event = channel.take();

            if (event == null) {
                counterGroup.incrementAndGet("event.empty");
                status = Status.BACKOFF;
            } else {
                sendLine(event);
                counterGroup.incrementAndGet("event.irc");
            }

            transaction.commit();

        } catch (ChannelException e) {
            transaction.rollback();
            logger.error(
                    "Unable to get event from channel. Exception follows.", e);
            status = Status.BACKOFF;
        } catch (Exception e) {
            transaction.rollback();
            logger.error(
                    "Unable to communicate with IRC server. Exception follows.",
                    e);
            status = Status.BACKOFF;
            destroyConnection();
        } finally {
            transaction.close();
        }

        return status;
    }
}

   

    

public class LoggerSink extends AbstractSink {

  private static final Logger logger = LoggerFactory
      .getLogger(LoggerSink.class);

  @Override
  public Status process() throws EventDeliveryException {
    Status result = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;

    try {
      transaction.begin();
      event = channel.take();

      if (event != null) {
        if (logger.isInfoEnabled()) {
          logger.info("Event: " + EventHelper.dumpEvent(event));
        }
      } else {
        // No event found, request back-off semantics from the sink runner
        result = Status.BACKOFF;
      }
      transaction.commit();
    } catch (Exception ex) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to log event: " + event, ex);
    } finally {
      transaction.close();
    }

    return result;
  }
}

RollingFileSink rolls over to a new output file every sink.rollInterval seconds, so events taken from the channel are spread across a sequence of files in the configured output directory.

public class RollingFileSink extends AbstractSink implements Configurable {
 @Override
  public Status process() throws EventDeliveryException {
    if (shouldRotate) {
      logger.debug("Time to rotate {}", pathController.getCurrentFile());

      if (outputStream != null) {
        logger.debug("Closing file {}", pathController.getCurrentFile());

        try {
          serializer.flush();
          serializer.beforeClose();
          outputStream.flush();
          outputStream.close();
          shouldRotate = false;
        } catch (IOException e) {
          throw new EventDeliveryException("Unable to rotate file "
              + pathController.getCurrentFile() + " while delivering event", e);
        }

        serializer = null;
        outputStream = null;
        pathController.rotate();
      }
    }

    if (outputStream == null) {
      File currentFile = pathController.getCurrentFile();
      logger.debug("Opening output stream for file {}", currentFile);
      try {
        outputStream = new BufferedOutputStream(
            new FileOutputStream(currentFile));
        serializer = EventSerializerFactory.getInstance(
            serializerType, serializerContext, outputStream);
        serializer.afterCreate();
      } catch (IOException e) {
        throw new EventDeliveryException("Failed to open file "
            + pathController.getCurrentFile() + " while delivering event", e);
      }
    }

    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;
    Status result = Status.READY;

    try {
      transaction.begin();
      event = channel.take();

      if (event != null) {
        serializer.write(event);

        /*
         * FIXME: Feature: Rotate on size and time by checking bytes written and
         * setting shouldRotate = true if we're past a threshold.
         */

        /*
         * FIXME: Feature: Control flush interval based on time or number of
         * events. For now, we're super-conservative and flush on each write.
         */
        serializer.flush();
        outputStream.flush();
      } else {
        // No events found, request back-off semantics from runner
        result = Status.BACKOFF;
      }
      transaction.commit();
    } catch (Exception ex) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to process event: " + event, ex);
    } finally {
      transaction.close();
    }

    return result;
  }
}
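The shouldRotate flag tested at the top of process() is not set here: when RollingFileSink starts, it schedules a task at the configured rollInterval whose only job is to flip that flag, and the next process() call performs the actual close-and-rotate. Below is a simplified sketch of that scheduling idiom, not the exact Flume code.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// The scheduled task only marks the stream for rotation; the sink thread that
// owns the output stream does the real work inside process().
public class RotationScheduleSketch {
  private volatile boolean shouldRotate = false;
  private final long rollIntervalSeconds = 30L; // mirrors sink.rollInterval
  private ScheduledExecutorService rollService;

  public void start() {
    rollService = Executors.newScheduledThreadPool(1);
    rollService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        shouldRotate = true; // no I/O here, just a flag flip
      }
    }, rollIntervalSeconds, rollIntervalSeconds, TimeUnit.SECONDS);
  }

  public void stop() {
    rollService.shutdown();
  }

  public boolean shouldRotate() {
    return shouldRotate;
  }
}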

 
