`
qianshangding
  • 浏览: 127952 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

SpoolDirectorySource使用及源码分析

 
阅读更多

Spooling Directory Source简介

Spooling Directory Source可以获取硬盘上“spooling”目录的数据,这个Source将监视指定目录是否有新文件,如果有新文件的话,就解析这个新文件。事件的解析逻辑是可插拔的。在文件的内容所有的都读取到Channel之后,Spooling Directory Source会重名或者是删除该文件以表示文件已经读取完成。
不像Exec Source,这个Source是可靠的,且不会丢失数据。即使Flume重启或者被Kill。但是需要注意如下两点:
1,如果文件在放入spooling目录之后还在写,那么Flume会打印错误日志,并且停止处理该文件。
2,如果文件之后重复使用,Flume将打印错误日志,并且停止处理。
为了避免以上问题,我们可以使用唯一的标识符来命令文件,例如:时间戳。

尽管这个Source是可靠的,但是如果下游发生故障,也会导致Event重复,这种情况就需要通过Flume的其他组件提供保障了。

属性名默认描述
channels

type 组件名:spooldir.
spoolDir 读取文件的目录。
fileSuffix .COMPLETED Spooling读取过的文件,添加的后缀。
deletePolicy never 完成后的文件是否删除。never:不删除immediate:立即删除
fileHeader false 是不把路径加入到Heander
fileHeaderKey file 路径加入到Header的Key是什么
basenameHeader false 是不把文件名加入到Heander
basenameHeaderKey basename 文件名加入到Header的Key是什么
ignorePattern ^$ 采用正则表达是去过滤一些文件。只有符合正则表达式的文件才会被使用。
trackerDir .flumespool 被处理文件的元数据的存储目录,如果不是绝对路径,就被会解析到spoolDir目录下。
consumeOrder oldest 消费spooling目录文件的规则,分别有:oldest,youngest和random。在oldest 和 youngest的情况下,
通过文件的最后修改时间来比较文件。如果最后修改时间相同,就根据字典的序列从小开始。在随机的情况
下,就随意读取文件。如果文件列表很长,采用oldest/youngest可能会很慢,因为用oldest/youngest要
扫描文件。但是如果采用random的话,就可能造成新的文件消耗的很快,老的文件一直都没有被消费。
maxBackoff 4000 如果Channel已经满了,那么该Source连续尝试写入该Channel的最长时间(单位:毫秒)。
batchSize 100 批量传输到Channel的粒度。
inputCharset UTF-8 字符集
decodeErrorPolicy FAIL 在文件中有不可解析的字符时的解析策略。FAIL: 抛出一个异常,并且不能解析该文件。REPLACE: 取代不可
解析的字符,通常用Unicode U+FFFD.IGNORE: 丢弃不可能解析字符序列。
deserializer LINE 自定序列化的方式,自定的话,必须实现EventDeserializer.Builder.
deserializer.*


bufferMaxLines 已废弃。
bufferMaxLineLength 5000 (不推荐使用) 一行中最大的长度,可以使用deserializer.maxLineLength代替。
selector.type replicating replicating(复制) 或 multiplexing(复用)
selector.*

取决于selector.type的值
interceptors 空格分割的interceptor列表。
interceptors.*


SpoolDirectorySource示例

读取文件写入到file_roll中

a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

#resources
a1.sources.source1.type = spooldir
a1.sources.source1.channels = channel1
a1.sources.source1.spoolDir = E:\\home\\spooling
a1.sources.source1.fileHeader = true
a1.sources.source1.fileHeaderKey = fishfile
a1.sources.source1.basenameHeader = true
a1.sources.source1.basenameHeaderKey = fishbasename

a1.sinks.sink1.type = file_roll
a1.sinks.sink1.sink.directory = E:\\home\\file_roll
a1.sinks.sink1.sink.rollInterval = 300
a1.sinks.sink1.sink.serializer = TEXT
a1.sinks.sink1.sink.batchSize = 100

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

SpoolDirectorySource源码分析

一,调用configure(Context context)方法初始化:

@Override
  public synchronized void configure(Context context) {
    //spool目录
    spoolDirectory = context.getString(SPOOL_DIRECTORY);
    Preconditions.checkState(spoolDirectory != null,
        "Configuration must specify a spooling directory");
    //完成后的文件后缀
    completedSuffix = context.getString(SPOOLED_FILE_SUFFIX,
        DEFAULT_SPOOLED_FILE_SUFFIX);
    //删除策略,never:不删除 或 immediate:立即删除
    deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY);
    //以下四个参数是是否在header中加入文件名和文件路径。
    fileHeader = context.getBoolean(FILENAME_HEADER,
        DEFAULT_FILE_HEADER);
    fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
        DEFAULT_FILENAME_HEADER_KEY);
    basenameHeader = context.getBoolean(BASENAME_HEADER,
        DEFAULT_BASENAME_HEADER);
    basenameHeaderKey = context.getString(BASENAME_HEADER_KEY,
        DEFAULT_BASENAME_HEADER_KEY);
    //批量处理的数量
    batchSize = context.getInteger(BATCH_SIZE,
        DEFAULT_BATCH_SIZE);
    //字符集
    inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
    //在文件中有不可解析的字符时的解析策略
    decodeErrorPolicy = DecodeErrorPolicy.valueOf(
        context.getString(DECODE_ERROR_POLICY, DEFAULT_DECODE_ERROR_POLICY)
        .toUpperCase(Locale.ENGLISH));
    //过滤文件的正则表达式
    ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
    //被处理文件的元数据的存储目录
    trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);

    //序列化
    deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
    deserializerContext = new Context(context.getSubProperties(DESERIALIZER +
        "."));
    //消费spooling目录文件的规则
    consumeOrder = ConsumeOrder.valueOf(context.getString(CONSUME_ORDER, 
        DEFAULT_CONSUME_ORDER.toString()).toUpperCase(Locale.ENGLISH));

    // "Hack" to support backwards compatibility with previous generation of
    // spooling directory source, which did not support deserializers
    Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH);
    if (bufferMaxLineLength != null && deserializerType != null &&
        deserializerType.equalsIgnoreCase(DEFAULT_DESERIALIZER)) {
      deserializerContext.put(LineDeserializer.MAXLINE_KEY,
          bufferMaxLineLength.toString());
    }
    
    maxBackoff = context.getInteger(MAX_BACKOFF, DEFAULT_MAX_BACKOFF);
    if (sourceCounter == null) {
      sourceCounter = new SourceCounter(getName());
    }
  }

start方法:

@Override
  public synchronized void start() {
    logger.info("SpoolDirectorySource source starting with directory: {}",
        spoolDirectory);

    executor = Executors.newSingleThreadScheduledExecutor();

    File directory = new File(spoolDirectory);
    //构建ReliableSpoolingFileEventReader对象
    try {
      reader = new ReliableSpoolingFileEventReader.Builder()
          .spoolDirectory(directory)
          .completedSuffix(completedSuffix)
          .ignorePattern(ignorePattern)
          .trackerDirPath(trackerDirPath)
          .annotateFileName(fileHeader)
          .fileNameHeader(fileHeaderKey)
          .annotateBaseName(basenameHeader)
          .baseNameHeader(basenameHeaderKey)
          .deserializerType(deserializerType)
          .deserializerContext(deserializerContext)
          .deletePolicy(deletePolicy)
          .inputCharset(inputCharset)
          .decodeErrorPolicy(decodeErrorPolicy)
          .consumeOrder(consumeOrder)
          .build();
    } catch (IOException ioe) {
      throw new FlumeException("Error instantiating spooling event parser",
          ioe);
    }
    //构建SpoolDirectoryRunnable线程。
    Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
    //每隔POLL_DELAY_MS(500ms)执行以下SpoolDirectoryRunnable线程。
    executor.scheduleWithFixedDelay(
        runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

    super.start();
    logger.debug("SpoolDirectorySource source started");
    sourceCounter.start();
  }
构建ReliableSpoolingFileEventReader类,构造方法功能:
1,spooling目录是否存在,是否是目录

2,通过创建零时文件测试spooling目录的权限。

3,创建trackerDir目录和.flumespool-main.meta文件

SpoolDirectoryRunnable线程,主要用于发送和读取Event:

private class SpoolDirectoryRunnable implements Runnable {
    private ReliableSpoolingFileEventReader reader;
    private SourceCounter sourceCounter;

    public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader,
        SourceCounter sourceCounter) {
      this.reader = reader;
      this.sourceCounter = sourceCounter;
    }

    @Override
    public void run() {
      int backoffInterval = 250;
      try {
        while (!Thread.interrupted()) {
          //ReliableSpoolingFileEventReader读取batchSize大小的Event
          List<Event> events = reader.readEvents(batchSize);
          if (events.isEmpty()) {
            break;
          }
          //统计
          sourceCounter.addToEventReceivedCount(events.size());
          sourceCounter.incrementAppendBatchReceivedCount();

          try {
            //将Event数组发送到Channel
            getChannelProcessor().processEventBatch(events);
            //commit会记录最后一次读取的行数,以便下次知道从哪里开始读
            reader.commit();
          } catch (ChannelException ex) {
            //ChannelProcessor批量提交Event出错,会抛出ChannelException异常,此时reader.commit是没有执行的
            //所以在接下来的continue后,继续通过reader读取文件的话,还是从原来的位置读取,以保证数据不会丢失。
            logger.warn("The channel is full, and cannot write data now. The " +
              "source will try again after " + String.valueOf(backoffInterval) +
              " milliseconds");
            hitChannelException = true;
            if (backoff) {
              TimeUnit.MILLISECONDS.sleep(backoffInterval);
              backoffInterval = backoffInterval << 1;
              backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
                                backoffInterval;
            }
            continue;
          }
          backoffInterval = 250;
          sourceCounter.addToEventAcceptedCount(events.size());
          sourceCounter.incrementAppendBatchAcceptedCount();
        }
      } catch (Throwable t) {
        logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
            "Uncaught exception in SpoolDirectorySource thread. " +
            "Restart or reconfigure Flume to continue processing.", t);
        hasFatalError = true;
        Throwables.propagate(t);
      }
    }
  }
ReliableSpoolingFileEventReader读取Event:

public List<Event> readEvents(int numEvents) throws IOException {
    //committed初始化为true
    if (!committed) {
      if (!currentFile.isPresent()) {
        throw new IllegalStateException("File should not roll when " +
            "commit is outstanding.");
      }
      logger.info("Last read was never committed - resetting mark position.");
      //正常情况下,会在SpoolDirectorySource类中记录读取的字节数之后,将commited设置为true
      //没有设置为true,可能是因为发送到Channel异常了,调用下面reset方法可以保证数据不丢失。
      currentFile.get().getDeserializer().reset();
    } else {
      // Check if new files have arrived since last call
      if (!currentFile.isPresent()) {
        //读取文件,读取文件过程中使用FileFilter过滤掉completedSuffix后缀的文件,然后根据消费文件的规则(consumeOrder)去消费文件。
        currentFile = getNextFile();
      }
      // Return empty list if no new files
      if (!currentFile.isPresent()) {
        return Collections.emptyList();
      }
    }
    
    EventDeserializer des = currentFile.get().getDeserializer();
    //根据序列化类读取Event
    List<Event> events = des.readEvents(numEvents);

    /* It's possible that the last read took us just up to a file boundary.
     * If so, try to roll to the next file, if there is one.
     * Loop until events is not empty or there is no next file in case of 0 byte files */
    while (events.isEmpty()) {
      logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one.");
      retireCurrentFile();
      currentFile = getNextFile();
      if (!currentFile.isPresent()) {
        return Collections.emptyList();
      }
      events = currentFile.get().getDeserializer().readEvents(numEvents);
    }
    //添加<span style="font-family:Microsoft Yahei;">文件路径到Header</span>
    if (annotateFileName) {
      String filename = currentFile.get().getFile().getAbsolutePath();
      for (Event event : events) {
        event.getHeaders().put(fileNameHeader, filename);
      }
    }
    //添加文件名到Header
    if (annotateBaseName) {
      String basename = currentFile.get().getFile().getName();
      for (Event event : events) {
        event.getHeaders().put(baseNameHeader, basename);
      }
    }

    committed = false;
    lastFileRead = currentFile;
    return events;
  }
消费文件的规则,以OLDEST为例:


for (File candidateFile: candidateFiles) {
        //已选择文件的最后修改时间,减去文件列表取的文件最后修改时间
        long compare = selectedFile.lastModified() -
            candidateFile.lastModified();
        if (compare == 0) { // ts is same pick smallest lexicographically.
          //时间一样就根据字典序列排序
          selectedFile = smallerLexicographical(selectedFile, candidateFile);
        } else if (compare > 0) { // candidate is older (cand-ts < selec-ts).
          selectedFile = candidateFile;
        }
      }

在ReliableSpoolingFileEventReader类读取Events之后,调用commit方法提交,这里有个问题,batchSize为200,我们每次也就读取200行,那么SpoolingDirectorySource是如何标记我们读到文件的那个位置的呢?

其实SpoolingDirectorySource在commit时,会调用EventDeserializer的mark方法标记此时读取在文件的什么位置。源码如下:

  public synchronized void storePosition(long position) throws IOException {
    metaCache.setOffset(position);
    writer.append(metaCache);
    writer.sync();
    writer.flush();
  }
上面的方法会把文件的读取到什么位置记录到.flumespool\.flumespool-main.meta(默认情况下)文件中。

  public void commit() throws IOException {
    if (!committed && currentFile.isPresent()) {
      currentFile.get().getDeserializer().mark();
      //mark成功后,将committed设置为true
      committed = true;
    }
  }



分享到:
评论

相关推荐

    flume log4f示例源码

    # 使用SpoolDirectorySource,它会监控指定目录下新产生的文件 agent.sources.logSource.type = spoolDir # 指定日志文件的输入目录 agent.sources.logSource.spoolDir = /path/to/log/files # 文件被读取后自动...

    python入门-30.寻找列表中只出现一次的数字-寻找单身狗.py

    python入门-30.寻找列表中只出现一次的数字——寻找单身狗.py

    布尔教育linux优化笔记

    linux优化笔记,配套视频:https://www.bilibili.com/list/474327672?sid=4496133&spm_id_from=333.999.0.0&desc=1

    知识付费系统-直播+讲师入驻+课程售卖+商城系统-v2.1.9版本搭建以及资源分享下载

    知识付费系统-直播+讲师入驻+课程售卖+商城系统-v2.1.9版本搭建以及资源分享下载,CRMEB知识付费分销与直播营销系统是由西安众邦科技自主开发的一款在线教育平台,该系统不仅拥有独立的知识产权,还采用了先进的ThinkPhp5.0框架和Vue前端技术栈,集成了在线直播教学及课程分销等多种功能,旨在为用户提供全方位的学习体验,默认解压密码youyacaocom

    美妆神域-JAVA-基于springBoot美妆神域设计与实现

    美妆神域-JAVA-基于springBoot美妆神域设计与实现

    原生js制作Google粘土logo动画涂鸦代码.zip

    原生js制作Google粘土logo动画涂鸦代码.zip

    golin 扫描工具使用, 检查系统漏洞、web程序漏洞

    golin 扫描工具使用, 检查系统漏洞、web程序漏洞

    原生态纯js图片网格鼠标悬停放大显示特效代码下载.zip

    原生态纯js图片网格鼠标悬停放大显示特效代码下载.zip

    用AWLUM进行灰色编码2^2n-QAM调制的精确率Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。 替换数据可以直接使用,注释清楚,适合新手

    去水印web端独立版web

    去水印web端独立版web

    原生js制作左侧浮动可折叠在线客服代码.zip

    原生js制作左侧浮动可折叠在线客服代码.zip

    Chrome 谷歌浏览器下载

    Chrome 谷歌浏览器下载

    亲测全新完整版H5商城系统源码 附教程

    全新完整版H5商城系统源码 自己花钱买的,亲测可用,需要自行下载 H5商城系统设置是实现商城基本功能的核心部分,涵盖了从网站配置、短信和支付配置,到商品、工单、订单、分站和提现管理等多个模块的设置。以下是详细的设置指南,帮助您快速上手并高效管理商城系统。 测试环境:Nginx+PHP7.0+MySQL5.6 1. 网站配置 设置商城名称、LOGO、标题、联系方式和SEO关键词等,确保商城专业和易于搜索。 2. 短信配置 配置短信接口和模板,用于发送订单通知、验证码等,提升用户体验。 3. 支付接口配置 配置微信、支付宝等支付接口,填写API密钥和回调地址,确保支付流畅。 4. 商品分类管理 对商品进行分类和排序,设置分类名称和图标,便于用户查找商品。 5. 商品管理 添加和管理商品信息、规格、图片等,确保商品信息准确丰富。 6. 工单管理 查看和回复用户工单,记录售后问题,提升用户服务质量。 7. 订单管理 查看订单详情,更新订单状态,支持批量导出,方便订单跟踪。 8. 分站管理 创建不同区域分站,设置权限,统一管理各区域市场。 9. 提现管理

    短信3.141592672893982398674234

    apk安装包

    原生js选项卡插件自定义图片滑动选项卡切换.zip

    原生js选项卡插件自定义图片滑动选项卡切换.zip

    1-宗教信息佛教佛寺寺庙庵堂相关数据-社科数据.zip

    宗教信息佛教佛寺寺庙庵堂相关数据集提供了全国各个地区省市县各个佛教寺庙的详细信息。这些数据不仅包括寺庙的名称和负责人姓名,还涵盖了所属省份、地级市、区县、具体地址、建立日期以及支派类别等关键信息。该数据集整理了超过3万条样本,为研究中国佛教寺庙的分布、历史和文化提供了丰富的第一手资料。这些信息有助于了解佛教在中国的传播和发展,以及寺庙在社会和文化中的作用。数据的整理和提供,对于宗教学、社会学、历史学和文化研究等领域的学者来说,是一个宝贵的资源。

    线性电阻网络的等效电阻计算Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。 替换数据可以直接使用,注释清楚,适合新手

    简单的 Python 版本管理.zip

    简单的 Python 版本管理pyenvpyenv 可让您轻松在多个 Python 版本之间切换。它简单、不引人注目,并遵循 UNIX 传统,即使用单一用途的工具来做好一件事。该项目由rbenv和 ruby​​-build分叉而来,并针对 Python 进行了修改。pyenv 的作用是什么......允许您根据每个用户更改全局 Python 版本。为每个项目的 Python 版本提供支持。允许您使用环境变量覆盖 Python 版本。一次搜索多个 Python 版本的命令。这可能有助于使用tox跨 Python 版本进行测试。与 pythonbrew 和 pythonz 相比,pyenv没有……依赖于Python本身。pyenv由纯shell脚本制作。不存在Python的引导问题。需要加载到你的 shell 中。相反,pyenv 的 shim 方法通过向你的 中添加目录来工作PATH。管理虚拟环境。当然,你可以自己创建虚拟环境 ,或者使用pyenv-virtualenv 来自动化该过程。目录安装获取 PyenvLinux/UNIX自动安装程序基本

    Notepad-v2.20工具,是替代Notepad++的首选工具

    Notepad-v2.20工具,是替代Notepad++的首选工具

    原生js随机图片拖拽排序代码.zip

    原生js随机图片拖拽排序代码.zip

Global site tag (gtag.js) - Google Analytics