`

HBase开发实例学习

 
阅读更多

1 开发环境

在进行Hbase开发前,需要安装JDK、Hadoop和HBase,选择一款合适的开发IDE,具体安装方法就不介绍了,网上有很多参考资料,这里给出我的开发环境:

操作系统:Ubuntu 14.04 LTS

Java版本:jdk1.7.0_79

Hadoop版本:hadoop-2.6.0-cdh5.7.1

HBase版本:hbase-1.2.0-cdh5.7.1

Ecipse版本:Eclipse Java EE LunaRelease

使用Maven构建项目,在pom.xml中添加hbase的依赖如下:

 

[html] view plain copy
 
  1.   <repositories>  
  2.       <repository>  
  3.         <id>cloudera</id>  
  4.         <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
  5.       </repository>  
  6.   </repositories>  
  7.   
  8.   <dependencies>  
  9.       <dependency>  
  10.           <groupId>junit</groupId>  
  11.           <artifactId>junit</artifactId>  
  12.           <version>3.8.1</version>  
  13.           <scope>test</scope>  
  14.       </dependency>  
  15.       <dependency>    
  16.           <groupId>org.apache.hadoop</groupId>    
  17.           <artifactId>hadoop-common</artifactId>    
  18.           <version>2.6.0-cdh5.7.1</version>    
  19.       </dependency>    
  20.       <dependency>    
  21.           <groupId>org.apache.hadoop</groupId>    
  22.           <artifactId>hadoop-hdfs</artifactId>    
  23.           <version>2.6.0-cdh5.7.1</version>    
  24.       </dependency>  
  25.       <dependency>    
  26.           <groupId>org.apache.hbase</groupId>    
  27.           <artifactId>hbase-client</artifactId>    
  28.           <version>1.2.0-cdh5.7.1</version>    
  29.       </dependency>  
  30. <dependency>    
  31.           <groupId>org.apache.hbase</groupId>    
  32.           <artifactId>hbase-server</artifactId>    
  33.           <version>1.2.0-cdh5.7.1</version>    
  34.       </dependency>  
  35.   </dependencies>  

 

2 初始化配置

 

首先需要设置HBase的配置,如ZooKeeper的地址、端口号等等。可以通过org.apache.hadoop.conf.Configuration.set方法手工设置HBase的配置信息,也可以直接将HBase的hbase-site.xml配置文件引入项目即可。下面给出配置代码:

[java] view plain copy
 
  1. // 声明静态配置  
  2.   private static Configuration conf = null;  
  3.   static {  
  4.       conf = HBaseConfiguration.create();  
  5.       conf.set("hbase.zookeeper.quorum""localhost");  
  6.       conf.set("hbase.zookeeper.property.clientPort""2181");  
  7.   }  

3 常见API的使用

HBase的常用操作包括建表、插入表数据、删除表数据、获取一行数据、表扫描、删除列族、删除表等等,下面给出具体代码。

3.1 创建数据库表

[java] view plain copy
 
  1.     // 创建数据库表  
  2. public static void createTable(String tableName, String[] columnFamilys) throws IOException {  
  3.     // 建立一个数据库的连接  
  4.     Connection conn = ConnectionFactory.createConnection(conf);  
  5.     // 创建一个数据库管理员  
  6.     HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();  
  7.     if (hAdmin.tableExists(tableName)) {  
  8.         System.out.println(tableName + "表已存在");  
  9.         conn.close();  
  10.         System.exit(0);  
  11.     } else {  
  12.         // 新建一个表描述  
  13.         HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));  
  14.         // 在表描述里添加列族  
  15.         for (String columnFamily : columnFamilys) {  
  16.             tableDesc.addFamily(new HColumnDescriptor(columnFamily));  
  17.         }  
  18.         // 根据配置好的表描述建表  
  19.         hAdmin.createTable(tableDesc);  
  20.         System.out.println("创建" + tableName + "表成功");  
  21.     }  
  22.     conn.close();  
  23. }  

3.2 添加一条数据

[java] view plain copy
 
  1.  // 添加一条数据  
  2. public static void addRow(String tableName, String rowKey, String columnFamily, String column, String value)   
  3.         throws IOException {  
  4.     // 建立一个数据库的连接  
  5.     Connection conn = ConnectionFactory.createConnection(conf);  
  6.     // 获取表  
  7.     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
  8.     // 通过rowkey创建一个put对象  
  9.     Put put = new Put(Bytes.toBytes(rowKey));  
  10.     // 在put对象中设置列族、列、值  
  11.     put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));  
  12.     // 插入数据,可通过put(List<Put>)批量插入  
  13.     table.put(put);  
  14.     // 关闭资源  
  15.     table.close();  
  16.     conn.close();  
  17. }  

3.3 获取一条数据

[java] view plain copy
 
  1. // 通过rowkey获取一条数据  
  2. public static void getRow(String tableName, String rowKey) throws IOException {  
  3.     // 建立一个数据库的连接  
  4.     Connection conn = ConnectionFactory.createConnection(conf);  
  5.     // 获取表  
  6.     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
  7.     // 通过rowkey创建一个get对象  
  8.     Get get = new Get(Bytes.toBytes(rowKey));  
  9.     // 输出结果  
  10.     Result result = table.get(get);  
  11.     for (Cell cell : result.rawCells()) {  
  12.         System.out.println(  
  13.                 "行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +  
  14.                 "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +   
  15.                 "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +   
  16.                 "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +  
  17.                 "时间戳:" + cell.getTimestamp());  
  18.     }  
  19.     // 关闭资源  
  20.     table.close();  
  21.     conn.close();  
  22. }  

3.4 全表扫描

[java] view plain copy
 
  1.     // 全表扫描  
  2.     public static void scanTable(String tableName) throws IOException {  
  3.         // 建立一个数据库的连接  
  4.         Connection conn = ConnectionFactory.createConnection(conf);  
  5.         // 获取表  
  6.         HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
  7.         // 创建一个扫描对象  
  8.         Scan scan = new Scan();  
  9.         // 扫描全表输出结果  
  10.         ResultScanner results = table.getScanner(scan);  
  11.         for (Result result : results) {  
  12.             for (Cell cell : result.rawCells()) {  
  13.                 System.out.println(  
  14.                         "行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +  
  15.                         "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +   
  16.                         "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +   
  17.                         "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +  
  18.                         "时间戳:" + cell.getTimestamp());  
  19.             }  
  20.         }  
  21.         // 关闭资源  
  22.         results.close();  
  23.         table.close();  
  24.         conn.close();  
  25. }  

3.5 删除一条数据

[java] view plain copy
 
  1. // 删除一条数据  
  2. public static void delRow(String tableName, String rowKey) throws IOException {  
  3.     // 建立一个数据库的连接  
  4.     Connection conn = ConnectionFactory.createConnection(conf);  
  5.     // 获取表  
  6.     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
  7.     // 删除数据  
  8.     Delete delete = new Delete(Bytes.toBytes(rowKey));  
  9.     table.delete(delete);  
  10.     // 关闭资源  
  11.     table.close();  
  12.     conn.close();  
  13. }  

3.6 删除多条数据

[java] view plain copy
 
  1. // 删除多条数据  
  2. public static void delRows(String tableName, String[] rows) throws IOException {  
  3.     // 建立一个数据库的连接  
  4.     Connection conn = ConnectionFactory.createConnection(conf);  
  5.     // 获取表  
  6.     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
  7.     // 删除多条数据  
  8.     List<Delete> list = new ArrayList<Delete>();  
  9.     for (String row : rows) {  
  10.         Delete delete = new Delete(Bytes.toBytes(row));  
  11.         list.add(delete);  
  12.     }  
  13.     table.delete(list);  
  14.     // 关闭资源  
  15.     table.close();  
  16.     conn.close();  
  17. }  

3.7 删除列族

[java] view plain copy
 
  1. // 删除列族  
  2. public static void delColumnFamily(String tableName, String columnFamily) throws IOException {  
  3.     // 建立一个数据库的连接  
  4.     Connection conn = ConnectionFactory.createConnection(conf);  
  5.     // 创建一个数据库管理员  
  6.     HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();  
  7.     // 删除一个表的指定列族  
  8.     hAdmin.deleteColumn(tableName, columnFamily);  
  9.     // 关闭资源  
  10.     conn.close();  
  11. }  

3.8 删除数据库表

[java] view plain copy
 
  1. // 删除数据库表  
  2. public static void deleteTable(String tableName) throws IOException {  
  3.     // 建立一个数据库的连接  
  4.     Connection conn = ConnectionFactory.createConnection(conf);  
  5.     // 创建一个数据库管理员  
  6.     HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();  
  7.     if (hAdmin.tableExists(tableName)) {  
  8.         // 失效表  
  9.         hAdmin.disableTable(tableName);  
  10.         // 删除表  
  11.         hAdmin.deleteTable(tableName);  
  12.         System.out.println("删除" + tableName + "表成功");  
  13.         conn.close();  
  14.     } else {  
  15.         System.out.println("需要删除的" + tableName + "表不存在");  
  16.         conn.close();  
  17.         System.exit(0);  
  18.     }  
  19. }  

3.9 追加插入

[java] view plain copy
 
  1. // 追加插入(将原有value的后面追加新的value,如原有value=a追加value=bc则最后的value=abc)  
  2. public static void appendData(String tableName, String rowKey, String columnFamily, String column, String value)   
  3.         throws IOException {  
  4.     // 建立一个数据库的连接  
  5.     Connection conn = ConnectionFactory.createConnection(conf);  
  6.     // 获取表  
  7.     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
  8.     // 通过rowkey创建一个append对象  
  9.     Append append = new Append(Bytes.toBytes(rowKey));  
  10.     // 在append对象中设置列族、列、值  
  11.     append.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));  
  12.     // 追加数据  
  13.     table.append(append);  
  14.     // 关闭资源  
  15.     table.close();  
  16.     conn.close();  
  17. }  

3.10 符合条件后添加数据

[java] view plain copy
 
  1. // 符合条件后添加数据(只能针对某一个rowkey进行原子操作)  
  2. public static boolean checkAndPut(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, String valueCheck, String columnFamily, String column, String value) throws IOException {  
  3.     // 建立一个数据库的连接  
  4.     Connection conn = ConnectionFactory.createConnection(conf);  
  5.     // 获取表  
  6.     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
  7.     // 设置需要添加的数据  
  8.     Put put = new Put(Bytes.toBytes(rowKey));  
  9.     put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));  
  10.     // 当判断条件为真时添加数据  
  11.     boolean result = table.checkAndPut(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck),   
  12.             Bytes.toBytes(columnCheck), Bytes.toBytes(valueCheck), put);  
  13.     // 关闭资源  
  14.     table.close();  
  15.     conn.close();  
  16.       
  17.     return result;  
  18. }  

3.11 符合条件后删除数据

[java] view plain copy
 
  1. // 符合条件后刪除数据(只能针对某一个rowkey进行原子操作)  
  2. public static boolean checkAndDelete(String tableName, String rowKey, String columnFamilyCheck, String columnCheck,   
  3.         String valueCheck, String columnFamily, String column) throws IOException {  
  4.     // 建立一个数据库的连接  
  5.     Connection conn = ConnectionFactory.createConnection(conf);  
  6.     // 获取表  
  7.     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
  8.     // 设置需要刪除的delete对象  
  9.     Delete delete = new Delete(Bytes.toBytes(rowKey));  
  10.     delete.addColumn(Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck));  
  11.     // 当判断条件为真时添加数据  
  12.     boolean result = table.checkAndDelete(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck),   
  13.             Bytes.toBytes(valueCheck), delete);  
  14.     // 关闭资源  
  15.     table.close();  
  16.     conn.close();  
  17.   
  18.     return result;  
  19. }  

3.12 计数器

[java] view plain copy
 
  1. // 计数器(amount为正数则计数器加,为负数则计数器减,为0则获取当前计数器的值)  
  2. public static long incrementColumnValue(String tableName, String rowKey, String columnFamily, String column, long amount)   
  3.         throws IOException {  
  4.     // 建立一个数据库的连接  
  5.     Connection conn = ConnectionFactory.createConnection(conf);  
  6.     // 获取表  
  7.     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
  8.     // 计数器  
  9.     long result = table.incrementColumnValue(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamily), Bytes.toBytes(column), amount);  
  10.     // 关闭资源  
  11.     table.close();  
  12.     conn.close();  
  13.       
  14.     return result;  
  15. }  

4 内置过滤器的使用

HBase为筛选数据提供了一组过滤器,通过这个过滤器可以在HBase中数据的多个维度(行、列、数据版本)上进行对数据的筛选操作,也就是说过滤器最终能够筛选的数据能够细化到具体的一个存储单元格上(由行键、列名、时间戳定位)。通常来说,通过行键、值来筛选数据的应用场景较多。需要说明的是,过滤器会极大地影响查询效率。所以,在数据量较大的数据表中,应尽量避免使用过滤器。

下面介绍一些常用的HBase内置过滤器的用法:

1、RowFilter:筛选出匹配的所有的行。使用BinaryComparator可以筛选出具有某个行键的行,或者通过改变比较运算符(下面的例子中是CompareFilter.CompareOp.EQUAL)来筛选出符合某一条件的多条数据,如下示例就是筛选出行键为row1的一行数据。

[java] view plain copy
 
  1. // 筛选出匹配的所有的行  
  2. Filter rf = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("row1")));    

2、PrefixFilter:筛选出具有特定前缀的行键的数据。这个过滤器所实现的功能其实也可以由RowFilter结合RegexComparator来实现,不过这里提供了一种简便的使用方法,如下示例就是筛选出行键以row为前缀的所有的行。

[java] view plain copy
 
  1. // 筛选匹配行键的前缀成功的行  
  2. Filter pf = new PrefixFilter(Bytes.toBytes("row"));  

3、KeyOnlyFilter:这个过滤器唯一的功能就是只返回每行的行键,值全部为空,这对于只关注于行键的应用场景来说非常合适,这样忽略掉其值就可以减少传递到客户端的数据量,能起到一定的优化作用。

 

[java] view plain copy
 
  1. // 返回所有的行键,但值全是空  
  2. Filter kof = new KeyOnlyFilter();  

 

4、RandomRowFilter:按照一定的几率(<=0会过滤掉所有的行,>=1会包含所有的行)来返回随机的结果集,对于同样的数据集,多次使用同一个RandomRowFilter会返回不同的结果集,对于需要随机抽取一部分数据的应用场景,可以使用此过滤器。

[java] view plain copy
 
  1. // 随机选出一部分的行  
  2. Filter rrf = new RandomRowFilter((float0.8);     

5、InclusiveStopFilter:扫描的时候,我们可以设置一个开始行键和一个终止行键,默认情况下,这个行键的返回是前闭后开区间,即包含起始行,但不包含终止行。如果我们想要同时包含起始行和终止行,那么可以使用此过滤器。

[java] view plain copy
 
  1. // 包含了扫描的上限在结果之内  
  2. Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1"));    

6、FirstKeyOnlyFilter:如果想要返回的结果集中只包含第一列的数据,那么这个过滤器能够满足要求。它在找到每行的第一列之后会停止扫描,从而使扫描的性能也得到了一定的提升。

[java] view plain copy
 
  1. // 筛选出每行的第一个单元格  
  2. Filter fkof = new FirstKeyOnlyFilter();     

7、ColumnPrefixFilter:它按照列名的前缀来筛选单元格,如果我们想要对返回的列的前缀加以限制的话,可以使用这个过滤器。

[java] view plain copy
 
  1. // 筛选出前缀匹配的列  
  2. Filter cpf = new ColumnPrefixFilter(Bytes.toBytes("qual1"));     

8、ValueFilter:按照具体的值来筛选单元格的过滤器,这会把一行中值不能满足的单元格过滤掉,如下面的构造器,对于每一行的一个列,如果其对应的值不包含ROW2_QUAL1,那么这个列就不会返回给客户端。

[java] view plain copy
 
  1. // 筛选某个(值的条件满足的)特定的单元格  
  2. Filter vf = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("ROW2_QUAL1"));  

9、ColumnCountGetFilter:这个过滤器在遇到一行的列数超过我们所设置的限制值的时候,结束扫描操作。

[java] view plain copy
 
  1. // 如果突然发现一行中的列数超过设定的最大值时,整个扫描操作会停止  
  2. Filter ccf = new ColumnCountGetFilter(2);    

10、SingleColumnValueFilter:用一列的值决定这一行的数据是否被过滤,可对它的对象调用setFilterIfMissing方法,默认的参数是false。其作用是,对于咱们要使用作为条件的列,如果参数为true,这样的行将会被过滤掉,如果参数为false,这样的行会包含在结果集中。

[java] view plain copy
 
  1. // 将满足条件的列所在的行过滤掉  
  2. SingleColumnValueFilter scvf = new SingleColumnValueFilter(    
  3. •          Bytes.toBytes("colfam1"),     
  4. •          Bytes.toBytes("qual2"),     
  5. •          CompareFilter.CompareOp.NOT_EQUAL,     
  6. •          new SubstringComparator("BOGUS"));    
  7. scvf.setFilterIfMissing(true);  

11、SingleColumnValueExcludeFilter:这个过滤器与第10种过滤器唯一的区别就是,作为筛选条件的列,其行不会包含在返回的结果中。

12、SkipFilter:这是一种附加过滤器,其与ValueFilter结合使用,如果发现一行中的某一列不符合条件,那么整行就会被过滤掉。

[java] view plain copy
 
  1. // 发现某一行中的一列需要过滤时,整个行就会被过滤掉  
  2. Filter skf = new SkipFilter(vf);  

13、WhileMatchFilter:使用这个过滤器,当遇到不符合设定条件的数据的时候,整个扫描结束。

[java] view plain copy
 
  1. // 当遇到不符合过滤器rf设置的条件时,整个扫描结束  
  2. Filter wmf = new WhileMatchFilter(rf);     

14. FilterList:可以用于综合使用多个过滤器。其有两种关系: Operator.MUST_PASS_ONE表示关系AND,Operator.MUST_PASS_ALL表示关系OR,并且FilterList可以嵌套使用,使得我们能够表达更多的需求。

[java] view plain copy
 
  1. // 综合使用多个过滤器,AND和OR两种关系  
  2. List<Filter> filters = new ArrayList<Filter>();    
  3. filters.add(rf);    
  4. filters.add(vf);    
  5. FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);  

下面给出一个使用RowFilter过滤器的完整示例:

[java] view plain copy
 
  1. public class HBaseFilter {  
  2.       
  3.     private static final String TABLE_NAME = "table1";  
  4.   
  5.     public static void main(String[] args) throws IOException {  
  6.         // 设置配置  
  7.         Configuration conf = HBaseConfiguration.create();  
  8.         conf.set("hbase.zookeeper.quorum""localhost");  
  9.         conf.set("hbase.zookeeper.property.clientPort""2181");  
  10.         // 建立一个数据库的连接  
  11.         Connection conn = ConnectionFactory.createConnection(conf);  
  12.         // 获取表  
  13.         HTable table = (HTable) conn.getTable(TableName.valueOf(TABLE_NAME));  
  14.         // 创建一个扫描对象  
  15.         Scan scan = new Scan();  
  16.         // 创建一个RowFilter过滤器  
  17.         Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("abc")));  
  18.         // 将过滤器加入扫描对象  
  19.         scan.setFilter(filter);  
  20.         // 输出结果  
  21.         ResultScanner results = table.getScanner(scan);  
  22.         for (Result result : results) {  
  23.             for (Cell cell : result.rawCells()) {  
  24.                 System.out.println(  
  25.                         "行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +  
  26.                         "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +   
  27.                         "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +   
  28.                         "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +  
  29.                         "时间戳:" + cell.getTimestamp());  
  30.             }  
  31.         }  
  32.         // 关闭资源  
  33.         results.close();  
  34.         table.close();  
  35.         conn.close();  
  36.           
  37.     }  
  38.   
  39. }  

5 HBase与MapReduce

我们知道,在伪分布式模式和完全分布式模式下的HBase是架构在HDFS之上的,因此完全可以将MapReduce编程框架和HBase结合起来使用。也就是说,将HBase作为底层存储结构,MapReduce调用HBase进行特殊的处理,这样能够充分结合HBase分布式大型数据库和MapReduce并行计算的优点。

HBase实现了TableInputFormatBase类,该类提供了对表数据的大部分操作,其子类TableInputFormat则提供了完整的实现,用于处理表数据并生成键值对。TableInputFormat类将数据表按照Region分割成split,即有多少个Regions就有多个splits,然后将Region按行键分成<key,value>对,key值对应与行键,value值为该行所包含的数据。

HBase实现了MapReduce计算框架对应的TableMapper类和TableReducer类。其中,TableMapper类并没有具体的功能,只是将输入的<key,value>对的类型分别限定为Result和ImmutableBytesWritable。IdentityTableMapper类和IdentityTableReducer类则是上述两个类的具体实现,其和Mapper类和Reducer类一样,只是简单地将<key,value>对输出到下一个阶段。

HBase实现的TableOutputFormat将输出的<key,value>对写到指定的HBase表中,该类不会对WAL(Write-Ahead Log)进行操作,即如果服务器发生故障将面临丢失数据的风险。可以使用MultipleTableOutputFormat类解决这个问题,该类可以对是否写入WAL进行设置。

为了能使Hadoop集群上运行HBase程序,还需要把相关的类文件引入Hadoop集群上,不然会出现ClassNotFoundException错误。其具体方法是可在hadoop的环境配置文件hadoop-env.sh中引入HBASE_HOME和HBase的相关jar包,或者直接将HBase的jar包打包到应用程序文件中。

下面这个例子是将MapReduce和HBase结合起来的WordCount程序,它首先从指定文件中搜集数据,进行统计计算,最后将结果存储到HBase中:

 

[java] view plain copy
 
  1. package com.hbase.demo;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.hbase.HBaseConfiguration;  
  8. import org.apache.hadoop.hbase.HColumnDescriptor;  
  9. import org.apache.hadoop.hbase.HTableDescriptor;  
  10. import org.apache.hadoop.hbase.TableName;  
  11. import org.apache.hadoop.hbase.client.Connection;  
  12. import org.apache.hadoop.hbase.client.ConnectionFactory;  
  13. import org.apache.hadoop.hbase.client.HBaseAdmin;  
  14. import org.apache.hadoop.hbase.client.Put;  
  15. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  
  16. import org.apache.hadoop.hbase.mapreduce.TableReducer;  
  17. import org.apache.hadoop.io.IntWritable;  
  18. import org.apache.hadoop.io.LongWritable;  
  19. import org.apache.hadoop.io.NullWritable;  
  20. import org.apache.hadoop.io.Text;  
  21. import org.apache.hadoop.mapreduce.Job;  
  22. import org.apache.hadoop.mapreduce.Mapper;  
  23. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  24. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  25.   
  26. public class HBaseWordCount {  
  27.       
  28.     public static class hBaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> {  
  29.   
  30.         private final static IntWritable ONE = new IntWritable(1);  
  31.         private Text word = new Text();  
  32.   
  33.         @Override  
  34.         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  35.             String[] words = value.toString().split(" ");  
  36.             for ( String w : words) {  
  37.                 word.set(w);  
  38.                 context.write(word, ONE);  
  39.             }  
  40.         }  
  41.     }  
  42.       
  43.     public static class hBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {  
  44.   
  45.         @Override  
  46.         protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {  
  47.             int sum = 0;  
  48.             for (IntWritable value : values) {  
  49.                 sum += value.get();  
  50.             }  
  51.               
  52.             // Put实例化,每个词存一行  
  53.             Put put = new Put(key.getBytes());  
  54.             // 列族为content,列名为count,列值为单词的数目  
  55.             put.addColumn("content".getBytes(), "count".getBytes(), String.valueOf(sum).getBytes());  
  56.               
  57.             context.write(NullWritable.get(), put);  
  58.         }  
  59.           
  60.     }  
  61.       
  62.     // 创建HBase数据表  
  63.     public static void createHBaseTable(String tableName) throws IOException {  
  64.         // 配置HBse  
  65.         Configuration conf = HBaseConfiguration.create();  
  66.         conf.set("hbase.zookeeper.quorum""localhost");  
  67.         conf.set("hbase.zookeeper.property.clientPort""2181");  
  68.         // 建立一个数据库的连接  
  69.         Connection conn = ConnectionFactory.createConnection(conf);  
  70.         // 创建一个数据库管理员  
  71.         HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();  
  72.         // 判断表是否存在  
  73.         if (hAdmin.tableExists(tableName)) {  
  74.             System.out.println("该数据表已存在,正在重新创建");  
  75.             hAdmin.disableTable(tableName);  
  76.             hAdmin.deleteTable(tableName);  
  77.         }  
  78.         // 创建表描述  
  79.         HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));  
  80.         // 在表描述里添加列族  
  81.         tableDesc.addFamily(new HColumnDescriptor("content"));  
  82.         // 创建表  
  83.         hAdmin.createTable(tableDesc);  
  84.         System.out.println("创建" + tableName + "表成功");  
  85.     }  
  86.       
  87.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
  88.           
  89.         if (args.length != 3) {  
  90.             System.out.println("args error");  
  91.             System.exit(0);  
  92.         }  
  93.           
  94.         String input = args[0];  
  95.         String jobName = args[1];  
  96.         String tableName = args[2];  
  97.           
  98.         // 创建数据表  
  99.         HBaseWordCount.createHBaseTable(tableName);  
  100.           
  101.         // 配置MapReduce(或者将hadoop和hbase的相关配置文件引入项目)  
  102.         Configuration conf = new Configuration();  
  103.         conf.set("fs.defaultFS""localhost:9000");  
  104.        conf.set("mapred.job.tracker""localhost:9001");  
  105.         conf.set("hbase.zookeeper.quorum""localhost");  
  106.         conf.set("hbase.zookeeper.property.clientPort""2181");  
  107.         conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);  
  108.           
  109.         // 配置任务  
  110.         Job job = Job.getInstance(conf, jobName);  
  111.         job.setJarByClass(HBaseWordCount.class);  
  112.         job.setMapperClass(hBaseMapper.class);  
  113.         job.setReducerClass(hBaseReducer.class);  
  114.         job.setMapOutputKeyClass(Text.class);  
  115.         job.setMapOutputValueClass(IntWritable.class);  
  116.         job.setInputFormatClass(TextInputFormat.class);  
  117.         job.setOutputFormatClass(TableOutputFormat.class);  
  118.         FileInputFormat.addInputPath(job, new Path(input));  
  119.           
  120.         //执行MR任务  
  121.         boolean result = job.waitForCompletion(true);  
  122.         System.exit(result ? 0 : 1);  
  123.     }  
  124.   
  125. }  

6 HBase的Bulkload

HBase可以让我们随机的、实时的访问大数据,但是怎样有效的将数据导入到HBase呢?HBase有多种导入数据的方法,最直接的方法就是在MapReduce作业中使用TableOutputFormat作为输出,或者使用标准的客户端API,但是这些都不是非常有效的方法。

如果HDFS中有海量数据要导入HBase,可以先将这些数据生成HFile文件,然后批量导入HBase的数据表中,这样可以极大地提升数据导入HBase的效率。这就是HBase的Bulkload,即利用MapReduce作业输出HBase内部数据格式的表数据,然后将生成的StoreFiles直接导入到集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。两个表之间的数据迁移也可以使用这种方法。下面给出具体示例:

 

[java] view plain copy
 
  1. package com.hbase.demo;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.hbase.HBaseConfiguration;  
  8. import org.apache.hadoop.hbase.TableName;  
  9. import org.apache.hadoop.hbase.client.Connection;  
  10. import org.apache.hadoop.hbase.client.ConnectionFactory;  
  11. import org.apache.hadoop.hbase.client.HTable;  
  12. import org.apache.hadoop.hbase.client.Put;  
  13. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
  14. import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;  
  15. import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;  
  16. import org.apache.hadoop.hbase.mapreduce.PutSortReducer;  
  17. import org.apache.hadoop.hbase.util.Bytes;  
  18. import org.apache.hadoop.io.LongWritable;  
  19. import org.apache.hadoop.io.Text;  
  20. import org.apache.hadoop.mapreduce.Job;  
  21. import org.apache.hadoop.mapreduce.Mapper;  
  22. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  23. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  24. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  25.   
  26.   
  27. public class HBaseBulk {  
  28.       
  29.     public static class bulkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {  
  30.   
  31.         @Override  
  32.         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  33.             // 将输入数据用tab键分词  
  34.             String[] values = value.toString().split("\t");  
  35.             if (values.length == 2) {  
  36.                 // 设置行键、列族、列名和值  
  37.                 byte[] rowKey = Bytes.toBytes(values[0]);  
  38.                 byte[] family = Bytes.toBytes("content");  
  39.                 byte[] column = Bytes.toBytes("number");  
  40.                 byte[] colValue = Bytes.toBytes(values[1]);  
  41.                 // 将行键序列化作为mapper输出的key  
  42.                 ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKey);  
  43.                 // 将put对象作为mapper输出的value  
  44.                 Put put = new Put(rowKey);  
  45.                 put.addColumn(family, column, colValue);  
  46.                 context.write(rowKeyWritable, put);  
  47.             }  
  48.         }  
  49.     }  
  50.     
  51.     @SuppressWarnings("deprecation")  
  52.     public static void main(String[] args) throws Exception {  
  53.           
  54.         if (args.length != 3) {  
  55.             System.out.println("args error");  
  56.             System.exit(0);  
  57.         }  
  58.           
  59.         String input = args[0];  
  60.         String output = args[1];  
  61.         String jobName = args[2];  
  62.         String tableName = args[3];  
  63.           
  64.         // 配置MapReduce(或者将hadoop的相关配置文件引入项目)  
  65.         Configuration hadoopConf = new Configuration();  
  66.         hadoopConf.set("fs.defaultFS""localhost:9000");  
  67.         hadoopConf.set("mapred.job.tracker""localhost:9001");  
  68.         Job job = Job.getInstance(hadoopConf, jobName);  
  69.         job.setJarByClass(HBaseBulk.class);  
  70.         job.setMapperClass(bulkMapper.class);  
  71.         job.setReducerClass(PutSortReducer.class);  
  72.         job.setMapOutputKeyClass(ImmutableBytesWritable.class);  
  73.         job.setMapOutputValueClass(Put.class);  
  74.         job.setInputFormatClass(TextInputFormat.class);  
  75.         job.setOutputFormatClass(HFileOutputFormat2.class);  
  76.         FileInputFormat.addInputPath(job, new Path(input));  
  77.         FileOutputFormat.setOutputPath(job, new Path(output));  
  78.           
  79.         // 配置HBase(或者将hbase的相关配置文件引入项目)  
  80.         Configuration hbaseConf = HBaseConfiguration.create();  
  81.         hbaseConf.set("hbase.zookeeper.quorum""localhost");  
  82.         hbaseConf.set("hbase.zookeeper.property.clientPort""2181");  
  83.           
  84.         // 生成HFile  
  85.         Connection conn = ConnectionFactory.createConnection(hbaseConf);  
  86.         HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
  87.         HFileOutputFormat2.configureIncrementalLoad(job, table);  
  88.           
  89.         // 执行任务  
  90.         job.waitForCompletion(true);  
  91.           
  92.         // 将HFile文件导入HBase  
  93.         LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConf);  
  94.         loader.doBulkLoad(new Path(output), table);  
  95.     }  
  96. }  

上述代码首先将HDFS中的数据文件通过MapReduce任务生成HFile文件,然后将HFile文件导入HBase数据表(该数据表已存在)。HDFS中的数据文件和导入HBase后的数据表分别如下图所示:

 

 

分享到:
评论

相关推荐

    HBase MapReduce完整实例.rar

    通过这个实例,学习者可以深入了解HBase与MapReduce的整合过程,掌握如何利用MapReduce进行HBase数据的批处理,以及如何设计和优化MapReduce任务以提高处理效率。这对于大数据开发人员来说,是一份非常有价值的参考...

    hbase的java client实例

    Spring框架简化了Java应用的开发,它可以帮助我们更好地管理HBase的生命周期。通过Spring的`HBaseTemplate`和`HBaseAdminTemplate`,可以方便地进行CRUD操作。在Spring配置文件中,我们可以声明`HBaseConfigurer`和`...

    HBase企业应用开发实战-高清

    通过实例,读者可以学习如何创建表、插入和查询数据,以及执行复杂的扫描操作,这对于实际开发中的数据存取操作非常实用。 在高级特性部分,书里涵盖了Coprocessor、Filter、Compaction和Replication等关键特性。...

    阿里云 专有云企业版 V3.7.0 云数据库 HBase 开发指南 20190322.pdf

    总之,阿里云专有云企业版V3.7.0的云数据库HBase开发指南是用户深入理解和高效利用HBase服务的重要工具,它涵盖了从基本操作到高级特性的全面内容,旨在确保用户能够安全、稳定地运行和管理他们的HBase实例。...

    C#使用Thrift2操作HBase数据库

    在IT领域,尤其是在大数据处理和分布式存储中,HBase是一个重要的NoSQL数据库,它构建于Hadoop之上,提供...通过学习和实践,你不仅能理解Thrift的原理,还能深入理解HBase的内部机制,提升你的分布式数据库操作能力。

    HBase企业应用开发实战

    《HBase企业应用开发实战》一书聚焦于利用HBase...总的来说,《HBase企业应用开发实战》旨在帮助读者掌握HBase的基本原理和高级技巧,通过实例学习如何在企业环境中有效地使用HBase解决大数据问题,提升数据处理能力。

    HbaseReferenceBook-Hbase参考指南英文版

    HBase的独立模式适用于开发和测试环境,而分布式模式则是生产环境的首选,它支持在多台机器上水平扩展HBase实例。 HBase作为海量数据分析和存储的NoSQL解决方案,在大数据处理领域具有重要地位。它能够有效地处理...

    Hbase操作Hadoop分布式数据库java工程实例(含测试用例)

    总结起来,这个“Hbase操作Hadoop分布式数据库java工程实例”是一个全面的学习资源,涵盖了HBase的基本概念、Java编程接口和测试实践。通过学习和实践这个实例,开发者能够掌握HBase在Hadoop环境下的实际应用,为大...

    thrift操作Hbase数据库

    使用Thrift生成的代码,我们需要实例化一个Hbase的客户端对象,然后通过这个对象建立到Hbase服务器的连接。连接通常需要指定服务器地址和端口。 4. **基本操作** - **创建表(Create Table)**: 需要指定表名和列...

    hbase权威指南 配套源码

    《HBase权威指南》是一本深入探讨分布式列式数据库HBase的专业书籍,其配套源码提供了书中所提及的示例代码...通过学习《HBase权威指南》及其配套源码,开发者能够熟练掌握HBase的使用,为大数据应用开发打下坚实基础。

    hbase实战(HBase in Action)

    标题和描述中提到的《HBase实战》(HBase in Action)是一本专著,专注于HBase的实用开发案例和原理讲解。HBase是Apache软件基金会旗下的一个开源、非关系型、分布式数据库系统,它是基于Google的Bigtable论文构建的...

    经过测试,总结出可运行成功的C#For HBase示例代码

    在IT行业中,HBase是一个广泛使用的分布式列式...通过学习和理解这些示例代码,开发者不仅可以掌握C#与HBase集成的基本方法,还能深入理解HBase的工作原理和最佳实践,从而在实际项目中更高效地利用HBase处理大数据。

    HbaseJavaWeb实例

    在本项目中,“HbaseJavaWeb实例”是一个基于Java Web技术实现的HBase数据库操作应用。这个项目旨在提供一个交互式的平台,用户...这个项目有助于学习者理解和实践HBase的Java API,以及如何将它们整合到Web应用中。

    hbaseTemplate

    接着,创建一个`HBaseOperations`的bean,注入`HBaseConfigurer`,Spring会自动完成`HbaseTemplate`的实例化。这样,你就可以在业务代码中通过@Autowired注解注入`HbaseTemplate`,并调用其方法进行操作。 在描述中...

    基于Mysql的表转HBase小Demo

    总之,“基于Mysql的表转HBase小Demo”是一个很好的学习资源,它演示了如何在Java环境中实现数据从关系型数据库到NoSQL数据库的迁移。对于初学者来说,这是一个理解数据模型转换和熟悉HBase操作的好机会,同时也可...

Global site tag (gtag.js) - Google Analytics