4、整体流程
从以上部分我们可以看出,不管是Source还是Sink都依赖Channel,那么启动时应该先启动Channel然后再启动Source或Sink即可。
Flume有两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程,此处我们已Application分析为主。
首先进入org.apache.flume.node.Application的main方法启动:
//1、设置默认值启动参数、参数是否必须的 Options options = new Options(); Option option = new Option("n", "name", true, "the name of this agent"); option.setRequired(true); options.addOption(option); option = new Option("f", "conf-file", true, "specify a config file (required if -z missing)"); option.setRequired(false); options.addOption(option); //2、接着解析命令行参数 CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(options, args); String agentName = commandLine.getOptionValue('n'); boolean reload = !commandLine.hasOption("no-reload-conf"); if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) { isZkConfigured = true; } if (isZkConfigured) { //3、如果是通过ZooKeeper配置,则使用ZooKeeper参数启动,此处忽略,我们以配置文件讲解 } else { //4、打开配置文件,如果不存在则快速失败 File configurationFile = new File(commandLine.getOptionValue('f')); if (!configurationFile.exists()) { throw new ParseException( "The specified configuration file does not exist: " + path); } List<LifecycleAware> components = Lists.newArrayList(); if (reload) { //5、如果需要定期reload配置文件,则走如下方式 //5.1、此处使用Guava提供的事件总线 EventBus eventBus = new EventBus(agentName + "-event-bus"); //5.2、读取配置文件,使用定期轮训拉起策略,默认30s拉取一次 PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider( agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); //5.3、向Application注册组件 //5.4、向事件总线注册本应用,EventBus会自动注册Application中使用@Subscribe声明的方法 eventBus.register(application); } else { //5、配置文件不支持定期reload PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider( agentName, configurationFile); application = new Application(); //6.2、直接使用配置文件初始化Flume组件 application.handleConfigurationEvent(configurationProvider .getConfiguration()); } } //7、启动Flume应用 application.start(); //8、注册虚拟机关闭钩子,当虚拟机关闭时调用Application的stop方法进行终止 final Application appReference = application; Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") { @Override public void run() { appReference.stop(); } });
以上流程只提取了核心代码中的一部分,比如ZK的实现直接忽略了,而Flume启动大体流程如下:
1、读取命令行参数;
2、读取配置文件;
3、根据是否需要reload使用不同的策略初始化Flume;如果需要reload,则使用Guava的事件总线实现,Application的handleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化;
4、启动Application,并注册虚拟机关闭钩子。
handleConfigurationEvent方法比较简单,首先调用了stopAllComponents停止所有组件,接着调用startAllComponents使用配置文件初始化所有组件:
@Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); }
MaterializedConfiguration存储Flume运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner等,其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。
对于startAllComponents实现大体如下:
//1、首先启动Channel supervisor.supervise(Channels, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //2、确保所有Channel是否都已启动 for(Channel ch: materializedConfiguration.getChannels().values()){ while(ch.getLifecycleState() != LifecycleState.START && !supervisor.isComponentInErrorState(ch)){ try { Thread.sleep(500); } catch (InterruptedException e) { Throwables.propagate(e); } } } //3、启动SinkRunner supervisor.supervise(SinkRunners, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //4、启动SourceRunner supervisor.supervise(SourceRunner, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //5、初始化监控服务 this.loadMonitoring();
从如下代码中可以看到,首先要准备好Channel,因为Source和Sink会操作它,对于Channel如果初始化失败则整个流程是失败的;然后启动SinkRunner,先准备好消费者;接着启动SourceRunner开始进行采集日志。此处我们发现有两个单独的组件LifecycleSupervisor和MonitorService,一个是组件守护哨兵,一个是监控服务。守护哨兵对这些组件进行守护,假设出问题了默认策略是自动重启这些组件。
对于stopAllComponents实现大体如下:
//1、首先停止SourceRunner supervisor.unsupervise(SourceRunners); //2、接着停止SinkRunner supervisor.unsupervise(SinkRunners); //3、然后停止Channel supervisor.unsupervise(Channels); //4、最后停止MonitorService monitorServer.stop();
此处可以看出,停止的顺序是Source、Sink、Channel,即先停止生产,再停止消费,最后停止管道。
Application中的start方法代码实现如下:
public synchronized void start() { for(LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } }
其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启。
而Application关闭执行了如下动作:
public synchronized void stop() { supervisor.stop(); if(monitorServer != null) { monitorServer.stop(); } }
即关闭守护哨兵和监控服务。
到此基本的Application分析结束了,我们还有很多疑问,守护哨兵怎么实现的。
整体流程可以总结为:
1、首先初始化命令行配置;
2、接着读取配置文件;
3、根据是否需要reload初始化配置文件中的组件;如果需要reload会使用Guava事件总线进行发布订阅变化;
4、接着创建Application,创建守护哨兵,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务;停止顺序:SourceRunner、SinkRunner、Channel;
5、如果配置文件需要定期reload,则需要注册Polling***ConfigurationProvider到守护哨兵;
6、最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。
轮训实现的SourceRunner 和SinkRunner会创建一个线程进行工作,之前已经介绍了其工作方式。接下来我们看下守护哨兵的实现。
首先创建LifecycleSupervisor:
//1、用于存放被守护的组件 supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>(); //2、用于存放正在被监控的组件 monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>(); //3、创建监控服务线程池 monitorService = new ScheduledThreadPoolExecutor(10, new ThreadFactoryBuilder().setNameFormat( "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d") .build()); monitorService.setMaximumPoolSize(20); monitorService.setKeepAliveTime(30, TimeUnit.SECONDS); //4、定期清理被取消的组件 purger = new Purger(); //4.1、默认不进行清理 needToPurge = false;
LifecycleSupervisor启动时会进行如下操作:
public synchronized void start() { monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS); lifecycleState = LifecycleState.START; }
首先每隔两个小时执行清理组件,然后改变状态为启动。而LifecycleSupervisor停止时直接停止了监控服务,然后更新守护组件状态为STOP:
//1、首先停止守护监控服务 if (monitorService != null) { monitorService.shutdown(); try { monitorService.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.error("Interrupted while waiting for monitor service to stop"); } } //2、更新所有守护组件状态为STOP,并调用组件的stop方法进行停止 for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) { if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) { entry.getValue().status.desiredState = LifecycleState.STOP; entry.getKey().stop(); } } //3、更新本组件状态 if (lifecycleState.equals(LifecycleState.START)) { lifecycleState = LifecycleState.STOP; } //4、最后的清理 supervisedProcesses.clear(); monitorFutures.clear();
接下来就是调用supervise进行组件守护了:
if(this.monitorService.isShutdown() || this.monitorService.isTerminated() || this.monitorService.isTerminating()){ //1、如果哨兵已停止则抛出异常,不再接收任何组件进行守护 } //2、初始化守护组件 Supervisoree process = new Supervisoree(); process.status = new Status(); //2.1、默认策略是失败重启 process.policy = policy; //2.2、初始化组件默认状态,大多数组件默认为START process.status.desiredState = desiredState; process.status.error = false; //3、组件监控器,用于定时获取组件的最新状态,或者重新启动组件 MonitorRunnable monitorRunnable = new MonitorRunnable(); monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); //4、定期的去执行组件监控器,获取组件最新状态,或者重新启动组件 ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); monitorFutures.put(lifecycleAware, future); }
如果不需要守护了,则需要调用unsupervise:
public synchronized void unsupervise(LifecycleAware lifecycleAware) { synchronized (lifecycleAware) { Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware); //1.1、设置守护组件的状态为被丢弃 supervisoree.status.discard = true; //1.2、设置组件盼望的最新生命周期状态为STOP this.setDesiredState(lifecycleAware, LifecycleState.STOP); //1.3、停止组件 lifecycleAware.stop(); } //2、从守护组件中移除 supervisedProcesses.remove(lifecycleAware); //3、取消定时监控组件服务 monitorFutures.get(lifecycleAware).cancel(false); //3.1、通知Purger需要进行清理,Purger会定期的移除cancel的组件 needToPurge = true; monitorFutures.remove(lifecycleAware); }
接下来我们再看下MonitorRunnable的实现,其负责进行组件状态迁移或组件故障恢复:
public void run() { long now = System.currentTimeMillis(); try { if (supervisoree.status.firstSeen == null) { supervisoree.status.firstSeen = now; //1、记录第一次状态查看时间 } supervisoree.status.lastSeen = now; //2、记录最后一次状态查看时间 synchronized (lifecycleAware) { //3、如果守护组件被丢弃或出错了,则直接返回 if (supervisoree.status.discard || supervisoree.status.error) { return; } //4、更新最后一次查看到的状态 supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState(); //5、如果组件的状态和守护组件看到的状态不一致,则以守护组件的状态为准,然后进行初始化 if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState)) { switch (supervisoree.status.desiredState) { case START: //6、如果是启动状态,则启动组件 try { lifecycleAware.start(); } catch (Throwable e) { if (e instanceof Error) { supervisoree.status.desiredState = LifecycleState.STOP; try { lifecycleAware.stop(); } catch (Throwable e1) { supervisoree.status.error = true; if (e1 instanceof Error) { throw (Error) e1; } } } supervisoree.status.failures++; } break; case STOP: //7、如果是停止状态,则停止组件 try { lifecycleAware.stop(); } catch (Throwable e) { if (e instanceof Error) { throw (Error) e; } supervisoree.status.failures++; } break; default: } } catch(Throwable t) { } } }
如上代码进行了一些简化,整体逻辑即定时去采集组件的状态,如果发现守护组件和组件的状态不一致,则可能需要进行启动或停止。即守护监视器可以用来保证组件如能失败后自动启动。默认策略是总是失败后重启,还有一种策略是只启动一次。
相关推荐
- **架构**:Flume由三个主要组件构成:Sources、Channels和Sinks。Sources负责数据的摄入,Channels作为临时存储,Sinks负责将数据传输到目的地。 - **可靠性**:通过使用可配置的持久化Channels(如File Channel...
Apache Flume 是一个分布式、可靠且可用的服务,用于有效地收集、聚合和移动大量日志数据。在大数据领域,它常被用于收集来自不...通过其灵活的架构和丰富的功能,Flume使得日志数据的管理和分析变得更加简单和高效。
2. **新特性与改进**:可能引入了新的Source、Sink或Channel类型,增强了Flume的灵活性和功能。 3. **稳定性增强**:修复了已知的bug,提高了服务的稳定性和可用性。 4. **安全特性**:可能增加了对身份验证和授权...
1. **Flume架构**: Flume 的核心架构由三个主要组件构成:Sources、Channels 和 Sinks。Sources 负责从各种数据源(如网络套接字、日志文件、应用程序API等)收集数据;Channels 作为临时存储,保证数据在传输过程...
标题中的三个文件“apache-flume-1.9.0-bin.tar”,“kafka_2.11-0.10.1.0”,以及“zookeeper-3.3.6_.tar”是三个重要的分布式系统组件,分别代表了Apache Flume、Apache Kafka和Apache ZooKeeper。这些组件在大...
Flume NG 的架构主要由三个核心组件组成: 1. **Source**(源):负责从外部系统收集数据,并将其送入 Channel 中。 2. **Channel**(通道):作为所有数据的中转站,存储从 Source 收集的数据直到 Sink 处理。 3. ...
在大数据处理领域,Flume 是一个至关重要的组件,尤其在日志管理和实时数据分析中扮演着核心角色。"flume-1.9.0.tgz" 是 Flume 的一个版本压缩包,包含了该版本的所有源码、库文件和必要的配置文档。 **Flume 的...
Flume 的核心组件主要包括 Channel、Sink 和 Source 三部分: 1. **Source**:源,是数据流入 Flume 的入口。Flume 支持多种类型的 Source,如 TailSource(监听日志文件尾部)、AvroSource(接收 Avro 格式的数据...
《深入理解Spark核心思想与源码分析》是耿嘉安撰写的一本专著,全面而深入地探讨了Apache Spark这一大数据处理框架的核心理念和技术细节。这本书不仅覆盖了Spark的基础概念,还深入到了源码层面,为读者揭示了Spark...
二、Flume架构及核心组件 三、Flume&JDK环境部署 1.前置条件 Java Runtime Environment - Java 1.8 or later Memory - Sufficient memory for configurations used by sources, channels or sinks Disk Space - ...
这个压缩包“基于Java的实例源码-日志服务器 Apache Flume.zip”包含了Apache Flume 1.2.0版本的源代码,可以为开发者提供深入理解Flume工作原理以及如何自定义Flume组件的参考。 Apache Flume的核心概念主要包括...
此外,搭建过程中也需要注意各个组件之间的网络通信问题,如Zookeeper与Kafka之间的通信、Kafka集群内部的通信,以及Flume和Kafka之间的数据交互。确保通信顺畅需要做好相应的网络安全和权限设置。 总结来看,利用...
《深入理解Spark:核心思想与源码分析》是一本针对Apache Spark进行深度解析的专业书籍,旨在帮助读者透彻掌握Spark的核心理念与实现机制。Spark作为大数据处理领域的重要框架,其高性能、易用性和弹性分布式计算的...
2. **提取关键组件**:从Flume源码包中提取关键的Java类,这些类位于`apache-flume-1.8.0-src/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/`目录下,主要包括`BucketWriter`和`...
Flume 提供了一个灵活的架构,允许用户通过配置不同的组件来构建自己的数据流管道。以下是一些关键的知识点: 1. **Flume 组件**:Flume 主要由三个基本组件构成:Source、Channel 和 Sink。Source 负责从数据源...
《构建基于Spark+Flume+Kafka+HBase的实时日志处理分析系统》 在当前大数据时代,实时数据处理已经成为企业决策与运营的关键。本项目以“基于Spark+Flume+Kafka+HBase的实时日志处理分析系统”为主题,旨在提供一个...
在构建大数据处理系统时,基于Flume、Kafka和Spark的架构是一种常见且高效的选择,尤其适用于电商领域的实时访问日志分析。这个“基于Flume + Kafka + Spark的电商实时访问日志分析系统”项目,提供了完整的源码,...
10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 mapreduce的原理和编程 01-hdfs源码跟踪之打开输入流.avi 02-hdfs源码跟踪之打开输入流总结.avi 03-mapreduce介绍及wordcount.avi 04-wordcount的编写和...
2. **Flume 架构** - **层次化架构**:Flume 支持多级配置,数据可以通过多个 Agent 进行级联传输,形成复杂的分布式数据流网络。 - **容错机制**:通过 Channel 的持久化和数据复制,Flume 可以在故障发生时恢复...