`
knight_black_bob
  • 浏览: 850379 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hbase 报文 处理 逻辑

阅读更多

 

 

报文格式:

 

每隔一个小时 出现一个文件类型 报文 ,所以 我们的处理思路是,一个小时做一次处理。

 

import java.io.FileInputStream;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 

import com.cmcc.aoi.util.OsUtil;

public class HbaseStarter {

	public static void main(String[] args) throws  Exception {
		Properties properties=new Properties();
		String config="";
		if(!OsUtil.isLinux())
			config= "D:/work/util/aoi-hbase/trunk/src/main/resources/config.properties";
		else	
			config = "/home/aoi/aoi-hbase/conf/config.properties"; 
		FileInputStream fis = new FileInputStream(config);
		properties.load(fis);
		fis.close(); 
		
		String hbaseTable = properties.getProperty("com.cmcc.aoi.ua.hbaseTable");
		String hbaseFamily = properties.getProperty("com.cmcc.aoi.ua.hbaseFamily"); 
		String sourceFilePath=properties.getProperty("com.cmcc.aoi.ua.sourceFilePath");
		String archivelogsPath=properties.getProperty("com.cmcc.aoi.ua.archivelogsPath");
		boolean isDebug= Integer.parseInt( properties.getProperty("com.cmcc.aoi.ua.isDebug")) == 0 ? false : true;
		String sourceFileName = properties.getProperty("com.cmcc.aoi.ua.sourceFileName"); 
		 
		String[] hbaseTableName=hbaseTable.split(",");  // table
		String[] hbaseFamilyName=hbaseFamily.split("&");// family  
		String[] sourceFileNameArr=sourceFileName.split(","); 
		 
		
		ScheduledExecutorService service = Executors.newScheduledThreadPool(2);  
		service.scheduleAtFixedRate(new DeviceReadThread (sourceFileNameArr[0],hbaseTableName[0],hbaseFamilyName[0].split(","),sourceFilePath,archivelogsPath,isDebug) 
			               ,0, 1,TimeUnit.HOURS); 
		service.scheduleAtFixedRate(new AppReadThread (sourceFileNameArr[1],hbaseTableName[1],hbaseFamilyName[1].split(","),sourceFilePath,archivelogsPath,isDebug) 
                         ,0, 1,TimeUnit.HOURS); 
	}
}

 

 

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader; 
import java.io.IOException;
import java.util.Date;  
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; 
import Model.Device; 
import com.alibaba.fastjson.JSON; 

public class DeviceReadThread extends BaseRunnabler {

	static Logger logger = LoggerFactory.getLogger(DeviceReadThread.class);
	
	public DeviceReadThread(String sourceFileName, String hbaseTable,
			String[] hbaseFamily, String sourceFilePath,
			String archivelogsPath, boolean isDebug) {
		super(sourceFileName, hbaseTable, hbaseFamily, sourceFilePath, archivelogsPath,
				isDebug); 
	}
	 
	public void processFile(IOperator hu) {
		FileReader logReader = null;
		BufferedReader logBufferedReader = null;
		try { 
			File logFile = new File(sourceFilePath+sourceFileName);
			logReader = new FileReader(logFile);
			logBufferedReader = new BufferedReader(logReader);
			String temp = logBufferedReader.readLine();
			//logger.error(" temp is  " + temp );
			while ( temp  != null) {
				Device device = JSON.parseObject(temp, Device.class); 
				//logger.error(" device is null ? " + ( device == null ) );
				
				String[][] s = new String[][] {
						{ device.getLid(), hbaseFamily[0], "lid" , device.getLid() } ,
						{ device.getLid(), hbaseFamily[1], "date", (new Date()).toString() }, 
						{ device.getLid(), hbaseFamily[2], "os", device.getOs() },
						{ device.getLid(), hbaseFamily[2], "osv", device.getOsv()} };
				hu.writeMultRow(hbaseTable, s);
				logger.info(" hbase util end "   );
				temp = logBufferedReader.readLine();
			}
		} catch (Exception e) {
			logger.error(" DeviceReadThread error "   );
			e.printStackTrace();
		} finally { 
			try {
				logBufferedReader.close();
			} catch (IOException e) { 
				e.printStackTrace();
			}
			try {
				logReader.close();
			} catch (IOException e) { 
				e.printStackTrace();
			}
		}
	} 

}

 

import java.io.File; 
import java.util.Arrays; 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import com.cmcc.aoi.util.FileUtil;

public abstract class BaseRunnabler implements Runnable{
	protected static Logger logger = LoggerFactory.getLogger(BaseRunnabler.class);
	
	String sourceFileName=""; // 读取文件路径  
	String hbaseTable="";  // hbase  表名
	String [] hbaseFamily=null;    // 行列簇名 
	String sourceFilePath ;
	String archivelogsPath ;
	boolean isDebug; 
		
	
	public BaseRunnabler(String sourceFileName,String hbaseTable,String [] hbaseFamily  ,String sourceFilePath,String archivelogsPath,boolean isDebug ){
		this.hbaseTable=hbaseTable;
		this.hbaseFamily = hbaseFamily; 
		this.sourceFileName=sourceFileName;  
		this.sourceFilePath = sourceFilePath;
		this.archivelogsPath = archivelogsPath;
		this.isDebug = isDebug; 
	}	
	
	@Override
	public void run() {
		try{
		IOperator hu = new HbaseUtil( hbaseTable,hbaseFamily);
    	hu.createTable(hbaseTable,hbaseFamily );  
    	
    	File file=new File(sourceFilePath);
		 File[] tempFileList = file.listFiles();
		 Arrays.sort(tempFileList);
		 for (File tempFile: tempFileList) {
			 if (tempFile.isFile() && tempFile.getName().contains(sourceFileName +".") ) {
				try{
					try{
						processFile(hu);
					}catch (Exception e) { 
						logger.error("readfile error ,must continue to protect  to read  other file ");
						continue;
					}
				
				removeFile(tempFile);
				
				}catch (Exception e2) { 
					logger.error(" one file has an error ,other  file must continue to do this task ");
				}
			}
		 }
    	
    	 
		}catch (Exception e) {
			e.printStackTrace();
		}

	}
	
	public abstract void processFile(IOperator hu) throws Exception; 	
	
	private void removeFile(File file) {
		if (isDebug) {
			 File path = new File(archivelogsPath);
			 if (!path.exists()) {
				 path.mkdirs();
			 }
			 FileUtil.moveFile(file, new File(archivelogsPath,file.getName()));
			 logger.info("remove file :" + file.getName());
		 }else{
			 file.delete();
			 logger.info("delete file :" + file.getName());
		 }
	}
}



 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

捐助开发者

在兴趣的驱动下,写一个免费的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。



 
 
 谢谢您的赞助,我会做的更好!

 

 

  • 大小: 3 KB
分享到:
评论

相关推荐

    hbase的rowkey设计与hbase的协处理器运用.docx

    HBase RowKey 设计与协处理器运用 HBase 是一个基于 HDFS 的分布式、面向列的 NoSQL 数据库,具有高性能、可靠性和扩展性等特点。本文将详细介绍 HBase 的 RowKey 设计和协处理器运用。 HBase 的介绍 HBase 是一...

    HbaseTemplate 操作hbase

    在IT行业中,尤其是在大数据处理领域,HBase是一个广泛使用的分布式、高性能、列式存储的NoSQL数据库。HBase是建立在Hadoop文件系统(HDFS)之上,为处理大规模数据提供了一个高效的数据存储解决方案。而Spring Data...

    Hbase思维导图之物理模型.png

    Hbase思维导图之物理模型

    Hbase协处理器详解.md

    Hbase协处理器详解,进阶篇

    Hbase基本知识介绍

    ### HBase基本知识介绍 ...其独特的物理模型和逻辑模型设计使得HBase成为了处理大规模数据的理想选择之一。对于那些需要高并发读写、随机访问以及实时分析的应用场景而言,HBase是一个非常有价值的工具。

    HBase分布式架构处理大数据量(高并发和实时处理)

    本文来自于csdn,介绍了Hadoop的原理,HBase的特点,HBase 的高并发和实时处理数据,数据模型,工作流程等。(一)HDFS主要是用于做什么的?HDFS(HadoopDistributedFileSystem)分布式文件管理系统、是Hadoop项目的...

    hbase和hadoop数据块损坏处理

    HBase 和 Hadoop 数据块损坏处理 HBase 和 Hadoop 数据块损坏是非常常见的问题,可能会导致数据丢失、集群崩溃等严重后果。因此,了解如何处理 HBase 和 Hadoop 数据块损坏是非常重要的。本文将介绍 HBase 和 ...

    Hbase思维导图之逻辑结构

    Hbase思维导图

    基于springboot集成hbase过程解析

    SpringBoot集成HBase是当前大数据处理和存储解决方案中的一种常见组合。HBase是基于Hadoop的分布式、可扩展的NoSQL数据库,能够存储大量的结构化和非结构化数据。SpringBoot则是一个基于Java的现代Web框架,提供了...

    Hbase权威指南(HBase: The Definitive Guide)

    - **MapReduce集成**:HBase支持MapReduce任务直接对存储在HBase中的数据进行处理,这大大简化了数据处理流程,提高了效率。 #### 四、HBase的关键特性 - **可扩展性**:HBase能够轻松地扩展到数百甚至数千台服务器...

    HBase学习利器:HBase实战

    - **第5章:通过Coprocessors扩展HBase**:Coprocessors是HBase中的一个重要特性,用于在服务器端执行用户定义的逻辑,从而减少网络传输开销并提高性能。本章将详细介绍Coprocessors的工作原理及其应用场景。 - **...

    HBase官方文档中文版-HBase手册中文版

    HBase是一种分布式、基于列族的NoSQL数据库,由Apache软件基金会开发并维护,是Hadoop生态系统中的重要组件。这份“HBase官方文档中文版”提供了...通过深入学习,我们可以更好地理解和利用HBase处理海量数据的挑战。

    本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统,大数据处理技术

    本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...

    hbase安装与hbase架构说明

    同时,它也借鉴了MapReduce的并行处理能力,利用Hadoop的MapReduce框架来处理HBase中的大数据量计算任务。 在HBase的架构中,Client是用户与系统交互的接口,它通过远程过程调用(RPC)机制与HMaster和...

    hbase社区2018精选资料

    HBase是一个高性能的开源NoSQL数据库,属于BigTable的开源实现,其分布式、多版本、面向列的特点使其适合存储和处理大量的非结构化数据。随着大数据技术的发展,HBase面临着更复杂多样的数据格式和业务需求,因此...

    hbase用于查询客户端工具

    7. **HBase MapReduce**:MapReduce是Hadoop处理大数据的主要工具,HBase与MapReduce结合可以进行批量数据处理和分析。通过编写MapReduce作业,可以对HBase表进行大规模的数据导入和导出,或者执行复杂的数据分析...

    Hbase 组件 、架构

    HBase是Apache软件基金会下的一个开源项目,是一个分布式的、面向列的NoSQL数据库。...HBase特别适用于处理大量数据的实时读写操作,并且支持海量数据的快速访问,这使得它在大数据处理领域中得到了广泛的应用。

    HBase Introduction

    ### HBase介绍与内部原理详解 ...通过深入理解HBase的逻辑视图和物理视图,以及掌握有效的表结构设计方法,我们可以更有效地利用HBase来处理大规模数据集,从而满足现代应用程序对高性能和高可用性的需求。

Global site tag (gtag.js) - Google Analytics