flume是一个高可靠性的分布式的大文件收集系统。它提供了transaction来保证数据不会丢失。
flume官网:http://flume.apache.org/
Flume文档:http://flume.apache.org/FlumeUserGuide.html,http://flume.apache.org/FlumeDeveloperGuide.html
安装:从官网下载flume,然后解压
启动:nohup bin/flume-ng agent --conf <conf_file_path> --conf-file <conf_file> --name <agent_name> -Dflume.root.logger=DEBUG,console &
Flume主要包含三部分:source,channel,sink. source 用于接收数据,channel是一个缓冲通道,sink发送数据到目的端。source可以配置多个channel。channel可以通过channelSelect来选择发往那个channel。可以配置往每个channel发送,也可以配置一个参数,当满足特定值时,发往某个channel。每个channel可以配置多个sink。通过sinkprocess来做load balance,或者failover。
flume-ng命令会调用Application的main函数,如果需要reload configure 文件,则注册application到eventBus中,当文件变更时,调用application的handleConfigurationEvent方法
public static void main(String[] args) { Application application; if(reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } else { PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile); application = new Application(); application.handleConfigurationEvent(configurationProvider.getConfiguration()); } application.start(); }
application中的start方法会调用supervisor.supervise(),这个方法会尝试调用component的start方法,component列表中包含了PollingPropertiesFileConfigurationProvider对象,这个对象的start方法启动了一个线程来监控文件的变更,初始状态文件是变更的,接着就会调用application的handleConfigurationEvent方法
public synchronized void start() { for(LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } }
在 handleConfigurationEvent中先调用 PropertiesFileConfigurationProvider的getConfiguration方法,这个方法通过配置文件创建了source,sink,channel,并调用了各个组件的configure方法,然后 调用了startAllComponents方法,启动了channel,source,sink,并且加载了monitor,用于监控flume的metrics
private void startAllComponents(MaterializedConfiguration materializedConfiguration) { logger.info("Starting new configuration:{}", materializedConfiguration); this.materializedConfiguration = materializedConfiguration; for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) { try{ logger.info("Starting Channel " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e){ logger.error("Error while starting {}", entry.getValue(), e); } } /* * Wait for all channels to start. */ for(Channel ch: materializedConfiguration.getChannels().values()){ while(ch.getLifecycleState() != LifecycleState.START && !supervisor.isComponentInErrorState(ch)){ try { logger.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms"); Thread.sleep(500); } catch (InterruptedException e) { logger.error("Interrupted while waiting for channel to start.", e); Throwables.propagate(e); } } } for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners() .entrySet()) { try{ logger.info("Starting Sink " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } for (Entry<String, SourceRunner> entry : materializedConfiguration .getSourceRunners().entrySet()) { try{ logger.info("Starting Source " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } this.loadMonitoring(); }
相关推荐
通过分析 Apache Flume 1.7.0 的源码,开发者可以深入了解其内部工作流程,优化性能,解决实际问题,并为构建自己的数据采集和传输解决方案打下坚实基础。同时,对于大数据领域的研究者,源码也是深入研究数据处理...
为了编译Flume源码,打开命令行,导航至Flume源码根目录,然后运行`mvn clean install`命令。这个过程会下载所有依赖库,编译源代码,并将编译后的JAR文件放置到`target`目录下。如果编译过程中没有错误,这意味着源...
Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动大量日志数据。Flume 1.6 版本是其一个重要里程碑,...如果你想要在大数据日志处理领域有所建树,对 Flume 源码的深入学习是必不可少的。
Flume 是 Apache Hadoop 生态系统中的一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的...这样,你可以轻松地管理和分析大量的日志数据,尤其是在分布式环境中,Flume 提供了一种高效且可扩展的解决方案。
在源码分析过程中,可以关注以下几个关键点: 1. 数据流处理:了解 Flume 如何处理事件(Event)的生命周期,从创建、传输到最终写入目标。 2. 配置文件解析:学习如何编写和解析 Flume 配置文件,理解各组件间的...
- **代码结构**:源码分析可以帮助理解内部工作原理,如如何建立数据库连接、执行查询、将结果转化为事件等。 - **扩展性**:对于开发人员,源码提供了扩展和定制的基础,例如添加新的数据库适配器,或优化查询...
2. **提取关键组件**:从Flume源码包中提取关键的Java类,这些类位于`apache-flume-1.8.0-src/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/`目录下,主要包括`BucketWriter`和`...
##### 源码分析 - Channel - Transaction Channel 在 Flume NG 架构中扮演着数据缓冲的角色,它支持事务处理以确保数据的一致性和完整性。当数据从 Source 发送到 Channel 时,会触发事务处理过程。 ##### 源码...
Flume是Apache开发的一款用于收集、聚合和移动大量日志数据的工具,而Elasticsearch则是一个分布式、实时的搜索与分析引擎,广泛用于大数据的存储、检索和分析。本篇文章将详细探讨如何将Flume 1.8.0版本与Elastic...
总结,Flume NG与Elasticsearch 6.5.4的集成是构建高效日志管理和分析系统的有效途径。通过使用`flume-ng-elasticsearch-sink-6.5.4.jar`,我们可以轻松地将Flume收集的日志数据导入Elasticsearch,进一步利用ELK栈...
此外,源码分析对于解决实际部署中遇到的问题、优化性能或实现更高级的集成场景都非常有价值。 总结来说,`apache-flume-1.8.0-src.tar.gz`是一个包含Flume 1.8.0版本完整源代码的压缩包,它为开发者提供了深入了解...
通过分析这个源码,你不仅可以了解 Flume 的内部工作机制,还可以学习如何扩展 Flume 来支持新的数据源、通道或接收器,或者优化 Flume 的性能和稳定性。同时,对于希望深入了解大数据处理和日志管理的人来说,研究...
描述中的 "flume抽取数据库数据的源码,可以自动检测数据库的sql语句是否更新" 揭示了该源码的特性:它不仅执行查询,还能监控数据库,判断 SQL 语句是否有新的更改。这在实时数据流处理中至关重要,因为它确保了 ...
"flume-1.9.0.tgz" 是 Flume 的一个版本压缩包,包含了该版本的所有源码、库文件和必要的配置文档。 **Flume 的基本概念和架构** Flume 主要由三个核心组件构成:Sources、Channels 和 Sinks。Sources 负责接收...
在IT领域,大数据处理和日志管理是至关重要的任务,而这四个组件——Flume、Java (JDK)、Kafka和Logstash,都是这个领域的关键工具。以下是对这些技术的详细解释: **Java (JDK 8u144)**:Java Development Kit ...
基于Flume&spark&Flask的分布式实时日志分析与入侵检测系统源码+文档说明基于Flume&spark&Flask的分布式实时日志分析与入侵检测系统源码+文档说明基于Flume&spark&Flask的分布式实时日志分析与入侵检测系统源码+文档...
20-Flume副本机制channel选择器-需求分析.avi 21-Flume副本机制channel选择器-配置信息.avi 22-Flume副本机制channel选择器-案例测试.avi 25-Flume负载均衡案例-案例实操.avi 27-Flume聚合案例-案例实操.avi 29-...
Apache Flume 是一个分布式、可靠且可用于有效收集、聚合和...这个过程对于实时大数据分析、日志聚合和监控等场景非常有用。同时,了解如何配置和使用 Flume 的不同组件,将有助于我们更好地管理和优化数据采集流程。
Kafka的安装和配置过程包括下载源码包,执行sbt更新、打包、组装包依赖,以及执行启动脚本。启动Kafka服务需要先启动Zookeeper服务,然后启动Kafka服务本身,创建topic,发送消息和启动消费者。 Storm的安装和配置...