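Flume ships with two channel selectors: MultiplexingChannelSelector and ReplicatingChannelSelector.
ReplicatingChannelSelector sends every event to every channel configured on the source. getRequiredChannels returns all channels, and getOptionalChannels is always empty.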
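import java.util.Collections;
import java.util.List;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;

public class ReplicatingChannelSelector extends AbstractChannelSelector {

  private final List<Channel> emptyList = Collections.emptyList();

  // Every event is required on every channel attached to the source.
  @Override
  public List<Channel> getRequiredChannels(Event event) {
    return getAllChannels();
  }

  // There are no optional channels in this implementation.
  @Override
  public List<Channel> getOptionalChannels(Event event) {
    return emptyList;
  }

  @Override
  public void configure(Context context) {
    // No configuration necessary
  }
}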
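MultiplexingChannelSelector reads one configured header from each event and sends the event to the channel(s) mapped to that header's value. If the header is missing or blank, or its value has no mapping, the event goes to the default channel(s).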
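For orientation, a selector configuration of this kind might look like the fragment below. The keys `selector.header`, `selector.mapping.*` and `selector.default` correspond to the constants `CONFIG_MULTIPLEX_HEADER_NAME`, `CONFIG_PREFIX_MAPPING` and `CONFIG_DEFAULT_CHANNEL` in the source. The agent name `a1`, source name `r1`, channel names `c1` to `c4` and the header name `state` are placeholders chosen for illustration.

```properties
# Hypothetical agent "a1" with one source "r1" and four channels
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.channels = c1 c2 c3 c4

a1.sources.r1.selector.type = multiplexing
# Header whose value decides the route (defaults to flume.selector.header)
a1.sources.r1.selector.header = state
# Header value -> space-delimited list of channel names
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
# Used when the header is missing, blank, or unmapped
a1.sources.r1.selector.default = c4
```

In the version of the source listed here, configure() throws a FlumeException when the default channel list is empty or when a mapping names a channel that is not attached to the source.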
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.channel;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiplexingChannelSelector extends AbstractChannelSelector {

  public static final String CONFIG_MULTIPLEX_HEADER_NAME = "header";
  public static final String DEFAULT_MULTIPLEX_HEADER =
      "flume.selector.header";
  public static final String CONFIG_PREFIX_MAPPING = "mapping.";
  public static final String CONFIG_DEFAULT_CHANNEL = "default";

  @SuppressWarnings("unused")
  private static final Logger LOG = LoggerFactory
      .getLogger(MultiplexingChannelSelector.class);

  private static final List<Channel> EMPTY_LIST =
      Collections.emptyList();

  private String headerName;

  private Map<String, List<Channel>> channelMapping;

  private List<Channel> defaultChannels;

  @Override
  public List<Channel> getRequiredChannels(Event event) {
    String headerValue = event.getHeaders().get(headerName);
    if (headerValue == null || headerValue.trim().length() == 0) {
      return defaultChannels;
    }

    List<Channel> channels = channelMapping.get(headerValue);

    //This header value does not point to anything
    //Return default channel(s) here.
    if (channels == null) {
      channels = defaultChannels;
    }

    return channels;
  }

  @Override
  public List<Channel> getOptionalChannels(Event event) {
    return EMPTY_LIST;
  }

  @Override
  public void configure(Context context) {
    this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME,
        DEFAULT_MULTIPLEX_HEADER);

    Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
    for (Channel ch : getAllChannels()) {
      channelNameMap.put(ch.getName(), ch);
    }

    defaultChannels = getChannelListFromNames(
        context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap);

    if(defaultChannels.isEmpty()){
      throw new FlumeException("Default channel list empty");
    }

    Map<String, String> mapConfig =
        context.getSubProperties(CONFIG_PREFIX_MAPPING);

    channelMapping = new HashMap<String, List<Channel>>();

    for (String headerValue : mapConfig.keySet()) {
      List<Channel> configuredChannels = getChannelListFromNames(
          mapConfig.get(headerValue), channelNameMap);

      //This should not go to default channel(s)
      //because this seems to be a bad way to configure.
      if (configuredChannels.size() == 0) {
        throw new FlumeException("No channel configured for when "
            + "header value is: " + headerValue);
      }

      if (channelMapping.put(headerValue, configuredChannels) != null) {
        throw new FlumeException("Selector channel configured twice");
      }
    }
    //If no mapping is configured, it is ok.
    //All events will go to the default channel(s).
  }

  //Given a list of channel names as space delimited string,
  //returns list of channels.
  private List<Channel> getChannelListFromNames(String channels,
      Map<String, Channel> channelNameMap){
    List<Channel> configuredChannels = new ArrayList<Channel>();
    String[] chNames = channels.split(" ");
    for (String name : chNames) {
      Channel ch = channelNameMap.get(name);
      if (ch != null) {
        configuredChannels.add(ch);
      } else {
        throw new FlumeException("Selector channel not found: "
            + name);
      }
    }
    return configuredChannels;
  }
}
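The routing rule in getRequiredChannels can be tried out without a Flume installation. The sketch below is a stand-alone approximation, not Flume code: the class `MultiplexingSketch` and its methods `addMapping` and `select` are hypothetical names, and channels are represented as plain name strings rather than `Channel` objects. It assumes the same three rules as the source: look up the header value in the mapping, fall back to the defaults when the header is missing or blank, and fall back to the defaults when the value is unmapped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone sketch of the multiplexing rule; channels are plain names here. */
public class MultiplexingSketch {

    private final String headerName;
    private final Map<String, List<String>> channelMapping = new HashMap<>();
    private final List<String> defaultChannels;

    public MultiplexingSketch(String headerName, String defaultChannelNames) {
        this.headerName = headerName;
        this.defaultChannels = splitNames(defaultChannelNames);
    }

    /** Registers a header value and its space-delimited channel names. */
    public void addMapping(String headerValue, String channelNames) {
        if (channelMapping.put(headerValue, splitNames(channelNames)) != null) {
            throw new IllegalStateException("Header value mapped twice: " + headerValue);
        }
    }

    /** Mirrors getRequiredChannels: mapped channels, else the defaults. */
    public List<String> select(Map<String, String> headers) {
        String headerValue = headers.get(headerName);
        if (headerValue == null || headerValue.trim().isEmpty()) {
            return defaultChannels; // header missing or blank
        }
        List<String> channels = channelMapping.get(headerValue);
        return channels != null ? channels : defaultChannels; // unmapped value
    }

    // Splits on a single space, as getChannelListFromNames does.
    private static List<String> splitNames(String names) {
        return new ArrayList<>(Arrays.asList(names.split(" ")));
    }

    public static void main(String[] args) {
        MultiplexingSketch selector = new MultiplexingSketch("state", "c4");
        selector.addMapping("CZ", "c1");
        selector.addMapping("US", "c2 c3");

        Map<String, String> headers = new HashMap<>();
        headers.put("state", "US");
        System.out.println(selector.select(headers)); // prints [c2, c3]

        headers.put("state", "XX");
        System.out.println(selector.select(headers)); // prints [c4]

        headers.clear();
        System.out.println(selector.select(headers)); // prints [c4]
    }
}
```

Unlike the real selector, this sketch does not check that the named channels exist, so it cannot reproduce the "Selector channel not found" error path.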