[ xcly原创于iteye,见http://xcly.iteye.com ]
本节重点介绍 agentSink中ENDTOEND的实现。
每一个节点通过source获得事件Event,然后由sink处理,sink同source一样,flume提供了多种实现,sink的生成同Flume源代码解读一中介绍的实现方式类似,由SinkFactory工厂方法实现,跟SourceFactory不一样的是定义了取得SinkDecoBuilder的抽象方法,在SinkFactoryImpl中不止通过name可以获得EventSink,也可以获得EventSinkDecorator,关键点是EventSinkDecorator也是EventSink.Base的子类。这种装饰器模式的实现方式給事件的处理方式提供了类似于管道流的一种实现,我们可以将任意EventSinkDecorator串成一个管道,用来对事件进行加工和处理。
我们看ENDTOEND的实现,就是一串ackedWriteAhead => { stubbornAppend => { insistentOpen =>rpcSink,由FlumeBuilder的buildSink实现。 这里使用了强大的语言识别工具Antlr, 具体实现细节还有待深究。
下次补上。
ackedWriteAhead 对应的实现类是NaiveFileWALDec, NaiveFileWALDeco是一个非常重要的类, 它里面有两套数据流机制, 一套是RollSink开始的将数据不断写入本地硬盘的数据流,当数据写入本地硬盘后,通过DirectDriver的一个线程不断循环从写好的硬盘数据中获取数据发送至collector,如果写成功,再删除硬盘上的数据。
NaiveFileWALDec的build中申明了几个关键的类是从FlumeNode实例中取得的,NaiveFileWALManager负责数据的持久化,当再次尝试发生时,也读取数据,最初数据都是写入writing目录。 WALAckManager和它之中的PendingAckQueuer, WALAckManager负责act check,并且调用PendingAckQueuer作为结束数据append的动作。 即PendingAckQueuer的end方法。
RollSink的newSink见NaiveFileWALManager的newAckWritingSink方法,AckChecksumInjector嵌套SeqfileEventSink, AckChecksumInjector在event中添加tag/checksum和时间作为校验和,并且对消息body使用了hash算法。 而 SeqfileEventSink主要负责将数据流写入本地文件系统。
未完待续
分享到:
相关推荐
在这个压缩包中,包含了Flume 1.8的源代码以及Maven 3.5.2的安装包,这对于开发者深入理解Flume的工作原理和进行定制化开发非常有帮助。 首先,要使用这些资源,你需要先安装Maven 3.5.2。解压`apache-maven-3.5.2....
Flume 1.7.0 是该软件的一个版本,包含了完整的源代码,便于开发者深入理解其工作原理并进行定制开发。 在Flume 1.7.0源码中,我们可以探索以下几个关键知识点: 1. **Flume架构**: Flume 的核心架构由三个主要...
在提供的压缩包文件`flume-ng-sql-source-develop`中,很可能包含了Flume JDBC源的源代码或者开发相关资源,供开发者自定义或扩展JDBC源的特性,例如添加新的查询策略、优化性能等。 使用Flume的JDBC源程序,企业...
在 `flume-interceptor` 压缩包中,你可能找到了示例拦截器的源代码。研究这些代码可以帮助你更好地理解拦截器的工作原理,并为自己的项目提供参考。通过实践编写和调试拦截器,你可以更深入地了解 Flume 如何处理...
3. **配置与部署**:完成源代码编写后,需要创建一个配置文件来声明你的自定义 Source。例如: ``` agent.sources = mySource agent.sources.mySource.type = org.example.MyCustomSource ``` 4. **测试与运行*...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目源代码+文档说明,含有代码注释,满分大作业资源,新手也可看懂,期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。...
Flume从1.5.0版本开始,重构了其内部架构,核心组件、配置以及代码架构都进行了重大改动,这个新版本被称为Flume NG(Next Generation),即Flume的新一代版本,用以替代了原来的Flume OG(Original Generation)。...
3. **验证安装**:在终端输入以下命令检查 Flume 是否正确安装: ```bash flume-ng version ``` 如果一切正常,应该会显示 Flume 的版本信息。 #### 五、配置 Flume 数据源 配置 Flume 数据源主要涉及 Source...
3. **数据格式化**:了解如何将Flume接收到的原始数据转换为Elasticsearch可以理解的格式。 4. **容错和恢复机制**:研究Flume的故障转移和数据恢复策略,以确保数据不丢失。 5. **性能优化**:探讨如何调整Flume和...
Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:Java JDK 安装 在安装 Flume-...
4. **Flume 与 Spark streaming 整合代码**:在代码层面,定义一个自定义的 Spark Sink,负责将接收到的 Flume 数据转换为 Spark 可处理的 DStream,然后进行进一步的实时处理。 二、Kafka 的安装部署 1. **Kafka*...
这个压缩包包含的 `code` 文件夹很可能是包含了已经修改过的 Java 源代码,这些代码修改了 Flume 的核心部分,使得 `exec` 和 `taildir` 源可以在 Windows 10 操作系统上正确运行。 描述中指出,为了使用这个修改后...
2. **数据源**: 数据源(如 syslog、kafka 或 JMS)负责从日志生成事件,将其推送到Flume代理。 3. **数据通道**: 通道作为临时存储,确保数据在传输到接收器之前不会丢失。常见的通道类型包括内存通道和文件通道。 ...
flume支持RabbitMQ插件
3. **添加第三方库**: 这里提到的 "lib" 文件可能包含这些自定义或更新的库,它们是 Flume 与 HBase 2.0 通信所必需的。将这些库添加到 Flume 的类路径中,可以让 Flume 正确识别和使用新的 API。 4. **配置 Flume*...
Apache Flume是一款分布式、可靠且可用的系统,主要用于高效地从多种不同的数据源收集、聚合和移动大量的日志数据到一个集中的存储库。除了用于日志数据聚合外,由于数据源是可定制的,Flume可以用来传输大量包括但...
这个压缩包“Apache flume1.6_src”包含了 Flume 1.6.0 版本的源代码,对于理解其工作原理、学习底层技术以及进行定制化开发非常有帮助。 Flume 的核心组件主要包括 Channel、Sink 和 Source 三部分: 1. **Source...
这个版本的 Flume 已经预先编译完成,用户下载后可以直接进行安装和使用,无需自行编译源代码。 描述中的 "编译好的flume1.9.0,下载安装即可使用" 提醒我们,该文件包含的 Flume 实例是已经准备好运行的,只需遵循...