`
gujialiangsz
  • 浏览: 2199 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Flume配置

阅读更多

# Flume配置说明

 

## 1.部署结构图

 

![](source/Flume数据流.png)

 

 

 

## 2.Agent配置

 

```properties

##########################################

# agent config

###########################################

#*****************agent section**********************

agent.sources = source_pad source_adx source_pad_clean source_db_data

agent.channels = channel_pad channel_adx channel_pad_clean channel_adx_clean channel_db_data

agent.sinks = sink_pad sink_adx sink_pad_clean sink_pad_clean1 sink_adx_clean sink_adx_clean1 sink_db_data sink_db_data1

 

#*****************source section**********************

#pad source section

agent.sources.source_pad.deletePolicy=immediate

agent.sources.source_pad.type = spooldir

agent.sources.source_pad.channels = channel_pad

agent.sources.source_pad.spoolDir=/wls/data/pad

agent.sources.source_pad.batchSize=500

agent.sources.source_pad.bufferMaxLineLength=20000

#adx source section

agent.sources.source_adx.deletePolicy=immediate

agent.sources.source_adx.type = spooldir

agent.sources.source_adx.channels = channel_adx channel_adx_clean

agent.sources.source_adx.spoolDir=/wls/data/adx

agent.sources.source_adx.batchSize=500

agent.sources.source_adx.bufferMaxLineLength=10000

agent.sources.source_adx.inputCharset=UTF-8

agent.sources.source_adx.decodeErrorPolicy=IGNORE

agent.sources.source_adx.selector.type=replicating

 

#pad clean source section

agent.sources.source_pad_clean.type=org.apache.flume.source.kafka.KafkaSource

agent.sources.source_pad_clean.kafka.bootstrap.servers=collector3:9092,collector2:9092,collector1:9092

agent.sources.source_pad_clean.kafka.topics=pad_report_data_clean

agent.sources.source_pad_clean.channels=channel_pad_clean

 

#db data source section

agent.sources.source_db_data.type=org.apache.flume.source.kafka.KafkaSource

agent.sources.source_db_data.kafka.bootstrap.servers=collector3:9092,collector2:9092,collector1:9092

agent.sources.source_db_data.kafka.topics=db_data

agent.sources.source_db_data.channels=channel_db_data

 

#*****************sink section**********************

#pad sink section

agent.sinks.sink_pad.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.sink_pad.kafka.bootstrap.servers=collector3:9092,collector2:9092,collector1:9092

agent.sinks.sink_pad.kafka.flumeBatchSize=500

agent.sinks.sink_pad.kafka.producer.acks=1

agent.sinks.sink_pad.kafka.producer.type=async

agent.sinks.sink_pad.kafka.topic=pad_report_data

agent.sinks.sink_pad.kafka.producer.compression.type = snappy

agent.sinks.sink_pad.kafka.producer.linger.ms=50

agent.sinks.sink_pad.channel = channel_pad

 

#adx sink section

agent.sinks.sink_adx.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.sink_adx.kafka.bootstrap.servers=collector1:9092,collector2:9092,collector3:9092

agent.sinks.sink_adx.kafka.flumeBatchSize=500

agent.sinks.sink_adx.kafka.producer.acks=1

agent.sinks.sink_adx.kafka.producer.type=async

agent.sinks.sink_adx.kafka.topic=adx_report_data

agent.sinks.sink_adx.kafka.producer.compression.type = snappy

agent.sinks.sink_adx.kafka.producer.linger.ms=50

agent.sinks.sink_adx.channel = channel_adx

 

#pad clean sink

agent.sinks.sink_pad_clean.type=avro

agent.sinks.sink_pad_clean.hostname=30.16.94.72

agent.sinks.sink_pad_clean.port=44444

agent.sinks.sink_pad_clean.threads=10

agent.sinks.sink_pad_clean.channel=channel_pad_clean

 

agent.sinks.sink_pad_clean1.type=avro

agent.sinks.sink_pad_clean1.hostname=30.16.94.75

agent.sinks.sink_pad_clean1.port=44444

agent.sinks.sink_pad_clean1.threads=10

agent.sinks.sink_pad_clean1.channel=channel_pad_clean

 

#adx clean sink

agent.sinks.sink_adx_clean.type=avro

agent.sinks.sink_adx_clean.hostname=30.16.94.72

agent.sinks.sink_adx_clean.port=44445

agent.sinks.sink_adx_clean.threads=10

agent.sinks.sink_adx_clean.channel=channel_adx_clean

 

agent.sinks.sink_adx_clean1.type=avro

agent.sinks.sink_adx_clean1.hostname=30.16.94.75

agent.sinks.sink_adx_clean1.port=44445

agent.sinks.sink_adx_clean1.threads=10

