`
kylinsoong
  • 浏览: 240076 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Cassandra Dev 3:Cassandra 应用之CassandraAppender

阅读更多

    本文的目的是展示一个测试结果:Cassandra应用的高效性,稳定性,可靠性。具体:应用Cassandra记录Application运行中的日志。

    首先说明在一个Application中,不管是它在运行过程,还是在它的开发过程中,记录日志是非常有必要的,特别是一些Service-Based Application或Distributed系统。通过日志,Application的维护人员和开发人员可以更好的完成他们的工作。Log4j是一个很好的日志记录开源工具包,它可以将日志记录到文件系统、从Console输出、发送到指定邮箱等等。本文就是先对Log4j做一扩展,写一个自己的CassandraAppender,使其具有将日志保存到Cassandra的功能。具体实现Log4j定义的Appender……,如下

1. 首先Log4j结构:

 Log4j包括三个主要组件:

          Logger:用来记录日志,可以对她指定记录级别(ALL,DEBUG,INFO,WARN,ERROR,FATA);

          Layout:设置日志记录样式;

          Appender:指定日志输出位置。

这里主要研究Appender,先看下图:

  

 

      如图所示为Appender主要UML类图,AppenderSkeleton为抽象类,它实现了 org.apache.log4j.Appenderorg.apache.log4j.spi.OptionHandler 接口,所有的 appender 都必须扩展org.apache.log4j.AppenderSkeleton 类,

当然我们这里的CassandraAppender就是通过继 AppenderSkeleton来实现的 。在抽象类AppenderSkeleton中定义了抽象方法:

abstract protected void append(LoggingEvent event);

 


 所有的子类都要事先此方法,此方法中参数LoggingEvent代表日志信息,可以通过它获取日志名字,LoggerInfo等,此类还定义一方法:

public void activateOptions() { }

 


 如果需要对某些参数做一处理,本文中CassandraAppender就是在此方法中初始化Thrift RPC连接;

2. Appender生命周期,如下图:



     appender 实例不存在。或许框架还没有配置好。

     框架实例化了一个新的 appender。这发生在配置器类分析配置脚本中的一个 appender 声明的时候。配置器类调用 Class.newInstanceYourCustomAppender.class) ,这等价于动态调用 new YourCustomAppender() 。框架这样做是为了避免被硬编码为任何特定的 appender 名称;框架是通用的,适用于任何 appender。

   框架判断 appender 是否需要 layout。如果该 appender 不需要 layout,配置器就不会尝试从配置脚本中加载 layout 信息。



 如图CassandraAppender处于就绪状态时,将系统日志发送到Cassandra服务器是它的唯一任务,具体我给出源代码:

   Log4j 配置器调用 setter 方法。在所有属性都已设置好之后,框架就会调用这个方法。我们可以在这里激活必须同时激活的属性。

   配置器调用 activateOptions() 方法。在所有属性都已设置好之后,框架就会调用这个方法。程序员可以在这里激活必须同时激活的属性。

   Appender 准备就绪。 此刻,框架可以调用 append() 方法来处理日志记录请求。这个方法由 AppenderSkeleton.doAppend() 方法调用。

   最后,关闭appender。 当框架即将要删除您的自定义 appender 实例时,它会调用您的 appender 的 close() 方法。 close() 是一个清理方法,意味着 您需要释放已分配的所有资源。它是一个必需的方法,并且不接受任何参数。它必须把 closed 字段设置为 true ,并在有人尝试使用关闭的 appender 时向框架发出警报。

 

3. 扩展自己的Appender

如下图所示为CassandraAppender工作原理:

package com.xxx.log4j;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.UUID;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;


public class CassandraAppender extends AppenderSkeleton {
	
	Logger logger = Logger.getLogger(CassandraAppender.class);
	
	private TTransport tr;
	
	private Cassandra.Client client;
	
	private String keyspace;
	
	private String columnFamily;
	
	private String thriftAddress;
	
	private int thriftPort;
	
	private String keyStrategy;
	
	public CassandraAppender() {
	}

	public void activateOptions() {
		tr = new TSocket(thriftAddress, thriftPort);
		TProtocol proto = new TBinaryProtocol(tr);
        client = new Cassandra.Client(proto);
        try {
			tr.open();
		} catch (TTransportException e) {
			e.printStackTrace();
		}
	}

	public String getKeyspace() {
		return keyspace;
	}

	public void setKeyspace(String keyspace) {
		this.keyspace = keyspace;
	}

	public String getColumnFamily() {
		return columnFamily;
	}

	public void setColumnFamily(String columnFamily) {
		this.columnFamily = columnFamily;
	}

	public String getThriftAddress() {
		return thriftAddress;
	}

	public void setThriftAddress(String thriftAddress) {
		this.thriftAddress = thriftAddress;
	}

	public int getThriftPort() {
		return thriftPort;
	}

	public void setThriftPort(int thriftPort) {
		this.thriftPort = thriftPort;
	}

	public String getKeyStrategy() {
		return keyStrategy;
	}

	public void setKeyStrategy(String keyStrategy) {
		this.keyStrategy = keyStrategy;
	}

	protected void append(LoggingEvent event) {
		if(client == null) {
			System.out.println("client == null");
		}
		try {
			client.insert(
							keyspace, 
							event.getLoggerName(), 
							getColumnPath(columnFamily,event), 
							getStoreValue(event), 
							new Date().getTime(),
							ConsistencyLevel.ONE
							);
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		} catch (InvalidRequestException e) {
			e.printStackTrace();
		} catch (UnavailableException e) {
			e.printStackTrace();
		} catch (TimedOutException e) {
			e.printStackTrace();
		} catch (TException e) {
			e.printStackTrace();
		}
			}

	private ColumnPath getColumnPath(String columnFamily, LoggingEvent event) throws UnsupportedEncodingException {
		ColumnPath path = new ColumnPath();
		path.setColumn_family(columnFamily);
		if(getKeyStrategy().equals("uuid")) {
			UUID uuid = UUID.randomUUID();   
	        String str = uuid.toString();   
	        path.setColumn(str.getBytes());
		} else {
			path.setColumn((event.getLocationInformation().getMethodName() + "-" +  event.getLocationInformation().getLineNumber() + new Date().getTime()).getBytes("UTF-8"));
		}
		return path;
	}
	
	private byte[] getStoreValue(LoggingEvent event) throws UnsupportedEncodingException {
		return (event.getMessage()+ "").getBytes("UTF-8");
	}
	
	public void close() {
		if(tr.isOpen()) {
			tr.close();
		}
	}

	public boolean requiresLayout() {
		return false;
	}

}

 

 4. 测试

首先在配置Log4j日志文件log4j.xml如下:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">

<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" >  

   <appender name="Cassandra" class="com.xxx.log4j.CassandraAppender">
     <errorHandler class="org.apache.log4j.helpers.OnlyOnceErrorHandler"/>
     <param name="keyspace" value="Twitter"/>
     <param name="ColumnFamily" value="Users"/>
     <param name="ThriftAddress" value="0.0.0.0"/>
     <param name="ThriftPort" value="9160"/>
     <param name="keyStrategy" value="uuid"/>
     <layout class="org.apache.log4j.PatternLayout">
       <param name="ConversionPattern" value="%d %-5p [%c{1}] %m%n"/>
     </layout>	    
   </appender>
 
   <root>
  	  <priority value="INFO" />
  	  <appender-ref ref="Cassandra"/>
   </root>
   
</log4j:configuration>

 

 然后运行自己测试类:

public class Log4jTestOne {
	
	static Logger logger = Logger.getLogger(Log4jTestOne.class);

	public static void main(String[] args) {
		Date start = new Date();
		for(int i = 0 ; i < 100000 ; i ++) {
			logger.info("info");
		}
		Date end = new Date();
		System.out.println("Completed spent time: " + (end.getTime() - start.getTime()));
	}

}

 

 接着到Cassandra查看结果,可以用Cassandra Thrift访问(我会就Cassandra Client有专门Blog)



 

 

 

 

 

 


 

  • 大小: 42.7 KB
  • 大小: 25 KB
  • 大小: 7.4 KB
0
0
分享到:
评论

相关推荐

    DevCenter cassandra客户端

    DevCenter 是一个强大的工具,专为数据科学家、开发人员和管理员设计,用于与Apache Cassandra数据库进行交互。这个工具提供了一个直观的用户界面,使得管理、查询和开发Cassandra数据库变得简单易行。Cassandra是一...

    Scylladb或Cassandra客户端工具DevCenter

    Scylladb或Cassandra 客户端工具 DevCenter

    cassandra-3.11.3下载

    首先,Cassandra的核心特性之一是它的分布式架构。它采用了一种主从复制模型,每个节点都可以接受写入和读取请求,使得系统具有高可用性和容错性。在3.11.3版本中,Cassandra增强了数据复制的策略,能够更好地管理...

    DevCenter--Cassandra

    3. **一致性哈希**:Cassandra使用一致性哈希算法来实现数据的分布,这允许数据均匀分布在各个节点上,减少热点并提高性能。 4. **Gossip协议**:节点间通信采用Gossip协议,用于传播集群状态信息,如节点的加入、...

    cassandra-exporter:用于将Cassandra指标导出到Prometheus的Java代理

    cassandra出口商 cassandra-exporter是Java代理(具有可选的独立模式),可将Cassandra指标导出到 。 项目状态:测试版介绍cassandra-exporter可以实现Cassandra指标的高性能收集,并遵循Prometheus最佳做法进行指标...

    spark-cassandra-connector:DataStax Spark Cassandra连接器

    该库使您可以将Cassandra表公开为Spark RDD和数据集/数据框架,将Spark RDD和数据集/数据框架写入Cassandra表,并在Spark应用程序中执行任意CQL查询。 与Apache Cassandra 2.1或更高版本兼容(请参见下表) 与...

    bitnami-docker-cassandra-exporter:Bitnami Cassandra导出器Docker映像

    Cassandra导出器是一个独立的应用程序,可通过Prometheus友好端点导出Apache Cassandra指标。 TL; DR $ docker run --name cassandra-exporter bitnami/cassandra-exporter:latest 为什么要使用Bitnami Images? ...

    分布式存储系统:Cassandra:Cassandra与大数据平台的集成应用.docx

    分布式存储系统:Cassandra:Cassandra与大数据平台的集成应用.docx

    Cassandra:Cassandra在分布式系统中的应用.docx

    Cassandra:Cassandra在分布式系统中的应用.docx

    cassandra-web:cassandra web ui

    卡桑德拉网演示版特征主题黑暗表格行上一页下一页表格行编辑表格行过滤器表格行删除表名查找表定义表导出表导入CQL查询支持的Cassandra版本2.1.x 2.2.x 3.xx 是的是的是的用法下载$ wget ...解压缩$ tar zxvf linux.tar...

    Cassandra在饿了么的应用

    2. **Cassandra的历史和概述**:Cassandra经历了从Facebook到Apache的开源过程,成为了如今广泛应用于大数据处理的数据库系统。 3. **Cassandra架构关键字**:介绍了Cassandra的四个关键组件,包括Gossip通信协议、...

    分布式存储系统:Cassandra:Cassandra的高级特性:二级索引与轻量级事务.docx

    分布式存储系统:Cassandra:Cassandra的高级特性:二级索引与轻量级事务.docx

    藏经阁-Cassandra总体介绍.pdf

    "Cassandra总体介绍" Cassandra是一款开源的、分布式的NoSQL数据库管理系统,由Facebook开发,后来捐赠给Apache基金会。...3. 分布式系统:Cassandra能够提供高可用性和高扩展性,适合分布式系统应用场景。

    cassandra-phantom:Cassandra + Phantom示例

    【标题】:“Cassandra-Phantom:Cassandra与Phantom DSL结合使用实例” 【描述】:Cassandra-Phantom项目是展示如何在Scala环境中利用Cassandra数据库和Phantom DSL进行数据操作的一个实例。这个项目旨在帮助...

    cassandra工具类DevCenter-DevCenter-1.6.0.zip

    3. **实时监控**:DevCenter可以实时显示集群的健康状态和性能指标,如吞吐量、延迟和节点状态,这对于故障排查和性能优化至关重要。 4. **脚本支持**:用户可以创建和保存CQL脚本,便于重复执行常见的管理任务或...

    cassandra-operator:Apache Cassandra的Kubernetes运算符

    用于Cassandra的Kubernetes运算符 网站: : Wiki: : 说明文件: : 建立 项目状态:Alpha Cassandra Operator管理部署到Cassandra集群,并自动执行与操作Cassandra集群相关的任务。 当前,面向用户的...

    cassandra-builds:Apache Cassandra构建的镜像

    Apache Cassandra构建工具 Jenkins Job DSL脚本创建CI作业: jenkins-dsl/ Jenkins Job构建/测试运行时脚本: build-scripts/ Apache Cassandra打包实用程序: cassandra-release/ docker/ 建筑包 创建包含构建...

    cassandra-rpm:Apache Cassandra RPM 打包

    1. **Cassandra 简介**:Cassandra设计为高可用性和可扩展性,采用无中心架构,每个节点都具有相同的能力,支持水平扩展,能够处理PB级别的数据。 2. **RPM打包原理**:RPM包含软件的所有组件,如执行文件、配置...

Global site tag (gtag.js) - Google Analytics