# Flume配置说明
## 1.部署结构图
![](source/Flume数据流.png)
## 2.Agent配置
```properties
##########################################
# agent config
###########################################
#*****************agent section**********************
agent.sources = source_pad source_adx source_pad_clean source_db_data
agent.channels = channel_pad channel_adx channel_pad_clean channel_adx_clean channel_db_data
agent.sinks = sink_pad sink_adx sink_pad_clean sink_pad_clean1 sink_adx_clean sink_adx_clean1 sink_db_data sink_db_data1
#*****************source section**********************
#pad source section
agent.sources.source_pad.deletePolicy=immediate
agent.sources.source_pad.type = spooldir
agent.sources.source_pad.channels = channel_pad
agent.sources.source_pad.spoolDir=/wls/data/pad
agent.sources.source_pad.batchSize=500
agent.sources.source_pad.bufferMaxLineLength=20000
#adx source section
agent.sources.source_adx.deletePolicy=immediate
agent.sources.source_adx.type = spooldir
agent.sources.source_adx.channels = channel_adx channel_adx_clean
agent.sources.source_adx.spoolDir=/wls/data/adx
agent.sources.source_adx.batchSize=500
agent.sources.source_adx.bufferMaxLineLength=10000
agent.sources.source_adx.inputCharset=UTF-8
agent.sources.source_adx.decodeErrorPolicy=IGNORE
agent.sources.source_adx.selector.type=replicating
#pad clean source section
agent.sources.source_pad_clean.type=org.apache.flume.source.kafka.KafkaSource
agent.sources.source_pad_clean.kafka.bootstrap.servers=collector3:9092,collector2:9092,collector1:9092
agent.sources.source_pad_clean.kafka.topics=pad_report_data_clean
agent.sources.source_pad_clean.channels=channel_pad_clean
#db data source section
agent.sources.source_db_data.type=org.apache.flume.source.kafka.KafkaSource
agent.sources.source_db_data.kafka.bootstrap.servers=collector3:9092,collector2:9092,collector1:9092
agent.sources.source_db_data.kafka.topics=db_data
agent.sources.source_db_data.channels=channel_db_data
#*****************sink section**********************
#pad sink section
agent.sinks.sink_pad.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.sink_pad.kafka.bootstrap.servers=collector3:9092,collector2:9092,collector1:9092
agent.sinks.sink_pad.kafka.flumeBatchSize=500
agent.sinks.sink_pad.kafka.producer.acks=1
agent.sinks.sink_pad.kafka.producer.type=async
agent.sinks.sink_pad.kafka.topic=pad_report_data
agent.sinks.sink_pad.kafka.producer.compression.type = snappy
agent.sinks.sink_pad.kafka.producer.linger.ms=50
agent.sinks.sink_pad.channel = channel_pad
#adx sink section
agent.sinks.sink_adx.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.sink_adx.kafka.bootstrap.servers=collector1:9092,collector2:9092,collector3:9092
agent.sinks.sink_adx.kafka.flumeBatchSize=500
agent.sinks.sink_adx.kafka.producer.acks=1
agent.sinks.sink_adx.kafka.producer.type=async
agent.sinks.sink_adx.kafka.topic=adx_report_data
agent.sinks.sink_adx.kafka.producer.compression.type = snappy
agent.sinks.sink_adx.kafka.producer.linger.ms=50
agent.sinks.sink_adx.channel = channel_adx
#pad clean sink
agent.sinks.sink_pad_clean.type=avro
agent.sinks.sink_pad_clean.hostname=30.16.94.72
agent.sinks.sink_pad_clean.port=44444
agent.sinks.sink_pad_clean.threads=10
agent.sinks.sink_pad_clean.channel=channel_pad_clean
agent.sinks.sink_pad_clean1.type=avro
agent.sinks.sink_pad_clean1.hostname=30.16.94.75
agent.sinks.sink_pad_clean1.port=44444
agent.sinks.sink_pad_clean1.threads=10
agent.sinks.sink_pad_clean1.channel=channel_pad_clean
#adx clean sink
agent.sinks.sink_adx_clean.type=avro
agent.sinks.sink_adx_clean.hostname=30.16.94.72
agent.sinks.sink_adx_clean.port=44445
agent.sinks.sink_adx_clean.threads=10
agent.sinks.sink_adx_clean.channel=channel_adx_clean
agent.sinks.sink_adx_clean1.type=avro
agent.sinks.sink_adx_clean1.hostname=30.16.94.75
agent.sinks.sink_adx_clean1.port=44445
agent.sinks.sink_adx_clean1.threads=10
agent.sinks.sink_adx_clean1.channel=channel_adx_clean
#db data sink
agent.sinks.sink_db_data.type=avro
agent.sinks.sink_db_data.hostname=30.16.94.72
agent.sinks.sink_db_data.port=44446
agent.sinks.sink_db_data.threads=10
agent.sinks.sink_db_data.channel=channel_db_data
agent.sinks.sink_db_data1.type=avro
agent.sinks.sink_db_data1.hostname=30.16.94.75
agent.sinks.sink_db_data1.port=44446
agent.sinks.sink_db_data1.threads=10
agent.sinks.sink_db_data1.channel=channel_db_data
#*****************sink group**************************
#pad clean sinkgroup
agent.sinkgroups=gpad
agent.sinkgroups.gpad.sinks=sink_pad_clean sink_pad_clean1
agent.sinkgroups.gpad.processor.type = load_balance
agent.sinkgroups.gpad.processor.selector = round_robin
agent.sinkgroups.gpad.processor.backoff = true
#pad clean sinkgroup
agent.sinkgroups=gadx
agent.sinkgroups.gadx.sinks=sink_adx_clean sink_adx_clean1
agent.sinkgroups.gadx.processor.type = load_balance
agent.sinkgroups.gadx.processor.selector = round_robin
agent.sinkgroups.gadx.processor.backoff = true
#db data sinkgroup
agent.sinkgroups=gdb
agent.sinkgroups.gdb.sinks=sink_db_data sink_db_data1
agent.sinkgroups.gdb.processor.type = load_balance
agent.sinkgroups.gdb.processor.selector = round_robin
agent.sinkgroups.gdb.processor.backoff = true
#*****************channel section**********************
#pad channel section
agent.channels.channel_pad.type = memory
agent.channels.channel_pad.capacity = 20000
agent.channels.channel_pad.keep-alive=60
agent.channels.channel_pad.transactionCapacity=2000
#adx channel section
agent.channels.channel_adx.type = memory
agent.channels.channel_adx.capacity = 20000
agent.channels.channel_adx.keep-alive=60
agent.channels.channel_adx.transactionCapacity=2000
#pad clean channel section
agent.channels.channel_pad_clean.type = memory
agent.channels.channel_pad_clean.capacity = 20000
agent.channels.channel_pad_clean.keep-alive=60
agent.channels.channel_pad_clean.transactionCapacity=2000
#adx clean channel section
agent.channels.channel_adx_clean.type = memory
agent.channels.channel_adx_clean.capacity = 20000
agent.channels.channel_adx_clean.keep-alive=60
agent.channels.channel_adx_clean.transactionCapacity=2000
#db data channel section
agent.channels.channel_db_data.type = memory
agent.channels.channel_db_data.capacity = 10000
agent.channels.channel_db_data.keep-alive=60
agent.channels.channel_db_data.transactionCapacity=2000
############interceptor########
agent.sources.source_pad.interceptors = i1
agent.sources.source_pad.interceptors.i1.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent.sources.source_pad.interceptors.i1.preserveExisting = false
agent.sources.source_pad.interceptors.i1.headerName =key
agent.sources.source_adx.interceptors=i2
agent.sources.source_adx.interceptors.i2.type=org.apache.flume.interceptor.Md5ConvertInterceptor$Builder
agent.sources.source_pad_clean.interceptors=i3
agent.sources.source_pad_clean.interceptors.i3.type=org.apache.flume.interceptor.DmpTimestampInterceptor$Builder
agent.sources.source_db_data.interceptors=i4
agent.sources.source_db_data.interceptors.i4.type=org.apache.flume.interceptor.DbDataInterceptor$Builder
```
数据流图:
![](source/agent数据流图.png)
## 3.Collector配置
```properties
##########################################
# collector config
###########################################
#*****************agent section**********************
collector.sources=source_pad source_adx source_db
collector.channels=channel_pad channel_adx channel_db
collector.sinks=sink_pad sink_adx sink_db
#*****************source section**********************
#pad source section
collector.sources.source_pad.type = avro
collector.sources.source_pad.channels = channel_pad
collector.sources.source_pad.bind=0.0.0.0
collector.sources.source_pad.port=44444
#adx source section
collector.sources.source_adx.type = avro
collector.sources.source_adx.channels = channel_adx
collector.sources.source_adx.bind=0.0.0.0
collector.sources.source_adx.port=44445
#db source section
collector.sources.source_db.type = avro
collector.sources.source_db.channels = channel_db
collector.sources.source_db.bind=0.0.0.0
collector.sources.source_db.port=44446
#pad clean sink
collector.sinks.sink_pad.type=hdfs
collector.sinks.sink_pad.hdfs.path=hdfs://dmp/data/logs/pad/%{day}
collector.sinks.sink_pad.hdfs.rollInterval=86400
collector.sinks.sink_pad.hdfs.rollSize=0
collector.sinks.sink_pad.hdfs.idleTimeout=172800
collector.sinks.sink_pad.hdfs.callTimeout=60000
collector.sinks.sink_pad.hdfs.writeFormat=Text
collector.sinks.sink_pad.hdfs.filePrefix=pad.master1
collector.sinks.sink_pad.hdfs.rollCount=9900000000
collector.sinks.sink_pad.hdfs.batchSize=3000
collector.sinks.sink_pad.hdfs.fileType=DataStream
collector.sinks.sink_pad.channel=channel_pad
#adx clean sink
collector.sinks.sink_adx.type=hdfs
collector.sinks.sink_adx.hdfs.path=hdfs://dmp/data/logs/%{datatype}/%{day}
collector.sinks.sink_adx.hdfs.rollInterval=0
collector.sinks.sink_adx.hdfs.rollSize=0
collector.sinks.sink_adx.hdfs.idleTimeout=172800
collector.sinks.sink_adx.hdfs.writeFormat=Text
collector.sinks.sink_adx.hdfs.callTimeout=60000
collector.sinks.sink_adx.hdfs.filePrefix=%{datatype}.master1
collector.sinks.sink_adx.hdfs.rollCount=9900000000
collector.sinks.sink_adx.hdfs.batchSize=3000
collector.sinks.sink_adx.hdfs.fileType=DataStream
collector.sinks.sink_adx.channel=channel_adx
#db clean sink
collector.sinks.sink_db.type=hdfs
collector.sinks.sink_db.hdfs.path=hdfs://dmp/data/logs/%{datatype}
collector.sinks.sink_db.hdfs.rollInterval=0
collector.sinks.sink_db.hdfs.rollSize=0
collector.sinks.sink_db.hdfs.idleTimeout=172800
collector.sinks.sink_db.hdfs.writeFormat=Text
collector.sinks.sink_db.hdfs.callTimeout=60000
collector.sinks.sink_db.hdfs.filePrefix=%{datatype}.master1
collector.sinks.sink_db.hdfs.rollCount=9900000000
collector.sinks.sink_db.hdfs.batchSize=1000
collector.sinks.sink_db.hdfs.fileType=DataStream
collector.sinks.sink_db.channel=channel_db
#*****************channel section**********************
#pad channel section
collector.channels.channel_pad.type = memory
collector.channels.channel_pad.capacity = 60000
collector.channels.channel_pad.keep-alive=60
collector.channels.channel_pad.transactionCapacity=10000
#adx channel section
collector.channels.channel_adx.type = memory
collector.channels.channel_adx.capacity = 60000
collector.channels.channel_adx.keep-alive=60
collector.channels.channel_adx.transactionCapacity=6000
#db channel section
collector.channels.channel_db.type = memory
collector.channels.channel_db.capacity = 30000
collector.channels.channel_db.keep-alive=60
collector.channels.channel_db.transactionCapacity=3000
```
数据流图:
![](source/collector数据流图.png)
## 4.Interceptor
在event前置拦截器,可以根据数据添加一些header,例如type、partition、date,用于数据归类和分区,还可以做一些简单过滤工作(flume自带时间戳解析过慢,不建议使用timestamp及相关表达式)
继承Flume中的Interceptor,新增子类Builder继承Interceptor.Builder,然后实现相关方法
```java
public class DmpTimestampInterceptor implements Interceptor{
@Override
public void close() {
}
@Override
public void initialize() {
}
@Override
public Event intercept(Event e) {
return e;
}
@Override
public List<Event> intercept(List<Event> events) {
return events;
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new DmpTimestampInterceptor();
}
@Override
public void configure(Context arg0) {
}
}
}
```
配置interceptor的type为该子类Builder即可
## 5.其他配置
1.no-reload-conf设置是否自动加载配置文件。
2.sinkgroups.g1.processor.type = failover(load_balance)设置失效切换和负载均衡机制。
3.flume组件启动顺序:channels——>sinks——>sources,关闭顺序:sources——>sinks——>channels.
4.sink 可以配置压缩选项。
5.hdfs.idleTimeout用于设置文件长期未操作则释放。
6.spool-source:只能放静态文件,否则读取到正在编辑的文件则异常终止。
相关推荐
本示例中的 "flume配置文件demo" 提供了一些基础的 Flume 配置文件,用于展示其基本工作原理和配置方法。 Flume 的配置文件通常以 `.conf` 结尾,如 `flume-conf.conf`。这些配置文件使用基于 Java 属性的格式,...
配置Flume涉及到编辑Flume配置文件,这是一个基于Java的Properties格式的文本文件。在案例一中,我们看到一个简单的配置示例,用于监听特定端口(如44444)并把接收到的数据打印到控制台。以下是对配置文件中各个...
flume配置文件,文件配了说明,可以拿下来改一改就用。 可以获取端口数据监听或者文件、文件夹内容监听,实时写入hdfs、mysql或者你需要的路径。
### Flume配置双HA HDFS Sink详解 #### 一、背景与需求分析 Apache Flume 是一款高可靠、高性能的服务,用于收集、聚合和移动大量日志数据。它具有简单的可扩展架构,易于定制和部署。然而,在某些情况下,用户...
在这个文档中,我们将深入探讨两个 Flume 配置案例,以理解其工作原理。 案例 1:单一节点配置 这个案例展示了最基础的 Flume 设置,只有一个节点(agent),该节点包含一个源(source)、一个接收器(sink)和一个...
**大数据采集技术与Flume配置详解** 在大数据领域,数据采集是整个数据分析流程的第一步,它涉及从各种来源收集大量数据并将其传输到处理或存储系统。Apache Flume是Apache Hadoop项目的一个子项目,专门设计用于...
Flume配置文件kafkaSource 包含Intercepter,包含正则表达式。
在这个主题中,我们将深入探讨 Flume 的配置,包括部署种类、流配置、组件详解以及多代理流程配置。 1. **Flume 部署种类**: - **多代理流程**:在这种部署中,多个 Flume 代理协同工作,形成一个日志收集网络,...
Flume配置文件kafkaSource Interceptor,包含获取数据中的关键词时间日期等信息
版本:0.1.0 Flume 配置完全用 Javascript 编写并且是自包含的。 它允许您直观地布置 Flume 拓扑,输入源、通道和接收器的属性,并为您创建水槽配置文件。 它可以处理多个代理。 目前并非所有的源、接收器、通道都...
kafka对接flume,flume对接elasticSearch,flume配置样例
这里的`agentName`是你自定义的Agent名称,`config/file.conf`是你的Flume配置文件的路径。 4. **配置Flume**: 配置文件是Flume的核心,它定义了数据流的结构。一个基本的配置示例可能如下所示: ``` ...
配置 Flume 数据源主要涉及 Source、Sink 和 Channel 的配置。这些配置通常通过编写 Flume 配置文件来完成。 **示例配置**: ```properties # 定义 Agent 名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #...
5. **Flume配置文件编辑**: 使用`vim`编辑配置文件,如`tail-avro-avro-logger.conf`和`avro-hdfs.conf`,根据实际需求设置Source、Channel和Sink的属性。 6. **开发数据生成脚本**: 在Node02上运行一个Shell...
在描述中提到,这个Flume配置可能用于将数据发送给Kafka,Kafka作为一个分布式消息队列,可以进一步处理和分发这些数据。 4. **Flume配置**:Flume的配置基于简单的文本文件,其中定义了源、通道和接收器的配置细节...
这个插件扩展了 Flume 的源功能,使得我们可以配置一个源来定期查询 MySQL 数据库,并将查询结果作为事件传输到 Flume 配置中的下一个通道。1.5.2 版本可能不是最新的,但它是稳定且兼容 Flume 1.x 系列的版本,适合...
- Flume 的配置基于 Java 属性格式,易于理解和维护。 - 配置文件定义了源、通道和接收器的类型以及它们之间的关联。 - 可以创建多个配置文件,通过 Flume Agent 运行不同的数据流。 5. **Flume 高级特性** - *...