采用的channels为file,sink为hdfs,此处往hdfs写的策略是当时间达到3600s或者文件大小达到128M。可以自己调整
agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # Describe/configure spooldir source1 #agent1.sources.source1.type = spooldir #agent1.sources.source1.spoolDir = /var/log/apache/flumeSpool1 #agent1.sources.source1.fileHeader = true # Describe/configure tail -F source1 agent1.sources.source1.type = exec agent1.sources.source1.command = tail -n 0 -F /tmp/log.log agent1.sources.source1.channels = channel1 # Describe/configure nc source1 #agent1.sources.source1.type = netcat #agent1.sources.source1.bind = localhost #agent1.sources.source1.port = 44444 #configure host for source agent1.sources.source1.interceptors = i1 i2 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname agent1.sources.source1.interceptors.i2.type = timestamp # Describe sink1 #agent1.sinks.sink1.type = logger agent1.sinks.sink1.type = hdfs #a1.sinks.k1.channel = c1 #agent1.sinks.sink1.hdfs.path =hdfs://xxx:9000/tmp/tail/%Y-%m-%d/%H%M%S agent1.sinks.sink1.hdfs.path =hdfs://xxx:9000/tmp/tail/%Y-%m-%d/%H agent1.sinks.sink1.hdfs.filePrefix = %{hostname}/events- agent1.sinks.sink1.hdfs.inUsePrefix = . agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 1000 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 128000000 agent1.sinks.sink1.hdfs.rollCount = 0 agent1.sinks.sink1.hdfs.rollInterval = 3600 #agent1.sinks.sink1.hdfs.round = true #agent1.sinks.sink1.hdfs.roundValue = 10 #agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1
生产实例:
# The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'a #agent section producer.sources = s producer.channels = c producer.sinks = r #producer.sources.s.type = seq producer.sources.s.channels = c producer.sources.s.type = exec producer.sources.s.command=tail -n 0 -F /usr/local/nginx/nginxlog/access.log producer.sources.s.deletePolicy=never #producer.sources.s.type = avro #producer.sources.s.bind = localhost #producer.sources.s.port = 10000 # Each sink's type must be defined(给谁了) #producer.sinks.r.type = avro #producer.sinks.r.hostname = 10.1.1.100 #producer.sinks.r.port = 20000 producer.sources.source1.interceptors = i1 producer.sources.source1.interceptors.i1.type = timestamp producer.sinks.r.type = hdfs producer.sinks.r.hdfs.path = hdfs://localhost:8010/user/hive/warehouse/tail/%Y-%m-%d producer.sinks.r.hdfs.inUsePrefix = . producer.sinks.r.hdfs.maxOpenFiles = 5000 producer.sinks.r.hdfs.batchSize= 1000 producer.sinks.r.hdfs.fileType = DataStream producer.sinks.r.hdfs.writeFormat =Text producer.sinks.r.hdfs.rollSize = 128000000 producer.sinks.r.hdfs.rollCount = 0 producer.sinks.r.hdfs.rollInterval = 3600 producer.sinks.r.hdfs.useLocalTimeStamp = true producer.sinks.r.request.required.acks=1 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=sync producer.sinks.r.custom.encoding=UTF-8 #Specify the channel the sink should use producer.sinks.r.channel = c # Each channel's type is defined. producer.channels.c.type = memory producer.channels.c.capacity = 1000000 producer.channels.c.transactionCapacity = 1000 #producer.channels.c.type=file #producer.channels.c.checkpointDir=/usr/local/flumeng/checkpointdir/tcpdir/example_agent #producer.channels.c.dataDirs=/usr/local/flumeng/datadirs/tddirs/example_agen
实例启动脚本:
#发送到hdfs #./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.2-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.2-bin/conf/flume-hdfs.properties -n producer -Dflume.root.logger=INFO,console
相关推荐
agent.sinks.hdfsSink.hdfs.path = hdfs://hdfs_host:9000/user/flume/data/%y-%m-%d/%H%M%S agent.sinks.hdfsSink.hdfs.fileType = DataStream agent.sinks.hdfsSink.hdfs.writeFormat = Text agent.sinks.hdfsSink...
Flume 的安装包括解压安装包,重命名 Flume 目录,配置 `flume-env.sh` 文件,并将环境变量分发到集群中的其他节点。确保所有节点的环境变量设置正确,可以通过执行 `flume-ng version` 命令来验证。如果遇到报错,...
在大数据处理中,Flume 起到了关键的作用,它能够帮助用户从各种数据源(如网络套接字、日志文件、Avro 数据等)收集数据,然后将这些数据传输到目的地,如 HDFS(Hadoop 分布式文件系统)、HBase 或其他存储系统。...
此外,文档中还指导实验者在本地创建Flume监听目录,复制日志文件至该目录,并设置Hadoop当前用户为hdfs以进行访问授权。 ### 实验内容 实验内容包括四个部分:安装Flume、配置Flume、启动TDHClient、创建Flume监听...
1. **Source**:配置文件中的`a1.sources.r1.type = netcat`定义了一个名为r1的Source,类型为netcat,这意味着Flume将监听`localhost`的44444端口来接收数据。 2. **Sink**:`a1.sinks.k1.type = logger`指定了一...
agent.sinks.hdfsSink.hdfs.path = hdfs://localhost:9000/user/flume/data agent.sinks.hdfsSink.hdfs.fileType = DataStream agent.sinks.hdfsSink.hdfs.writeFormat = Text agent.sinks.hdfsSink.hdfs.batch...
Flume 的核心概念包括源(Source)、通道(Channel)和接收器(Sink),它们共同构建了一个高效的数据传输管道。 1. **Flume 概述** - Flume 的主要目标是简化大数据的收集过程,尤其适用于日志数据的管理。 - 它...
这个配置中,Flume Agent监听本地41414端口接收来自Log4j的日志,使用一个内存Channel暂时存储这些日志,然后将它们写入HDFS。 在`FlumeTest`中,可能会有一个测试类,模拟生成日志并验证Flume是否正确接收和处理。...
本文将详细介绍如何使用Flume从多台机器上采集不同类型的日志,并将其存储至Hadoop的分布式文件系统(HDFS)中,以供后续的数据分析。 #### 一、需求概述 假设我们有三台服务器:A、B 和 C,其中 A 和 B 两台机器...
3. **创建配置文件**:Flume的配置文件通常命名为`flume.conf`,在`conf`目录下创建此文件,并根据需求定义Agent的Source、Channel和Sink。 4. **启动Flume**:使用`flume-ng agent`命令启动Flume Agent,命令格式...
agent.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/user/flume/events agent.sinks.hdfs-sink.hdfs.filePrefix = flume- agent.sinks.hdfs-sink.hdfs.fileType = DataStream agent.sinks.hdfs-sink....
Flume 支持多种类型的 Source,如 TailSource(监听日志文件尾部)、AvroSource(接收 Avro 格式的数据)和 JMSSource(从 JMS 提供者接收消息)。Source 负责读取数据并将其放入 Channel。 2. **Channel**:通道,...
为了验证 Flume 集群的工作,可以在 hadoop12 的 `logs` 目录下创建一个测试文件,然后观察 hadoop13 上 `flume-use-case-test.log` 文件是否接收到并记录了这些数据。这可以确认 Flume 集群的正确运行。 参考文档...
配置文件(通常是flume.conf)需要正确指定HDFS Sink的参数,包括HDFS的地址、文件名、写入模式等。 总的来说,"jar.tar.gz"文件对于那些使用Flume 1.8版本并希望将日志数据同步到HDFS的用户来说是必需的。这个...
- **Sink**:Sink 负责将 Flume 事件存储到目标位置,如 HDFS、HBase 或其他存储系统。 - **Channel**:Channel 作为 Source 和 Sink 之间的桥梁,用于临时存储事件,保证数据传输的可靠性。 - **Event**:Event 是 ...
- 将下载的 `apache-flume-1.7.0-bin.tar.gz` 文件解压至指定位置,例如 `D:\apache-flume-1.7.0-bin`。 - 修改 `D:\apache-flume-1.7.0-bin\conf\log4j.properties` 文件,指定日志目录,例如 `flume.log.dir=D:/...
- **配置管理:** Flume支持通过配置文件管理source、channel和sink。 - **扩展性:** Flume的设计具有很好的扩展性,支持多种插件和自定义组件。 #### 五、参考文档 - **官方网站:** [http://flume.apache.org/]...
Sinks是Flume的数据输出端,它们负责将数据从Channels传输到目标位置,如HDFS(Hadoop分布式文件系统)、Elasticsearch、Kafka或其他日志存储或分析系统。通过灵活的配置,Sinks可以实现数据的多样化处理和分发。 ...
1. 解压并安装:使用tar命令解压安装包,命令为 `tar -zxvf apache-flume-1.8.0-bin.tar.gz`,然后将安装目录重命名为 `/usr/local/apache/flume1.8`。 2. 配置环境变量:export `FLUME_HOME=/usr/local/apache/...