当FlumeChannel启动时,或者故障恢复时,会经历一次重播(replay)过程,重播的目的就是还原上一次的“现场”,当然,最主要的就是恢复FlumeEventQueue中的内存队列相关数据。重播的主要实现是有Log类来做的,Log类的replay实现了整个重播过程,简单来说,重播过程分为如下几个步骤:
步骤1:获取检查点文件的独占锁(checkpointWriterLock.lock();)。
步骤2:将数据文件ID的初始值设置成0(nextFileID.set(0);)。
步骤3:便利所有数据文件目录(dataDirs),将所有数据文件放入文件列表dataFiles中,将nextFileID设置成当前存在的数据文件ID的值,便于后期调用能生成正确的数据文件ID,将数据文件ID和数据文件的随机读取器之间的映射关系放入idLogFileMap之中。
步骤4:对数据文件列表dataFiles安装数据文件ID进行升序排序。
步骤5:如果use-fast-replay设置成ture(默认值为false)且检查点文件(checkpoint)不存在,则进行快速全播(fast full replay)流程,见步骤7。
步骤6:如果use-fast-replay设置成false,则通过检查点进行重播。见步骤8.
步骤7:类似于步骤8,只是不通过检查点的中最大的写顺序ID开始重播而已(因为当时还没有检查点)。
步骤8:记住检查点的当前写顺序ID(从checkpoint.meta中获得),从inflightputs文件反序列化得到未提交的put的事务ID和Event指针(FlumeEventPointer)的映射inflightPuts,从inflighttakes文件中反序列化得到未提交的take的事务ID和Event指针的映射inflightTakes。
步骤9:遍历数据文件(通过前面得到的数据文件列表dataFiles),找出有包含比检查点的写数据ID还大的写数据ID的数据文件,因为这些数据文件才对恢复File Channel内存队列有用,别忘了File Channel内存队列保存的是所有未被消费的FlumeEventPoint(这当然也需要把那些没完成事务最后一步commit动作的事务给继续完成),并把这些数据文件的该写顺序ID后面的下一个行记录对象(即LogRecord,LogRecord中不仅包含数据记录,还包含记录类型和操作类型,记录类型有put/take/commit/rollback,操作类型有put/take)保存到队列logRecordBuffer中。
步骤10:通过logRecordBuffer可以将所有需要读取的数据文件的剩余部分遍历一遍,这样,我们可以得到一个只包含put和take操作但却未commit和rollback的事务ID对应FlumeEventPoint的Map,即transactionMap,同时,还会更新步骤8中提到的inflightTakes,移除掉已经成功commit的take事务。如果发现有已经提交的事务,则需要进行提交处理,如果是commit的put事务,则将其FlumeEventPoint添加到内存队列队尾,如果是commit的take事务,则从内存队列中移除。 当然,还可以得到当前最大的事务ID(transactionIDSeed)和最大的写顺序ID(writeOrderIDSeed),这个是为了让后面生成的事务ID和写顺序ID可用(TransactionIDOracle.setSeed(transactionIDSeed);WriteOrderOracle.setSeed(writeOrderIDSeed);)。这一步做完,内存队列中已经包含了所有已经完成事务commit但并没有被Sink消费的所有FlumeEventPoint了。
步骤11:将所有没有commit的take事务所包含的数据(inflightTakes中的数据)重新插入到内存队列的头部。
从以上步骤可以看出,Flume中有两种重播方式,一种是不通过检查点(此时必须检查点不存在且配置的use-fast-replay为true)的“快速全播”,一种是普通的通过检查点重播,这也是默认的重播方式。重播的目的就是为了通过磁盘文件来恢复File Channel的内存队列,使File Channel能继续运行,重播需要的时间和当时内存队列中未被消费的FlumeEventPoint成正比
关于步骤1,自不必多说,因为重播过程中,是不能接收消息的,就像JVM GC真正执行时需要Stop World一样。步骤二,需要理解什么是数据文件ID,可以看我前面的Flume快速入门(三)博文(http://manzhizhen.iteye.com/blog/2298394),这里再次阐述一次,为了保证每个数据目录(dataDir)下数据文件的合理大小,当数据超过一个数据文件的最大容量时,Flume会自动在当前目录新建一个数据文件,为了区分同一个数据目录下的数据文件,Flume采用文件名加数字后缀的形式(比如log-1、log-2),其中的数字后缀就是数据文件ID,它由Log实例中的nextFileID属性来维护,它是原子整形,由于在Flume Agent实例中,一个File Channel会对应一个Log实例,所以数据文件ID是唯一的,即使你配置了多个数据目录。每个数据文件都有一个对应的元数据文件(MetaDataFile),它和数据文件在同一目录,命名则是在数据文件后面加上.meta,比如log-1对应的元数据文件是log-1.meta,其实就是将Checkpoint对象通过谷歌的Protos协议序列化到元数据文件中,Checkpoint存储了对应数据文件的诸多重要信息,比如版本、写顺序ID(logWriteOrderID)、队列大小(queueSize)和队列头索引(queueHead)等。元数据文件主要用于快速将数据文件的信息载入到内存中。注意,检查点也有自己的元数据文件(checkpoint.meta)。其实,步骤1-4理解起来都不难,我们知道,检查点目录(checkpointDir)下一共有四个文件(除去锁文件和检查点的元数据文件):检查点文件(checkpoint)、提取未提交文件(inflighttakes)、写入未提交文件(inflightputs)和队列集合文件(queueset,更准确来说这是目录),内存队列正式在这四个文件的相互补充下,得到完整的恢复。
其中上述的步骤10是关键,但也容易理解,比如最常用的通过检查点来恢复,检查点的元数据文件中保存的最大写顺序ID,说明在这个写顺序ID之前的数据要不就已经在检查点文件中了要不就已经被Sink消费掉了,所以通过检查点文件恢复的内存队列,还需要补充两种数据:第一种数据是已经commit的数据但还未来得及对检查点数据刷盘(默认每30秒将内存队列写入检查点文件,可通过checkpointInterval来设置)。第二种数据是飞行中的数据,即还未来得及commit的数据。步骤10中提到的transactionMap的类型是MultiValueMap,所以可以从一个事务ID中找到和其相关的所有操作记录,从上述的步骤10和步骤11可以看出,对于第一种数据,可以通过遍历包含了检查点最大写顺序ID之后数据的数据文件来将其加载到内存队列中,但这时候内存队列中的数据是有冗余的,包含了已经被消费的commit事务的数据,所以这时候未提交数据凭证文件(inflightputs文件和inflighttakes文件)中的数据就起到作用了,将检查点写顺序ID后的所有事务数据(transactionMap)和通过inflightPuts映射、inflightTakes经过运算得到的数据(put未commit或者put已经commit但take没commit的数据)取“交集”得到的FlumeEventPoint集合,就是内存队列需要补充的数据了。
相关推荐
### Flume 1.6.0 入门详解:安装、部署及案例分析 #### 一、Flume 概述 Flume 是 Cloudera 开发的一款高效、可靠且易于扩展的日志收集系统,适用于大数据环境下的日志采集任务。Flume 的初始版本被称为 FlumeOG...
视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 1、介绍: ...章节九:Channel选择器 章节十:Sink处理器 章节十一:导入数据到HDFS 章节十二:Flume SDK 章节十三:Flume监控
5. **Channel**:Channel 作为一个中间缓存,存储事件直到它们被成功处理。常见的 Channel 类型包括 MemoryChannel(内存存储)和 FileChannel(文件系统存储)。 6. **Reliability and Recovery**:Flume 提供了三...
Flume-NG 安装与配置指南 Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:...
尚硅谷大数据技术之Flume Flume 是 Cloudera 提供的一个高可用的、 高可靠的、分布式的海量日志采集、聚合和传输的系统。 Flume 基于流式架构,灵活简单。 1.1 Flume 定义 Flume 是一个高可用的、 高可靠的、...
3. Flume快速入门: - **安装地址**:Flume的官网、文档查看和下载链接分别在http://flume.apache.org/、http://flume.apache.org/FlumeUserGuide.html和http://archive.apache.org/dist/flume/。 - **安装部署...
以下是对Flume入门使用的详细说明: 1. **Flume 组件配置**: 在`netcat-logger.conf`配置文件中,我们看到Flume的配置主要由三部分组成:Sources、Sinks 和 Channels。 - **Sources**:在这里是`r1`,类型设置为...
Flume支持Memory Channel、File Channel等多种类型。 - **Sink(目标)**:负责从Channel中读取数据并将数据发送到目的地,例如HDFS、HBase或其他Flume节点等。 #### 三、Flume配置与部署 - **配置文件**:Flume的...
- **Flume Channel**:如内存 Channel,提供数据在 Source 和 Sink 之间的临时存储,保证数据传输的可靠性。 - **Flume Interceptor**:拦截器允许在数据进入 Channel 之前进行预处理,如过滤、转换等操作。 4. *...
Flume从1.5.0版本开始,重构了其内部架构,核心组件、配置以及代码架构都进行了重大改动,这个新版本被称为Flume NG(Next Generation),即Flume的新一代版本,用以替代了原来的Flume OG(Original Generation)。...
**大数据Ambari之flume集成编译好的源码包** Apache Ambari 是一个用于管理和监控Hadoop集群的开源工具,它提供了直观的Web界面和RESTful API,使得安装、配置、管理Hadoop生态系统变得更加简单。Flume是Apache的一...
大数据技术之Flume Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。它基于流式架构,灵活简单。 Flume定义 Flume是一个高可用的,高可靠的,分布式的海量日志采集、...
Flume 支持多种 Channel 类型,如 Memory Channel(内存通道)、File Channel(文件通道)和 JDBC Channel(基于数据库的通道)。这些 Channel 实现了不同的持久化策略,以适应不同场景的需求。 5. **Sink ...
1. 启动 Flume:使用命令 `cd $FLUME_HOME/conf`,然后使用命令 `flume-ng a` 启动 Flume。 五、Flume-Ng 组件概述 1. Flume-Ng:Flume 的下一代版本,提供了更好的性能和可扩展性。 2. Flume-Ng 组件:包括 Agent...
常见类型有 Memory Channel(内存通道)和 File Channel(文件通道),1.6.0 版本可能会优化这些通道的性能和容错机制。 6. **数据接收器(Sink)**: 数据接收器负责将数据发送到目的地。常见的 Sink 包括 HDFS ...
5. **通道(Channel)**:Channel是数据缓冲区,提供了Source和Sink之间的数据传输。常见类型有Memory Channel(内存通道)和File Channel(文件通道)。 6. **可靠性与容错性**:Flume利用Channel的持久化能力确保...
Flume由多个组件构成,包括源(Source)、通道(Channel)和接收器(Sink),这些组件协同工作,确保数据的稳定流动。 Apache Kafka是一个分布式的流处理平台,由LinkedIn开发并贡献给Apache社区。Kafka主要设计...
Flume 是 Apache Hadoop 生态系统中的一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。在1.8版本中,它继续提供了高效的数据传输能力,适用于实时大数据流处理。以下是关于 Flume 1.8 的一些核心...
flume进阶:如何设计一套Flume进阶课程体系+编程+研发; flume进阶:如何设计一套Flume进阶课程体系+编程+研发; flume进阶:如何设计一套Flume进阶课程体系+编程+研发; flume进阶:如何设计一套Flume进阶课程体系+...