rjhym
Flume-ng in Production (Part 4): Implementing a Log-Formatting Interceptor

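Continuing from the previous post: the file sink writes to paths of the form /data/log/%{dayStr}/log-%{hourStr}%{minStr}-. For the file sink to resolve tags such as %{dayStr}, the matching key-value pairs have to be added to each event's header while the data is in transit. Flume-ng provides a convenient mechanism for this: the Interceptor.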
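To make the role of the header concrete, the sketch below expands %{name} tags in a path template from an event's header map. It is a simplified, hypothetical illustration: the PathTemplate class and its resolve method are not part of Flume, and real sinks apply their own escaping rules.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: shows how %{header} tags in a sink path map to event headers.
public class PathTemplate {
    // Matches %{name}; group(1) is the header name
    private static final Pattern TAG = Pattern.compile("%\\{([^}]+)\\}");

    /** Replaces every %{name} with headers.get(name); unknown names become "". */
    public static String resolve(String template, Map<String, String> headers) {
        Matcher m = TAG.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = headers.get(m.group(1));
            m.appendReplacement(out, Matcher.quoteReplacement(value == null ? "" : value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("dayStr", "20121229");
        headers.put("hourStr", "15");
        headers.put("minStr", "00");
        // prints /data/log/20121229/log-1500-
        System.out.println(resolve("/data/log/%{dayStr}/log-%{hourStr}%{minStr}-", headers));
    }
}
```

Without the interceptor adding dayStr, hourStr and minStr to the header, the tags would have nothing to resolve against.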
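The interceptor below first matches each nginx log line against a regular expression. If the match succeeds, it extracts the matched groups and splits the URL query parameters, so that every field of the log line ends up in a Map. It then looks up the values for the keys listed in the configuration file and writes them, in that order, as one CSV line.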
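The field-extraction and CSV step can be sketched on its own, without Flume or ORO. The hypothetical CsvLine class below follows the same rules as the interceptor code: commas are replaced with | so they cannot break the columns, the query string is split on & and =, only parts that split into exactly two pieces are kept, and values are emitted in configured key order with ~ for a missing key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the interceptor's "query string -> Map -> CSV" logic.
public class CsvLine {
    /** Splits "a=1&b=2" into a map; parts that do not split into exactly two pieces on '=' are skipped. */
    public static Map<String, String> parseQuery(String query) {
        Map<String, String> index = new HashMap<String, String>();
        // Commas would break the CSV columns, so replace them first
        for (String param : query.replace(",", "|").split("&")) {
            String[] p = param.split("=");
            if (p.length == 2) {
                index.put(p[0], p[1]);
            }
        }
        return index;
    }

    /** Emits the values for the configured keys, in order; "~" marks a missing key. */
    public static String toCsv(Map<String, String> index, List<String> keys) {
        List<String> values = new ArrayList<String>();
        for (String key : keys) {
            String v = index.get(key);
            values.add(v != null ? v : "~");
        }
        return String.join(",", values);
    }

    public static void main(String[] args) {
        Map<String, String> index = parseQuery("a=1&b=2");
        System.out.println(toCsv(index, Arrays.asList("a", "b")));      // 1,2
        System.out.println(toCsv(index, Arrays.asList("a", "c", "b"))); // 1,~,2
    }
}
```

Keeping a placeholder for missing keys means every line has the same number of columns, so downstream consumers can rely on column positions.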
Raw log line:
112.245.239.72 - - [29/Dec/2012:15:00:00 +0800] "GET /p.gif?a=1&b=2 HTTP/1.1" 200 0 "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; 4399Box.1357; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; AskTbPTV2/5.9.1.14019; 4399Box.1357)"

Final output:
1,2
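As a rough check of how the regular expression carves up this line, here is the same pattern ported to java.util.regex (the interceptor itself uses Jakarta ORO's Perl5Matcher). Two assumptions of this sketch: the ] inside the character class is escaped so the pattern is unambiguous in Java, and the User-Agent in the test line is shortened. Matcher.find(), like Perl5Matcher.contains(), accepts a match anywhere in the input. The NginxLineDemo class name is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo: the regexp from logformat_vv.properties, ported to java.util.regex.
public class NginxLineDemo {
    static final Pattern LINE = Pattern.compile(
            "([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})\\s-\\s-\\s\\[([^\\]]+)\\]"
            + "\\s\"GET\\s/p.gif\\?(.+)\\s.*\"\\s[0-9]+\\s[0-9]+\\s\"(.+)\"");

    /** Returns {ip, time, query, userAgent}, or null when the line does not match. */
    public static String[] parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2), m.group(3), m.group(4) };
    }

    public static void main(String[] args) {
        String line = "112.245.239.72 - - [29/Dec/2012:15:00:00 +0800] "
                + "\"GET /p.gif?a=1&b=2 HTTP/1.1\" 200 0 \"Mozilla/4.0 (compatible; MSIE 8.0)\"";
        String[] g = parse(line);
        System.out.println(g[0]); // 112.245.239.72
        System.out.println(g[1]); // 29/Dec/2012:15:00:00 +0800
        System.out.println(g[2]); // a=1&b=2
        System.out.println(g[3]); // Mozilla/4.0 (compatible; MSIE 8.0)
    }
}
```

Group 3 stops at "a=1&b=2" only because a space separates the query string from HTTP/1.1; the \s after (.+) relies on it.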
Configuration:
agent.sources = source
agent.channels = channel
agent.sinks = sink

agent.sources.source.type = exec
#agent.sources.source.command = tail -n +0 -F /data/tmp/accesspvpb_2012-11-18.log
agent.sources.source.command = cat /opt/nginx/logs/vvaccess_log_pipe
agent.sources.source.interceptors = logformat

agent.sources.source.interceptors.logformat.type = org.apache.flume.interceptor.LogFormatInterceptor$Builder
agent.sources.source.interceptors.logformat.confpath = /usr/programs/flume/conf/logformat_vv.properties
agent.sources.source.interceptors.logformat.dynamicprop = true
agent.sources.source.interceptors.logformat.hostname = vv111
agent.sources.source.interceptors.logformat.prop.monitor.rollInterval = 100000
# The channel can be defined as follows.
agent.sources.source.channels = channel


agent.sinks.sink.type = avro
agent.sinks.sink.hostname = 192.168.0.100
agent.sinks.sink.port = 44444
agent.sinks.sink.channel = channel

# Each channel's type is defined.
agent.channels.channel.type = file
agent.channels.channel.checkpointDir = /data/tmpc/checkpoint
agent.channels.channel.dataDirs = /data/tmpc/data
agent.channels.channel.transactionCapacity = 15000
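With dynamicprop = true, the interceptor re-reads the properties file only after more than prop.monitor.rollInterval events have passed, and only if the file's modification time has advanced. A minimal sketch of that gate, assuming a hypothetical ReloadGate class and a LongSupplier standing in for File.lastModified() so it can be exercised without a real file. This sketch resets the counter after each check, so the file is examined at most once per interval.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the "reload every N events if the file changed" check.
public class ReloadGate {
    private final long interval;              // prop.monitor.rollInterval
    private final LongSupplier lastModified;  // e.g. () -> new File(confPath).lastModified()
    private long seenModified;
    private long count = 0;

    public ReloadGate(long interval, LongSupplier lastModified) {
        this.interval = interval;
        this.lastModified = lastModified;
        this.seenModified = lastModified.getAsLong();
    }

    /** Call once per event; returns true when the properties file should be reloaded. */
    public boolean onEvent() {
        if (++count <= interval) {
            return false;
        }
        count = 0; // check at most once per interval
        long m = lastModified.getAsLong();
        if (m > seenModified) {
            seenModified = m;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] mtime = { 1000L };
        ReloadGate gate = new ReloadGate(3, () -> mtime[0]);
        mtime[0] = 2000L; // the file is edited
        for (int i = 1; i <= 4; i++) {
            System.out.println(i + " " + gate.onEvent()); // 1..3 false, 4 true
        }
    }
}
```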

Contents of /usr/programs/flume/conf/logformat_vv.properties:
keys=a,b
regexp=([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})\\s-\\s-\\s\\[([^]]+)\\]\\s\"GET\\s/p.gif\\?(.+)\\s.*\"\\s[0-9]+\\s[0-9]+\\s\"(.+)\"
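java.util.Properties treats the backslash as an escape character, which is why every regex backslash is doubled in this file: \\. becomes \. and \" becomes " once the file is loaded. A small check with Properties.load(Reader); the PropsEscapeDemo class and the shortened regexp value are illustrative only.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical demo: how backslashes in a .properties file are unescaped on load.
public class PropsEscapeDemo {
    public static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        // The Java literal "\\\\." is the two characters \\ followed by '.', as typed in the file
        Properties props = load("keys=a,b\nregexp=[0-9]{1,3}\\\\.[0-9]{1,3}\\s\\\"GET\n");
        System.out.println(props.getProperty("regexp"));                 // [0-9]{1,3}\.[0-9]{1,3}s"GET
        System.out.println(props.getProperty("keys").split(",").length); // 2
    }
}
```

Note the second printed value: a single \s in the file would lose its backslash (Properties turns \s into a plain s), which is why the real file writes \\s.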

The interceptor code:
package org.apache.flume.interceptor;

import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.CONF_PATH;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP_DFLT;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME_DFLT;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL_DFLT;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.oro.text.regex.MalformedPatternException;
import org.apache.oro.text.regex.MatchResult;
import org.apache.oro.text.regex.Pattern;
import org.apache.oro.text.regex.PatternCompiler;
import org.apache.oro.text.regex.PatternMatcher;
import org.apache.oro.text.regex.Perl5Compiler;
import org.apache.oro.text.regex.Perl5Matcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogFormatInterceptor implements Interceptor {

    private static final Logger logger = LoggerFactory
            .getLogger(LogFormatInterceptor.class);

    private String conf_path = null;
    private boolean dynamicProp = false;
    private String hostname = null;

    // Last seen modification time of the conf file, and how many events to wait between checks
    private long propLastModify = 0;
    private long propMonitorInterval;

    private String regexp = null;
    private List<String> keys = null;

    private Pattern pattern = null;
    private PatternCompiler compiler = null;
    private PatternMatcher matcher = null;
    private SimpleDateFormat sdf = null;    // parses the nginx time field
    private SimpleDateFormat sd = null;     // dayStr header (yyyyMMdd)
    private SimpleDateFormat sh = null;     // hourStr header (HH)
    private SimpleDateFormat sm = null;     // minute used for the minStr header (mm)
    private SimpleDateFormat sdfAll = null; // loc_time field (yyyyMMddHHmmss)

    private long eventCount = 0L;

    public LogFormatInterceptor(String conf_path, boolean dynamicProp,
            String hostname, long propMonitorInterval) {
        this.conf_path = conf_path;
        this.dynamicProp = dynamicProp;
        this.hostname = hostname;
        this.propMonitorInterval = propMonitorInterval;
    }

    @Override
    public void close() {

    }

    @Override
    public void initialize() {
        try {
            // Read the conf file; initialize the regular expression and the list of output keys
            File file = new File(conf_path);
            propLastModify = file.lastModified();
            Properties props = new Properties();
            FileInputStream fis = new FileInputStream(file);
            try {
                props.load(fis);
            } finally {
                fis.close();
            }
            regexp = props.getProperty("regexp");
            String strKey = props.getProperty("keys");
            if (strKey != null) {
                String[] strkeys = strKey.split(",");
                keys = new LinkedList<String>();
                for (String key : strkeys) {
                    keys.add(key);
                }
            }
            if (keys == null) {
                logger.error("====================keys is null====================");
            } else {
                logger.info("keys=" + keys);
            }
            if (regexp == null) {
                logger.error("====================regexp is null====================");
            } else {
                logger.info("regexp=" + regexp);
            }

            // Initialize the regular expression and the date formatters
            compiler = new Perl5Compiler();
            if (regexp != null) {
                // Without a regexp, pattern stays null and intercept() drops every event
                pattern = compiler.compile(regexp);
            }
            matcher = new Perl5Matcher();

            sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z",
                    java.util.Locale.US);
            sd = new SimpleDateFormat("yyyyMMdd");
            sh = new SimpleDateFormat("HH");
            sm = new SimpleDateFormat("mm");
            sdfAll = new SimpleDateFormat("yyyyMMddHHmmss");

        } catch (MalformedPatternException e) {
            logger.error("Could not compile pattern!", e);
        } catch (FileNotFoundException e) {
            logger.error("conf file is not found!", e);
        } catch (IOException e) {
            logger.error("conf file can not be read!", e);
        }
    }

    @Override
    public Event intercept(Event event) {
        ++eventCount;
        try {
            // Hot reload: after more than propMonitorInterval events, check whether the conf file changed
            if (dynamicProp && eventCount > propMonitorInterval) {
                // Reset the counter so the file is checked once per interval, not on every later event
                eventCount = 0;
                File file = new File(conf_path);
                if (file.lastModified() > propLastModify) {
                    propLastModify = file.lastModified();
                    Properties props = new Properties();
                    FileInputStream fis = new FileInputStream(file);
                    try {
                        props.load(fis);
                    } finally {
                        fis.close();
                    }
                    String strKey = props.getProperty("keys");
                    if (strKey != null) {
                        String[] strkeys = strKey.split(",");
                        List<String> keystmp = new LinkedList<String>();
                        for (String key : strkeys) {
                            keystmp.add(key);
                        }
                        // Only accept the new key list if it is longer than the current one
                        if (keystmp.size() > keys.size()) {
                            keys = keystmp;
                            logger.info("dynamicProp status updated = " + keys);
                        } else {
                            logger.error("dynamicProp status new keys size not greater than old, so status update fail = "
                                    + keys);
                        }
                    } else {
                        logger.error("dynamicProp status get keys fail, so status update fail = "
                                + keys);
                    }

                }
            }

            Map<String, String> headers = event.getHeaders();
            headers.put("host", hostname);
            String body = new String(event.getBody());
            if (pattern != null) {
                StringBuffer stringBuffer = new StringBuffer();
                Date date = null;
                Map<String, String> index = new HashMap<String, String>();
                if (matcher.contains(body, pattern)) {
                    index.put("host", hostname);
                    MatchResult result = matcher.getMatch();
                    // group(1): client IP
                    index.put("ip", result.group(1));
                    try {
                        // group(2): nginx time, e.g. 29/Dec/2012:15:00:00 +0800
                        date = sdf.parse(result.group(2));
                        index.put("loc_time", sdfAll.format(date));
                    } catch (ParseException e1) {
                        // Unparseable time: date stays null and dayStr is set to errorLog below
                    }
                    // group(3): query string; commas are replaced so they cannot break the CSV columns
                    String url = result.group(3).replaceAll(",", "|");
                    String[] params = url.split("&");
                    for (String param : params) {
                        String[] p = param.split("=");
                        if (p.length == 2) {
                            index.put(p[0], p[1]);
                        }
                    }
                    // group(4): User-Agent
                    index.put("browser", result.group(4).replaceAll(",", "|"));
                    // Emit values in the configured key order; "~" marks a missing key
                    for (String key : keys) {
                        if (index.containsKey(key)) {
                            stringBuffer.append(index.get(key) + ",");
                        } else {
                            stringBuffer.append("~,");
                        }
                    }
                    if (stringBuffer.length() > 0) {
                        // Drop the trailing comma
                        stringBuffer.deleteCharAt(stringBuffer.length() - 1);
                    } else {
                        stringBuffer.append("error=" + body);
                    }

                    if (date != null) {
                        headers.put("dayStr", sd.format(date));
                        headers.put("hourStr", sh.format(date));
                        // Round the minute down to a 5-minute bucket, zero-padded to two digits
                        Integer m = Integer.parseInt(sm.format(date));
                        String min = "";
                        if (m >= 0 && m < 10) {
                            min = "0" + (m / 5) * 5;
                        } else {
                            min = (m / 5) * 5 + "";
                        }
                        headers.put("minStr", min);
                    } else {
                        headers.put("dayStr", "errorLog");
                    }
                    Event e = EventBuilder.withBody(stringBuffer.toString()
                            .getBytes(), headers);
                    return e;
                }
            }
        } catch (Exception e) {
            logger.error("LogFormat error!", e);
        }
        // Lines that do not match the pattern, or that raise an exception, are dropped
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> list = new LinkedList<Event>();
        for (Event event : events) {
            Event e = intercept(event);
            // Skip events that intercept(Event) dropped
            if (e != null) {
                list.add(e);
            }
        }
        return list;
    }

    /**
     * Builder which builds new instances of the LogFormatInterceptor.
     */
    public static class Builder implements Interceptor.Builder {

        private String confPath;
        private boolean dynamicProp;
        private String hostname;
        private long propMonitorInterval;

        @Override
        public Interceptor build() {
            return new LogFormatInterceptor(confPath, dynamicProp, hostname,
                    propMonitorInterval);
        }

        @Override
        public void configure(Context context) {
            confPath = context.getString(CONF_PATH);
            dynamicProp = context.getBoolean(DYNAMICPROP, DYNAMICPROP_DFLT);
            hostname = context.getString(HOSTNAME, HOSTNAME_DFLT);
            propMonitorInterval = context.getLong(PROPMONITORINTERVAL,
                    PROPMONITORINTERVAL_DFLT);
        }

    }

    public static class Constants {

        public static final String CONF_PATH = "confpath";

        public static final String DYNAMICPROP = "dynamicprop";
        public static final boolean DYNAMICPROP_DFLT = false;

        public static final String HOSTNAME = "hostname";
        public static final String HOSTNAME_DFLT = "hostname";

        public static final String PROPMONITORINTERVAL = "prop.monitor.rollInterval";
        public static final long PROPMONITORINTERVAL_DFLT = 500000L;

    }

}
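The interceptor keeps several SimpleDateFormat instances, which are not thread-safe, and formats dayStr/hourStr/minStr in the JVM's default time zone. As an alternative (a swapped-in technique, not what the code above does), the header values can be computed with java.time, whose DateTimeFormatter is immutable and thread-safe. This sketch keeps the log line's own +0800 offset instead of converting to the JVM zone, so the two approaches agree only when the agent runs in UTC+8. TimeBucketHeaders is a hypothetical name.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical java.time version of the dayStr/hourStr/minStr header computation.
public class TimeBucketHeaders {
    // Same layout as the interceptor's "dd/MMM/yyyy:HH:mm:ss Z" pattern
    private static final DateTimeFormatter NGINX_TIME =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    public static Map<String, String> headersFor(String nginxTime) {
        // Keeps the offset written in the log line (e.g. +0800)
        OffsetDateTime t = OffsetDateTime.parse(nginxTime, NGINX_TIME);
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("dayStr", t.format(DAY));
        headers.put("hourStr", String.format("%02d", t.getHour()));
        // Round down to the start of the 5-minute bucket: 0-4 -> 00, 5-9 -> 05, ..., 55-59 -> 55
        headers.put("minStr", String.format("%02d", (t.getMinute() / 5) * 5));
        return headers;
    }

    public static void main(String[] args) {
        // dayStr=20121229, hourStr=15, minStr=05 (HashMap print order may vary)
        System.out.println(headersFor("29/Dec/2012:15:07:59 +0800"));
    }
}
```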
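That completes the pipeline: nginx logs are collected, formatted and cleaned, shipped to the collector machine, and written out using the formatted directory and file names.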