将手机上网日志文件批量导入到Hbase中,操作步骤:
1、将日志文件(请下载附件)上传到HDFS中,利用hadoop的操作命令上传:hadoop fs -put input /
2、创建Hbase表,通过Java操作
package com.jiewen.hbase; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; public class HbaseDemo { public static void main(String[] args) throws IOException { String tableName = "wlan_log"; String columnFamily = "cf"; HbaseDemo.create(tableName, columnFamily); // HbaseDemo.put(tableName, "row1", columnFamily, "cl1", "data"); // HbaseDemo.get(tableName, "row1"); // HbaseDemo.scan(tableName); // HbaseDemo.delete(tableName); } // hbase操作必备 private static Configuration getConfiguration() { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase"); // 使用eclipse时必须添加这个,否则无法定位 conf.set("hbase.zookeeper.quorum", "hadoop1"); return conf; } // 创建一张表 public static void create(String tableName, String columnFamily) throws IOException { HBaseAdmin admin = new HBaseAdmin(getConfiguration()); if (admin.tableExists(tableName)) { System.out.println("table exists!"); } else { HTableDescriptor tableDesc = new HTableDescriptor(tableName); tableDesc.addFamily(new HColumnDescriptor(columnFamily)); admin.createTable(tableDesc); System.out.println("create table success!"); } } // 添加一条记录 public static void put(String tableName, String row, String columnFamily, String column, String data) throws IOException { HTable table = new HTable(getConfiguration(), tableName); Put p1 = new Put(Bytes.toBytes(row)); p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes .toBytes(data)); table.put(p1); System.out.println("put'" + row + "'," + columnFamily + ":" + column + "','" + data + "'"); } // 读取一条记录 public static void get(String tableName, String row) throws IOException { HTable table = new HTable(getConfiguration(), tableName); Get get = new Get(Bytes.toBytes(row)); Result result = table.get(get); System.out.println("Get: " + result); } // 显示所有数据 public static void scan(String tableName) throws IOException { HTable table = new HTable(getConfiguration(), tableName); Scan scan = new Scan(); ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { System.out.println("Scan: " + result); } } // 删除表 public static void delete(String tableName) throws IOException { HBaseAdmin admin = new HBaseAdmin(getConfiguration()); if (admin.tableExists(tableName)) { try { admin.disableTable(tableName); admin.deleteTable(tableName); } catch (IOException e) { e.printStackTrace(); System.out.println("Delete " + tableName + " 失败"); } } System.out.println("Delete " + tableName + " 成功"); } }
3、将日志文件导入Hbase表wlan_log中:
import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; public class HbaseBatchImport { public static void main(String[] args) throws Exception { final Configuration configuration = new Configuration(); // 设置zookeeper configuration.set("hbase.zookeeper.quorum", "hadoop1"); // 设置hbase表名称 configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log"); // 将该值改大,防止hbase超时退出 configuration.set("dfs.socket.timeout", "180000"); final Job job = new Job(configuration, "HBaseBatchImport"); job.setMapperClass(BatchImportMapper.class); job.setReducerClass(BatchImportReducer.class); // 设置map的输出,不设置reduce的输出类型 job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); // 不再设置输出路径,而是设置输出格式类型 job.setOutputFormatClass(TableOutputFormat.class); FileInputFormat.setInputPaths(job, "hdfs://hadoop1:9000/input"); job.waitForCompletion(true); } static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> { SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss"); Text v2 = new Text(); protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException { final String[] splited = value.toString().split("\t"); try { final Date date = new Date(Long.parseLong(splited[0].trim())); final String dateFormat = dateformat1.format(date); String rowKey = splited[1] + ":" + dateFormat; v2.set(rowKey + "\t" + value.toString()); context.write(key, v2); } catch (NumberFormatException e) { final Counter counter = context.getCounter("BatchImport", "ErrorFormat"); counter.increment(1L); System.out.println("出错了" + splited[0] + " " + e.getMessage()); } }; } static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> { protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException { for (Text text : values) { final String[] splited = text.toString().split("\t"); final Put put = new Put(Bytes.toBytes(splited[0])); put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes .toBytes(splited[1])); // 省略其他字段,调用put.add(....)即可 context.write(NullWritable.get(), put); } }; } }
4、查看导入结果:
相关推荐
Hbase 调用 JavaAPI 实现批量导入操作 在大数据时代,Hbase 作为一个分布式、面向列的 NoSQL 数据库,广泛应用于大规模数据存储和处理中。同时,JavaAPI 作为一个强大且流行的编程语言,广泛应用于各种软件开发中。...
这个Java API访问HBase的Maven项目可能还包含了对HBase其他特性的演示,如扫描(Scan)、过滤器(Filter)、批量操作(Bulk Load)等。通过`Scan`可以实现数据的批量读取,`Filter`可以定制化查询条件,`Bulk Load`...
导入这些库后,你可以开始编写Java代码来操作HBase表。以下是一些关键步骤和API的使用: 1. **连接HBase**:首先,你需要创建一个`Configuration`对象,设置HBase的ZooKeeper连接信息,然后使用`ConnectionFactory`...
HBase提供了一套Java API,使得开发者能够轻松地进行数据的增删改查操作。本教程将深入探讨如何在代码中利用HBase API进行基本的数据操作。 首先,为了使用HBase API,我们需要在项目中引入相应的依赖。由于HBase是...
REST Gateway是基于JAX-RS(Java API for RESTful Web Services)实现的,提供了标准的CRUD(Create, Read, Update, Delete)操作。 3. **Thrift Gateway**:Thrift是一个跨语言的服务框架,允许使用多种编程语言来...
除了HBase Shell,我们还可以使用Java API编写程序来执行上述操作。在Java代码中,可以创建`Table`对象,然后使用`put`方法插入数据,最后调用`flushCommits`方法提交更改。 在测试环境中,导入数据集有助于验证...
Thrift接口将HBase的操作转换为易于理解和使用的API,用户可以通过这些API执行基本的CRUD(创建、读取、更新、删除)操作。 3. **Python与HBase通信**:Python客户端通过导入Thrift生成的库,可以建立到HBase ...
在特殊情况下,如数据恢复或批量导入,开发者可能需要直接操作HFile。这通常涉及使用HBase的Admin API创建表,然后使用HFile工具将数据加载到表中。 4. **Java API使用**:在Java中,我们可以使用`org.apache....
在Java开发中,Apache的HBase和Hadoop库提供了丰富的API来支持上述步骤。例如,`org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil`类提供了一些实用方法,如初始化Job,设置Mapper和Reducer,以及配置...
- **关键词管理**:实现关键词的增删改查操作,支持关键词的批量导入和导出。 - **关键词导入与导出**:支持从CSV或DICX文件导入关键词,并能够将关键词导出到指定格式的文件中。 ##### 7. 系统API模块 - **API设计...
4. **使用HBase API**:Java中提供了HBase的API,如`Table`接口和`Put`对象,用于向表中添加数据。先创建`Connection`和`Table`实例,然后遍历CSV数据,对于每一行,创建一个`Put`对象,设置行键和列族/列限定符对应...