Flume ships with three sink processors: DefaultSinkProcessor, FailoverSinkProcessor, and LoadBalancingSinkProcessor.
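The processor type is chosen per sink group in the agent's properties file. A minimal sketch, where the agent name a1, group g1, and sink names k1/k2 are placeholders:

```properties
# Put two sinks into one group and pick a processor type:
# "default", "failover", or "load_balance"
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
```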
DefaultSinkProcessor accepts only a single sink and passes on process results without any additional handling. It is used for every sink that is not assigned to a group.
import java.util.List;

import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Sink.Status;
import org.apache.flume.SinkProcessor;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.conf.ConfigurableComponent;
import org.apache.flume.lifecycle.LifecycleState;

import com.google.common.base.Preconditions;

public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
  private Sink sink;
  private LifecycleState lifecycleState;

  @Override
  public void start() {
    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
    sink.start();
    lifecycleState = LifecycleState.START;
  }

  @Override
  public void stop() {
    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
    sink.stop();
    lifecycleState = LifecycleState.STOP;
  }

  @Override
  public Status process() throws EventDeliveryException {
    return sink.process();
  }

  @Override
  public void setSinks(List<Sink> sinks) {
    Preconditions.checkNotNull(sinks);
    Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
        + "only handle one sink, "
        + "try using a policy that supports multiple sinks");
    sink = sinks.get(0);
  }

  @Override
  public void configure(ComponentConfiguration conf) {
  }
}
FailoverSinkProcessor operates on a group of sinks: when the highest-priority sink fails to process an event, FailoverSinkProcessor fails over to the next sink in the group.
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Sink.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * FailoverSinkProcessor maintains a prioritized list of sinks,
 * guaranteeing that so long as one is available events will be processed.
 *
 * The failover mechanism works by relegating failed sinks to a pool
 * where they are assigned a cooldown period, increasing with sequential
 * failures before they are retried. Once a sink successfully sends an
 * event it is restored to the live pool.
 *
 * FailoverSinkProcessor is in no way thread safe and expects to be run via
 * SinkRunner. Additionally, setSinks must be called before configure, and
 * additional sinks cannot be added while running.
 *
 * To configure, set a sink group's processor to "failover" and set priorities
 * for individual sinks; all priorities must be unique. Furthermore, an
 * upper limit to failover time can be set (in milliseconds) using maxpenalty.
 *
 * Ex)
 *
 * host1.sinkgroups = group1
 *
 * host1.sinkgroups.group1.sinks = sink1 sink2
 * host1.sinkgroups.group1.processor.type = failover
 * host1.sinkgroups.group1.processor.priority.sink1 = 5
 * host1.sinkgroups.group1.processor.priority.sink2 = 10
 * host1.sinkgroups.group1.processor.maxpenalty = 10000
 */
public class FailoverSinkProcessor extends AbstractSinkProcessor {
  private static final int FAILURE_PENALTY = 1000;
  private static final int DEFAULT_MAX_PENALTY = 30000;

  private class FailedSink implements Comparable<FailedSink> {
    private Long refresh;
    private Integer priority;
    private Sink sink;
    private Integer sequentialFailures;

    public FailedSink(Integer priority, Sink sink, int seqFailures) {
      this.sink = sink;
      this.priority = priority;
      this.sequentialFailures = seqFailures;
      adjustRefresh();
    }

    @Override
    public int compareTo(FailedSink arg0) {
      return refresh.compareTo(arg0.refresh);
    }

    public Long getRefresh() {
      return refresh;
    }

    public Sink getSink() {
      return sink;
    }

    public Integer getPriority() {
      return priority;
    }

    public void incFails() {
      sequentialFailures++;
      adjustRefresh();
      logger.debug("Sink {} failed again, new refresh is at {}, "
          + "current time {}", new Object[] {
              sink.getName(), refresh, System.currentTimeMillis() });
    }

    private void adjustRefresh() {
      refresh = System.currentTimeMillis()
          + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
    }
  }

  private static final Logger logger = LoggerFactory
      .getLogger(FailoverSinkProcessor.class);

  private static final String PRIORITY_PREFIX = "priority.";
  private static final String MAX_PENALTY_PREFIX = "maxpenalty";
  private Map<String, Sink> sinks;
  private Sink activeSink;
  private SortedMap<Integer, Sink> liveSinks;
  private Queue<FailedSink> failedSinks;
  private int maxPenalty;

  @Override
  public void configure(Context context) {
    liveSinks = new TreeMap<Integer, Sink>();
    failedSinks = new PriorityQueue<FailedSink>();
    Integer nextPrio = 0;
    String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
    if (maxPenaltyStr == null) {
      maxPenalty = DEFAULT_MAX_PENALTY;
    } else {
      try {
        maxPenalty = Integer.parseInt(maxPenaltyStr);
      } catch (NumberFormatException e) {
        logger.warn("{} is not a valid value for {}",
            new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
        maxPenalty = DEFAULT_MAX_PENALTY;
      }
    }
    for (Entry<String, Sink> entry : sinks.entrySet()) {
      String priStr = PRIORITY_PREFIX + entry.getKey();
      Integer priority;
      try {
        priority = Integer.parseInt(context.getString(priStr));
      } catch (Exception e) {
        priority = --nextPrio;
      }
      if (!liveSinks.containsKey(priority)) {
        liveSinks.put(priority, sinks.get(entry.getKey()));
      } else {
        logger.warn("Sink {} not added to FailoverSinkProcessor as priority "
            + "duplicates that of sink {}", entry.getKey(),
            liveSinks.get(priority));
      }
    }
    activeSink = liveSinks.get(liveSinks.lastKey());
  }

  @Override
  public Status process() throws EventDeliveryException {
    // Retry any failed sinks that have gone through their "cooldown" period
    Long now = System.currentTimeMillis();
    while (!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
      FailedSink cur = failedSinks.poll();
      Status s;
      try {
        s = cur.getSink().process();
        if (s == Status.READY) {
          liveSinks.put(cur.getPriority(), cur.getSink());
          activeSink = liveSinks.get(liveSinks.lastKey());
          logger.debug("Sink {} was recovered from the fail list",
              cur.getSink().getName());
        } else {
          // if it's a backoff it needn't be penalized.
          failedSinks.add(cur);
        }
        return s;
      } catch (Exception e) {
        cur.incFails();
        failedSinks.add(cur);
      }
    }

    Status ret = null;
    while (activeSink != null) {
      try {
        ret = activeSink.process();
        return ret;
      } catch (Exception e) {
        logger.warn("Sink {} failed and has been sent to failover list",
            activeSink.getName(), e);
        activeSink = moveActiveToDeadAndGetNext();
      }
    }

    throw new EventDeliveryException("All sinks failed to process, "
        + "nothing left to failover to");
  }

  // setSinks(...) and moveActiveToDeadAndGetNext() are omitted from this excerpt.
}
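The cooldown arithmetic in adjustRefresh() above can be isolated into a small runnable sketch. FAILURE_PENALTY (1000 ms) and the default maxpenalty (30000 ms) are taken from the class constants; the demo class itself is hypothetical, only the formula mirrors the source:

```java
public class BackoffDemo {
    static final int FAILURE_PENALTY = 1000; // base penalty in ms
    static final int MAX_PENALTY = 30000;    // default cap, configurable via maxpenalty

    // Mirrors FailedSink.adjustRefresh(): the wait before a failed sink is
    // retried doubles with each sequential failure, until it hits the cap.
    public static long penaltyMillis(int sequentialFailures) {
        return Math.min(MAX_PENALTY, (1L << sequentialFailures) * FAILURE_PENALTY);
    }

    public static void main(String[] args) {
        for (int f = 1; f <= 6; f++) {
            System.out.println(f + " failure(s) -> wait " + penaltyMillis(f) + " ms");
        }
    }
}
```

With the defaults, the cooldown grows 2000, 4000, 8000, 16000 ms and is then clamped at 30000 ms, so a persistently failing sink is retried at most every 30 seconds.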
LoadBalancingSinkProcessor distributes load across a sink group. It ships with two selectors: RoundRobinSinkSelector, which rotates the sink that is tried first on each call, and RandomOrderSinkSelector, which picks a random order in which to try the sinks. A custom selector can also be plugged in by configuring its fully qualified class name.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Sink.Status;
import org.apache.flume.conf.Configurable;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.util.SpecificOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {
  public static final String CONFIG_SELECTOR = "selector";
  public static final String CONFIG_SELECTOR_PREFIX = CONFIG_SELECTOR + ".";
  public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN";
  public static final String SELECTOR_NAME_RANDOM = "RANDOM";

  private static final Logger LOGGER = LoggerFactory
      .getLogger(LoadBalancingSinkProcessor.class);

  private SinkSelector selector;

  @Override
  public void configure(Context context) {
    Preconditions.checkState(getSinks().size() > 1,
        "The LoadBalancingSinkProcessor cannot be used for a single sink. "
        + "Please configure more than one sinks and try again.");

    String selectorTypeName = context.getString(CONFIG_SELECTOR,
        SELECTOR_NAME_ROUND_ROBIN);

    selector = null;

    if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
      selector = new RoundRobinSinkSelector();
    } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
      selector = new RandomOrderSinkSelector();
    } else {
      try {
        @SuppressWarnings("unchecked")
        Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
            Class.forName(selectorTypeName);
        selector = klass.newInstance();
      } catch (Exception ex) {
        throw new FlumeException("Unable to instantiate sink selector: "
            + selectorTypeName, ex);
      }
    }

    selector.setSinks(getSinks());
    selector.configure(
        new Context(context.getSubProperties(CONFIG_SELECTOR_PREFIX)));

    LOGGER.debug("Sink selector: " + selector + " initialized");
  }

  @Override
  public void start() {
    super.start();
    selector.start();
  }

  @Override
  public void stop() {
    super.stop();
    selector.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    Iterator<Sink> sinkIterator = selector.createSinkIterator();
    while (sinkIterator.hasNext()) {
      Sink sink = sinkIterator.next();
      try {
        status = sink.process();
        break;
      } catch (Exception ex) {
        LOGGER.warn("Sink failed to consume event. "
            + "Attempting next sink if available.", ex);
      }
    }

    if (status == null) {
      throw new EventDeliveryException("All configured sinks have failed");
    }

    return status;
  }

  /**
   * <p>
   * An interface that allows the LoadBalancingSinkProcessor to use
   * a load-balancing strategy such as round-robin, random distribution etc.
   * Implementations of this class can be plugged into the system via
   * processor configuration and are used to select a sink on every invocation.
   * </p>
   * <p>
   * An instance of the configured sink selector is created during the
   * processor configuration, its {@linkplain #setSinks(List)} method is
   * invoked, following which it is configured via a subcontext. Once
   * configured, the lifecycle of this selector is tied to the lifecycle of
   * the sink processor.
   * </p>
   * <p>
   * At runtime, the processor invokes the {@link #createSinkIterator()}
   * method for every <tt>process</tt> call to create an iteration order over
   * the available sinks. The processor then loops through this iteration order
   * until one of the sinks succeeds in processing the event. If the iterator
   * is exhausted and none of the sinks succeed, the processor will raise
   * an <tt>EventDeliveryException</tt>.
   * </p>
   */
  public interface SinkSelector extends Configurable, LifecycleAware {
    void setSinks(List<Sink> sinks);

    Iterator<Sink> createSinkIterator();
  }

  /**
   * A sink selector that implements the round-robin sink selection policy.
   * This implementation is not MT safe.
   */
  private static class RoundRobinSinkSelector extends AbstractSinkSelector {
    private int nextHead = 0;

    @Override
    public Iterator<Sink> createSinkIterator() {
      int size = getSinks().size();
      int[] indexOrder = new int[size];

      int begin = nextHead++;
      if (nextHead == size) {
        nextHead = 0;
      }

      for (int i = 0; i < size; i++) {
        indexOrder[i] = (begin + i) % size;
      }

      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
    }
  }

  /**
   * A sink selector that implements a random sink selection policy. This
   * implementation is not thread safe.
   */
  private static class RandomOrderSinkSelector extends AbstractSinkSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public Iterator<Sink> createSinkIterator() {
      int size = getSinks().size();
      int[] indexOrder = new int[size];

      List<Integer> indexList = new ArrayList<Integer>();
      for (int i = 0; i < size; i++) {
        indexList.add(i);
      }

      while (indexList.size() != 1) {
        int pick = random.nextInt(indexList.size());
        indexOrder[indexList.size() - 1] = indexList.remove(pick);
      }

      indexOrder[0] = indexList.get(0);

      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
    }
  }
}
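The rotation logic of RoundRobinSinkSelector above can be isolated into a runnable sketch, with sinks replaced by plain indices since the selector only computes an index order (the demo class is hypothetical; the arithmetic mirrors createSinkIterator()):

```java
import java.util.Arrays;

public class RoundRobinDemo {
    private int nextHead = 0;
    private final int size;

    public RoundRobinDemo(int size) {
        this.size = size;
    }

    // Mirrors RoundRobinSinkSelector.createSinkIterator(): each call starts
    // one position later (wrapping around), so every sink in the group takes
    // its turn at being tried first.
    public int[] nextOrder() {
        int[] order = new int[size];
        int begin = nextHead++;
        if (nextHead == size) {
            nextHead = 0;
        }
        for (int i = 0; i < size; i++) {
            order[i] = (begin + i) % size;
        }
        return order;
    }

    public static void main(String[] args) {
        RoundRobinDemo rr = new RoundRobinDemo(3);
        System.out.println(Arrays.toString(rr.nextOrder())); // [0, 1, 2]
        System.out.println(Arrays.toString(rr.nextOrder())); // [1, 2, 0]
        System.out.println(Arrays.toString(rr.nextOrder())); // [2, 0, 1]
    }
}
```

Note that the remaining sinks in each order still serve as failover targets: process() walks the iterator until one sink succeeds, so rotation only changes who is tried first.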