`
flychao88
  • 浏览: 753056 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

玩转Flume之核心架构深入解析

 
阅读更多

玩转Flume之核心架构深入解析

字数2224 阅读29 评论0 

前段时间我们分享过玩转Flume+Kafka原来也就那点事儿Flume-NG源码分析-整体结构及配置载入分析这二篇文章,主要介绍了flume的简单使用和配置文件加载的全过程,那么今天我们重点分析flume核心原理,从而了解Source、Channel和Sink的全链路过程。

一、Flume架构分析


F7C59934-2C22-4F45-BE12-FCC9BB2A1708.png


这个图中核心的组件是:
Source,ChannelProcessor,Channel,Sink。他们的关系结构如下:

Source  {
    ChannelProcessor  {
             Channel  ch1
             Channel  ch2
             …
    }
} 
Sink  {
   Channel  ch; 
} 
SinkGroup {
   Channel ch;
   Sink s1;
   Sink s2;
   …
}

二、各组件详细介绍

1、Source组件

Source是数据源的总称,我们往往设定好源后,数据将源源不断的被抓取或者被推送。
常见的数据源有:ExecSource,KafkaSource,HttpSource,NetcatSource,JmsSource,AvroSource等等。
所有的数据源统一实现一个接口类如下:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source extends LifecycleAware, NamedComponent {

  /**
   * Specifies which channel processor will handle this source's events.
   *
   * @param channelProcessor
   */
  public void setChannelProcessor(ChannelProcessor channelProcessor);

  /**
   * Returns the channel processor that will handle this source's events.
   */
  public ChannelProcessor getChannelProcessor();

}

Source提供了两种机制: PollableSource(轮询拉取)和EventDrivenSource(事件驱动):


B0F4FCCA-7DAF-4E2B-B1DB-1AC23ACA2128.png


上图展示的Source继承关系类图。
通过类图我们可以看到NetcatSource,ExecSource和HttpSource属于事件驱动模型。KafkaSource,SequenceGeneratorSource和JmsSource属于轮询拉取模型。
Source接口继承了LifecycleAware接口,它的的所有逻辑的实现在接口的start和stop方法中进行。

下图是类关系方法图:


E8953D29-35EC-4A63-AC72-78675BE0A56E.png

Source接口定义的是最终的实现过程,比如通过日志抓取日志,这个抓取的过程和实际操作就是在对应的Source实现中,比如:ExecSource。那么这些Source实现由谁来驱动的呢?现在我们将介绍SourceRunner类。将看一下类继承结构图:


Paste_Image.png


我们看一下PollableSourceRunner和EventDrivenSourceRunner的具体实现:

//PollableSourceRunner:
public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();

    runner = new PollingRunner();

    runner.source = source; //Source实现类就在这里被赋与。
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;

    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" + 
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }

//EventDrivenSourceRunner:
@Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
  }

注:其实所有的Source实现类内部都维护着线程,执行source.start()其实就是启动了相应的线程。
刚才我们看代码,代码中一直都在展示channelProcessor这个类,同时最上面架构设计图里面也提到了这个类,那它到底是干什么呢,下面我们就对其分解。

2、Channel组件

Channel用于连接Source和Sink,Source将日志信息发送到Channel,Sink从Channel消费日志信息;Channel是中转日志信息的一个临时存储,保存有Source组件传递过来的日志信息。
先看代码如下:

ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();

ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);

ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, config);

source.setChannelProcessor(channelProcessor);

ChannelSelectorFactory.create方法实现如下:

public static ChannelSelector create(List<Channel> channels,
      ChannelSelectorConfiguration conf) {
    String type = ChannelSelectorType.REPLICATING.toString();
    if (conf != null){
      type = conf.getType();
    }
    ChannelSelector selector = getSelectorForType(type);
    selector.setChannels(channels);
    Configurables.configure(selector, conf);
    return selector;
  }

其中我们看一下ChannelSelectorType这个枚举类,包括了几种类型:

public enum ChannelSelectorType {

  /**
   * Place holder for custom channel selectors not part of this enumeration.
   */
  OTHER(null),

  /**
   * 复用通道选择器
   */
  REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"),

  /**
   *  多路通道选择器
   */
  MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector");
}

ChannelSelector的类结构图如下所示:


Paste_Image.png

注:RelicatingChannelSelector和MultiplexingChannelSelector是二个通道选择器,第一个是复用型通道选择器,也就是的默认的方式,会把接收到的消息发送给其他每个channel。第二个是多路通道选择器,这个会根据消息header中的参数进行通道选择。

说完通道选择器,正式来解释Channel是什么,先看一个接口类:

public interface Channel extends LifecycleAware, NamedComponent {  
  public void put(Event event) throws ChannelException;  
  public Event take() throws ChannelException;  
  public Transaction getTransaction();  
}

注:put方法是用来发送消息,take方法是获取消息,transaction是用于事务操作。
类结构图如下:


Paste_Image.png

Paste_Image.png

3、Sink组件

Sink负责取出Channel中的消息数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

Sink接口类内容如下:

public interface Sink extends LifecycleAware, NamedComponent {  
  public void setChannel(Channel channel);  
  public Channel getChannel();  
  public Status process() throws EventDeliveryException;  
  public static enum Status {  
    READY, BACKOFF  
  }  
}

Sink是通过如下代码进行的创建:

Sink sink = sinkFactory.create(comp.getComponentName(),  comp.getType());

DefaultSinkFactory.create方法如下:

public Sink create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");
    logger.info("Creating instance of sink: {}, type: {}", name, type);
    Class<? extends Sink> sinkClass = getClass(type);
    try {
      Sink sink = sinkClass.newInstance();
      sink.setName(name);
      return sink;
    } catch (Exception ex) {
      System.out.println(ex);
      throw new FlumeException("Unable to create sink: " + name
          + ", type: " + type + ", class: " + sinkClass.getName(), ex);
    }
  }

注:Sink是通过SinkFactory工厂来创建,提供了DefaultSinkFactory默认工厂,程序会查找org.apache.flume.conf.sink.SinkType这个枚举类找到相应的Sink处理类,比如:org.apache.flume.sink.LoggerSink,如果没找到对应的处理类,直接通过Class.forName(className)进行直接查找实例化实现类。

Sink的类结构图如下:


Paste_Image.png

与ChannelProcessor处理类对应的是SinkProcessor,由SinkProcessorFactory工厂类负责创建,SinkProcessor的类型由一个枚举类提供,看下面代码:

public enum SinkProcessorType {
  /**
   * Place holder for custom sinks not part of this enumeration.
   */
  OTHER(null),

  /**
   * 故障转移 processor
   *
   * @see org.apache.flume.sink.FailoverSinkProcessor
   */
  FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"),

  /**
   * 默认processor
   *
   * @see org.apache.flume.sink.DefaultSinkProcessor
   */
  DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"),

  /**
   * 负载processor
   *
   * @see org.apache.flume.sink.LoadBalancingSinkProcessor
   */
  LOAD_BALANCE("org.apache.flume.sink.LoadBalancingSinkProcessor");

  private final String processorClassName;

  private SinkProcessorType(String processorClassName) {
    this.processorClassName = processorClassName;
  }

  public String getSinkProcessorClassName() {
    return processorClassName;
  }
}

SinkProcessor的类结构图如下:


Paste_Image.png


说明:
1、FailoverSinkProcessor是故障转移处理器,当sink从通道拿数据信息时出错进行的相关处理,代码如下:

public Status process() throws EventDeliveryException {
    // 经过了冷却时间,再次发起重试
    Long now = System.currentTimeMillis();
    while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
      //从失败队列中获取sink节点
      FailedSink cur = failedSinks.poll(); 
      Status s;
      try {
        //调用相应sink进行处理,比如将channel的数据读取存放到文件中,
        //这个存放文件的动作就在process中进行。
        s = cur.getSink().process();
        if (s  == Status.READY) {
          //如果处理成功,则放到存活队列中
          liveSinks.put(cur.getPriority(), cur.getSink());
          activeSink = liveSinks.get(liveSinks.lastKey());
          logger.debug("Sink {} was recovered from the fail list",
                  cur.getSink().getName());
        } else {
          // if it's a backoff it needn't be penalized.
          //如果处理失败,则继续放到失败队列中
          failedSinks.add(cur);
        }
        return s;
      } catch (Exception e) {
        cur.incFails();
        failedSinks.add(cur);
      }
    }

    Status ret = null;
    while(activeSink != null) {
      try {
        ret = activeSink.process();
        return ret;
      } catch (Exception e) {
        logger.warn("Sink {} failed and has been sent to failover list",
                activeSink.getName(), e);
        activeSink = moveActiveToDeadAndGetNext();
      }
    }

2、LoadBalancingSinkProcessor是负载Sink处理器
首先我们和ChannelProcessor一样,我们也要重点说明一下SinkSelector这个选择器。
先看一下SinkSelector.configure方法的部分代码:

if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
      selector = new RoundRobinSinkSelector(shouldBackOff);
    } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
      selector = new RandomOrderSinkSelector(shouldBackOff);
    } else {
      try {
        @SuppressWarnings("unchecked")
        Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
            Class.forName(selectorTypeName);

        selector = klass.newInstance();
      } catch (Exception ex) {
        throw new FlumeException("Unable to instantiate sink selector: "
            + selectorTypeName, ex);
      }
    }

结合上面的代码,再看类结构图如下:


Paste_Image.png


注:RoundRobinSinkSelector是轮询选择器,RandomOrderSinkSelector是随机分配选择器。

最后我们以KafkaSink为例看一下Sink里面的具体实现:

public Status process() throws EventDeliveryException {
    Status result = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = null;
    Event event = null;
    String eventTopic = null;
    String eventKey = null;

    try {
      long processedEvents = 0;

      transaction = channel.getTransaction();
      transaction.begin();

      messageList.clear();
      for (; processedEvents < batchSize; processedEvents += 1) {
        event = channel.take();

        if (event == null) {
          // no events available in channel
          break;
        }

        byte[] eventBody = event.getBody();
        Map<String, String> headers = event.getHeaders();

        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
          eventTopic = topic;
        }

        eventKey = headers.get(KEY_HDR);

        if (logger.isDebugEnabled()) {
          logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
            + new String(eventBody, "UTF-8"));
          logger.debug("event #{}", processedEvents);
        }

        // create a message and add to buffer
        KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
          (eventTopic, eventKey, eventBody);
        messageList.add(data);

      }

      // publish batch and commit.
      if (processedEvents > 0) {
        long startTime = System.nanoTime();
        producer.send(messageList);
        long endTime = System.nanoTime();
        counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
        counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
      }

      transaction.commit();

    } catch (Exception ex) {
      String errorMsg = "Failed to publish events";
      logger.error("Failed to publish events", ex);
      result = Status.BACKOFF;
      if (transaction != null) {
        try {
          transaction.rollback();
          counter.incrementRollbackCount();
        } catch (Exception e) {
          logger.error("Transaction rollback failed", e);
          throw Throwables.propagate(e);
        }
      }
      throw new EventDeliveryException(errorMsg, ex);
    } finally {
      if (transaction != null) {
        transaction.close();
      }
    }

    return result;
  }

注:方法从channel中不断的获取数据,然后通过Kafka的producer生产者将消息发送到Kafka里面

分享到:
评论

相关推荐

    06.Flume的体系架构与示例.pptx

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...

    Flume解析和应用

    ### Flume解析和应用 #### 一、Flume概述 Flume是由Cloudera开发的一款分布式、可靠且可用的日志采集系统。它被设计用来高效地处理大量数据流,能够从多个源头收集数据并将其传输至不同的存储系统中。Flume支持...

    尚硅谷大数据技术之Flume

    尚硅谷大数据技术之Flume Flume 是 Cloudera 提供的一个高可用的、 高可靠的、分布式的海量日志采集、聚合和传输的系统。 Flume 基于流式架构,灵活简单。 1.1 Flume 定义 Flume 是一个高可用的、 高可靠的、...

    Flume 基础架构.pptx )

    ### Flume 基础架构知识点详解 #### 一、Flume 整体架构概述 Flume 是一款高效、可靠的服务,用于收集、聚合和移动大量日志数据。它的核心功能是将各种来源的日志数据汇聚到一个集中的存储位置(如HDFS)。Flume ...

    flume解析日志所需jar

    总结来说,"flume解析日志所需jar"涉及到的技术点包括:Flume的基本架构、Kafka Source、日志解析、Channel、HBase和Elasticsearch的Sink配置,以及Flume类路径的管理。理解并掌握这些知识点对于构建高效的数据流...

    Flume最佳实践手册

    总的来说,Flume最佳实践手册提供了关于Flume NG版本的深入介绍,涵盖了架构、核心概念、数据流以及源点组件的详细说明和实践技巧。对于使用Flume进行大数据日志收集和处理的用户而言,它是一份非常有价值的资料。

    Flume集群环境搭建,flume监控

    Flume从1.5.0版本开始,重构了其内部架构,核心组件、配置以及代码架构都进行了重大改动,这个新版本被称为Flume NG(Next Generation),即Flume的新一代版本,用以替代了原来的Flume OG(Original Generation)。...

    大数据Flume架构原理.pdf

    大数据Flume架构原理 Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。目前...

    实时大数据采集框架Flume详解(视频+课件+代码+工具)

    01_Flume的介绍及其架构组成 02_Flume的安装部署 03_Flume的测试运行 04_Flume中配置使用file channel及HDFS sink 05_Flume中配置HDFS文件生成大小及时间分区 06_Flume中配置Spooling Dir的使用 07_Flume中...

    flume自定义功能实现代码

    首先,我们需要理解 Flume 的基本架构。Flume 由三个核心组件组成:Source、Channel 和 Sink。Source 负责接收数据,Channel 作为临时存储,Sink 则负责将数据发送到最终目的地。当需要实现自定义功能时,通常是在...

    尚硅谷大数据技术之Flume笔记1

    【尚硅谷大数据技术之Flume笔记1】 Flume是Cloudera开发的一款专门用于大数据收集、聚合和传输的系统,其设计目标是提供高可用、高可靠性和分布式的服务。Flume基于流处理架构,简化了日志数据的管理和传输过程。 ...

    Flume学习文档

    在学习Flume之前,首先应该了解其核心概念和架构设计。 Flume的初始版本称为FlumeOG,它为日志收集提供了基础支持,但由于代码架构等问题,Cloudera推出了重构后的FlumeNG版本,后者解决了FlumeOG的缺陷,并提升了...

    大数据Ambari之flume集成编译好的源码包

    **大数据Ambari之flume集成编译好的源码包** Apache Ambari 是一个用于管理和监控Hadoop集群的开源工具,它提供了直观的Web界面和RESTful API,使得安装、配置、管理Hadoop生态系统变得更加简单。Flume是Apache的一...

    Flume学习文档(1){Flume基本概念、Flume事件概念与原理}.docx

    根据提供的文档信息,本文将详细解析Flume的基本概念、Flume事件的概念与原理,并进一步探讨其在大数据领域的应用价值。 ### 一、Flume基本概念 #### 1.1 Flume简介 Flume是由Cloudera公司开源的一种分布式、可靠...

    大数据技术之Flume.docx

    大数据技术之Flume Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。它基于流式架构,灵活简单。 Flume定义 Flume是一个高可用的,高可靠的,分布式的海量日志采集、...

    Flume核心思想与解密

    Apache flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,用于有效地收集、聚合和将大量日志数据从许多不同的源移动到一个集中的数据存储(如文本、HDFS、Hbase等)。  其使用不仅仅限于日志...

    Flume笔记.zip

    Flume 的核心概念包括源(Source)、通道(Channel)和接收器(Sink),它们共同构建了一个高效的数据传输管道。 1. **Flume 概述** - Flume 的主要目标是简化大数据的收集过程,尤其适用于日志数据的管理。 - 它...

    flume安装程序

    Flume通过简单而灵活的架构实现了数据的高效传输,允许数据源、数据通道和数据接收器之间的动态配置。 在本安装指南中,我们将深入探讨如何使用提供的`flume-1.6.0-bin.tar`安装包来安装和配置Apache Flume。 1. *...

    flume1.7.0源码

    Flume 的核心架构由三个主要组件构成:Sources、Channels 和 Sinks。Sources 负责从各种数据源(如网络套接字、日志文件、应用程序API等)收集数据;Channels 作为临时存储,保证数据在传输过程中的可靠性;Sinks ...

Global site tag (gtag.js) - Google Analytics