`
qianshangding
  • 浏览: 128261 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Flume之ChannelProcessor源码分析

 
阅读更多
接着上篇:Flume之ChannelSelector源码分析
ChannelSelector主要是根据Event选择将其发送到哪些Channel。ChannelProcessor是通过ChannelSelector获取到Channels后,如何发送Event到Channel。

ChannelProcessor提供了将Source接收到的Events放入到Channels的一些方法,如果写入Channels发生错误,统一抛出ChannelException异常。每个ChannelProcessor实例都有一个ChannelSelector实例(ChannelSelector实例分别维护了可选的Channels列表和必选的Channels列表)和一个拦截链。


一,configure(Context context)加载配置文件,该方法主要是用来加载拦截器:

  private void configureInterceptors(Context context) {

    List<Interceptor> interceptors = Lists.newLinkedList();
    //获取拦截器
    String interceptorListStr = context.getString("interceptors", "");
    if (interceptorListStr.isEmpty()) {
      return;
    }
    //解析成拦截器名的数组
    String[] interceptorNames = interceptorListStr.split("\\s+");

    //获取interceptors的Context
    Context interceptorContexts =
        new Context(context.getSubProperties("interceptors."));

    // run through and instantiate all the interceptors specified in the Context
    InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
    for (String interceptorName : interceptorNames) {
      Context interceptorContext = new Context(
          interceptorContexts.getSubProperties(interceptorName + "."));
      //得到拦截器的类型,Flume支持TIMESTAMP, HOST, STATIC, REGEX_FILTER, REGEX_EXTRACTOR, SEARCH_REPLACE
      //定义在org.apache.flume.interceptor.InterceptorType类中。
      String type = interceptorContext.getString("type");
      if (type == null) {
        LOG.error("Type not specified for interceptor " + interceptorName);
        throw new FlumeException("Interceptor.Type not specified for " +
          interceptorName);
      }
      try {
        //实例化拦截器,并存放到List中
        Interceptor.Builder builder = factory.newInstance(type);
        builder.configure(interceptorContext);
        interceptors.add(builder.build());
      } catch (ClassNotFoundException e) {
        LOG.error("Builder class not found. Exception follows.", e);
        throw new FlumeException("Interceptor.Builder not found.", e);
      } catch (InstantiationException e) {
        LOG.error("Could not instantiate Builder. Exception follows.", e);
        throw new FlumeException("Interceptor.Builder not constructable.", e);
      } catch (IllegalAccessException e) {
        LOG.error("Unable to access Builder. Exception follows.", e);
        throw new FlumeException("Unable to access Interceptor.Builder.", e);
      }
    }
    //将拦截器List设置到拦截链中。
    interceptorChain.setInterceptors(interceptors);
  }
二,configure方法已经初始化拦截链,接下来org.apache.flume.source.EventDrivenSourceRunner的start方法调用initialize()初始化拦截链并启动Source:

  @Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    //初始化ChannelProcessor,实际上就是初始化拦截链的所有的拦截器
    cp.initialize();
    //启动Source
    source.start();
    lifecycleState = LifecycleState.START;
  }

三,提交Event

ChannelProcessor提交Event有两个方法processEvent(提交单个Event)和processEventBatch(批量提交Event),下面以processEventBatch为例:

  public void processEventBatch(List<Event> events) {
    Preconditions.checkNotNull(events, "Event list must not be null");
    //根据拦截链挨个处理Event
    events = interceptorChain.intercept(events);

    Map<Channel, List<Event>> reqChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    Map<Channel, List<Event>> optChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    //将Event分类,放入reqChannelQueue 和 optChannelQueue中
    for (Event event : events) {
      List<Channel> reqChannels = selector.getRequiredChannels(event);

      for (Channel ch : reqChannels) {
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }

      List<Channel> optChannels = selector.getOptionalChannels(event);

      for (Channel ch: optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }

        eventQueue.add(event);
      }
    }
    //提交Event到必选的Channel
    // Process required channels
    for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = reqChannelQueue.get(reqChannel);

        for (Event event : batch) {
          reqChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " +
              reqChannel, t);
          throw (Error) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }

    //提交Event到可选的Channel
    // Process optional channels
    for (Channel optChannel : optChannelQueue.keySet()) {
      Transaction tx = optChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = optChannelQueue.get(optChannel);

        for (Event event : batch ) {
          optChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }
以上批量批量提交Event的流程:

1,根据配置的拦截器组成的拦截链挨个处理Event

2,根据Event获取必选和可选的Channel,然后分别根据可选和必选的Channel,将Event和Channel存放在以Channel为Key,Event列表为Value的Map中。

3,遍历可选和必选的Map,通过Channel的Transaction将Event commit到Channel中。

看下可选和必选map提交Channel异常处理有什么区别:

(1)必选Map的异常处理

        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " +
              reqChannel, t);
          throw (Error) t;
        } else {
          //抛出ChannelException异常,Source可以通过该异常保证数据不丢失的
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
(2)可选Map的异常处理
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          //只有在异常是Error才会抛出异常
          throw (Error) t;
        }
也就是说,只有异常为java.lang.Error类实例,可选和必选Channel才会回滚,不然可选提交到Channel后失败是不会回滚的。
分享到:
评论

相关推荐

    flume1.7.0源码

    通过分析 Apache Flume 1.7.0 的源码,开发者可以深入了解其内部工作流程,优化性能,解决实际问题,并为构建自己的数据采集和传输解决方案打下坚实基础。同时,对于大数据领域的研究者,源码也是深入研究数据处理...

    大数据Ambari之flume集成编译好的源码包

    **大数据Ambari之flume集成编译好的源码包** Apache Ambari 是一个用于管理和监控Hadoop集群的开源工具,它提供了直观的Web界面和RESTful API,使得安装、配置、管理Hadoop生态系统变得更加简单。Flume是Apache的一...

    尚硅谷大数据技术之Flume

    尚硅谷大数据技术之Flume Flume 是 Cloudera 提供的一个高可用的、 高可靠的、分布式的海量日志采集、聚合和传输的系统。 Flume 基于流式架构,灵活简单。 1.1 Flume 定义 Flume 是一个高可用的、 高可靠的、...

    flume log4f示例源码

    Flume 是 Apache Hadoop 生态系统中的一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的...这样,你可以轻松地管理和分析大量的日志数据,尤其是在分布式环境中,Flume 提供了一种高效且可扩展的解决方案。

    Apache flume1.6_src

    Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动大量日志数据。Flume 1.6 版本是其一个重要里程碑,...如果你想要在大数据日志处理领域有所建树,对 Flume 源码的深入学习是必不可少的。

    (源码)基于Spark2.x和Flume的实时新闻分析系统.zip

    # 基于Spark2.x和Flume的实时新闻分析系统 ## 项目简介 本项目是一个基于Spark2.x和Flume的实时新闻分析系统,旨在捕获用户浏览日志信息,实时分析新闻话题的流量,统计新闻话题的曝光量,并找出用户浏览量最高的...

    基于Flume的分布式日志采集分析系统设计与实现.pdf

    基于Flume的分布式日志采集分析系统设计与实现 Flume是一种分布式日志采集系统,可以实时地采集和处理大量日志数据。该系统基于Flume、Elasticsearch和Kibana等技术手段,能够对海量日志数据进行实时采集、处理和...

    flume修改源码读日志到hbase

    flume修改源码读日志到hbase,①日志文件为json数据②修改文件编译打包并替换flumejar中的对应文件

    Flume的UDP源 com.whitepages.flume.plugins.source.udp.UDPSource

    由于Flume的netcatudp为sources,avro为sink时,udp数据发送会报null of map in field headers of org.apache.flume.source.avro.AvroFlumeEvent of array。但是此类解决了这个问题。只需要将此类放在Flume安装的lib...

    电商数仓项目(八) Flume 系列源码

    在电商数仓项目中,Flume 的应用尤其常见,因为电商平台会产生大量的用户行为数据、交易数据、点击流数据等,这些都需要实时或近实时地进行处理和分析。 在"电商数仓项目(八) Flume 系列源码"中,我们可以深入理解 ...

    flume 1.8所有源代码 编译通过版 附 maven3.5.2 安装包

    为了编译Flume源码,打开命令行,导航至Flume源码根目录,然后运行`mvn clean install`命令。这个过程会下载所有依赖库,编译源代码,并将编译后的JAR文件放置到`target`目录下。如果编译过程中没有错误,这意味着源...

    flume-ng-sql-source-1.5.2

    总之,Flume-ng-sql-source-1.5.2为Flume增加了强大的SQL数据源能力,使得数据采集范围扩大到了结构化数据领域,这对于构建实时数据处理和分析系统至关重要。通过对源码的深入理解和使用,可以更高效地利用这一工具...

    flume-ng安装

    Flume-NG 安装与配置指南 Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:...

    apache-flume-1.8.0

    Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动和加载大量日志数据到集中式存储系统,如Hadoop HDFS。它设计为容错性强,可扩展,允许从多个源收集数据,并将其流向目标,如数据仓库或...

    Flume集群环境搭建,flume监控

    Flume是一个由Cloudera公司开发的分布式、可靠且可用的系统,用于有效地收集、...通过以上内容的深入学习和实践,可以掌握Flume集群环境搭建和监控的全方位技能,为处理大规模日志数据的实时传输和分析打下坚实的基础。

    flumeng-plugins-udp:使用java nio消费udp消息的flume-ng源码插件

    使用java nio消费udp消息的flume-ng源码插件Flume-ng ( )。 该插件基于 Apache Flume 1.5.0.1 和 Redis 2.8.17。特征源消费udp消息Netty ( ) 使用用法构建或下载 jar。 使用mvn clean package检出和构建将flumeng-...

    flume中的FileChannel的优化

    经过对Flume FileChannel相关源码的分析,导致FileChannel吞吐率下降的主要原因集中于事务的提交过程——commit

    Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS.docx

    Flume-ng 在 Windows 环境搭建并测试 + Log4j 日志通过 Flume 输出到 HDFS Flume-ng 是一个高可用、可靠、分布式的日志聚合系统,可以实时地从各种数据源(如日志文件、网络 socket、数据库等)中收集数据,并将其...

    Flume+kafka+Storm整合

    - Storm将处理Flume收集的数据,并执行相应的分析任务。 通过以上步骤,我们可以构建出一个完整的Flume+kafka+Storm数据流处理系统。这套系统能够高效地处理实时数据流,为业务决策提供强有力的支持。

Global site tag (gtag.js) - Google Analytics