接着上篇:
Flume之ChannelSelector源码分析
ChannelSelector主要是根据Event选择将其发送到哪些Channel。ChannelProcessor是通过ChannelSelector获取到Channels后,如何发送Event到Channel。
ChannelProcessor提供了将Source接收到的Events放入到Channels的一些方法,如果写入Channels发生错误,统一抛出ChannelException异常。每个ChannelProcessor实例都有一个ChannelSelector实例(ChannelSelector实例分别维护了可选的Channels列表和必选的Channels列表)和一个拦截链。
一,configure(Context context)加载配置文件,该方法主要是用来加载拦截器:
private void configureInterceptors(Context context) {
List<Interceptor> interceptors = Lists.newLinkedList();
//获取拦截器
String interceptorListStr = context.getString("interceptors", "");
if (interceptorListStr.isEmpty()) {
return;
}
//解析成拦截器名的数组
String[] interceptorNames = interceptorListStr.split("\\s+");
//获取interceptors的Context
Context interceptorContexts =
new Context(context.getSubProperties("interceptors."));
// run through and instantiate all the interceptors specified in the Context
InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
for (String interceptorName : interceptorNames) {
Context interceptorContext = new Context(
interceptorContexts.getSubProperties(interceptorName + "."));
//得到拦截器的类型,Flume支持TIMESTAMP, HOST, STATIC, REGEX_FILTER, REGEX_EXTRACTOR, SEARCH_REPLACE
//定义在org.apache.flume.interceptor.InterceptorType类中。
String type = interceptorContext.getString("type");
if (type == null) {
LOG.error("Type not specified for interceptor " + interceptorName);
throw new FlumeException("Interceptor.Type not specified for " +
interceptorName);
}
try {
//实例化拦截器,并存放到List中
Interceptor.Builder builder = factory.newInstance(type);
builder.configure(interceptorContext);
interceptors.add(builder.build());
} catch (ClassNotFoundException e) {
LOG.error("Builder class not found. Exception follows.", e);
throw new FlumeException("Interceptor.Builder not found.", e);
} catch (InstantiationException e) {
LOG.error("Could not instantiate Builder. Exception follows.", e);
throw new FlumeException("Interceptor.Builder not constructable.", e);
} catch (IllegalAccessException e) {
LOG.error("Unable to access Builder. Exception follows.", e);
throw new FlumeException("Unable to access Interceptor.Builder.", e);
}
}
//将拦截器List设置到拦截链中。
interceptorChain.setInterceptors(interceptors);
}
二,configure方法已经初始化拦截链,接下来org.apache.flume.source.EventDrivenSourceRunner的start方法调用initialize()初始化拦截链并启动Source:
@Override
public void start() {
Source source = getSource();
ChannelProcessor cp = source.getChannelProcessor();
//初始化ChannelProcessor,实际上就是初始化拦截链的所有的拦截器
cp.initialize();
//启动Source
source.start();
lifecycleState = LifecycleState.START;
}
三,提交Event
ChannelProcessor提交Event有两个方法processEvent(提交单个Event)和processEventBatch(批量提交Event),下面以processEventBatch为例:
public void processEventBatch(List<Event> events) {
Preconditions.checkNotNull(events, "Event list must not be null");
//根据拦截链挨个处理Event
events = interceptorChain.intercept(events);
Map<Channel, List<Event>> reqChannelQueue =
new LinkedHashMap<Channel, List<Event>>();
Map<Channel, List<Event>> optChannelQueue =
new LinkedHashMap<Channel, List<Event>>();
//将Event分类,放入reqChannelQueue 和 optChannelQueue中
for (Event event : events) {
List<Channel> reqChannels = selector.getRequiredChannels(event);
for (Channel ch : reqChannels) {
List<Event> eventQueue = reqChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
reqChannelQueue.put(ch, eventQueue);
}
eventQueue.add(event);
}
List<Channel> optChannels = selector.getOptionalChannels(event);
for (Channel ch: optChannels) {
List<Event> eventQueue = optChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
optChannelQueue.put(ch, eventQueue);
}
eventQueue.add(event);
}
}
//提交Event到必选的Channel
// Process required channels
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();
List<Event> batch = reqChannelQueue.get(reqChannel);
for (Event event : batch) {
reqChannel.put(event);
}
tx.commit();
} catch (Throwable t) {
tx.rollback();
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " +
reqChannel, t);
throw (Error) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {
if (tx != null) {
tx.close();
}
}
}
//提交Event到可选的Channel
// Process optional channels
for (Channel optChannel : optChannelQueue.keySet()) {
Transaction tx = optChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();
List<Event> batch = optChannelQueue.get(optChannel);
for (Event event : batch ) {
optChannel.put(event);
}
tx.commit();
} catch (Throwable t) {
tx.rollback();
LOG.error("Unable to put batch on optional channel: " + optChannel, t);
if (t instanceof Error) {
throw (Error) t;
}
} finally {
if (tx != null) {
tx.close();
}
}
}
}
以上批量批量提交Event的流程:
1,根据配置的拦截器组成的拦截链挨个处理Event
2,根据Event获取必选和可选的Channel,然后分别根据可选和必选的Channel,将Event和Channel存放在以Channel为Key,Event列表为Value的Map中。
3,遍历可选和必选的Map,通过Channel的Transaction将Event commit到Channel中。
看下可选和必选map提交Channel异常处理有什么区别:
(1)必选Map的异常处理
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " +
reqChannel, t);
throw (Error) t;
} else {
//抛出ChannelException异常,Source可以通过该异常保证数据不丢失的
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
(2)可选Map的异常处理
tx.rollback();
LOG.error("Unable to put batch on optional channel: " + optChannel, t);
if (t instanceof Error) {
//只有在异常是Error才会抛出异常
throw (Error) t;
}
也就是说,只有异常为java.lang.Error类实例,可选和必选Channel才会回滚,不然可选提交到Channel后失败是不会回滚的。
分享到:
相关推荐
通过分析 Apache Flume 1.7.0 的源码,开发者可以深入了解其内部工作流程,优化性能,解决实际问题,并为构建自己的数据采集和传输解决方案打下坚实基础。同时,对于大数据领域的研究者,源码也是深入研究数据处理...
**大数据Ambari之flume集成编译好的源码包** Apache Ambari 是一个用于管理和监控Hadoop集群的开源工具,它提供了直观的Web界面和RESTful API,使得安装、配置、管理Hadoop生态系统变得更加简单。Flume是Apache的一...
尚硅谷大数据技术之Flume Flume 是 Cloudera 提供的一个高可用的、 高可靠的、分布式的海量日志采集、聚合和传输的系统。 Flume 基于流式架构,灵活简单。 1.1 Flume 定义 Flume 是一个高可用的、 高可靠的、...
Flume 是 Apache Hadoop 生态系统中的一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的...这样,你可以轻松地管理和分析大量的日志数据,尤其是在分布式环境中,Flume 提供了一种高效且可扩展的解决方案。
Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动大量日志数据。Flume 1.6 版本是其一个重要里程碑,...如果你想要在大数据日志处理领域有所建树,对 Flume 源码的深入学习是必不可少的。
# 基于Spark2.x和Flume的实时新闻分析系统 ## 项目简介 本项目是一个基于Spark2.x和Flume的实时新闻分析系统,旨在捕获用户浏览日志信息,实时分析新闻话题的流量,统计新闻话题的曝光量,并找出用户浏览量最高的...
基于Flume的分布式日志采集分析系统设计与实现 Flume是一种分布式日志采集系统,可以实时地采集和处理大量日志数据。该系统基于Flume、Elasticsearch和Kibana等技术手段,能够对海量日志数据进行实时采集、处理和...
flume修改源码读日志到hbase,①日志文件为json数据②修改文件编译打包并替换flumejar中的对应文件
由于Flume的netcatudp为sources,avro为sink时,udp数据发送会报null of map in field headers of org.apache.flume.source.avro.AvroFlumeEvent of array。但是此类解决了这个问题。只需要将此类放在Flume安装的lib...
在电商数仓项目中,Flume 的应用尤其常见,因为电商平台会产生大量的用户行为数据、交易数据、点击流数据等,这些都需要实时或近实时地进行处理和分析。 在"电商数仓项目(八) Flume 系列源码"中,我们可以深入理解 ...
为了编译Flume源码,打开命令行,导航至Flume源码根目录,然后运行`mvn clean install`命令。这个过程会下载所有依赖库,编译源代码,并将编译后的JAR文件放置到`target`目录下。如果编译过程中没有错误,这意味着源...
总之,Flume-ng-sql-source-1.5.2为Flume增加了强大的SQL数据源能力,使得数据采集范围扩大到了结构化数据领域,这对于构建实时数据处理和分析系统至关重要。通过对源码的深入理解和使用,可以更高效地利用这一工具...
Flume-NG 安装与配置指南 Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:...
Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动和加载大量日志数据到集中式存储系统,如Hadoop HDFS。它设计为容错性强,可扩展,允许从多个源收集数据,并将其流向目标,如数据仓库或...
Flume是一个由Cloudera公司开发的分布式、可靠且可用的系统,用于有效地收集、...通过以上内容的深入学习和实践,可以掌握Flume集群环境搭建和监控的全方位技能,为处理大规模日志数据的实时传输和分析打下坚实的基础。
使用java nio消费udp消息的flume-ng源码插件Flume-ng ( )。 该插件基于 Apache Flume 1.5.0.1 和 Redis 2.8.17。特征源消费udp消息Netty ( ) 使用用法构建或下载 jar。 使用mvn clean package检出和构建将flumeng-...
经过对Flume FileChannel相关源码的分析,导致FileChannel吞吐率下降的主要原因集中于事务的提交过程——commit
Flume-ng 在 Windows 环境搭建并测试 + Log4j 日志通过 Flume 输出到 HDFS Flume-ng 是一个高可用、可靠、分布式的日志聚合系统,可以实时地从各种数据源(如日志文件、网络 socket、数据库等)中收集数据,并将其...
- Storm将处理Flume收集的数据,并执行相应的分析任务。 通过以上步骤,我们可以构建出一个完整的Flume+kafka+Storm数据流处理系统。这套系统能够高效地处理实时数据流,为业务决策提供强有力的支持。