package org.apache.flume.sink;

import java.io.IOException;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.serialization.EventSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class FileSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(FileSink.class);

    private String path;
    private static final String defaultFileName = "FlumeData";
    private static final int defaultMaxOpenFiles = 50;

    /**
     * Default length of time we wait for blocking BucketWriter calls before
     * timing out the operation. Intended to prevent server hangs.
     */
    private long txnEventMax;

    private FileWriterLinkedHashMap sfWriters;

    private String serializerType;
    private Context serializerContext;

    private boolean needRounding = false;
    private int roundUnit = Calendar.SECOND;
    private int roundValue = 1;

    private SinkCounter sinkCounter;

    private int maxOpenFiles;
    private ScheduledExecutorService timedRollerPool;
    private long rollInterval;
    @Override
    public void configure(Context context) {
        String directory = Preconditions.checkNotNull(
                context.getString("file.path"), "file.path is required");
        String fileName = context.getString("file.filePrefix", defaultFileName);
        this.path = directory + "/" + fileName;

        maxOpenFiles = context.getInteger("file.maxOpenFiles", defaultMaxOpenFiles);

        serializerType = context.getString("sink.serializer", "TEXT");
        serializerContext = new Context(
                context.getSubProperties(EventSerializer.CTX_PREFIX));

        txnEventMax = context.getLong("file.txnEventMax", 1l);

        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }

        rollInterval = context.getLong("file.rollInterval", 30l);

        String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
        timedRollerPool = Executors.newScheduledThreadPool(maxOpenFiles,
                new ThreadFactoryBuilder().setNameFormat(rollerName).build());
    }
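
    /*
     * For reference, an agent configuration that exercises the property keys
     * read in configure() above might look like the following. The agent and
     * sink names, the path, and the example values are illustrative; the key
     * names themselves are exactly those used in the code.
     *
     *   agent.sinks.fileSink.type                = org.apache.flume.sink.FileSink
     *   agent.sinks.fileSink.file.path           = /var/log/flume
     *   agent.sinks.fileSink.file.filePrefix     = FlumeData
     *   agent.sinks.fileSink.file.maxOpenFiles   = 50
     *   agent.sinks.fileSink.sink.serializer     = TEXT
     *   agent.sinks.fileSink.file.txnEventMax    = 100
     *   agent.sinks.fileSink.file.rollInterval   = 30
     */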
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<BucketFileWriter> writers = Lists.newArrayList();
        transaction.begin();
        try {
            Event event = null;
            int txnEventCount = 0;
            for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
                event = channel.take();
                if (event == null) {
                    break;
                }

                // reconstruct the path name by substituting place holders
                String realPath = BucketPath.escapeString(path, event.getHeaders(),
                        needRounding, roundUnit, roundValue);
                BucketFileWriter bucketFileWriter = sfWriters.get(realPath);

                // we haven't seen this file yet, so open it and cache the handle
                if (bucketFileWriter == null) {
                    bucketFileWriter = new BucketFileWriter();
                    bucketFileWriter.open(realPath, serializerType,
                            serializerContext, rollInterval, timedRollerPool,
                            sfWriters);
                    sfWriters.put(realPath, bucketFileWriter);
                }

                // track the buckets getting written in this transaction
                if (!writers.contains(bucketFileWriter)) {
                    writers.add(bucketFileWriter);
                }

                // Write the data to File
                bucketFileWriter.append(event);
            }

            if (txnEventCount == 0) {
                sinkCounter.incrementBatchEmptyCount();
            } else if (txnEventCount == txnEventMax) {
                sinkCounter.incrementBatchCompleteCount();
            } else {
                sinkCounter.incrementBatchUnderflowCount();
            }

            // flush all pending buckets before committing the transaction
            for (BucketFileWriter bucketFileWriter : writers) {
                if (!bucketFileWriter.isBatchComplete()) {
                    flush(bucketFileWriter);
                }
            }

            transaction.commit();

            if (txnEventCount > 0) {
                sinkCounter.addToEventDrainSuccessCount(txnEventCount);
            }

            if (event == null) {
                return Status.BACKOFF;
            }
            return Status.READY;
        } catch (IOException eIO) {
            transaction.rollback();
            logger.warn("File IO error", eIO);
            return Status.BACKOFF;
        } catch (Throwable th) {
            transaction.rollback();
            logger.error("process failed", th);
            if (th instanceof Error) {
                throw (Error) th;
            } else {
                throw new EventDeliveryException(th);
            }
        } finally {
            transaction.close();
        }
    }
    private void flush(BucketFileWriter bucketFileWriter) throws IOException {
        bucketFileWriter.flush();
    }
    @Override
    public synchronized void start() {
        super.start();
        this.sfWriters = new FileWriterLinkedHashMap(maxOpenFiles);
        sinkCounter.start();
    }
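
    /*
     * Not part of the original class: a minimal stop() sketch that releases the
     * resources allocated in configure() and start(). It assumes that
     * FileWriterLinkedHashMap behaves as a java.util.Map of path -> BucketFileWriter
     * and that BucketFileWriter exposes a close() method (only open/append/flush/
     * isBatchComplete are visible above); adjust to the real APIs.
     */
    @Override
    public synchronized void stop() {
        for (BucketFileWriter writer : sfWriters.values()) {
            try {
                writer.close(); // assumed API on BucketFileWriter
            } catch (Exception e) {
                logger.warn("Unable to close writer cleanly", e);
            }
        }
        sfWriters.clear();
        timedRollerPool.shutdown(); // stop the roll-timer threads
        sinkCounter.stop();
        super.stop();
    }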
}
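
/*
 * FileWriterLinkedHashMap is referenced above but its source is not shown here.
 * The sketch below is only an assumption about its intent: an access-ordered
 * LinkedHashMap that evicts, and closes, the least recently used writer once
 * maxOpenFiles handles are cached. In the original project it would live in its
 * own file, and the BucketFileWriter.close() call is an assumed API.
 */
class FileWriterLinkedHashMap
        extends java.util.LinkedHashMap<String, BucketFileWriter> {

    private static final Logger logger =
            LoggerFactory.getLogger(FileWriterLinkedHashMap.class);

    private final int maxOpenFiles;

    FileWriterLinkedHashMap(int maxOpenFiles) {
        // accessOrder = true so get()/put() refresh recency for LRU eviction
        super(16, 0.75f, true);
        this.maxOpenFiles = maxOpenFiles;
    }

    @Override
    protected boolean removeEldestEntry(
            java.util.Map.Entry<String, BucketFileWriter> eldest) {
        if (size() > maxOpenFiles) {
            try {
                eldest.getValue().close(); // assumed API on BucketFileWriter
            } catch (Exception e) {
                logger.warn("Error while closing evicted writer", e);
            }
            return true;
        }
        return false;
    }
}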