`
zhao_rock
  • 浏览: 191823 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

Flume-ng 1.6启动过程源码分析(二)

阅读更多
阅读Flume源码后发现,Flume有两个顶级的接口:
1. ConfigurationProvider接口,提供了getConfiguration()方法,用于获取不同组件的配置。

2. LifecycleAware接口,提供了三个方法,start() stop()和getLifecycleState(),分别用于组件的启动 停止以及组件在生命周期中处的状态,可以说这个接口贯穿于整个Flume中。

继续Flume-ng启动过程的源码分析,Flume启动类org.apache.flume.node.Application,所有组件加载完毕后会调用start方法。下面的代码中可以看到,start方法会遍历所有组件,并调用类LifecycleSupervisor的supervise方法
public synchronized void start() {
    for(LifecycleAware component : components) {
    //supervisor作为LifecycleSupervisor的实例,调用supervise
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
  }


接下来supervisor再被实例化LifecycleSupervisor对象的时候会做哪些工作呢?
即new LifecycleSupervisor(),构造函数做了什么
public LifecycleSupervisor() {
    lifecycleState = LifecycleState.IDLE;
    supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
    monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
    monitorService = new ScheduledThreadPoolExecutor(10,
        new ThreadFactoryBuilder().setNameFormat(
            "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
            .build());
    monitorService.setMaximumPoolSize(20);
    monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
    purger = new Purger();
    needToPurge = false;
  }

主要初始化了一个有10个线程,上限是20的线程池

再回到之前supervise(LifecycleAware lifecycleAware,SupervisorPolicy policy, LifecycleState desiredState)方法,它是具体执行启动各个组件的方法
public synchronized void supervise(LifecycleAware lifecycleAware,
      SupervisorPolicy policy, LifecycleState desiredState) {
    if(this.monitorService.isShutdown()
        || this.monitorService.isTerminated()
        || this.monitorService.isTerminating()){
      throw new FlumeException("Supervise called on " + lifecycleAware + " " +
          "after shutdown has been initiated. " + lifecycleAware + " will not" +
          " be started");
    }

    Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
        "Refusing to supervise " + lifecycleAware + " more than once");

    if (logger.isDebugEnabled()) {
      logger.debug("Supervising service:{} policy:{} desiredState:{}",
          new Object[] { lifecycleAware, policy, desiredState });
    }

    Supervisoree process = new Supervisoree();
    process.status = new Status();

    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

    MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);
    monitorFutures.put(lifecycleAware, future);
  }

首先会把组件的渴望的状态desiredState(如LifecycleState.START)和策略policy(如new SupervisorPolicy.AlwaysRestartPolicy())放到静态内部类Supervisoree中,再将组件lifecycleAware及之前的信息,一并传值到MonitorRunnable线程中,下面线程类MonitorRunnable的run方法会根据switch case匹配的值去执行不同操作,当前回到START流程,执行lifecycleAware.start(),完成组件的启动
@Override
    public void run() {
      logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
          supervisoree);

      long now = System.currentTimeMillis();

      try {
        if (supervisoree.status.firstSeen == null) {
          logger.debug("first time seeing {}", lifecycleAware);

          supervisoree.status.firstSeen = now;
        }

        supervisoree.status.lastSeen = now;
        synchronized (lifecycleAware) {
          if (supervisoree.status.discard) {
            // Unsupervise has already been called on this.
            logger.info("Component has already been stopped {}", lifecycleAware);
            return;
          } else if (supervisoree.status.error) {
            logger.info("Component {} is in error state, and Flume will not"
                + "attempt to change its state", lifecycleAware);
            return;
          }

          supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

          if (!lifecycleAware.getLifecycleState().equals(
              supervisoree.status.desiredState)) {

            logger.debug("Want to transition {} from {} to {} (failures:{})",
                new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                    supervisoree.status.desiredState,
                    supervisoree.status.failures });

            switch (supervisoree.status.desiredState) {
              case START:
                try {
                  lifecycleAware.start();
                } catch (Throwable e) {
                  logger.error("Unable to start " + lifecycleAware
                      + " - Exception follows.", e);
                  if (e instanceof Error) {
                    // This component can never recover, shut it down.
                    supervisoree.status.desiredState = LifecycleState.STOP;
                    try {
                      lifecycleAware.stop();
                      logger.warn("Component {} stopped, since it could not be"
                          + "successfully started due to missing dependencies",
                          lifecycleAware);
                    } catch (Throwable e1) {
                      logger.error("Unsuccessful attempt to "
                          + "shutdown component: {} due to missing dependencies."
                          + " Please shutdown the agent"
                          + "or disable this component, or the agent will be"
                          + "in an undefined state.", e1);
                      supervisoree.status.error = true;
                      if (e1 instanceof Error) {
                        throw (Error) e1;
                      }
                      // Set the state to stop, so that the conf poller can
                      // proceed.
                    }
                  }
                  supervisoree.status.failures++;
                }
                break;
              case STOP:
                try {
                  lifecycleAware.stop();
                } catch (Throwable e) {
                  logger.error("Unable to stop " + lifecycleAware
                      + " - Exception follows.", e);
                  if (e instanceof Error) {
                    throw (Error) e;
                  }
                  supervisoree.status.failures++;
                }
                break;
              default:
                logger.warn("I refuse to acknowledge {} as a desired state",
                    supervisoree.status.desiredState);
            }

            if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
              logger.error(
                  "Policy {} of {} has been violated - supervisor should exit!",
                  supervisoree.policy, lifecycleAware);
            }
          }
        }
      } catch(Throwable t) {
        logger.error("Unexpected error", t);
      }
      logger.debug("Status check complete");
    }


接下来再调用lifecycleAware.start()后,会根据组件的不同,找到对应组件的Runner类,这里以Source中的PollableSourceRunner为例,Flume Source提供了两种类别的Source即:EventDrivenSource和PollableSource,EventDrivenSource为事件驱动,PollableSource则是轮询机制,PollableSource类型的启动依赖于PollableSourceRunner,在PollableSourceRunner内部会有一个用于轮询的类
PollingRunner,在执行的过程中会根据PollableSource类型特有的source.process()返回值的状态,进行Sleep
public void run() {
      logger.debug("Polling runner starting. Source:{}", source);

      while (!shouldStop.get()) {
        counterGroup.incrementAndGet("runner.polls");

        try {
          if (source.process().equals(PollableSource.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");

            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * backoffSleepIncrement, maxBackoffSleep));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          logger.info("Source runner interrupted. Exiting");
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (EventDeliveryException e) {
          logger.error("Unable to deliver event. Exception follows.", e);
          counterGroup.incrementAndGet("runner.deliveryErrors");
        } catch (Exception e) {
          counterGroup.incrementAndGet("runner.errors");
          logger.error("Unhandled exception, logging and sleeping for " +
              maxBackoffSleep + "ms", e);
          try {
            Thread.sleep(maxBackoffSleep);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }

      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }





0
0
分享到:
评论

相关推荐

    flume-ng安装

    使用以下命令启动 Flume-NG: `bin/flume-ng agent -n agent --conf conf/ -f conf/flume-conf.properties` 配置多个 Agent 在分布式日志收集系统中,可能需要配置多个 Agent 来收集日志。在这种情况下,可以配置...

    Flume-ng-1.6.0-cdh.zip

    Flume-ng-1.6.0-cdh.zip 内压缩了 3 个项目,分别为:flume-ng-1.6.0-cdh5.5.0.tar.gz、flume-ng-1.6.0-cdh5.7.0.tar.gz 和 flume-ng-1.6.0-cdh5.10.1.tar.gz,选择你需要的版本。

    flume-ng-sql-source-1.5.2

    Flume-ng-sql-source-1.5.2是Apache Flume的一个扩展,它允许Flume从SQL数据库中收集数据。Apache Flume是一个分布式、可靠且可用于有效聚合、移动大量日志数据的系统。"ng"代表"next generation",表明这是Flume的...

    flume-ng-sql-source-release-1.5.2.zip

    Flume-ng-sql-source是Apache Flume的一个扩展插件,主要功能是允许用户从各种数据库中抽取数据并将其传输到其他目的地,如Apache Kafka。在本案例中,我们讨论的是版本1.5.2的发布包,即"flume-ng-sql-source-...

    flume-ng-sql-source-1.5.2.jar

    flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包

    flume-ng-sql-source-1.5.3.jar

    flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...

    flume-ng-sql-source-1.5.1

    flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具

    flume-ng-sql-source

    包含flume-ng-sql-source-1.5.1&flume;-ng-sql-source-1.4.1 此内容均为网上下载

    flume-ng-sql-source.jar

    flume是一个日志收集器,更多详细的介绍可以参照官网:http://flume.apache.org/ flume-ng-sql-source实现oracle增量数据读取 有了这个jar文件即可从关系型数据库拉去数据到flume

    flume-ng-1.6.0-cdh5.5.0.tar.gz

    "apache-flume-1.6.0-cdh5.5.0-bin" 是解压后的 Flume 安装包,包含了所有运行 Flume 所需的二进制文件、脚本和配置示例。在使用之前,通常需要配置 `conf/flume.conf` 文件以定制 Flume 的行为,然后通过命令行启动...

    Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS.docx

    二、Flume-ng 测试实例 在 conf 文件夹中创建 example.conf 文件,配置 Flume-ng 的源、 sink 和通道。使用 telnet 连接 source 写入数据,产生日志数据输出控制台。启动 Flume-ng,使用 telnet 命令写入一些数据,...

    flume-ng-1.5.0-cdh5.3.6.rar

    flume-ng-1.5.0-cdh5.3.6.rarflume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume...

    flume-ng-1.6.0 cdh5.7.0安装包

    该压缩包“flume-ng-1.6.0-cdh5.7.0”是针对Cloudera Data Hub (CDH) 5.7.0 平台的Flume的特定版本,"ng"代表"Next Generation",意味着它是Flume的更新版本,提供了更先进的特性和性能优化。CDH是一个完整的、经过...

    flume-ng-sql-source-1.4.1

    flume-ng-sql-source-1.4.1 flume采集mysql数据到kudu

    flume-ng-sql-source-1.4.3.jar

    总的来说,Flume-ng-sql-source-1.4.3.jar是数据工程师和分析师的得力助手,它将数据库数据的采集与Flume的强大功能相结合,为企业的大数据战略提供了一条有效的数据输入途径。在当前大数据时代,掌握如何使用这一...

    flume-ng-1.6.0-cdh5.14.0源码

    在 CDH(Cloudera Distribution Including Hadoop)5.14.0 版本中,Flume-ng(下一代 Flume)1.6.0 版本是官方提供的组件,用于日志管理和大数据分析。这个源码包,`flume-ng-1.6.0-cdh5.14.0-src.tar.gz`,提供了...

    flume-ng-1.6.0-cdh5.10.1.tar.gz的下载

    解压`apache-flume-1.6.0-cdh5.10.1-bin`后,你会得到Flume的二进制文件,包括启动脚本、配置示例、库文件等。要开始使用Flume,你需要编写一个或多个配置文件来定义Agent的配置,然后通过命令行工具启动Agent。 ...

Global site tag (gtag.js) - Google Analytics