agent.sinks.sink_adx_clean1.channel=channel_adx_clean

 

#db data sink

agent.sinks.sink_db_data.type=avro

agent.sinks.sink_db_data.hostname=30.16.94.72

agent.sinks.sink_db_data.port=44446

agent.sinks.sink_db_data.threads=10

agent.sinks.sink_db_data.channel=channel_db_data

 

agent.sinks.sink_db_data1.type=avro

agent.sinks.sink_db_data1.hostname=30.16.94.75

agent.sinks.sink_db_data1.port=44446

agent.sinks.sink_db_data1.threads=10

agent.sinks.sink_db_data1.channel=channel_db_data

 

#*****************sink group**************************

#pad clean sinkgroup

agent.sinkgroups=gpad

agent.sinkgroups.gpad.sinks=sink_pad_clean sink_pad_clean1

agent.sinkgroups.gpad.processor.type = load_balance

agent.sinkgroups.gpad.processor.selector = round_robin

agent.sinkgroups.gpad.processor.backoff = true

 

 

#pad clean sinkgroup

agent.sinkgroups=gadx

agent.sinkgroups.gadx.sinks=sink_adx_clean sink_adx_clean1

agent.sinkgroups.gadx.processor.type = load_balance

agent.sinkgroups.gadx.processor.selector = round_robin

agent.sinkgroups.gadx.processor.backoff = true

 

 

#db data sinkgroup

agent.sinkgroups=gdb

agent.sinkgroups.gdb.sinks=sink_db_data sink_db_data1

agent.sinkgroups.gdb.processor.type = load_balance

agent.sinkgroups.gdb.processor.selector = round_robin

agent.sinkgroups.gdb.processor.backoff = true

 

#*****************channel section**********************

#pad channel section

agent.channels.channel_pad.type = memory

agent.channels.channel_pad.capacity = 20000

agent.channels.channel_pad.keep-alive=60

agent.channels.channel_pad.transactionCapacity=2000

#adx channel section

agent.channels.channel_adx.type = memory

agent.channels.channel_adx.capacity = 20000

agent.channels.channel_adx.keep-alive=60

agent.channels.channel_adx.transactionCapacity=2000

#pad clean channel section

agent.channels.channel_pad_clean.type = memory

agent.channels.channel_pad_clean.capacity = 20000

agent.channels.channel_pad_clean.keep-alive=60

agent.channels.channel_pad_clean.transactionCapacity=2000

#adx clean channel section

agent.channels.channel_adx_clean.type = memory

agent.channels.channel_adx_clean.capacity = 20000

agent.channels.channel_adx_clean.keep-alive=60

agent.channels.channel_adx_clean.transactionCapacity=2000

#db data channel section

agent.channels.channel_db_data.type = memory

agent.channels.channel_db_data.capacity = 10000

agent.channels.channel_db_data.keep-alive=60

agent.channels.channel_db_data.transactionCapacity=2000

 

############interceptor########

agent.sources.source_pad.interceptors = i1

agent.sources.source_pad.interceptors.i1.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

agent.sources.source_pad.interceptors.i1.preserveExisting = false

agent.sources.source_pad.interceptors.i1.headerName =key

 

agent.sources.source_adx.interceptors=i2

agent.sources.source_adx.interceptors.i2.type=org.apache.flume.interceptor.Md5ConvertInterceptor$Builder

 

agent.sources.source_pad_clean.interceptors=i3

agent.sources.source_pad_clean.interceptors.i3.type=org.apache.flume.interceptor.DmpTimestampInterceptor$Builder

 

agent.sources.source_db_data.interceptors=i4

agent.sources.source_db_data.interceptors.i4.type=org.apache.flume.interceptor.DbDataInterceptor$Builder

```

 

数据流图:

 

![](source/agent数据流图.png)

 

 

 

## 3.Collector配置

 

```properties

##########################################

# collector config

###########################################

#*****************agent section**********************

collector.sources=source_pad source_adx source_db

collector.channels=channel_pad channel_adx channel_db

collector.sinks=sink_pad sink_adx sink_db

#*****************source section**********************

#pad source section

collector.sources.source_pad.type = avro

collector.sources.source_pad.channels = channel_pad

collector.sources.source_pad.bind=0.0.0.0

collector.sources.source_pad.port=44444

#adx source section

collector.sources.source_adx.type = avro

collector.sources.source_adx.channels = channel_adx

collector.sources.source_adx.bind=0.0.0.0

collector.sources.source_adx.port=44445

#db source section

collector.sources.source_db.type = avro

collector.sources.source_db.channels = channel_db

collector.sources.source_db.bind=0.0.0.0

collector.sources.source_db.port=44446

 

#pad clean sink

collector.sinks.sink_pad.type=hdfs

collector.sinks.sink_pad.hdfs.path=hdfs://dmp/data/logs/pad/%{day}

