日志是系统数据的基石,对于系统的安全来说非常重要,它记录了系统每天发生的各种各样的事情,用户可以通过它来检查错误发生的原因,或者寻找受到攻击时攻击者留下的痕迹。日志主要的功能是审计和监测。它还可以实时地监测系统状态,监测和追踪侵入者。现在互联网上存在的日志组件各种各样,我们这里主要讲的是Flume。
Flume 发展历史
Cloudera 开发的分布式日志收集系统 Flume,是 hadoop 周边组件之一。其可以实时的将分布在不同节点、机器上的日志收集到 hdfs 中。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,这点可以在 BigInsights 产品文档的 troubleshooting 板块发现。为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。当然我们现在用的是Flume NG,所以不再讲Flume OG的内容。
Flume定义
Flume是一个高可用,高可靠,分布式海量日志采集、聚合和传输系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume架构介绍
Flume日志收集结构图
Flume 的核心是把数据从数据源收集过来,再送到目的地。
为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。
Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。
Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信息。
Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。
Flume 运行的核心是 Agent。它是一个完整的数据收集工具,含有三个核心组件,分别是 Source、Channel、Sink。
Source 可以接收外部源发送过来的数据。不同的 Source,可以接受不同的数据格式。比如有目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,如果目录中有文件产生,就会立刻读取其内容。
Channel 是一个存储地,接收 Source 的输出,直到有 Sink 消费掉 Channel 中的数据。
Channel 中的数据直到进入到下一个Channel中或者进入终端才会被删除。
当 Sink 写入失败后,可以自动重启,不会造成数据丢失,因此很可靠。
Sink 会消费 Channel 中的数据,然后送给外部源或者其他 Source。如数据可以写入到 HDFS 或者 HBase 中。
Flume 核心概念整理
Agent Agent中包含多个sources和sinks。
Client 生产数据,运行在一个独立的线程。
Source 从Client收集数据,传递给Channel。用来消费传递到该组件的Event。
Sink 从Channel收集数据,将Event传递到Flow Pipeline中的下一个Agent。
Channel 中转Event临时存储,保存Source传递过来Event,连接 sources 和 sinks 。
Events 一个数据单元,带有一个可选的消息头。可以是日志记录、 avro 对象等。
Flume 特点
flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
Agent是Flume中最小的运行单位,一个Agent中由Source、Sink和Channel三个组件构成。
Event是Flume中基本数据单位,Event中包含有传输数据及数据头数据包
如下图所示:
值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。
比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是NB之处。
如下图所示:
Flume 整体架构总结
Flume架构整体上看就是 source-->channel-->sink 的三层架构,类似生成者和消费者的架构,他们之间通过queue(channel)传输,解耦。
Source:完成对日志数据的收集,分成 transtion 和 event 打入到channel之中。
Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。
Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。
Flume 下载、安装
安装JDK
1.将下载好的JDK包解压,比如我的解压到 /home/liuqing/jdk1.7.0_72 目录下
2.配置环境变量
在/etc/profile 文件中添加
export JAVA_HOME=/home/liuqing/jdk1.7.0_72 export PATH=$JAVA_HOME/bin:$PATH export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$CLASS_PATH
3.执行source profile
4.在命令行输入 java -version
出现:
java version "1.7.0_72"
Java(TM) SE Runtime Environment (build 1.7.0_72-b14)
Java HotSpot(TM) 64-Bit Server VM (build 24.72-b04, mixed mode)
表示安装成功
安装Flume
1. 从官网 http://flume.apache.org/download.html 下载最新的安装包
2. 解压缩,比如我的解压到 /home/liuqing/hadoop/flume目录
3. 修改 flume-env.sh 配置文件,主要是JAVA_HOME变量设置
JAVA_HOME=/home/liuqing/jdk1.7.0_72
4. 验证是否安装成功
root@ubuntu:/home/liuqing/hadoop/flume/bin# ./flume-ng version
出现:
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
表示安装成功
案例
案例1. 单节点 Flume配置
1. 新建配置文件,配置文件示例
# example.conf: A single-node Flume configuration # agent组件名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # source 配置 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # sink 配置 a1.sinks.k1.type = logger # 使用内存中Buffer Event Channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定 source 和 sink 到channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
将上述配置存为:/home/liuqing/hadoop/flume/conf/example.conf
2. 然后我们就可以启动 Flume 了:
在/home/liuqing/hadoop/flume路径下运行:
bin/flume-ngagent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
其中 -c/--conf 后跟配置目录,-f/--conf-file 后跟具体的配置文件,-n/--name 指定agent的名称
3. 然后我们再开一个 shell 终端窗口,telnet 上配置中侦听的端口,就可以发消息看到效果了:
$ telnet localhost 44444 Trying 127.0.0.1... Connected to localhost.localdomain (127.0.0.1). Escape character is '^]'. Hello world! <ENTER> OK
4.Flume 终端窗口此时会打印出如下信息,就表示成功了
12/06/1915:32:19 INFO source.NetcatSource: Source starting 12/06/1915:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] 12/06/1915:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
至此,咱们的第一个 Flume Agent 算是部署成功了!
案例2. 结合实际项目
参考:https://github.com/gilt/logback-flume-appender
1. 在/home/liuqing/hadoop/flume/conf/下新建配置文件 test.conf
agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # Describe/configure source1 agent1.sources.source1.type = avro agent1.sources.source1.bind = 0.0.0.0 agent1.sources.source1.port = 44444 # Describe sink1 #日志文件按时间生成 #agent1.sinks.sink1.type = FILE_ROLL #agent1.sinks.sink1.sink.directory = /home/liuqing/hadoop/flume/flume-out #agent1.sinks.sink1.sink.rollInterval = 1800 #agent1.sinks.sink1.batchSize = 5 #日志文件根据大小生成 #生成目录在conf文件夹下的log4j.properties可以配置 agent1.sinks.sink1.type = logger # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.capacity = 1000 agent1.channels.channel1.transactionCapactiy = 100 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1
2. 项目已经配好了logback.xml 文件
在logback.xml文件中添加
<appender name="flumeApplender" class="com.xxx.hd.extended.log.flume.FlumeLogstashV1Appender"> <flumeAgents> 192.168.23.235:44444, </flumeAgents> <flumeProperties> connect-timeout=4000; request-timeout=8000 </flumeProperties> <batchSize>2048</batchSize> <reportingWindow>20480</reportingWindow> <additionalAvroHeaders> myHeader=myValue </additionalAvroHeaders> <application>ProjectName</application> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - \(%file:%line\) - %msg%n%ex</pattern> </layout> </appender>
3. 然后我们就可以启动 Flume 了:
在/home/liuqing/hadoop/flume路径下运行:
bin/flume-ng agent --conf ./conf/ -f conf/lqtest.conf -n agent1
4. 现在日志会打印到/home/liuqing/hadoop/flume/logs目录下
日志文件满128M就会自动建一个新的
参考资料
http://flume.apache.org/FlumeUserGuide.html#more-sample-configs 官网用户指导
http://blog.csdn.net/weijonathan/article/details/38557959 flum 负载均衡 容错
http://my.oschina.net/leejun2005/blog/288136#OSC_h1_1 Flume NG 简介及配置实战
http://tech.meituan.com/mt-log-system-arch.html 美团日志收集系统
http://idoall.org/home.php?mod=space&uid=1&do=blog&id=550 Flume 的安装、部署、简单应用
http://shiyanjun.cn/archives/915.html Flume(NG)架构设计要点及配置实践
相关推荐
Flume日志收集与MapReduce模式
自定义开发 Flume 可能涉及创建新的源、接收器、通道或处理器,以便更好地适应特定的日志收集需求。在这个案例中,我们只看到了客户端的自定义部分,但完整的 Flume 集成还需要一个能够处理 Avro 事件的 Flume 代理...
Flume 是一个强大的日志收集系统,主要用于实时数据流的聚合和传输。它在IT行业中被广泛应用,特别是在大数据处理和实时分析场景下。Flume 的设计目标是分布式、可靠和高可用,它允许用户自定义数据源(Source),方便...
Apache Flume 是一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的系统。它具有容错性和高可用性,设计用于将数据从多个源传输到集中存储系统,如 HDFS(Hadoop 分布式文件系统)。在这个文档中,我们将...
Flume 是一个强大的日志收集系统,用于实时数据流处理,尤其在大数据环境中广泛应用。它提供了灵活和可靠的架构,能够从各种数据源收集数据,并将其传输到存储或处理系统。在深入开发 Flume 的过程中,了解如何...
- **多代理流程**:在这种部署中,多个 Flume 代理协同工作,形成一个日志收集网络,其中每个代理可以接收来自上游代理的数据并将其传递给下游代理。 - **流合并**:在某些情况下,来自不同源头的数据可以合并到一...
让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 中文文档 认识 flume 1 flume 是什么 这里简单介绍一下 它是 Cloudera 的一个产品 2 flume 是干什么的 收集日志的 3 flume 如何搜集日志 我们把...
基于Flume的分布式日志采集分析系统设计与实现 Flume是一种分布式日志采集系统,可以实时地采集和处理大量日志数据。该系统基于Flume、Elasticsearch和Kibana等技术手段,能够对海量日志数据进行实时采集、处理和...