报文格式:
每隔一个小时 出现一个文件类型 报文 ,所以 我们的处理思路是,一个小时做一次处理。
import java.io.FileInputStream; import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.cmcc.aoi.util.OsUtil; public class HbaseStarter { public static void main(String[] args) throws Exception { Properties properties=new Properties(); String config=""; if(!OsUtil.isLinux()) config= "D:/work/util/aoi-hbase/trunk/src/main/resources/config.properties"; else config = "/home/aoi/aoi-hbase/conf/config.properties"; FileInputStream fis = new FileInputStream(config); properties.load(fis); fis.close(); String hbaseTable = properties.getProperty("com.cmcc.aoi.ua.hbaseTable"); String hbaseFamily = properties.getProperty("com.cmcc.aoi.ua.hbaseFamily"); String sourceFilePath=properties.getProperty("com.cmcc.aoi.ua.sourceFilePath"); String archivelogsPath=properties.getProperty("com.cmcc.aoi.ua.archivelogsPath"); boolean isDebug= Integer.parseInt( properties.getProperty("com.cmcc.aoi.ua.isDebug")) == 0 ? false : true; String sourceFileName = properties.getProperty("com.cmcc.aoi.ua.sourceFileName"); String[] hbaseTableName=hbaseTable.split(","); // table String[] hbaseFamilyName=hbaseFamily.split("&");// family String[] sourceFileNameArr=sourceFileName.split(","); ScheduledExecutorService service = Executors.newScheduledThreadPool(2); service.scheduleAtFixedRate(new DeviceReadThread (sourceFileNameArr[0],hbaseTableName[0],hbaseFamilyName[0].split(","),sourceFilePath,archivelogsPath,isDebug) ,0, 1,TimeUnit.HOURS); service.scheduleAtFixedRate(new AppReadThread (sourceFileNameArr[1],hbaseTableName[1],hbaseFamilyName[1].split(","),sourceFilePath,archivelogsPath,isDebug) ,0, 1,TimeUnit.HOURS); } }
import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import Model.Device; import com.alibaba.fastjson.JSON; public class DeviceReadThread extends BaseRunnabler { static Logger logger = LoggerFactory.getLogger(DeviceReadThread.class); public DeviceReadThread(String sourceFileName, String hbaseTable, String[] hbaseFamily, String sourceFilePath, String archivelogsPath, boolean isDebug) { super(sourceFileName, hbaseTable, hbaseFamily, sourceFilePath, archivelogsPath, isDebug); } public void processFile(IOperator hu) { FileReader logReader = null; BufferedReader logBufferedReader = null; try { File logFile = new File(sourceFilePath+sourceFileName); logReader = new FileReader(logFile); logBufferedReader = new BufferedReader(logReader); String temp = logBufferedReader.readLine(); //logger.error(" temp is " + temp ); while ( temp != null) { Device device = JSON.parseObject(temp, Device.class); //logger.error(" device is null ? " + ( device == null ) ); String[][] s = new String[][] { { device.getLid(), hbaseFamily[0], "lid" , device.getLid() } , { device.getLid(), hbaseFamily[1], "date", (new Date()).toString() }, { device.getLid(), hbaseFamily[2], "os", device.getOs() }, { device.getLid(), hbaseFamily[2], "osv", device.getOsv()} }; hu.writeMultRow(hbaseTable, s); logger.info(" hbase util end " ); temp = logBufferedReader.readLine(); } } catch (Exception e) { logger.error(" DeviceReadThread error " ); e.printStackTrace(); } finally { try { logBufferedReader.close(); } catch (IOException e) { e.printStackTrace(); } try { logReader.close(); } catch (IOException e) { e.printStackTrace(); } } } }
import java.io.File; import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.cmcc.aoi.util.FileUtil; public abstract class BaseRunnabler implements Runnable{ protected static Logger logger = LoggerFactory.getLogger(BaseRunnabler.class); String sourceFileName=""; // 读取文件路径 String hbaseTable=""; // hbase 表名 String [] hbaseFamily=null; // 行列簇名 String sourceFilePath ; String archivelogsPath ; boolean isDebug; public BaseRunnabler(String sourceFileName,String hbaseTable,String [] hbaseFamily ,String sourceFilePath,String archivelogsPath,boolean isDebug ){ this.hbaseTable=hbaseTable; this.hbaseFamily = hbaseFamily; this.sourceFileName=sourceFileName; this.sourceFilePath = sourceFilePath; this.archivelogsPath = archivelogsPath; this.isDebug = isDebug; } @Override public void run() { try{ IOperator hu = new HbaseUtil( hbaseTable,hbaseFamily); hu.createTable(hbaseTable,hbaseFamily ); File file=new File(sourceFilePath); File[] tempFileList = file.listFiles(); Arrays.sort(tempFileList); for (File tempFile: tempFileList) { if (tempFile.isFile() && tempFile.getName().contains(sourceFileName +".") ) { try{ try{ processFile(hu); }catch (Exception e) { logger.error("readfile error ,must continue to protect to read other file "); continue; } removeFile(tempFile); }catch (Exception e2) { logger.error(" one file has an error ,other file must continue to do this task "); } } } }catch (Exception e) { e.printStackTrace(); } } public abstract void processFile(IOperator hu) throws Exception; private void removeFile(File file) { if (isDebug) { File path = new File(archivelogsPath); if (!path.exists()) { path.mkdirs(); } FileUtil.moveFile(file, new File(archivelogsPath,file.getName())); logger.info("remove file :" + file.getName()); }else{ file.delete(); logger.info("delete file :" + file.getName()); } } }
捐助开发者
在兴趣的驱动下,写一个免费
的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。
谢谢您的赞助,我会做的更好!
相关推荐
HBase RowKey 设计与协处理器运用 HBase 是一个基于 HDFS 的分布式、面向列的 NoSQL 数据库,具有高性能、可靠性和扩展性等特点。本文将详细介绍 HBase 的 RowKey 设计和协处理器运用。 HBase 的介绍 HBase 是一...
在IT行业中,尤其是在大数据处理领域,HBase是一个广泛使用的分布式、高性能、列式存储的NoSQL数据库。HBase是建立在Hadoop文件系统(HDFS)之上,为处理大规模数据提供了一个高效的数据存储解决方案。而Spring Data...
Hbase思维导图之物理模型
Hbase协处理器详解,进阶篇
### HBase基本知识介绍 ...其独特的物理模型和逻辑模型设计使得HBase成为了处理大规模数据的理想选择之一。对于那些需要高并发读写、随机访问以及实时分析的应用场景而言,HBase是一个非常有价值的工具。
本文来自于csdn,介绍了Hadoop的原理,HBase的特点,HBase 的高并发和实时处理数据,数据模型,工作流程等。(一)HDFS主要是用于做什么的?HDFS(HadoopDistributedFileSystem)分布式文件管理系统、是Hadoop项目的...
HBase 和 Hadoop 数据块损坏处理 HBase 和 Hadoop 数据块损坏是非常常见的问题,可能会导致数据丢失、集群崩溃等严重后果。因此,了解如何处理 HBase 和 Hadoop 数据块损坏是非常重要的。本文将介绍 HBase 和 ...
Hbase思维导图
SpringBoot集成HBase是当前大数据处理和存储解决方案中的一种常见组合。HBase是基于Hadoop的分布式、可扩展的NoSQL数据库,能够存储大量的结构化和非结构化数据。SpringBoot则是一个基于Java的现代Web框架,提供了...
- **MapReduce集成**:HBase支持MapReduce任务直接对存储在HBase中的数据进行处理,这大大简化了数据处理流程,提高了效率。 #### 四、HBase的关键特性 - **可扩展性**:HBase能够轻松地扩展到数百甚至数千台服务器...
- **第5章:通过Coprocessors扩展HBase**:Coprocessors是HBase中的一个重要特性,用于在服务器端执行用户定义的逻辑,从而减少网络传输开销并提高性能。本章将详细介绍Coprocessors的工作原理及其应用场景。 - **...
HBase是一种分布式、基于列族的NoSQL数据库,由Apache软件基金会开发并维护,是Hadoop生态系统中的重要组件。这份“HBase官方文档中文版”提供了...通过深入学习,我们可以更好地理解和利用HBase处理海量数据的挑战。
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
同时,它也借鉴了MapReduce的并行处理能力,利用Hadoop的MapReduce框架来处理HBase中的大数据量计算任务。 在HBase的架构中,Client是用户与系统交互的接口,它通过远程过程调用(RPC)机制与HMaster和...
HBase是一个高性能的开源NoSQL数据库,属于BigTable的开源实现,其分布式、多版本、面向列的特点使其适合存储和处理大量的非结构化数据。随着大数据技术的发展,HBase面临着更复杂多样的数据格式和业务需求,因此...
7. **HBase MapReduce**:MapReduce是Hadoop处理大数据的主要工具,HBase与MapReduce结合可以进行批量数据处理和分析。通过编写MapReduce作业,可以对HBase表进行大规模的数据导入和导出,或者执行复杂的数据分析...
HBase是Apache软件基金会下的一个开源项目,是一个分布式的、面向列的NoSQL数据库。...HBase特别适用于处理大量数据的实时读写操作,并且支持海量数据的快速访问,这使得它在大数据处理领域中得到了广泛的应用。
### HBase介绍与内部原理详解 ...通过深入理解HBase的逻辑视图和物理视图,以及掌握有效的表结构设计方法,我们可以更有效地利用HBase来处理大规模数据集,从而满足现代应用程序对高性能和高可用性的需求。