`
liyonghui160com
  • 浏览: 778524 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

HbaseUtils,HBase Java Api

阅读更多

 

 

 

package com.xx.xx.service.spark;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.log4j.Logger;

import com.xx.xx.common.util.PropertiesUtils;

public class HbaseUtils {
	private final static Logger LOG = Logger.getLogger(HbaseUtils.class);
	private static final String zookeeperQuorum = (String) PropertiesUtils
			.getJddpConf().get("zookeeperQuorum");
	private static HTablePool tablePool;
	private final static int BATCH_SIZE = 1000;
	public final static String QUERY_REPORT_TABLE = "query_report";
	public final static String QUERY_STATUS_TABLE = "query_status";
	public final static String CONSOLE_LOG_TABLE = "console_log";
	public final static String TRY_RUN_RESULT_TABLE = "try_run_result";
	public final static String COLUMN_FAMILY = "cf";
	public final static String QUALIFIER = "c";

	static {
		Configuration conf = new Configuration();
		LOG.info("zookeeperQuorum: " + zookeeperQuorum);
		conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
		Configuration hbaseConf = HBaseConfiguration.create(conf);
		tablePool = new HTablePool(hbaseConf, 30);
	}

	private List<Result> getResult(String hbaseTableName, String columnFamily,
			String qualifier, String keyPrefix) {
		List<Result> result = new ArrayList<Result>();
		HTableInterface table = null;
		try {
			table = tablePool.getTable(hbaseTableName);
			Scan scan = new Scan();
			List<Filter> filters = new ArrayList<Filter>();
			Filter prifixFilter = new PrefixFilter(keyPrefix.getBytes());
			filters.add(prifixFilter);
			Filter allFilter = new FilterList(Operator.MUST_PASS_ALL, filters);
			scan.setFilter(allFilter);

			ResultScanner rs = table.getScanner(scan);
			try {
				for (Result r : rs) {
					result.add(r);
				}
			} finally {
				rs.close();
			}
		} catch (Throwable e) {
			LOG.error("ERROR: get table: " + hbaseTableName + ", prefix key:"
					+ keyPrefix, e);
		} finally {
			if (table != null)
				try {
					table.close();
				} catch (IOException e) {
					LOG.error("close table error, get table: " + hbaseTableName
							+ ", prefix key:" + keyPrefix, e);
				}
		}
		return result;
	}

	private Map<String, String> read(String hbaseTableName, String columnFamily,
			String qualifier, String keyPrefix) {
		Map<String, String> result = new HashMap<String, String>();
		List<Result> resultList = getResult(hbaseTableName, columnFamily,
				qualifier, keyPrefix);
		for (Result r : resultList) {
			KeyValue[] kv = r.raw();
			for (int i = 0; i < kv.length; i++) {
				result.put(new String(kv[i].getRow()), new String(kv[i].getValue()));
			}
		}
		return result;
	}

	private void write(String hbaseTableName, String columnFamily,
			String qualifier, String keyPrefix, Collection<String> contents) {
		HTableInterface table = null;
		try {
			table = tablePool.getTable(hbaseTableName);
			List<Put> putList = new ArrayList<Put>();
			int idx = 0;
			for (String line : contents) {
				String rowKey = keyPrefix + idx;
				if (contents.size() == 1)
					rowKey = keyPrefix;
				idx++;
				Put put = new Put(rowKey.getBytes());
				put.add(columnFamily.getBytes(), qualifier.getBytes(),
						line.getBytes());
				putList.add(put);
				if (putList.size() >= BATCH_SIZE) {
					table.put(putList);
					table.flushCommits();
					putList.clear();
				}
			}
			table.put(putList);
			table.flushCommits();
		} catch (Throwable e) {
			LOG.error("ERROR: write into table: " + hbaseTableName + ", prefix key:"
					+ keyPrefix, e);
		} finally {
			if (table != null) {
				try {
					table.close();
				} catch (IOException e) {
					LOG.error("close table error, write into table: "
							+ hbaseTableName + ", prefix key:" + keyPrefix, e);
				}
			}
		}
	}

	private void delete(String hbaseTableName, String columnFamily,
			String qualifier, Collection<String> rowKeys) {
		HTableInterface table = null;
		try {
			table = tablePool.getTable(hbaseTableName);
			List<Delete> deleteList = new ArrayList<Delete>();
			int idx = 0;
			for (String r : rowKeys) {
				Delete del = new Delete(r.getBytes());
				deleteList.add(del);
				if (deleteList.size() >= BATCH_SIZE) {
					table.delete(deleteList);
					table.flushCommits();
					deleteList.clear();
				}
				idx++;
			}
			table.delete(deleteList);
			table.flushCommits();
			LOG.info("deleted " + idx + " rows from HBase table. "
					+ hbaseTableName);
		} catch (Throwable e) {
			LOG.error("delete from table: " + hbaseTableName, e);
		} finally {
			if (table != null) {
				try {
					table.close();
				} catch (IOException e) {
					LOG.error("close table error, delete from table: "
							+ hbaseTableName, e);
				}
			}
		}
	}
	
	public void writeQueryReport(String queryId, String reportJson) {
		String keyPrefix = queryId;
		write(QUERY_REPORT_TABLE, COLUMN_FAMILY, QUALIFIER, keyPrefix,
				Arrays.asList(reportJson));
	}

	private String getFirst(Collection<String> list) {
		for (String s : list) {
			return s;
		}
		return null;
	}
	
	public String getQueryReport(String queryId) {
		String keyPrefix = queryId;
		Collection<String> data = read(QUERY_REPORT_TABLE, COLUMN_FAMILY, QUALIFIER,
				keyPrefix).values();
		if (data == null || data.size() == 0)
			return null;
		else
			return getFirst(data);
	}

	public String getAsyncQueryStatus(String queryId) {
		String keyPrefix = queryId;
		Collection<String> data = read(QUERY_STATUS_TABLE, COLUMN_FAMILY, QUALIFIER,
				keyPrefix).values();
		if (data == null || data.size() == 0)
			return null;
		else
			return getFirst(data);
	}

	public void setAsyncQueryStatus(String queryId, String status) {
		String keyPrefix = queryId;
		write(QUERY_STATUS_TABLE, COLUMN_FAMILY, QUALIFIER, keyPrefix,
				Arrays.asList(status));
	}

	public Collection<String> getConsoleLog(String shellJobId) {
		String keyPrefix = "tryRun_" + shellJobId;
		Map<String,String> map = read(CONSOLE_LOG_TABLE, COLUMN_FAMILY, QUALIFIER, keyPrefix);
		delete(CONSOLE_LOG_TABLE, COLUMN_FAMILY, QUALIFIER, map.keySet());
		return map.values();
	}

	public Collection<String> getQueryResult(String queryId) {
		String keyPrefix = queryId;
		Map<String,String> map = read(TRY_RUN_RESULT_TABLE, COLUMN_FAMILY, QUALIFIER, keyPrefix);
		delete(TRY_RUN_RESULT_TABLE, COLUMN_FAMILY, QUALIFIER, map.keySet());
		return map.values();
	}

}

 

分享到:
评论

相关推荐

    HBase Java API类介绍

    ### HBase Java API类介绍 #### 一、概述 HBase是一个分布式的、面向列的开源数据库,基于Google的Bigtable论文实现。它适合于非结构化数据存储,并且能够实时处理PB级别的数据。HBase提供了Java API供开发者使用...

    hbase java api 访问 增加修改删除(一)

    在本文中,我们将深入探讨如何使用HBase的Java API进行数据的增加、修改和删除操作。HBase是一个基于Google Bigtable设计的开源分布式数据库,它属于Apache Hadoop生态系统的一部分,适用于处理大规模数据存储。通过...

    hbase java api 访问 查询、分页

    在HBase这个分布式列式数据库中,Java API是开发者常用的一种接口来操作HBase,包括创建表、插入数据、查询数据以及实现分页等操作。本文将深入探讨如何使用HBase Java API进行数据访问和分页查询。 首先,我们要...

    Hbase Java API

    HBase Java API HBase 是 Hadoop 的数据库,能够对大数据提供随机、实时读写访问。他是开源的,分布式的,多版本的,面向列的,存储模型。HBase 的整体结构主要包括 HBase Master、HRegion 服务器和 HRegion Server...

    hbase java api 所需最精简 jar

    "hbase java api 所需最精简 jar"这个标题意味着我们将探讨的是为了在Java环境中最小化依赖,但仍能实现基本HBase操作所需的JAR文件。 首先,我们需要理解HBase Java API的核心组件。HBase的Java客户端API提供了一...

    Hbase调用JavaAPI实现批量导入操作

    这篇博客“Hbase调用Java API实现批量导入操作”聚焦于如何利用Java编程语言高效地向HBase中批量导入数据。在这个过程中,我们将探讨以下几个关键知识点: 1. **HBase架构**: HBase是基于列族的存储模型,数据被...

    Hadoop+HBase+Java API

    标题 "Hadoop+HBase+Java API" 涉及到三个主要的开源技术:Hadoop、HBase以及Java API,这些都是大数据处理和存储领域的关键组件。以下是对这些技术及其结合使用的详细介绍: **Hadoop** 是一个分布式计算框架,由...

    HBase JavaAPI开发

    使用JavaAPI实现HBase的ddl(创建表、删除表、修改表(添加列族等))、dml(添加数据、删除数据)、dql(查询数据(get、scan))等操作 除此之外还包含一些其他操作:命名空间的应用、快照的应用等 对应(《HBase...

    HBase Java API操作数据库示例代码-HBaseDemo.rar

    HBase Java API操作数据库示例代码-HBaseDemo.rar HBase Java API操作数据库示例代码-HBaseDemo.rar HBase Java API操作数据库示例代码-HBaseDemo.rar

    hbase资料api

    HBase数据查询API HBase是一种分布式的、面向列的NoSQL数据库,主要应用于存储大量的半结构化数据。HBase提供了多种查询方式,包括单条查询和批量查询。 单条查询 单条查询是通过rowkey在table中查询某一行的数据...

    11-HBase Java API编程实践1

    HBase Java API 编程实践 在本实践中,我们将使用 Eclipse 编写 Java 程序,来对 HBase 数据库进行增删改查等操作。首先,我们需要启动 Hadoop 和 HBase,然后新建一个 Java 项目并导入 HBase 的 jar 包。接着,...

    使用Java API连接虚拟机HBase并进行数据库操作,Java源代码

    在本文中,我们将深入探讨如何使用Java API连接到运行在虚拟机上的HBase数据库,并进行相关的数据操作。HBase是一个分布式的、版本化的、基于列族的NoSQL数据库,它构建于Hadoop之上,适用于处理大规模的数据存储和...

    Hadoop平台技术 5.4.2 HBase Java API应用-教学课件

    Hadoop平台技术 5.4.2 HBase Java API应用-教学课件

    hbase常用JAVA API

    可以通过HBase Shell命令来查看是否成功执行了Java API的操作,例如,`hbase hbasetest.jar hbase.java.txt` 可能是一个运行包含上述操作的Java程序,并输出结果到`hbase.java.txt`的命令。 以上就是HBase常用Java...

    Hbase Java API详解.pdf

    在HBase Java API使用方面,HBaseConfiguration对象是每个HBase客户端都需要使用的,它代表了HBase的配置信息。可以通过默认构造函数来创建HBaseConfiguration对象,它会尝试从类路径中的hbase-default.xml和hbase-...

    javaApi_sparkhiveAPI_hbaseAPI.zip

    本压缩包"javaApi_sparkhiveAPI_hbaseAPI.zip"包含了2019年8月至10月期间针对这些技术的Java版API实现,以及与Spark相关的Hive和HBase API。以下是关于这些技术的详细知识: 1. **Java API for Hive**: - **Hive*...

    HBase 1.2.0 Javadoc API CHM

    自行制作的HBase 1.2.0 Javadoc API CHM版本。内容抽取自官方站点网页

    Hbase的JavaAPI

    在Java环境中,HBase提供了丰富的Java API供开发者进行数据操作,包括创建表、删除表、更新表以及查询表等基本功能。下面我们将深入探讨HBase的Java API及其在实际应用中的使用。 1. **HBase连接** 在Java中使用...

    Hbase调用JavaAPI实现批量导入操作.docx

    Hbase 调用 JavaAPI 实现批量导入操作 在大数据时代,Hbase 作为一个分布式、面向列的 NoSQL 数据库,广泛应用于大规模数据存储和处理中。同时,JavaAPI 作为一个强大且流行的编程语言,广泛应用于各种软件开发中。...

    Java SpringBoot 连接 Hbase Demo

    Java SpringBoot 连接 Hbase Demo 创建表 插入数据 列族 列 查询:全表、数据过滤 删除数据 删除表 Hbase 集群搭建:https://blog.csdn.net/weixin_42176639/article/details/131796472

Global site tag (gtag.js) - Google Analytics