`
乡里伢崽
  • 浏览: 113059 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

flume iterceptor

 
阅读更多
对于flume拦截器,我的理解是:在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到source之前,对日志进行一些包装、清新过滤等等动作。

官方上提供的已有的拦截器有:

Timestamp Interceptor
Host Interceptor
Static Interceptor
Regex Filtering Interceptor
Regex Extractor Interceptor

像很多java的开源项目如springmvc中的拦截器一样,flume的拦截器也是chain形式的,可以对一个source指定多个拦截器,按先后顺序依次处理。
Timestamp Interceptor :在event的header中添加一个key叫:timestamp,value为当前的时间戳。这个拦截器在sink为hdfs 时很有用,后面会举例说到
Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip。
Static Interceptor:可以在event的header中添加自定义的key和value。
Regex Filtering Interceptor:通过正则来清洗或包含匹配的events。
Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

下面举例说明这些拦截器的用法,首先我们调整一下第一篇文章中的那个WriteLog类:

  public class WriteLog {
	protected static final Log logger = LogFactory.getLog(WriteLog.class);

	/**
	 * @param args
	 * @throws InterruptedException
	 */
	public static void main(String[] args) throws InterruptedException {
		// TODO Auto-generated method stub
		while (true) {
			logger.info(new Date().getTime());
			logger.info("{\"requestTime\":"
					+ System.currentTimeMillis()
					+ ",\"requestParams\":{\"timestamp\":1405499314238,\"phone\":\"02038824941\",\"cardName\":\"测试商家名称\",\"provinceCode\":\"440000\",\"cityCode\":\"440106\"},\"requestUrl\":\"/reporter-api/reporter/reporter12/init.do\"}");
			Thread.sleep(2000);

		}
	}
}



又多输出了一行日志信息,现在每次循环都会输出两行日志信息,第一行是一个时间戳信息,第二行是一行JSON格式的字符串信息。

接下来我们用regex_filter和 timestamp这两个拦截器来实现这样一个功能:
1 过滤掉LOG4J输出的第一行那个时间戳日志信息,只收集JSON格式的日志信息
2 将收集的日志信息保存到HDFS上,每天的日志保存到以该天命名的目录下面,如2014-7-25号的日志,保存到/flume/events/14-07-25目录下面。

修改后的flume.conf如下:

  tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1

tier1.sources.source1.type=avro
tier1.sources.source1.bind=0.0.0.0
tier1.sources.source1.port=44444
tier1.sources.source1.channels=channel1

tier1.sources.source1.interceptors=i1 i2
tier1.sources.source1.interceptors.i1.type=regex_filter
tier1.sources.source1.interceptors.i1.regex=\\{.*\\}
tier1.sources.source1.interceptors.i2.type=timestamp

tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactionCapacity=1000
tier1.channels.channel1.keep-alive=30

tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.writeFormat=Text
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize=10240
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.idleTimeout=60


我们对source1添加了两个拦截器i1和i2,i1为regex_filter,过滤的正则为\\{.*\\},注意正则的写法用到了转义字符,不然source1无法启动,会报错。
i2为timestamp,在header中添加了一个timestamp的key,然后我们修改了sink1.hdfs.path在后面加上了/%y-%m-%d这一串字符,这一串字符要求event的header中必须有timestamp这个key,这就是为什么我们需要添加一个timestamp拦截器的原因,如果不添加这个拦截器,无法使用这样的占位符,会报错。还有很多占位符,请参考官方文档。

然后运行WriteLog,去hdfs上查看对应目录下面的文件,会发现内容只有JSON字符串的日志,与我们的功能描述一致。
分享到:
评论

相关推荐

    flume-ng安装

    Flume-NG 安装与配置指南 Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:...

    apache-flume-1.8.0

    Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动和加载大量日志数据到集中式存储系统,如Hadoop HDFS。它设计为容错性强,可扩展,允许从多个源收集数据,并将其流向目标,如数据仓库或...

    尚硅谷大数据技术之Flume

    尚硅谷大数据技术之Flume Flume 是 Cloudera 提供的一个高可用的、 高可靠的、分布式的海量日志采集、聚合和传输的系统。 Flume 基于流式架构,灵活简单。 1.1 Flume 定义 Flume 是一个高可用的、 高可靠的、...

    Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS.docx

    Flume-ng 在 Windows 环境搭建并测试 + Log4j 日志通过 Flume 输出到 HDFS Flume-ng 是一个高可用、可靠、分布式的日志聚合系统,可以实时地从各种数据源(如日志文件、网络 socket、数据库等)中收集数据,并将其...

    Flume+kafka+Storm整合

    ### Flume+kafka+Storm整合知识点详解 #### 一、Flume、Kafka与Storm概述 在大数据领域,数据采集、传输与实时处理是至关重要的环节。本篇内容重点介绍了如何利用Flume、Kafka与Storm这三个开源工具实现一套完整的...

    Flume集群环境搭建,flume监控

    Flume从1.5.0版本开始,重构了其内部架构,核心组件、配置以及代码架构都进行了重大改动,这个新版本被称为Flume NG(Next Generation),即Flume的新一代版本,用以替代了原来的Flume OG(Original Generation)。...

    flume安装程序

    Apache Flume 是一个分布式、可靠且可用的服务,用于有效地收集、聚合和移动大量日志数据。它是Apache Hadoop生态系统的一部分,特别设计用于处理和传输大规模流式数据。Flume通过简单而灵活的架构实现了数据的高效...

    Flume1.6.0入门:安装、部署、及flume的案例

    ### Flume 1.6.0 入门详解:安装、部署及案例分析 #### 一、Flume 概述 Flume 是 Cloudera 开发的一款高效、可靠且易于扩展的日志收集系统,适用于大数据环境下的日志采集任务。Flume 的初始版本被称为 FlumeOG...

    大数据Ambari之flume集成编译好的源码包

    **大数据Ambari之flume集成编译好的源码包** Apache Ambari 是一个用于管理和监控Hadoop集群的开源工具,它提供了直观的Web界面和RESTful API,使得安装、配置、管理Hadoop生态系统变得更加简单。Flume是Apache的一...

    apache-flume-1.9.0-bin.tar.gz

    Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动大量日志数据。Flume 提供了简单灵活的架构,允许数据在多个数据源和数据接收器之间流动。它被广泛应用于大数据处理环境,尤其适合监控...

    flume hbanse2.0 lib

    Flume 是 Apache 开源项目提供的一款分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。它设计用于高可用性和高可靠性,使得在大数据环境中处理流式数据变得简单。在描述中提到的问题是关于 Flume 不...

    47_Flume、Logstash、Filebeat调研报告

    【Flume】 Flume是Apache Hadoop项目的一部分,专门设计用于高效地收集、聚合和移动大规模日志数据。它的核心架构基于事件流,具备分布式、高可靠性和高可用性。Flume通过agent来实现数据采集,每个agent包含source...

    Flume学习文档(1){Flume基本概念、Flume事件概念与原理}.docx

    根据提供的文档信息,本文将详细解析Flume的基本概念、Flume事件的概念与原理,并进一步探讨其在大数据领域的应用价值。 ### 一、Flume基本概念 #### 1.1 Flume简介 Flume是由Cloudera公司开源的一种分布式、可靠...

    apache-flume-1.9.0-bin.tar.zip

    Apache Flume 是一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的系统。它设计用于处理和传输大规模的日志数据,是大数据生态系统中的重要组件,常用于实时流数据处理。Apache Flume 1.9.0 版本是该软件...

    flume kafak实验报告.docx

    在大数据实时处理领域,Flume、Kafka 和 Spark Streaming 是常用的数据采集、传输与处理工具。本实验报告详细阐述了如何将这三个组件结合使用,构建一个高效的数据流处理系统。 一、Flume 与 Spark Streaming 的...

    Flume1.8安装部署

    Flume 1.8 安装部署 Flume 是一个分布式、可靠、可扩展的日志收集、聚合和移动系统,广泛应用于大数据和实时数据处理领域。以下是 Flume 1.8 安装部署的详细步骤和相关知识点。 一、准备工作 1. 下载 apache ...

    Flume安装详细步骤

    Flume安装详细步骤 Flume是一款基于Java的分布式日志收集系统,主要用于收集和传输大规模日志数据。下面是Flume安装的详细步骤: Step 1: 安装JDK环境 在安装Flume之前,需要确保JDK环境已经安装。这里我们使用...

    flume支持RabbitMQ插件

    flume支持RabbitMQ插件

    Flume学习文档(2){Flume安装部署、Flume配置文件}.docx

    Apache Flume 是一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的系统。在本文档中,我们将深入探讨Flume的安装部署以及配置文件的使用。 首先,要安装Flume,你需要访问官方网站...

    flume自学文档.pdf

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...

Global site tag (gtag.js) - Google Analytics