流配置中介绍多路复用流的时候,有说到Flume支持从一个源发送事件到多个通道中,这被称为事件流的复用。这里需要在配置中定义事件流的复制/复用,选择1个或者多个通道进行数据流向。
而关于selector配置前面也讲过:
<Agent>.sources.<Source1>.selector.type= replicating
这个源的选择类型为复制。这个参数不指定一个选择的时候,默认情况下它复制
复用则是麻烦一下,流的事情是被筛选的发生到不同的渠道,需要指定源和扇出通道的规则,感觉与case when 类似。
复用的参数为:
<Agent>.sources.<Source1>.selector.type= multiplexing
一、下面给出复制的测试例子:
这里需要配置1个代理作为源发送与2个代理作为接受复制事件,共3个flume配置
首先是作为源发送的代理配置
- #配置文件:replicate_source_case11.conf
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1 k2
- a1.channels = c1 c2
- # Describe/configure the source
- a1.sources.r1.type = syslogtcp
- a1.sources.r1.port = 50000
- a1.sources.r1.host = 192.168.233.128
- a1.sources.r1.selector.type = replicating
- a1.sources.r1.channels = c1 c2
- # Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.channel = c1
- a1.sinks.k1.hostname = 192.168.233.129
- a1.sinks.k1.port = 50000
- a1.sinks.k2.type = avro
- a1.sinks.k2.channel = c2
- a1.sinks.k2.hostname = 192.168.233.130
- a1.sinks.k2.port = 50000
- # Use a channel which buffers events inmemory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- a1.channels.c2.type = memory
- a1.channels.c2.capacity = 1000
- a1.channels.c2.transactionCapacity = 100
这里设置了2个channels与2个sinks,那么我们也要设置2个sinks对应的代理配置:
下面是第一个接受复制事件代理配置
- #配置文件:replicate_sink1_case11.conf
- # Name the components on this agent
- a2.sources = r1
- a2.sinks = k1
- a2.channels = c1
- # Describe/configure the source
- a2.sources.r1.type = avro
- a2.sources.r1.channels = c1
- a2.sources.r1.bind = 192.168.233.129
- a2.sources.r1.port = 50000
- # Describe the sink
- a2.sinks.k1.type = logger
- a2.sinks.k1.channel = c1
- # Use a channel which buffers events inmemory
- a2.channels.c1.type = memory
- a2.channels.c1.capacity = 1000
- a2.channels.c1.transactionCapacity = 100
下面是第二个接受复制事件代理配置:
- #配置文件:replicate_sink2_case11.conf
- # Name the components on this agent
- a3.sources = r1
- a3.sinks = k1
- a3.channels = c1
- # Describe/configure the source
- a3.sources.r1.type = avro
- a3.sources.r1.channels = c1
- a3.sources.r1.bind = 192.168.233.130
- a3.sources.r1.port = 50000
- # Describe the sink
- a3.sinks.k1.type = logger
- a3.sinks.k1.channel = c1
- # Use a channel which buffers events inmemory
- a3.channels.c1.type = memory
- a3.channels.c1.capacity = 1000
- a3.channels.c1.transactionCapacity = 100
#敲命令
首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。
flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1 -Dflume.root.logger=INFO,console
在启动源发送的代理
flume-ng agent -cconf -f conf/replicate_source_case11.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
echo "hello looklook5"| nc 192.168.233.128 50000
#在启动源发送的代理终端查看console输出
可以看到他的正常启动以及发送数据成功
#在启动源第一个接事件的代理终端查看console输出
可以看到他的正常启动,以及接受到源代理发送的数据
#在启动源第二个接事件的代理终端查看console输出
同样可以可以看到他的正常启动,以及接受到源代理发送的数据
Ok,成功
二、下面给出复用的测试例子:
因为复用的流的事件要声明一个头部,然后我们检查头部对应的值,因为我们这边源类用http source
下面是源代理的配置
- #配置文件:multi_source_case12.conf
- a1.sources= r1
- a1.sinks= k1 k2
- a1.channels= c1 c2
- #Describe/configure the source
- a1.sources.r1.type= org.apache.flume.source.http.HTTPSource
- a1.sources.r1.port= 50000
- a1.sources.r1.host= 192.168.233.128
- a1.sources.r1.selector.type= multiplexing
- a1.sources.r1.channels= c1 c2
- a1.sources.r1.selector.header= state
- a1.sources.r1.selector.mapping.CZ= c1
- a1.sources.r1.selector.mapping.US= c2
- a1.sources.r1.selector.default= c1
- #Describe the sink
- a1.sinks.k1.type= avro
- a1.sinks.k1.channel= c1
- a1.sinks.k1.hostname= 192.168.233.129
- a1.sinks.k1.port= 50000
- a1.sinks.k2.type= avro
- a1.sinks.k2.channel= c2
- a1.sinks.k2.hostname= 192.168.233.130
- a1.sinks.k2.port= 50000
- # Usea channel which buffers events in memory
- a1.channels.c1.type= memory
- a1.channels.c1.capacity= 1000
- a1.channels.c1.transactionCapacity= 100
- a1.channels.c2.type= memory
- a1.channels.c2.capacity= 1000
- a1.channels.c2.transactionCapacity= 100
这里设置了2个channels与2个sinks 同时判断头部属性,当CZ的时,事件发送到sinks1,US时发送到sink2,其他的都发送到sink2,因此我们还有配置2个sinks对于的代理。这里的2个接受代理我们沿用之前复制的接受代理。
#敲命令
与之前复制的情况一样,首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。
flume-ng agent -cconf -f conf/multi_sink1_case12.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent -cconf -f conf/multi_sink2_case12.conf -n a1 -Dflume.root.logger=INFO,console
在启动源发送的代理
flume-ng agent -cconf -f conf/multi_source_case12.conf -n a1 -Dflume.root.logger=INFO,console
启动成功后
打开另一个终端输入,往侦听端口送数据
curl -X POST -d '[{"headers" :{"state" : "CZ"},"body" :"TEST1"}]' http://192.168.233.128:50000
curl -X POST -d '[{"headers" :{"state" : "US"},"body" :"TEST2"}]' http://192.168.233.128:50000
curl -X POST -d '[{"headers" :{"state" : "SH"},"body" :"TEST3"}]' http://192.168.233.128:50000
#在启动源发送的代理终端查看console输出
可以看到他的正常启动以及发送数据成功
#在启动源第一个接事件的代理终端查看console输出
这里可以清楚的看到,这个接事件代理只收到了2个事件,因为第二个事件因为我们设置复用,将头部信息对于的事件分流的关系,发送到另一个接事件代理去了。
#在启动源第二个接事件的代理终端查看console输出
Ok,第二个接事件代理因为复用分流,果然只获得了第二个事件信息。
相关推荐
Flume-ng 在 Windows 环境搭建并测试 + Log4j 日志通过 Flume 输出到 HDFS Flume-ng 是一个高可用、可靠、分布式的日志聚合系统,可以实时地从各种数据源(如日志文件、网络 socket、数据库等)中收集数据,并将其...
Flume-NG 安装与配置指南 Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:...
最后,提供的官方文档和社区资源对于深入学习 Flume NG 非常有帮助,建议深入研究这些资料以获得更多见解。 - [Apache Flume 用户指南](http://flume.apache.org/FlumeUserGuide.html) - [Apache Flume 开发者指南]...
Flume NG是Cloudera提供的一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,...
包括Arvind Prabhakar、Prasad Mujumdar、E.Sammer(笔者)、Jon Hsieh、Patrick Hunt、Henry Robinson、Will McQueen等,他们通过设计验证、可用性和正确性测试等多种方式,为FlumeNG的开发提供了帮助。文档鼓励...
`Mvn Flume NG SDK` 是一个用于Apache Flume集成开发的重要工具,它基于Maven构建系统,使得在Java环境中开发、管理和部署Flume插件变得更加便捷。Apache Flume是一款高度可配置的数据收集系统,广泛应用于日志聚合...
Flume-ng-sql-source-1.5.2是Apache Flume的一个扩展,它允许Flume从SQL数据库中收集数据。Apache Flume是一个分布式、可靠且可用于有效聚合、移动大量日志数据的系统。"ng"代表"next generation",表明这是Flume的...
《Flume NG 1.6.0 在 CDH 5.5.2 中的应用与解析》 Flume NG,全称为“Next Generation Flume”,是Apache Hadoop项目中用于高效、可靠、分布式地收集、聚合和移动大量日志数据的工具。在CDH(Cloudera Distribution...
Flume-ng-1.6.0-cdh.zip 内压缩了 3 个项目,分别为:flume-ng-1.6.0-cdh5.5.0.tar.gz、flume-ng-1.6.0-cdh5.7.0.tar.gz 和 flume-ng-1.6.0-cdh5.10.1.tar.gz,选择你需要的版本。
flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包
flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具
Flume-ng-sql-source是Apache Flume的一个扩展插件,主要功能是允许用户从各种数据库中抽取数据并将其传输到其他目的地,如Apache Kafka。在本案例中,我们讨论的是版本1.5.2的发布包,即"flume-ng-sql-source-...
flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...
包含flume-ng-sql-source-1.5.1&flume;-ng-sql-source-1.4.1 此内容均为网上下载
flume是一个日志收集器,更多详细的介绍可以参照官网:http://flume.apache.org/ flume-ng-sql-source实现oracle增量数据读取 有了这个jar文件即可从关系型数据库拉去数据到flume
"flume-ng-1.6.0-cdh5.5.0.tar.gz" 是 Apache Flume 的一个特定版本,具体来说是 "Next Generation" (ng) 版本的 1.6.0,与 Cloudera Data Hub (CDH) 5.5.0 发行版兼容。CDH 是一个包含多个开源大数据组件的商业发行...
flume-ng-1.5.0-cdh5.3.6.rarflume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume...
该压缩包“flume-ng-1.6.0-cdh5.7.0”是针对Cloudera Data Hub (CDH) 5.7.0 平台的Flume的特定版本,"ng"代表"Next Generation",意味着它是Flume的更新版本,提供了更先进的特性和性能优化。CDH是一个完整的、经过...
《Flume NG与Elasticsearch 6.5.4集成详解》 Flume NG,全称为Apache Flume,是一款由Apache软件基金会开发的数据收集系统,主要用于日志聚合、监控和数据传输。它设计的目标是高效、可靠且易于扩展,特别适合...