`

征服flume之三——使用log4j输出日志到flume

阅读更多
接下来的几篇文章,我们将逐步学习使用各种方式对日志进行采集。

本文讲述的是如何使用log4j直接输出日志到flume。
先上干货,再讲理论!

1、flume配置文件
agent.sources = so1
agent.channels = c1
agent.sinks = s1

# For each one of the sources, the type is defined
agent.sources.so1.type = avro
agent.sources.so1.bind = 0.0.0.0
agent.sources.so1.port = 44444
tier1.channels.channel1.keep-alive=30

# The channel can be defined as follows.
# agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined

agent.sinks.s1.type = logger

#Specify the channel the sink should use
# agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.so1.channels = c1
agent.sinks.s1.channel = c1


2、测试代码
public class FlumeLogTest {
	private Logger logger = LoggerFactory.getLogger(getClass());
	public static void main(String[] args) throws Exception {
		DOMConfigurator.configureAndWatch("config/log4j.xml");
		new FlumeLogTest().start();
	}

	public void start() {
		while(true){
			logger.debug("flume log test:{}",System.currentTimeMillis());
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}
}


3、log4j.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
<log4j:configuration>
	<appender name="flume"
		class="org.apache.flume.clients.log4jappender.Log4jAppender">
		<param name="hostname" value="192.168.113.181" />
		<param name="port" value="44444" />
		<layout class="org.apache.log4j.PatternLayout">
			<param name="ConversionPattern" value="[%p] %d{dd MMM hh:mm:ss aa} %t [%l] %m%n" />
		</layout>
	</appender>
	<appender name="async" class="org.apache.log4j.AsyncAppender">
		<param name="Blocking" value="false" />
		<param name="BufferSize" value="500" />
		<appender-ref ref="flume" />
	</appender>

	<appender name="CONSOLE.OUT" class="org.apache.log4j.ConsoleAppender">
		<param name="target" value="System.out" />
		<layout class="org.apache.log4j.PatternLayout">
			<param name="ConversionPattern" value="[%d][%p, (%F:%L).%M] %m%n" />
		</layout>
		<filter class="org.apache.log4j.varia.LevelRangeFilter">
			<param name="LevelMin" value="debug" />
			<param name="LevelMax" value="info" />
			<param name="AcceptOnMatch" value="false" />
		</filter>
	</appender>

	<logger name="org.springframework">
		<level value="ERROR" />
	</logger>
	<logger name="com.cp.flume">
		<level value="debug" />
	</logger>
	<root>
		<priority value="info"></priority>
		<appender-ref ref="async" />
		<appender-ref ref="CONSOLE.OUT" />
	</root>
</log4j:configuration>

4、pom.xml
		<dependency>
			<groupId>org.apache.flume.flume-ng-clients</groupId>
			<artifactId>flume-ng-log4jappender</artifactId>
			<version>1.7.0-SNAPSHOT</version>
		</dependency>


这里要说明的几点:
1、flume的Log4j Appender必须使用Log4j的异步加载器,否则一旦日志服务器挂掉,将会导致应用服务器宕机;需要将异步加载器中的消息队列设置为非阻塞模式Blocking=false,并设置相应的buffersize,否则flume服务器宕机时,会导致应用服务器宕机。
2、当flume服务器宕机时,Log4jAppender类的append方法会抛出FlumeException,需要对该异常进行捕获,否则同样会引起应用服务器的宕机。这部分我是通过继承Log4jAppender并重写append接口实现的,参见下述代码。
3、正常状况下Log4jAppender本身有自动重连机制(已测试)

package org.apache;


import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.api.RpcClientFactory.ClientType;
import org.apache.flume.clients.log4jappender.Log4jAppender;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

/** 
* @project:		flume-log4j-test
* @Title:		FailoverLog4jAppender.java
* @Package:		org.apache
  @author: 		chenpeng
* @email: 		46731706@qq.com
* @date:		2016年2月24日下午2:12:16
* @description:
* @version:
*/ 
public class FailoverLog4jAppender extends Log4jAppender {

	private String hosts;
	private String maxAttempts;
	private boolean configured = false;

	public void setHosts(String hostNames) {
		this.hosts = hostNames;
	}

	public void setMaxAttempts(String maxAttempts) {
		this.maxAttempts = maxAttempts;
	}

	@Override
	public synchronized void append(LoggingEvent event) {
		if (!configured) {
			String errorMsg = "Flume Log4jAppender not configured correctly! Cannot"
					+ " send events to Flume.";
			LogLog.error(errorMsg);
			if (getUnsafeMode()) {
				return;
			}
			throw new FlumeException(errorMsg);
		}
		try {
			super.append(event);
		} catch (FlumeException e) {
			e.printStackTrace();
		}
	}

	/**
	 *
	 * @throws FlumeException
	 *             if the FailoverRpcClient cannot be instantiated.
	 */
	@Override
	public void activateOptions() throws FlumeException {
		try {
			final Properties properties = getProperties(hosts, maxAttempts,
					getTimeout());
			rpcClient = RpcClientFactory.getInstance(properties);
			if (layout != null) {
				layout.activateOptions();
			}
			configured = true;
		} catch (Exception e) {
			String errormsg = "RPC client creation failed! " + e.getMessage();
			LogLog.error(errormsg);
			if (getUnsafeMode()) {
				return;
			}
			throw new FlumeException(e);
		}

	}

	/**
	*/
	private Properties getProperties(String hosts, String maxAttempts,
			long timeout) throws FlumeException {

		if (StringUtils.isEmpty(hosts)) {
			throw new FlumeException("hosts must not be null");
		}

		Properties props = new Properties();
		String[] hostsAndPorts = hosts.split("\\s+");
		StringBuilder names = new StringBuilder();
		for (int i = 0; i < hostsAndPorts.length; i++) {
			String hostAndPort = hostsAndPorts[i];
			String name = "h" + i;
			props.setProperty(
					RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + name,
					hostAndPort);
			names.append(name).append(" ");
		}
		props.put(RpcClientConfigurationConstants.CONFIG_HOSTS,
				names.toString());
		props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
				ClientType.DEFAULT_FAILOVER.toString());

		if (StringUtils.isEmpty(maxAttempts)) {
			throw new FlumeException("hosts must not be null");
		}

		props.put(RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS,
				maxAttempts);

		props.setProperty(
				RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
				String.valueOf(timeout));
		props.setProperty(
				RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
				String.valueOf(timeout));
		return props;
	}

}
分享到:
评论
1 楼 di1984HIT 2017-07-21  
不错,学习了~~

相关推荐

    Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS.docx

    三、使用 Log4j 将日志输出到 HDFS 首先,需要启动 Hadoop 服务。然后,需要新建一个 Java 项目,引入 Log4j.jar 包依赖和 Flume 相关依赖。在 pom.xml 文件中添加依赖项: ``` &lt;groupId&gt;log4j &lt;artifactId&gt;log...

    log4j输出日志到flume

    本篇将详细介绍如何利用Log4j将应用日志输出到Flume,以便进行集中管理和分析。 首先,Log4j是一款Java日志框架,它提供了灵活的日志记录功能,允许开发者根据需求定制日志级别、格式和输出目的地。Log4j的配置文件...

    Flume + kafka + log4j构建日志采集系统

    本文将详细探讨如何使用Flume、Kafka和log4j构建一个高效的日志采集系统,帮助你理解这三个组件在日志处理中的角色以及如何协同工作。 首先,让我们了解一下这三个工具的基本概念: 1. **Flume**: Apache Flume 是...

    LogDemox 收集信息通过log4j直接打到flume中

    LogDemox是一个日志收集工具,它利用了Apache的开源组件Log4j来实现日志的生成,并将这些日志信息直接发送到Flume中进行处理和存储。这个过程涉及到了日志管理和大数据流处理两个关键领域。 首先,Log4j是Java应用...

    log4j+flume+kafka+storm

    接下来需要修改项目的Log4j配置文件`log4j.properties`,以便将日志数据发送到Flume。配置示例如下: ```properties log4j.rootLogger=INFO,flume log4j.appender.flume=org.apache.flume.clients.log4jappender....

    flume 简介安装使用案例(将log4j数据写到hdfs中)

    Flume 是 Apache Hadoop 生态系统中的一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。...对于 log4j 数据,Flume 可以实时监控日志文件并将其无缝地导入 HDFS,为后续的大数据分析提供基础。

    flume log4f示例源码

    Flume 的 SpoolDirectorySource 会持续监视 Log4j 输出的日志文件,一旦有新的文件产生,就会读取并将其发送到配置的 HDFS 目录。这样,你可以轻松地管理和分析大量的日志数据,尤其是在分布式环境中,Flume 提供了...

    log4j.jar各个版本

    apache-log4j-1.2.15.jar, apache-log4j-extras-1.0.jar, apache-log4j-extras-1.1.jar, apache-log4j.jar, log4j-1.2-api-2.0.2-javadoc.jar, log4j-1.2-api-2.0.2-sources.jar, log4j-1.2-api-2.0.2.jar, log4j-...

    springboot_log4j2_flume

    Log4j直接发送数据到Flume + Kafka (方式一) 通过flume收集系统日记, 收集的方式通常采用以下. 系统logs直接发送给flume系统, 本文主要记录种方式进行说明. 文章链接,请看:...

    让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 文档

    让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 中文文档 认识 flume 1 flume 是什么 这里简单介绍一下 它是 Cloudera 的一个产品 2 flume 是干什么的 收集日志的 3 flume 如何搜集日志 我们把...

    Flume_1.6.0——Spark Streaming_2.1XXX

    该压缩包下commons-lang3-3.3.2.jar,spark-streaming-flume_2.10-1.6.0.jar,scala-compiler-2.10.5.jar用于实现Flume监控文件夹中的内容变化,然后Spark Streaming对数据进行分析。

    flume采集日志信息简报

    该 Appender 需要配置 jar 包, Client 端 Log4j 配置文件中需要配置日志输出的目标主机和端口号。 在配置文件中,可以看到 Log4jAppender 的配置信息。例如,日志需要发送到的端口号为 44444,该端口号需要有 ARVO...

    决对可用,log4j日志集中处理扩展(含server与client)

    Log4j是一款广泛使用的Java日志框架,它提供了灵活的日志记录功能,允许开发者控制日志输出的级别、格式和目的地。本文将详细讨论“决对可用,log4j日志集中处理扩展(含server与client)”这一资源所涉及的知识点,...

    logging-log4j2-log4j-2.15.0-rc2.7z

    7. **log4j-flume-ng**:Flume是Cloudera的数据收集工具,这个组件使Log4j能够与Flume协同工作,实现大规模日志数据的收集和传输。新版本可能提升了数据处理的吞吐量和可靠性。 8. **log4j-cassandra**:对于...

    hadoop集群配置之————flume安装配置(详细版)

    Apache Flume是一个分布式、可靠且可用的系统,主要用于有效地收集、聚合大量日志数据,并将其从不同的源移动到集中式数据存储中。Flume不仅仅适用于日志数据的聚合,由于其数据源的可定制性,它还可以用于传输大量...

    Flume_离线处理_日志收集1

    Log4j是Java世界里广泛使用的日志框架,它的配置文件(如`log4j.properties`或`log4j.xml`)是关键,其中定义了日志输出的级别、目的地以及格式。在提供的配置片段中,可以看到`rootLogger`设置为`info`级别,表示...

    log4j:对log4j修改,重新修改日志写出格式

    Log4j是一款广泛使用的开源日志框架,它提供了灵活的日志配置和多种输出格式,使得开发者可以根据需要定制日志信息。本文将深入探讨如何对Log4j进行修改,以适应Flume-ng的日志格式,并对异常信息进行重构。 首先,...

    Spring Boot使用Log4j2的实例代码

    "Spring Boot使用Log4j2的实例代码" 本文主要介绍了Spring Boot使用Log4j2的实例代码,包括log4j2.xml配置和Maven依赖项配置。下面我们将详细介绍这些知识点。 Log4j2简介 Log4j2是Java中的一种日志记录工具,...

    日志-Apache-log4j2-Java-下载慢

    6. **安装与配置**:将对应的JAR文件添加到项目的类路径中,然后通过配置文件(如log4j2.xml或log4j2.json)设置日志输出的级别、格式、目标等参数。 了解以上知识点后,你可以根据具体的应用场景来有效地使用和...

    基于Flume的分布式日志采集分析系统设计与实现.pdf

    日志采集部分使用Flume来实时采集日志数据,日志处理部分使用Elasticsearch来存储和处理日志数据,日志分析部分使用Kibana来提供可视化的展示结果。 系统实现 该系统的实现主要包括三个步骤:日志采集、日志处理和...

Global site tag (gtag.js) - Google Analytics