Flume-ng in Production (Part 3): Implementing a File Sink That Writes to a Fixed Directory Layout

 
Use case: nginx logs from several servers need to be collected in real time onto a single machine. The collected data must be stored in one directory per day, with a new file every 5 minutes; for example, a log line written at 12:26 on 2012-12-29 should land in the file whose prefix is /data/log/20121229/log-1225-. To achieve this I implemented a file sink modeled on the hdfs-sink of flume-og and flume-ng.

The configuration used looks like this:
agent.sources = source
agent.channels = channel
agent.sinks = sink

agent.sources.source.type = avro
agent.sources.source.bind = 192.168.0.100
agent.sources.source.port = 44444
agent.sources.source.channels = channel

agent.sinks.sink.type = org.apache.flume.sink.FileSink
agent.sinks.sink.file.path = /data/log/%{dayStr}
agent.sinks.sink.file.filePrefix = log-%{hourStr}%{minStr}-
agent.sinks.sink.file.txnEventMax = 10000
agent.sinks.sink.file.maxOpenFiles = 5
agent.sinks.sink.channel = channel

agent.channels.channel.type = memory
agent.channels.channel.capacity = 100000
agent.channels.channel.transactionCapacity = 100000
agent.channels.channel.keep-alive = 60
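
The %{dayStr}, %{hourStr} and %{minStr} placeholders are resolved from event headers; the sink does not compute them itself, so whatever ships the nginx log lines to the avro source has to attach them (either in the sending client or via an interceptor). Below is a minimal client sketch, not part of the original post, showing one way to set these headers with the Flume SDK; the class name NginxLogClient and the sample log line are made up, while the host and port match the avro source configured above:

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class NginxLogClient {
    public static void main(String[] args) throws EventDeliveryException {
        // connects to the avro source defined in the configuration above
        RpcClient client = RpcClientFactory.getDefaultInstance("192.168.0.100", 44444);
        try {
            Date now = new Date();
            Map<String, String> headers = new HashMap<String, String>();
            headers.put("dayStr", new SimpleDateFormat("yyyyMMdd").format(now));
            headers.put("hourStr", new SimpleDateFormat("HH").format(now));
            // round the minute down to a 5-minute boundary, e.g. 12:26 -> "25",
            // so every event in the same window maps to the same file prefix
            int minute = Integer.parseInt(new SimpleDateFormat("mm").format(now));
            headers.put("minStr", String.format("%02d", (minute / 5) * 5));

            Event event = EventBuilder.withBody("a line of nginx log",
                    Charset.forName("UTF-8"), headers);
            client.append(event);
        } finally {
            client.close();
        }
    }
}

With the minute rounded down to a multiple of 5, all events sent between 12:25 and 12:29 end up under the log-1225- prefix.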


The jars it depends on are:
jakarta-oro-2.0.1.jar
flume-ng-core-1.3.0-SNAPSHOT.jar
flume-ng-sdk-1.3.0-SNAPSHOT.jar
flume-ng-configuration-1.3.0-SNAPSHOT.jar
slf4j-log4j12-1.6.1.jar
slf4j-api-1.6.1.jar
guava-10.0.1.jar

The code is as follows:
FileSink.java
package org.apache.flume.sink;

import java.io.IOException;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.serialization.EventSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class FileSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory
            .getLogger(FileSink.class);

    private String path;
    private static final String defaultFileName = "FlumeData";
    private static final int defaultMaxOpenFiles = 50;

    /**
     * Default length of time we wait for blocking BucketWriter calls before
     * timing out the operation. Intended to prevent server hangs.
     */

    private long txnEventMax;

    private FileWriterLinkedHashMap sfWriters;

    private String serializerType;
    private Context serializerContext;

    private boolean needRounding = false;
    private int roundUnit = Calendar.SECOND;
    private int roundValue = 1;
    private SinkCounter sinkCounter;

    private int maxOpenFiles;

    private ScheduledExecutorService timedRollerPool;

    private long rollInterval;

    @Override
    public void configure(Context context) {

        String directory = Preconditions.checkNotNull(
                context.getString("file.path"), "file.path is required");
        String fileName = context.getString("file.filePrefix", defaultFileName);
        this.path = directory + "/" + fileName;

        maxOpenFiles = context.getInteger("file.maxOpenFiles",
                defaultMaxOpenFiles);

        serializerType = context.getString("sink.serializer", "TEXT");
        serializerContext = new Context(
                context.getSubProperties(EventSerializer.CTX_PREFIX));
        txnEventMax = context.getLong("file.txnEventMax", 1l);
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }

        rollInterval = context.getLong("file.rollInterval", 30l);
        String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
        timedRollerPool = Executors.newScheduledThreadPool(maxOpenFiles,
                new ThreadFactoryBuilder().setNameFormat(rollerName).build());
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<BucketFileWriter> writers = Lists.newArrayList();
        transaction.begin();
        try {
            Event event = null;
            int txnEventCount = 0;
            for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
                event = channel.take();
                if (event == null) {
                    break;
                }

                // reconstruct the path name by substituting place holders
                String realPath = BucketPath
                        .escapeString(path, event.getHeaders(), needRounding,
                                roundUnit, roundValue);
                BucketFileWriter bucketFileWriter = sfWriters.get(realPath);

                // we haven't seen this file yet, so open it and cache the
                // handle
                if (bucketFileWriter == null) {
                    bucketFileWriter = new BucketFileWriter();
                    bucketFileWriter.open(realPath, serializerType,
                            serializerContext, rollInterval, timedRollerPool,
                            sfWriters);
                    sfWriters.put(realPath, bucketFileWriter);
                }

                // track the buckets getting written in this transaction
                if (!writers.contains(bucketFileWriter)) {
                    writers.add(bucketFileWriter);
                }

                // Write the data to File
                bucketFileWriter.append(event);
            }

            if (txnEventCount == 0) {
                sinkCounter.incrementBatchEmptyCount();
            } else if (txnEventCount == txnEventMax) {
                sinkCounter.incrementBatchCompleteCount();
            } else {
                sinkCounter.incrementBatchUnderflowCount();
            }

            // flush all pending buckets before committing the transaction
            for (BucketFileWriter bucketFileWriter : writers) {
                if (!bucketFileWriter.isBatchComplete()) {
                    flush(bucketFileWriter);
                }
            }
            transaction.commit();
            if (txnEventCount > 0) {
                sinkCounter.addToEventDrainSuccessCount(txnEventCount);
            }

            if (event == null) {
                return Status.BACKOFF;
            }
            return Status.READY;
        } catch (IOException eIO) {
            transaction.rollback();
            logger.warn("File IO error", eIO);
            return Status.BACKOFF;
        } catch (Throwable th) {
            transaction.rollback();
            logger.error("process failed", th);
            if (th instanceof Error) {
                throw (Error) th;
            } else {
                throw new EventDeliveryException(th);
            }
        } finally {
            transaction.close();
        }
    }

    private void flush(BucketFileWriter bucketFileWriter) throws IOException {
        bucketFileWriter.flush();
    }

    @Override
    public synchronized void start() {
        super.start();
        this.sfWriters = new FileWriterLinkedHashMap(maxOpenFiles);
        sinkCounter.start();
    }
}
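
To make the header substitution concrete, here is a small standalone sketch (again not from the original post; the class name BucketPathDemo is made up) of the same BucketPath.escapeString call that process() uses to expand the path template. With the headers from the example scenario it should produce the prefix /data/log/20121229/log-1225-:

import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.formatter.output.BucketPath;

public class BucketPathDemo {
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("dayStr", "20121229");
        headers.put("hourStr", "12");
        headers.put("minStr", "25");
        // same arguments FileSink.process() passes; rounding is disabled,
        // so the Calendar.SECOND / 1 values are effectively unused
        String realPath = BucketPath.escapeString(
                "/data/log/%{dayStr}/log-%{hourStr}%{minStr}-",
                headers, false, Calendar.SECOND, 1);
        System.out.println(realPath); // prints /data/log/20121229/log-1225-
    }
}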

BucketFileWriter.java
package org.apache.flume.sink;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketFileWriter {

    private static final Logger logger = LoggerFactory
            .getLogger(BucketFileWriter.class);
    private static final String IN_USE_EXT = ".tmp";

    /**
     * Counter seeded with the current time in milliseconds; its value is
     * appended to the file name so each opened file gets a unique suffix.
     */
    private final AtomicLong fileExtensionCounter;

    private OutputStream outputStream;

    private EventSerializer serializer;

    private String filePath;

    public BucketFileWriter() {
        fileExtensionCounter = new AtomicLong(System.currentTimeMillis());
    }

    public void open(final String filePath, String serializerType,
            Context serializerContext, final long rollInterval,
            final ScheduledExecutorService timedRollerPool,
            final FileWriterLinkedHashMap sfWriters) throws IOException {
        this.filePath = filePath;
        File file = new File(filePath + fileExtensionCounter + IN_USE_EXT);
        file.getParentFile().mkdirs();
        outputStream = new BufferedOutputStream(new FileOutputStream(file));
        logger.info("filename = " + file.getAbsolutePath());
        serializer = EventSerializerFactory.getInstance(serializerType,
                serializerContext, outputStream);
        serializer.afterCreate();
        if (rollInterval > 0) {
            Callable<Void> action = new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    logger.debug(
                            "Rolling file ({}): Roll scheduled after {} sec elapsed.",
                            filePath + fileExtensionCounter + IN_USE_EXT,
                            rollInterval);
                    if (sfWriters.containsKey(filePath)) {
                        sfWriters.remove(filePath);
                    }
                    close();
                    return null;
                }
            };
            timedRollerPool.schedule(action, rollInterval, TimeUnit.SECONDS);
        }
    }

    public void append(Event event) throws IOException {
        serializer.write(event);
    }

    public boolean isBatchComplete() {
        return true;
    }

    public void flush() throws IOException {
        serializer.flush();
        outputStream.flush();
    }

    /**
     * Rename bucketPath file from .tmp to permanent location.
     */
    private void renameBucket() {
        File srcPath = new File(filePath + fileExtensionCounter + IN_USE_EXT);
        File dstPath = new File(filePath + fileExtensionCounter);
        if (srcPath.exists()) {
            srcPath.renameTo(dstPath);
            logger.info("Renaming " + srcPath + " to " + dstPath);
        }
    }

    /**
     * Close the file handle and rename the temp file to the permanent
     * filename. Safe to call multiple times.
     *
     * @throws IOException
     *             On failure to rename if temp file exists.
     */
    public synchronized void close() throws IOException, InterruptedException {
        if (outputStream != null) {
            outputStream.flush();
            outputStream.close();
        }
        renameBucket();
    }
}
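
Note that open() appends the value of fileExtensionCounter (seeded with System.currentTimeMillis()) plus the .tmp suffix to the path prefix, so while a bucket is open its file looks something like /data/log/20121229/log-1225-1356755160000.tmp; renameBucket() drops the .tmp suffix when the timer rolls the file or the writer cache evicts it.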

FileWriterLinkedHashMap.java
package org.apache.flume.sink;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map.Entry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A LinkedHashMap in access order that closes the least recently used
 * BucketFileWriter once more than maxOpenFiles writers are open.
 */
public class FileWriterLinkedHashMap extends
        LinkedHashMap<String, BucketFileWriter> {

    private static final Logger logger = LoggerFactory
            .getLogger(FileWriterLinkedHashMap.class);

    private static final long serialVersionUID = -7860596835613215998L;
    private final int maxOpenFiles;

    public FileWriterLinkedHashMap(int maxOpenFiles) {
        super(16, 0.75f, true); // stock initial capacity/load factor, access order
        this.maxOpenFiles = maxOpenFiles;
    }

    @Override
    protected boolean removeEldestEntry(Entry<String, BucketFileWriter> eldest) {
        if (size() > maxOpenFiles) {
            // If we have more than maxOpenFiles open, close the eldest
            // writer and let the map evict it by returning true
            try {
                eldest.getValue().close();
            } catch (IOException e) {
                logger.warn(eldest.getKey().toString(), e);
            } catch (InterruptedException e) {
                logger.warn(eldest.getKey().toString(), e);
                Thread.currentThread().interrupt();
            }
            return true;
        } else {
            return false;
        }
    }
}

