A Sink takes events from its channel and forwards them to the target destination. The sections below walk through the process() implementations of several built-in Flume sinks.
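Every sink below follows the same channel-transaction contract, so it helps to see that contract in isolation first. The skeleton below is a minimal sketch of it, not a real Flume sink; the class name is made up and the actual write step is left as a comment.

```java
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.sink.AbstractSink;

// Hypothetical skeleton illustrating the transaction pattern shared by the sinks below.
public class SkeletonSink extends AbstractSink {
  @Override
  public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    try {
      txn.begin();
      Event event = channel.take();   // returns null when the channel is empty
      if (event == null) {
        status = Status.BACKOFF;      // ask the SinkRunner to back off
      } else {
        // write the event to the destination here
      }
      txn.commit();                   // events leave the channel only on commit
    } catch (Throwable t) {
      txn.rollback();                 // events stay in the channel and will be retried
      throw new EventDeliveryException("Failed to deliver event", t);
    } finally {
      txn.close();
    }
    return status;
  }
}
```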
```java
/**
 * A simple sink which reads events from a channel and writes them to HBase.
 * This sink uses an asynchronous API internally and is likely to
 * perform better.
 * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt>
 * encountered in the classpath. This sink supports batch reading of
 * events from the channel, and writing them to HBase, to minimize the number
 * of flushes on the HBase tables. To use this sink, it has to be configured
 * with certain mandatory parameters.<p>
 */
public class AsyncHBaseSink extends AbstractSink implements Configurable {

  @Override
  public Status process() throws EventDeliveryException {
    /*
     * Reference to the boolean representing failure of the current transaction.
     * Since each txn gets a new boolean, failure of one txn will not affect
     * the next even if errbacks for the current txn get called while
     * the next one is being processed.
     */
    if (!open) {
      throw new EventDeliveryException("Sink was never opened. "
          + "Please fix the configuration.");
    }
    AtomicBoolean txnFail = new AtomicBoolean(false);
    AtomicInteger callbacksReceived = new AtomicInteger(0);
    AtomicInteger callbacksExpected = new AtomicInteger(0);
    final Lock lock = new ReentrantLock();
    final Condition condition = lock.newCondition();
    /*
     * Callbacks can be reused per transaction, since they share the same
     * locks and conditions.
     */
    Callback<Object, Object> putSuccessCallback =
        new SuccessCallback<Object, Object>(lock, callbacksReceived, condition);
    Callback<Object, Object> putFailureCallback =
        new FailureCallback<Object, Object>(lock, callbacksReceived, txnFail, condition);
    Callback<Long, Long> incrementSuccessCallback =
        new SuccessCallback<Long, Long>(lock, callbacksReceived, condition);
    Callback<Long, Long> incrementFailureCallback =
        new FailureCallback<Long, Long>(lock, callbacksReceived, txnFail, condition);

    Status status = Status.READY;
    Channel channel = getChannel();
    int i = 0;
    try {
      txn = channel.getTransaction();
      txn.begin();
      for (; i < batchSize; i++) {
        Event event = channel.take();
        if (event == null) {
          status = Status.BACKOFF;
          if (i == 0) {
            sinkCounter.incrementBatchEmptyCount();
          } else {
            sinkCounter.incrementBatchUnderflowCount();
          }
          break;
        } else {
          serializer.setEvent(event);
          List<PutRequest> actions = serializer.getActions();
          List<AtomicIncrementRequest> increments = serializer.getIncrements();
          callbacksExpected.addAndGet(actions.size() + increments.size());

          for (PutRequest action : actions) {
            client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
          }
          for (AtomicIncrementRequest increment : increments) {
            client.atomicIncrement(increment).addCallbacks(
                incrementSuccessCallback, incrementFailureCallback);
          }
        }
      }
    } catch (Throwable e) {
      this.handleTransactionFailure(txn);
      this.checkIfChannelExceptionAndThrow(e);
    }
    if (i == batchSize) {
      sinkCounter.incrementBatchCompleteCount();
    }
    sinkCounter.addToEventDrainAttemptCount(i);

    lock.lock();
    try {
      // Wait until all callbacks arrive, the transaction fails, or the timeout fires.
      while ((callbacksReceived.get() < callbacksExpected.get()) && !txnFail.get()) {
        try {
          if (!condition.await(timeout, TimeUnit.MILLISECONDS)) {
            txnFail.set(true);
            logger.warn("HBase callbacks timed out. "
                + "Transaction will be rolled back.");
          }
        } catch (Exception ex) {
          logger.error("Exception while waiting for callbacks from HBase.");
          this.handleTransactionFailure(txn);
          Throwables.propagate(ex);
        }
      }
    } finally {
      lock.unlock();
    }
    /*
     * At this point, either the txn has failed
     * or all callbacks have been received and the txn is successful.
     *
     * This need not be in the monitor, since all callbacks for this txn
     * have been received. So txnFail will not be modified any more (even if
     * it is, it is set from true to true only - false happens only
     * in the next process call).
     */
    if (txnFail.get()) {
      this.handleTransactionFailure(txn);
      throw new EventDeliveryException("Could not write events to Hbase. "
          + "Transaction failed, and rolled back.");
    } else {
      try {
        txn.commit();
        txn.close();
        sinkCounter.addToEventDrainSuccessCount(i);
      } catch (Throwable e) {
        this.handleTransactionFailure(txn);
        this.checkIfChannelExceptionAndThrow(e);
      }
    }
    return status;
  }
}
```
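The SuccessCallback and FailureCallback classes are not shown in the excerpt above. Judging only from their constructor arguments and the surrounding comments, they plausibly look like the hedged reconstruction below, built on asynchbase's Callback interface; these are not the actual Flume inner classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import com.stumbleupon.async.Callback;

// Hedged reconstruction: each completed HBase RPC bumps callbacksReceived and
// wakes the waiting sink thread; a failure also marks the transaction as failed.
class SuccessCallback<R, T> implements Callback<R, T> {
  private final Lock lock;
  private final AtomicInteger callbacksReceived;
  private final Condition condition;

  SuccessCallback(Lock lock, AtomicInteger callbacksReceived, Condition condition) {
    this.lock = lock;
    this.callbacksReceived = callbacksReceived;
    this.condition = condition;
  }

  @Override
  public R call(T arg) {
    lock.lock();
    try {
      callbacksReceived.incrementAndGet();
      condition.signal();        // let process() re-check its wait loop
    } finally {
      lock.unlock();
    }
    return null;
  }
}

class FailureCallback<R, T> implements Callback<R, T> {
  private final Lock lock;
  private final AtomicInteger callbacksReceived;
  private final AtomicBoolean txnFail;
  private final Condition condition;

  FailureCallback(Lock lock, AtomicInteger callbacksReceived,
      AtomicBoolean txnFail, Condition condition) {
    this.lock = lock;
    this.callbacksReceived = callbacksReceived;
    this.txnFail = txnFail;
    this.condition = condition;
  }

  @Override
  public R call(T arg) {
    lock.lock();
    try {
      callbacksReceived.incrementAndGet();
      txnFail.set(true);         // the whole transaction will be rolled back
      condition.signal();
    } finally {
      lock.unlock();
    }
    return null;
  }
}
```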
```java
/**
 * <p>
 * A {@link Sink} implementation that can send events to an RPC server (such as
 * Flume's {@link AvroSource}).
 * </p>
 * <p>
 * This sink forms one half of Flume's tiered collection support. Events sent to
 * this sink are transported over the network to the hostname / port pair using
 * the RPC implementation encapsulated in {@link RpcClient}.
 * The destination is an instance of Flume's {@link AvroSource}, which
 * allows Flume agents to forward to other Flume agents, forming a tiered
 * collection infrastructure. Of course, nothing prevents one from using this
 * sink to speak to other custom built infrastructure that implements the same
 * RPC protocol.
 * </p>
 * <p>
 * Events are taken from the configured {@link Channel} in batches of the
 * configured <tt>batch-size</tt>. The batch size has no theoretical limits
 * although all events in the batch <b>must</b> fit in memory. Generally, larger
 * batches are far more efficient, but introduce a slight delay (measured in
 * millis) in delivery. The batch behavior is such that underruns (i.e. batches
 * smaller than the configured batch size) are possible. This is a compromise
 * made to maintain low latency of event delivery. If the channel returns a null
 * event, meaning it is empty, the batch is immediately sent, regardless of
 * size. Batch underruns are tracked in the metrics. Empty batches do not incur
 * an RPC roundtrip.
 * </p>
 */
public class AvroSink extends AbstractSink implements Configurable {

  @Override
  public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();

    try {
      transaction.begin();
      verifyConnection();

      List<Event> batch = Lists.newLinkedList();

      for (int i = 0; i < client.getBatchSize(); i++) {
        Event event = channel.take();
        if (event == null) {
          break;
        }
        batch.add(event);
      }

      int size = batch.size();
      int batchSize = client.getBatchSize();

      if (size == 0) {
        sinkCounter.incrementBatchEmptyCount();
        status = Status.BACKOFF;
      } else {
        if (size < batchSize) {
          sinkCounter.incrementBatchUnderflowCount();
        } else {
          sinkCounter.incrementBatchCompleteCount();
        }
        sinkCounter.addToEventDrainAttemptCount(size);
        client.appendBatch(batch);
      }

      transaction.commit();
      sinkCounter.addToEventDrainSuccessCount(size);
    } catch (Throwable t) {
      transaction.rollback();
      if (t instanceof Error) {
        throw (Error) t;
      } else if (t instanceof ChannelException) {
        logger.error("Avro Sink " + getName() + ": Unable to get event from"
            + " channel " + channel.getName() + ". Exception follows.", t);
        status = Status.BACKOFF;
      } else {
        destroyConnection();
        throw new EventDeliveryException("Failed to send events", t);
      }
    } finally {
      transaction.close();
    }

    return status;
  }
}
```
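The client field used by AvroSink is a Flume RpcClient. As a rough illustration of the same API from standalone code (the host, port, and event body below are made-up values), sending events to a downstream AvroSource looks roughly like this:

```java
import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class AvroClientExample {
  public static void main(String[] args) throws Exception {
    // Connect to an AvroSource assumed to be listening on this host/port.
    RpcClient client = RpcClientFactory.getDefaultInstance("collector.example.com", 4141);
    try {
      Event event = EventBuilder.withBody("hello flume".getBytes(StandardCharsets.UTF_8));
      client.append(event);   // single event; appendBatch(List<Event>) sends a whole batch
    } finally {
      client.close();
    }
  }
}
```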
```java
/**
 * A simple sink which reads events from a channel and writes them to HBase.
 * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt>
 * encountered in the classpath. This sink supports batch reading of
 * events from the channel, and writing them to HBase, to minimize the number
 * of flushes on the HBase tables. To use this sink, it has to be configured
 * with certain mandatory parameters:<p>
 * <tt>table: </tt> The name of the table in HBase to write to.<p>
 * <tt>columnFamily: </tt> The column family in HBase to write to.<p>
 * This sink will commit each transaction if the table's write buffer size is
 * reached or if the number of events in the current transaction reaches the
 * batch size, whichever comes first.<p>
 * Other optional parameters are:<p>
 * <tt>serializer:</tt> A class implementing {@link HBaseEventSerializer}.
 * An instance of this class will be used to write out events to HBase.<p>
 * <tt>serializer.*:</tt> Passed in the configure() method to serializer
 * as an object of {@link org.apache.flume.Context}.<p>
 * <tt>batchSize: </tt> This is the batch size used by the client. This is the
 * maximum number of events the sink will commit per transaction. The default
 * batch size is 100 events.<p>
 */
public class HBaseSink extends AbstractSink implements Configurable {

  @Override
  public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    List<Row> actions = new LinkedList<Row>();
    List<Increment> incs = new LinkedList<Increment>();
    txn.begin();
    for (long i = 0; i < batchSize; i++) {
      Event event = channel.take();
      if (event == null) {
        status = Status.BACKOFF;
        counterGroup.incrementAndGet("channel.underflow");
        break;
      } else {
        serializer.initialize(event, columnFamily);
        actions.addAll(serializer.getActions());
        incs.addAll(serializer.getIncrements());
      }
    }
    putEventsAndCommit(actions, incs, txn);
    return status;
  }

  private void putEventsAndCommit(List<Row> actions, List<Increment> incs,
      Transaction txn) throws EventDeliveryException {
    try {
      table.batch(actions);
      for (Increment i : incs) {
        table.increment(i);
      }
      txn.commit();
      counterGroup.incrementAndGet("transaction.success");
    } catch (Throwable e) {
      try {
        txn.rollback();
      } catch (Exception e2) {
        logger.error("Exception in rollback. Rollback might not have been "
            + "successful.", e2);
      }
      counterGroup.incrementAndGet("transaction.rollback");
      logger.error("Failed to commit transaction. Transaction rolled back.", e);
      if (e instanceof Error || e instanceof RuntimeException) {
        Throwables.propagate(e);
      } else {
        throw new EventDeliveryException("Failed to commit transaction. "
            + "Transaction rolled back.", e);
      }
    } finally {
      txn.close();
    }
  }
}
```
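HBaseSink delegates row construction to the configured serializer. The sketch below is inferred purely from the call sites visible in process() (initialize, getActions, getIncrements); the class name, method signatures, and row-key scheme are assumptions for illustration, not the actual HBaseEventSerializer interface.

```java
import java.util.Collections;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

// Hypothetical serializer sketch; the shape is inferred from how
// HBaseSink.process() uses it, not copied from Flume.
public class SimpleBodySerializer {
  private Event event;
  private byte[] columnFamily;

  public void configure(Context context) {
    // serializer.* properties from the agent configuration would arrive here
  }

  public void initialize(Event event, byte[] columnFamily) {
    this.event = event;
    this.columnFamily = columnFamily;
  }

  public List<Row> getActions() {
    // Illustrative row-key scheme: current time, with the body in a "payload" column.
    byte[] rowKey = Long.toString(System.currentTimeMillis()).getBytes();
    Put put = new Put(rowKey);
    put.addColumn(columnFamily, "payload".getBytes(), event.getBody());
    return Collections.<Row>singletonList(put);
  }

  public List<Increment> getIncrements() {
    return Collections.emptyList();  // no counter columns in this sketch
  }
}
```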
```java
public class HDFSEventSink extends AbstractSink implements Configurable {

  /**
   * Pull events out of channel and send it to HDFS - take at the most
   * txnEventMax, that's the maximum #events to hold in channel for a given
   * transaction - find the corresponding bucket for the event, ensure the file
   * is open - extract the pay-load and append to HDFS file <br />
   * WARNING: NOT THREAD SAFE
   */
  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    List<BucketWriter> writers = Lists.newArrayList();
    transaction.begin();
    try {
      Event event = null;
      int txnEventCount = 0;
      for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
        event = channel.take();
        if (event == null) {
          break;
        }

        // reconstruct the path name by substituting place holders
        String realPath = BucketPath.escapeString(path, event.getHeaders(),
            needRounding, roundUnit, roundValue);
        BucketWriter bucketWriter = sfWriters.get(realPath);

        // we haven't seen this file yet, so open it and cache the handle
        if (bucketWriter == null) {
          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
          FlumeFormatter formatter = HDFSFormatterFactory
              .getFormatter(writeFormat);

          bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
              batchSize, context, realPath, codeC, compType, hdfsWriter,
              formatter, timedRollerPool, proxyTicket, sinkCounter);

          sfWriters.put(realPath, bucketWriter);
        }

        // track the buckets getting written in this transaction
        if (!writers.contains(bucketWriter)) {
          writers.add(bucketWriter);
        }

        // Write the data to HDFS
        append(bucketWriter, event);
      }

      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == txnEventMax) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }

      // flush all pending buckets before committing the transaction
      for (BucketWriter bucketWriter : writers) {
        if (!bucketWriter.isBatchComplete()) {
          flush(bucketWriter);
        }
      }

      transaction.commit();

      if (txnEventCount > 0) {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
      }

      if (event == null) {
        return Status.BACKOFF;
      }
      return Status.READY;
    } catch (IOException eIO) {
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      transaction.close();
    }
  }
}
```
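Bucketing hinges on BucketPath.escapeString(), which expands escape sequences in the configured path from each event's headers. Below is a hedged usage sketch; the path pattern and timestamp header value are illustrative, and rounding is disabled via the last three arguments, mirroring the call in the loop above.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.formatter.output.BucketPath;

public class BucketPathExample {
  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<String, String>();
    // %Y/%m/%d-style escapes are resolved from the event's timestamp header
    headers.put("timestamp", "1357377000000");

    // Illustrative path pattern; no rounding (false, 0, 0).
    String realPath = BucketPath.escapeString(
        "/flume/events/%Y-%m-%d/%H%M", headers, false, 0, 0);

    System.out.println(realPath);  // e.g. /flume/events/2013-01-05/0910 (depends on time zone)
  }
}
```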
```java
// IRC is a network messaging protocol; this sink writes each event as a line to an IRC server.
public class IRCSink extends AbstractSink implements Configurable {

  @Override
  public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();

    try {
      transaction.begin();
      createConnection();

      Event event = channel.take();
      if (event == null) {
        counterGroup.incrementAndGet("event.empty");
        status = Status.BACKOFF;
      } else {
        sendLine(event);
        counterGroup.incrementAndGet("event.irc");
      }

      transaction.commit();
    } catch (ChannelException e) {
      transaction.rollback();
      logger.error("Unable to get event from channel. Exception follows.", e);
      status = Status.BACKOFF;
    } catch (Exception e) {
      transaction.rollback();
      logger.error("Unable to communicate with IRC server. Exception follows.", e);
      status = Status.BACKOFF;
      destroyConnection();
    } finally {
      transaction.close();
    }

    return status;
  }
}
```
```java
public class LoggerSink extends AbstractSink {

  private static final Logger logger = LoggerFactory
      .getLogger(LoggerSink.class);

  @Override
  public Status process() throws EventDeliveryException {
    Status result = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;

    try {
      transaction.begin();
      event = channel.take();

      if (event != null) {
        if (logger.isInfoEnabled()) {
          logger.info("Event: " + EventHelper.dumpEvent(event));
        }
      } else {
        // No event found, request back-off semantics from the sink runner
        result = Status.BACKOFF;
      }
      transaction.commit();
    } catch (Exception ex) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to log event: " + event, ex);
    } finally {
      transaction.close();
    }

    return result;
  }
}
```
RollingFileSink rolls over to a new file for storing events once every sink.rollInterval.
```java
public class RollingFileSink extends AbstractSink implements Configurable {

  @Override
  public Status process() throws EventDeliveryException {
    if (shouldRotate) {
      logger.debug("Time to rotate {}", pathController.getCurrentFile());

      if (outputStream != null) {
        logger.debug("Closing file {}", pathController.getCurrentFile());

        try {
          serializer.flush();
          serializer.beforeClose();
          outputStream.flush();
          outputStream.close();
          shouldRotate = false;
        } catch (IOException e) {
          throw new EventDeliveryException("Unable to rotate file "
              + pathController.getCurrentFile() + " while delivering event", e);
        }

        serializer = null;
        outputStream = null;
        pathController.rotate();
      }
    }

    if (outputStream == null) {
      File currentFile = pathController.getCurrentFile();
      logger.debug("Opening output stream for file {}", currentFile);
      try {
        outputStream = new BufferedOutputStream(
            new FileOutputStream(currentFile));
        serializer = EventSerializerFactory.getInstance(
            serializerType, serializerContext, outputStream);
        serializer.afterCreate();
      } catch (IOException e) {
        throw new EventDeliveryException("Failed to open file "
            + pathController.getCurrentFile() + " while delivering event", e);
      }
    }

    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;
    Status result = Status.READY;

    try {
      transaction.begin();
      event = channel.take();

      if (event != null) {
        serializer.write(event);

        /*
         * FIXME: Feature: Rotate on size and time by checking bytes written and
         * setting shouldRotate = true if we're past a threshold.
         */

        /*
         * FIXME: Feature: Control flush interval based on time or number of
         * events. For now, we're super-conservative and flush on each write.
         */
        serializer.flush();
        outputStream.flush();
      } else {
        // No events found, request back-off semantics from runner
        result = Status.BACKOFF;
      }
      transaction.commit();
    } catch (Exception ex) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to process event: " + event, ex);
    } finally {
      transaction.close();
    }

    return result;
  }
}
```
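The shouldRotate flag tested at the top of process() is set by a timer that the sink schedules when it starts. The sketch below illustrates that idea only; the class and field names are made up and it is not the actual RollingFileSink code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative rotation timer: every rollInterval seconds, mark the current
// file as due for rotation so the next process() call rolls it over.
public class RotationTimerSketch {
  private volatile boolean shouldRotate = false;
  private final long rollIntervalSeconds;
  private ScheduledExecutorService rollService;

  public RotationTimerSketch(long rollIntervalSeconds) {
    this.rollIntervalSeconds = rollIntervalSeconds;
  }

  public void start() {
    rollService = Executors.newScheduledThreadPool(1);
    rollService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        shouldRotate = true;   // picked up at the top of process()
      }
    }, rollIntervalSeconds, rollIntervalSeconds, TimeUnit.SECONDS);
  }

  public void stop() {
    rollService.shutdown();
  }
}
```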