`

分布式日志收集系统Apache Flume的设计介绍

 
阅读更多

概述

Flume是Cloudera公司的一款高性能、高可能的分布式日志收集系统。现在已经是Apache Top项目。Github地址。同Flume相似的日志收集系统还有Facebook ScribeApache ChuwkaApache Kafka(也是LinkedIn的)。Flume是后起之秀,本文尝试简要分析Flume数据流通过程中提供的组件、可靠性保证来介绍Flume的主要设计,不涉及Flume具体的安装使用,也不涉及代码层面的剖析。写博文来记录这个工具主要是觉得与最近开发的一个流式的数据搬运的工具在设计上有相似之处,想看看有没有可以参考的地方。在博文的基础上,还需要浏览一下源码。
 

数据流通

Flume传输的数据的基本单位是event,如果是文本文件,通常是一行记录,这也是事务的基本单位。flume运行的核心是agent。它是一个完整的数据收集工具,含有三个核心组件,分别是source、channel、sink。Event从Source,流向Channel,再到Sink,本身为一个byte数组,并可携带headers信息。Event代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。通过这些组件,event可以从一个地方流向另一个地方,如下图所示。
Source消费从外部流进的Events,如AvroSource接收外部客户端传来的或是从别的agent流出来的Avro Event。Source可以把event送往一个或多个channel。channel是一个队列,持有event等待sink来消费,一种Channel的实现:FileChannel使用本地文件系统来作为它的存储。Sink的作用是把Event从channel里移除,送往外部数据仓库或给下一站agent的Source,如HDFSEventSink送往HDFS。同个agent下的source和sink是异步的。下面再举几个数据流通的例子,说明不同的使用方式。
 
 
 
 

Source接入

Client端操作消费数据的来源,Flume支持Avro,log4j,syslog和http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也可以写一个Source,以IPC或RPC的方式接入自己的应用,Avro和Thrift都可以(分别有NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口),其中Avro是默认的RPC协议。具体代码级别的Client端数据接入,可以参考官方手册
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。 
对于直接读取文件Source,有两种方式: 
  1. ExecSource:以运行Linux命令的方式,持续的输出最新的数据,如tail -F 文件名指令,在这种方式下,取的文件名必须是指定的。 ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法保证日志数据的完整性。
  2. SpoolSource:监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到spool目录下的文件不可以再打开编辑;spool目录下不可包含相应的子目录。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。 在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)
[java] view plaincopy在CODE上查看代码片派生到我的代码片
  1. public class MySource extends AbstractSource implements Configurable, PollableSource {  
  2.   private String myProp;  
  3.   
  4.   @Override  
  5.   public void configure(Context context) {  
  6.     String myProp = context.getString("myProp""defaultValue");  
  7.   
  8.     // Process the myProp value (e.g. validation, convert to another type, ...)  
  9.   
  10.     // Store myProp for later retrieval by process() method  
  11.     this.myProp = myProp;  
  12.   }  
  13.   
  14.   @Override  
  15.   public void start() {  
  16.     // Initialize the connection to the external client  
  17.   }  
  18.   
  19.   @Override  
  20.   public void stop () {  
  21.     // Disconnect from external client and do any additional cleanup  
  22.     // (e.g. releasing resources or nulling-out field values) ..  
  23.   }  
  24.   
  25.   @Override  
  26.   public Status process() throws EventDeliveryException {  
  27.     Status status = null;  
  28.   
  29.     // Start transaction  
  30.     Channel ch = getChannel();  
  31.     Transaction txn = ch.getTransaction();  
  32.     txn.begin();  
  33.     try {  
  34.       // This try clause includes whatever Channel operations you want to do  
  35.   
  36.       // Receive new data  
  37.       Event e = getSomeData();  
  38.   
  39.       // Store the Event into this Source's associated Channel(s)  
  40.       getChannelProcessor().processEvent(e)  
  41.   
  42.       txn.commit();  
  43.       status = Status.READY;  
  44.     } catch (Throwable t) {  
  45.       txn.rollback();  
  46.   
  47.       // Log exception, handle individual exceptions as needed  
  48.   
  49.       status = Status.BACKOFF;  
  50.   
  51.       // re-throw all Errors  
  52.       if (t instanceof Error) {  
  53.         throw (Error)t;  
  54.       }  
  55.     } finally {  
  56.       txn.close();  
  57.     }  
  58.     return status;  
  59.   }}  

Channel

Channel有多种方式:有MemoryChannel,JDBC Channel,MemoryRecoverChannel,FileChannel。
MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。
MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。
FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。 
 

Sink

Sink在设置存储数据时,可以向文件系统、数据库、hadoop存数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
更多sink的内容可以参考官方手册
[java] view plaincopy在CODE上查看代码片派生到我的代码片
  1. public class MySink extends AbstractSink implements Configurable {  
  2.   private String myProp;  
  3.   
  4.   @Override  
  5.   public void configure(Context context) {  
  6.     String myProp = context.getString("myProp""defaultValue");  
  7.   
  8.     // Process the myProp value (e.g. validation)  
  9.   
  10.     // Store myProp for later retrieval by process() method  
  11.     this.myProp = myProp;  
  12.   }  
  13.   
  14.   @Override  
  15.   public void start() {  
  16.     // Initialize the connection to the external repository (e.g. HDFS) that  
  17.     // this Sink will forward Events to ..  
  18.   }  
  19.   
  20.   @Override  
  21.   public void stop () {  
  22.     // Disconnect from the external respository and do any  
  23.     // additional cleanup (e.g. releasing resources or nulling-out  
  24.     // field values) ..  
  25.   }  
  26.   
  27.   @Override  
  28.   public Status process() throws EventDeliveryException {  
  29.     Status status = null;  
  30.   
  31.     // Start transaction  
  32.     Channel ch = getChannel();  
  33.     Transaction txn = ch.getTransaction();  
  34.     txn.begin();  
  35.     try {  
  36.       // This try clause includes whatever Channel operations you want to do  
  37.   
  38.       Event event = ch.take();  
  39.   
  40.       // Send the Event to the external repository.  
  41.       // storeSomeData(e);  
  42.   
  43.       txn.commit();  
  44.       status = Status.READY;  
  45.     } catch (Throwable t) {  
  46.       txn.rollback();  
  47.   
  48.       // Log exception, handle individual exceptions as needed  
  49.   
  50.       status = Status.BACKOFF;  
  51.   
  52.       // re-throw all Errors  
  53.       if (t instanceof Error) {  
  54.         throw (Error)t;  
  55.       }  
  56.     } finally {  
  57.       txn.close();  
  58.     }  
  59.     return status;  
  60.   }}  

可靠性

Flume的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。
Flume使用事务性的方式保证传送Event整个过程的可靠性。Sink必须在Event被存入Channel后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把Event从Channel中remove掉。这样数据流里的event无论是在一个agent里还是多个agent之间流转,都能保证可靠,因为以上的事务保证了event会被成功存储起来。而Channel的多种实现在可恢复性上有不同的保证。也保证了event不同程度的可靠性。比如Flume支持在本地保存一份文件channel作为备份,而memory channel将event存在内存queue里,速度快,但丢失的话无法恢复。
具体看一下Transaction。Source和Sink封装了Channel提供的对Event的事务存、取接口,下图为一个transaction过程:
一个Channel的实现里会包括一个transaction的实现,每个与channel打交道的source和sink都得带有一个transaction对象。下面的例子中可以看到一个Event的状态和变化会在一次transation中完成。transaction的状态也对应了时序图中的各个状态。
[java] view plaincopy在CODE上查看代码片派生到我的代码片
  1. Channel ch = new MemoryChannel();  
  2. Transaction txn = ch.getTransaction();  
  3. txn.begin();  
  4. try {  
  5.   // This try clause includes whatever Channel operations you want to do  
  6.   
  7.   Event eventToStage = EventBuilder.withBody("Hello Flume!",  
  8.                        Charset.forName("UTF-8"));  
  9.   ch.put(eventToStage);  
  10.   // Event takenEvent = ch.take();  
  11.   // ...  
  12.   txn.commit();  
  13. catch (Throwable t) {  
  14.   txn.rollback();  
  15.   
  16.   // Log exception, handle individual exceptions as needed  
  17.   
  18.   // re-throw all Errors  
  19.   if (t instanceof Error) {  
  20.     throw (Error)t;  
  21.   }  
  22. finally {  
  23.   txn.close();  
  24. }  

(全文完)
分享到:
评论

相关推荐

    基于Flume的分布式日志采集分析系统设计与实现.pdf

    基于Flume的分布式日志采集分析系统设计与实现 Flume是一种分布式日志采集系统,可以实时地采集和处理大量日志数据。该系统基于Flume、Elasticsearch和Kibana等技术手段,能够对海量日志数据进行实时采集、处理和...

    基于Apache Flume的分布式日志收集系统设计与实现.pdf

    #资源达人分享计划#

    [Apache Flume] Apache Flume 分布式日志采集应用 (Hadoop 实现) (英文版)

    [Packt Publishing] Apache Flume 分布式日志采集应用 (Hadoop 实现) (英文版) [Packt Publishing] Apache Flume Distributed Log Collection for Hadoop (E-Book) ☆ 图书概要:☆ Stream data to Hadoop using ...

    java分布式日志系统.zip

    Java分布式日志系统是现代大型网络应用不可或缺的组成部分,它为开发者提供了收集、存储、查询和分析应用程序日志的能力。在分布式环境中,由于系统由多个节点组成,日志的管理和处理变得复杂,因此,分布式日志系统...

    apache-flume-1.8.0

    Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动和加载大量日志数据到集中式存储系统,如Hadoop HDFS。它设计为容错性强,可扩展,允许从多个源收集数据,并将其流向目标,如数据仓库或...

    日志服务器 Apache Flume.7z

    综上所述,Apache Flume 是一种强大的日志处理工具,它简化了大规模分布式系统中的日志收集和分析任务。通过理解并熟练运用Flume,你可以构建高效、可靠且易于维护的日志服务器,为企业的日志管理和业务洞察提供有力...

    apache-flume1.8

    Apache Flume 是一个高度可靠且灵活的数据收集系统,主要用于聚合、传输和存储大规模日志数据。在大数据领域,Flume 是一个不可或缺的组件,尤其在处理实时流数据时。其设计目标是支持高可用性和容错性,确保数据在...

    日志服务器 Apache Flume.tar

    Apache Flume 是一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的系统。在大数据领域,日志管理是至关重要的,因为它们提供了关于系统行为、性能和潜在问题的宝贵信息。Flume 提供了一个高效且灵活的...

    apache-flume-1.6.0-bin

    Apache Flume 是一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的系统。在标题"apache-flume-1.6.0-bin"中,我们了解到这是Apache Flume的1.6.0版本的二进制发行版,通常包含可执行文件和其他运行时所需...

    基于 Apache Flume 定制的数据采集工具.zip

    Apache Flume 是一个高度可配置、可靠且分布式的数据采集系统,常用于收集、聚合和移动大量日志数据。它设计的目标是将数据流从多个源有效地传输到一个或多个目标,例如 HDFS(Hadoop 分布式文件系统)或任何其他...

    java源码:日志服务器 Apache Flume.tar.gz

    Apache Flume 是一个高度可靠的、分布式的、可用的日志聚合系统,主要被设计用于收集、聚合和移动大量日志数据。在Java编程语言中开发,它是一个关键组件,尤其在大数据处理领域,用于构建高效、容错的日志流水线。 ...

    基于java的日志服务器 Apache Flume.zip

    Apache Flume 是一个高度可定制的数据收集、聚合和传输系统,主要用于处理大量日志数据。它是由Apache软件基金会开发的,被广泛应用于大数据生态系统中,尤其是与Hadoop结合使用时,能够有效地将分布式系统的日志...

    apache-flume-1.9.0-bin.tar.zip

    Apache Flume 是一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的系统。它设计用于处理和传输大规模的日志数据,是大数据生态系统中的重要组件,常用于实时流数据处理。Apache Flume 1.9.0 版本是该软件...

    基于Java的日志服务器 Apache Flume.zip

    Apache Flume 是一个分布式、可靠且可用的服务,用于有效地收集、聚合和移动大量日志数据。它是Apache软件基金会的一个项目,特别设计用于处理和传输大规模流式数据。Flume 使用简单灵活的架构,允许用户自定义数据...

    基于Apache,Flume,的大数据日志收集系统.docx

    本文在Apache Flume和Hadoop等开源框架的基础上,设计并实现了一套大数据日志收集系统。该系统能够针对分布式系统中的日志进行高效采集,并支持在线和离线分析。 #### 关键词 Apache Flume、日志收集、Hadoop、...

    apache-flume-1.9.0-bin.tar.gz

    Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动大量日志数据。Flume 提供了简单灵活的架构,允许数据在多个数据源和数据接收器之间流动。它被广泛应用于大数据处理环境,尤其适合监控...

    基于Java的实例源码-日志服务器 Apache Flume.zip

    Apache Flume 是一个高度可靠且可扩展的数据收集、聚合和移动系统,主要用于处理大量日志数据。它在Java平台上构建,被广泛应用于大数据生态系统中,尤其是与Hadoop结合使用时。这个压缩包“基于Java的实例源码-日志...

Global site tag (gtag.js) - Google Analytics