接下来的几篇文章,我们将逐步学习使用各种方式对日志进行采集。
本文讲述的是如何使用log4j直接输出日志到flume。
先上干货,再讲理论!
1、flume配置文件
agent.sources = so1
agent.channels = c1
agent.sinks = s1
# For each one of the sources, the type is defined
agent.sources.so1.type = avro
agent.sources.so1.bind = 0.0.0.0
agent.sources.so1.port = 44444
tier1.channels.channel1.keep-alive=30
# The channel can be defined as follows.
# agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.s1.type = logger
#Specify the channel the sink should use
# agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
agent.sources.so1.channels = c1
agent.sinks.s1.channel = c1
2、测试代码
public class FlumeLogTest {
private Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) throws Exception {
DOMConfigurator.configureAndWatch("config/log4j.xml");
new FlumeLogTest().start();
}
public void start() {
while(true){
logger.debug("flume log test:{}",System.currentTimeMillis());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
3、log4j.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
<log4j:configuration>
<appender name="flume"
class="org.apache.flume.clients.log4jappender.Log4jAppender">
<param name="hostname" value="192.168.113.181" />
<param name="port" value="44444" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%p] %d{dd MMM hh:mm:ss aa} %t [%l] %m%n" />
</layout>
</appender>
<appender name="async" class="org.apache.log4j.AsyncAppender">
<param name="Blocking" value="false" />
<param name="BufferSize" value="500" />
<appender-ref ref="flume" />
</appender>
<appender name="CONSOLE.OUT" class="org.apache.log4j.ConsoleAppender">
<param name="target" value="System.out" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%d][%p, (%F:%L).%M] %m%n" />
</layout>
<filter class="org.apache.log4j.varia.LevelRangeFilter">
<param name="LevelMin" value="debug" />
<param name="LevelMax" value="info" />
<param name="AcceptOnMatch" value="false" />
</filter>
</appender>
<logger name="org.springframework">
<level value="ERROR" />
</logger>
<logger name="com.cp.flume">
<level value="debug" />
</logger>
<root>
<priority value="info"></priority>
<appender-ref ref="async" />
<appender-ref ref="CONSOLE.OUT" />
</root>
</log4j:configuration>
4、pom.xml
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.7.0-SNAPSHOT</version>
</dependency>
这里要说明的几点:
1、flume的Log4j Appender必须使用Log4j的异步加载器,否则一旦日志服务器挂掉,将会导致应用服务器宕机;需要将异步加载器中的消息队列设置为非阻塞模式Blocking=false,并设置相应的buffersize,否则flume服务器宕机时,会导致应用服务器宕机。
2、当flume服务器宕机时,Log4jAppender类的append方法会抛出FlumeException,需要对该异常进行捕获,否则同样会引起应用服务器的宕机。这部分我是通过继承Log4jAppender并重写append接口实现的,参见下述代码。
3、正常状况下Log4jAppender本身有自动重连机制(已测试)
package org.apache;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.api.RpcClientFactory.ClientType;
import org.apache.flume.clients.log4jappender.Log4jAppender;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
/**
* @project: flume-log4j-test
* @Title: FailoverLog4jAppender.java
* @Package: org.apache
@author: chenpeng
* @email: 46731706@qq.com
* @date: 2016年2月24日下午2:12:16
* @description:
* @version:
*/
public class FailoverLog4jAppender extends Log4jAppender {
private String hosts;
private String maxAttempts;
private boolean configured = false;
public void setHosts(String hostNames) {
this.hosts = hostNames;
}
public void setMaxAttempts(String maxAttempts) {
this.maxAttempts = maxAttempts;
}
@Override
public synchronized void append(LoggingEvent event) {
if (!configured) {
String errorMsg = "Flume Log4jAppender not configured correctly! Cannot"
+ " send events to Flume.";
LogLog.error(errorMsg);
if (getUnsafeMode()) {
return;
}
throw new FlumeException(errorMsg);
}
try {
super.append(event);
} catch (FlumeException e) {
e.printStackTrace();
}
}
/**
*
* @throws FlumeException
* if the FailoverRpcClient cannot be instantiated.
*/
@Override
public void activateOptions() throws FlumeException {
try {
final Properties properties = getProperties(hosts, maxAttempts,
getTimeout());
rpcClient = RpcClientFactory.getInstance(properties);
if (layout != null) {
layout.activateOptions();
}
configured = true;
} catch (Exception e) {
String errormsg = "RPC client creation failed! " + e.getMessage();
LogLog.error(errormsg);
if (getUnsafeMode()) {
return;
}
throw new FlumeException(e);
}
}
/**
*/
private Properties getProperties(String hosts, String maxAttempts,
long timeout) throws FlumeException {
if (StringUtils.isEmpty(hosts)) {
throw new FlumeException("hosts must not be null");
}
Properties props = new Properties();
String[] hostsAndPorts = hosts.split("\\s+");
StringBuilder names = new StringBuilder();
for (int i = 0; i < hostsAndPorts.length; i++) {
String hostAndPort = hostsAndPorts[i];
String name = "h" + i;
props.setProperty(
RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + name,
hostAndPort);
names.append(name).append(" ");
}
props.put(RpcClientConfigurationConstants.CONFIG_HOSTS,
names.toString());
props.put(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
ClientType.DEFAULT_FAILOVER.toString());
if (StringUtils.isEmpty(maxAttempts)) {
throw new FlumeException("hosts must not be null");
}
props.put(RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS,
maxAttempts);
props.setProperty(
RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
String.valueOf(timeout));
props.setProperty(
RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
String.valueOf(timeout));
return props;
}
}
分享到:
相关推荐
三、使用 Log4j 将日志输出到 HDFS 首先,需要启动 Hadoop 服务。然后,需要新建一个 Java 项目,引入 Log4j.jar 包依赖和 Flume 相关依赖。在 pom.xml 文件中添加依赖项: ``` <groupId>log4j <artifactId>log...
本篇将详细介绍如何利用Log4j将应用日志输出到Flume,以便进行集中管理和分析。 首先,Log4j是一款Java日志框架,它提供了灵活的日志记录功能,允许开发者根据需求定制日志级别、格式和输出目的地。Log4j的配置文件...
本文将详细探讨如何使用Flume、Kafka和log4j构建一个高效的日志采集系统,帮助你理解这三个组件在日志处理中的角色以及如何协同工作。 首先,让我们了解一下这三个工具的基本概念: 1. **Flume**: Apache Flume 是...
LogDemox是一个日志收集工具,它利用了Apache的开源组件Log4j来实现日志的生成,并将这些日志信息直接发送到Flume中进行处理和存储。这个过程涉及到了日志管理和大数据流处理两个关键领域。 首先,Log4j是Java应用...
接下来需要修改项目的Log4j配置文件`log4j.properties`,以便将日志数据发送到Flume。配置示例如下: ```properties log4j.rootLogger=INFO,flume log4j.appender.flume=org.apache.flume.clients.log4jappender....
Flume 是 Apache Hadoop 生态系统中的一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。...对于 log4j 数据,Flume 可以实时监控日志文件并将其无缝地导入 HDFS,为后续的大数据分析提供基础。
Flume 的 SpoolDirectorySource 会持续监视 Log4j 输出的日志文件,一旦有新的文件产生,就会读取并将其发送到配置的 HDFS 目录。这样,你可以轻松地管理和分析大量的日志数据,尤其是在分布式环境中,Flume 提供了...
apache-log4j-1.2.15.jar, apache-log4j-extras-1.0.jar, apache-log4j-extras-1.1.jar, apache-log4j.jar, log4j-1.2-api-2.0.2-javadoc.jar, log4j-1.2-api-2.0.2-sources.jar, log4j-1.2-api-2.0.2.jar, log4j-...
Log4j直接发送数据到Flume + Kafka (方式一) 通过flume收集系统日记, 收集的方式通常采用以下. 系统logs直接发送给flume系统, 本文主要记录种方式进行说明. 文章链接,请看:...
让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 中文文档 认识 flume 1 flume 是什么 这里简单介绍一下 它是 Cloudera 的一个产品 2 flume 是干什么的 收集日志的 3 flume 如何搜集日志 我们把...
该压缩包下commons-lang3-3.3.2.jar,spark-streaming-flume_2.10-1.6.0.jar,scala-compiler-2.10.5.jar用于实现Flume监控文件夹中的内容变化,然后Spark Streaming对数据进行分析。
该 Appender 需要配置 jar 包, Client 端 Log4j 配置文件中需要配置日志输出的目标主机和端口号。 在配置文件中,可以看到 Log4jAppender 的配置信息。例如,日志需要发送到的端口号为 44444,该端口号需要有 ARVO...
Log4j是一款广泛使用的Java日志框架,它提供了灵活的日志记录功能,允许开发者控制日志输出的级别、格式和目的地。本文将详细讨论“决对可用,log4j日志集中处理扩展(含server与client)”这一资源所涉及的知识点,...
7. **log4j-flume-ng**:Flume是Cloudera的数据收集工具,这个组件使Log4j能够与Flume协同工作,实现大规模日志数据的收集和传输。新版本可能提升了数据处理的吞吐量和可靠性。 8. **log4j-cassandra**:对于...
Apache Flume是一个分布式、可靠且可用的系统,主要用于有效地收集、聚合大量日志数据,并将其从不同的源移动到集中式数据存储中。Flume不仅仅适用于日志数据的聚合,由于其数据源的可定制性,它还可以用于传输大量...
Log4j是Java世界里广泛使用的日志框架,它的配置文件(如`log4j.properties`或`log4j.xml`)是关键,其中定义了日志输出的级别、目的地以及格式。在提供的配置片段中,可以看到`rootLogger`设置为`info`级别,表示...
Log4j是一款广泛使用的开源日志框架,它提供了灵活的日志配置和多种输出格式,使得开发者可以根据需要定制日志信息。本文将深入探讨如何对Log4j进行修改,以适应Flume-ng的日志格式,并对异常信息进行重构。 首先,...
"Spring Boot使用Log4j2的实例代码" 本文主要介绍了Spring Boot使用Log4j2的实例代码,包括log4j2.xml配置和Maven依赖项配置。下面我们将详细介绍这些知识点。 Log4j2简介 Log4j2是Java中的一种日志记录工具,...
6. **安装与配置**:将对应的JAR文件添加到项目的类路径中,然后通过配置文件(如log4j2.xml或log4j2.json)设置日志输出的级别、格式、目标等参数。 了解以上知识点后,你可以根据具体的应用场景来有效地使用和...
日志采集部分使用Flume来实时采集日志数据,日志处理部分使用Elasticsearch来存储和处理日志数据,日志分析部分使用Kibana来提供可视化的展示结果。 系统实现 该系统的实现主要包括三个步骤:日志采集、日志处理和...