`

Flume源代码解读一

阅读更多

     [ xcly原创于iteye,见http://xcly.iteye.com ]

 

   年初团队接了搭建公司Hadoop平台的研发计划,负责公司产品日志的收集,分析两个工作。

   日志收集准备搭建flume(0.9.3)这个分布式日志收集集群.背景介绍完毕,马上开始。

 

Flume分为agent,collector,master三个概念节点,agent负责收集日志,发到collector, collector一般负责接收agent发过来的事件流,写到存储器,这里一般是hdfs. master负责配置及通信管理,类似于hadoop中的NameNode,是心脏,是整个集群控制器。

   此次业务收集的日志主要是公司上千台服务器的产品日志,包括访问日志,搜索日志,下载日志,全部要入到hdfs, agent收集日志的source准备用tailDir来处理,今天我们就来看看tailDir的实现。 

 

   tailDir为EventSource的一种方式,source指日志信息产生源,在SourceFactoryImpl定义了我们可以使用的各种事件源,SourceFactoryImpl集成SourceFactory,是典型的工厂方法,通过getSource方法返回EventSource。 这里getSource的实现方法并不是简单的返回具体EventSource实现类,而是调用SourceBuilder的build方法返回。 SourceBuilder的实现类似于工厂方法,只是在每一个EventSource都必须含有builder的静态方法返回具体SourceBuilder的实现,这种方式实现有一定的优雅性,各个EventSource封装了自己对参数的不同处理,又可以根据不同的情况调用不同的构造函数,甚至是调用另一个EventSource的实现。 

 

 

   tailDir的实现类为TailDirSource, 有4个参数,dir要监听的文件夹,regex正则表达式限定命名文件,startFromEnd启动时是否从文件尾开始读取,recurseDepth要监听的文件夹级数。 在配置tailDir的时候容易被flume user guide误导,犯一个错误,参数的传递不需要参数名=,直接用逗号隔开的配置值即可,否则是使用默认值的。而且四个参数的顺序是固定的,必须是dir,regex,startFromEnd,recurseDepth,如果只使用前两个,可只传递两个参数,如果要使用recurseDepth,必须传递4个参数。

    TailDirSource对文件的监听调用 DirWatcher实现,在open方法中调用,每一个文件夹包括子文件夹对应一个DirWatcher实例,而DirWather用Periodic监听文件夹,每250毫秒遍历文件夹下所有文件一次, 同第一次新建DirWatcher实例一样,用fileCreated方法新建Cursor,Cursor是读取遍历文件的关键类。

   TailDirSource open方法还新建了TailSource实例,而TailSource新建了TailThread线程,TailThread线程不断循环从Cursor队列数组中遍历读取每一行事件,放到TailSource的队列中中。而TailDirSource 的next方法调用TailSource的next方法,  每100ms从TailSource的队列中取事件。主线程取数据,分线程放入数据。

    Cursor是读取文件信息的关键类,核心是用RandomAccessFile来读取文件,RandomAccessFile支持任意位置的随机读取,非常强大。默认每次是文件开头或者文件结尾读取,我们此次对于过大的文件加入了文件点读取记录功能,防止重复读取。

 

 

 

 

分享到:
评论
2 楼 cunsky 2012-12-02  
lakeblur 写道
您好,能否请教一个flume的问题,collector的sink我用escapedFormatDfs,如果我想每分钟生成一个文件,配成:escapedFormatDfs("hdfs://localhost:9000/flume/%Y%m%d/%{category}/", "%M.log",raw())' ,如果不加roll参数,hdfs中产生的文件似乎一直都没有关闭,无法读取,而我配置了roll的参数小于1分钟的话,则会造成数据丢失,想问一下roll的作用到底是什么?roll的机制和根据时间段自定义文件夹或文件名的机制是否会有冲突?如能告知,不胜感激!

路过看到了,根据我的了解说下,roll是根据时间长度或者接收日志总量大小对sink进行关闭和重开启,反映到底层就是dfs文件的关闭和重新打开一个文件,你不加roll参数的话,文件必然是无法关闭的,至于数据有丢失,不知道具体情况不敢妄测。
1 楼 lakeblur 2012-09-28  
您好,能否请教一个flume的问题,collector的sink我用escapedFormatDfs,如果我想每分钟生成一个文件,配成:escapedFormatDfs("hdfs://localhost:9000/flume/%Y%m%d/%{category}/", "%M.log",raw())' ,如果不加roll参数,hdfs中产生的文件似乎一直都没有关闭,无法读取,而我配置了roll的参数小于1分钟的话,则会造成数据丢失,想问一下roll的作用到底是什么?roll的机制和根据时间段自定义文件夹或文件名的机制是否会有冲突?如能告知,不胜感激!

相关推荐

    flume 1.8所有源代码 编译通过版 附 maven3.5.2 安装包

    在这个压缩包中,包含了Flume 1.8的源代码以及Maven 3.5.2的安装包,这对于开发者深入理解Flume的工作原理和进行定制化开发非常有帮助。 首先,要使用这些资源,你需要先安装Maven 3.5.2。解压`apache-maven-3.5.2....

    flume1.7.0源码

    Flume 1.7.0 是该软件的一个版本,包含了完整的源代码,便于开发者深入理解其工作原理并进行定制开发。 在Flume 1.7.0源码中,我们可以探索以下几个关键知识点: 1. **Flume架构**: Flume 的核心架构由三个主要...

    Flume读取数据库JDBC源程序

    在提供的压缩包文件`flume-ng-sql-source-develop`中,很可能包含了Flume JDBC源的源代码或者开发相关资源,供开发者自定义或扩展JDBC源的特性,例如添加新的查询策略、优化性能等。 使用Flume的JDBC源程序,企业...

    电商数仓项目(八) Flume(2) 拦截器开发源代码

    在 `flume-interceptor` 压缩包中,你可能找到了示例拦截器的源代码。研究这些代码可以帮助你更好地理解拦截器的工作原理,并为自己的项目提供参考。通过实践编写和调试拦截器,你可以更深入地了解 Flume 如何处理...

    flume自定义功能实现代码

    1. **创建 Java 类**:首先,你需要创建一个新的 Java 类,这个类需要继承自 Flume 的 Source 接口或者 AbstractSource 类。例如,你可以创建一个名为 `MyCustomSource` 的类。 2. **实现接口方法**:在 `...

    基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目源代码+文档说明

    基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目源代码+文档说明,含有代码注释,满分大作业资源,新手也可看懂,期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。...

    Flume1.6.0入门:安装、部署、及flume的案例

    1. **解压安装包**:将下载的 Flume 包解压到指定目录(如 `/home/hadoop`)。 ```bash tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /home/hadoop ``` 2. **配置环境变量**:编辑 `flume-env.sh` 配置文件,...

    es522_flume16_bak0827_succ_modify.zip

    1. **Flume架构**:理解Flume的基本组件,如Sources(数据源)、Sinks(数据接收器)和Channels(临时存储)的工作原理。 2. **Elasticsearch Sink配置**:学习如何配置Flume的conf文件,包括指定Elasticsearch的URL...

    Flume集群环境搭建,flume监控

    Flume从1.5.0版本开始,重构了其内部架构,核心组件、配置以及代码架构都进行了重大改动,这个新版本被称为Flume NG(Next Generation),即Flume的新一代版本,用以替代了原来的Flume OG(Original Generation)。...

    apache-flume-1.9.0-bin.tar.gz

    这个版本的 Flume 已经预先编译完成,用户下载后可以直接进行安装和使用,无需自行编译源代码。 描述中的 "编译好的flume1.9.0,下载安装即可使用" 提醒我们,该文件包含的 Flume 实例是已经准备好运行的,只需遵循...

    flume支持RabbitMQ插件

    flume支持RabbitMQ插件

    flume用户手册

    Apache Flume是一款分布式、可靠且可用的系统,主要用于高效地从多种不同的数据源收集、聚合和移动大量的日志数据到一个集中的存储库。除了用于日志数据聚合外,由于数据源是可定制的,Flume可以用来传输大量包括但...

    apache-flume-1.8.0

    Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动和加载大量日志数据到集中式存储系统,如Hadoop HDFS。它设计为容错性强,可扩展,允许从多个源收集数据,并将其流向目标,如数据仓库或...

    flume-ng文档.doc

    Flume 代理(agent)是一个运行在Java虚拟机(JVM)上的进程,它承载了使事件从外部源流动到下一个目标的组件。 **数据流模型** - **Flume 源(source)**:从外部源如web服务器消费事件。外部源以Flume源能识别的...

    flume-ng安装

    Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:Java JDK 安装 在安装 Flume-...

    Flume学习文档(1){Flume基本概念、Flume事件概念与原理}.docx

    FlumeNG相比FlumeOG进行了代码精简和架构优化,更符合现代大数据处理的要求,建议使用FlumeNG。 #### 1.4 主要作用 Flume主要用于实时地读取服务器本地磁盘上的数据,并将其写入到Hadoop的分布式文件系统(HDFS)中...

    Flume集群搭建1

    Apache Flume 是一个分布式、可靠且可用的服务,用于有效地收集、聚合和移动大量日志数据。在本场景中,我们将讨论如何在两台机器(hadoop12 和 Hadoop13)上搭建一个简单的 Flume 集群,以便进行数据推送和拉取。 ...

    Apache flume1.6_src

    这个压缩包“Apache flume1.6_src”包含了 Flume 1.6.0 版本的源代码,对于理解其工作原理、学习底层技术以及进行定制化开发非常有帮助。 Flume 的核心组件主要包括 Channel、Sink 和 Source 三部分: 1. **Source...

Global site tag (gtag.js) - Google Analytics