`
qianshangding
  • 浏览: 127872 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Exec Source使用及源码分析

 
阅读更多

Exec source介绍

Exec source在启动时运行Unix命令,并且期望它会不断的在标准输出中产生数据。 (stderr会被丢弃,除非logStdErr设置成true).如果进程因为某些原因退出,Exce Source也将退出并且不会再产生数据。
粗粒标注的必须配置:
属性名 默认 描述
channels

type 组件名:exec
command 执行的命令
shell 运行命令的外壳
restartThrottle 10000 在尝试重启命令进程之前,sleep多长时间(单位:毫秒)
restart false 如果执行命令挂掉,是否要重启命令进程。
logStdErr false 是否应该记录该命令的错误日志。
batchSize 20 一次读取和发送到Channel的最大行数。
batchTimeout 3000 如果buffer的大小还没有到达,花费多长时间(单位:毫秒)去等待
selector.type replicating 复制(replicating)或复用(multiplexing)
selector.*

依赖selector.type的值
interceptors 用空格分开的拦截器列表。
interceptors.*


ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。例如:通过tail -F去获取Nginx的访问日志,如果Flume挂掉,Nginx访问日志继续导入到日志文件中,那么在Flume挂掉的这段时间中,新产生的日志Flume是无法获取到的,为了更好的可靠性保证,可以考虑使用Spooling Directory Source,拿实时获取Nginx访问日志来说,Spooling Directory Source虽然做不到实时,但是也可以通过日志文件的切分,做到准实时。


Exec Source例子

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

‘shell’配置被用来调用‘command’配置的命令 (例如:Bash 或 Powershell). ‘command’ is passed as an argument to ‘shell’ for execution. ‘command’命令shell脚本的功能,例如:wildcards(通配符), back ticks(返回标记), pipes(管道), loops(循环), conditionals(条件语句) 等等,如果没有配置‘shell’,那么‘command’ 将直接调用。‘shell’的值一般为: ‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’, 等等。

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

Exec Source源码分析

一,configure(Context context)方法,ExecSource该方法配置比较简单,参考上述表格即可。
二,start()方法
@Override
  public void start() {
    logger.info("Exec source starting with command:{}", command);
    //线程池
    executor = Executors.newSingleThreadExecutor();
    //构建ExecRunnable线程对象,传入配置文件的参数
    runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
        restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);

    // FIXME: Use a callback-like executor / future to signal us upon failure.
    runnerFuture = executor.submit(runner);

    /*
     * NB: This comes at the end rather than the beginning of the method because
     * it sets our state to running. We want to make sure the executor is alive
     * and well first.
     */
    //启动计数器
    sourceCounter.start();
    super.start();

    logger.debug("Exec source started");
  }
三,ExecRunnable:该类是Exec Source主要的实现类,继承了Runnable。下面我们看看他的run方法:
    @Override
    public void run() {
      do {
        String exitCode = "unknown";
        BufferedReader reader = null;
        String line = null;
        final List<Event> eventList = new ArrayList<Event>();

        timedFlushService = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat(
                "timedFlushExecService" +
                Thread.currentThread().getId() + "-%d").build());
        try {
          if(shell != null) {
            //如果有配置shell,则将shell通过"\\s+"转化为数组,再将该数组+command一起组成一个新的数组。
           String[] commandArgs = formulateShellCommand(shell, command);
            //调用可执行系统命令
            process = Runtime.getRuntime().exec(commandArgs);
          }  else {
            //将command通过"\\s+"转化为数组
            String[] commandArgs = command.split("\\s+");
            //调用可执行系统命令
            process = new ProcessBuilder(commandArgs).start();
          }
          //将shell命令的输出结果作为输入流读到reader中,InputStreamReader是字节流通向字符流的桥梁,它使用指定的charset读取字
          //节并将其解码为字符,每次调用read方法都会从底层输入流读取一个或多个字节。
          reader = new BufferedReader(
              new InputStreamReader(process.getInputStream(), charset));

          // StderrLogger dies as soon as the input stream is invalid
          //初始化错误日志线程,如果logStderr为false将不会打印日志。
          StderrReader stderrReader = new StderrReader(new BufferedReader(
              new InputStreamReader(process.getErrorStream(), charset)), logStderr);
          stderrReader.setName("StderrReader-[" + command + "]");
          stderrReader.setDaemon(true);
          stderrReader.start();

          //该定时任务每batchTimeout执行一次,单位是毫秒
          future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                try {
                  synchronized (eventList) {
                    //eventList不能为空且超时
                    if(!eventList.isEmpty() && timeout()) {
                      //执行flush
                      flushEventBatch(eventList);
                    }
                  }
                } catch (Exception e) {
                  logger.error("Exception occured when processing event batch", e);
                  if(e instanceof InterruptedException) {
                      Thread.currentThread().interrupt();
                  }
                }
              }
          },
          batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
          //通过流,按行读取
          while ((line = reader.readLine()) != null) {
            synchronized (eventList) {
              sourceCounter.incrementEventReceivedCount();
              eventList.add(EventBuilder.withBody(line.getBytes(charset)));
              //Event大小超过batchSize,或者超时了,就flush到Channel
              if(eventList.size() >= bufferCount || timeout()) {
                flushEventBatch(eventList);
              }
            }
          }
          //字节流中已经没有数据后,执行flush
          synchronized (eventList) {
              if(!eventList.isEmpty()) {
                flushEventBatch(eventList);
              }
          }
        } catch (Exception e) {
          logger.error("Failed while running command: " + command, e);
          if(e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
          }
        } finally {
          if (reader != null) {
            try {
              reader.close();
            } catch (IOException ex) {
              logger.error("Failed to close reader for exec source", ex);
            }
          }
          //杀杀子进程
          exitCode = String.valueOf(kill());
        }
        if(restart) {
          logger.info("Restarting in {}ms, exit code {}", restartThrottle,
              exitCode);
          try {
            //在重启命令进程之前,休眠多少长时间
            Thread.sleep(restartThrottle);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else {
          logger.info("Command [" + command + "] exited with " + exitCode);
        }
      } while(restart);
    //restart配置是指如果shell命令挂掉的话,是否重启该命令的进程,默认是false,配置为true的话,就会将刚才的所有代码循环一遍
    }
四,flushEventBatch方法
private void flushEventBatch(List<Event> eventList){
      //批量处理Event
      channelProcessor.processEventBatch(eventList);
      //统计
      sourceCounter.addToEventAcceptedCount(eventList.size());
      //清楚list
      eventList.clear();
      //获取最后一次push到Channel的时间,已便于判断超时
      lastPushToChannel = systemClock.currentTimeMillis();
    }
以上就是ExecSource大致流程。




分享到:
评论

相关推荐

    postgres内核源码分析实验环境配置导读.pptx

    通过以上步骤,我们可以构建一个完整的PostgreSQL内核源码分析实验环境,无论是使用GDB进行底层调试,还是利用Source Insight或Eclipse进行代码浏览和分析,都能为深入理解PostgreSQL的工作机制提供有力支持。

    Install-Source-Insight-under-Linux.rar_insight_source insight

    总之,虽然Source Insight原生不支持Linux,但借助Wine,我们仍然可以在Linux环境下享受到这个优秀的源码分析工具。请确保你的Wine配置得当,以获得最佳的使用体验。在使用过程中,如果遇到任何问题,记得查阅Wine的...

    Source Code.zip

    《源码解析:在TestStand中增强TestExec的测试序列》 TestStand是NI(National Instruments)公司开发的一款强大的测试管理软件,它允许用户通过序列化的方式组织和执行测试流程。在实际的测试环境中,确保测试的...

    进程门神 源码 sourcecode

    "进程门神"是一款用于系统监控和管理的软件,其源码分析可以帮助我们深入了解系统级程序的编写技术,以及如何实现对操作系统中进程的管理和控制。源码是软件开发的基础,通过阅读和学习源码,我们可以理解软件的工作...

    [unix.v6.v7.源码].unix.v6.v7.src.source.code

    源码].unix.v6.v7.src.source.code"是UNIX发展过程中的两个关键版本——V6和V7的源代码,它们在1970年代中期至晚期广泛使用,并为后续的UNIX版本奠定了基础。这些源代码是理解早期UNIX系统工作原理的重要资源,也是...

    sqlite source code

    "sqlite source code"这个压缩包包含的是SQLite 3.7.10版本的全部C语言源代码,将原本分散的源文件合并成一个单一的源文件,方便开发者进行查看和集成。 SQLite的源代码组织结构和关键组件包括以下几个方面: 1. *...

    orange's os source code

    源码分析可以帮助我们了解这些复杂的机制。 4. **文件系统(File System)**:文件系统管理磁盘上的数据存储,定义了文件的组织结构和访问方式。Orange's OS的源码中会包含文件的创建、读写、删除操作,以及目录...

    sqlite源码和编译方法

    一、SQLite源码分析 SQLite的源码结构清晰,主要包括以下几个部分: 1. `src/`:SQLite的核心源代码,包括解析器、虚拟机、SQL语句执行等模块。 2. `test/`:测试用例和脚本,用于验证SQLite的功能和性能。 3. `tool...

    apue.2e source code

    1. **系统调用**:了解如何使用`fork()`, `exec()`, `wait()`, `pipe()`, `socket()`等系统调用来创建子进程、执行新程序、进程间通信以及网络通信。 2. **文件I/O**:包括基本的文件操作,如`open()`, `close()`, ...

    vc实例精通源码,windows基本控件的使用Demo

    VC实例精通一书的源码。 第2章(\Chapter02) 示例描述:本章介绍常用Win32控件的使用方法。 01_EditDemo 演示静态文本、文本框、按钮控件的使用方法 02_CheckBoxDemo 演示复选框和单选按钮控件的使用方法。 ...

    驾照理论考试速成软件 MFC源代码

    如果你使用的是sql 2000的数据库,请将debug目录下的access数据库jzdata.mdb转换成sql server数据库,在查询分析器里执行: exec sp_configure 'show advanced options',1 reconfigure exec sp_configure 'Ad Hoc ...

    Flume ng share

    例如,`ExecSource` 可以从外部程序或脚本中获取数据。 ##### 源码分析 - Channel - Transaction Channel 在 Flume NG 架构中扮演着数据缓冲的角色,它支持事务处理以确保数据的一致性和完整性。当数据从 Source ...

    sqlite-3.3.5+源码

    3. **性能分析与优化**: 查看源码有助于了解SQLite的内部工作流程,从而针对性地优化查询性能。 4. **移植性**: 由于SQLite是跨平台的,源码提供了在不同操作系统上构建和部署的灵活性。 5. **调试与问题定位**: 当...

    java源码:日志服务器 Apache Flume.tar.gz

    Flume 提供了多种内置的源类型,如 ExecSource 可以监听命令行输出,TailSource 可以监视文件尾部的变化,而 AvroSource 支持 Avro 协议的数据流。对于通道,有 MemoryChannel 和 FileChannel,前者在内存中存储数据...

    品评Minix内核源码

    Minix是一款轻量级的操作系统,其内核源码提供了深入了解操作系统原理的机会。在品评Minix内核源码时,首先要关注...此外,使用Source-Navigator等工具可以更有效地理解和跟踪源代码中的关系,提升对Minix内核的理解。

    androidjava源码-android-source-browsing.platform--external--javasqlite:自动

    本文将深入探讨`android-source-browsing.platform--external--javasqlite`这一源码库,分析其核心概念、功能以及使用方法。 首先,`javasqlite`是Android对外提供的SQLite数据库API的实现,它位于Android源码的`...

    Linux+C编程实战源码

    通过阅读和分析提供的“Linux+C编程实战源码”,你可以观察到上述知识点的实际应用,理解如何将理论知识转化为实际代码。这些源码可能会涵盖各种实际问题的解决方案,如进程间通信、多线程编程、文件操作、网络编程...

    UnixSystemProgramming(SVR4)SourceCode

    1. 进程管理:Unix系统的进程创建、调度、通信、同步和互斥机制,如fork、exec、wait、pipe、socket等系统调用的实现。 2. 内存管理:了解如何实现虚拟内存、页面替换算法、内存映射等功能,这对于优化程序性能和...

    初学QT使用Gdal

    - **GDAL源码下载**:最新版本为1.7.3,下载地址:[GDAL Source](http://trac.osgeo.org/gdal/wiki/DownloadSource)。 **2. 配置与编译** - **编译前准备**:首先确保MinGW已经安装并配置正确。 - **配置命令**: ...

    SQL Server SQL语句导入导出大全

    Extended Properties="Excel 8.0"'`:连接字符串,其中`Excel 8.0`对应Excel 2003及以前版本的文件格式。 ##### dBase文件的导入: ```sql SELECT * FROM OPENROWSET('Microsoft.Jet.OLEDB.4.0', 'dBaseIV;HDR=NO;...

Global site tag (gtag.js) - Google Analytics