`
liyonghui160com
  • 浏览: 775629 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

flume-ng avro方式传输数据配置 flume-ng多节点实例

阅读更多

 


tail-to-avro

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure spooldir source1
#agent1.sources.source1.type = spooldir
#agent1.sources.source1.spoolDir = /var/log/apache/flumeSpool1
#agent1.sources.source1.fileHeader = true

# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -n +0 -F /tmp/log.log
agent1.sources.source1.channels = channel1

# Describe/configure nc source1
#agent1.sources.source1.type = netcat
#agent1.sources.source1.bind = localhost
#agent1.sources.source1.port = 44444

#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Describe logger  sink1
#agent1.sinks.sink1.type = logger

# Describe avro  sink1
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.hostname = 172.16.10.175
agent1.sinks.sink1.port = 4545

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

 


avro-to-rollfile

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure avro source

agent1.sources.source1.type = avro
agent1.sources.source1.bind =172.16.10.175
agent1.sources.source1.port = 4545


# Describe logger sink1
#agent1.sinks.sink1.type = logger

# Describe file sink1
agent1.sinks.sink1.type = file_roll
agent1.sinks.sink1.sink.directory = /var/log/flume

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

 

启动:

 

./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.0-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.0-bin/conf/flume-single.properties -n agent1 -Dflume.root.logger=INFO,console

 

 

 

 

亲自操作如下:

 

 

 source配置(接收):

 

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'a
#agent section  
producer.sources = s 
producer.channels = c
producer.sinks = r

#producer.sources.s.type = seq
producer.sources.s.channels = c
#producer.sources.s.type = exec
#producer.sources.s.command=tail -n +0 -F /usr/local/nginx/nginxlog/access.log
producer.sources.s.deletePolicy=never
producer.sources.s.type = avro
producer.sources.s.bind = localhost
producer.sources.s.port = 4545
# Each sink's type must be defined(给谁了)
#producer.sinks.r.type = avro
#producer.sinks.r.hostname = 10.1.1.100
#producer.sinks.r.port = 20000
producer.sinks.r.type = org.xx.clickstream.sink.kafka.KafkaSink
producer.sinks.r.zk.connect = 127.0.0.1:2181
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partitioner.class=org.xx.clickstream.partition.TypePartitioner
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=1
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory  
producer.channels.c.capacity = 1000000
producer.channels.c.transactionCapacity = 1000000

#producer.channels.c.type=file
#producer.channels.c.checkpointDir=/usr/local/flumeng/checkpointdir/tcpdir/example_agent
#producer.channels.c.dataDirs=/usr/local/flumeng/datadirs/tddirs/example_agen

 

sink配置(发送):

 

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'a
#agent section  
producer.sources = s 
producer.channels = c
producer.sinks = r

#producer.sources.s.type = seq
producer.sources.s.channels = c
producer.sources.s.type = exec
producer.sources.s.command=tail -n +0 -F /usr/local/nginx/nginxlog/access.log
producer.sources.s.deletePolicy=never
#producer.sources.s.type = avro
#producer.sources.s.bind = localhost
#producer.sources.s.port = 10000
# Each sink's type must be defined(给谁了)
producer.sinks.r.type = avro
producer.sinks.r.hostname = localhost
producer.sinks.r.port = 4545
#producer.sinks.r.type = org.xx.clickstream.sink.kafka.KafkaSink
#producer.sinks.r.zk.connect = 127.0.0.1:2181
#producer.sinks.r.metadata.broker.list=127.0.0.1:9092
#producer.sinks.r.partitioner.class=org.xx.clickstream.partition.TypePartitioner
#producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
#producer.sinks.r.request.required.acks=1
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory  
producer.channels.c.capacity = 1000000
producer.channels.c.transactionCapacity = 1000000

#producer.channels.c.type=file
#producer.channels.c.checkpointDir=/usr/local/flumeng/checkpointdir/tcpdir/example_agent
#producer.channels.c.dataDirs=/usr/local/flumeng/datadirs/tddirs/example_agen

 

启动顺序,先启动source接收,再启动sink

 

#先启动接收source,准备好接收
#./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.2-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.2-bin/conf/flume-avrosource.properties -n producer -Dflume.root.logger=INFO,console

#再启动发送sink,发送
#./flume-ng agent -c /usr/local/flumeng/apache-flume-1.5.2-bin/conf/ -f /usr/local/flumeng/apache-flume-1.5.2-bin/conf/flume-avrosink.properties -n producer -Dflume.root.logger=INFO,console

 

 

分享到:
评论

相关推荐

    flume-ng安装

    Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:Java JDK 安装 在安装 Flume-...

    flume-ng-sql-source-1.5.2

    Flume-ng-sql-source-1.5.2是Apache Flume的一个扩展,它允许Flume从SQL数据库中收集数据。Apache Flume是一个分布式、可靠且可用于有效聚合、移动大量日志数据的系统。"ng"代表"next generation",表明这是Flume的...

    flume-ng-sql-source-release-1.5.2.zip

    Flume-ng-sql-source是Apache Flume的一个扩展插件,主要功能是允许用户从各种数据库中抽取数据并将其传输到其他目的地,如Apache Kafka。在本案例中,我们讨论的是版本1.5.2的发布包,即"flume-ng-sql-source-...

    Flume-ng-1.6.0-cdh.zip

    Flume-ng-1.6.0-cdh.zip 内压缩了 3 个项目,分别为:flume-ng-1.6.0-cdh5.5.0.tar.gz、flume-ng-1.6.0-cdh5.7.0.tar.gz 和 flume-ng-1.6.0-cdh5.10.1.tar.gz,选择你需要的版本。

    flume-ng-sql-source-1.5.2.jar

    flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包

    flume-ng-1.6.0-cdh5.5.0.tar.gz

    Apache Flume 是一个分布式...总的来说,Apache Flume-ng-1.6.0-cdh5.5.0 是一个强大且灵活的数据收集工具,特别适合在 CDH 环境中处理大规模的日志数据,它的易用性和可扩展性使其成为大数据基础设施的重要组成部分。

    flume-ng-sql-source-1.5.3.jar

    flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...

    Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS.docx

    Flume-ng 是一个高可用、可靠、分布式的日志聚合系统,可以实时地从各种数据源(如日志文件、网络 socket、数据库等)中收集数据,并将其传输到目标系统中,如 HDFS、HBase 等。在本文中,我们将介绍如何在 Windows ...

    flume-ng-1.6.0 cdh5.7.0安装包

    总结来说,Flume-ng-1.6.0-cdh5.7.0是专为CDH 5.7.0设计的数据流管理工具,具备强大的数据采集、传输和处理能力,是构建高效大数据管道的重要组成部分。对于需要管理和分析大量日志或其他类型数据的企业,部署和熟练...

    flume-ng-sql-source-1.5.1

    flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具

    flume-ng-1.5.0-cdh5.3.6.rar

    flume-ng-1.5.0-cdh5.3.6.rarflume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume...

    flume-ng-sql-source-1.4.3.jar

    Flume-ng-sql-source支持多种数据库类型,包括MySQL、PostgreSQL、Oracle等常见的关系型数据库,允许用户通过配置文件定义SQL查询语句来获取所需的数据。这样,不仅能够实现定期或者实时的数据库数据抽取,还能根据...

    flume-ng-sql-source.jar

    flume是一个日志收集器,更多详细的介绍可以参照官网:http://flume.apache.org/ flume-ng-sql-source实现oracle增量数据读取 有了这个jar文件即可从关系型数据库拉去数据到flume

    flume-ng-1.6.0-cdh5.10.1.tar.gz的下载

    总的来说,`flume-ng-1.6.0-cdh5.10.1.tar.gz`是一个强大的工具,对于需要实时处理和传输大规模日志数据的环境而言,它是不可或缺的一部分。通过其灵活的架构和丰富的功能,Flume使得日志数据的管理和分析变得更加...

    flume-ng-1.6.0-cdh5.14.2.tar.gz

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...

    flume-ng-1.5.0-cdh5.3.6.tar.gz

    本文将深入探讨`flume-ng-1.5.0-cdh5.3.6.tar.gz`压缩包中的核心概念、配置方法及实际应用。 **Flume NG简介** Flume NG是Flume的升级版,它引入了新的架构,增强了可扩展性和灵活性。Flume NG的主要任务是收集、...

    flume-ng-1.6.0-cdh5.14.0源码

    在 CDH(Cloudera Distribution Including Hadoop)5.14.0 版本中,Flume-ng(下一代 Flume)1.6.0 版本是官方提供的组件,用于日志管理和大数据分析。这个源码包,`flume-ng-1.6.0-cdh5.14.0-src.tar.gz`,提供了...

    Kafka+Flume-ng搭建

    - 验证数据是否能够正确地从客户端Flume-ng传输到服务器端Flume-ng,并最终保存至HDFS中。 #### 五、总结 通过上述步骤,我们可以成功搭建起一套Kafka+Flume-ng的数据传输系统。这套系统不仅能够高效地收集、聚合...

Global site tag (gtag.js) - Google Analytics