Flume basic architecture diagram:
An Agent contains a Source, a Channel and a Sink. The Sink can write to HDFS or JMS, or connect to the Source of another Agent.
Flume terminology
- Flume Event
A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes.
Question: since Flume processes data in units of Events, how is an Event actually parsed? Worth studying through the Spark-Flume integration example, then coming back to update this note.
- Agent
A Flume agent is a (JVM) process that hosts the components (Source, Channel, Sink) through which events flow from an external source to the next destination (hop).
- Source
The Source of a Flume Agent receives data sent by an external data source (for example the Web Server in the diagram above). Note that the external source must send data in a format that the Agent's Source understands. An Avro Source, for instance, only accepts Avro-format data: an external producer can push data to it through an Avro client, and it can also receive Avro events emitted by the Avro Sink of another Agent. (A Flume source consumes events delivered to it by an external source like a web server. The external source sends events to Flume in a format that is recognized by the target Flume source. For example, an Avro Flume source can be used to receive Avro events from Avro clients or other Flume agents in the flow that send events from an Avro sink.)
- Channel
The Channel is a buffer for the data received by the Source; events sit there until they are consumed (written out) by the Sink. When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it's consumed by a Flume sink. The file channel is one example – it is backed by the local filesystem (a small file-channel sketch follows this list).
- Sink
The sink removes the event from the channel and puts it into an external repository like HDFS (via the Flume HDFS sink) or forwards it to the Flume source of the next Flume agent (next hop) in the flow.
- Asynchrony of Source and Sink
The source and sink within the given agent run asynchronously with the events staged in the channel.
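As a sketch of the file channel mentioned above: the examples later in this note all use the memory channel, but swapping in a file channel makes staged events survive an agent restart. The directory paths below are placeholders, not values from this setup.

# Minimal file channel sketch for an agent named a1
a1.channels = c1
a1.channels.c1.type = file
# hypothetical local directories; choose paths on a disk with enough space
a1.channels.c1.checkpointDir = /home/hadoop/flume/checkpoint
a1.channels.c1.dataDirs = /home/hadoop/flume/data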
An Agent can define multiple Channels and Sinks
As the figure below shows, data received by one Source can be written into several Channels, each drained by a different Sink, so the same log record can be written to different destinations (such as HDFS, JMS, MySQL) or passed on as the external data source of another Agent and flow to the next hop. This is how a pipeline of data processing is formed.
Agents can be composed into a streaming system: the output of Agent foo becomes the input of Agent bar, pipe-style. At the same time, one Agent can have multiple Channels and Sinks, so the same log record can flow to several places at once, for example HDFS, JMS, or another Agent.
How do you configure an Agent with multiple sources and sinks? As shown below, the names are separated by spaces, not commas (a fuller fan-out sketch follows).
# Configuration declaring 2 channels and 2 sinks; here one sink could go to Kafka and the other to HDFS
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
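Building on the declarations above, a hedged sketch of a complete fan-out configuration might look like this. The replicating selector (the default) copies every event into both channels; the sink types and the HDFS path are illustrative choices, not values taken from this article.

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# One source fans out into two channels
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Each sink drains its own channel
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c2
# hypothetical HDFS target path
a1.sinks.k2.hdfs.path = hdfs://hadoop.master:9000/user/hadoop/flume-fanout

a1.channels.c1.type = memory
a1.channels.c2.type = memory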
Multiple Agents can send their output to a single Agent, which produces a consolidation (aggregation) effect.
Two Agents can also be chained to form multi-stage processing.
Installing Flume
1. Download Flume
wget http://mirror.bit.edu.cn/apache/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
2. Unpack and configure Flume
tar xzvf apache-flume-1.5.2-bin.tar.gz
# in the conf directory
cp flume-env.sh.template flume-env.sh
# in flume-env.sh, add the JAVA_HOME environment variable:
JAVA_HOME=/home/hadoop/software/jdk1.7.0_67
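As a quick sanity check of the installation, the version command (listed in the command summary below) can be run from the bin directory:

cd apache-flume-1.5.2-bin/bin
./flume-ng version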
Available Flume commands
[hadoop@hadoop bin]$ ./flume-ng help
Error: Unknown or unspecified command '--help'

Usage: ./flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: $FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --conf-file,-f <file>     specify a config file (required)
  --name,-n <name>          the name of this agent (required)
  --help,-h                 display help text

avro-client options:
  --rpcProps,-P <file>      RPC client properties file with server connection params
  --host,-H <host>          hostname to which events will be sent
  --port,-p <port>          port of the avro source
  --dirname <dir>           directory to stream to avro source
  --filename,-F <file>      text file to stream to avro source (default: std input)
  --headerFile,-R <file>    File containing event headers as key/value pairs on each new line
  --help,-h                 display help text
Flume examples
1. Avro example
The avro-client can send a given file to Flume; the Avro source uses the Avro RPC mechanism.
1.1 Configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.2 Start the agent
1.2.1 Start command
./flume-ng agent -c ../conf -f ../conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
After startup, port 4141 is opened to listen for events sent by an avro client.
-n is the name of the agent, here a1.
-c is the configuration directory; flume-env.sh and log4j.properties live there.
-f is the path of the configuration file that defines the agent, source, channel and sink.
1.2.2 Startup log
15/02/13 21:05:16 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/02/13 21:05:16 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:../conf/avro.conf
15/02/13 21:05:16 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
15/02/13 21:05:16 INFO conf.FlumeConfiguration: Processing:k1
15/02/13 21:05:16 INFO conf.FlumeConfiguration: Processing:k1
15/02/13 21:05:16 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
15/02/13 21:05:16 INFO node.AbstractConfigurationProvider: Creating channels
15/02/13 21:05:16 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
15/02/13 21:05:16 INFO node.AbstractConfigurationProvider: Created channel c1
15/02/13 21:05:16 INFO source.DefaultSourceFactory: Creating instance of source r1, type avro
15/02/13 21:05:16 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
15/02/13 21:05:16 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
15/02/13 21:05:16 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: 0.0.0.0, port: 4141 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@64cefc3a counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
15/02/13 21:05:16 INFO node.Application: Starting Channel c1
15/02/13 21:05:16 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
15/02/13 21:05:16 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
15/02/13 21:05:16 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
15/02/13 21:05:17 INFO node.Application: Starting Sink k1
15/02/13 21:05:17 INFO node.Application: Starting Source r1
15/02/13 21:05:17 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 4141 }...
15/02/13 21:05:17 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
15/02/13 21:05:17 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
15/02/13 21:05:17 INFO source.AvroSource: Avro source r1 started.
1.3 Test
1.3.1 Create a test file (log.00 in the bin directory) with the following content:
1233456789
abcdefghij
1.3.2 Start the avro-client to send events to the agent's Source
./flume-ng avro-client -H localhost -p 4141 -F log.00
What does -c mean? The avro-client does not actually take a -c option.
What does -F mean? --filename,-F <file>: text file to stream to the avro source (default: standard input). It supplies the input for the avro-client; if omitted, input is read from the keyboard.
What does -H mean? --host,-H <host>: hostname to which events will be sent. -H and -p identify the host and port of the Avro source, as defined in the source configuration.
What does -p mean? --port,-p <port>: port of the avro source.
1.3.3 Output on the agent console:
15/02/13 21:12:10 INFO ipc.NettyServer: [id: 0xd1864529, /127.0.0.1:38319 => /127.0.0.1:4141] OPEN
15/02/13 21:12:10 INFO ipc.NettyServer: [id: 0xd1864529, /127.0.0.1:38319 => /127.0.0.1:4141] BOUND: /127.0.0.1:4141
15/02/13 21:12:10 INFO ipc.NettyServer: [id: 0xd1864529, /127.0.0.1:38319 => /127.0.0.1:4141] CONNECTED: /127.0.0.1:38319
15/02/13 21:12:11 INFO sink.LoggerSink: Event: { headers:{} body: 31 32 33 33 34 35 36 37 38 39    1233456789 }
15/02/13 21:12:11 INFO sink.LoggerSink: Event: { headers:{} body: 61 62 63 64 65 66 67 68 69 6A    abcdefghij }
15/02/13 21:12:11 INFO ipc.NettyServer: [id: 0xd1864529, /127.0.0.1:38319 :> /127.0.0.1:4141] DISCONNECTED
15/02/13 21:12:11 INFO ipc.NettyServer: [id: 0xd1864529, /127.0.0.1:38319 :> /127.0.0.1:4141] UNBOUND
15/02/13 21:12:11 INFO ipc.NettyServer: [id: 0xd1864529, /127.0.0.1:38319 :> /127.0.0.1:4141] CLOSED
15/02/13 21:12:11 INFO ipc.NettyServer: Connection to /127.0.0.1:38319 disconnected.
So the content of log.00 has been received by the agent. An Event has two parts, headers and body (the body is the byte array of the content); a sketch for attaching headers follows.
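The headers section above is empty because the avro-client was given no headers. As a small sketch (the file name and key/value pairs below are made up, not from this run), the --headerFile/-R option from the command summary can attach headers to every event sent:

# headers.txt: one key = value pair per line (hypothetical content)
host = web01
logtype = access

./flume-ng avro-client -H localhost -p 4141 -F log.00 -R headers.txt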
1.4 Append content to log.00
After appending more content to log.00, Flume does not consume the new lines. This is expected: the avro-client streams the file once and exits, so later changes to the file are not picked up; to keep following a file, use an exec source as in section 2.
2. Exec source
Note: the exec source runs a command and turns its output into Flume events. For tail, output is handed to the channel in batches, so enough data has to be produced before anything shows up at the sink (see the batching sketch after the configuration below).
2.1 Configuration file:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -f /home/hadoop/software/apache-flume-1.5.2-bin/bin/log_exec_tail

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
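As mentioned above, the exec source delivers events in batches. A hedged sketch of the relevant knobs: batchSize is a standard exec source property (default 20), while batchTimeout may not exist in every Flume release, so check the user guide for your version before relying on it.

# deliver a batch to the channel after this many lines
a1.sources.r1.batchSize = 5
# if supported by your Flume version: flush a partial batch after this many milliseconds
a1.sources.r1.batchTimeout = 2000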
2.2 Start the agent
2.2.1 Start command
./flume-ng agent -c . -f ../conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
2.2.2 Startup log
15/02/13 21:39:06 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/02/13 21:39:06 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:../conf/exec_tail.conf
15/02/13 21:39:06 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
15/02/13 21:39:06 INFO conf.FlumeConfiguration: Processing:k1
15/02/13 21:39:06 INFO conf.FlumeConfiguration: Processing:k1
15/02/13 21:39:06 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
15/02/13 21:39:06 INFO node.AbstractConfigurationProvider: Creating channels
15/02/13 21:39:06 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
15/02/13 21:39:06 INFO node.AbstractConfigurationProvider: Created channel c1
15/02/13 21:39:06 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
15/02/13 21:39:06 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
15/02/13 21:39:06 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
15/02/13 21:39:06 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@471719b6 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
15/02/13 21:39:06 INFO node.Application: Starting Channel c1
15/02/13 21:39:06 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
15/02/13 21:39:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
15/02/13 21:39:06 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
15/02/13 21:39:06 INFO node.Application: Starting Sink k1
15/02/13 21:39:06 INFO node.Application: Starting Source r1
15/02/13 21:39:06 INFO source.ExecSource: Exec source starting with command:tail -f /home/hadoop/software/flume-1.5.2-bin/bin/log_exec_tail
15/02/13 21:39:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
15/02/13 21:39:06 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
15/02/13 21:39:07 INFO source.ExecSource: Command [tail -f /home/hadoop/software/flume-1.5.2-bin/bin/log_exec_tail] exited with 1
What does the last line, "exited with 1", mean? Most likely the tail command failed with a non-zero status because log_exec_tail did not exist yet when the source started; once the file exists, restarting the agent (or enabling the exec source's restart option, if your version supports it) lets the command run normally.
2.3 Write to the log_exec_tail file
[hadoop@hadoop bin]$ for i in {1..100};do echo "exec tail$i" >> log_exec_tail;echo $i;sleep 0.1;done
2.4 Observe Flume's output
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 38 37    exec tail1 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 38 38    exec tail2 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 38 39    exec tail3 }
.....
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 30    exec tail90 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 31    exec tail91 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 32    exec tail92 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 33    exec tail93 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 34    exec tail94 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 35    exec tail95 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 36    exec tail96 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 37    exec tail97 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 38    exec tail98 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 39 39    exec tail99 }
15/02/13 21:48:53 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 61 69 6C 31 30 30    exec tail100 }
2.5 After renaming log_exec_tail and creating a new file with the same name, does Flume notice content written to the new file? It does not!
Question: does Flume collect system-generated logs simply by running tail -f on the files? No. Once the file is recreated and written to again, Flume no longer sees the content, because tail -f follows the open file descriptor rather than the file name (see the note below).
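A hedged workaround for this log-rotation problem: GNU tail's -F option (equivalent to --follow=name --retry) reopens the file when it is recreated, so the exec source keeps receiving events after the file is replaced. Note that an exec source still offers no delivery guarantees if the agent dies; for reliable file collection the Spooling Directory Source (type = spooldir) is the usual recommendation.

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/software/apache-flume-1.5.2-bin/bin/log_exec_tail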
3. HDFS Sink
3.1 Configuration
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# target directory on HDFS
a1.sinks.k1.hdfs.path = hdfs://hadoop.master:9000/user/hadoop/flume
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.2 Start the Agent
./flume-ng agent -c . -f ../conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
After startup, Flume is listening on port 5140 (bound by the syslogtcp source), so messages can be sent to it with telnet or any other TCP client.
3.3 Send a message
telnet localhost 5140
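As an alternative to an interactive telnet session, a single line can be pushed with nc. The message below is a made-up example; a syslog-style priority prefix such as <13> makes the line closer to what the syslogtcp source expects to parse, otherwise it may log a warning about invalid syslog data while still creating an event.

echo "<13>Feb 13 22:30:00 localhost test: hello flume" | nc localhost 5140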
3.4 Log output on the Flume side
3.5 Inspect the content on HDFS
[hadoop@hadoop bin]$ ./hdfs dfs -ls /user/hadoop/flume
Found 2 items
-rw-r--r--   2 hadoop supergroup        318 2015-02-13 22:32 /user/hadoop/flume/Syslog.1423884708095
-rw-r--r--   2 hadoop supergroup        167 2015-02-13 22:32 /user/hadoop/flume/Syslog.1423884746573
[hadoop@hadoop bin]$ ./hdfs dfs -cat /user/hadoop/flume/Syslog.1423884746573
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritableĶfLۀ¹؍+dþK$ÿ
3.6 How is this file meant to be used, given that HDFS stores BytesWritable records? By default the HDFS sink writes Hadoop SequenceFiles (LongWritable keys, BytesWritable values), which is what the cat output above shows; a sketch for writing plain text instead follows.
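A hedged configuration tweak using standard HDFS sink properties (not part of this article's original config): setting the file type to DataStream and the write format to Text makes the sink write event bodies as plain text lines, which hdfs dfs -cat can read directly.

a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text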