`
liyonghui160com
  • 浏览: 778616 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Flume监听文件目录sink至hdfs按照每天切割

阅读更多

 

 

采用的channels为file,sink为hdfs,此处往hdfs写的策略是当时间达到3600s或者文件大小达到128M。可以自己调整

 

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure spooldir source1
#agent1.sources.source1.type = spooldir
#agent1.sources.source1.spoolDir = /var/log/apache/flumeSpool1
#agent1.sources.source1.fileHeader = true

# Describe/configure tail -F source1
agent1.sources.source1.type = exec 
agent1.sources.source1.command = tail -n 0 -F /tmp/log.log
agent1.sources.source1.channels = channel1

# Describe/configure nc source1
#agent1.sources.source1.type = netcat
#agent1.sources.source1.bind = localhost
#agent1.sources.source1.port = 44444

#configure host for source
agent1.sources.source1.interceptors = i1 i2
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
agent1.sources.source1.interceptors.i2.type = timestamp
# Describe sink1
#agent1.sinks.sink1.type = logger

agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
#agent1.sinks.sink1.hdfs.path =hdfs://xxx:9000/tmp/tail/%Y-%m-%d/%H%M%S
agent1.sinks.sink1.hdfs.path =hdfs://xxx:9000/tmp/tail/%Y-%m-%d/%H
agent1.sinks.sink1.hdfs.filePrefix = %{hostname}/events-
agent1.sinks.sink1.hdfs.inUsePrefix = .
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 
agent1.sinks.sink1.hdfs.batchSize= 1000
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 128000000
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.rollInterval = 3600
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

 

生产实例:

 

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'a
#agent section  
producer.sources = s 
producer.channels = c
producer.sinks = r

#producer.sources.s.type = seq
producer.sources.s.channels = c
producer.sources.s.type = exec
producer.sources.s.command=tail -n 0 -F /usr/local/nginx/nginxlog/access.log
producer.sources.s.deletePolicy=never
#producer.sources.s.type = avro
#producer.sources.s.bind = localhost
#producer.sources.s.port = 10000
# Each sink's type must be defined(给谁了)
#producer.sinks.r.type = avro
#producer.sinks.r.hostname = 10.1.1.100
#producer.sinks.r.port = 20000
producer.sources.source1.interceptors = i1
producer.sources.source1.interceptors.i1.type = timestamp

producer.sinks.r.type = hdfs
producer.sinks.r.hdfs.path = hdfs://localhost:8010/user/hive/warehouse/tail/%Y-%m-%d
producer.sinks.r.hdfs.inUsePrefix = .
producer.sinks.r.hdfs.maxOpenFiles = 5000
producer.sinks.r.hdfs.batchSize= 1000
producer.sinks.r.hdfs.fileType = DataStream
producer.sinks.r.hdfs.writeFormat =Text
producer.sinks.r.hdfs.rollSize = 128000000
producer.sinks.r.hdfs.rollCount = 0
producer.sinks.r.hdfs.rollInterval = 3600
producer.sinks.r.hdfs.useLocalTimeStamp = true
producer.sinks.r.request.required.acks=1
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory  
producer.channels.c.capacity = 1000000
producer.channels.c.transactionCapacity = 1000
#producer.channels.c.type=file
#producer.channels.c.checkpointDir=/usr/local/flumeng/checkpointdir/tcpdir/example_agent
#producer.channels.c.dataDirs=/usr/local/flumeng/datadirs/tddirs/example_agen

 

 

实例启动脚本:

 

#发送到hdfs
#./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.2-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.2-bin/conf/flume-hdfs.properties -n producer -Dflume.root.logger=INFO,console

 

 

 

 

分享到:
评论

相关推荐

    Flume采集Rabbitmq数据同步到HDFS

    agent.sinks.hdfsSink.hdfs.path = hdfs://hdfs_host:9000/user/flume/data/%y-%m-%d/%H%M%S agent.sinks.hdfsSink.hdfs.fileType = DataStream agent.sinks.hdfsSink.hdfs.writeFormat = Text agent.sinks.hdfsSink...

    flime安装+配置+测试+案例(采集日志至HDFS)

    Flume 的安装包括解压安装包,重命名 Flume 目录,配置 `flume-env.sh` 文件,并将环境变量分发到集群中的其他节点。确保所有节点的环境变量设置正确,可以通过执行 `flume-ng version` 命令来验证。如果遇到报错,...

    flume配置文件demo

    在大数据处理中,Flume 起到了关键的作用,它能够帮助用户从各种数据源(如网络套接字、日志文件、Avro 数据等)收集数据,然后将这些数据传输到目的地,如 HDFS(Hadoop 分布式文件系统)、HBase 或其他存储系统。...

    星环大数据平台_Flume使用方法.pdf

    此外,文档中还指导实验者在本地创建Flume监听目录,复制日志文件至该目录,并设置Hadoop当前用户为hdfs以进行访问授权。 ### 实验内容 实验内容包括四个部分:安装Flume、配置Flume、启动TDHClient、创建Flume监听...

    Flume学习文档(2){Flume安装部署、Flume配置文件}.docx

    1. **Source**:配置文件中的`a1.sources.r1.type = netcat`定义了一个名为r1的Source,类型为netcat,这意味着Flume将监听`localhost`的44444端口来接收数据。 2. **Sink**:`a1.sinks.k1.type = logger`指定了一...

    大数据采集技术-Flume监控端口实验手册.pdf

    agent.sinks.hdfsSink.hdfs.path = hdfs://localhost:9000/user/flume/data agent.sinks.hdfsSink.hdfs.fileType = DataStream agent.sinks.hdfsSink.hdfs.writeFormat = Text agent.sinks.hdfsSink.hdfs.batch...

    Flume笔记.zip

    Flume 的核心概念包括源(Source)、通道(Channel)和接收器(Sink),它们共同构建了一个高效的数据传输管道。 1. **Flume 概述** - Flume 的主要目标是简化大数据的收集过程,尤其适用于日志数据的管理。 - 它...

    log4j输出日志到flume

    这个配置中,Flume Agent监听本地41414端口接收来自Log4j的日志,使用一个内存Channel暂时存储这些日志,然后将它们写入HDFS。 在`FlumeTest`中,可能会有一个测试类,模拟生成日志并验证Flume是否正确接收和处理。...

    用Flume采集多台机器上的多种日志并存储于HDFS

    本文将详细介绍如何使用Flume从多台机器上采集不同类型的日志,并将其存储至Hadoop的分布式文件系统(HDFS)中,以供后续的数据分析。 #### 一、需求概述 假设我们有三台服务器:A、B 和 C,其中 A 和 B 两台机器...

    Flume安装包、安装文档

    3. **创建配置文件**:Flume的配置文件通常命名为`flume.conf`,在`conf`目录下创建此文件,并根据需求定义Agent的Source、Channel和Sink。 4. **启动Flume**:使用`flume-ng agent`命令启动Flume Agent,命令格式...

    大数据采集技术-flume监控httpsources.pdf

    agent.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/user/flume/events agent.sinks.hdfs-sink.hdfs.filePrefix = flume- agent.sinks.hdfs-sink.hdfs.fileType = DataStream agent.sinks.hdfs-sink....

    Apache flume1.6_src

    Flume 支持多种类型的 Source,如 TailSource(监听日志文件尾部)、AvroSource(接收 Avro 格式的数据)和 JMSSource(从 JMS 提供者接收消息)。Source 负责读取数据并将其放入 Channel。 2. **Channel**:通道,...

    Flume集群搭建1

    为了验证 Flume 集群的工作,可以在 hadoop12 的 `logs` 目录下创建一个测试文件,然后观察 hadoop13 上 `flume-use-case-test.log` 文件是否接收到并记录了这些数据。这可以确认 Flume 集群的正确运行。 参考文档...

    jar.tar.gz

    配置文件(通常是flume.conf)需要正确指定HDFS Sink的参数,包括HDFS的地址、文件名、写入模式等。 总的来说,"jar.tar.gz"文件对于那些使用Flume 1.8版本并希望将日志数据同步到HDFS的用户来说是必需的。这个...

    windows下flume1.7使用文档

    - 将下载的 `apache-flume-1.7.0-bin.tar.gz` 文件解压至指定位置,例如 `D:\apache-flume-1.7.0-bin`。 - 修改 `D:\apache-flume-1.7.0-bin\conf\log4j.properties` 文件,指定日志目录,例如 `flume.log.dir=D:/...

    flume自学手册

    - **配置管理:** Flume支持通过配置文件管理source、channel和sink。 - **扩展性:** Flume的设计具有很好的扩展性,支持多种插件和自定义组件。 #### 五、参考文档 - **官方网站:** [http://flume.apache.org/]...

    apache-flume-1.7.0-bin.tar.gz

    Sinks是Flume的数据输出端,它们负责将数据从Channels传输到目标位置,如HDFS(Hadoop分布式文件系统)、Elasticsearch、Kafka或其他日志存储或分析系统。通过灵活的配置,Sinks可以实现数据的多样化处理和分发。 ...

    Flume1.8安装部署

    1. 解压并安装:使用tar命令解压安装包,命令为 `tar -zxvf apache-flume-1.8.0-bin.tar.gz`,然后将安装目录重命名为 `/usr/local/apache/flume1.8`。 2. 配置环境变量:export `FLUME_HOME=/usr/local/apache/...

    flume安装程序

    在这个例子中,`source1`是一个Avro源,监听本地主机的44444端口,`sink1`是一个File Roll接收器,将数据写入指定的输出目录,`channel1`是内存通道用于在源和接收器之间临时存储数据。 5. **监控和调试**: 你...

Global site tag (gtag.js) - Google Analytics