package com.xx.xx.service.spark; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.log4j.Logger; import com.xx.xx.common.util.PropertiesUtils; public class HbaseUtils { private final static Logger LOG = Logger.getLogger(HbaseUtils.class); private static final String zookeeperQuorum = (String) PropertiesUtils .getJddpConf().get("zookeeperQuorum"); private static HTablePool tablePool; private final static int BATCH_SIZE = 1000; public final static String QUERY_REPORT_TABLE = "query_report"; public final static String QUERY_STATUS_TABLE = "query_status"; public final static String CONSOLE_LOG_TABLE = "console_log"; public final static String TRY_RUN_RESULT_TABLE = "try_run_result"; public final static String COLUMN_FAMILY = "cf"; public final static String QUALIFIER = "c"; static { Configuration conf = new Configuration(); LOG.info("zookeeperQuorum: " + zookeeperQuorum); conf.set("hbase.zookeeper.quorum", zookeeperQuorum); Configuration hbaseConf = HBaseConfiguration.create(conf); tablePool = new HTablePool(hbaseConf, 30); } private List<Result> getResult(String hbaseTableName, String columnFamily, String qualifier, String keyPrefix) { List<Result> result = new ArrayList<Result>(); HTableInterface table = null; try { table = tablePool.getTable(hbaseTableName); Scan scan = new Scan(); List<Filter> filters = new ArrayList<Filter>(); Filter prifixFilter = new PrefixFilter(keyPrefix.getBytes()); filters.add(prifixFilter); Filter allFilter = new FilterList(Operator.MUST_PASS_ALL, filters); scan.setFilter(allFilter); ResultScanner rs = table.getScanner(scan); try { for (Result r : rs) { result.add(r); } } finally { rs.close(); } } catch (Throwable e) { LOG.error("ERROR: get table: " + hbaseTableName + ", prefix key:" + keyPrefix, e); } finally { if (table != null) try { table.close(); } catch (IOException e) { LOG.error("close table error, get table: " + hbaseTableName + ", prefix key:" + keyPrefix, e); } } return result; } private Map<String, String> read(String hbaseTableName, String columnFamily, String qualifier, String keyPrefix) { Map<String, String> result = new HashMap<String, String>(); List<Result> resultList = getResult(hbaseTableName, columnFamily, qualifier, keyPrefix); for (Result r : resultList) { KeyValue[] kv = r.raw(); for (int i = 0; i < kv.length; i++) { result.put(new String(kv[i].getRow()), new String(kv[i].getValue())); } } return result; } private void write(String hbaseTableName, String columnFamily, String qualifier, String keyPrefix, Collection<String> contents) { HTableInterface table = null; try { table = tablePool.getTable(hbaseTableName); List<Put> putList = new ArrayList<Put>(); int idx = 0; for (String line : contents) { String rowKey = keyPrefix + idx; if (contents.size() == 1) rowKey = keyPrefix; idx++; Put put = new Put(rowKey.getBytes()); put.add(columnFamily.getBytes(), qualifier.getBytes(), line.getBytes()); putList.add(put); if (putList.size() >= BATCH_SIZE) { table.put(putList); table.flushCommits(); putList.clear(); } } table.put(putList); table.flushCommits(); } catch (Throwable e) { LOG.error("ERROR: write into table: " + hbaseTableName + ", prefix key:" + keyPrefix, e); } finally { if (table != null) { try { table.close(); } catch (IOException e) { LOG.error("close table error, write into table: " + hbaseTableName + ", prefix key:" + keyPrefix, e); } } } } private void delete(String hbaseTableName, String columnFamily, String qualifier, Collection<String> rowKeys) { HTableInterface table = null; try { table = tablePool.getTable(hbaseTableName); List<Delete> deleteList = new ArrayList<Delete>(); int idx = 0; for (String r : rowKeys) { Delete del = new Delete(r.getBytes()); deleteList.add(del); if (deleteList.size() >= BATCH_SIZE) { table.delete(deleteList); table.flushCommits(); deleteList.clear(); } idx++; } table.delete(deleteList); table.flushCommits(); LOG.info("deleted " + idx + " rows from HBase table. " + hbaseTableName); } catch (Throwable e) { LOG.error("delete from table: " + hbaseTableName, e); } finally { if (table != null) { try { table.close(); } catch (IOException e) { LOG.error("close table error, delete from table: " + hbaseTableName, e); } } } } public void writeQueryReport(String queryId, String reportJson) { String keyPrefix = queryId; write(QUERY_REPORT_TABLE, COLUMN_FAMILY, QUALIFIER, keyPrefix, Arrays.asList(reportJson)); } private String getFirst(Collection<String> list) { for (String s : list) { return s; } return null; } public String getQueryReport(String queryId) { String keyPrefix = queryId; Collection<String> data = read(QUERY_REPORT_TABLE, COLUMN_FAMILY, QUALIFIER, keyPrefix).values(); if (data == null || data.size() == 0) return null; else return getFirst(data); } public String getAsyncQueryStatus(String queryId) { String keyPrefix = queryId; Collection<String> data = read(QUERY_STATUS_TABLE, COLUMN_FAMILY, QUALIFIER, keyPrefix).values(); if (data == null || data.size() == 0) return null; else return getFirst(data); } public void setAsyncQueryStatus(String queryId, String status) { String keyPrefix = queryId; write(QUERY_STATUS_TABLE, COLUMN_FAMILY, QUALIFIER, keyPrefix, Arrays.asList(status)); } public Collection<String> getConsoleLog(String shellJobId) { String keyPrefix = "tryRun_" + shellJobId; Map<String,String> map = read(CONSOLE_LOG_TABLE, COLUMN_FAMILY, QUALIFIER, keyPrefix); delete(CONSOLE_LOG_TABLE, COLUMN_FAMILY, QUALIFIER, map.keySet()); return map.values(); } public Collection<String> getQueryResult(String queryId) { String keyPrefix = queryId; Map<String,String> map = read(TRY_RUN_RESULT_TABLE, COLUMN_FAMILY, QUALIFIER, keyPrefix); delete(TRY_RUN_RESULT_TABLE, COLUMN_FAMILY, QUALIFIER, map.keySet()); return map.values(); } }
相关推荐
### HBase Java API类介绍 #### 一、概述 HBase是一个分布式的、面向列的开源数据库,基于Google的Bigtable论文实现。它适合于非结构化数据存储,并且能够实时处理PB级别的数据。HBase提供了Java API供开发者使用...
在本文中,我们将深入探讨如何使用HBase的Java API进行数据的增加、修改和删除操作。HBase是一个基于Google Bigtable设计的开源分布式数据库,它属于Apache Hadoop生态系统的一部分,适用于处理大规模数据存储。通过...
在HBase这个分布式列式数据库中,Java API是开发者常用的一种接口来操作HBase,包括创建表、插入数据、查询数据以及实现分页等操作。本文将深入探讨如何使用HBase Java API进行数据访问和分页查询。 首先,我们要...
HBase Java API HBase 是 Hadoop 的数据库,能够对大数据提供随机、实时读写访问。他是开源的,分布式的,多版本的,面向列的,存储模型。HBase 的整体结构主要包括 HBase Master、HRegion 服务器和 HRegion Server...
"hbase java api 所需最精简 jar"这个标题意味着我们将探讨的是为了在Java环境中最小化依赖,但仍能实现基本HBase操作所需的JAR文件。 首先,我们需要理解HBase Java API的核心组件。HBase的Java客户端API提供了一...
这篇博客“Hbase调用Java API实现批量导入操作”聚焦于如何利用Java编程语言高效地向HBase中批量导入数据。在这个过程中,我们将探讨以下几个关键知识点: 1. **HBase架构**: HBase是基于列族的存储模型,数据被...
标题 "Hadoop+HBase+Java API" 涉及到三个主要的开源技术:Hadoop、HBase以及Java API,这些都是大数据处理和存储领域的关键组件。以下是对这些技术及其结合使用的详细介绍: **Hadoop** 是一个分布式计算框架,由...
使用JavaAPI实现HBase的ddl(创建表、删除表、修改表(添加列族等))、dml(添加数据、删除数据)、dql(查询数据(get、scan))等操作 除此之外还包含一些其他操作:命名空间的应用、快照的应用等 对应(《HBase...
HBase Java API操作数据库示例代码-HBaseDemo.rar HBase Java API操作数据库示例代码-HBaseDemo.rar HBase Java API操作数据库示例代码-HBaseDemo.rar
HBase数据查询API HBase是一种分布式的、面向列的NoSQL数据库,主要应用于存储大量的半结构化数据。HBase提供了多种查询方式,包括单条查询和批量查询。 单条查询 单条查询是通过rowkey在table中查询某一行的数据...
HBase Java API 编程实践 在本实践中,我们将使用 Eclipse 编写 Java 程序,来对 HBase 数据库进行增删改查等操作。首先,我们需要启动 Hadoop 和 HBase,然后新建一个 Java 项目并导入 HBase 的 jar 包。接着,...
在本文中,我们将深入探讨如何使用Java API连接到运行在虚拟机上的HBase数据库,并进行相关的数据操作。HBase是一个分布式的、版本化的、基于列族的NoSQL数据库,它构建于Hadoop之上,适用于处理大规模的数据存储和...
Hadoop平台技术 5.4.2 HBase Java API应用-教学课件
可以通过HBase Shell命令来查看是否成功执行了Java API的操作,例如,`hbase hbasetest.jar hbase.java.txt` 可能是一个运行包含上述操作的Java程序,并输出结果到`hbase.java.txt`的命令。 以上就是HBase常用Java...
在HBase Java API使用方面,HBaseConfiguration对象是每个HBase客户端都需要使用的,它代表了HBase的配置信息。可以通过默认构造函数来创建HBaseConfiguration对象,它会尝试从类路径中的hbase-default.xml和hbase-...
本压缩包"javaApi_sparkhiveAPI_hbaseAPI.zip"包含了2019年8月至10月期间针对这些技术的Java版API实现,以及与Spark相关的Hive和HBase API。以下是关于这些技术的详细知识: 1. **Java API for Hive**: - **Hive*...
自行制作的HBase 1.2.0 Javadoc API CHM版本。内容抽取自官方站点网页
在Java环境中,HBase提供了丰富的Java API供开发者进行数据操作,包括创建表、删除表、更新表以及查询表等基本功能。下面我们将深入探讨HBase的Java API及其在实际应用中的使用。 1. **HBase连接** 在Java中使用...
Hbase 调用 JavaAPI 实现批量导入操作 在大数据时代,Hbase 作为一个分布式、面向列的 NoSQL 数据库,广泛应用于大规模数据存储和处理中。同时,JavaAPI 作为一个强大且流行的编程语言,广泛应用于各种软件开发中。...
Java SpringBoot 连接 Hbase Demo 创建表 插入数据 列族 列 查询:全表、数据过滤 删除数据 删除表 Hbase 集群搭建:https://blog.csdn.net/weixin_42176639/article/details/131796472