- 浏览: 127874 次
- 性别:
- 来自: 深圳
文章分类
最新评论
Flume之Failover和Load balancing原理及实例
Failover Sink Processor
Failover Sink Processor维护了一个sink的优先级列表,具有故障转移的功能,具体的配置如下(加粗的必须配置):
sinks | – | 多个sink用空格分开。 |
processor.type |
default
|
组件的名称,必须是:failover
|
processor.priority.<sinkName> | – |
优先级值。<sinkName> 必须是sinks中有定义的。优先级值高Sink会更早被激活。值越大,优先级越高。 注:多个sinks的话,优先级的值不要相同,如果优先级相同的话,只会有一个生效。且failover时,同优先级的不会Failover,就算是同优先级的还存在也会报All sinks failed to process。 |
processor.maxpenalty | 30000 | 失败的Sink最大的退避时间(单位:毫秒)(退避算法(退避算法为我们在解决重试某项任务的时候,提供了一个比较好的等待思想。),参考:http://qiuqiang1985.iteye.com/blog/1513049) |
示例:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
Load balancing Sink Processor
Load balancing sink processor 提供了多个sinks负载均衡的能力。它维护了一个active sinks列表,该列表中的负载必须是分布式的。实现了round_robin(轮询调度) 或者 random(随机) 的选择机制,默认是:round_robin(轮询调度)。也可以通过继承AbstractSinkSelector类来实现自定义的选择机制。
当被调用时,选择器根据配置文件的选择机制挑选下一个sink,并且调用该sink。如果所选的Sink传递Event失败,则通过选择机制挑选下一个可用的Sink,以此类推。
processor.sinks | – | 多个sink用空格分开。 |
processor.type |
default
|
组件的名称,必须是:load_balance
|
processor.backoff | false | 是否以指数的形式退避失败的Sinks。 |
processor.selector |
round_robin
|
选择机制。必须是round_robin ,random 或者自定义的类,该类继承了AbstractSinkSelector
|
processor.selector.maxTimeOut | 30000 | 默认是30000毫秒,屏蔽故障sink的时间 |
示例:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
Failover和Load balancing实例
测试环境:
10.0.1.76(Client)
10.0.1.68 (Failover和Load balancing)
10.0.1.70
10.0.1.77
10.0.1.85
10.0.1.86
10.0.1.87
以10.0.1.76作为客户端,通过exec获取nginx的日志信息,然后将数据传到10.0.1.68(配置了Failover和Load balancing)的节点,最后10.0.1.68将数据发送的10.0.1.70,77,85,86,87节点,这些节点最终将数据写到本地硬盘。
10.0.1.76的配置:
a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.command = tail -n 0 -F /home/nginx/logs/access.log a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = 10.0.1.68 a1.sinks.k1.port = 41415 a1.channels = c1 a1.sources = r1 a1.sinks = k1获取nginx产生的日志,然后通过avro发送的10.0.1.68
10.0.1.68配置(配置A):
a1.channels = c1 a1.sources = r1 a1.sinks = k70 k77 k85 k86 k87 a1.sinkgroups = g1 g2 g3 a1.sinkgroups.g1.sinks = k70 k85 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.selector = round_robin a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g2.sinks = k70 k86 a1.sinkgroups.g2.processor.type = failover a1.sinkgroups.g2.processor.priority.k70 = 20 a1.sinkgroups.g2.processor.priority.k86 = 10 a1.sinkgroups.g2.processor.maxpenalty = 10000 a1.sinkgroups.g3.sinks = k85 k87 k77 a1.sinkgroups.g3.processor.type = failover a1.sinkgroups.g3.processor.priority.k85 = 20 a1.sinkgroups.g3.processor.priority.k87 = 10 a1.sinkgroups.g3.processor.priority.k77 = 5 a1.sinkgroups.g3.processor.maxpenalty = 10000 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41415 a1.sinks.k87.channel = c1 a1.sinks.k87.type = avro a1.sinks.k87.hostname = 10.0.1.87 a1.sinks.k87.port = 41414 a1.sinks.k86.channel = c1 a1.sinks.k86.type = AVRO a1.sinks.k86.hostname = 10.0.1.86 a1.sinks.k86.port = 41414 a1.sinks.k85.channel = c1 a1.sinks.k85.type = AVRO a1.sinks.k85.hostname = 10.0.1.85 a1.sinks.k85.port = 41414 a1.sinks.k77.channel = c1 a1.sinks.k77.type = AVRO a1.sinks.k77.hostname = 10.0.1.77 a1.sinks.k77.port = 41414 a1.sinks.k70.channel = c1 a1.sinks.k70.type = AVRO a1.sinks.k70.hostname = 10.0.1.70 a1.sinks.k70.port = 4141410.0.1.70和10.0.1.85Load balancing,均衡的方式为轮询调用。10.0.1.70和10.0.1.86为Failover,10.0.1.70和10.0.1.87为Failover
10.0.1.70,77,85,86,87配置:
a1.channels = c1 a1.sources = r1 a1.sinks = k1 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sources.r1.type = AVRO a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 a1.sinks.k1.channel = c1 a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory = /data/load/ a1.sinks.k1.sink.rollInterval = 0通过Avro获取到Event,存放到文件中。
每次往nginx发2w个请求,然后查看10.0.1.70,77,85,86,87四台服务器接受数据的情况。我们做几组测试:
注:表格中的 * 表示关闭关闭该服务器Flume进程。
测试一:
发送2w个请求到Nginx中,查看各个节点接受数据的行数:
服务器
|
10.0.1.70
|
10.0.1.77
|
10.0.1.85
|
10.0.1.86
|
10.0.1.87
|
总计
|
数据行数
|
3400
|
0
|
3459
|
6778
|
6363
|
20000
|
其实无论测试2w次请求,还是测试100w次请求,10.0.1.77都无法接受到数据。
测试二:
服务器
|
10.0.1.70
|
10.0.1.77
|
10.0.1.85
|
10.0.1.86
|
10.0.1.87(*)
|
总计
|
数据行数
|
6619
|
6300
|
6840
|
13878
|
6363
|
40000
|
问题1: 作为Failover的节点86,87为何可以接受数据,而77没有将接收数据呢?
作为failover,我们会认为只有一个节点生效,其他节点只有在优先级节点down掉才能替补上去,在Flume中关于failover的实现,首先我们要了解Flume加载配置文件是有顺序的。如果配置文件的顺序不同,会导致failover出乎我们的意料,现在我们把上面的(配置A)关于failover和load_balance修改成如下(部分代码):
...... a1.sinkgroups = g2 g1 g3 a1.sinkgroups.g3.sinks = k70 k85 a1.sinkgroups.g3.processor.type = load_balance a1.sinkgroups.g3.processor.selector = round_robin a1.sinkgroups.g3.processor.backoff = true a1.sinkgroups.g2.sinks = k70 k86 a1.sinkgroups.g2.processor.type = failover a1.sinkgroups.g2.processor.priority.k70 = 20 a1.sinkgroups.g2.processor.priority.k86 = 10 a1.sinkgroups.g2.processor.maxpenalty = 10000 a1.sinkgroups.g1.sinks = k85 k87 k77 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k85 = 20 a1.sinkgroups.g1.processor.priority.k87 = 10 a1.sinkgroups.g1.processor.priority.k77 = 5 a1.sinkgroups.g1.processor.maxpenalty = 10000 ......如果修改成如下的配置,启动时报如下错误:
WARN 2015-10-22 14:22:01 [org.apache.flume.conf.FlumeConfiguration] - Could not configure sink group g3 due to: No available sinks for sinkgroup: g3. Sinkgroup will be removed org.apache.flume.conf.ConfigurationException: No available sinks for sinkgroup: g3. Sinkgroup will be removed at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:754) at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:348) at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$0(FlumeConfiguration.java:313) at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:127) at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:109) 。。。。。。报异常的原因,我们可以查看源码,找到答案,FlumeConfiguration类的isValid()方法:
sourceSet = validateSources(channelSet); sinkSet = validateSinks(channelSet); sinkgroupSet = validateGroups(sinkSet);上述是主要的源码片段,可以Debug进去看看,大致的流程:以validateGroups为例,Flume根据sinkgroups顺序的解析配置文件,然后把sink放到变量名为usedSinks的Map当中,每个sink只能使用一次,如果sink在前面某个sinkgroups已经使用,那么就会在该sinkgroups中删除这个sink。按上面的配置,Flume开始解析sinkgroups的g1,则g1包含k85,k87和k77三个有效sink;然后解析sinkgroups的g2,则g2包含k70和k86;解析sinkgroups的g3时,因为k70和k85已经在g1和g2存在了,所以g3包含的sink为空,才导致报如上的错误。也就是说Flume是根据usedSinks来实现failover和load_balance的,因为配置的原因,可能会跟你想象的效果相差甚远。
在AbstractConfigurationProvider类的getConfiguration方法,代码片段:
public MaterializedConfiguration getConfiguration() { MaterializedConfiguration conf = new SimpleMaterializedConfiguration(); FlumeConfiguration fconfig = getFlumeConfiguration();//加载和验证配置文件的入口 AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName()); if (agentConf != null) { Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap(); Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap(); Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap(); try { loadChannels(agentConf, channelComponentMap);//初始化channel loadSources(agentConf, channelComponentMap, sourceRunnerMap);//初始化source loadSinks(agentConf, channelComponentMap, sinkRunnerMap);//初始化sink ......
验证完之后,加载Channels,Sources,Sinks,根据验证的结果g1,g2,g3的usedSinks分配如下(配置A):
g1 的usedSinks是:k70和k85
g2 的usedSinks是:k86
g3 的usedSinks是:k87,k77
以loadSinks为例,加载Sink,先调用AbstractConfigurationProvider类的loadSinks方法,然后调用loadSinkGroups方法来初始化Sink,g1的usedSinks有k70和k85,所以k70和k85这两个节点通过round_robin方式balance来接收数据;g2的usedSinks只有k86(由于k70已经在g1中被占用了),所以只有k86接收数据,自然也不会有failover的功能;g3的usedSinks有k87和k77,由于Failover会选取优先级最高的接收数据,所以k87接收数据,当k87挂掉的时候,k77替补上去接收数据。这也就是为何其他节点都可以接收数据,唯独只有k77没有数据的原因。
再者每个sinkgroups都会启动一个SinkRunner线程去调用FailoverSinkProcessor和LoadBalancingSinkProcessor的process()方法去获取数据,这也就是为啥Failover和balance都能接收数据的原因,具体的实现细节,可以自行阅读源码。
2,Failover的情况下,是否优先级越高的就先生效?
是的,同一个Failover下的sink都存放在TreeMap下,然后取最大优先级的Sink作为activeSink。
3,Failover的情况下,如果优先级相同是怎么做失败转移的?
优先级相同的sink节点在failover中只会有一个生效,看源码可以很容易的发现,因为Failover中live的Sink存放在TreeMap中,用优先级作为key,同等优先级的Sink只能保存一个。
@Override public void configure(Context context) { liveSinks = new TreeMap<Integer, Sink>(); //存活的Sink放在TreeMap中,且用priority作为Key failedSinks = new PriorityQueue<FailedSink>(); Integer nextPrio = 0; String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX); if(maxPenaltyStr == null) { maxPenalty = DEFAULT_MAX_PENALTY; } else { try { maxPenalty = Integer.parseInt(maxPenaltyStr); } catch (NumberFormatException e) { logger.warn("{} is not a valid value for {}", new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX }); maxPenalty = DEFAULT_MAX_PENALTY; } } for (Entry<String, Sink> entry : sinks.entrySet()) { String priStr = PRIORITY_PREFIX + entry.getKey(); Integer priority; try { priority = Integer.parseInt(context.getString(priStr)); } catch (Exception e) { priority = --nextPrio; } if(!liveSinks.containsKey(priority)) { liveSinks.put(priority, sinks.get(entry.getKey()));//priority作为Key } else { logger.warn("Sink {} not added to FailverSinkProcessor as priority" + "duplicates that of sink {}", entry.getKey(), liveSinks.get(priority)); } } activeSink = liveSinks.get(liveSinks.lastKey());//获取优先级最高的节点作为active节点
总结
1,load_balance配置中的Sink都可以接收数据。
2,load_balance根据均衡策略接收数据。
3,没有Sink既能failover又能load_balance。
4,failover中的Sink优先级不要设置为相同的值。
5,failover配置中的Sink只有优先级最高及没有被之前加载的sinkgroups占用的Sink接收数据,如果优先级高的Sink挂掉,则转到优先级次之的Sink。
6,failover可以做失败转移,如果因为加载顺序的问题,导致failover的Sink已经被占用,failover会造成配置在failover中的sink都能接收数据的假象,其实只是在剩余的sink中实施failover策略。
相关推荐
在这个“Flume+Kafka+HBase实例”中,我们将深入探讨如何在电信客服项目中整合这三个工具,以实现高效的数据处理和存储。 Flume是Apache的一款开源工具,专门用于高效、可靠地收集、聚合和移动大量日志数据。在电信...
尚硅谷大数据技术之Flume Flume 是 Cloudera 提供的一个高可用的、 高可靠的、分布式的海量日志采集、聚合和传输的系统。 Flume 基于流式架构,灵活简单。 1.1 Flume 定义 Flume 是一个高可用的、 高可靠的、...
根据提供的文档信息,本文将详细解析Flume的基本概念、Flume事件的概念与原理,并进一步探讨其在大数据领域的应用价值。...通过对Flume的基本概念和工作原理的深入理解,可以帮助开发者更好地利用Flume解决实际问题。
让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 中文文档 认识 flume 1 flume 是什么 这里简单介绍一下 它是 Cloudera 的一个产品 2 flume 是干什么的 收集日志的 3 flume 如何搜集日志 我们把...
### Flume 1.6.0 入门详解:安装、部署及案例分析 #### 一、Flume 概述 Flume 是 Cloudera 开发的一款高效、可靠且易于扩展的日志收集系统,适用于大数据环境下的日志采集任务。Flume 的初始版本被称为 FlumeOG...
Logstash的工作原理基于pipeline,数据在pipeline中经历多个阶段,包括输入(input)、过滤(filter)和输出(output)。过滤器部分特别强大,能解析和结构化事件数据,使其易于分析和利用。Logstash广泛用于日志管理和...
NettyAvroRpcClient 和 ThriftRpcClient 都实现了 RpcClient 接口,用户需要知道目标 Flume Agent 的主机名和端口号来创建客户端实例。 为了提高容错性和可用性,Flume 还提供了 Failover Client。此客户端能够在与...
2. **核心组件功能:** 深入理解Flume中的Source、Channel和Sink三个核心组件的工作原理和配置方式。 3. **集群环境搭建:** 根据官方文档的推荐架构进行Flume集群的搭建,注意使用合适的版本,并正确配置各个组件。...
Configure failover paths and load-balancing to remove single points of failure Utilize Gzip Compression for files written to HDFS ☆ 出版信息:☆ [作者信息] Steve Hoffman [出版机构] Packt ...
### Flume解析和应用 #### 一、Flume概述 Flume是由Cloudera开发的一款分布式、可靠且可用的日志采集系统。它被设计用来高效地处理大量数据流,能够从多个源头收集数据并将其传输至不同的存储系统中。Flume支持...
参照下图可以看得出Agent就是Flume的一个部署实例, 一个完整的Agent中包含了必须的三个组件Source、Channel和Sink,Source是指数据的来源和方式,Channel是一个数据的缓冲池,Sink定义了数据输出的方式和目的地(这...
1. **Load Balancing**: Flume 支持负载均衡,可以通过配置多个Sinks并使用Sink Group实现。 2. **Error Handling**: 当数据传输出现问题时,Flume 提供了重试和备份策略,确保数据完整性。 3. **Interceptors**: ...
标题中的"flume和logstash.zip"是一个包含两个著名数据采集工具——Apache Flume和Logstash的压缩包。这两个工具都是大数据生态系统中的重要组件,主要用于日志管理和数据收集。 Apache Flume是一个分布式、可靠且...
Flume 是 Apache 开源项目中的一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。它设计的主要目标是简化大数据收集流程,确保数据的高可用性和高可靠性。在日志管理系统中,Flume 提供了多种数据源...
**大数据Ambari之flume集成编译好的源码包** Apache Ambari 是一个用于管理和监控Hadoop集群的开源工具,它提供了直观的Web界面和RESTful API,使得安装、配置、管理Hadoop生态系统变得更加简单。Flume是Apache的一...
大数据技术之Flume Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。它基于流式架构,灵活简单。 Flume定义 Flume是一个高可用的,高可靠的,分布式的海量日志采集、...
Flume还支持通过SinkProcessor将多个Sink编组,实现负载均衡(Loadbalance)和故障转移(Failover)。默认的SinkProcessor实现包括Default、Failover(基于优先级)和Loadbalance(轮循或随机,带有退避算法)。 ...
Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动和加载大量日志数据到集中式存储系统,如Hadoop HDFS。它设计为容错性强,可扩展,允许从多个源收集数据,并将其流向目标,如数据仓库或...
Flume 是 Apache 开源项目提供的一款分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。它设计用于高可用性和高可靠性,使得在大数据环境中处理流式数据变得简单。在描述中提到的问题是关于 Flume 不...