`
knight_black_bob
  • 浏览: 853858 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hbase java 常见操作

阅读更多

 

 

 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.HashMap;
import java.util.List;
import java.util.Map; 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
import org.mortbay.log.Log; 




public class HbaseUtil implements IOperator
{
	private static Configuration conf = null; 
	private static String configFile = "hbase-site-test_bj.xml";
	private   Map<String, String> aMap = null;
	private   String mapTable = null; 
	private   String[] tableFamily = null;  
	
	public HbaseUtil() { 
		
	}
	 
	public HbaseUtil( String mapAppTable , String[] appTableFamily ) { 
		this.aMap =  new HashMap<String, String>();
		this.mapTable = mapAppTable; 
		this.tableFamily = appTableFamily;  
		
	}

	static
	{
		Configuration HBASE_CONFIG = new Configuration();
		HBASE_CONFIG.addResource(configFile);
		conf = HBaseConfiguration.create(HBASE_CONFIG);
		System.err.println(conf.get("hbase.zookeeper.property.dataDir"));
	}

	/**
	 * 创建表操作
	 * 
	 * @throws IOException
	 */
	public void createTable(String tablename, String[] cfs) throws IOException
	{
		HBaseAdmin admin  = new HBaseAdmin(conf);
			if (admin.tableExists(tablename))
			{
				System.out.println("表已经存在!");
			}
			else
			{
				HTableDescriptor tableDesc = new HTableDescriptor(tablename);
				for (int i = 0; i < cfs.length; i++)
				{
					tableDesc.addFamily(new HColumnDescriptor(cfs[i]));
				}
				admin.createTable(tableDesc);
				System.out.println("表创建成功!");
			}
		  admin.close(); 
	}

	/**
	 * 删除表操作
	 * 
	 * @param tablename
	 * @throws IOException
	 */
	public void deleteTable(String tablename) throws IOException
	{
		HBaseAdmin admin = new HBaseAdmin(conf);
			if (!admin.tableExists(tablename))
			{
				System.out.println("table(" + tablename + ") not exists, won't delete");
			}
			else
			{
				admin.disableTable(tablename);
				admin.deleteTable(tablename);
				System.out.println("table(" + tablename + ") delete success");
			} 
		 admin.close(); 
	}

	public void insertRow() throws IOException
	{
		HTable table = new HTable(conf, "test");
		Put put = new Put(Bytes.toBytes("row3"));
		put.add(Bytes.toBytes("cf"), Bytes.toBytes("444"), Bytes.toBytes("value444"));
		table.put(put);
		table.close();
	}

	/**
	 * 插入一行记录
	 * 
	 * @param tablename
	 * @param cfs
	 * @throws IOException 
	 */
	public void writeRow(String tablename, String[] cfs) throws IOException
	{
		HTable    table = new HTable(conf, tablename);
		Put put = new Put(Bytes.toBytes(cfs[0])); 
		put.add(Bytes.toBytes(cfs[1]), Bytes.toBytes(cfs[2]), Bytes.toBytes(cfs[3]));
		table.put(put);
		System.out.println("写入成功!"); 
		table.close();
	}

	// 写多条记录
	public void writeMultRow(String tablename, String[][] cfs) throws IOException
	{
		List<Put> lists = new ArrayList<Put>();
		HTable table = new HTable(conf, tablename);
		for (int i = 0; i < cfs.length; i++)
		{
			Put put = new Put(Bytes.toBytes(cfs[i][0]));
			put.add(Bytes.toBytes(cfs[i][1]), Bytes.toBytes(cfs[i][2]), Bytes.toBytes(cfs[i][3]));
			lists.add(put);
		}
		table.put(lists);
		 table.close();
		 
	}

	// 写多条记录
	public void writeMultRowByDevice(HTable table, String tablename, String[][] cfs) throws IOException
	{
		 
		List<Put> lists = new ArrayList<Put>();
		// HTable table = new HTable(conf, tablename);
		for (int i = 0; i < cfs.length; i++)
		{
			Put put = new Put(Bytes.toBytes(cfs[i][0]));
			Log.info("writeMultRowByDevice  "+Bytes.toBytes(cfs[i][1])+"="+Bytes.toBytes(cfs[i][2])+"="+Bytes.toBytes(cfs[i][3]));
			put.add(Bytes.toBytes(cfs[i][1]), Bytes.toBytes(cfs[i][2]), Bytes.toBytes(cfs[i][3]));
			lists.add(put);
		}
		Log.info("push start");
		table.put(lists);
		Log.info("push end");
		 
	}

	/**
	 * 删除一行记录
	 * 
	 * @param tablename
	 * @param rowkey
	 * @throws IOException
	 */
	public void deleteRow(String tablename, String rowkey) throws IOException
	{
		HTable table = new HTable(conf, tablename);
		List<Delete> list = new ArrayList<Delete>();
		Delete d1 = new Delete(rowkey.getBytes());
		list.add(d1);
		table.delete(list);
		System.out.println("delete row(" + rowkey + ") sucess");
		table.close();
	}

	/**
	 * 查找一行记录
	 * 
	 * @param tablename
	 * @param rowkey
	 */
	public   void selectRow(String tablename, String rowKey) throws IOException
	{
		HTable table = new HTable(conf, tablename);
		Get g = new Get(rowKey.getBytes());
		// g.addColumn(Bytes.toBytes("cf:1"));
		Result rs = table.get(g);
		for (KeyValue kv : rs.raw())
		{
			System.out.print(new String(kv.getRow()) + "  ");
			System.out.print(new String(kv.getFamily()) + ":");
			System.out.print(new String(kv.getQualifier()) + "  ");
			System.out.print(kv.getTimestamp() + "  ");
			System.out.println(new String(kv.getValue()));
		}
	   table.close();
	   
	}

	/**
	 * 查询表中所有行
	 * 
	 * @param tablename
	 * @throws IOException 
	 */
	public void scaner(String tablename) throws IOException
	{
		 
			HTable table = new HTable(conf, tablename);
			Scan s = new Scan();
			ResultScanner rs = table.getScanner(s);
			for (Result r : rs)
			{
				KeyValue[] kv = r.raw();
				// for (int i = 0; i < kv.length; i++) {
				/*
				 * System.out.print(new String(kv[i].getRow()) + "  ");
				 * System.out.print(new String(kv[i].getFamily()) + ":");
				 * System.out.print(new String(kv[i].getQualifier()) + "  ");
				 * System.out.print(kv[i].getTimestamp() + "  ");
				 * System.out.println(new String(kv[i].getValue()));
				 */
				System.out.println(new String(kv[1].getValue()) + "==" + new String(kv[0].getValue()));
				// }
			}
		 rs.close();
		 table.close(); 
	}


	public void scanByTimestamp(String tablename, long maxtime) throws IOException
	{ 
			HTable table = new HTable(conf, tablename);
			Scan s = new Scan();
			// TODO 存放所有的结果
			FilterList allInfo = new FilterList();
			// allInfo.addFilter();
			s.setFilter(allInfo);
			
	}

	public   Map<String, String> getMap()
	{
		Map<String, String> map = new HashMap<String, String>();
		try
		{
			HTable table = new HTable(conf, mapTable);
			Scan s = new Scan();
			ResultScanner rs = table.getScanner(s);
			for (Result r : rs)
			{
				KeyValue[] kv = r.raw();
				map.put(new String(kv[0].getRow()), new String(kv[0].getValue()));
			}
		}
		catch (IOException e)
		{
			e.printStackTrace();
		}
		return map;
	}
 

	

}

 

 

import java.io.IOException;
import java.util.Map;

public interface IOperator {

	public void createTable(String tablename, String[] cfs) throws IOException ;
	public void deleteTable(String tablename) throws IOException;
	public void insertRow() throws IOException;
	public void writeRow(String tablename, String[] cfs) throws IOException;
	public void writeMultRow(String tablename, String[][] cfs) throws IOException;
	public void deleteRow(String tablename, String rowkey) throws IOException;
	public void selectRow(String tablename, String rowKey) throws IOException;
	public void scaner(String tablename) throws IOException;
	public void scanByTimestamp(String tablename, long maxtime) throws IOException;
	public Map<String, String> getMap() throws IOException; 
}

 

public abstract class BaseRunnabler implements Runnable{

	String sourceFile=""; // 读取文件路径
	String numberFile="";     
	String hbaseTable="";  // hbase  表名
	String [] hbaseFamily=null;    // 行列簇名
	String keywords ="";
	
	public BaseRunnabler(String sourceFile,String hbaseTable,String [] hbaseFamily,String numberFile ,String keywords ){
		this.sourceFile=sourceFile;
		this.numberFile=numberFile;
		this.hbaseTable=hbaseTable;
		this.hbaseFamily = hbaseFamily;
		this.keywords = keywords;
	}	
	
	@Override
	public void run() {
		try{
		IOperator hu = new HbaseUtil( hbaseTable,hbaseFamily);
    	hu.createTable(hbaseTable,hbaseFamily ); 
		processFile(hu );
		}catch (Exception e) {
			e.printStackTrace();
		}

	}

	public abstract void processFile(IOperator hu) throws Exception; 	 
}

 

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader; 
import java.io.IOException;
import java.util.Date;  
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; 
import Model.Device; 
import com.alibaba.fastjson.JSON; 

public class DeviceReadThread extends BaseRunnabler {

	  static Logger logger = LoggerFactory.getLogger(DeviceReadThread.class);
	
	public DeviceReadThread(String sourceFile, String hbaseTable,
			String[] hbaseFamily, String numberFile, String keywords) {
		super(sourceFile, hbaseTable, hbaseFamily, numberFile, keywords);
	}

	@Override
	public void processFile(IOperator hu) {
		FileReader logReader = null;
		BufferedReader logBufferedReader = null;
		try { 
			File logFile = new File(sourceFile);
			logReader = new FileReader(logFile);
			logBufferedReader = new BufferedReader(logReader);
			String temp = logBufferedReader.readLine();
			//logger.error(" temp is  " + temp );
			while ( temp  != null) {
				Device device = JSON.parseObject(temp, Device.class); 
				//logger.error(" device is null ? " + ( device == null ) );
				
				String[][] s = new String[][] {
						{ device.getLid(), hbaseFamily[0], "lid" , device.getLid() } ,
						{ device.getLid(), hbaseFamily[1], "date", (new Date()).toString() }, 
						{ device.getLid(), hbaseFamily[2], "os", "2" },
						{ device.getLid(), hbaseFamily[2], "osv", "3" } };
				hu.writeMultRow(hbaseTable, s);
				logger.info(" hbase util end "   );
				temp = logBufferedReader.readLine();
			}
		} catch (Exception e) {
			logger.error(" DeviceReadThread error "   );
			e.printStackTrace();
		} finally { 
			try {
				logBufferedReader.close();
			} catch (IOException e) { 
				e.printStackTrace();
			}
			try {
				logReader.close();
			} catch (IOException e) { 
				e.printStackTrace();
			}
		}
	}

}

 

 

 

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.Properties;

public class HbaseStarter {

	public static void main(String[] args) throws  Exception {
		Properties properties=new Properties();
		//String config = "D:/work/util/aoi-hbase/trunk/src/main/resources/testua.properties";
		String config = "/home/aoi/aoi-hbase/conf/config.properties"; 
		FileInputStream fis = new FileInputStream(config);
		properties.load(fis);
		fis.close(); 
		
		String sourceFile=properties.getProperty("sourceFile")+"device2.log"+","+properties.getProperty("sourceFile")+"applist.log";
		String hbaseTable = properties.getProperty("hbaseTable");
		String hbaseFamily = properties.getProperty("hbaseFamily");
		String numFile=properties.getProperty("sourceFile")+"num.txt";
		
		
		String[] sourceFileName=sourceFile.split(",");  // file 
		String[] hbaseTableName=hbaseTable.split(",");  // table
		String[] hbaseFamilyName=hbaseFamily.split("&");     // family  
		
		
		DeviceReadThread device = new DeviceReadThread(sourceFileName[0],hbaseTableName[0],hbaseFamilyName[0].split(","),"","");
		new Thread(device).start();
		
		AppReadThread app = new AppReadThread(sourceFileName[1],hbaseTableName[1],hbaseFamilyName[1].split(","),numFile,"");
		new Thread(app).start();
		
	}
}
 

 

 

config.properties
sourceFile=//data//logs//
hbaseTable=device-ua,app-ua
hbaseFamily="device","history","Description"&"app", "history", "Description"
 

 

hbase-site-test_bj.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
        <property>
                <name>hbase.rootdir</name>
                <value>hdfs://xxx.com:9000/hbase</value>
        </property>
        <property>
                <name>hbase.cluster.distributed</name>
                <value>true</value>
        </property>
        <property>
                <name>hfile.block.cache.size</name>
                <value>0.4</value>
        </property>
        <property>
                <name>hbase.regionserver.handler.count</name>
                <value>150</value>
        </property>

        <property>
                <name>hbase.zookeeper.property.dataDir</name>
                <value>/var/lib/zookeeper</value>
        </property>

        <property>
                <name>hbase.zookeeper.property.clientPort</name>
                <value>2181</value>
        </property>
        <property>
                <name>hbase.zookeeper.quorum</name>
                <value>xxx.com,xxx.com,rabbitmq1</value>
        </property>

        <property>
                <name>zookeeper.session.timeout</name>
                <value>60000</value>
        </property>

        <property>
                <name>hbase.master.maxclockskew</name>
                <value>180000</value>
                <description>Time difference of regionserver from master</description>
        </property>
        <property>
                <name>hbase.hregion.memstore.flush.size</name>
                <value>512</value>
        </property>
        <property>
                <name>hbase.zookeeper.property.maxClientCnxns</name>
                <value>1000</value>
        </property>
        <property>
                <name>hbase.hregion.max.filesize</name>
                <value>1024</value>
        </property>
</configuration>

 

 

 

device2.log



 

 

 

 结果:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

捐助开发者

在兴趣的驱动下,写一个免费的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。



 
 
 谢谢您的赞助,我会做的更好!

 

 

  • 大小: 15.2 KB
  • 大小: 2.2 KB
分享到:
评论

相关推荐

    java操作Hbase之从Hbase中读取数据写入hdfs中源码

    在Java编程环境中,操作HBase并将其数据写入HDFS(Hadoop Distributed File System)是一项常见的任务,特别是在大数据处理和分析的场景下。本篇将详细介绍如何使用Java API实现这一功能,以及涉及到的关键技术和...

    java从本地读文件并上传Hbase

    在Java编程环境中,将本地文件读取并上传到HBase是一项常见的任务,特别是在大数据处理和存储的场景下。HBase是一个分布式、版本化的NoSQL数据库,基于Apache Hadoop,适用于大规模数据存储。以下是一个详细的过程,...

    HbaseTemplate 操作hbase

    通过HbaseTemplate,我们可以执行常见的CRUD(创建、读取、更新和删除)操作以及更复杂的查询。 1. **HbaseTemplate的初始化**:在使用HbaseTemplate之前,我们需要在Spring配置文件中配置HBase的相关连接信息,如...

    java链接及操作hbase实例代码

    在Java编程环境中,链接并操作HBase是一种常见的任务,特别是在大数据处理和分布式存储的应用场景下。HBase是一个基于Google Bigtable设计的开源NoSQL数据库,它运行在Hadoop之上,提供高并发、低延迟的数据存储服务...

    HBase基本数据操作详解.docx

    ### HBase基本数据操作详解 #### 一、命名空间 Namespace **1.1 命名空间概述** 在HBase中,命名空间(namespace)的概念类似于传统数据库中的模式(schema),它提供了一种对表进行逻辑分组的方式。这种分组不仅有助...

    封装hbase以便java调用

    总的来说,封装HBase以便Java调用是一个常见的开发任务,它涉及到对HBase API的理解,接口设计,以及对Java编程和项目管理的综合运用。封装后的库不仅提高了代码的可读性和可维护性,也使得应用与数据存储的交互更加...

    java代码将mysql表数据导入HBase表

    这里我们使用JDBC连接MySQL,使用HBase Java API操作HBase。以下是一个简单的示例: ```java import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop....

    java操作Hbase之实现表的创建删除源码

    在Java中操作HBase是一种常见的任务,特别是在大数据处理和存储的场景中。HBase是一个分布式的、基于列族的NoSQL数据库,它构建在Hadoop之上,提供了高性能、低延迟的数据存储和访问能力。本教程将详细介绍如何使用...

    java访问Hbase数据库Demo

    Java访问Hbase数据库是大数据处理中的常见操作,尤其在分布式存储和实时数据分析场景下。HBase,一个基于Google Bigtable模型的开源非关系型数据库,是Apache Hadoop生态系统的一部分,提供高并发、低延迟的数据存储...

    java 连接远程hbase 数据库

    Java连接远程HBase数据库是一项常见的任务,特别是在大数据处理和分布式存储的应用场景中。HBase是一个构建在Hadoop文件系统(HDFS)之上的分布式、版本化的NoSQL数据库,它提供了高性能、低延迟的数据访问能力。...

    java 通过thrift-0.9.1读取hbase表数据

    在IT领域,尤其是在大数据处理和分布式系统中,Java、Thrift和HBase是常见的技术组合。本主题将详细探讨如何利用Java通过Thrift-0.9.1版本来读取HBase表数据。 HBase是一个基于Google Bigtable设计的开源NoSQL...

    hbase hadoop chm java 帮助文档

    【标题】"hbase hadoop chm java 帮助文档"揭示了这是一份针对Java程序员在Hadoop和HBase开发中使用的CHM(Windows帮助文档)工具集。CHM文件是一种常见的技术文档格式,它将多个HTML页面、图像和其他资源打包成一个...

    java api 访问hbase demo(Maven)

    在Java API中访问HBase是大数据处理中常见的一项任务,HBase作为一个分布式、列式存储的NoSQL数据库,常用于海量数据的实时读写。在这个Java API访问HBase的Maven项目中,我们将探讨如何配置项目,引入依赖,以及...

    hbase_shell操作命令

    它提供了多种命令,帮助用户执行常见的数据库操作,如创建、读取、更新和删除数据。以下是一些重要的HBase Shell操作命令及其详细说明: 1. **查询服务器状态** `status` 命令用于查看集群的运行状态,包括活动...

    hbase常见错误整理3年运维经验整理

    ### HBase常见错误及解决方案:3年运维经验总结 #### 一、配置第三方依赖包HADOOP_CLASSPATH和HBase问题 **问题描述** 在本地开发HBase程序时,虽然本地编译能够通过(因为在IDE中已经导入了必要的jar包),但在...

    java链接并对hbase进行增删改查操作的实例代码(含批量插入,范围查询等,并包含所需jar包)

    Java链接HBase进行增删改查操作是大数据领域常见的任务,尤其在处理大规模分布式存储时。HBase,作为Apache Hadoop生态系统的一部分,是一个基于列族的NoSQL数据库,提供了高性能、高可扩展性的数据存储解决方案。这...

    基于springboot集成hbase过程解析

    SpringBoot集成HBase是当前大数据处理和存储解决方案中的一种常见组合。HBase是基于Hadoop的分布式、可扩展的NoSQL数据库,能够存储大量的结构化和非结构化数据。SpringBoot则是一个基于Java的现代Web框架,提供了...

    使用Java对Hbase操作总结及示例代码

    使用Java对HBase进行操作是大数据处理中常见的场景,本文将总结使用Java对HBase进行操作的方法,并提供详细的示例代码。 一、Configuration 在使用Java API时,Client端需要知道HBase的配置环境,如存储地址,...

    hbase访问方式之javaapi共3页.pdf.zip

    对于开发人员来说,通过Java API与HBase进行交互是常见的操作方式。本指南将深入探讨如何利用Java API来访问和操作HBase。 一、HBase简介 HBase是建立在Hadoop文件系统(HDFS)之上,提供实时读写操作的非关系型...

    mysql中数据经处理导入到hbase中

    这个过程涉及了SQL查询、数据转换、数据预处理、JSON解析、HBase表设计以及客户端API的使用,是大数据领域常见的数据迁移任务。在实际操作中,可能还需要考虑到性能优化、错误处理和数据一致性等问题,以确保整个...

Global site tag (gtag.js) - Google Analytics