collector.sinks.sink_pad.hdfs.rollInterval=86400

collector.sinks.sink_pad.hdfs.rollSize=0

collector.sinks.sink_pad.hdfs.idleTimeout=172800

collector.sinks.sink_pad.hdfs.callTimeout=60000

collector.sinks.sink_pad.hdfs.writeFormat=Text

collector.sinks.sink_pad.hdfs.filePrefix=pad.master1

collector.sinks.sink_pad.hdfs.rollCount=9900000000

collector.sinks.sink_pad.hdfs.batchSize=3000

collector.sinks.sink_pad.hdfs.fileType=DataStream

collector.sinks.sink_pad.channel=channel_pad

 

#adx clean sink

collector.sinks.sink_adx.type=hdfs

collector.sinks.sink_adx.hdfs.path=hdfs://dmp/data/logs/%{datatype}/%{day}

collector.sinks.sink_adx.hdfs.rollInterval=0

collector.sinks.sink_adx.hdfs.rollSize=0

collector.sinks.sink_adx.hdfs.idleTimeout=172800

collector.sinks.sink_adx.hdfs.writeFormat=Text

collector.sinks.sink_adx.hdfs.callTimeout=60000

collector.sinks.sink_adx.hdfs.filePrefix=%{datatype}.master1

collector.sinks.sink_adx.hdfs.rollCount=9900000000

collector.sinks.sink_adx.hdfs.batchSize=3000

collector.sinks.sink_adx.hdfs.fileType=DataStream

collector.sinks.sink_adx.channel=channel_adx

 

#db clean sink

collector.sinks.sink_db.type=hdfs

collector.sinks.sink_db.hdfs.path=hdfs://dmp/data/logs/%{datatype}

collector.sinks.sink_db.hdfs.rollInterval=0

collector.sinks.sink_db.hdfs.rollSize=0

collector.sinks.sink_db.hdfs.idleTimeout=172800

collector.sinks.sink_db.hdfs.writeFormat=Text

collector.sinks.sink_db.hdfs.callTimeout=60000

collector.sinks.sink_db.hdfs.filePrefix=%{datatype}.master1

collector.sinks.sink_db.hdfs.rollCount=9900000000

collector.sinks.sink_db.hdfs.batchSize=1000

collector.sinks.sink_db.hdfs.fileType=DataStream

collector.sinks.sink_db.channel=channel_db

 

#*****************channel section**********************

#pad channel section

collector.channels.channel_pad.type = memory

collector.channels.channel_pad.capacity = 60000

collector.channels.channel_pad.keep-alive=60

collector.channels.channel_pad.transactionCapacity=10000

#adx channel section

collector.channels.channel_adx.type = memory

collector.channels.channel_adx.capacity = 60000

collector.channels.channel_adx.keep-alive=60

collector.channels.channel_adx.transactionCapacity=6000

#db channel section

collector.channels.channel_db.type = memory

collector.channels.channel_db.capacity = 30000

collector.channels.channel_db.keep-alive=60

collector.channels.channel_db.transactionCapacity=3000

```

 

数据流图:

 

![](source/collector数据流图.png)

 

 

 

## 4.Interceptor

 

​在event前置拦截器,可以根据数据添加一些header,例如type、partition、date,用于数据归类和分区,还可以做一些简单过滤工作(flume自带时间戳解析过慢,不建议使用timestamp及相关表达式)

 

​继承Flume中的Interceptor,新增子类Builder继承Interceptor.Builder,然后实现相关方法

 

```java

public class DmpTimestampInterceptor implements Interceptor{

  @Override

public void close() {

}

@Override

public void initialize() {

}

@Override

public Event intercept(Event e) {

      return e;

}

@Override

public List<Event> intercept(List<Event> events) {

       return events;  

}

public static class Builder implements Interceptor.Builder {  

        @Override  

        public Interceptor build() {  

            return new DmpTimestampInterceptor();  

        }  

@Override

public void configure(Context arg0) {

}  

    }  

}

