`

Flume Spooldir 源的一些问题

 
阅读更多
最近在用Flume做数据的收集。用到了里面的Spooldir的源在使用中有如下的问题:

如果文件的某一行有乱码,不符合指定的编码规范,那么flume会抛出一个exception,然后就停在那儿了。
spooldir指定的文件夹中的文件一旦被修改,flume就会抛出一个exception,然后停在那儿了。
其实,flume的最大问题就是不够鲁棒。一旦出现问题,不能跳过,只能死在那儿。不知道flume为什么要这么设计。理论上,它应该允许我们在配置文件中指定在遇到错误的行时,是停止还是跳过,不过它目前并不支持这个。所以,我们只能写一个自己的flume的插件了。

https://github.com/xlvector/flume
https://github.com/ponyma/flume
这个插件主要修复了前面提到的两个问题:

如果某一行有乱码,flume会忽略这一行
flume只会check最近N分钟没有修改过的文件
具体修改方法如下。首先,我们继承了SpoolDirectorySource,实现了一个叫做RobustSpoolDirectorySource的类。这个类的代码基本是拷贝了SpoolDirectorySource的代码。但做了如下的修改。

在getNextFile()的函数中,我们发现了一个filter,做了如下的修改

FileFilter filter = new FileFilter() {
    public boolean accept(File candidate) {
        String fileName = candidate.getName();
        if ((candidate.isDirectory()) ||
            (fileName.endsWith(completedSuffix)) ||
            (fileName.startsWith(".")) ||
            ignorePattern.matcher(fileName).matches() ||
            (System.currentTimeMillis() - candidate.lastModified() < 600000)) {
            return false;
        }
        return true;
    }
};
这里,我们加入了一个条件

(System.currentTimeMillis() - candidate.lastModified() < 600000)
也就是说10分钟之内修改过的文件我们不会处理。

第二个修改是关于编码的,你可以在ReliableSpoolingFileEventReader.java的代码中找到如下的代码:

ResettableInputStream in =
    new ResettableFileInputStream(nextFile, tracker,
        ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
        DecodeErrorPolicy.FAIL);
这里,我们只需要将DecodeErrorPolicy 改成 DecodeErrorPolicy.IGNORE 即可。
分享到:
评论

相关推荐

    Flume1.8安装部署

    5. sources.type:源的类型,例如 avro、exec、netcat、spooldir 和 syslog 等。 6. channels.type:通道的类型,例如 memory-channel、JDBC-channel、file-channel 等。 7. sinks.type:接收器类型,例如 avro、...

    Flume 数据采集实战

    spooldir 数据源允许 Flume 监听指定文件夹,一旦有新文件生成,Flume 就会将其内容传输到目标位置,如 HDFS。实验中,可以在 Linux 系统创建一个文件夹,通过 Winscp 上传配置文件,编写测试文件,然后观察 Flume ...

    集群flume详细安装步骤

    a1.sources.r1.spoolDir = /opt/apache-flume-1.7.0-bin/temp a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources....

    Flume集群搭建1

    在 `push.conf` 中,source 可能定义为监控某个目录的变化(例如使用 SpoolDir source),channel 可能是 MemoryChannel 或 FileChannel,sink 可能指向 hadoop13 的 IP 地址。相应的,在 `pull.conf` 中,source 将...

    flume集群搭建与使用文档

    2. Source配置:配置spooldir、tail-F和netcat三种类型的source。 3. Sink配置:配置logger和hdfs两种类型的sink,hdfs sink用于将日志数据写入HDFS。 4. Channel配置:配置channel,用于连接source和sink。 5. ...

    windows下flume1.7使用文档

    Apache Flume 是一个分布式的、可靠的且高可用的系统,用于从不同的数据源收集、汇总和传输大量的日志数据到集中式的数据存储中心。Flume 的设计目的是为了能够处理大规模的数据流,确保数据的完整性和准确性,同时...

    flume+kafka+storm最完整讲解

    这个实验组合展示了如何构建一个实时数据处理架构,从数据源(通过 Flume)到中间存储(Kafka)再到实时处理(Storm),这对于大数据实时分析和监控场景非常有用。理解并掌握这一流程对于 IT 专业人士来说至关重要,...

    flume log4f示例源码

    # 定义一个名为logSource的源 agent.sources = logSource # 使用SpoolDirectorySource,它会监控指定目录下新产生的文件 agent.sources.logSource.type = spoolDir # 指定日志文件的输入目录 agent.sources....

    星环大数据平台_Flume使用方法.pdf

    星环大数据平台提供的Flume使用方法文档是一份面向数据工程师的培训材料,旨在教授如何安装和使用Flume进行数据的分布式采集。文档详细介绍了Flume组件和配置,并通过实验步骤,帮助工程师理解和掌握数据采集的流程...

    flume-push.conf

    a1.sources.r1.spoolDir = /var/log/flume a1.sources.r1.fileHeader = true a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.10.130 a1.sinks.k1.port = 9999 a1.channels.c1.type = memory a1....

    大数据采集技术-Flume监控目录.pptx

    在配置过程中,首先需要创建这个文件,并设定Flume的源(Source)、通道(Channel)和接收器(Sink)。在Flume中,Source负责从数据源处获取数据,Channel作为临时存储,Sink则将数据传输到目标位置,如HDFS。例如,...

    全国首份接地气流处理文档,kafka,flume,整合

    ### 全国首份接地气流处理文档:Kafka与Flume整合详解 #### 一、Flume的安装与配置 Flume是一款高可用的、高可靠的、分布式的系统,主要用于收集、聚合和移动大量日志数据。Flume支持在日志系统中定制各类数据发送...

    大数据组件-Flume高可用集群搭建

    Flume是一个高度可靠的分布式数据收集系统,主要用于从多种数据源收集并传输数据至Hadoop生态系统内进行后续处理。由于其强大的可扩展性和可靠性,Flume在大型互联网公司的数据处理流程中占据着重要地位。 **特点**...

    flume的安装和实践.docx

    Flume 提供了多种 Source 类型,如 Exec、SpoolDir 和 Taildir。Exec 可以监控命令的输出,适用于实时追加的日志;SpoolDir 监控新文件,但不支持断点续传;Taildir 是最常用于实时追加日志文件的监控,具备断点续传...

    java大数据案例_7Flume、Kafka、Sqoop、Lucene

    在这里我们选择文件轮询(Spool Directory)作为数据源,它会定期检查指定目录中的新文件,并将其添加到Flume的处理管道中。 ```properties a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/path/to/...

    flume_sink_ext:flume收集日志到postgrepsql ,es扩展

    mvn clean mvn install将代码打成jar包后,上传到flume安装目录下的lib文件夹中,同时需要上传SQL的驱动jar包 -------------测试假脱机目录源-------------------------- conf:dbsql_sink.conf Name the components...

    白内障

    CISC 525 Apache Flume项目运行Flume代理源spooldir-通道文件-接收器记录器mkdir /tmp/spooldirflume-ng agent --conf-file spool-to-logger.properties --name agent1 --conf $FLUME_HOME /conf -Dflume.root....

    大数据基础-数据采集与预处理.pdf

    例如,配置一个监听指定文件目录变化的Flume代理,通过spooldir类型的Source监控文件,logger类型的Sink接收并打印数据,中间通过Channel连接,形成数据流转路径。启动Flume Agent时,它会读取配置文件并开始数据...

    hadoop-辅助工具-笔记.docx

    对于更实际的应用,如采集日志目录中的文件到HDFS,可以使用`spooldir`类型的Source监控特定目录。每当新文件出现,Flume会自动读取并将其内容传输到HDFS。完成采集的文件会被添加"COMPLETED"后缀,以标识处理状态。...

    Flume简单案例

     采集源,即 source——监控文件目录 : spooldir  下沉目标,即 sink——HDFS 文件系统: hdfs sink  source 和 sink 之间的传递通道——channel,可用 file channel 也可以用内存 channel 配置文件编写: # ...

Global site tag (gtag.js) - Google Analytics