import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.util.Bytes; import org.mortbay.log.Log; public class HbaseUtil implements IOperator { private static Configuration conf = null; private static String configFile = "hbase-site-test_bj.xml"; private Map<String, String> aMap = null; private String mapTable = null; private String[] tableFamily = null; public HbaseUtil() { } public HbaseUtil( String mapAppTable , String[] appTableFamily ) { this.aMap = new HashMap<String, String>(); this.mapTable = mapAppTable; this.tableFamily = appTableFamily; } static { Configuration HBASE_CONFIG = new Configuration(); HBASE_CONFIG.addResource(configFile); conf = HBaseConfiguration.create(HBASE_CONFIG); System.err.println(conf.get("hbase.zookeeper.property.dataDir")); } /** * 创建表操作 * * @throws IOException */ public void createTable(String tablename, String[] cfs) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); if (admin.tableExists(tablename)) { System.out.println("表已经存在!"); } else { HTableDescriptor tableDesc = new HTableDescriptor(tablename); for (int i = 0; i < cfs.length; i++) { tableDesc.addFamily(new HColumnDescriptor(cfs[i])); } admin.createTable(tableDesc); System.out.println("表创建成功!"); } admin.close(); } /** * 删除表操作 * * @param tablename * @throws IOException */ public void deleteTable(String tablename) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); if (!admin.tableExists(tablename)) { System.out.println("table(" + tablename + ") not exists, won't delete"); } else { admin.disableTable(tablename); admin.deleteTable(tablename); System.out.println("table(" + tablename + ") delete success"); } admin.close(); } public void insertRow() throws IOException { HTable table = new HTable(conf, "test"); Put put = new Put(Bytes.toBytes("row3")); put.add(Bytes.toBytes("cf"), Bytes.toBytes("444"), Bytes.toBytes("value444")); table.put(put); table.close(); } /** * 插入一行记录 * * @param tablename * @param cfs * @throws IOException */ public void writeRow(String tablename, String[] cfs) throws IOException { HTable table = new HTable(conf, tablename); Put put = new Put(Bytes.toBytes(cfs[0])); put.add(Bytes.toBytes(cfs[1]), Bytes.toBytes(cfs[2]), Bytes.toBytes(cfs[3])); table.put(put); System.out.println("写入成功!"); table.close(); } // 写多条记录 public void writeMultRow(String tablename, String[][] cfs) throws IOException { List<Put> lists = new ArrayList<Put>(); HTable table = new HTable(conf, tablename); for (int i = 0; i < cfs.length; i++) { Put put = new Put(Bytes.toBytes(cfs[i][0])); put.add(Bytes.toBytes(cfs[i][1]), Bytes.toBytes(cfs[i][2]), Bytes.toBytes(cfs[i][3])); lists.add(put); } table.put(lists); table.close(); } // 写多条记录 public void writeMultRowByDevice(HTable table, String tablename, String[][] cfs) throws IOException { List<Put> lists = new ArrayList<Put>(); // HTable table = new HTable(conf, tablename); for (int i = 0; i < cfs.length; i++) { Put put = new Put(Bytes.toBytes(cfs[i][0])); Log.info("writeMultRowByDevice "+Bytes.toBytes(cfs[i][1])+"="+Bytes.toBytes(cfs[i][2])+"="+Bytes.toBytes(cfs[i][3])); put.add(Bytes.toBytes(cfs[i][1]), Bytes.toBytes(cfs[i][2]), Bytes.toBytes(cfs[i][3])); lists.add(put); } Log.info("push start"); table.put(lists); Log.info("push end"); } /** * 删除一行记录 * * @param tablename * @param rowkey * @throws IOException */ public void deleteRow(String tablename, String rowkey) throws IOException { HTable table = new HTable(conf, tablename); List<Delete> list = new ArrayList<Delete>(); Delete d1 = new Delete(rowkey.getBytes()); list.add(d1); table.delete(list); System.out.println("delete row(" + rowkey + ") sucess"); table.close(); } /** * 查找一行记录 * * @param tablename * @param rowkey */ public void selectRow(String tablename, String rowKey) throws IOException { HTable table = new HTable(conf, tablename); Get g = new Get(rowKey.getBytes()); // g.addColumn(Bytes.toBytes("cf:1")); Result rs = table.get(g); for (KeyValue kv : rs.raw()) { System.out.print(new String(kv.getRow()) + " "); System.out.print(new String(kv.getFamily()) + ":"); System.out.print(new String(kv.getQualifier()) + " "); System.out.print(kv.getTimestamp() + " "); System.out.println(new String(kv.getValue())); } table.close(); } /** * 查询表中所有行 * * @param tablename * @throws IOException */ public void scaner(String tablename) throws IOException { HTable table = new HTable(conf, tablename); Scan s = new Scan(); ResultScanner rs = table.getScanner(s); for (Result r : rs) { KeyValue[] kv = r.raw(); // for (int i = 0; i < kv.length; i++) { /* * System.out.print(new String(kv[i].getRow()) + " "); * System.out.print(new String(kv[i].getFamily()) + ":"); * System.out.print(new String(kv[i].getQualifier()) + " "); * System.out.print(kv[i].getTimestamp() + " "); * System.out.println(new String(kv[i].getValue())); */ System.out.println(new String(kv[1].getValue()) + "==" + new String(kv[0].getValue())); // } } rs.close(); table.close(); } public void scanByTimestamp(String tablename, long maxtime) throws IOException { HTable table = new HTable(conf, tablename); Scan s = new Scan(); // TODO 存放所有的结果 FilterList allInfo = new FilterList(); // allInfo.addFilter(); s.setFilter(allInfo); } public Map<String, String> getMap() { Map<String, String> map = new HashMap<String, String>(); try { HTable table = new HTable(conf, mapTable); Scan s = new Scan(); ResultScanner rs = table.getScanner(s); for (Result r : rs) { KeyValue[] kv = r.raw(); map.put(new String(kv[0].getRow()), new String(kv[0].getValue())); } } catch (IOException e) { e.printStackTrace(); } return map; } }
import java.io.IOException; import java.util.Map; public interface IOperator { public void createTable(String tablename, String[] cfs) throws IOException ; public void deleteTable(String tablename) throws IOException; public void insertRow() throws IOException; public void writeRow(String tablename, String[] cfs) throws IOException; public void writeMultRow(String tablename, String[][] cfs) throws IOException; public void deleteRow(String tablename, String rowkey) throws IOException; public void selectRow(String tablename, String rowKey) throws IOException; public void scaner(String tablename) throws IOException; public void scanByTimestamp(String tablename, long maxtime) throws IOException; public Map<String, String> getMap() throws IOException; }
public abstract class BaseRunnabler implements Runnable{ String sourceFile=""; // 读取文件路径 String numberFile=""; String hbaseTable=""; // hbase 表名 String [] hbaseFamily=null; // 行列簇名 String keywords =""; public BaseRunnabler(String sourceFile,String hbaseTable,String [] hbaseFamily,String numberFile ,String keywords ){ this.sourceFile=sourceFile; this.numberFile=numberFile; this.hbaseTable=hbaseTable; this.hbaseFamily = hbaseFamily; this.keywords = keywords; } @Override public void run() { try{ IOperator hu = new HbaseUtil( hbaseTable,hbaseFamily); hu.createTable(hbaseTable,hbaseFamily ); processFile(hu ); }catch (Exception e) { e.printStackTrace(); } } public abstract void processFile(IOperator hu) throws Exception; }
import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import Model.Device; import com.alibaba.fastjson.JSON; public class DeviceReadThread extends BaseRunnabler { static Logger logger = LoggerFactory.getLogger(DeviceReadThread.class); public DeviceReadThread(String sourceFile, String hbaseTable, String[] hbaseFamily, String numberFile, String keywords) { super(sourceFile, hbaseTable, hbaseFamily, numberFile, keywords); } @Override public void processFile(IOperator hu) { FileReader logReader = null; BufferedReader logBufferedReader = null; try { File logFile = new File(sourceFile); logReader = new FileReader(logFile); logBufferedReader = new BufferedReader(logReader); String temp = logBufferedReader.readLine(); //logger.error(" temp is " + temp ); while ( temp != null) { Device device = JSON.parseObject(temp, Device.class); //logger.error(" device is null ? " + ( device == null ) ); String[][] s = new String[][] { { device.getLid(), hbaseFamily[0], "lid" , device.getLid() } , { device.getLid(), hbaseFamily[1], "date", (new Date()).toString() }, { device.getLid(), hbaseFamily[2], "os", "2" }, { device.getLid(), hbaseFamily[2], "osv", "3" } }; hu.writeMultRow(hbaseTable, s); logger.info(" hbase util end " ); temp = logBufferedReader.readLine(); } } catch (Exception e) { logger.error(" DeviceReadThread error " ); e.printStackTrace(); } finally { try { logBufferedReader.close(); } catch (IOException e) { e.printStackTrace(); } try { logReader.close(); } catch (IOException e) { e.printStackTrace(); } } } }
import java.io.FileInputStream; import java.io.FileNotFoundException; import java.util.Properties; public class HbaseStarter { public static void main(String[] args) throws Exception { Properties properties=new Properties(); //String config = "D:/work/util/aoi-hbase/trunk/src/main/resources/testua.properties"; String config = "/home/aoi/aoi-hbase/conf/config.properties"; FileInputStream fis = new FileInputStream(config); properties.load(fis); fis.close(); String sourceFile=properties.getProperty("sourceFile")+"device2.log"+","+properties.getProperty("sourceFile")+"applist.log"; String hbaseTable = properties.getProperty("hbaseTable"); String hbaseFamily = properties.getProperty("hbaseFamily"); String numFile=properties.getProperty("sourceFile")+"num.txt"; String[] sourceFileName=sourceFile.split(","); // file String[] hbaseTableName=hbaseTable.split(","); // table String[] hbaseFamilyName=hbaseFamily.split("&"); // family DeviceReadThread device = new DeviceReadThread(sourceFileName[0],hbaseTableName[0],hbaseFamilyName[0].split(","),"",""); new Thread(device).start(); AppReadThread app = new AppReadThread(sourceFileName[1],hbaseTableName[1],hbaseFamilyName[1].split(","),numFile,""); new Thread(app).start(); } }
config.properties
sourceFile=//data//logs// hbaseTable=device-ua,app-ua hbaseFamily="device","history","Description"&"app", "history", "Description"
hbase-site-test_bj.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property> <name>hbase.rootdir</name> <value>hdfs://xxx.com:9000/hbase</value> </property> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hfile.block.cache.size</name> <value>0.4</value> </property> <property> <name>hbase.regionserver.handler.count</name> <value>150</value> </property> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/var/lib/zookeeper</value> </property> <property> <name>hbase.zookeeper.property.clientPort</name> <value>2181</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>xxx.com,xxx.com,rabbitmq1</value> </property> <property> <name>zookeeper.session.timeout</name> <value>60000</value> </property> <property> <name>hbase.master.maxclockskew</name> <value>180000</value> <description>Time difference of regionserver from master</description> </property> <property> <name>hbase.hregion.memstore.flush.size</name> <value>512</value> </property> <property> <name>hbase.zookeeper.property.maxClientCnxns</name> <value>1000</value> </property> <property> <name>hbase.hregion.max.filesize</name> <value>1024</value> </property> </configuration>
device2.log
结果:
捐助开发者
在兴趣的驱动下,写一个免费
的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。
谢谢您的赞助,我会做的更好!
相关推荐
在Java编程环境中,操作HBase并将其数据写入HDFS(Hadoop Distributed File System)是一项常见的任务,特别是在大数据处理和分析的场景下。本篇将详细介绍如何使用Java API实现这一功能,以及涉及到的关键技术和...
在Java编程环境中,将本地文件读取并上传到HBase是一项常见的任务,特别是在大数据处理和存储的场景下。HBase是一个分布式、版本化的NoSQL数据库,基于Apache Hadoop,适用于大规模数据存储。以下是一个详细的过程,...
通过HbaseTemplate,我们可以执行常见的CRUD(创建、读取、更新和删除)操作以及更复杂的查询。 1. **HbaseTemplate的初始化**:在使用HbaseTemplate之前,我们需要在Spring配置文件中配置HBase的相关连接信息,如...
在Java编程环境中,链接并操作HBase是一种常见的任务,特别是在大数据处理和分布式存储的应用场景下。HBase是一个基于Google Bigtable设计的开源NoSQL数据库,它运行在Hadoop之上,提供高并发、低延迟的数据存储服务...
### HBase基本数据操作详解 #### 一、命名空间 Namespace **1.1 命名空间概述** 在HBase中,命名空间(namespace)的概念类似于传统数据库中的模式(schema),它提供了一种对表进行逻辑分组的方式。这种分组不仅有助...
总的来说,封装HBase以便Java调用是一个常见的开发任务,它涉及到对HBase API的理解,接口设计,以及对Java编程和项目管理的综合运用。封装后的库不仅提高了代码的可读性和可维护性,也使得应用与数据存储的交互更加...
这里我们使用JDBC连接MySQL,使用HBase Java API操作HBase。以下是一个简单的示例: ```java import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop....
在Java中操作HBase是一种常见的任务,特别是在大数据处理和存储的场景中。HBase是一个分布式的、基于列族的NoSQL数据库,它构建在Hadoop之上,提供了高性能、低延迟的数据存储和访问能力。本教程将详细介绍如何使用...
Java访问Hbase数据库是大数据处理中的常见操作,尤其在分布式存储和实时数据分析场景下。HBase,一个基于Google Bigtable模型的开源非关系型数据库,是Apache Hadoop生态系统的一部分,提供高并发、低延迟的数据存储...
Java连接远程HBase数据库是一项常见的任务,特别是在大数据处理和分布式存储的应用场景中。HBase是一个构建在Hadoop文件系统(HDFS)之上的分布式、版本化的NoSQL数据库,它提供了高性能、低延迟的数据访问能力。...
在IT领域,尤其是在大数据处理和分布式系统中,Java、Thrift和HBase是常见的技术组合。本主题将详细探讨如何利用Java通过Thrift-0.9.1版本来读取HBase表数据。 HBase是一个基于Google Bigtable设计的开源NoSQL...
【标题】"hbase hadoop chm java 帮助文档"揭示了这是一份针对Java程序员在Hadoop和HBase开发中使用的CHM(Windows帮助文档)工具集。CHM文件是一种常见的技术文档格式,它将多个HTML页面、图像和其他资源打包成一个...
在Java API中访问HBase是大数据处理中常见的一项任务,HBase作为一个分布式、列式存储的NoSQL数据库,常用于海量数据的实时读写。在这个Java API访问HBase的Maven项目中,我们将探讨如何配置项目,引入依赖,以及...
它提供了多种命令,帮助用户执行常见的数据库操作,如创建、读取、更新和删除数据。以下是一些重要的HBase Shell操作命令及其详细说明: 1. **查询服务器状态** `status` 命令用于查看集群的运行状态,包括活动...
### HBase常见错误及解决方案:3年运维经验总结 #### 一、配置第三方依赖包HADOOP_CLASSPATH和HBase问题 **问题描述** 在本地开发HBase程序时,虽然本地编译能够通过(因为在IDE中已经导入了必要的jar包),但在...
Java链接HBase进行增删改查操作是大数据领域常见的任务,尤其在处理大规模分布式存储时。HBase,作为Apache Hadoop生态系统的一部分,是一个基于列族的NoSQL数据库,提供了高性能、高可扩展性的数据存储解决方案。这...
SpringBoot集成HBase是当前大数据处理和存储解决方案中的一种常见组合。HBase是基于Hadoop的分布式、可扩展的NoSQL数据库,能够存储大量的结构化和非结构化数据。SpringBoot则是一个基于Java的现代Web框架,提供了...
使用Java对HBase进行操作是大数据处理中常见的场景,本文将总结使用Java对HBase进行操作的方法,并提供详细的示例代码。 一、Configuration 在使用Java API时,Client端需要知道HBase的配置环境,如存储地址,...
对于开发人员来说,通过Java API与HBase进行交互是常见的操作方式。本指南将深入探讨如何利用Java API来访问和操作HBase。 一、HBase简介 HBase是建立在Hadoop文件系统(HDFS)之上,提供实时读写操作的非关系型...
这个过程涉及了SQL查询、数据转换、数据预处理、JSON解析、HBase表设计以及客户端API的使用,是大数据领域常见的数据迁移任务。在实际操作中,可能还需要考虑到性能优化、错误处理和数据一致性等问题,以确保整个...