0 flume地址:
0) 官网地址: http://flume.apache.org/
1)官网学习:
http://flume.apache.org/documentation.html 点击 User Guide ---> 进入如下链接http://flume.apache.org/FlumeUserGuide.html
2) 下载地址:
http://flume.apache.org/download.html
1 flume简介:
cloudera公司开发的实时日志收集系统,原名是flume og (original generation),
但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation,改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。
这就是flume og 和 flume ng别名的取名和两者区别。
简要提下 Flume NG (1.x.x)的主要变化:
- sources和sinks 使用channels 进行链接
- 两个主要channel 。1, in-memory channel 非持久性支持,速度快。 缺点:any events still left in the memory channel when an agent process dies can’t be recovered
- 2 , JDBC-based channel 持久性支持。
- 不再区分逻辑和物理node,所有物理节点统称为 “agents”,每个agents 都能运行0个或多个sources 和sinks
- 不再需要master节点和对zookeeper的依赖,以及去掉了collector,配置文件简单化。
- 插件化,一部分面对用户,工具或系统开发人员。
- 使用Thrift、Avro Flume sources 可以从flume0.9.4 发送 events 到flume 1.x
如下是flume og架构图:
具体 flume og到 flume ng到底转变了些什么,见如下链接(ibm developerworks)
http://www.ibm.com/developerworks/cn/data/library/bd-1404flumerevolution/index.html 写的不错
flume作用:
a) flume是一个分布式的数据收集系统,具有高可靠、高可用、事务管理、失败重启等功能。数据处理速度快,完全可以用于生产环境。
b) flume是分布式的日志收集系统(这里的日志是一个泛泛统称,可以是日志 可以是命令行输出 可以是数据文件),把收集来的数据传送到目的地去。
2 flume架构和组成 :
a) client: 生产数据的地方
b) event: 生产的数据
c) agent: flume核心组件,接收生产的数据,暂时存储并在发送数据到目的地后删除存储数据的处理单位,
一个agent就是一个jvm, agent又是由 source, channel,sink,Interceptor等构建而成。
对上述名词具体解释如下:
Client:生产数据,运行在一个独立的线程。
Events:可以是日志记录、 avro 对象等,如果是文本文件通常是一行记录,这也是事务的基本单位
Agent:使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
Source:从Client收集数据,传递给Channel。不同的 source,可以接受不同的数据格式,
比如监视外部源--目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,
如果目录中有文件产生,就会立刻读取其内容。
source组件可以处理各种格式的日志数据,eg:avro Sources、thrift Sources、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
支持的这些格式都可以通过http://flume.apache.org/documentation.html 点击 User Guide ---> http://flume.apache.org/FlumeUserGuide.html查询到
Channel:连接 sources 和 sinks ,这个有点像一个队列,是一个存储池,接收source的输出,直到有sink
消费掉channel中的数据,channel中的数据直到进入下一个channel或者进入sink才会被删除,
当sink写入失败后,可以自动重启,不会造成数据丢失。
临时存放的数据可以存放在memory Channel、jdbc Channel、file Channel、自定义。
Sink:从Channel收集数据,运行在一个独立线程。用于把数据发送到目的地的组件
目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。
在整个数据传输过程中,流动的是event。事务保证是在event级别
flume可以支持多级flume的agent,(即多个flume可以连成串,上一个flume可以把数据写到下一个flume上)
支持扇入(fan-in)、扇出(fan-out)
扇入(fan-in): source可以接受多个输入,
扇出(fan-out): sink可以输出到多个目的地
flume ng节点组成图:
多Agent并联下的架构图(flume的web架构):
3 flume特性:
a) 可靠性
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。
Flume提供了三种级别的可靠性保障,从强到弱依次分别为:
end-to- end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。)
Store on failure(这也是scribe采用的策略,当数据接收方crash崩溃时,将数据写到本地,待恢复后,继续发送)
Best effort(数据发送到接收方后,不会进行确认)
b) 可扩展性
物理可扩展性:
Flume采用了三层架构,分别为agent,collector(clint端吧)和storage(sink数据流向方吧),每一层均可以水平扩展。其中,所有agent和 collector由master统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避 免了单点故障问题。(这是针对flume og在结合zk情况下)
逻辑可扩展性:
用户可以根据需要添加自己的agent,collector或者storage。此外,Flume自带了很多组件,包括各种agent(file,syslog等),collector和storage(file,HDFS等),就是flume可以采集多种类型的数据,数据最后可以流向保存到多种类型的目的中。
c) 可管理性 这是针对flume og在结合zk情况下
所有agent和Collector由master统一管理,这使得系统便于维护。多master情况,Flume利用 ZooKeeper和gossip,保证动态配置数据的一致性。用户可以在master上查看各个数据源或者数据流执行情况,且可以对各个数据源配置和动 态加载。Flume提供了web 和shell script command两种形式对数据流进行管理。
4 flume安装:
解压: [root@h2sliver112 conf]# tar -zxvf apache-flume-1.5.2-bin.tar.gz 重命名: [root@h2sliver112 local]# mv apache-flume-1.5.2-bin flume1.5.2-bin 增加配置文件 [root@h2sliver112 flume1.5.2-bin]# cd conf [root@h2sliver112 conf]# cp flume-env.sh.template flume-env.sh [root@h2sliver112 conf]# cp flume-conf.properties.template flume-conf.properties [root@h2sliver112 conf]# vi flume-env.sh 修改JAVA_HOME=/opt/jdk1.7 [root@h2sliver112 conf]# 验证 flume-ng version Flume 1.5.2 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: 229442aa6835ee0faa17e3034bcab42754c460f5 Compiled by hshreedharan on Wed Nov 12 12:51:22 PST 2014 From source with checksum 837f81bd1e304a65fcaf8e5f692b3f18
5 flume简单案例: 使用avro方式监听一个文件 并将文件通过log4j样式输出到console
0 目的:写一个简单案例(通过avro方式,从客户端上读取一个文件,然后提交到avro服务端的source获取,通过内存channel最后将数据输送到目的地logger 并在控制台输出) testflume----> agent-source-avro----> agent-channel-memory--->agent-sink-flume-logger--->-Dflume.root.logger=DEBUG,console控制台打印 1 对应agent配置文件写法为: 这里我将文件写在目录: # pwd /usr/local/flume1.5.2-bin/conf内 [root@h2sliver112 conf]# vi agent1.conf 内容如下: agent1.sources=source1 agent1.channels=channel1 agent1.sinks=sink1 agent1.sources.source1.type=avro 表示读取数据源格式为avro agent1.sources.source1.bind=0.0.0.0 agent1.sources.source1.port=41414 端口表示这个client输送到这个agent要走的socket端口, 定义的agent不同端口下表示接受不同client发送来的数据。 agent1.sources.source1.channels=channel1 agent1.channels.channel1.type=memory agent1.sinks.sink1.type=logger 表示agent导出数据打印到flume日志文件中 agent1.sinks.sink1.channel=channel1 2 根据上面配置好的agent.conf配置信息,启动flume agent。 [root@h2sliver112 bin]# flume-ng agent --conf ../conf/ -f ../conf/agent1.conf -n agent1 -Dflume.root.logger=DEBUG,console --conf ../conf/ 指定flume配置文件目录位置 -f指定flume服务端启动监听的agent配置文件 -n指定启动的flume agent别名 -Dflume.root.logger=DEBUG,console 表示数据在控制台打印出 启动flume agent后,可以看到如下信息: Avro source source1 started,此时会不停扫描agent1.conf 上面配置sink为flume logflume logger,配置内容为: more conf/log4j.properties: #for test can use -Dflume.root.logger=DEBUG,console when launching flume 注释表示如果是测试那么可以用这种方式 flume.root.logger=INFO,LOGFILE 默认是将数据写入到当前目录下logs文件夹内的文件名为flume.log的文件中 flume.log.dir=./logs flume.log.file=flume.log 3 本地文件如下: [root@h2sliver112 local]# cat testflume 1 2 3 4 5 3 启动avro client,读取本地文件,在看上面启动的服务端是否有数据输出: 启动avro client命令如下: [root@h2sliver112 bin]# flume-ng avro-client --conf ../conf/ -H localhost -p 41414 -F /usr/local/testflume -H localhost 指定运行机器 -p 41414 指定端口 -F /usr/local/testflume 指定外部数据源文件 上述命令启动后,flume avro client将文件testflume数据输送到上面定义的 flume agent source中, avro-client run an avro Flume client avro-client options: --host,-H <host> hostname to which events will be sent --filename, -F <file> text file to stream to avro source 可以看到监听到的服务端打印结果如下:见截图,可见将文件数据监听的每个Event都打印出来了。
上述案例中,如果启动 flume agent为:
flume-ng agent --conf ../conf/ -f ../conf/agent1.conf -n agent1 ,那么最后产生数据会在:
[root@hadoop3 logs]# pwd
/opt/flume1.5.2/bin/logs
[root@hadoop3 logs]# ll
total 12
-rw-r--r-- 1 root root 11215 Feb 21 00:56 flume.log
如果启动flume agent想启动后在后台执行,那么写成:
flume-ng agent --conf ../conf/ -f ../conf/agent1.conf -n agent1& 加上&即可。
5 flume使用案例2,使用 avro将log4j不断产生的日志数据写到hdfs中:
需要准备的:
a) hadoop2环境,并事先创建好hdfs目录: /flume/events
b) 包flume-ng-log4jappender-1.5.0-cdh5.1.3-jar-with-dependencies.jar 用于将log4j的信息和flume关联,附件可下载此工程和对应包
开工:
0 如果使用maven 需要下载 flume slf4j的依赖:
<!-- flume --> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.5.2</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flume.flume-ng-clients</groupId> <artifactId>flume-ng-log4jappender</artifactId> <version>1.5.2</version> </dependency>
1 Java端:
1 java
不断产生日志,模拟web系统不停运行产生日志效果
public class GenerateLog4j {
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
while(true){
Logger logger = org.apache.log4j.Logger.getLogger(GenerateLog4j.class);
logger.error("日期时间" + System.currentTimeMillis());
Thread.sleep(1000);
}
}
2 Java端的log4j.properties
log4j.rootLogger=INFO,flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.1.112 // 对应flume 安装的位置机器地址
log4j.appender.flume.Port = 44444 // 对应flume启动的监听端口 这个端口在flume 定义的conf中会写好
log4j.appender.flume.UnsafeMode = true
3 flume:
flume conf/创建 agent2.conf 内容如下
agent2.sources=source1
agent2.channels=channel1
agent2.sinks=sink1
agent2.sources.source1.type=avro
agent2.sources.source1.bind=0.0.0.0
agent2.sources.source1.port=44444 // 指定监听端口
agent2.sources.source1.channels=channel1
agent2.sources.source1.interceptors = i1 i2 指定使用flume的两个拦截器,一个是时间的 一个是IP的
agent2.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent2.sources.source1.interceptors.i1.preserveExisting = true
agent2.sources.source1.interceptors.i1.useIP = true
agent2.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
agent2.channels.channel1.type=memory
agent2.channels.channel1.capacity=10000
agent2.channels.channel1.transactionCapacity=1000
agent2.channels.channel1.keep-alive=30
agent2.sinks.sink1.type=hdfs
agent2.sinks.sink1.channel=channel1
agent2.sinks.sink1.hdfs.path=hdfs://h2single:9000/flume/events/%{host}/%y-%m-%d 指定写到目标hadoop2集群的hdfs某个目录下 可以用hive创建分区表加载并继续做MR操作
agent2.sinks.sink1.hdfs.fileType=DataStream
agent2.sinks.sink1.hdfs.writeFormat=Text
agent2.sinks.sink1.hdfs.rollInterval=0
agent2.sinks.sink1.hdfs.rollSize=10000
agent2.sinks.sink1.hdfs.rollCount=0
agent2.sinks.sink1.hdfs.idleTimeout=5
4 flume目录下启动如下命令:
bin/flume-ng agent --conf ./conf/ -Dflume.monitoring.type=http -Dflume.monitoring.port=34343 -n agent2 -f conf/agent2.conf
// flume应用参数监控 -Dflume.monitoring.port=34343 可以通过http://ip:34343/metrics访问,从而看到监控信息
5 运行Java端GenerateLog4j.java
6 查看h2single hdfs被写入的数据 数据截图如下:
6 flume和其他组件的区别:
6.1) flume和sqoop的区别:
sqoop 定向从关系库导入数据到hadoop生态(hdfs/hive/hbase)
flume 从不同日志类型系统中实时采集数据到hadoop生态 两者使用定位不一样
6.2) flume和kafka的区别:
kafka强调的是吞吐量。数据来源单一,
flume强调的是多种适配器。source sink有很多种。
相关推荐
在本文中,我们将深入探讨 Flume 的安装、配置、测试以及如何将其应用于实际案例,即从不同节点采集日志并存储到 HDFS。 首先,我们从安装 Flume 开始。Flume 的安装包括解压安装包,重命名 Flume 目录,配置 `...
flume agent收集数据 一个源对于两个sink 同时输出到hdfs和kafka 的配置文件,注意其中的 source绑定channel时候 channel1 channel2 不能分开写,该配置文件已经过集群实验成功收集到数据的
最后,我们将数据写入HDFS中。这里需要注意HDFS路径的正确设置。 ```properties a1.sinks.s1.channel=c1 a1.sinks.s1.type=hdfs a1.sinks.s1.hdfs.path=hdfs://master:9000/weakanalysis a1.sinks.s1.hdfs....
本案例主要介绍如何利用Flume将数据采集并传输到Kafka服务中,然后再由Kafka进行数据的分发与消费。 **2、创建Kafka配置** 为了将数据传入Kafka,需要配置Flume的Sink指向Kafka的某个Topic。 **3、修改Sink配置**...
Spark-Streaming简单小项目 Spark Streaming实时解析flume和kafka传来的josn数据写入mysql ...另,还需将您的spark和hadoop安装文件下的core-site.xml、hdfs-site.xml和hive-site.xml拷贝到src\main\resources目录下
人人网的数据服务平台采用了基于日志分析的设计理念,通过将数据生命周期划分为三个阶段——原始数据、主题数据和可读数据,实现了数据从采集、清洗、加工到最终消费的全流程管理。平台的核心在于其强大的数据处理...
- **Sqoop导入与导出**:指导如何使用Sqoop将数据从关系型数据库导入到Hadoop,或将数据从Hadoop导出到关系型数据库。 #### 七、Flume分布式日志框架 - **Flume简介-基础知识**:介绍Flume的基本概念及其在日志收集...
- **数据写入机制**:所有写操作首先记录在一个内存中的日志结构中,随后周期性地刷新到磁盘上。 - **随机写变顺序写**:这种方式避免了传统B+树在随机写入时遇到的性能瓶颈。 - **写吞吐量高**:相比B+树,在写操作...
### Zookeeper 安装教程详解 #### 一、Zookeeper简介 Zookeeper 是一款针对大型分布式系统的可靠协调系统。...这不仅有助于理解 Zookeeper 的基本概念和工作原理,而且还可以为实际项目中使用 Zookeeper 提供参考。