jinnianshilongnian

Flume Architecture and Source Code Analysis - MemoryChannel Transaction Implementation


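Flume provides reliable log collection, and that reliability is achieved through a transaction mechanism. For Channel transactions, this part covers the implementations in MemoryChannel and FileChannel.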

 

First, let's look at the BasicChannelSemantics implementation:

public abstract class BasicChannelSemantics extends AbstractChannel {
  //1. The transaction is stored in a ThreadLocal, so each thread works with its own transaction (thread safety)
  private ThreadLocal<BasicTransactionSemantics> currentTransaction
      = new ThreadLocal<BasicTransactionSemantics>();

  private boolean initialized = false;
  //2. Hook for one-time initialization work
  protected void initialize() {}
  //3. Callback through which subclasses create transactions
  protected abstract BasicTransactionSemantics createTransaction();
  //4. Put an Event into the Channel; delegates directly to the transaction's put method
  @Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }
  //5. Take an Event from the Channel; also delegates directly to the transaction's take method
  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    return transaction.take();
  }

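  //6. Get the transaction. If this instance is not initialized yet, initialize it first (double-checked locking); then look up the thread's transaction in the ThreadLocal and, if there is none or it is CLOSED, create a new one and bind it to the ThreadLocal.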
  @Override
  public Transaction getTransaction() {

    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }

    BasicTransactionSemantics transaction = currentTransaction.get();
    if (transaction == null || transaction.getState().equals(
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();
      currentTransaction.set(transaction);
    }
    return transaction;
  }
}
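To make the ThreadLocal point concrete, here is a stripped-down model of the getTransaction() logic above. It is a sketch under assumptions, not Flume code: ToyTx and ToyChannel are hypothetical names, and ToyTx only tracks an OPEN/CLOSED state in place of BasicTransactionSemantics.State. It shows that one thread keeps getting the same transaction until it is closed, while another thread gets its own.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for BasicTransactionSemantics: only tracks OPEN/CLOSED.
class ToyTx {
  enum State { OPEN, CLOSED }
  private State state = State.OPEN;
  State getState() { return state; }
  void close() { state = State.CLOSED; }
}

// Hypothetical stand-in for the transaction lookup in BasicChannelSemantics.
class ToyChannel {
  private final ThreadLocal<ToyTx> currentTransaction = new ThreadLocal<>();
  final AtomicInteger created = new AtomicInteger();   // how many transactions were created

  // Reuse this thread's transaction unless there is none or it is CLOSED.
  ToyTx getTransaction() {
    ToyTx tx = currentTransaction.get();
    if (tx == null || tx.getState() == ToyTx.State.CLOSED) {
      tx = new ToyTx();
      created.incrementAndGet();
      currentTransaction.set(tx);
    }
    return tx;
  }

  // Demo helper: fetch a transaction on a fresh thread and wait for it.
  ToyTx getTransactionOnNewThread() {
    ToyTx[] holder = new ToyTx[1];
    Thread t = new Thread(() -> holder[0] = getTransaction());
    t.start();
    try {
      t.join();                                          // join() makes holder[0] visible here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return holder[0];
  }
}
```

In real code the caller wraps its work in the begin / commit / rollback / close calls of Flume's Transaction interface; the ThreadLocal is what lets put() and take() find that transaction without it being passed as an argument.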

 

 

MemoryChannel Transaction Implementation

Let's start with MemoryChannel. It is a purely in-memory Channel: all transactional operations happen in memory. First, its memory layout:


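1. A single Channel Queue stores all of the Channel's Events;

2. Each transaction has its own Take Queue and Put Queue (takeList and putList in the code), holding the events the transaction has taken and put. On commit, the put events are moved into the Channel Queue; on failure, the taken events are rolled back into the Channel Queue.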
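The layout in the list above can be modeled with plain deques. This hypothetical LayoutChannel (not Flume code) omits capacities, semaphores and locking, and uses String in place of Event; it only shows that puts stay invisible until commit and that taken events go back to the Channel Queue on rollback.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the layout only: one shared channel queue plus
// per-transaction put/take lists. No capacity limits, semaphores or locking.
class LayoutChannel {
  final Deque<String> channelQueue = new ArrayDeque<>();

  class Tx {
    final Deque<String> putList = new ArrayDeque<>();
    final Deque<String> takeList = new ArrayDeque<>();

    void put(String e) { putList.addLast(e); }          // staged, not yet visible

    String take() {
      String e = channelQueue.pollFirst();              // leaves the channel queue
      if (e != null) takeList.addLast(e);               // but is remembered here
      return e;
    }

    void commit() {
      while (!putList.isEmpty()) channelQueue.addLast(putList.removeFirst());
      takeList.clear();                                 // takes become permanent
    }

    void rollback() {
      while (!takeList.isEmpty()) channelQueue.addFirst(takeList.removeLast());
      putList.clear();                                  // staged puts are discarded
    }
  }

  Tx newTx() { return new Tx(); }
}
```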

 

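MemoryChannel's design involves two capacities: the Channel Queue capacity and the transaction capacity. These limits are expressed as event counts, and the channel additionally has a byte-size limit.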

 

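In addition, because multiple transactions operate on the Channel Queue and the Channel Queue can be resized dynamically, MemoryChannel guards the queue with a lock (queueLock), while the capacity limits are enforced with semaphores.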

 

The configure method initializes the parameters, such as the capacities and the Channel Queue. First, here is how the Channel Queue capacity is computed:

try {
  capacity = context.getInteger("capacity", defaultCapacity);
} catch(NumberFormatException e) {
  capacity = defaultCapacity;
}

if (capacity <= 0) {
  capacity = defaultCapacity;
} 

 

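That is, the count capacity is read from the configuration (the capacity property). If it is missing or not a valid number, the default (100) is used, and a configured value less than or equal to 0 also falls back to the default.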
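The fallback rules can be captured in a small helper. This is a sketch: positiveIntOrDefault and the Map-based configuration are hypothetical stand-ins for Flume's Context.getInteger, and the default of 100 is taken from the text.

```java
import java.util.Map;

// Hypothetical helper mirroring the pattern above: missing, malformed,
// or non-positive values all fall back to the default.
final class CapacityConfig {
  private CapacityConfig() {}

  static int positiveIntOrDefault(Map<String, String> props, String key, int defaultValue) {
    String raw = props.get(key);
    if (raw == null) {
      return defaultValue;                       // not configured
    }
    int value;
    try {
      value = Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
      return defaultValue;                       // malformed number
    }
    return value > 0 ? value : defaultValue;     // <= 0 means "use the default"
  }
}
```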

 

Next, the transaction count capacity is initialized:

try {
  transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
} catch(NumberFormatException e) {
  transCapacity = defaultTransCapacity;
}
if (transCapacity <= 0) {
  transCapacity = defaultTransCapacity;
}
Preconditions.checkState(transCapacity <= capacity,
    "Transaction Capacity of Memory Channel cannot be higher than " +
    "the capacity.");

 

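The process mirrors the Channel Queue capacity initialization (here the property is transactionCapacity), but it ends with a precondition check: the transaction capacity must be less than or equal to the Channel Queue capacity.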
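The check presumably exists because commit moves a transaction's whole putList into the Channel Queue at once, so a transaction larger than the queue could never commit a full batch. Here is a minimal sketch of the same rule using plain JDK exceptions instead of Guava's Preconditions (TxCapacityCheck is a hypothetical name). Guava's checkState also throws IllegalStateException.

```java
// Hypothetical validation mirroring the Preconditions.checkState call above.
final class TxCapacityCheck {
  private TxCapacityCheck() {}

  static void validate(int capacity, int transCapacity) {
    if (transCapacity > capacity) {
      // A full putList could never fit into the Channel Queue on commit.
      throw new IllegalStateException(
          "Transaction Capacity of Memory Channel cannot be higher than the capacity.");
    }
  }
}
```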

 

Next come the byte capacity limits:

try {
  byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage);
} catch(NumberFormatException e) {
  byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage;
}
try {
  byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);
  if (byteCapacity < 1) {
    byteCapacity = Integer.MAX_VALUE;
  }
} catch(NumberFormatException e) {
  byteCapacity = (int)((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);
} 

 

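byteCapacityBufferPercentage: a percentage used to derive the effective byteCapacity. The byte capacity only accounts for Event bodies and ignores Event headers, so part of the configured capacity is set aside for header memory. You can think of this parameter as the share of the byte capacity assumed to be taken up by Event headers; the default is 20%.

byteCapacity: the configured byteCapacity is read first. If it is not set, defaultByteCapacity is used, which is 80% of the JVM's maximum heap size (Runtime.getRuntime().maxMemory() * .80), not of physical memory. The effective value is then byteCapacity = configured byteCapacity * (1 - header percentage) / byteCapacitySlotSize. byteCapacitySlotSize is 100: bytes are accounted in 100-byte slots, so the resulting byteCapacity (and each permit of the bytesRemaining semaphore used later) counts slots rather than bytes. If the result is less than 1, byteCapacity is set to Integer.MAX_VALUE, which effectively disables the byte limit.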
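Here is a worked example of the formula. With byteCapacity = 1,000,000 bytes and the default 20% buffer, 800,000 bytes remain for bodies, which is 8,000 slots of 100 bytes. The sketch below repeats that arithmetic. ByteCapacityMath is a hypothetical helper, and its constants are the defaults described above, not values read from Flume.

```java
// Hypothetical re-computation of the effective byte capacity, in slots.
final class ByteCapacityMath {
  static final int SLOT_SIZE = 100;              // bytes per slot (byteCapacitySlotSize)
  static final int DEFAULT_BUFFER_PERCENT = 20;  // byteCapacityBufferPercentage default

  private ByteCapacityMath() {}

  // Number of 100-byte slots available for event bodies.
  static int slots(long configuredBytes, int bufferPercent) {
    int slots = (int) ((configuredBytes * (1 - bufferPercent * .01)) / SLOT_SIZE);
    return slots < 1 ? Integer.MAX_VALUE : slots; // same fallback as the source
  }

  // Default when byteCapacity is not configured: 80% of the maximum heap.
  static long defaultBytes() {
    return (long) (Runtime.getRuntime().maxMemory() * .80);
  }
}
```

Note the last branch: a very small configured value (for example 50 bytes) rounds down to 0 slots and therefore ends up as an unlimited byte capacity.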

 

Next, the keepAlive parameter is defined:

 

try {
  keepAlive = context.getInteger("keep-alive", defaultKeepAlive);
} catch(NumberFormatException e) {
  keepAlive = defaultKeepAlive;
} 

 

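keepAlive (property keep-alive) is the timeout, in seconds, for operations that wait on the Channel Queue (the semaphore tryAcquire calls); the default is 3 seconds.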
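keepAlive is passed to Semaphore.tryAcquire calls with a timeout, so a full or empty channel makes the caller give up instead of blocking forever. The sketch below shows that behavior with the JDK Semaphore. KeepAliveDemo is a hypothetical name, and it uses milliseconds (rather than the seconds used by MemoryChannel) only so the example runs quickly.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a keepAlive-style bounded wait.
final class KeepAliveDemo {
  private KeepAliveDemo() {}

  // Returns true if the permits were obtained within timeoutMillis, false otherwise.
  static boolean tryReserve(Semaphore sem, int permits, long timeoutMillis) {
    try {
      return sem.tryAcquire(permits, timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();   // preserve the interrupt status
      return false;
    }
  }
}
```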

 

Then the Channel Queue is initialized:

if(queue != null) {
  try {
    resizeQueue(capacity);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
} else {
  synchronized(queueLock) {
    queue = new LinkedBlockingDeque<Event>(capacity);
    queueRemaining = new Semaphore(capacity);
    queueStored = new Semaphore(0);
  }
} 

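If the Channel Queue already exists (is non-null), configure is being called again on an existing channel, so the queue is resized dynamically. Otherwise the Channel Queue is created.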

 

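Consider first-time creation. The code holds queueLock while creating the queue. Every access to the Channel Queue must hold this lock, because, as mentioned earlier, the queue may be replaced during a resize. It then creates two semaphores: queueRemaining, the number of free slots in the Channel Queue (initially capacity), and queueStored, the number of events in the Channel Queue that are available for taking (initially 0). Transactions use them to reserve Channel Queue capacity.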

 

Next, resizeQueue performs the dynamic resize:

private void resizeQueue(int capacity) throws InterruptedException {
  int oldCapacity;
  synchronized(queueLock) { //first compute the Channel Queue capacity before resizing
    oldCapacity = queue.size() + queue.remainingCapacity();
  }

  if(oldCapacity == capacity) {//new capacity equals old capacity: nothing to do
    return;
  } else if (oldCapacity > capacity) {//old capacity larger than new: shrink
    //first reserve (oldCapacity - capacity) free slots so the queue can be shrunk
    if(!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
      //acquisition failed: a warning is logged and the resize is skipped
    } else {
      //otherwise shrink: create the smaller queue and copy the old data; hold queueLock so the whole swap is thread-safe
      synchronized(queueLock) {
        LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
        newQueue.addAll(queue);
        queue = newQueue;
      }
    }
  } else {
    //not shrinking, so grow: create the larger queue and copy the old data
    synchronized(queueLock) {
      LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
      newQueue.addAll(queue);
      queue = newQueue;
    }
    //hand out the (capacity - oldCapacity) new free slots; a shrink already consumed its permits above
    queueRemaining.release(capacity - oldCapacity);
  }
}
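To check the permit bookkeeping in resizeQueue, here is a simplified, hypothetical re-implementation. ResizableQueue is not a Flume class, and the offer method, the boolean result and the millisecond timeouts are assumptions added for the demo. Shrinking first acquires the permits for the slots that disappear, so it fails when the queue is too full. Growing releases the new slots after the copy.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of resizeQueue's permit bookkeeping (not Flume code).
class ResizableQueue<E> {
  private final Object queueLock = new Object();
  private LinkedBlockingDeque<E> queue;
  final Semaphore queueRemaining;                  // free slots in the queue

  ResizableQueue(int capacity) {
    queue = new LinkedBlockingDeque<>(capacity);
    queueRemaining = new Semaphore(capacity);
  }

  int currentCapacity() {
    synchronized (queueLock) {
      return queue.size() + queue.remainingCapacity();
    }
  }

  // Demo helper: reserve a free slot, then append.
  boolean offer(E e) {
    if (!queueRemaining.tryAcquire()) {
      return false;                                // no free slot
    }
    synchronized (queueLock) {
      queue.addLast(e);                            // cannot overflow: a slot was reserved
    }
    return true;
  }

  // Returns false when a shrink cannot reserve enough free slots in time.
  boolean resize(int newCapacity, long timeoutMillis) throws InterruptedException {
    int oldCapacity = currentCapacity();
    if (oldCapacity == newCapacity) {
      return true;                                 // nothing to do
    }
    if (oldCapacity > newCapacity) {
      // Shrink: first take away the permits for the slots that disappear.
      if (!queueRemaining.tryAcquire(oldCapacity - newCapacity, timeoutMillis, TimeUnit.MILLISECONDS)) {
        return false;                              // too full; keep the old queue
      }
      copyInto(newCapacity);
    } else {
      // Grow: copy first, then hand out the extra slots.
      copyInto(newCapacity);
      queueRemaining.release(newCapacity - oldCapacity);
    }
    return true;
  }

  private void copyInto(int newCapacity) {
    synchronized (queueLock) {
      LinkedBlockingDeque<E> newQueue = new LinkedBlockingDeque<>(newCapacity);
      newQueue.addAll(queue);
      queue = newQueue;
    }
  }
}
```

For example, with capacity 4 and 3 stored events only 1 permit is free, so shrinking to 2 fails. Growing to 8 succeeds and leaves 5 free permits, after which shrinking to 3 can take all 5 and succeed.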

At this point all the Channel Queue related data has been initialized. Next, the start method is called:
public synchronized void start() {
  channelCounter.start();
  channelCounter.setChannelSize(queue.size());
  channelCounter.setChannelCapacity(Long.valueOf(
          queue.size() + queue.remainingCapacity()));
  super.start();
} 

 

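This starts the ChannelCounter, a counter that records metrics such as the number of Events put into and taken from the channel, the number of successful operations, and so on.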

 

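As analyzed earlier, most Channels (those built on BasicChannelSemantics) delegate put and take directly to the transaction, so next we look at the MemoryTransaction implementation.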

 

First, MemoryTransaction's initialization:

private class MemoryTransaction extends BasicTransactionSemantics {
  private LinkedBlockingDeque<Event> takeList;
  private LinkedBlockingDeque<Event> putList;
  private final ChannelCounter channelCounter;
  private int putByteCounter = 0;
  private int takeByteCounter = 0;
  public MemoryTransaction(int transCapacity, ChannelCounter counter) {
    putList = new LinkedBlockingDeque<Event>(transCapacity);
    takeList = new LinkedBlockingDeque<Event>(transCapacity);
    channelCounter = counter;
  } 

 

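MemoryTransaction holds two queues (LinkedBlockingDeque, a linked blocking deque) sized by the transaction capacity, a byte counter for each of them, and the ChannelCounter used for channel metrics.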

 

The put operation within a transaction:

protected void doPut(Event event) throws InterruptedException {
  //1. Increment the put-attempt counter
  channelCounter.incrementEventPutAttemptCount();
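  //2. estimateEventSize estimates the Event body size in bytes; dividing by byteCapacitySlotSize converts it to slots (integer division, so it truncates before Math.ceil)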
  int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
  //3. Put the Event into the transaction's putList; if it is full, throw an exception so the transaction is rolled back
  if (!putList.offer(event)) {
      throw new ChannelException(
      "Put queue for MemoryTransaction of capacity " +
        putList.size() + " full, consider committing more frequently, " +
        "increasing capacity or increasing thread count");
  }
  //4. Add to the put byte counter
  putByteCounter += eventByteSize;
} 

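doPut is fairly simple: it puts the Event into the transaction's putList. If the putList is full, it throws immediately so the transaction is rolled back. Otherwise, the Event waits in putList until commit moves it into the Channel Queue. It also adds the Event's size to the put byte counter, which is later used to enforce the byte capacity limit.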
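Two details of doPut are worth isolating. First, a full putList is reported by offer() returning false, which doPut turns into an exception. Second, assuming estimateEventSize returns a long (an assumption about the Flume version), estimateEventSize(event)/byteCapacitySlotSize is integer division. The quotient is therefore truncated before Math.ceil sees it: a 150-byte body counts as 1 slot and a 50-byte body as 0. The hypothetical PutPathDemo below reproduces both behaviors, with byte[] bodies standing in for Events.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical sketch of the put path (not Flume code).
final class PutPathDemo {
  static final Integer SLOT_SIZE = 100;          // boxed, like byteCapacitySlotSize

  private final LinkedBlockingDeque<byte[]> putList;
  int putByteCounter = 0;

  PutPathDemo(int transCapacity) {
    putList = new LinkedBlockingDeque<>(transCapacity);
  }

  // Same expression shape as doPut: long / Integer is integer division,
  // so the quotient is truncated before Math.ceil runs.
  static int slotsAsInSource(long bodyBytes) {
    return (int) Math.ceil(bodyBytes / SLOT_SIZE);
  }

  // For comparison: floating-point division, so ceil actually rounds up.
  static int slotsRoundedUp(long bodyBytes) {
    return (int) Math.ceil(bodyBytes / SLOT_SIZE.doubleValue());
  }

  void put(byte[] body) {
    if (!putList.offer(body)) {                  // full transaction: caller must roll back
      throw new IllegalStateException("putList full, commit more frequently");
    }
    // Empty bodies count as 1 byte, like estimateEventSize.
    putByteCounter += slotsAsInSource(Math.max(body.length, 1));
  }
}
```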

 

Next, the take operation within a transaction:

protected Event doTake() throws InterruptedException {
  //1. Increment the take-attempt counter
  channelCounter.incrementEventTakeAttemptCount();
  //2. If takeList has no remaining capacity, this transaction has already taken the maximum number of Events
  if(takeList.remainingCapacity() == 0) {
    throw new ChannelException("Take list for MemoryTransaction, capacity " +
        takeList.size() + " full, consider committing more frequently, " +
        "increasing capacity, or increasing thread count");
  }
  //3. Try to acquire one queueStored permit; on timeout, return null
  if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
    return null;
  }
  //4. Take one Event from the Channel Queue
  Event event;
  synchronized(queueLock) {//every Channel Queue access must hold queueLock, because of the dynamic resizing discussed earlier
    event = queue.poll();
  }
  //5. The semaphore guarantees the Channel Queue is not empty, so null here means something is wrong
  Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
      "signalling existence of entry");
  //6. Keep the Event in the transaction's takeList
  takeList.put(event);
  //7. Estimate the Event body size (in slots) and add it to the take byte counter
  int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
  takeByteCounter += eventByteSize;
  return event;
}
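The key invariant in doTake is that holding a queueStored permit guarantees poll() finds an event. The hypothetical TakePathDemo below uses String events, a millisecond timeout, and a publish method standing in for a committed put. It shows the three outcomes: null on timeout, an event when a permit is available, and an exception once takeList is full.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the take path (not Flume code).
final class TakePathDemo {
  final Object queueLock = new Object();
  final LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(10);
  final Semaphore queueStored = new Semaphore(0);               // events visible to takers
  final LinkedBlockingDeque<String> takeList = new LinkedBlockingDeque<>(2);

  // Stand-in for a committed put: enqueue, then publish one stored permit.
  void publish(String e) {
    synchronized (queueLock) { queue.addLast(e); }
    queueStored.release();
  }

  String take(long timeoutMillis) throws InterruptedException {
    if (takeList.remainingCapacity() == 0) {
      throw new IllegalStateException("takeList full, commit more frequently");
    }
    if (!queueStored.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
      return null;                                                // nothing to take in time
    }
    String e;
    synchronized (queueLock) { e = queue.poll(); }
    if (e == null) {
      throw new IllegalStateException("poll returned null despite a stored permit");
    }
    takeList.put(e);                                              // remember it for rollback
    return e;
  }
}
```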

Next, committing the transaction:

protected void doCommit() throws InterruptedException {
  //1. Compute the net change in Events: takes minus puts. If there were more puts than takes, the change is negative
  int remainingChange = takeList.size() - putList.size();
  //2. If remainingChange is less than 0, acquire permits for the Channel Queue's remaining capacity
  if(remainingChange < 0) {
    //2.1 First acquire putByteCounter byte-capacity permits; failure means the byte capacity limit is exceeded, so the transaction is rolled back
    if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
      throw new ChannelException("Cannot commit transaction. Byte capacity " +
        "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
        "reached. Please increase heap space/byte capacity allocated to " +
        "the channel as the sinks may not be keeping up with the sources");
    }
    //2.2 Acquire -remainingChange Channel Queue permits to store -remainingChange more Events; on failure, release the putByteCounter byte permits and throw so the transaction is rolled back
    if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
      bytesRemaining.release(putByteCounter);
      throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
          " Sinks are likely not keeping up with sources, or the buffer size is too tight");
    }
  }
  int puts = putList.size();
  int takes = takeList.size();
  synchronized(queueLock) {//always hold queueLock when operating on the Channel Queue
    if(puts > 0 ) {
      while(!putList.isEmpty()) { //3.1 Move each Event into the Channel Queue
        if(!queue.offer(putList.removeFirst())) { 
          //3.2 Failing to add to the Channel Queue means the semaphore accounting is broken; this should never happen
          throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
        }
      }
    }
    //4. On success, clear putList and takeList
    putList.clear();
    takeList.clear();
  }
  //5.1 Release takeByteCounter byte-capacity permits
  bytesRemaining.release(takeByteCounter);
  //5.2 Reset the byte counters
  takeByteCounter = 0;
  putByteCounter = 0;
  //5.3 Release puts queueStored permits, so doTake can now retrieve the new data
  queueStored.release(puts);
  //5.4 Release remainingChange queueRemaining permits
  if(remainingChange > 0) {
    queueRemaining.release(remainingChange);
  }
  //6. Update ChannelCounter statistics
  if (puts > 0) {
    channelCounter.addToEventPutSuccessCount(puts);
  }
  if (takes > 0) {
    channelCounter.addToEventTakeSuccessCount(takes);
  }

  channelCounter.setChannelSize(queue.size());
} 

Two count semaphores are involved here:

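queueStored is the number of events stored in the Channel Queue that are available for taking; it starts at 0. doTake acquires one permit for each Event it takes (-1). doCommit releases putList.size() permits when it moves the puts into the Channel Queue (+N). doRollback releases takeList.size() permits when it returns the taken events to the Channel Queue (+N). So it always reflects how many events in the Channel Queue can be taken.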

 

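queueRemaining is the number of free slots in the Channel Queue; it starts at the Channel Queue capacity. It is adjusted only at commit time, by the net change remainingChange = takeList.size() - putList.size(). If remainingChange is less than 0, the transaction put more events than it took, so -remainingChange additional slots are needed and that many permits are acquired. If it is greater than 0, the transaction took more than it put, so remainingChange slots are freed and that many permits are released. In short, net puts (producing) consume permits and net takes (consuming) release them. Because a taken event keeps its slot until commit, a rollback always has room to put it back.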
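The semaphore rules can be traced with a single-threaded model. CommitMath is hypothetical and leaves out byte accounting, timeouts and locking. Its commit method takes the number of events this transaction already took, because doTake consumed their queueStored permits earlier. The trace in the checks below shows that a take does not free a queueRemaining slot until commit, and that a commit needing more free slots than exist fails.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

// Hypothetical, single-threaded model of the commit bookkeeping (not Flume code).
final class CommitMath {
  final Deque<String> queue = new ArrayDeque<>();
  final Semaphore queueRemaining;                  // free slots
  final Semaphore queueStored = new Semaphore(0);  // takeable events

  CommitMath(int capacity) { queueRemaining = new Semaphore(capacity); }

  // Apply one transaction: takesCount events were already removed during doTake.
  boolean commit(Deque<String> putList, int takesCount) {
    int remainingChange = takesCount - putList.size();
    if (remainingChange < 0 && !queueRemaining.tryAcquire(-remainingChange)) {
      return false;                                // not enough free slots: roll back
    }
    int puts = putList.size();
    while (!putList.isEmpty()) queue.addLast(putList.removeFirst());
    queueStored.release(puts);                     // new events become takeable
    if (remainingChange > 0) {
      queueRemaining.release(remainingChange);     // net takes free their slots now
    }
    return true;
  }

  // doTake's side of the ledger: consume a stored permit, leave queueRemaining alone.
  String take() {
    if (!queueStored.tryAcquire()) return null;
    return queue.pollFirst();
  }
}
```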

 

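bytesRemaining is the byte-capacity semaphore (counted in slots). Commit acquires putByteCounter permits when the transaction adds net events and always releases takeByteCounter permits; if the byte capacity would be exceeded, the transaction is rolled back.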

 

 

Finally, the rollback:

protected void doRollback() {
    int takes = takeList.size();
    synchronized(queueLock) { //always hold queueLock when operating on the Channel Queue
      //1. Precondition check: there must be enough space in the Channel Queue to roll back the takes
      Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
          "queue to rollback takes. This should never happen, please report");
      //2. Move the transaction's takeList back into the Channel Queue
      while(!takeList.isEmpty()) {
        queue.addFirst(takeList.removeLast());
      }
      putList.clear();
    }
    //3. Release putByteCounter bytesRemaining permits
    bytesRemaining.release(putByteCounter);

    //4. Reset the counters
    putByteCounter = 0;
    takeByteCounter = 0;
    //5. Release takeList.size() queueStored permits, since those events are takeable again
    queueStored.release(takes);
    channelCounter.setChannelSize(queue.size());
  }
} 

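In other words, a rollback moves the events held in takeList back to the head of the Channel Queue, discards putList, and restores the corresponding queueStored permits.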
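Why does doRollback pair takeList.removeLast() with queue.addFirst()? Taking from the tail and pushing onto the head puts the events back in their original order, ahead of anything still queued. Here is a minimal sketch with String events (RollbackOrderDemo is a hypothetical name):

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo of the rollback loop (not Flume code).
final class RollbackOrderDemo {
  private RollbackOrderDemo() {}

  static void rollback(LinkedBlockingDeque<String> queue, LinkedBlockingDeque<String> takeList) {
    // Same precondition as doRollback: every taken event must fit back in.
    if (queue.remainingCapacity() < takeList.size()) {
      throw new IllegalStateException("not enough space to roll back takes");
    }
    while (!takeList.isEmpty()) {
      queue.addFirst(takeList.removeLast());   // last taken goes back first
    }
  }
}
```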

            

 

 

 
