`
qianshangding
  • 浏览: 127953 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Flume - MemoryChannel源码解析

 
阅读更多

MemoryChannel的简易类结构:


内部类MemoryTransaction的简易类结构:

一,configure(Context context)

1,capacity:MemroyChannel的容量,默认是100。

2,transCapacity:每个事务最大的容量,也就是每个事务能够获取的最大Event数量。默认也是100。

3,byteCapacityBufferPercentage:定义Channle中Event所占的百分比,需要考虑在Header中的数据。

4,byteCapacity:byteCapacity等于设置的byteCapacity值或堆的80%乘以1减去byteCapacityBufferPercentage的百分比,然后除以100。源码如下:

    try {
      byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);
      if (byteCapacity < 1) {
        byteCapacity = Integer.MAX_VALUE;
      }
    } catch(NumberFormatException e) {
      byteCapacity = (int)((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);
    }

5,keep-alive:增加和删除一个Event的超时时间(单位:秒)

6,初始化LinkedBlockingDeque对象,大小为capacity。以及各种信号量对象。

7,最后初始化计数器。


二,信号量

MemoryChannel有三个信号量用来控制事务,防止容量越界。这三个信号量分别是:

queueRemaining : 表示空闲的容量大小。

queueStored : 表示已经存储的容量大小。

bytesRemaining : 表示可以使用的内存大小。该大小就是计算后的byteCapacity值。


三,MemoryTransaction

MemoryTransaction用来接收数据和事务控制。该类继承BasicTransactionSemantics类,BasicTransactionSemantics类有三个主要的方法:

1,put(Event event) 往Channel放入数据。
2, take() 从Channel获取数据。
3, begin() 开始事务
3,commit() 提交事务
4,rollback() 回滚事务

无论是Sink,还是Source都会调用getTransaction()方法,获取该事务实例。下面我们看看MemoryTransaction的初始化过程:

    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
      putList = new LinkedBlockingDeque<Event>(transCapacity);
      takeList = new LinkedBlockingDeque<Event>(transCapacity);

      channelCounter = counter;
    }

由上可见,MemoryTransaction维护了两个队列,一个用于Source的put,一个用于Sink的take,容量大小为事务的容量(即:transCapacity)。

最后让我们了解一下MemoryTransaction类中,几个比较重要方法:

1,doPut(Event event)方法是往Channel插入数据

    protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount();<pre name="code" class="java">      //estimateEventSize方法返回Event的大小(只是Body的大小,header没有计算在内),并且除以byteCapacitySlotSize后向上取整计算
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
      //添加到队列的尾部,如果超过了容量的限制,则添加失败,抛出ChannelException异常。
      if (!putList.offer(event)) {
        throw new ChannelException(
          "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
      //记录Event的byte值
      putByteCounter += eventByteSize;
    }


2,doTake() 方法是从Channel中获取数据。

protected Event doTake() throws InterruptedException {
      channelCounter.incrementEventTakeAttemptCount();
      //takeList队列的剩余容量,如果为0,则抛异常
      if(takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
      }
      //尝试获取许可,如果可以获取到许可的话,证明queue队列有空间,否则返回null
      if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
      }
      Event event;
      synchronized(queueLock) {
        //从queue取出一个Event
        event = queue.poll();
      }
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
          "signalling existence of entry");
      //加入到takeList队列中
      takeList.put(event);
      //更新takeByteCounter大小
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
      takeByteCounter += eventByteSize;

      return event;
    }


3,doCommit() 提交

    protected void doCommit() throws InterruptedException {
      int remainingChange = takeList.size() - putList.size();
      //如果takeList更小,说明该MemoryChannel放的数据比取的数据要多,所以需要判断该MemoryChannel是否有空间来放
     if(remainingChange < 0) {
        //利用该信号量判断是否有剩余空间
        if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
          TimeUnit.SECONDS)) {
          throw new ChannelException("Cannot commit transaction. Byte capacity " +
            "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
            "reached. Please increase heap space/byte capacity allocated to " +
            "the channel as the sinks may not be keeping up with the sources");
        }
        //检查queue队列是否有足够的空间。
        if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
          bytesRemaining.release(putByteCounter);
          throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
        }
      }
      int puts = putList.size();
      int takes = takeList.size();
      //如果上述两个信号量都有空间的话,那么把putList中的Event放到该MemoryChannel中的queue中。
      synchronized(queueLock) {
        if(puts > 0 ) {
          while(!putList.isEmpty()) {
            if(!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        //最后情况putList和takeList
        putList.clear();
        takeList.clear();
      }
      //更新queue大小控制的信号量bytesRemaining
      bytesRemaining.release(takeByteCounter);
      takeByteCounter = 0;
      putByteCounter = 0;

      queueStored.release(puts);
      //takeList比putList大,说明该MemoryChannel中queue的数量应该是减少了,所以把(takeList-putList)的差值加到信号量queueRemaining
      if(remainingChange > 0) {
        queueRemaining.release(remainingChange);
      }
      //更新计数器
      if (puts > 0) {
        channelCounter.addToEventPutSuccessCount(puts);
      }
      if (takes > 0) {
        channelCounter.addToEventTakeSuccessCount(takes);
      }

      channelCounter.setChannelSize(queue.size());
    }

4,doRollback() 回滚

    protected void doRollback() {
      //获取takeList的大小,然后bytesRemaining中释放
      int takes = takeList.size();
      //将takeList中的Event重新放回到queue队列中。
      synchronized(queueLock) {
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        while(!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast());
        }
        //最后清空putList
        putList.clear();
      }
      //清空了putList,所以需要把putList占用的空间添加到bytesRemaining中
      bytesRemaining.release(putByteCounter);
      putByteCounter = 0;
      takeByteCounter = 0;

      queueStored.release(takes);
      channelCounter.setChannelSize(queue.size());
    }

MemoryChannel的数据流向:

source -> putList -> queue -> takeList -> sink


MemoryChannel还是比较简单的,主要是通过MemoryTransaction中的putList、takeList与MemoryChannel中的queue进行数据流转和事务控制,MemoryChannel受内存空间的影响,如果数据产生的过快,同时获取信号量超时容易造成数据的丢失。而且Flume进程挂掉,数据也会丢失。

分享到:
评论

相关推荐

    flume-ng安装

    Flume-NG 安装与配置指南 Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:...

    flume-ng-sql-source-1.5.2

    Flume-ng-sql-source-1.5.2是Apache Flume的一个扩展,它允许Flume从SQL数据库中收集数据。Apache Flume是一个分布式、可靠且可用于有效聚合、移动大量日志数据的系统。"ng"代表"next generation",表明这是Flume的...

    apache-flume-1.5.0-cdh5.3.6-bin.zip

    flume-1.5.0-cdh5.3.6。 大数据日志收集工具 flume-1.5.0-cdh5.3.6。 大数据日志收集工具flume-1.5.0-cdh5.3.6。 大数据日志收集工具flume-1.5.0-cdh5.3.6。 大数据日志收集工具flume-1.5.0-cdh5.3.6。 大数据日志...

    Flume-ng-1.6.0-cdh.zip

    Flume-ng-1.6.0-cdh.zip 内压缩了 3 个项目,分别为:flume-ng-1.6.0-cdh5.5.0.tar.gz、flume-ng-1.6.0-cdh5.7.0.tar.gz 和 flume-ng-1.6.0-cdh5.10.1.tar.gz,选择你需要的版本。

    flume-ng-sql-source-release-1.5.2.zip

    Flume-ng-sql-source是Apache Flume的一个扩展插件,主要功能是允许用户从各种数据库中抽取数据并将其传输到其他目的地,如Apache Kafka。在本案例中,我们讨论的是版本1.5.2的发布包,即"flume-ng-sql-source-...

    flume-interceptor-1.0-SNAPSHOT.jar

    注意:flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-1.0-SNAPSHOT.jar flume-interceptor-...

    flume-taildir-source-1.9.0.jar

    flume断点续传覆盖jar,使用组件flume-taildir-source-1.9.0覆盖flume/bin目录下的jar即可

    apache-flume-1.9.0-bin.tar.gz

    1. **bin** 目录:包含可执行脚本,如 `flume-ng` 和 `flume-agent`,用于启动、停止和管理 Flume 代理。 2. **conf** 目录:存放配置文件,例如 `flume.conf`,这是默认的配置文件,用户可以在这里定义数据流的结构...

    flume-ng-sql-source-1.5.2.jar

    flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包

    flume-ng-sql-source-1.5.3.jar

    flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...

    flume-ng-1.6.0-cdh5.5.0.tar.gz

    "flume-ng-1.6.0-cdh5.5.0.tar.gz" 是 Apache Flume 的一个特定版本,具体来说是 "Next Generation" (ng) 版本的 1.6.0,与 Cloudera Data Hub (CDH) 5.5.0 发行版兼容。CDH 是一个包含多个开源大数据组件的商业发行...

    flume-ng-sql-source-1.5.1

    flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具

    flume-interceptor-1.0.0-jar-with-dependencies.jar

    flume拦截器 保留binlog es、data、database、table、type字段 分区字段名称: eventDate 放入 /opt/cloudera/parcels/CDH/lib/flume-ng/lib目录重启flume即可

    flume-ng-1.5.0-cdh5.3.6.rar

    flume-ng-1.5.0-cdh5.3.6.rarflume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume...

    Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS.docx

    Flume-ng 在 Windows 环境搭建并测试 + Log4j 日志通过 Flume 输出到 HDFS Flume-ng 是一个高可用、可靠、分布式的日志聚合系统,可以实时地从各种数据源(如日志文件、网络 socket、数据库等)中收集数据,并将其...

    apache-flume-1.7.0-bin.tar.zip

    这个名为 "apache-flume-1.7.0-bin.tar.zip" 的压缩包包含 Apache Flume 的 1.7.0 版本的二进制文件。文件以 `.tar.gz` 格式压缩,这是一种常见的在 Unix-like 系统上使用的文件打包和压缩格式,它首先使用 `tar` ...

    flume-ng-1.6.0 cdh5.7.0安装包

    该压缩包“flume-ng-1.6.0-cdh5.7.0”是针对Cloudera Data Hub (CDH) 5.7.0 平台的Flume的特定版本,"ng"代表"Next Generation",意味着它是Flume的更新版本,提供了更先进的特性和性能优化。CDH是一个完整的、经过...

    apache-flume-1.7.0-bin.tar.gz

    在标题中的"apache-flume-1.7.0-bin.tar.gz"是一个压缩包,包含了Flume的1.7.0版本的二进制发行版。这个版本提供了执行Flume服务所需的全部组件和依赖,使得开发者和系统管理员可以方便地在他们的环境中安装和运行...

    flume-mysql.zip

    总结来说,通过结合`flume-ng-sql-source-json-1.0.jar`和`mysql-connector-java.jar`,Flume可以有效地从MySQL数据库中抽取数据并转换为JSON格式,再借助`apache-flume-1.9.0-bin.tar.gz`中的工具,将这些数据流式...

Global site tag (gtag.js) - Google Analytics