`

分布式日志

阅读更多

 

最近完成一个简单的日志管理系统,拿出来跟大家分享一下!


主要实现的功能:

1、支持动态修改配置

2、实现统一的配置管理

3、支持文件输出、habse输出、mongodb输出


基于以上三点功能,我们下面详细说明


1、支持动态修改配置

说道支持这个功能,有个同事认为没有这个必要,他的观点是log4j的配置不需要经常变动,不需要支持这样的功能;本人的观点是“配置可以进行统一管理、而且正式机跟测试机的log4j的配置肯定会有一些差异的”,因此这个功能是必须的。


下面说一下实现

通过log4j提供的PropertyConfigurator类可以非常轻松的实现这个功能

代码如下

 

 

public class Log4jConfigListener {

 private  String log4jPath;

 /**
 * @param log4jPath the log4jPath to set
 */
 public void setLog4jPath(String log4jPath) {
 this.log4jPath = log4jPath;
 }


 /**
 * 装载log4j配置文件
* 
 * @author mrh
 * @DATE 2011-5-28
 */
 public void load() {
 // String path="config/log4j.properties";
 System.out.println("log4j configfile path=" + log4j.toString());
 PropertyConfigurator.configureAndWatch(log4j.toString(), 1000);// 间隔特定时间,检测文件是否修改,自动重新读取配置
}
}

 

 Spring中的配置

 

<bean class="com.jl.net.log4j.config.Log4jConfigListener" init-method="load">
  <property name="log4jPath" value="WEB-INF/classes/log4j/" /> <!--通过zookeeper实现统一配置的dataId-->
 </bean>

 

2、统一配置管理

统一配置管理,是通过zookeeper实现的,配置形式与log4j.properties配置一致;

配置如下

 

log4j.rootLogger=INFO,console,HbaseAppender

#console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}\:%L - %m%n
#hbase
log4j.appender.HbaseAppender=com.jl.net.log4j.hbase.HbaseAppender
log4j.appender.HbaseAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.HbaseAppender.sysName=tuid
log4j.appender.HbaseAppender.serverIp=Y
log4j.appender.HbaseAppender.zookeeper_ip=10.1.18.100,10.1.18.103,10.1.18.102
log4j.appender.HbaseAppender.zookeeper_port=2181
log4j.appender.HbaseAppender.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss sss} %c.%t:%L %-5p %x  %m%n


Spring中的配置


<bean class="com.jl.net.log4j.config.Log4jConfigListener" init-method="load">
  <property name="log4jPath" value="cfg/jlcloud.uid.log4j" /> <!--通过zookeeper实现统一配置的dataId-->
 </bean>

3、支持文件、habse、mongodb的输出

log4j本身就支持文件输出,mongodb的输出需要借助log4mongo-Java这个项目,maven的坐标是

 

<dependency>
   <groupId>org.log4mongo</groupId>
   <artifactId>log4mongo-java</artifactId>
   <version>0.7.4</version>
   <optional>true</optional>
  </dependency>

 

 

下面介绍一下如何实现hbase的输出,通过继承log4j的AppenderSkeleton类,同时实现Runnable接口,来实现:

代码如下

 

package com.yck.worm.hbase;

import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;

import com.yck.worm.logging.WLogger;



/**
 * 将日志 输出到HBase里
 * @author mrh
 *
 */
public class HbaseAppender extends AppenderSkeleton implements Runnable {
	
	private static final WLogger LOGGER = WLogger.getLogger(HbaseAppender.class);
	
	private int batchSize = -1;
	private int period = 1000;
	private String hbLogName = "jl_logs";
	private String hbLogFamily = "logs";
	private Queue<LoggingEvent> loggingEvents;
	private ScheduledExecutorService executor;
	private ScheduledFuture<?> task;
	private Configuration conf;
	private HTableInterface htable;
	private HBaseAdmin admin;

	private HConnection connection;
	
	private String zookeeper;
	
	private String sysName;
	
	/**
	 * if serverIp = Y then out put server's IP to hbase 
	 */
	private String serverIp;

	/**
	 * log4j初始设置,启动日志处理计划任务
	 */
	@Override
	public void activateOptions() {
		try {
			super.activateOptions();
			// 创建一个计划任务,并自定义线程名
			executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HbaseAppender"));
			// 日志队列
			loggingEvents = new ConcurrentLinkedQueue<LoggingEvent>();
			// 启动计划任务,如果run函数有异常任务将中断!
			task = executor.scheduleWithFixedDelay(this, period, period,TimeUnit.MILLISECONDS);
			if ("Y".equalsIgnoreCase(this.serverIp)) {
				InetAddress addr = InetAddress.getLocalHost();
				this.serverIp = addr.getHostAddress().toString();//获得本机IP 
			}
			LOGGER.info("ActivateOptions ok!");
		} catch (Exception e) {
			LOGGER.error("Error during activateOptions: " , e);
		}
	}

	/**
	 * 初始HBASE
	 *
	 * @return
	 */
	private boolean initHbase() {
        try {
            if (conf == null) {
            	LOGGER.info("initHbase ... zookeeper address is " + this.zookeeper);
            	if (this.zookeeper == null || "".equals(this.zookeeper)) {
            		throw new IllegalArgumentException("the zookeeper address for jlog4j's hadoop initial can not be null!");
            	}
            	String ip = this.zookeeper.split(":")[0];
            	String port = this.zookeeper.split(":")[1];
                conf = HBaseConfiguration.create();
                conf.set(HConstants.ZOOKEEPER_QUORUM, ip);
                conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.parseInt(port));
                
                this.admin  = new HBaseAdmin(conf);
                if (!this.admin.tableExists(Bytes.toBytes(hbLogName))) {
                	HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(hbLogName));
                	HColumnDescriptor columnDescriptor = new HColumnDescriptor(hbLogFamily.getBytes());
                	tableDescriptor.addFamily(columnDescriptor);
                	this.admin.createTable(tableDescriptor);
                }
                this.connection = HConnectionManager.createConnection(conf);
                this.htable = this.connection.getTable(hbLogName.getBytes());
            }
            LOGGER.error("Init Hbase success !");
            return true;
        } catch (Exception e) {
            task.cancel(false);
            executor.shutdown();
            LOGGER.error("Init Hbase fail !", e);
            return false;
        }
    }

	@Override
	public void run() {
		if (conf == null || htable == null) {
			initHbase();
		}
		try {
			if (this.batchSize < 0 && loggingEvents.size() > 0 ) {
				this.output();
			} else 
			 // 日志数据超出批量处理大小
			if (loggingEvents.size() > 0 && loggingEvents.size() > this.batchSize) {
				this.output();
			}
		} catch (Exception e) {
			LOGGER.error("HbaseAppender Error run ", e);
		}
	}
	
	/**
	 * 输出日志
	 * @throws IOException
	 */
	private void output() throws IOException {
		LoggingEvent event;
		List<Put> logs = new ArrayList<Put>();
		// 循环处理日志队列
		while ((event = loggingEvents.poll()) != null) {
			try {
				// 写日志内容
				SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
				Date date = new Date(event.getTimeStamp());
				// 创建日志并指定ROW KEY
				StringBuffer key = new StringBuffer("");
				if (this.sysName != null)
					key.append(this.sysName).append("#");
				key.append(event.getLoggerName()).append("#");
				key.append(event.getLevel()).append("#");
				key.append(sdf.format(date)).append("#");
				key.append(UUID.randomUUID().toString().replace("-", ""));
				Put log = new Put(key.toString().getBytes());
				if (this.sysName != null)
					log.add(hbLogFamily.getBytes(), "sysName".getBytes(), this.sysName.getBytes());
				if (this.serverIp != null)
					log.add(hbLogFamily.getBytes(), "serverIp".getBytes(), this.serverIp.getBytes());
				log.add(hbLogFamily.getBytes(), "LoggerName".getBytes(), event.getLoggerName().getBytes());
				log.add(hbLogFamily.getBytes(), "level".getBytes(), event.getLevel().toString().getBytes());
				log.add(hbLogFamily.getBytes(), "datetime".getBytes(), sdf.format(date).getBytes());
				log.add(hbLogFamily.getBytes(), "message".getBytes(), event.getMessage().toString().getBytes());
				logs.add(log);
			} catch (Exception e) {
				LOGGER.error("Error logging put ", e);
			}
		}
		// 批量写入HBASE
		if (logs.size() > 0)
			htable.put(logs);
	}

	/**
	 * 日志事件
	 *
	 * @param loggingEvent
	 */
	@Override
	protected void append(LoggingEvent loggingEvent) {
		try {
			populateEvent(loggingEvent);
			// 添加到日志队列
			loggingEvents.add(loggingEvent);
		} catch (Exception e) {
			LOGGER.error("Error populating event and adding to queue", e);
		}
	}

	/**
	 * 事件测试
	 *
	 * @param event
	 */
	protected void populateEvent(LoggingEvent event) {
		event.getThreadName();
		event.getRenderedMessage();
		event.getNDC();
		event.getMDCCopy();
		event.getThrowableStrRep();
		event.getLocationInformation();
	}

	@Override
	public void close() {
		try {
			if (this.task != null)
				this.task.cancel(false);
			if (this.executor != null)
				this.executor.shutdown();
			if (this.connection != null)
				this.connection.close();
			if (this.admin != null)
				this.admin.close();
			if (this.htable != null)
				this.htable.close();
		} catch (IOException e) {
			LOGGER.error("Error close ", e);
		}
	}

	@Override
	public boolean requiresLayout() {
		return true;
	}

	// 设置每一批日志处理数量
	public void setBatchSize(int batchSize) {
		this.batchSize = batchSize;
	}

	/**
	 * 设置计划任务执行间隔
	 *
	 * @param period
	 */
	public void setPeriod(int period) {
		this.period = period;
	}

	/**
	 * 设置日志存储HBASE表名
	 *
	 * @param hbLogName
	 */
	public void setHbLogName(String hbLogName) {
		this.hbLogName = hbLogName;
	}

	/**
	 * 日志表的列族名字
	 * 
	 * @param hbLogFamily
	 */
	public void setHbLogFamily(String hbLogFamily) {
		this.hbLogFamily = hbLogFamily;
	}

	/**
	 * @param zookeeper the zookeeper to set
	 */
	public void setZookeeper(String zookeeper) {
		this.zookeeper = zookeeper;
	}

	/**
	 * @param sysName the sysName to set
	 */
	public void setSysName(String sysName) {
		this.sysName = sysName;
	}

	/**
	 * @param serverIp the serverIp to set
	 */
	public void setServerIp(String serverIp) {
		this.serverIp = serverIp;
	}
}

 

 

收工,有些粗糙,将就吧!

 

附带源码,见附件

分享到:
评论

相关推荐

    基于Flume的分布式日志采集分析系统设计与实现.pdf

    基于Flume的分布式日志采集分析系统设计与实现 Flume是一种分布式日志采集系统,可以实时地采集和处理大量日志数据。该系统基于Flume、Elasticsearch和Kibana等技术手段,能够对海量日志数据进行实时采集、处理和...

    java分布式日志系统.zip

    Java分布式日志系统是现代大型网络应用不可或缺的组成部分,它为开发者提供了收集、存储、查询和分析应用程序日志的能力。在分布式环境中,由于系统由多个节点组成,日志的管理和处理变得复杂,因此,分布式日志系统...

    基于Python内建库的分布式日志系统设计与实现.pdf

    本文所讨论的主题是基于Python内建库设计并实现了一个分布式日志系统。该系统旨在应对自动化运维脚本数量增多、功能覆盖广泛而带来的管理难度增加的问题。分布式日志系统的设计,能够统一记录运维脚本的运行情况,...

    分布式日志处理系统调研报告

    ### 分布式日志处理系统调研报告:ELK详解 #### 一、引言 随着互联网技术的发展,大型系统的规模不断膨胀,系统架构也日益复杂。为了更好地管理和维护这些复杂的系统,日志作为记录系统运行状态的重要手段,其重要...

    一个Java开发的轻量级分布式日志标记追踪神器.zip

    一个Java开发的轻量级分布式日志标记追踪神器 一个Java开发的轻量级分布式日志标记追踪神器 一个Java开发的轻量级分布式日志标记追踪神器 一个Java开发的轻量级分布式日志标记追踪神器 一个Java开发的轻量级...

    linux平台centos7系统 - ELK+logback+kafka+nginx 搭建分布式日志分析平台.doc

    【Linux平台CentOS7系统 - ELK+logback+kafka+nginx搭建分布式日志分析平台】 在复杂的IT环境中,日志管理和分析对于诊断问题、优化性能以及确保系统稳定性至关重要。ELK栈(Elasticsearch、Logstash、Kibana)正是...

    基于socket分布式日志系统的设计与实现

    《基于Socket分布式日志系统的设计与实现》 在IT行业中,日志系统是不可或缺的一部分,它为系统的故障排查、性能优化、安全审计等提供了重要的数据支持。本文将深入探讨如何设计并实现一个基于Socket的分布式日志...

    Java开发案例-springboot-35-整合TLog实现分布式日志标记追踪-源代码+文档.rar

    Java开发案例-springboot-35-整合TLog实现分布式日志标记追踪-源代码+文档.rar Java开发案例-springboot-35-整合TLog实现分布式日志标记追踪-源代码+文档.rar Java开发案例-springboot-35-整合TLog实现分布式日志...

    ELK分布式日志解决方案.docx

    ELK 分布式日志解决方案 ELK 分布式日志解决方案是一种基于 Elasticsearch、Logstash、Kibana 三款产品的日志框架解决方案。该解决方案可以在项目中作为日志框架使用,用于收集、存储和可视化日志数据。 一、ELK ...

    ServiceStack.Redis+log4net分布式日志

    本例子是用ServiceStack.Redis+log4net做的一个分布式记日志的功能,使用场景:分布式系统中的各个服务向redis服务添加日志,服务定时轮询生成记录日志到数据库或文本中。如果你的系统所有的方法调记录都需要记录的...

    分布式日志恢复系统实现.pdf

    分布式日志恢复系统是保障数据一致性和完整性的关键机制,特别是在大规模分布式系统中。随着信息技术和互联网的快速发展,数据量激增,分布式数据库系统成为处理、存储和利用这些数据的有效手段。然而,系统故障可能...

    部署ELK分布式日志分析平台

    部署ELK分布式日志分析平台是一个系统工程,需要考虑日志数据的采集、传输、存储、分析、警告等各个关键环节。在生产环境中,业务规模扩大和服务器数量增多,使得日志量急剧增长,这就要求有一个能够处理大规模数据...

    分布式日志采集系统设计.pdf

    分布式日志采集系统是现代大型分布式应用系统中不可或缺的组成部分。随着信息技术的快速发展,业务系统产生的数据量呈指数级增长,特别是日志数据,其规模的快速增长给系统的监控和管理带来了巨大的挑战。为了有效...

    高性能分布式日志系统研究与设计.pdf

    【高性能分布式日志系统研究与设计】 在当前大数据时代,日志系统对于系统运维和开发人员至关重要,因为它提供了关于各种设备软硬件信息和工作状态的记录。日志按时间顺序组织,分为系统日志、应用日志、安全日志等...

    TLog分布式日志标记追踪器 v1.5.0.zip

    《TLog分布式日志标记追踪器 v1.5.0》是一款专为IT专业人士设计的高效日志管理和追踪工具,其v1.5.0版本带来了更多优化和增强的功能。这款软件工具主要用于解决在分布式系统中,日志数据分散、难以定位问题的挑战。...

    基于Python内建库的分布式日志系统设计与实现.zip

    分布式日志系统是大型复杂系统中的重要组成部分,它能够收集、存储、检索和分析应用程序的日志数据,为故障排查、性能优化以及业务监控提供有力支持。本项目将深入探讨如何利用Python内建库来设计和实现一个分布式...

    基于Flume的分布式日志聚合系统的研究.pdf

    【基于Flume的分布式日志聚合系统的研究】 随着互联网的快速发展和大数据技术的崛起,日志数据的规模急剧增加,对传统的日志采集和分析系统提出了更高的要求。为应对这一挑战,本文研究了一种基于Flume的分布式日志...

Global site tag (gtag.js) - Google Analytics