`

flume源码分析

 
阅读更多

flume是一个高可靠性的分布式的大文件收集系统。它提供了transaction来保证数据不会丢失。

flume官网:http://flume.apache.org/

Flume文档:http://flume.apache.org/FlumeUserGuide.htmlhttp://flume.apache.org/FlumeDeveloperGuide.html

 

安装:从官网下载flume,然后解压

启动:nohup bin/flume-ng agent --conf <conf_file_path> --conf-file <conf_file> --name <agent_name> -Dflume.root.logger=DEBUG,console &

 

Flume主要包含三部分:source,channel,sink. source 用于接收数据,channel是一个缓冲通道,sink发送数据到目的端。source可以配置多个channel。channel可以通过channelSelect来选择发往那个channel。可以配置往每个channel发送,也可以配置一个参数,当满足特定值时,发往某个channel。每个channel可以配置多个sink。通过sinkprocess来做load balance,或者failover。

 

flume-ng命令会调用Application的main函数,如果需要reload configure 文件,则注册application到eventBus中,当文件变更时,调用application的handleConfigurationEvent方法

 public static void main(String[] args) {
      Application application;
      if(reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else {
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile);
        application = new Application();
        application.handleConfigurationEvent(configurationProvider.getConfiguration());
      }
      application.start();
  }

    application中的start方法会调用supervisor.supervise(),这个方法会尝试调用component的start方法,component列表中包含了PollingPropertiesFileConfigurationProvider对象,这个对象的start方法启动了一个线程来监控文件的变更,初始状态文件是变更的,接着就会调用application的handleConfigurationEvent方法

  public synchronized void start() {
    for(LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
  }

  在 handleConfigurationEvent中先调用 PropertiesFileConfigurationProvider的getConfiguration方法,这个方法通过配置文件创建了source,sink,channel,并调用了各个组件的configure方法,然后  调用了startAllComponents方法,启动了channel,source,sink,并且加载了monitor,用于监控flume的metrics

 private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
    logger.info("Starting new configuration:{}", materializedConfiguration);

    this.materializedConfiguration = materializedConfiguration;

    for (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
      try{
        logger.info("Starting Channel " + entry.getKey());
        supervisor.supervise(entry.getValue(),
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e){
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    /*
     * Wait for all channels to start.
     */
    for(Channel ch: materializedConfiguration.getChannels().values()){
      while(ch.getLifecycleState() != LifecycleState.START
          && !supervisor.isComponentInErrorState(ch)){
        try {
          logger.info("Waiting for channel: " + ch.getName() +
              " to start. Sleeping for 500 ms");
          Thread.sleep(500);
        } catch (InterruptedException e) {
          logger.error("Interrupted while waiting for channel to start.", e);
          Throwables.propagate(e);
        }
      }
    }

    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
        .entrySet()) {
      try{
        logger.info("Starting Sink " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    for (Entry<String, SourceRunner> entry : materializedConfiguration
        .getSourceRunners().entrySet()) {
      try{
        logger.info("Starting Source " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    this.loadMonitoring();
  }

 

  • 大小: 30.5 KB
分享到:
评论

相关推荐

    flume1.7.0源码

    通过分析 Apache Flume 1.7.0 的源码,开发者可以深入了解其内部工作流程,优化性能,解决实际问题,并为构建自己的数据采集和传输解决方案打下坚实基础。同时,对于大数据领域的研究者,源码也是深入研究数据处理...

    flume 1.8所有源代码 编译通过版 附 maven3.5.2 安装包

    为了编译Flume源码,打开命令行,导航至Flume源码根目录,然后运行`mvn clean install`命令。这个过程会下载所有依赖库,编译源代码,并将编译后的JAR文件放置到`target`目录下。如果编译过程中没有错误,这意味着源...

    Apache flume1.6_src

    Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动大量日志数据。Flume 1.6 版本是其一个重要里程碑,...如果你想要在大数据日志处理领域有所建树,对 Flume 源码的深入学习是必不可少的。

    flume log4f示例源码

    Flume 是 Apache Hadoop 生态系统中的一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的...这样,你可以轻松地管理和分析大量的日志数据,尤其是在分布式环境中,Flume 提供了一种高效且可扩展的解决方案。

    电商数仓项目(八) Flume 系列源码

    在源码分析过程中,可以关注以下几个关键点: 1. 数据流处理:了解 Flume 如何处理事件(Event)的生命周期,从创建、传输到最终写入目标。 2. 配置文件解析:学习如何编写和解析 Flume 配置文件,理解各组件间的...

    flume-ng-sql-source-1.5.2

    - **代码结构**:源码分析可以帮助理解内部工作原理,如如何建立数据库连接、执行查询、将结果转化为事件等。 - **扩展性**:对于开发人员,源码提供了扩展和定制的基础,例如添加新的数据库适配器,或优化查询...

    Flume配置双HA hdfsSink.docx

    2. **提取关键组件**:从Flume源码包中提取关键的Java类,这些类位于`apache-flume-1.8.0-src/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/`目录下,主要包括`BucketWriter`和`...

    Flume ng share

    ##### 源码分析 - Channel - Transaction Channel 在 Flume NG 架构中扮演着数据缓冲的角色,它支持事务处理以确保数据的一致性和完整性。当数据从 Source 发送到 Channel 时,会触发事务处理过程。 ##### 源码...

    flume1.8.0和elasticsearch5.2.6整合

    Flume是Apache开发的一款用于收集、聚合和移动大量日志数据的工具,而Elasticsearch则是一个分布式、实时的搜索与分析引擎,广泛用于大数据的存储、检索和分析。本篇文章将详细探讨如何将Flume 1.8.0版本与Elastic...

    flume-ng-elasticsearch-sink-6.5.4.jar.zip

    总结,Flume NG与Elasticsearch 6.5.4的集成是构建高效日志管理和分析系统的有效途径。通过使用`flume-ng-elasticsearch-sink-6.5.4.jar`,我们可以轻松地将Flume收集的日志数据导入Elasticsearch,进一步利用ELK栈...

    apache-flume-1.8.0-src.tar.gz

    此外,源码分析对于解决实际部署中遇到的问题、优化性能或实现更高级的集成场景都非常有价值。 总结来说,`apache-flume-1.8.0-src.tar.gz`是一个包含Flume 1.8.0版本完整源代码的压缩包,它为开发者提供了深入了解...

    flume-ng-1.6.0-cdh5.14.0源码

    通过分析这个源码,你不仅可以了解 Flume 的内部工作机制,还可以学习如何扩展 Flume 来支持新的数据源、通道或接收器,或者优化 Flume 的性能和稳定性。同时,对于希望深入了解大数据处理和日志管理的人来说,研究...

    flume抽取数据库数据的source

    描述中的 "flume抽取数据库数据的源码,可以自动检测数据库的sql语句是否更新" 揭示了该源码的特性:它不仅执行查询,还能监控数据库,判断 SQL 语句是否有新的更改。这在实时数据流处理中至关重要,因为它确保了 ...

    flume-1.9.0.tgz

    "flume-1.9.0.tgz" 是 Flume 的一个版本压缩包,包含了该版本的所有源码、库文件和必要的配置文档。 **Flume 的基本概念和架构** Flume 主要由三个核心组件构成:Sources、Channels 和 Sinks。Sources 负责接收...

    flume1.11 jdk 8u144kafka 2.12-3.2.0logstash 7.9.2

    在IT领域,大数据处理和日志管理是至关重要的任务,而这四个组件——Flume、Java (JDK)、Kafka和Logstash,都是这个领域的关键工具。以下是对这些技术的详细解释: **Java (JDK 8u144)**:Java Development Kit ...

    基于Flume&spark&Flask的分布式实时日志分析与入侵检测系统源码+文档说明

    基于Flume&spark&Flask的分布式实时日志分析与入侵检测系统源码+文档说明基于Flume&spark&Flask的分布式实时日志分析与入侵检测系统源码+文档说明基于Flume&spark&Flask的分布式实时日志分析与入侵检测系统源码+文档...

    大数据技术Flume1.9

    20-Flume副本机制channel选择器-需求分析.avi 21-Flume副本机制channel选择器-配置信息.avi 22-Flume副本机制channel选择器-案例测试.avi 25-Flume负载均衡案例-案例实操.avi 27-Flume聚合案例-案例实操.avi 29-...

    【实战Apache-Flume采集DB数据到kafka】

    Apache Flume 是一个分布式、可靠且可用于有效收集、聚合和...这个过程对于实时大数据分析、日志聚合和监控等场景非常有用。同时,了解如何配置和使用 Flume 的不同组件,将有助于我们更好地管理和优化数据采集流程。

    flume+kafka+storm搭建

    Kafka的安装和配置过程包括下载源码包,执行sbt更新、打包、组装包依赖,以及执行启动脚本。启动Kafka服务需要先启动Zookeeper服务,然后启动Kafka服务本身,创建topic,发送消息和启动消费者。 Storm的安装和配置...

Global site tag (gtag.js) - Google Analytics