1
2
3
4
|
#!/bin/sh filename=$ 1
hostname=`hostname -s` tail -F $ 1 | awk -v filename=$filename -v hostname=$hostname '{print filename":"hostname":"$0}'
|
1
2
3
4
5
6
7
8
|
xxxx.sources.kafka1.interceptors = i1 xxxx.sources.kafka1.interceptors.i1.type = regex_extractor xxxx.sources.kafka1.interceptors.i1.regex = /apps/logs/(.*?)/ xxxx.sources.kafka1.interceptors.i1.serializers = s1 xxxx.sources.kafka1.interceptors.i1.serializers.s1.name = logtypename xxxx.sources.kafka1.selector.type = multiplexing xxxx.sources.kafka1.selector.header = logtypename xxxx.sources.kafka1.selector.mapping.nginx = nginx-channel |
1
2
3
4
5
6
|
serializers 定义匹配组(正则匹配之后的值作为header的值,比如如果 Event body为 1 : 2 : 3 .4foobar5,regex为(\\d):(\\d):(\\d),serializers
设置为a b c,serializers.a.name 为one,serializers.b.name为two,serializers.c.name 为three,那么one-> 1 ,two-> 2 ,three-> 3 .4foobar5,注意可以不必匹配所有的组)
serializers.x.name 作为event的header |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
private List<NameAndSerializer> serializerList;
private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
.... private void configureSerializers(Context context) {
String serializerListStr = context.getString( SERIALIZERS ); //解析serializers的配置
Preconditions. checkArgument(!StringUtils. isEmpty(serializerListStr),
"Must supply at least one name and serializer" );
String[] serializerNames = serializerListStr.split( "\\s+" ); //按空格分隔
Context serializerContexts =
new Context(context.getSubProperties( SERIALIZERS + "." ));
serializerList = Lists. newArrayListWithCapacity(serializerNames.length);
for (String serializerName : serializerNames) { //对每一个serializers里面的设置进行操作
Context serializerContext = new Context(
serializerContexts.getSubProperties(serializerName + "." ));
String type = serializerContext.getString( "type" , "DEFAULT" ); //获取serializers.x.type的设置,默认值是DEFAULT,即org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
String name = serializerContext.getString( "name" ); ////获取serializers.x.name的设置
Preconditions. checkArgument(!StringUtils. isEmpty(name),
"Supplied name cannot be empty." );
if ( "DEFAULT" .equals(type)) {
serializerList .add( new NameAndSerializer(name, defaultSerializer)); //生成NameAndSerializer对象,并加入到List<NameAndSerializer>中,静态内部类NameAndSerializer是一个包含了headerName和RegexExtractorInterceptorSerializer属性的容器,这里每一个serializers.x.name的配置对应一个RegexExtractorInterceptorSerializer对象
} else {
serializerList .add( new NameAndSerializer(name, getCustomSerializer(
type, serializerContext))); //getCustomSerializer用于根据type的设置返回RegexExtractorInterceptorSerializer对象
}
}
}
|
1
2
3
4
|
org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer //直接返回,不做另外的操作(默认的类) org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer //使用指定的formatting pattern把传入的值转换为milliseconds |
1
|
return new RegexExtractorInterceptor( regex , serializerList );
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
static final String REGEX = "regex" ;
static final String SERIALIZERS = "serializers" ;
... public Event intercept(Event event) {
Matcher matcher = regex.matcher(
new String(event.getBody(), Charsets.UTF_8)); //对Event的body进行matcher操作
Map<String, String> headers = event.getHeaders(); // 获取Event的header键值对
if (matcher.find()) { //检测字符串中的子字符串是否可以匹配到正则
for ( int group = 0 , count = matcher.groupCount(); group < count; group++) {
int groupIndex = group + 1 ; // 匹配的index从1开始
if (groupIndex > serializers .size()) { //判断index是否大于serializers列表(configure产生的List<NameAndSerializer>)的长度
.... break ;
}
NameAndSerializer serializer = serializers.get(group); //从serializers中获取对应的NameAndSerializer 对象
.... headers.put(serializer. headerName,
serializer. serializer.serialize(matcher.group(groupIndex))); // 向Event中插入headerName和对应的value,这里headerName即为serializers.x.name的设置,value会通过RegexExtractorInterceptorSerializer进行处理
}
}
return event;
}
|
相关推荐
以下是关于Flume拦截器的详细知识: 一、Flume概述 Flume 是 Apache Hadoop 生态系统的一部分,主要用于收集、聚合和移动大量日志数据。它具有高度可扩展性和容错性,确保了数据的高可用性。Flume 通过简单灵活的...
Flume 的拦截器是其强大功能的一部分,允许用户在数据流动的过程中进行定制化的数据处理和过滤。 Flume 拦截器是 Flume Agent 中的一个组件,它在数据进入通道(Channel)之前,先对事件(Event)进行预处理。拦截...
在本项目的第二部分,我们将深入探讨 Flume 的拦截器开发,这是 Flume 自定义数据处理的一个关键环节。Flume 拦截器允许我们在数据流入通道之前对其进行预处理,比如过滤、修改或添加元数据等操作。了解如何开发拦截...
该项目为可缓存的 Flume 拦截器提供了一个框架。 它是使用提供缓存服务的 Spring Framework 4.1.5 版实现的。 要实现自己的拦截器,请克隆此项目并实现 CacheableInterceptor 和 FlumeCacheService 的缺失部分。 ...
"flume-interceptor.zip" 文件很可能是包含了自定义 Flume 拦截器的相关代码和配置,用于对数据进行特定的预处理。 拦截器(Interceptor)是 Flume 中的一个关键组件,它允许用户在数据流进入 Flume Agent 之前对其...
flume自定义拦截器学习
这个名为 "flume-demo_大数据_flume_DEMO_自定义拦截器_" 的项目,显然是一个示例,展示了如何在 Flume 中创建并使用自定义拦截器来过滤数据。下面我们将深入探讨 Flume 的基本概念、拦截器的作用以及如何自定义拦截...
2. Flume拦截器:自定义的Flume拦截器实现,用于在数据进入Channel之前进行解析。 3. 连接库:如HBase和Elasticsearch的客户端库,使得Flume能够与这些系统通信。 在实际部署中,你需要将这些jar文件添加到Flume的...
flume拦截器 保留binlog es、data、database、table、type字段 分区字段名称: eventDate 放入 /opt/cloudera/parcels/CDH/lib/flume-ng/lib目录重启flume即可
Flume 消费 Kafka 数据上传 HDFS ...Flume 消费 Kafka 数据上传 HDFS 需要考虑 Channel 的选择、FileChannel 优化、HDFS Sink 的小文件问题、Flume 拦截器的使用等问题,以确保数据传输的可靠性和高效性。
该项目为基于Java语言的Flume规则拦截器设计,包含77个文件,其中包括56个XML配置文件、9个Java源文件、7个Groovy脚本文件、2个Markdown文档以及少量其他类型文件。该设计旨在实现Flume数据流中的规则过滤功能,适用...
理解 Flume 的组件、事务机制、拦截器以及内存管理和小文件处理策略,对于优化大数据流处理和面试准备至关重要。在实际应用中,根据业务需求选择合适的 Channel 类型,配置合理的内存和硬盘资源,以及调整参数以减少...
3. **自定义过滤器**:通过编写自定义Flume拦截器插件,实现特定业务逻辑的过滤需求。 Flume的工作流程是通过Agent(代理)来完成的,每个Agent由Source、Channel和Sink三部分组成。在Source与Channel之间,或者...
4. **Flume拦截器** - 拦截器允许在数据流经Source到Sink的过程中添加预处理步骤,如过滤、转换或添加元数据,增强数据处理的灵活性。 5. **企业开发案例** - 在企业环境中,Flume常用于收集不同来源的日志数据,...
Flume进阶-自定义拦截器jar包
**Flume拦截器(Interceptor)** Flume 允许自定义拦截器来处理事件,以在数据流入系统之前进行预处理。在提供的代码中,我们看到一个名为 `Interceptor` 的自定义拦截器实现。这个拦截器检查事件体(Event body)...
Flume拦截器在此处发挥重要作用,允许用户编写自定义代码来处理事件,通过与外部系统(如HBase)交互获取决策所需的信息,实现毫秒级延迟。 3. **准实时事件分割处理**:与前一种模式类似,但更强调数据分割,以...
29-Flume自定义拦截器-多路复用选择器介绍).avi 30-Flume自定义拦截器-编码.avi 31-Flume自定义拦截器-打包&配置信息.avi 32-Flume自定义拦截器-案例测试.avi 35-Flume自定义Source-打包测试.avi 38-Flume-事务源码...
拦截器是 Flume 中用于处理事件的中间件,可以添加在 Source 和 Channel 之间,或 Channel 和 Sink 之间。它们可以用来修改事件内容、过滤事件或进行简单的数据转换,例如添加时间戳、去除无用信息等。 6. **Flume...
Flume拦截器根据事件标头中配置的passedTime检查时间戳字段有选择地过滤事件。 这支持基于包含或排除的过滤。 入门 克隆存储库 构建源 $ mvn clean package 创建拦截器目录并部署 $ mkdir -p /usr/lib/flume-ng/...