package org.apache.flume.interceptor;

import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.CONF_PATH;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP_DFLT;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME_DFLT;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL;
import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL_DFLT;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.oro.text.regex.MalformedPatternException;
import org.apache.oro.text.regex.MatchResult;
import org.apache.oro.text.regex.Pattern;
import org.apache.oro.text.regex.PatternCompiler;
import org.apache.oro.text.regex.PatternMatcher;
import org.apache.oro.text.regex.Perl5Compiler;
import org.apache.oro.text.regex.Perl5Matcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogFormatInterceptor implements Interceptor {

  private static final Logger logger =
      LoggerFactory.getLogger(LogFormatInterceptor.class);

  private String conf_path = null;
  private boolean dynamicProp = false;
  private String hostname = null;
  private long propLastModify = 0;
  private long propMonitorInterval;
  private String regexp = null;
  private List<String> keys = null;
  private Pattern pattern = null;
  private PatternCompiler compiler = null;
  private PatternMatcher matcher = null;
  private SimpleDateFormat sdf = null;
  private SimpleDateFormat sd = null;
  private SimpleDateFormat sh = null;
  private SimpleDateFormat sm = null;
  private SimpleDateFormat sdfAll = null;
  private long eventCount = 0L;

  public LogFormatInterceptor(String conf_path, boolean dynamicProp,
      String hostname, long propMonitorInterval) {
    this.conf_path = conf_path;
    this.dynamicProp = dynamicProp;
    this.hostname = hostname;
    this.propMonitorInterval = propMonitorInterval;
  }
  @Override
  public void close() {
  }
  @Override
  public void initialize() {
    try {
      // Read the config file and initialize the regular expression and the
      // list of output keys.
      File file = new File(conf_path);
      propLastModify = file.lastModified();
      Properties props = new Properties();
      FileInputStream fis = new FileInputStream(file);
      props.load(fis);
      fis.close();
      regexp = props.getProperty("regexp");
      String strKey = props.getProperty("keys");
      if (strKey != null) {
        String[] strkeys = strKey.split(",");
        keys = new LinkedList<String>();
        for (String key : strkeys) {
          keys.add(key);
        }
      }
      if (keys == null) {
        logger.error("====================keys is null====================");
      } else {
        logger.info("keys=" + keys);
      }
      if (regexp == null) {
        logger.error("====================regexp is null====================");
      } else {
        logger.info("regexp=" + regexp);
      }
      // Compile the regular expression and set up the date formatters.
      compiler = new Perl5Compiler();
      pattern = compiler.compile(regexp);
      matcher = new Perl5Matcher();
      sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", java.util.Locale.US);
      sd = new SimpleDateFormat("yyyyMMdd");
      sh = new SimpleDateFormat("HH");
      sm = new SimpleDateFormat("mm");
      sdfAll = new SimpleDateFormat("yyyyMMddHHmmss");
    } catch (MalformedPatternException e) {
      logger.error("Could not compile pattern!", e);
    } catch (FileNotFoundException e) {
      logger.error("conf file is not found!", e);
    } catch (IOException e) {
      logger.error("conf file can not be read!", e);
    }
  }
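
  /*
   * A minimal sketch of the properties file expected at conf_path. The values
   * below are hypothetical (they are not from the original post); the real
   * regexp must expose four capture groups, in order: client IP, a timestamp
   * matching "dd/MMM/yyyy:HH:mm:ss Z", the request URL (whose query string is
   * split on '&' and '='), and the browser field. Backslashes are doubled
   * because java.util.Properties treats '\' as an escape character.
   *
   *   regexp=(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] "\\S+ (\\S+)[^"]*" \\d+ \\d+ "[^"]*" "([^"]*)"
   *   keys=host,ip,loc_time,uid,browser
   */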
  @Override
  public Event intercept(Event event) {
    ++eventCount;
    try {
      // If dynamic properties are enabled, periodically reload the key list
      // when the config file has changed since it was last read.
      if (dynamicProp && eventCount > propMonitorInterval) {
        File file = new File(conf_path);
        if (file.lastModified() > propLastModify) {
          propLastModify = file.lastModified();
          Properties props = new Properties();
          FileInputStream fis = new FileInputStream(file);
          props.load(fis);
          fis.close();
          String strKey = props.getProperty("keys");
          if (strKey != null) {
            String[] strkeys = strKey.split(",");
            List<String> keystmp = new LinkedList<String>();
            for (String key : strkeys) {
              keystmp.add(key);
            }
            if (keystmp.size() > keys.size()) {
              keys = keystmp;
              logger.info("dynamicProp status updated = " + keys);
            } else {
              logger.error("dynamicProp status: new keys size is not greater than old, so update failed = "
                  + keys);
            }
          } else {
            logger.error("dynamicProp status: could not read keys, so update failed = "
                + keys);
          }
        }
      }
      Map<String, String> headers = event.getHeaders();
      headers.put("host", hostname);
      String body = new String(event.getBody());
      if (pattern != null) {
        StringBuffer stringBuffer = new StringBuffer();
        Date date = null;
        Map<String, String> index = new HashMap<String, String>();
        if (matcher.contains(body, pattern)) {
          index.put("host", hostname);
          MatchResult result = matcher.getMatch();
          index.put("ip", result.group(1));
          try {
            date = sdf.parse(result.group(2));
            index.put("loc_time", sdfAll.format(date));
          } catch (ParseException e1) {
            // Leave date as null; the event is routed to "errorLog" below.
          }
          String url = result.group(3).replaceAll(",", "|");
          String[] params = url.split("&");
          for (String param : params) {
            String[] p = param.split("=");
            if (p.length == 2) {
              index.put(p[0], p[1]);
            }
          }
          index.put("browser", result.group(4).replaceAll(",", "|"));
          // Emit the indexed fields as one comma-separated line, in key order;
          // a missing field is written as "~".
          for (String key : keys) {
            if (index.containsKey(key)) {
              stringBuffer.append(index.get(key) + ",");
            } else {
              stringBuffer.append("~,");
            }
          }
          if (stringBuffer.length() > 0) {
            stringBuffer.deleteCharAt(stringBuffer.length() - 1);
          } else {
            stringBuffer.append("error=" + body);
          }
          if (date != null) {
            headers.put("dayStr", sd.format(date));
            headers.put("hourStr", sh.format(date));
            // Round the minute down to its 5-minute bucket, zero-padded.
            int m = Integer.parseInt(sm.format(date));
            String min = "";
            if (m >= 0 && m < 10) {
              min = "0" + (m / 5) * 5;
            } else {
              min = (m / 5) * 5 + "";
            }
            headers.put("minStr", min);
          } else {
            headers.put("dayStr", "errorLog");
          }
          Event e = EventBuilder.withBody(stringBuffer.toString().getBytes(),
              headers);
          return e;
        }
      }
    } catch (Exception e) {
      logger.error("LogFormat error!", e);
    }
    return null;
  }
  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> list = new LinkedList<Event>();
    for (Event event : events) {
      Event e = intercept(event);
      if (e != null) {
        list.add(e);
      }
    }
    return list;
  }
  /**
   * Builder which builds new instances of the LogFormatInterceptor.
   */
  public static class Builder implements Interceptor.Builder {

    private String confPath;
    private boolean dynamicProp;
    private String hostname;
    private long propMonitorInterval;

    @Override
    public Interceptor build() {
      return new LogFormatInterceptor(confPath, dynamicProp, hostname,
          propMonitorInterval);
    }

    @Override
    public void configure(Context context) {
      confPath = context.getString(CONF_PATH);
      dynamicProp = context.getBoolean(DYNAMICPROP, DYNAMICPROP_DFLT);
      hostname = context.getString(HOSTNAME, HOSTNAME_DFLT);
      propMonitorInterval = context.getLong(PROPMONITORINTERVAL,
          PROPMONITORINTERVAL_DFLT);
    }
  }
  public static class Constants {
    public static final String CONF_PATH = "confpath";
    public static final String DYNAMICPROP = "dynamicprop";
    public static final boolean DYNAMICPROP_DFLT = false;
    public static final String HOSTNAME = "hostname";
    public static final String HOSTNAME_DFLT = "hostname";
    public static final String PROPMONITORINTERVAL = "prop.monitor.rollInterval";
    public static final long PROPMONITORINTERVAL_DFLT = 500000L;
  }
}
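
To wire this interceptor into an agent, register its Builder as the interceptor type. Below is a minimal sketch of the relevant flume.conf lines, assuming an agent named "a1" with a source "r1" (those names and the path/values are hypothetical; the property keys come from the Constants class above):

a1.sources.r1.interceptors = logformat
a1.sources.r1.interceptors.logformat.type = org.apache.flume.interceptor.LogFormatInterceptor$Builder
a1.sources.r1.interceptors.logformat.confpath = /etc/flume/logformat.properties
a1.sources.r1.interceptors.logformat.dynamicprop = true
a1.sources.r1.interceptors.logformat.hostname = web-01
a1.sources.r1.interceptors.logformat.prop.monitor.rollInterval = 500000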