`
kavy
  • 浏览: 890587 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

flume拦截器

 
阅读更多

RegexExtractorInterceptor作为一个Interceptor实现类可以根据一个正则表达式匹配event body来提取字符串,并使用serializers把字符串作为header的值

实例:
以如下的命令使用execsource收集日志的时候,可以根据文件的名称设置不同的header,进行不同的操作

1
2
3
4
#!/bin/sh
filename=$1
hostname=`hostname -s`
tail -F $1 | awk -v filename=$filename -v hostname=$hostname '{print filename":"hostname":"$0}'

source的配置:

1
2
3
4
5
6
7
8
xxxx.sources.kafka1.interceptors = i1
xxxx.sources.kafka1.interceptors.i1.type = regex_extractor
xxxx.sources.kafka1.interceptors.i1.regex = /apps/logs/(.*?)/
xxxx.sources.kafka1.interceptors.i1.serializers = s1
xxxx.sources.kafka1.interceptors.i1.serializers.s1.name = logtypename
xxxx.sources.kafka1.selector.type = multiplexing
xxxx.sources.kafka1.selector.header = logtypename
xxxx.sources.kafka1.selector.mapping.nginx = nginx-channel


几个参数项:
regex 正则表达式

1
2
3
4
5
6
serializers  定义匹配组(正则匹配之后的值作为header的值,比如如果
Event body为1:2:3.4foobar5,regex为(\\d):(\\d):(\\d),serializers 
设置为a b c,serializers.a.name 为one,serializers.b.name为two,serializers.c.name
为three,那么one->1,two->2,three->3.4foobar5,注意可以不必匹配所有的组)
 
serializers.x.name 作为event的header


首先看内部类Builder:
1)configureSerializers方法用来生成配置项,主要是操作List<NameAndSerializer>,静态内部类NameAndSerializer是一个包含了headerName和RegexExtractorInterceptorSerializer属性的容器,这里每一个serializers.x.name的配置对应一个RegexExtractorInterceptorSerializer对象,RegexExtractorInterceptorSerializer默认是org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer,即对参数不做任何处理直接返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private List<NameAndSerializer> serializerList;
private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
....
     private void configureSerializers(Context context) {
      String serializerListStr = context.getString( SERIALIZERS ); //解析serializers的配置
      Preconditions. checkArgument(!StringUtils. isEmpty(serializerListStr),
          "Must supply at least one name and serializer" );
      String[] serializerNames = serializerListStr.split( "\\s+" ); //按空格分隔
      Context serializerContexts =
          new Context(context.getSubProperties( SERIALIZERS + "."));
      serializerList = Lists. newArrayListWithCapacity(serializerNames.length);
      for(String serializerName : serializerNames) { //对每一个serializers里面的设置进行操作
        Context serializerContext = new Context(
            serializerContexts.getSubProperties(serializerName + "." ));
        String type = serializerContext.getString( "type" "DEFAULT" ); //获取serializers.x.type的设置,默认值是DEFAULT,即org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
        String name = serializerContext.getString( "name" ); ////获取serializers.x.name的设置
        Preconditions. checkArgument(!StringUtils. isEmpty(name),
            "Supplied name cannot be empty." );
        if ("DEFAULT" .equals(type)) {
          serializerList .add(new NameAndSerializer(name, defaultSerializer)); //生成NameAndSerializer对象,并加入到List<NameAndSerializer>中,静态内部类NameAndSerializer是一个包含了headerName和RegexExtractorInterceptorSerializer属性的容器,这里每一个serializers.x.name的配置对应一个RegexExtractorInterceptorSerializer对象
        else {
          serializerList .add(new NameAndSerializer(name, getCustomSerializer(
              type, serializerContext))); //getCustomSerializer用于根据type的设置返回RegexExtractorInterceptorSerializer对象
        }
      }
    }

这里org.apache.flume.interceptor.RegexExtractorInterceptorSerializer 接口类,定义了一个抽象方法serialize,实现类包括:

1
2
3
4
org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer 
//直接返回,不做另外的操作(默认的类)
org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer 
//使用指定的formatting pattern把传入的值转换为milliseconds

  
2)build方法用于返回一个RegexExtractorInterceptor对象

1
return new RegexExtractorInterceptor( regex , serializerList );

RegexExtractorInterceptor的主要方法intercept:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 static final String REGEX = "regex" ;
  static final String SERIALIZERS = "serializers" ;
...
  public Event intercept(Event event) {
    Matcher matcher = regex.matcher(
        new String(event.getBody(), Charsets.UTF_8)); //对Event的body进行matcher操作
    Map<String, String> headers = event.getHeaders(); // 获取Event的header键值对
    if (matcher.find()) { //检测字符串中的子字符串是否可以匹配到正则
      for int group = 0, count = matcher.groupCount(); group < count; group++) {
        int groupIndex = group + 1// 匹配的index从1开始
        if (groupIndex > serializers .size()) { //判断index是否大于serializers列表(configure产生的List<NameAndSerializer>)的长度
....
          break;
        }
        NameAndSerializer serializer = serializers.get(group); //从serializers中获取对应的NameAndSerializer 对象
....
        headers.put(serializer. headerName,
            serializer. serializer.serialize(matcher.group(groupIndex))); // 向Event中插入headerName和对应的value,这里headerName即为serializers.x.name的设置,value会通过RegexExtractorInterceptorSerializer进行处理
      }
    }
    return event;
  }

 

本文出自 “菜光光的博客” 博客,请务必保留此出处http://caiguangguang.blog.51cto.com/1652935/1619537

分享到:
评论

相关推荐

    大数据采集技术-Flume拦截器的使用.pptx

    以下是关于Flume拦截器的详细知识: 一、Flume概述 Flume 是 Apache Hadoop 生态系统的一部分,主要用于收集、聚合和移动大量日志数据。它具有高度可扩展性和容错性,确保了数据的高可用性。Flume 通过简单灵活的...

    大数据采集技术-flume拦截器.pdf

    Flume 的拦截器是其强大功能的一部分,允许用户在数据流动的过程中进行定制化的数据处理和过滤。 Flume 拦截器是 Flume Agent 中的一个组件,它在数据进入通道(Channel)之前,先对事件(Event)进行预处理。拦截...

    电商数仓项目(八) Flume(2) 拦截器开发源代码

    在本项目的第二部分,我们将深入探讨 Flume 的拦截器开发,这是 Flume 自定义数据处理的一个关键环节。Flume 拦截器允许我们在数据流入通道之前对其进行预处理,比如过滤、修改或添加元数据等操作。了解如何开发拦截...

    flume-cacheable-interceptor-skeleton:可缓存 Flume 拦截器的骨架

    该项目为可缓存的 Flume 拦截器提供了一个框架。 它是使用提供缓存服务的 Spring Framework 4.1.5 版实现的。 要实现自己的拦截器,请克隆此项目并实现 CacheableInterceptor 和 FlumeCacheService 的缺失部分。 ...

    flume-interceptor.zip

    "flume-interceptor.zip" 文件很可能是包含了自定义 Flume 拦截器的相关代码和配置,用于对数据进行特定的预处理。 拦截器(Interceptor)是 Flume 中的一个关键组件,它允许用户在数据流进入 Flume Agent 之前对其...

    flume自定义拦截器学习

    flume自定义拦截器学习

    flume-demo_大数据_flume_DEMO_自定义拦截器_

    这个名为 "flume-demo_大数据_flume_DEMO_自定义拦截器_" 的项目,显然是一个示例,展示了如何在 Flume 中创建并使用自定义拦截器来过滤数据。下面我们将深入探讨 Flume 的基本概念、拦截器的作用以及如何自定义拦截...

    flume解析日志所需jar

    2. Flume拦截器:自定义的Flume拦截器实现,用于在数据进入Channel之前进行解析。 3. 连接库:如HBase和Elasticsearch的客户端库,使得Flume能够与这些系统通信。 在实际部署中,你需要将这些jar文件添加到Flume的...

    flume-interceptor-1.0.0-jar-with-dependencies.jar

    flume拦截器 保留binlog es、data、database、table、type字段 分区字段名称: eventDate 放入 /opt/cloudera/parcels/CDH/lib/flume-ng/lib目录重启flume即可

    flume消费kafka数据上传hdfs.doc

    Flume 消费 Kafka 数据上传 HDFS ...Flume 消费 Kafka 数据上传 HDFS 需要考虑 Channel 的选择、FileChannel 优化、HDFS Sink 的小文件问题、Flume 拦截器的使用等问题,以确保数据传输的可靠性和高效性。

    基于Java语言的Flume规则拦截器设计源码

    该项目为基于Java语言的Flume规则拦截器设计,包含77个文件,其中包括56个XML配置文件、9个Java源文件、7个Groovy脚本文件、2个Markdown文档以及少量其他类型文件。该设计旨在实现Flume数据流中的规则过滤功能,适用...

    大数据 + Flume 面试题 + 高频面试题(含答案)

    理解 Flume 的组件、事务机制、拦截器以及内存管理和小文件处理策略,对于优化大数据流处理和面试准备至关重要。在实际应用中,根据业务需求选择合适的 Channel 类型,配置合理的内存和硬盘资源,以及调整参数以减少...

    flume数据采集端过滤工程

    3. **自定义过滤器**:通过编写自定义Flume拦截器插件,实现特定业务逻辑的过滤需求。 Flume的工作流程是通过Agent(代理)来完成的,每个Agent由Source、Channel和Sink三部分组成。在Source与Channel之间,或者...

    大数据课程-Hadoop集群程序设计与开发-8.Flume日志采集系统_lk_edit.pptx

    4. **Flume拦截器** - 拦截器允许在数据流经Source到Sink的过程中添加预处理步骤,如过滤、转换或添加元数据,增强数据处理的灵活性。 5. **企业开发案例** - 在企业环境中,Flume常用于收集不同来源的日志数据,...

    Flume进阶-自定义拦截器jar包

    Flume进阶-自定义拦截器jar包

    Flumel对接kafka.docx

    **Flume拦截器(Interceptor)** Flume 允许自定义拦截器来处理事件,以在数据流入系统之前进行预处理。在提供的代码中,我们看到一个名为 `Interceptor` 的自定义拦截器实现。这个拦截器检查事件体(Event body)...

    Apache Hadoop准实时数据处理的架构模式.docx

    Flume拦截器在此处发挥重要作用,允许用户编写自定义代码来处理事件,通过与外部系统(如HBase)交互获取决策所需的信息,实现毫秒级延迟。 3. **准实时事件分割处理**:与前一种模式类似,但更强调数据分割,以...

    大数据技术Flume1.9

    29-Flume自定义拦截器-多路复用选择器介绍).avi 30-Flume自定义拦截器-编码.avi 31-Flume自定义拦截器-打包&配置信息.avi 32-Flume自定义拦截器-案例测试.avi 35-Flume自定义Source-打包测试.avi 38-Flume-事务源码...

    《Hadoop大数据开发实战》教学教案—09Flune.pdf

    拦截器是 Flume 中用于处理事件的中间件,可以添加在 Source 和 Channel 之间,或 Channel 和 Sink 之间。它们可以用来修改事件内容、过滤事件或进行简单的数据转换,例如添加时间戳、去除无用信息等。 6. **Flume...

    flume-timestamp-filter:水槽时间戳过滤器

    Flume拦截器根据事件标头中配置的passedTime检查时间戳字段有选择地过滤事件。 这支持基于包含或排除的过滤。 入门 克隆存储库 构建源 $ mvn clean package 创建拦截器目录并部署 $ mkdir -p /usr/lib/flume-ng/...

Global site tag (gtag.js) - Google Analytics