`
ganliang13
  • 浏览: 250780 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询

阅读更多
package com.bfd.test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseTest  {
	
	   public static Configuration configuration;

	/**
	 * a.配置host, 例如:bfdbjc2 192.168.11.72 
	 * b.参考:hbase-site.xml: 
	 * <property> <name>hbase.zookeeper.quorum</name>
	 *			  <value>bfdbjc2,bfdbjc3,bfdbjc4</value>
	 * </property> 
	 * <property> <name>hbase.rootdir</name> 
	 * 			  <value>hdfs://bfdbjc1:12000/hbase</value>
	 * </property>
	 */
	    static {  
	        configuration = HBaseConfiguration.create(); 
		    //String ZOOKEEPER_QUORAM =  "zk-1:2181,zk-2:2181,zk-3:2181,zk-4:2181,zk-5:2181,zk-6:2181";

	        configuration.set("hbase.zookeeper.quorum","bfdbjc2:2181,bfdbjc3:2181,bfdbjc4:2181");  
	        configuration.set("hbase.rootdir", "hdfs://bfdbjc1:12000/hbase");  
	    }  
	    
	    
	    public static void main(String[] args) throws Exception {  
	       // createTable("gangliang13");   
	        //insertData("gangliang13");
	    	//deleteRow("gangliang13", "112");
	        //queryAllLimit("gangliang13",2); 
	    	//queryByRowKey("GidCross","0100020a000056c100004e374e267993");
	    	//queryByColumn("gangliang13","aaa1");
	    	//testLikeQuery("gangliang13","11");
	    	//queryByManyColumn("gangliang13");
	    	//queryAll("gangliang13");
	    	//System.out.println("input GidCross rawkey:"+args[0]);
	    	//queryByRowKey("GidCross", "0100020a000056c100004e374e267993");
	    	//testScanByTimeStamp("ganglia2",1367984937372L);

	    	deleteColumnData("t12","111","f1","",1371107067100L);
	    }  
	    
	    /**
	     * 如果存在要创建的表,那么先删除,再创建
	     * @param tableName
	     */
	    public static void createTable(String tableName) {  
	        System.out.println("start create table ......");  
	        try {  
	            HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);  
	            if (hBaseAdmin.tableExists(tableName)) {  
	                hBaseAdmin.disableTable(tableName);  
	                hBaseAdmin.deleteTable(tableName);  
	                System.out.println(tableName + " is exist,detele....");  
	            }  
	            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);  
	            tableDescriptor.addFamily(new HColumnDescriptor("name"));  
	            tableDescriptor.addFamily(new HColumnDescriptor("age"));  
	            tableDescriptor.addFamily(new HColumnDescriptor("sex"));  
	            hBaseAdmin.createTable(tableDescriptor);  
	        } catch (MasterNotRunningException e) {  
	            e.printStackTrace();  
	        } catch (ZooKeeperConnectionException e) {  
	            e.printStackTrace();  
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	        System.out.println("end create table ......");  
	    }  
	    
	    /**
	     * 插入数据
	     * @param tableName
	     */
	    public static void insertData(String tableName) {  
	        System.out.println("start insert data ......");  
	        HTablePool pool = new HTablePool(configuration, 1000);  
	        HTableInterface table =  pool.getTable(tableName);  
	        Put put = new Put("111".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值  
	        put.add("name".getBytes(), null, "aaa1".getBytes());// 本行数据的第一列  
	        put.add("age".getBytes(), null, "bbb1".getBytes());// 本行数据的第三列  
	        put.add("sex".getBytes(), null, "ccc1".getBytes());// 本行数据的第三列  
	        
	        Put put2 = new Put("222".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值  
	        put2.add("name".getBytes(), null, "aaa2".getBytes());// 本行数据的第一列  
	        put2.add("name".getBytes(), "nickname".getBytes(), "aaabbbbbnick".getBytes());
	        put2.add("age".getBytes(), null, "bbb2".getBytes());// 本行数据的第三列  
	        put2.add("sex".getBytes(), null, "ccc2".getBytes());// 本行数据的第三列  
	        try {  
	            table.put(put);  
	            table.put(put2);
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	        System.out.println("end insert data ......");  
	    }  
	    
	    /**
	     * 
	     * @param tableName 取前面limit条
	     */
	    public static void queryAllLimit(String tableName,int limit) {  
	        HTablePool pool = new HTablePool(configuration, 1000); 
	        HTableInterface table = pool.getTable(tableName);
	        try {  
	            Scan scan = new Scan();  
	            scan.setCaching(1);  
	            Filter filter = new PageFilter(limit); 
	            scan.setFilter(filter);  
	            ResultScanner scanner = table.getScanner(scan);// 执行扫描查找 int num = 0;  
	            Iterator<Result> res = scanner.iterator();// 返回查询遍历器  
	            while (res.hasNext()) {  
	                Result result = res.next();  
	                table.setWriteBufferSize(1024*1024*1);
	                KeyValue[] kv = result.raw();
	                for (int i = 0; i < kv.length; i++) {
		                System.out.print(new String(kv[i].getRow()) + "  ");
	                    System.out.print(new String(kv[i].getFamily()) + ":");
	                    System.out.print(new String(kv[i].getQualifier()) + "  ");
	                    System.out.print(kv[i].getTimestamp() + "  ");
	                    System.out.println(new String(kv[i].getValue()));
	                }
	            }  
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	    }  
	    
	    public static void testScanByTimeStamp(String tablename,Long timestamp) throws IOException{
	            //Scan类常用方法说明
	            //指定需要的family或column ,如果没有调用任何addFamily或Column,会返回所有的columns; 
	            // scan.addFamily(); 
	            // scan.addColumn();
	            // scan.setMaxVersions(); //指定最大的版本个数。如果不带任何参数调用setMaxVersions,表示取所有的版本。如果不掉用setMaxVersions,只会取到最新的版本.
	            // scan.setTimeRange(); //指定最大的时间戳和最小的时间戳,只有在此范围内的cell才能被获取.
	            // scan.setTimeStamp(); //指定时间戳
	            // scan.setFilter(); //指定Filter来过滤掉不需要的信息
	            // scan.setStartRow(); //指定开始的行。如果不调用,则从表头开始;
	            // scan.setStopRow(); //指定结束的行(不含此行);
	            // scan.setBatch(); //指定最多返回的Cell数目。用于防止一行中有过多的数据,导致OutofMemory错误。
	            
	            //过滤器
	            //1、FilterList代表一个过滤器列表
	            //FilterList.Operator.MUST_PASS_ALL -->and
	            //FilterList.Operator.MUST_PASS_ONE -->or
	            //eg、FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE);
	            //2、SingleColumnValueFilter
	            //3、ColumnPrefixFilter用于指定列名前缀值相等
	            //4、MultipleColumnPrefixFilter和ColumnPrefixFilter行为差不多,但可以指定多个前缀。
	            //5、QualifierFilter是基于列名的过滤器。
	            //6、RowFilter
	            //7、RegexStringComparator是支持正则表达式的比较器。
	            //8、SubstringComparator用于检测一个子串是否存在于值中,大小写不敏感。
	    	 
	        	HTablePool pool = new HTablePool(configuration, 1000); 
		        HTableInterface table = pool.getTable(tablename);
	            Scan scan = new Scan();  

	            scan.setTimeStamp(timestamp);
	            //scan.setTimeRange(NumberUtils.toLong("1370336286283"), NumberUtils.toLong("1370336337163"));
	            //scan.setStartRow(Bytes.toBytes("quanzhou"));
	            //scan.setStopRow(Bytes.toBytes("xiamen"));
	            //scan.addFamily(Bytes.toBytes("info")); 
	            //scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("id"));
	            
	            //查询列镞为info,列id值为1的记录
	            //方法一(单个查询)
	            // Filter filter = new SingleColumnValueFilter(
	            //         Bytes.toBytes("info"), Bytes.toBytes("id"), CompareOp.EQUAL, Bytes.toBytes("1")); 
	            // scan.setFilter(filter);
	            
	            //方法二(组合查询)
	            //FilterList filterList=new FilterList();
	            //Filter filter = new SingleColumnValueFilter(
	            //    Bytes.toBytes("info"), Bytes.toBytes("id"), CompareOp.EQUAL, Bytes.toBytes("1"));
	            //filterList.addFilter(filter);
	            //scan.setFilter(filterList);
	            
	            ResultScanner rs = table.getScanner(scan);
	            
	            for (Result r : rs) {
	                for (KeyValue kv : r.raw()) {
	                    System.out.println(String.format("row:%s, family:%s, qualifier:%s, qualifiervalue:%s, timestamp:%s.", 
	                            Bytes.toString(kv.getRow()), 
	                            Bytes.toString(kv.getFamily()), 
	                            Bytes.toString(kv.getQualifier()), 
	                            Bytes.toString(kv.getValue()),
	                            kv.getTimestamp()));
	                }
	            }
	            
	            rs.close();
	    }
	    
	    /**
	     * 对行key进行like查询
	     * @param table
	     * @param likeQuery
	     * @throws Exception
	     */
	    public static void testLikeQuery(String table,String likeQuery) throws Exception {  
	        Scan scan = new Scan();  
	        RegexStringComparator comp = new RegexStringComparator(likeQuery);  
	        RowFilter filter = new RowFilter(CompareOp.EQUAL, comp);  
	        scan.setFilter(filter);  
	        scan.setCaching(200);  
	        scan.setCacheBlocks(false);  
	        HTable hTable = new HTable(configuration, table);  
	        ResultScanner scanner = hTable.getScanner(scan);  
	        for (Result r : scanner) {  
	        	KeyValue[] kv = r.raw();
                for (int i = 0; i < kv.length; i++) {
	                System.out.print(new String(kv[i].getRow()) + "  ");
                    System.out.print(new String(kv[i].getFamily()) + ":");
                    System.out.print(new String(kv[i].getQualifier()) + "  ");
                    System.out.print(kv[i].getTimestamp() + "  ");
                    System.out.println(new String(kv[i].getValue()));
                } 
            }  
	    }  
	    
	    /**
	     * 查询表所有行
	     * @param tableName
	     */
	    public static void queryAll(String tableName) {  
	        HTablePool pool = new HTablePool(configuration, 1000); 
	        HTableInterface table = pool.getTable(tableName);
	        try {  
	            ResultScanner rs = table.getScanner(new Scan());  
	            for (Result r : rs) {  
	            	KeyValue[] kv = r.raw();
	                for (int i = 0; i < kv.length; i++) {
		                System.out.print(new String(kv[i].getRow()) + "  ");
	                    System.out.print(new String(kv[i].getFamily()) + ":");
	                    System.out.print(new String(kv[i].getQualifier()) + "  ");
	                    System.out.print(kv[i].getTimestamp() + "  ");
	                    System.out.println(new String(kv[i].getValue()));
	                } 
	            }  
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	    }  
	    
	    /**
	     * 根据行记录值删除
	     * @param tablename
	     * @param rowkey
	     */
	    public static void deleteRow(String tablename, String rowkey)  {  
	        try {  
	            HTable table = new HTable(configuration, tablename);  
	            List<Delete> list = new ArrayList<Delete>();  
	            Delete d1 = new Delete(rowkey.getBytes());  
	            list.add(d1);  
	            table.delete(list);  
	            System.out.println("删除行成功!");  
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	    }  
	    /**
	     * 根据时间戳删除记录,删除后再put 时间戳不能比原来值小,创建表时,指定版本记录数
	     * @param tblName
	     * @param rowKey
	     * @param family
	     * @param column
	     * @param timestamp
	     * @throws Exception
	     */
	    public static void deleteColumnData(String tblName,String rowKey,String family,String column,long timestamp) throws Exception{  
	        HTable htbl = new HTable(configuration,tblName);  
	        Delete dlt = new Delete(Bytes.toBytes(rowKey));  
	        dlt.deleteColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp);  
	        htbl.delete(dlt);  
	        htbl.flushCommits();  
	        htbl.close();  
	    }  
	    
	    /**
	     * 根据行记录索引
	     * @param tableName
	     * @param row
	     * @throws IOException
	     */
	    public static void queryByRowKey(String tableName,String row) throws IOException {  
	    	HTable table = new HTable(configuration, tableName); 
	    	System.err.println(table.getRegionLocation(row.getBytes()));
	        try {  
	            Get scan = new Get(row.getBytes());// 根据rowkey查询  
	            Result r = table.get(scan);  
	        	KeyValue[] kv = r.raw();
                for (int i = 0; i < kv.length; i++) {
	               // System.out.print(new String(kv[i].getRow()) + "  ");
                    //System.out.print(new String(kv[i].getFamily()) + ":");
                    System.out.print(new String(kv[i].getQualifier()) + "  ");
                   // System.out.print(kv[i].getTimestamp() + "  ");
                    System.out.println(new String(kv[i].getValue()));
                }
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	    }  
	    
	    /** 
	     * 按列查询,查询多条记录 
	     * @param tableName 
	     */  
	    public static void queryByColumn(String tableName,String columnValue) {  
	        try {  
	            HTable table = new HTable(configuration,tableName);  
	            Filter filter = new SingleColumnValueFilter(Bytes.toBytes("name"), null, CompareOp.EQUAL, Bytes.toBytes(columnValue)); // 当列column1的值为aaa时进行查询  
	            Scan s = new Scan();  
	            s.setFilter(filter);  
	            ResultScanner rs = table.getScanner(s);  
	            for (Result r : rs) {  
	            	KeyValue[] kv = r.raw();
	                for (int i = 0; i < kv.length; i++) {
		                System.out.print(new String(kv[i].getRow()) + "  ");
	                    System.out.print(new String(kv[i].getFamily()) + ":");
	                    System.out.print(new String(kv[i].getQualifier()) + "  ");
	                    System.out.print(kv[i].getTimestamp() + "  ");
	                    System.out.println(new String(kv[i].getValue()));
	                }
	            }  
	        } catch (Exception e) {  
	            e.printStackTrace();  
	        }  
	    }  
	    
	    /** 
	     * 按多列查询,查询多条记录 
	     * @param tableName 
	     */  
	    public static void queryByManyColumn(String tableName) {  
	    	 try {  
	             HTable table =  new HTable(configuration,tableName); 
	             List<Filter> filters = new ArrayList<Filter>();  
	   
	             Filter filter1 = new SingleColumnValueFilter(Bytes  
	                     .toBytes("age"), null, CompareOp.EQUAL, Bytes.toBytes("bbb1"));  
	             filters.add(filter1);  
	   
	             Filter filter2 = new SingleColumnValueFilter(Bytes  
	                     .toBytes("name"), null, CompareOp.EQUAL, Bytes.toBytes("aaa1")); 
	             filters.add(filter2);  
	   
	             FilterList filterList1 = new FilterList(filters);  
	   
	             Scan scan = new Scan();  
	             scan.setFilter(filterList1);  
	             ResultScanner rs = table.getScanner(scan);  
	             for (Result r : rs) {  
            		KeyValue[] kv = r.raw();
                    for (int i = 0; i < kv.length; i++) {
    	                System.out.print(new String(kv[i].getRow()) + "  ");
                        System.out.print(new String(kv[i].getFamily()) + ":");
                        System.out.print(new String(kv[i].getQualifier()) + "  ");
                        System.out.print(kv[i].getTimestamp() + "  ");
                        System.out.println(new String(kv[i].getValue()));
                    }  
	             }  
	             rs.close();  
	   
	         } catch (Exception e) {  
	             e.printStackTrace();  
	         }  
	    }  
}

 

 

分享到:
评论

相关推荐

    Java操作Hbase进行建表、删表以及对数据进行增删改查

    Java 操作 Hbase 进行建表、删表以及对数据进行增删改查 一、Hbase 简介 Hbase 是一个开源的、分布式的、基于 column-family 的 NoSQL 数据库。它是基于 Hadoop 的,使用 HDFS 作为其存储层。Hbase 提供了高性能、...

    eclipse构建HBase开发环境并运行实例对Hbase建表增删改查

    下面将详细介绍如何在Eclipse中搭建HBase开发环境,并对HBase进行建表、增、删、改、查等操作。 一、环境准备 首先需要确定HBase和Hadoop的版本是否一致,为了避免版本不兼容问题。在本例中,我们使用的HBase版本...

    java调用hbase实现数据库的增删改查

    java调用hbase数据库,完成对hbase常用api的封装和对hbase数据库的增删改查等操作,经测试绝对可用。

    java链接并对hbase进行增删改查操作的实例代码(含批量插入,范围查询等,并包含所需jar包)

    Java链接HBase进行增删改查操作是大数据领域常见的任务,尤其在处理大规模分布式存储时。HBase,作为Apache Hadoop生态系统的一部分,是一个基于列族的NoSQL数据库,提供了高性能、高可扩展性的数据存储解决方案。这...

    java操作Hbase之从Hbase中读取数据写入hdfs中源码

    HBase的设计目标是对超大型表进行随机、实时读写操作。而HDFS则是Hadoop的核心组件,作为一个分布式文件系统,它能够跨多台服务器存储和处理大量数据。 在Java中操作HBase,我们需要使用HBase的Java客户端API。首先...

    java操作Hbase之实现表的创建删除源码

    在Java中操作HBase是一种常见的任务,特别是在大数据处理和存储的场景中。HBase是一个分布式的、基于列族的NoSQL数据库,它构建在Hadoop之上,提供了高性能、低延迟的数据存储和访问能力。本教程将详细介绍如何使用...

    HBase基本操作 Java代码

    HBase基本操作 增删改查 java代码 要使用须导入对应的jar包

    Hbase笔记 —— 利用JavaAPI的方式操作Hbase数据库(往hbase的表中批量插入数据).pdf

    在本文档中,我们将深入探讨如何使用Java API与HBase数据库进行交互,特别是关于如何创建表、修改表结构以及批量插入数据。HBase是Apache的一个分布式、可扩展的大数据存储系统,它基于谷歌的Bigtable设计,适用于...

    hbase用于查询客户端工具

    通过编写MapReduce作业,可以对HBase表进行大规模的数据导入和导出,或者执行复杂的数据分析任务。 在实际使用中,选择哪个客户端工具取决于具体的需求和使用场景。例如,如果需要快速原型开发或简单的数据操作,...

    hbase java api 访问 查询、分页

    在HBase这个分布式列式数据库中,Java API是开发者常用的一种接口来操作HBase,包括创建表、插入数据、查询数据以及实现分页等操作。本文将深入探讨如何使用HBase Java API进行数据访问和分页查询。 首先,我们要...

    java-hbase开发包

    1. **HBase客户端API**:这是Java-HBase开发包的核心,提供了一组Java接口和类,用于连接到HBase集群,创建表,插入、查询和更新数据。例如,`HTableInterface` 和 `HBaseAdmin` 接口,以及 `Put`、`Get` 和 `Scan` ...

    hbase数据可视化系统

    4. 表管理:支持HBase的建表和删除操作,这需要调用HBase的Admin API,完成表的创建、删除、修改等管理任务。 五、安全性与优化 1. 认证授权:为了保护数据安全,需要在HBase和SpringBoot应用中实现认证和授权机制...

    java操作hbase,增删查改

    java操作hbase,增删查改 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zhangjingao/article/details/104390560 例子功能说明 不说废话,直接开始,撸代码。hbase的介绍查看其他博...

    java链接并对hbase进行增删改查操作的实例代码(含批量插入,范围查询等,并包含所需jar包).zip

    在Java中操作HBase数据库,通常需要通过HBase的Java API来实现数据的增、删、改、查等基本操作。HBase是一个分布式、版本化的NoSQL数据库,它基于Google的Bigtable设计,并且构建在Hadoop之上。下面将详细解释如何...

    hbase分页查询实现.pdf

    HBase分页查询实现 HBase作为一个NoSQL数据库,具有高性能、...本文讲解了如何使用Java语言实现HBase的分页查询,并介绍了HBase的配置、HTablePool、获取HBase表、字节数组的转换、Filter和ResultScanner等知识点。

    java代码使用thrift2操作hbase示例

    在本文中,我们将深入探讨如何使用Java通过Thrift2接口操作HBase数据库。HBase是一个分布式、可扩展的大数据存储系统,它构建于Hadoop之上,支持实时读写。Thrift是一个轻量级的框架,用于跨语言服务开发,允许不同...

    基于Hbase的大数据查询优化

    因为面向列的特点,Hbase只能单单地以rowkey为主键作查询,而无法对表进行多维查询和join操作,并且查询通常都是全表扫描,耗费资源较大,查询效率较低。类比于传统型数据库里的一些查询方式,本文对Hbase的存储原理进行了...

    java代码将mysql表数据导入HBase表

    本文将详细介绍如何使用Java代码实现这一过程,包括样例MySQL表和数据,以及HBase表的创建。 首先,我们需要了解MySQL和HBase的基本概念。MySQL是一种关系型数据库管理系统,它基于ACID(原子性、一致性、隔离性和...

    利用Hbase相关的API,包括对HBase的增删改查等操作

    在本文中,我们将深入探讨如何使用Java API进行HBase的操作,包括增、删、改、查等基本功能。HBase是一个分布式、版本化的NoSQL数据库,它构建在Hadoop文件系统(HDFS)之上,提供了高可靠性、高性能、可伸缩的数据...

    java 通过thrift-0.9.1读取hbase表数据

    - 执行操作:通过客户端对象的方法,如`get`, `scan`等,对HBase表进行读取操作。 - 处理结果:解析返回的数据,通常是`TResult`对象,从中获取HBase行和列族的信息。 - 关闭资源:确保在操作完成后关闭连接,...

Global site tag (gtag.js) - Google Analytics