flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
一、什么是Flume?
flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。
flume的特点:
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
flume的可靠性
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。
flume的可恢复性:
还是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。
flume的一些核心概念:
Agent使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
Client生产数据,运行在一个独立的线程。
Source从Client收集数据,传递给Channel。
Sink从Channel收集数据,运行在一个独立线程。
Channel连接 sources 和 sinks ,这个有点像一个队列。
Events可以是日志记录、 avro 对象等。
Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如下图:
值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是NB之处。如下图所示:
二、flume的官方网站在哪里?
http://flume.apache.org/
三、在哪里下载?
http://www.apache.org/dyn/closer.cgi/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz
四、如何安装?
1)将下载的flume包,解压到/home/hadoop目录中,你就已经完成了50%:)简单吧
2)修改 flume-env.sh 配置文件,主要是JAVA_HOME变量设置
root@m1:/home/hadoop/flume-1.5.0-bin# cp conf/flume-env.sh.template conf/flume-env.sh
root@m1:/home/hadoop/flume-1.5.0-bin# vi conf/flume-env.sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.
# Enviroment variables can be set here.
JAVA_HOME=/usr/lib/jvm/java-7-oracle
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
#JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
# Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""
3)验证是否安装成功
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng version
Flume 1.5.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 8633220df808c4cd0c13d1cf0320454a94f1ea97
Compiled by hshreedharan on Wed May 7 14:49:18 PDT 2014
From source with checksum a01fe726e4380ba0c9f7a7d222db961f
root@m1:/home/hadoop#
出现上面的信息,表示安装成功了
五、flume的案例
案例1:Syslogtcp
Syslogtcp监听TCP的端口做为数据源
a)创建agent配置文件
root@flume1 flume# vi ./conf/syslog_tcp.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
b)启动flume agent a1
root@flume1 flume# ./bin/flume-ng agent --conf conf --conf-file ./conf/syslog_tcp.conf --name a1 -Dflume.root.logger=INFO,console
c)新连接shh,测试产生syslog
root@flume1 flume# echo "hello idoall.org syslog" | nc localhost 5140
d)在1的控制台,可以看到以下信息:
/08/10 11:41:45 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/hadoop/flume-1.5.0-bin/conf/syslog_tcp.conf
/08/10 11:41:45 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
/08/10 11:41:45 INFO conf.FlumeConfiguration: Processing:k1
/08/10 11:41:45 INFO conf.FlumeConfiguration: Processing:k1
/08/10 11:41:45 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
/08/10 11:41:45 INFO node.AbstractConfigurationProvider: Creating channels
/08/10 11:41:45 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
/08/10 11:41:45 INFO node.AbstractConfigurationProvider: Created channel c1
/08/10 11:41:45 INFO source.DefaultSourceFactory: Creating instance of source r1, type syslogtcp
/08/10 11:41:45 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
/08/10 11:41:45 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
/08/10 11:41:45 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6538b14 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
/08/10 11:41:45 INFO node.Application: Starting Channel c1
/08/10 11:41:45 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
/08/10 11:41:45 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
/08/10 11:41:45 INFO node.Application: Starting Sink k1
/08/10 11:41:45 INFO node.Application: Starting Source r1
/08/10 11:41:45 INFO source.SyslogTcpSource: Syslog TCP Source starting...
/08/10 11:42:15 WARN source.SyslogUtils: Event created from Invalid Syslog data.
/08/10 11:42:15 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 20 69 64 6F 61 6C 6C 2E 6F 72 67 hello idoall.org }
案例2:netcat
a)创建agent配置文件
root@flume1 flume# vi ./conf/netcat_example.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
b)启动flume agent a1
root@flume1 flume# ./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=INFO,console
c)新连接shh,测试产生syslog
[root@flume1 conf]# curl -X GET http://localhost:44444?w=DDDDDDDDDDDDDDDDDDDD
OK
OK
OK
OK
d)在1的控制台,可以看到以下信息:
2016-02-02 14:59:11,849 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 47 45 54 20 2F 3F 77 3D 44 44 44 44 44 44 44 44 GET /?w=DDDDDDDD }
2016-02-02 14:59:11,849 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 55 73 65 72 2D 41 67 65 6E 74 3A 20 63 75 72 6C User-Agent: curl }
2016-02-02 14:59:11,849 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 48 6F 73 74 3A 20 6C 6F 63 61 6C 68 6F 73 74 3A Host: localhost: }
2016-02-02 14:59:11,849 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 41 63 63 65 70 74 3A 20 2A 2F 2A 0D Accept: */*. }
2016-02-02 14:59:11,850 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 0D . }
分享到:
相关推荐
在实际部署和配置 Flume 时,你需要创建一个配置文件,明确指定 Agent 的 Source、Channel 和 Sink 类型以及相关参数。例如,你可以配置一个 Source 来监听 syslog 数据,使用 MemoryChannel 作为中间缓冲,然后设置...
### Flume 1.6.0 入门详解:安装、部署及案例分析 #### 一、Flume 概述 Flume 是 Cloudera 开发的一款高效、可靠且易于扩展的日志收集系统,适用于大数据环境下的日志采集任务。Flume 的初始版本被称为 FlumeOG...
4. 测试与部署:编译并打包自定义Sink,将其添加到Flume环境的类路径中,最后更新Flume配置,启动Flume服务进行测试和部署。 三、案例分析 参考博客《Flume自定义Sink接入Elasticsearch实践》(链接:...
- **安装部署步骤**:包括将下载的tar.gz文件上传到Linux指定目录,解压并重命名,以及配置环境变量(编辑flume-env.sh文件)。 在实际应用中,Flume常用于日志收集场景,比如Python爬虫产生的数据、Java后台的...
总结来说,Flume作为大数据处理的重要工具,其强大的数据采集和传输能力,以及灵活的架构设计,使其在大数据环境中扮演着不可或缺的角色。通过深入理解和熟练运用Flume,我们可以更有效地管理和利用大规模的日志数据...
- **Kafka配置详解**:详细解释Kafka的各项配置参数及其影响。 - **Kafka的安装**:指导如何安装配置Kafka集群。 - **Kafka的存储策略**:解释Kafka如何存储消息,包括日志段(Log Segments)的管理机制。 - **Kafka...
- **kafka配置详解**:详细解释Kafka的各种配置选项。 - **kafka的安装**:指导Kafka的安装过程。 - **kafka的存储策略**:讨论Kafka的数据存储策略。 - **kafka分区特点**:分析Kafka中分区(partition)的作用。 - *...
2. **Hadoop安装与配置**:如何在本地和集群环境下安装Hadoop,包括环境配置、依赖管理以及优化技巧。 3. **MapReduce编程**:详细讲解MapReduce的编程模型,包括Mapper和Reducer的编写、数据分区、排序机制、...
- **云原生支持**:更好地支持云环境下的部署和管理。 - **边缘计算**:在物联网等场景下发挥重要作用。 通过以上内容的详细介绍,我们可以了解到Hadoop不仅是一个简单的数据处理框架,更是一个包含多种技术和工具...
根据提供的文件信息,“Hadoop实战中文版pdf”这一标题与描述明确指出了文档的主要内容是关于Hadoop的实际应用和技术详解。下面将从标题、描述、标签以及部分内容中提炼出相关的关键知识点。 ### Hadoop概述 1. **...
- **Flume**:是一个高可靠、高性能的服务,用于收集、聚合和移动大量日志数据。 #### 三、Hadoop架构详解 Hadoop的架构设计旨在解决大规模数据处理的需求,其核心组件包括: - **NameNode**:管理HDFS的文件系统...
- Flume是一个高可靠、高性能的日志聚合系统,课程介绍其架构原理、配置管理和应用场景等。 - **知识点9:Kafka** - Kafka是一个分布式流处理平台,课程讲解消息队列的基本概念、Kafka集群部署、生产者与消费者API...
- 分布式环境下,配置文件的管理和同步是一个常见问题。在一个集群中,所有节点的配置信息应保持一致。 - 修改配置文件后,希望这些更改能够迅速同步到各个节点上。 - 这一功能可以由 Zookeeper 实现。可以将配置...
- **云原生化**:越来越多的企业选择将Hadoop部署在云端,这对Hadoop的可扩展性和成本效益提出了更高要求。 总之,《Hadoop MapReduce v2 Cookbook》第二版深入介绍了Hadoop MapReduce V2的相关技术和实践方法,...
### Hadoop权威指南第二版中文版...通过以上内容可以看出,《Hadoop权威指南》第二版中文版全面而深入地介绍了Hadoop的技术细节和实际应用案例,对于希望深入了解和掌握Hadoop技术的读者来说是一本不可或缺的参考书。