`
usenrong
  • 浏览: 517318 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Flume+Kafka收集Docker容器内分布式日志应用实践

 
阅读更多

 

1 背景和问题

随着云计算、PaaS平台的普及,虚拟化、容器化等技术的应用,例如Docker等技术,越来越多的服务会部署在云端。通常,我们需要需要获取日志,来进行监控、分析、预测、统计等工作,但是云端的服务不是物理的固定资源,日志获取的难度增加了,以往可以SSH登陆的或者FTP获取的,现在可不那么容易获得,但这又是工程师迫切需要的,最典型的场景便是:上线过程中,一切都在GUI化的PaaS平台点点鼠标完成,但是我们需要结合tail -F、grep等命令来观察日志,判断是否上线成功。当然这是一种情况,完善的PaaS平台会为我们完成这个工作,但是还有非常多的ad-hoc的需求,PaaS平台无法满足我们,我们需要日志。本文就给出了在分布式环境下,容器化的服务中的分散日志,如何集中收集的一种方法。
 

2 设计约束和需求描述

做任何设计之前,都需要明确应用场景、功能需求和非功能需求。

2.1 应用场景

分布式环境下可承载百台服务器产生的日志,单条数据日志小于1k,最大不超过50k,日志总大小每天小于500G。

2.2 功能需求

1)集中收集所有服务日志。
2)可区分来源,按服务、模块和天粒度切分。

2.3 非功能需求

1)不侵入服务进程,收集日志功能需独立部署,占用系统资源可控。
2)实时性,低延迟,从产生日志到集中存储延迟小于4s。
3)持久化,保留最近N天。
4)尽量递送日志即可,不要求不丢不重,但比例应该不超过一个阈值(例如万分之一)。
4)可以容忍不严格有序。
5)收集服务属于线下离线功能,可用性要求不高,全年满足3个9即可。
 

3 实现架构

一种方案实现的架构如下图所示:

3.1 Producer层分析

PaaS平台内的服务假设部署在Docker容器内,那么为了满足非功能需求#1,独立另外一个进程负责收集日志,因此不侵入服务框架和进程。采用Flume NG来进行日志的收集,这个开源的组件非常强大,可以看做一种监控、生产增量,并且可以发布、消费的模型,Source就是源,是增量源,Channel是缓冲通道,这里使用内存队列缓冲区,Sink就是槽,是个消费的地方。容器内的Source就是执行tail -F这个命令的去利用linux的标准输出读取增量日志,Sink是一个Kafka的实现,用于推送消息到分布式消息中间件。
 

3.2 Broker层分析

PaaS平台内的多个容器,会存在多个Flume NG的客户端去推送消息到Kafka消息中间件。Kafka是一个吞吐量、性能非常高的消息中间件,采用单个分区按照顺序的写入的方式工作,并且支持按照offset偏移量随机读取的特性,因此非常适合做topic发布订阅模型的实现。这里图中有多个Kafka,是因为支持集群特性,容器内的Flume NG客户端可以连接若干个Kafka的broker发布日志,也可以理解为连接若干个topic下的分区,这样可以实现高吞吐,一来可以在Flume NG内部做打包批量发送来减轻QPS压力,二来可以分散到多个分区写入,同时Kafka还会指定replica备份个数,保证写入某个master后还需要写入N个备份,这里设置为2,没有采用常用的分布式系统的3,是因为尽量保证高并发特性,满足非功能需求中的#4。
 

3.3 Consumer层分析

消费Kafka增量的也是一个Flume NG,可以看出它的强大之处,在于可以接入任意的数据源,都是可插拔的实现,通过少量配置即可。这里使用Kafka Source订阅topic,收集过来的日志同样先入内存缓冲区,之后使用一个File Sink写入文件,为了满足功能需求#2,可区分来源,按服务、模块和天粒度切分,我自己实现了一个Sink,叫做RollingByTypeAndDayFileSink,源代码放到了github上,可以从这个页面下载jar,直接放到flume的lib目录即可。
 

4 实践方法

4.1 容器内配置

Dockerfile

Dockerfile是容器内程序的运行脚本,里面会含有不少docker自带的命令,下面是要典型的Dockerfile,BASE_IMAGE是一个包含了运行程序以及flume bin的镜像,比较重要的就是ENTRYPOINT,主要利用supervisord来保证容器内进程的高可用。
FROM ${BASE_IMAGE}
MAINTAINER ${MAINTAINER}
ENV REFRESH_AT ${REFRESH_AT}
RUN mkdir -p /opt/${MODULE_NAME}
ADD ${PACKAGE_NAME} /opt/${MODULE_NAME}/
COPY service.supervisord.conf /etc/supervisord.conf.d/service.supervisord.conf
COPY supervisor-msoa-wrapper.sh /opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh
RUN chmod +x /opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh
RUN chmod +x /opt/${MODULE_NAME}/*.sh
EXPOSE
ENTRYPOINT ["/usr/bin/supervisord", "-c", "/etc/supervisord.conf"]
下面是supervisord的配置文件,执行supervisor-msoa-wrapper.sh脚本。
[program:${MODULE_NAME}]
command=/opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh
下面是supervisor-msoa-wrapper.sh,这个脚本内的start.sh或者stop.sh就是应用程序的启动和停止脚本,这里的背景是我们的启停的脚本都是在后台运行的,因此不会阻塞当前进程,因此直接退出了,Docker就会认为程序结束,因此应用生命周期也结束,这里使用wait命令来进行一个阻塞,这样就可以保证即使后台运行的进程,我们可以看似是前台跑的。
 
这里加入了flume的运行命令,–conf后面的参数标示会去这个文件夹下面寻找flume-env.sh,里面可以定义JAVA_HOME和JAVA_OPTS。–conf-file指定flume实际的source、channel、sink等的配置。
#! /bin/bash
function shutdown()
{
    date
    echo "Shutting down Service"
    unset SERVICE_PID # Necessary in some cases
    cd /opt/${MODULE_NAME}
    source stop.sh
}
 
## 停止进程
cd /opt/${MODULE_NAME}
echo "Stopping Service"
source stop.sh
 
## 启动进程
echo "Starting Service"
source start.sh
export SERVICE_PID=$!
 
## 启动Flume NG agent,等待4s日志由start.sh生成
sleep 4 
nohup /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --conf /opt/apache-flume-1.6.0-bin/conf --conf-file /opt/apache-flume-1.6.0-bin/conf/logback-to-kafka.conf --name a1 -Dflume.root.logger=INFO,console &
 
# Allow any signal which would kill a process to stop Service
trap shutdown HUP INT QUIT ABRT KILL ALRM TERM TSTP
 
echo "Waiting for $SERVICE_PID"
wait $SERVICE_PID

Flume配置

source本应该采用exec source,执行tailf -F日志文件即可。但是这里使用了一个自行开发的StaticLinePrefixExecSource,源代码可以在github上找到。之所以采用自定义的,是因为需要将一些固定的信息传递下去,例如服务/模块的名称以及分布式服务所在容器的hostname,便于收集方根据这个标记来区分日志。如果这里你发现为什么不用flume的拦截器interceptor来做这个工作,加入header中一些KV不就OK了吗?这是个小坑,我后续会解释一下。
 
例如原来日志的一行为:
[INFO]  2016-03-18 12:59:31,080 [main]  fountain.runner.CustomConsumerFactoryPostProcessor      (CustomConsumerFactoryPostProcessor.java:91)    -Start to init IoC container by loading XML bean definitions from classpath:fountain-consumer-stdout.xml
按照如下配置,那么实际传递给Channel的日志为:
service1##$$##m1-ocean-1004.cp  [INFO]  2016-03-18 12:59:31,080 [main]  fountain.runner.CustomConsumerFactoryPostProcessor      (CustomConsumerFactoryPostProcessor.java:91)    -Start to init IoC container by loading XML bean definitions from classpath:fountain-consumer-stdout.xml
channel使用内存缓冲队列,大小标识可容乃的日志条数(event size),事务可以控制一次性从source以及一次性给sink的批量日志条数,实际内部有个timeout超时,可通过keepAlive参数设置,超时后仍然会推送过去,默认为3s。
 
sink采用Kafka sink,配置broker的list列表以及topic的名称,需要ACK与否,以及一次性批量发送的日志大小,默认5条一个包,如果并发很大可以把这个值扩大,加大吞吐。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
a1.sources.r1.type = com.baidu.unbiz.flume.sink.StaticLinePrefixExecSource
a1.sources.r1.command = tail -F /opt/MODULE_NAME/log/logback.log
a1.sources.r1.channels = c1
a1.sources.r1.prefix=service1
a1.sources.r1.separator=##$$##
a1.sources.r1.suffix=m1-ocean-1004.cp
 
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = keplerlog
a1.sinks.k1.brokerList = gzns-cm-201508c02n01.gzns:9092,gzns-cm-201508c02n02.gzn
s:9092
a1.sinks.k1.requiredAcks = 0
a1.sinks.k1.batchSize = 5
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4.2 Broker配置

参考Kafka官方的教程,这里新建一个名称叫做keplerlog的topic,备份数量为2,分区为4。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic keplerlog

制造一些增量信息,例如如下脚本,在终端内可以随便输入一些字符串:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic keplerlog

打开另外一个终端,订阅topic,确认可以看到producer的输入的字符串即可,即表示联通了。

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic keplerlog --from-beginning

4.3 集中接收日志配置

Flume配置

首先source采用flume官方提供的KafkaSource,配置好zookeeper的地址,会去找可用的broker list进行日志的订阅接收。channel采用内存缓存队列。sink由于我们的需求是按照服务名称和日期切分日志,而官方提供的默认file roll sink,只能按照时间戳,和时间interval来切分。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.zookeeperConnect = localhost:2181
a1.sources.r1.topic = keplerlog
a1.sources.r1.batchSize = 5
a1.sources.r1.groupId = flume-collector
a1.sources.r1.kafka.consumer.timeout.ms = 800
 
# Describe the sink
a1.sinks.k1.type = com.baidu.unbiz.flume.sink.RollingByTypeAndDayFileSink
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /home/work/data/kepler-log
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

定制版RollingByTypeAndDayFileSink

源代码见github。RollingByTypeAndDayFileSink使用有两个条件:
1)Event header中必须有timestamp,否则会忽略事件,并且会抛出{@link InputNotSpecifiedException} 
2)Event body如果是按照##$$##分隔的,那么把分隔之前的字符串当做模块名称(module name)来处理;如果没有则默认为default文件名。
 
输出到本地文件,首先要设置一个跟目录,通过sink.directory设置。其次根据条件#2中提取出来的module name作为文件名称前缀,timestamp日志作为文件名称后缀,例如文件名为portal.20150606或者default.20150703。
 
规整完的一个文件目录形式如下,可以看出汇集了众多服务的日志,并且按照服务名称、时间进行了区分:
~/data/kepler-log$ ls
authorization.20160512  
default.20160513  
default.20160505 
portal.20160512       
portal.20160505   
portal.20160514

不得不提的两个坑

坑1

回到前两节提到的自定义了一个StaticLinePrefixExecSource来进行添加一些前缀的工作。由于要区分来源的服务/模块名称,并且按照时间来切分,根据官方flume文档,完全可以采用如下的Source拦截器配置。例如i1表示时间戳,i2表示默认的静态变量KV,key=module,value=portal。
a1.sources.r1.interceptors = i2 i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = module
a1.sources.r1.interceptors.i2.value = portal
但是flume官方默认的KafkaSource(v1.6.0)的实现:
95 while (eventList.size() < batchUpperLimit &&
96               System.currentTimeMillis() < batchEndTime) {
97         iterStatus = hasNext();
98         if (iterStatus) {
99           // get next message
100          MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
101          kafkaMessage = messageAndMetadata.message();
102          kafkaKey = messageAndMetadata.key();
103
104          // Add headers to event (topic, timestamp, and key)
105          headers = new HashMap<String, String>();
106          headers.put(KafkaSourceConstants.TIMESTAMP,
107                  String.valueOf(System.currentTimeMillis()));
108          headers.put(KafkaSourceConstants.TOPIC, topic);
109          if (kafkaKey != null) {
110            headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
111          }
112          if (log.isDebugEnabled()) {
113            log.debug("Message: {}", new String(kafkaMessage));
114          }
115          event = EventBuilder.withBody(kafkaMessage, headers);
116          eventList.add(event);
117        }
可以看出自己重写了Event header中的KV,丢弃了发送过来的header,因为这个坑的存在因此,tailf -F在event body中在前面指定模块/服务名称,然后RollingByTypeAndDayFileSink会按照分隔符切分。否则下游无法能达到KV。
 

坑2

exec source需要执行tail -F命令来通过标准输出和标准错误一行一行的读取,但是如果把tail -F封装在一个脚本中,脚本中再执行一些管道命令,例如tail -F logback.log | awk ‘{print "portal##$$##"$0}’,那么exec source总是会把最近的输出丢弃掉,导致追加到文件末尾的日志有一些无法总是“姗姗来迟”,除非有新的日志追加,他们才会被“挤”出来。这个问题比较诡异。暂时没有细致研究。以示后人不要采坑。
 

5 结语

从这个分布式服务分散日志的集中收集方法,可以看出利用一些开源组件,可以非常方便的解决我们日常工作中所发现的问题,而这个发现问题和解决问题的能力才是工程师的基本素质要求。对于其不满足需求的,需要具备有钻研精神,知其然还要知其所以然的去做一些ad-hoc工作,才可以更加好的leverage这些组件。
 
另外,日志的收集只是起点,利用宝贵的数据,后面的使用场景和想象空间都会非常大,例如
1)利用Spark streaming在一个时间窗口内计算日志,做流量控制和访问限制。
2)使用awk脚本、scala语言的高级函数做单机的访问统计分析,或者Hadoop、Spark做大数据的统计分析。
3)除了端口存活和语义监控,利用实时计算处理日志,做ERROR、异常等信息的过滤,实现服务真正的健康保障和预警监控。
4)收集的日志可以通过logstash导入Elastic Search,使用ELK方式做日志查询使用。
 
分享到:
评论

相关推荐

    424页16万字广西大数据应用专题开发技术方案.docx

    同时,还提供了 Docker 容器部署前台应用,利用 Jetty 部署 hbase rest server 对外提供服务。 在大数据处理方面,该方案基于 kafka 作消息队列,支持 NSSP 数据实时写入,并通过 kafka Consumer Group 支持数据...

    广西大数据应用专题开发技术方案-标包3.docx

    本文档概述了广西大数据应用专题开发技术方案的第三个标包,主要介绍了大数据设计架构、分布式模块设计、插件化程序开发、多样化数据采集系统、Docker 容器部署、ArcGIS 二次开发等技术方案。 titre:广西大数据...

    elk with docker

    Docker是一个开源的应用容器引擎,可以让开发者打包他们的应用以及应用的依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口。 本...

    大型企业级云产品-亿级数据统计分析系统(真实大数据应用,代码企业可直接复用).rar

    具体包含: spring,spark,spark streaming,hive,flume,kafka,hadoop,hbase,mongodb,dubbo,docker,easyui,highcharts等等。 此课程是按照真实企业级开发项目流程进行讲解,通过学习此课程可以体会到真实的...

    统一监控平台方案(日志监控、方法监控、调用链路监控)

    该统一监控平台主要关注于**日志监控**、**方法监控**、**调用链路监控**三大核心领域,并结合先进的大数据技术如Flume、Kafka、Spark和Elasticsearch,以确保高效、准确的数据采集与分析能力。此外,还计划集成自研...

    大数据培训总结.docx

    7. **容器与虚拟化技术**:Docker和KVM在大数据环境中用于快速部署和隔离应用程序,提高资源利用率。 8. **大数据项目实战**:通过实际项目,学习如何应用所学知识解决实际问题,提升数据分析和处理能力。 9. **...

    大数据课程分类.doc

    Flume用于分布式日志收集,Zookeeper提供分布式协调服务,Kafka则是一个高吞吐量的分布式消息系统。这些工具帮助构建稳定可靠的大数据处理架构。 **大数据实时计算阶段:** Mahout是一个机器学习库,Spark提供了...

    大数据课程分类.pdf

    Flume用于分布式日志收集,Zookeeper提供分布式协调服务,Kafka则是消息队列系统,这些工具在大数据架构中扮演着重要角色,确保数据的流动和处理效率。 **大数据实时计算:** Mahout是机器学习库,Spark提供快速、...

    企业级大数据平台实践之路.docx

    8. **容器化与微服务**:Docker和Kubernetes等容器技术使得大数据组件的部署和管理更加灵活,而微服务架构则有助于构建可扩展、高可用的大数据应用。 9. **持续集成与持续交付(CI/CD)**:DevOps实践在大数据领域也...

    大数据课程体系 (2).docx

    Flume是日志收集系统,其配置包括source、sink和selector,用于数据的摄入、传输和处理。 Zookeeper是分布式协调服务,Java API允许开发者构建高可用的分布式应用。RMI(远程方法调用)和Netty框架用于实现高效的...

    大数据课程体系 (3).pdf

    此外,数据迁移工具如Sqoop和Flume,以及分布式日志框架Flume的配置和使用,使得数据的导入导出和日志收集更为高效。 进一步深入,课程涵盖了Zookeeper的开发,包括其API和在高可用性集群中的应用。Netty异步IO通信...

    大数据课程体系.docx

    在大数据课程体系中,主要涵盖了从基础环境搭建到高级应用的多个方面,涉及Linux操作系统、虚拟化技术、分布式计算框架、数据库系统、流处理技术、编程语言和云平台等多个知识点。 首先,课程从Linux环境开始,包括...

    大数据电商项目-BigData.zip

    这些数据通常通过日志收集系统(如Flume或Kafka)实时捕获,并存储在分布式文件系统(如HDFS)中。 2. 数据处理:数据预处理是大数据项目的关键步骤,涉及到数据清洗、转换和集成。Apache Spark 或 MapReduce 是...

    JVM基础及微服务基础知识大数据技术全栈及面试指南.pdf

    - **Docker**:容器化平台,用于自动化部署、扩展和运行应用。 - **Npm**:Node.js的包管理器。 - **Unity3D**:游戏开发引擎。 ### 面试指南 #### 1. 技术面试准备 - **基础知识**:确保对基本概念有深刻的理解,...

    基于大数据时代计算机软件技术的开发与应用分析.zip

    1. 数据采集:通过各种传感器、日志记录、社交媒体等途径获取数据,如Hadoop的Flume和Apache Kafka用于实时数据流处理。 2. 数据存储:分布式存储系统如Hadoop的HDFS和NoSQL数据库(如MongoDB、Cassandra)用于存储...

    案例正文_基于招聘网站的离线统计及实时分析系统1

    - 配合Zookeeper进行集群管理,Kafka用于数据流处理,Flume收集日志,MongoDB和Redis存储非结构化和半结构化数据。 3. **数据处理与分析**: - 数据预处理阶段,清洗、整合爬取的原始数据,去除无效或重复信息,...

    基于Spark的电商用户分析系统-开题报告.pdf

    此外,还需要研究如何利用Flume收集日志信息,通过Kafka将数据实时传输至HDFS,确保数据处理的时效性。在数据存储方面,结合Mysql、Hive、HBase等多类型数据库,满足不同场景下的数据存储需求。 系统架构设计上,...

    大数据课程分类.docx

    - **Flume分布式**:了解数据收集工具Flume,如何处理日志和流数据。 - **Zookeeper**:学习分布式协调服务,用于集群管理和配置同步。 - **Kafka**:理解消息队列Kafka,用于高吞吐量的实时数据流处理。 **大数据...

    千亿级实时数仓-讲义.rar

    数据采集阶段,利用各种数据采集工具(如Flume、Kafka)从各类数据源(如日志、API、传感器)收集数据;数据清洗则通过ETL(Extract-Transform-Load)过程去除无效或不一致的数据;数据存储往往采用分布式存储系统,...

Global site tag (gtag.js) - Google Analytics