Channel 相当于一个管道,source写数据到channel中,sink从channel取数据。Channel有三类,memory,file,jdbc。memory速度最快,但是当机器宕机的时候数据会丢失,file数据不会丢失,jdbc速度最慢,一般选择fileChannel。
Source 中会调用ChannelProcessor的processEvent方法处理Log事件。对于每个channel都会获得一个Transaction,然后调用tx.begin方法,put event 到channel,然后调用commit。如果有异常,则调用transaction的rollback方法回滚;无论成功与否,最后都会在finally中调用tx.close()关闭transaction。
public void processEvent(Event event) { event = interceptorChain.intercept(event); if (event == null) { return; } // Process required channels List<Channel> requiredChannels = selector.getRequiredChannels(event); for (Channel reqChannel : requiredChannels) { Transaction tx = reqChannel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin(); reqChannel.put(event); tx.commit(); } catch (Throwable t) { tx.rollback(); if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; } else { throw new ChannelException("Unable to put event on required " + "channel: " + reqChannel, t); } } finally { if (tx != null) { tx.close(); } } } // Process optional channels List<Channel> optionalChannels = selector.getOptionalChannels(event); for (Channel optChannel : optionalChannels) { Transaction tx = null; try { tx = optChannel.getTransaction(); tx.begin(); optChannel.put(event); tx.commit(); } catch (Throwable t) { tx.rollback(); LOG.error("Unable to put event on optional channel: " + optChannel, t); if (t instanceof Error) { throw (Error) t; } } finally { if (tx != null) { tx.close(); } } } }
//创建了一个MemoryTransaction @Override protected BasicTransactionSemantics createTransaction() { return new MemoryTransaction(transCapacity, channelCounter); } @Override protected void doPut(Event event) throws InterruptedException { int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); if (bytesRemaining.tryAcquire(eventByteSize, keepAlive, TimeUnit.SECONDS)) { putList.offer(event) putByteCounter += eventByteSize; } @Override protected Event doTake() throws InterruptedException { Event event; synchronized(queueLock) { event = queue.poll(); } takeList.put(event); return event; } @Override protected void doCommit() throws InterruptedException { int puts = putList.size(); int takes = takeList.size(); synchronized(queueLock) { if(puts > 0 ) { while(!putList.isEmpty()) { queue.offer(putList.removeFirst()) } } putList.clear(); takeList.clear(); } } @Override protected void doRollback() { int takes = takeList.size(); while(!takeList.isEmpty()) { queue.addFirst(takeList.removeLast()); putList.clear(); } }
FileChannel用来将数据存储到临时文件中。在createTransaction方法中创建了一个FileBackedTransaction对象。对于source,会调用doPut方法,doPut方法中会调用log.put方法,log.put方法主要是用来将event和transactionID包装成一个byteBuffer,写入到file中,返回fileID和offset。doTake方法会调用log.take方法,log.take方法主要用来将transactionID、fileID、offset包装成一个byteBuffer写到log file中。rollback方法将transactionID包装成一个byteBuffer写到log file中。commit方法将transactionID和TransactionEventRecord.Type包装成一个byteBuffer,写入到file中。log file位于dataDirs下,以“log-”为前缀。综上所述,FileChannel类似于MemoryChannel,但FileChannel把数据存储到文件中,并将每次的操作都记录下来。
@Override protected BasicTransactionSemantics createTransaction() { trans = new FileBackedTransaction(log,, transactionCapacity, keepAlive, queueRemaining, getName(), channelCounter); transactions.set(trans); return trans; } @Override protected void doPut(Event event) throws InterruptedException { boolean success = false; FlumeEventPointer ptr = log.put(transactionID, event); Preconditions.checkState(putList.offer(ptr), "putList offer failed " + channelNameDescriptor); } @Override protected Event doTake() throws InterruptedException { /* * 1. Take an event which is in the queue. * 2. If getting that event does not throw NoopRecordException, * then return it. * 3. Else try to retrieve the next event from the queue * 4. Repeat 2 and 3 until queue is empty or an event is returned. */ try { while (true) { FlumeEventPointer ptr = queue.removeHead(transactionID); if (ptr == null) { return null; } else { try { // first add to takeList so that if write to disk // fails rollback actually does it's work Preconditions.checkState(takeList.offer(ptr), "takeList offer failed " + channelNameDescriptor); log.take(transactionID, ptr); // write take to disk Event event = log.get(ptr); return event; } catch (IOException e) { throw new ChannelException("Take failed due to IO error " + channelNameDescriptor, e); } catch (NoopRecordException e) { takeList.remove(ptr); } } } @Override protected void doCommit() throws InterruptedException { int puts = putList.size(); int takes = takeList.size(); if(puts > 0) { log.commitPut(transactionID); synchronized (queue) { while(!putList.isEmpty()) { queue.addTail(putList.removeFirst()) } } else if (takes > 0) { log.commitTake(transactionID); queue.completeTransaction(transactionID); } putList.clear(); takeList.clear(); } @Override protected void doRollback() throws InterruptedException { int puts = putList.size(); int takes = takeList.size(); if(takes > 0) { while (!takeList.isEmpty()) { 
Preconditions.checkState(queue.addHead(takeList.removeLast()), "Queue add failed, this shouldn't be able to happen " + channelNameDescriptor); } } putList.clear(); takeList.clear(); queue.completeTransaction(transactionID); log.rollback(transactionID); }
每隔一段时间,File Channel都会do checkpoint。
FlumeEventQueue{ synchronized boolean checkpoint(boolean force) { if (!elements.syncRequired() && !force) { LOG.debug("Checkpoint not required"); return false; } // Start checkpoint elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE); //插入header信息,包括queue的header位置,queue的size updateHeaders(); //将 文件的Id和文件的个数写入buffre中 List<Long> fileIdAndCountEncoded = new ArrayList<Long>(); for (Integer fileId : fileIDCounts.keySet()) { Integer count = fileIDCounts.get(fileId).get(); long value = encodeActiveLogCounter(fileId, count); fileIdAndCountEncoded.add(value); } int emptySlots = MAX_ACTIVE_LOGS - fileIdAndCountEncoded.size(); for (int i = 0; i < emptySlots; i++) { fileIdAndCountEncoded.add(0L); } for (int i = 0; i < MAX_ACTIVE_LOGS; i++) { elementsBuffer.put(i + INDEX_ACTIVE_LOG, fileIdAndCountEncoded.get(i)); } //将elements中的对象sync到elementsBuffer中,elements中存储了fileId以及transcation的offset elements.sync(); // Finish checkpoint elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE); //将数据同步到device mappedBuffer.force(); return true; } }
class LogFile { //将当前位置写入到文件中 synchronized void markCheckpoint(long timestamp) throws IOException { long currentPosition = writeFileChannel.position();; writeFileHandle.writeLong(currentPosition); writeFileHandle.writeLong(timestamp); writeFileChannel.position(currentPosition);"Noted checkpoint for file: " + file + ", id: " + fileID + ", checkpoint position: " + currentPosition); } }在flume crash后,flume会replay恢复到crash之前的状态
void replay() throws IOException { Preconditions.checkState(!open, "Cannot replay after Log has been opened"); checkpointWriterLock.lock(); try { /* * First we are going to look through the data directories * and find all log files. We will store the highest file id * (at the end of the filename) we find and use that when we * create additional log files. * * Also store up the list of files so we can replay them later. */"Replay started"); nextFileID.set(0); List<File> dataFiles = Lists.newArrayList(); for (File logDir : logDirs) { for (File file : LogUtils.getLogs(logDir)) { int id = LogUtils.getIDForFile(file); dataFiles.add(file); nextFileID.set(Math.max(nextFileID.get(), id)); idLogFileMap.put(id, new LogFile.RandomReader(new File(logDir, PREFIX + id))); } }"Found NextFileID " + nextFileID + ", from " + Arrays.toString(logDirs)); /* * sort the data files by file id so we can replay them by file id * which should approximately give us sequential events */ LogUtils.sort(dataFiles); /* * Read the checkpoint (in memory queue) from one of two alternating * locations. We will read the last one written to disk. */ queue = new FlumeEventQueue(queueCapacity, new File(checkpointDir, "checkpoint"), channelName); long ts = queue.getTimestamp();"Last Checkpoint " + new Date(ts) + ", queue depth = " + queue.getSize()); /* * We now have everything we need to actually replay the log files * the queue, the timestamp the queue was written to disk, and * the list of data files. 
*/ ReplayHandler replayHandler = new ReplayHandler(queue); replayHandler.replayLog(dataFiles); for (int index = 0; index < logDirs.length; index++) {"Rolling " + logDirs[index]); roll(index); } /* * Now that we have replayed, write the current queue to disk */ writeCheckpoint(true); open = true; } catch (Exception ex) { LOGGER.error("Failed to initialize Log on " + channelNameDescriptor, ex); if (ex instanceof IOException) { throw (IOException) ex; } Throwables.propagate(ex); } finally { checkpointWriterLock.unlock(); } }