```

 

​配置interceptor的type为该子类Builder即可

 

 

 

## 5.其他配置

 

​1.no-reload-conf设置是否自动加载配置文件。

 

​2.sinkgroups.g1.processor.type = failover(load_balance)设置失效切换和负载均衡机制。

 

​3.flume组件启动顺序:channels——>sinks——>sources,关闭顺序:sources——>sinks——>channels.

 

​4.sink 可以配置压缩选项。

 

​5.hdfs.idleTimeout用于设置文件长期未操作则释放。

 

​6.spool-source:只能放静态文件,否则读取到正在编辑的文件则异常终止。

  • 大小: 15.4 KB
  • 大小: 48.1 KB
  • 大小: 16.3 KB
  • 大小: 6.8 KB
分享到:
评论

相关推荐

    flume配置文件demo

    本示例中的 "flume配置文件demo" 提供了一些基础的 Flume 配置文件,用于展示其基本工作原理和配置方法。 Flume 的配置文件通常以 `.conf` 结尾,如 `flume-conf.conf`。这些配置文件使用基于 Java 属性的格式,...

    Flume学习文档(2){Flume安装部署、Flume配置文件}.docx

    配置Flume涉及到编辑Flume配置文件,这是一个基于Java的Properties格式的文本文件。在案例一中,我们看到一个简单的配置示例,用于监听特定端口(如44444)并把接收到的数据打印到控制台。以下是对配置文件中各个...

    flume配置文件案例

    flume配置文件,文件配了说明,可以拿下来改一改就用。 可以获取端口数据监听或者文件、文件夹内容监听,实时写入hdfs、mysql或者你需要的路径。

    Flume配置双HA hdfsSink.docx

    ### Flume配置双HA HDFS Sink详解 #### 一、背景与需求分析 Apache Flume 是一款高可靠、高性能的服务,用于收集、聚合和移动大量日志数据。它具有简单的可扩展架构,易于定制和部署。然而,在某些情况下,用户...

    04、日志收集系统Flume-flume配置案例.docx

    在这个文档中,我们将深入探讨两个 Flume 配置案例,以理解其工作原理。 案例 1:单一节点配置 这个案例展示了最基础的 Flume 设置,只有一个节点(agent),该节点包含一个源(source)、一个接收器(sink)和一个...

    大数据采集技术-Flume配置.pptx

    **大数据采集技术与Flume配置详解** 在大数据领域,数据采集是整个数据分析流程的第一步,它涉及从各种来源收集大量数据并将其传输到处理或存储系统。Apache Flume是Apache Hadoop项目的一个子项目,专门设计用于...

    Flume配置文件kafkaSource

    Flume配置文件kafkaSource 包含Intercepter,包含正则表达式。

    04、日志收集系统Flume-实时计算4-3:flume配置深入.pptx

    在这个主题中,我们将深入探讨 Flume 的配置,包括部署种类、流配置、组件详解以及多代理流程配置。 1. **Flume 部署种类**: - **多代理流程**:在这种部署中,多个 Flume 代理协同工作,形成一个日志收集网络,...

    Flume配置文件kafkaSource Interceptor

    Flume配置文件kafkaSource Interceptor,包含获取数据中的关键词时间日期等信息

    FlumeConfig:可视化 Flume 配置编辑器

    版本:0.1.0 Flume 配置完全用 Javascript 编写并且是自包含的。 它允许您直观地布置 Flume 拓扑,输入源、通道和接收器的属性,并为您创建水槽配置文件。 它可以处理多个代理。 目前并非所有的源、接收器、通道都...

    flume 1.6.0配置文件样例

    kafka对接flume,flume对接elasticSearch,flume配置样例

    flume安装程序

    这里的`agentName`是你自定义的Agent名称,`config/file.conf`是你的Flume配置文件的路径。 4. **配置Flume**: 配置文件是Flume的核心,它定义了数据流的结构。一个基本的配置示例可能如下所示: ``` ...

    Flume1.6.0入门:安装、部署、及flume的案例

    配置 Flume 数据源主要涉及 Source、Sink 和 Channel 的配置。这些配置通常通过编写 Flume 配置文件来完成。 **示例配置**: ```properties # 定义 Agent 名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #...

    大数据采集技术-Flume级联配置.pptx

    5. **Flume配置文件编辑**: 使用`vim`编辑配置文件,如`tail-avro-avro-logger.conf`和`avro-hdfs.conf`,根据实际需求设置Source、Channel和Sink的属性。 6. **开发数据生成脚本**: 在Node02上运行一个Shell...

    flume_jars.zip

    在描述中提到,这个Flume配置可能用于将数据发送给Kafka,Kafka作为一个分布式消息队列,可以进一步处理和分发这些数据。 4. **Flume配置**:Flume的配置基于简单的文本文件,其中定义了源、通道和接收器的配置细节...

    Flume采集MySQL数据所需jar包.zip

    这个插件扩展了 Flume 的源功能,使得我们可以配置一个源来定期查询 MySQL 数据库,并将查询结果作为事件传输到 Flume 配置中的下一个通道。1.5.2 版本可能不是最新的,但它是稳定且兼容 Flume 1.x 系列的版本,适合...

    Flume笔记.zip

    - Flume 的配置基于 Java 属性格式,易于理解和维护。 - 配置文件定义了源、通道和接收器的类型以及它们之间的关联。 - 可以创建多个配置文件,通过 Flume Agent 运行不同的数据流。 5. **Flume 高级特性** - *...

Global site tag (gtag.js) - Google Analytics