`
bit1129
  • 浏览: 1067931 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark七十五】Spark Streaming整合Flume-NG三之接入log4j

 
阅读更多

先来一段废话:

实际工作中,业务系统的日志基本上是使用Log4j写入到日志文件中的,问题的关键之处在于业务日志的格式混乱,这给对日志文件中的日志进行统计分析带来了极大的困难,或者说,基本上无法进行分析,每个人写日志的习惯不同,导致日志行的格式五花八门,最后只能通过grep来查找特定的关键词缩小范围,但是在集群环境下,每个机器去grep一遍,分析一遍,这个效率如何可想之二,大好光阴都浪费在这上面了。

更合理的做法时,对重要日志进行统计分析,写入关系型数据库或者NoSQL数据库,一方面将重要的日志整合到一起,同时使用这些数据库的查询能力快速的找到相关的日志。

这就涉及到一个日志格式的问题,对于需要进行统计分析的日志,应该使用专门的logger以及appender,这里就是使用FlumeAppender,将日志发送到Flume的输入源,然后经过Channel和Sink进入处理和分析的环节中。另一方面,针对这种的日志,需要根据业务的分析目标,严格定义其结构。

本文分析使用log4j将业务产生的日志通过FlumeAppender写到Flume的日志输入源(source),最后流出到Spark Streaming,交由Spark Streaming

 

 

 

1. log4j配置

 

###日志名称和级别
log4j.rootLogger=INFO,Flume
####未log4j定义的Flume专用Appender类
log4j.appender.Flume=org.apache.flume.clients.log4jappender.Log4jAppender
###将数据发往localhost的19999端口,此端口由Flume的一个Agent监听,该Agent接收Flume发送过来的数据
log4j.appender.Flume.Hostname= localhost
log4j.appender.Flume.Port=19999
log4j.appender.Flume.UnsafeMode=false
###输出格式
log4j.appender.Flume.layout=org.apache.log4j.PatternLayout
log4j.appender.Flume.layout.ConversionPattern=%d{ABSOLUTE} %-5p [%c] %m%
 

2. 应用程序依赖的jar

这里的应用程序不是指Spark提交的程序,而是指的是使用log4j输出日志的业务系统,因为业务系统使用了Flume的专用Appender,因此需要把这些依赖的jar加到classpath上

avro-1.7.3.jar                 jackson-mapper-asl-1.9.3.jar      slf4j-api-1.6.1.jar
avro-ipc-1.7.3.jar             flume-ng-core-1.5.2.jar           slf4j-log4j12-1.6.1.jar
commons-collections-3.2.1.jar  flume-ng-log4jappender-1.5.2.jar  log4j-1.2.17.jar
commons-lang-2.5.jar           flume-ng-sdk-1.5.2.jar            
commons-logging-1.1.1.jar      jackson-core-asl-1.9.3.jar        netty-3.5.12.Final.jar

3. Flume配置

Flume的配置与Spark Streaming与之前的处理一样,Flume Agent的source监听于19999端口,Spark Streaming的Worker Thread监听于9999端口,Flume Agent的sink往9999端口写入数据(或者直接写到KafkaSink,Spark Streaming从Kafka读取数据),

 

通过上面的配置可以看出来,Flume的各个组件时独立的,可以任意的搭配,使用Flume的Log4j Appender仅仅改变了Flume获取数据源的方式,获取到数据后,之前的操作都是一样的

 

 

问题:

Flume的source使用avro的方式从19999获取数据,而数据是通过Log4j Appender写入到19999端口的,之前是使用avro client的方式将数据写入到19999端口的,Log4jAppender输入的数据格式和avro client输入的数据一样吗?即两种方式写入到19999端口,能否被Flume source所识别。从上面可以看到Flume的Log4jAppender依赖于avro和avro ipc库,因此有理由相信,Flume的Log4jAppender也是采用类似avro-client的方式,以avro方式将数据进行包装后写到19999中的。实验验证也确实如此

 

 

a1.sources = r1
a1.sinks = k1
a1.channels = c1


a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 19999


a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 9999


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

 4. 验证

写一个java程序, 定时的写日志,然后Flume的Log4j Appender将数据发送到19999端口,作为Flume的输入源,Flume通过sink将数据写到9999端口,这正是Spark Streaming监听的端口,Spark Streaming读取到数据后,即可进行分析

 

 4.1 Java代码

 

package com.tom.flume.log4j.Example;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class FlumeLog4j {
	private static Log LOG = LogFactory.getLog(FlumeLog4j.class);

	public static void main(String[] args) {
		int loop = 60;
		int interval = 1000;
		if (args != null && args.length > 0) {
			interval = Integer.parseInt(args[0]);
		}
		if (args != null && args.length > 1) {
			loop = Integer.parseInt(args[1]);
		}
		try {
			int i = 0;
			while (i++ < loop) {
				System.out.println(i);
				LOG.info("This is the log " + i); //Spark Streaming收到这个日志
				Thread.sleep(interval); //暂停interval毫秒
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 4.2 log4j.properties

 

log4j.rootLogger=INFO,Flume
log4j.appender.Flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.Flume.Hostname= localhost
log4j.appender.Flume.Port=19999
log4j.appender.Flume.UnsafeMode=false
log4j.appender.Flume.layout=org.apache.log4j.PatternLayout
log4j.appender.Flume.layout.ConversionPattern=%d{ABSOLUTE} %-5p [%c] %m%

4.3 程序启动脚本launch.sh

 

java -classpath ".:./*" com.tom.flume.log4j.Example.FlumeLog4j

 将log4j.properties以及前面提到的14个jar以及FlumeLog4j这个类所打成的jar包放到launch.sh的同一个目录下

 

5.运行

5.1 启动Spark Streaming,监听于9999

5.2 启动Flume Agent a1,监听于19999等待数据输入作为数据源

5.3 通过launch.sh启动java程序,想19999端口写入数据

5.4 Flume接收到来自19999端口的写入数据后,通过sink写向9999,Spark Streaming接收到数据,注意Spark Streaming接收的数据格式为

 

02:01:49,255 INFO  [com.tom.flume.log4j.Example.FlumeLog4j] This is the log 34

可见Log4j根据appender的PatternLayout加了一些前缀,需要根据需要决定是否需要这个,需要的话就需要额外的解析工作。

 

 

 

 

 
 
 
分享到:
评论

相关推荐

    集群flume详细安装步骤

    然后,下载相关的 jar 包,例如 `spark-streaming-flume-sink_2.11-2.1.0.jar`、`scala-library-2.11.8.jar` 和 `commons-lang3-3.5.jar`,并将其放到 Flume 的安装目录下。 使用以下命令启动 Flume-Agent: ``` ...

    Spark-Streaming:Spark Streaming实时解析flume和kafka传来的josn数据写入mysql

    Spark-Streaming简单小项目 Spark Streaming实时解析...配置log4j.properties、my.properties 另,还需将您的spark和hadoop安装文件下的core-site.xml、hdfs-site.xml和hive-site.xml拷贝到src\main\resources目录下

    flume+kafka+sparkStream+redis实时日志采集.docx

    在构建实时日志采集系统时,常常会使用到Apache Flume、Apache Kafka、Apache Spark Streaming以及Redis等组件。以下是对这些技术的详细说明: **Apache Flume** 是一个分布式、可靠且可用于有效收集、聚合和移动...

    Spark Streaming解析

    如果程序运行时日志过多,可以修改 `sparkconf` 目录下的 `log4j.properties` 文件,将日志级别设置为 `WARN`。 #### 第3章 架构与抽象 ##### 架构与抽象 Spark Streaming 使用“微批次”架构来处理流式数据。它...

    基于Spark的高校数据分析系统

    介绍 基于Spark的高校数据分析系统 。同时实现了Spark-core(被注释了);Spark-ML,Spark-streaming。...运行环境:centos 6.x、java、kafka、zookeeper、Flume、Hbase、HDFS、YARN、Spark、MySQl。

    基于spark+flume+kafka+hbase的实时日志处理分析系统.zip

    7. **项目实现**:“Real-time-log-analysis-system-master” 文件夹中包含了完整的项目源码,包括Flume配置文件、Spark Streaming程序和HBase的相关设置。通过阅读和理解这些代码,开发者可以学习到如何将这些组件...

    aboutyun_log_anaysis

    本文将围绕“aboutyun_log_analysis”项目,深入探讨如何使用Spark Streaming进行日志分析,并结合Kafka、Flume以及MongoDB进行数据的收集、传输和存储。 首先,"aboutyun_log_analysis"项目的核心是利用Apache ...

    SparkStreaming学习札记4-2020-2-15–SparkStreaming实时流处理项目实战

    12-8 -通过定时调度工具每一分钟产生...2.对接python日志产生器输出的日志到Flume 定义名字为streaming_project.conf 选型:access.log ==&gt;控制台输出  exec  memory  logger streaming_project.conf文件具体配置:

    Log-Analysis:使用Flume + Spark + HDFS + HIVE + PostgreSQL构建日志分析系统

    本项目“Log-Analysis: 使用Flume + Spark + HDFS + HIVE + PostgreSQL构建日志分析系统”旨在搭建一个高效、可扩展的日志处理框架,以实现游戏数据的深度分析。下面将详细介绍这个系统中的关键技术及其应用。 首先...

    spark apache日志分析、流数据处理教程

    1. Spark Apache日志分析教程:这一部分将讲解如何使用Apache Spark对日志文件进行分析,包括使用Spark的基础功能进行日志分析,Spark SQL进行结构化日志查询,以及Spark Streaming处理流数据。 2. 流数据处理:流...

    01大数据项目之Spark实时(数据采集)

    大数据项目之Spark实时(数据采集)知识点 本节内容将围绕大数据项目之Spark实时(数据采集)进行详细的知识点总结。 1. 离线计算 离线计算是指通过批处理的方式计算已知的所有输入数据,输入数据不会产生变化,...

    大数据个人练习项目:一个大数据实时流处理分析系统

    这是一个大数据实时流处理分析系统 Demo,实现对用户日志的实时分析,采用 Flume + kafka + SparkStreaming + Hbase + SSM + Echarts 的架构。 主要内容包括: 编写 python 脚本,模拟源源不断产生网站的用户行为...

    Hadoop经典技术书籍合集(Spark, Kafka, HBase, etc.)

    4. **《Apache Flume Distributed Log Collection for Hadoop, 2nd Edition》**:Flume是Hadoop生态中的日志收集工具,用于高效、可靠地聚合和传输大规模数据。书中将详细阐述Flume的架构、配置、数据源、通道和接收...

    大数据日志分析实战

    在使用IntellijIdea搭建SparkStreaming开发环境中,项目涵盖了从配置开发环境到本地模式和集群模式的使用,如SparkLocal模式之Log文本清洗和根据ip计算地区访问论坛的比率等。 在整个项目中,可以看到如何通过各种...

    kafka全套视频教程

    作为一款高吞吐量的分布式发布订阅消息系统,Kafka能够高效地处理大规模的数据流,并且支持多种数据来源的接入。 - **应用场景**: - **网站活动流数据处理**:例如用户浏览行为、搜索记录等,这些数据对于构建现代...

    大数据面试100题.pdf

    Sparkstreaming以及基本工作原理: Spark Streaming支持实时数据流处理,它将实时数据流切分成一系列小批次,并在Spark上以批处理的方式执行。 spark有哪些组件: Spark组件主要包括Spark Core、Spark SQL、Spark ...

    BD大数据十三期.docx

    然而,在B斗大数据的十三期培训课程中,参与者能够在短短一个月内对大数据的全貌有初步的认识,这得益于其精心设计的课程结构和实用的教学方法。 【大数据核心组件】 1. **Linux**: 作为大数据平台的基础操作系统...

    RealTimeLogAnalyze:一个大数据实时流处理日志分析系统 Demo

    这是一个大数据实时流处理分析系统 Demo,实现对用户日志的实时分析,采用 Flume + kafka + SparkStreaming + Hbase + SSM + Echarts 的架构。 主要内容包括: 编写 python 脚本,模拟源源不断产生网站的用户行为...

    Scala的大数据分析代码

    在Scala中,可以使用Apache Log4j或其他日志框架收集日志,然后使用Spark的DataFrame API进行数据转换和分析,例如,找出登录失败的模式、用户活动的高峰时段等。 "付款情况分析"可能涉及到交易数据的处理,包括...

    Github Account

    推荐系统的整体架构包括前端系统(如Android、iOS、HTML5等)、后端系统(Nginx、Java、Spring等)、日志系统(scribe、flume、log stash、kafka等)、ETL系统(离线如Spark、Map/Reduce,在线如Spark Streaming、...

Global site tag (gtag.js) - Google Analytics