Generate HFiles with a Hadoop MapReduce job, then import them into the HBase table with BulkLoad (LoadIncrementalHFiles).
Mapper class:
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveToHbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    private static final Logger logger = LoggerFactory.getLogger(HiveToHbaseMapper.class);

    // All cells written by this task share one timestamp (the task JVM start time).
    public static final long timeStamp = System.currentTimeMillis();

    private String dataSeperator = null;
    private String columnFamily = null;
    private Map<String, Integer> columnMap = null;
    private int rowKeyIndex = 0;

    @Override
    public void setup(Context context) {
        Configuration configuration = context.getConfiguration(); // read the job parameters
        dataSeperator = configuration.get("data.seperator");
        columnFamily = configuration.get("column.family");
        String columnMapStr = configuration.get("column.map");

        // column.map is a comma-separated list of columnName=fieldIndex pairs;
        // the entry named "rowKey" marks which field becomes the HBase row key.
        String[] columnSplit = columnMapStr.split(",");
        Map<String, Integer> columnMap = new HashMap<String, Integer>();
        for (int i = 0; i < columnSplit.length; i++) {
            String[] temp = columnSplit[i].split("=");
            String columnName = temp[0];
            Integer columnIndex = Integer.parseInt(temp[1]);
            if (columnName.equalsIgnoreCase("rowKey")) {
                this.rowKeyIndex = columnIndex;
            } else {
                columnMap.put(columnName, columnIndex);
            }
        }
        this.columnMap = columnMap;

        logger.info("data.seperator:" + dataSeperator);
        logger.info("column.family:" + columnFamily);
        logger.info("column.map:" + columnMap);
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            // Each input line is one exported Hive record; split it into fields.
            String[] values = value.toString().split(dataSeperator);
            String rowKeyStr = values[rowKeyIndex];
            byte[] rowKeyBytes = Bytes.toBytes(rowKeyStr);
            byte[] columnFamilyBytes = Bytes.toBytes(columnFamily);
            ImmutableBytesWritable rowKey = new ImmutableBytesWritable(rowKeyBytes);

            // Emit one KeyValue per mapped column, all under the same row key.
            Iterator<Map.Entry<String, Integer>> iterator = this.columnMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Integer> columnEntry = iterator.next();
                String columnName = columnEntry.getKey();
                Integer columnIndex = columnEntry.getValue();
                String columnValue = values[columnIndex];
                KeyValue kv = new KeyValue(rowKeyBytes, columnFamilyBytes,
                        Bytes.toBytes(columnName), timeStamp, Bytes.toBytes(columnValue));
                context.write(rowKey, kv);
            }
        } catch (Exception e) {
            logger.error("failed to map line: " + value.toString(), e);
            throw new RuntimeException(e);
        }
    }
}
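The Mapper expects three job parameters: data.seperator (the field delimiter of the exported Hive files), column.family (the target column family) and column.map (comma-separated columnName=fieldIndex pairs, where the entry named rowKey selects the row-key field). A minimal sketch of how they could be set on the driver side; the delimiter, family and column names below are made-up examples, not values from the original project:

// Hypothetical example values; in this project they actually come from InterfaceConfig (see the driver below).
Configuration conf = HBaseConfiguration.create();
conf.set("data.seperator", "\001");                        // e.g. Hive's default ^A field delimiter
conf.set("column.family", "cf");
conf.set("column.map", "rowKey=0,user_id=1,order_id=2");   // rowKey -> rowKeyIndex, the rest -> columnMap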
Creating the job (driver):
private Configuration getConfiguration(String interfaceId) throws Exception {
    InterfaceConfig interfaceConfig = this.getInterfaceConfigList().getHiveToHBaseHConfig(interfaceId);
    HBaseConfig hBaseConfig = this.gethBaseConfig();
    Configuration configuration = hBaseConfig.getConfiguration();
    configuration.set(tableNameVar, interfaceConfig.getHBaseTableName());
    configuration.set(dataSpliterVar, interfaceConfig.getDataSpliter());
    configuration.set(columnFamilyVar, interfaceConfig.builderColumnFamily());
    configuration.set(columnMapVar, interfaceConfig.getColumnMap());
    configuration.set(inputPathVar, interfaceConfig.builderInputPath());
    configuration.set(outputPathVar, interfaceConfig.builderOutputPath());
    return configuration;
}

private void showConfig(Configuration configuration) {
    logger.info("Configuration:[" + tableNameVar + ":{}]", configuration.get(tableNameVar));
    logger.info("Configuration:[" + dataSpliterVar + ":{}]", configuration.get(dataSpliterVar));
    logger.info("Configuration:[" + columnFamilyVar + ":{}]", configuration.get(columnFamilyVar));
    logger.info("Configuration:[" + columnMapVar + ":{}]", configuration.get(columnMapVar));
    logger.info("Configuration:[" + inputPathVar + ":{}]", configuration.get(inputPathVar));
    logger.info("Configuration:[" + outputPathVar + ":{}]", configuration.get(outputPathVar));
}

public boolean start(String interfaceId) throws Throwable {
    Configuration configuration = this.getConfiguration(interfaceId);
    showConfig(configuration);
    String outputPath = configuration.get(outputPathVar);
    String tableName = configuration.get(tableNameVar);

    Job job = Job.getInstance(configuration, "Bulk Loading HBase Table " + tableName);
    job.setJobName("hive to hbase");
    job.setJarByClass(HiveToHbaseMapper.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class); // map output key class
    job.setOutputValueClass(KeyValue.class);             // map output value class
    job.setMapperClass(HiveToHbaseMapper.class);         // map function
    job.setReducerClass(KeyValueSortReducer.class);

    FileInputFormat.addInputPaths(job, configuration.get(inputPathVar)); // input path (the exported Hive files)
    Path output = new Path(URI.create(outputPath));
    FileSystem fs = FileSystem.get(configuration);
    if (fs.exists(output)) {
        boolean result = fs.delete(output, true); // delete the output path if it already exists
        logger.info("delete hdfs path:{},result={}", outputPath, result);
    }
    FileOutputFormat.setOutputPath(job, output); // output path: the HFiles are written here

    // configureIncrementalLoad sets the output format, reducer and total-order partitioner
    // so the generated HFiles line up with the regions of the target table.
    HTable table = new HTable(configuration, tableName);
    HFileOutputFormat.configureIncrementalLoad(job, table);

    job.waitForCompletion(true);
    showConfig(configuration);
    if (job.isSuccessful()) {
        logger.info("map reduce generate HFile success[HFilePath:{}]", outputPath);
        RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(output, true);
        while (remoteIterator.hasNext()) {
            LocatedFileStatus locatedFileStatus = remoteIterator.next();
            String filePath = locatedFileStatus.getPath().toUri().getPath();
            long fileLength = locatedFileStatus.getLen();
            logger.info("HFile:[filePath:{}],[fileLength:{}]", filePath, fileLength);
        }
        logger.info("doBulkLoad to hbase table:{}", table.getName());
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
        try {
            loader.doBulkLoad(output, table);
        } catch (Throwable e) {
            logger.info("doBulkLoad to hbase fail ", e);
            throw e;
        }
        logger.info("doBulkLoad to hbase success ");
        return true;
    } else {
        logger.info("map reduce generate HFile fail ");
        return false;
    }
}
Runtime environment: a Hadoop cluster.
Package the program as a jar.
Run command: hadoop jar [program jar] [arguments]
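A concrete invocation might look like the following; the jar name, package prefix and argument are placeholders (only the HiveToHBaseMain class name is visible in the log below), and the HBase client jars have to be on the job's classpath:

export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hive-to-hbase.jar com.xxx.hadoop.HiveToHBaseMain <interfaceId>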
Log output:
17/03/22 19:24:43 INFO mapreduce.Job: Job job_1488875331552_130336 running in uber mode : false
17/03/22 19:24:43 INFO mapreduce.Job: map 0% reduce 0%
17/03/22 19:24:53 INFO mapreduce.Job: map 10% reduce 0%
17/03/22 19:24:54 INFO mapreduce.Job: map 27% reduce 0%
17/03/22 19:24:57 INFO mapreduce.Job: map 30% reduce 0%
17/03/22 19:25:00 INFO mapreduce.Job: map 33% reduce 0%
17/03/22 19:25:02 INFO mapreduce.Job: map 35% reduce 0%
17/03/22 19:25:03 INFO mapreduce.Job: map 41% reduce 0%
17/03/22 19:25:05 INFO mapreduce.Job: map 42% reduce 0%
17/03/22 19:25:06 INFO mapreduce.Job: map 44% reduce 3%
17/03/22 19:25:08 INFO mapreduce.Job: map 45% reduce 3%
17/03/22 19:25:09 INFO mapreduce.Job: map 49% reduce 3%
17/03/22 19:25:11 INFO mapreduce.Job: map 50% reduce 3%
17/03/22 19:25:12 INFO mapreduce.Job: map 55% reduce 3%
17/03/22 19:25:14 INFO mapreduce.Job: map 57% reduce 3%
17/03/22 19:25:15 INFO mapreduce.Job: map 59% reduce 3%
17/03/22 19:25:17 INFO mapreduce.Job: map 60% reduce 3%
17/03/22 19:25:18 INFO mapreduce.Job: map 64% reduce 3%
17/03/22 19:25:21 INFO mapreduce.Job: map 67% reduce 3%
17/03/22 19:25:23 INFO mapreduce.Job: map 68% reduce 3%
17/03/22 19:25:24 INFO mapreduce.Job: map 70% reduce 3%
17/03/22 19:25:27 INFO mapreduce.Job: map 73% reduce 3%
17/03/22 19:25:30 INFO mapreduce.Job: map 77% reduce 3%
17/03/22 19:25:33 INFO mapreduce.Job: map 82% reduce 3%
17/03/22 19:25:35 INFO mapreduce.Job: map 83% reduce 3%
17/03/22 19:25:36 INFO mapreduce.Job: map 87% reduce 3%
17/03/22 19:25:38 INFO mapreduce.Job: map 88% reduce 3%
17/03/22 19:25:39 INFO mapreduce.Job: map 92% reduce 3%
17/03/22 19:25:41 INFO mapreduce.Job: map 93% reduce 3%
17/03/22 19:25:42 INFO mapreduce.Job: map 96% reduce 3%
17/03/22 19:25:43 INFO mapreduce.Job: map 96% reduce 10%
17/03/22 19:25:44 INFO mapreduce.Job: map 97% reduce 10%
17/03/22 19:25:45 INFO mapreduce.Job: map 98% reduce 10%
17/03/22 19:25:46 INFO mapreduce.Job: map 98% reduce 17%
17/03/22 19:25:47 INFO mapreduce.Job: map 99% reduce 17%
17/03/22 19:25:51 INFO mapreduce.Job: map 100% reduce 17%
17/03/22 19:25:52 INFO mapreduce.Job: map 100% reduce 30%
17/03/22 19:25:58 INFO mapreduce.Job: map 100% reduce 67%
17/03/22 19:26:11 INFO mapreduce.Job: map 100% reduce 68%
17/03/22 19:26:20 INFO mapreduce.Job: map 100% reduce 69%
17/03/22 19:26:29 INFO mapreduce.Job: map 100% reduce 70%
17/03/22 19:26:38 INFO mapreduce.Job: map 100% reduce 71%
17/03/22 19:26:47 INFO mapreduce.Job: map 100% reduce 72%
17/03/22 19:26:53 INFO mapreduce.Job: map 100% reduce 73%
17/03/22 19:27:02 INFO mapreduce.Job: map 100% reduce 74%
17/03/22 19:27:08 INFO mapreduce.Job: map 100% reduce 75%
17/03/22 19:27:17 INFO mapreduce.Job: map 100% reduce 76%
17/03/22 19:27:23 INFO mapreduce.Job: map 100% reduce 77%
17/03/22 19:27:32 INFO mapreduce.Job: map 100% reduce 78%
17/03/22 19:27:41 INFO mapreduce.Job: map 100% reduce 79%
17/03/22 19:27:47 INFO mapreduce.Job: map 100% reduce 80%
17/03/22 19:27:53 INFO mapreduce.Job: map 100% reduce 81%
17/03/22 19:28:03 INFO mapreduce.Job: map 100% reduce 82%
17/03/22 19:28:09 INFO mapreduce.Job: map 100% reduce 83%
17/03/22 19:28:15 INFO mapreduce.Job: map 100% reduce 84%
17/03/22 19:28:24 INFO mapreduce.Job: map 100% reduce 85%
17/03/22 19:28:30 INFO mapreduce.Job: map 100% reduce 86%
17/03/22 19:28:39 INFO mapreduce.Job: map 100% reduce 87%
17/03/22 19:28:45 INFO mapreduce.Job: map 100% reduce 88%
17/03/22 19:28:51 INFO mapreduce.Job: map 100% reduce 89%
17/03/22 19:29:00 INFO mapreduce.Job: map 100% reduce 90%
17/03/22 19:29:06 INFO mapreduce.Job: map 100% reduce 91%
17/03/22 19:29:15 INFO mapreduce.Job: map 100% reduce 92%
17/03/22 19:29:21 INFO mapreduce.Job: map 100% reduce 93%
17/03/22 19:29:27 INFO mapreduce.Job: map 100% reduce 94%
17/03/22 19:29:36 INFO mapreduce.Job: map 100% reduce 95%
17/03/22 19:29:42 INFO mapreduce.Job: map 100% reduce 96%
17/03/22 19:29:48 INFO mapreduce.Job: map 100% reduce 97%
17/03/22 19:29:57 INFO mapreduce.Job: map 100% reduce 98%
17/03/22 19:30:03 INFO mapreduce.Job: map 100% reduce 99%
17/03/22 19:30:09 INFO mapreduce.Job: map 100% reduce 100%
17/03/22 19:30:13 INFO mapreduce.Job: Job job_1488875331552_130336 completed successfully
17/03/22 19:30:13 INFO mapreduce.Job: Counters: 51
    File System Counters
        FILE: Number of bytes read=24493297777
        FILE: Number of bytes written=36853210071
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1230457141
        HDFS: Number of bytes written=8171126646
        HDFS: Number of read operations=35
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=3
    Job Counters
        Launched map tasks=10
        Launched reduce tasks=1
        Other local map tasks=7
        Data-local map tasks=1
        Rack-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=538122
        Total time spent by all reduces in occupied slots (ms)=630690
        Total time spent by all map tasks (ms)=538122
        Total time spent by all reduce tasks (ms)=315345
        Total vcore-seconds taken by all map tasks=538122
        Total vcore-seconds taken by all reduce tasks=315345
        Total megabyte-seconds taken by all map tasks=1102073856
        Total megabyte-seconds taken by all reduce tasks=1291653120
    Map-Reduce Framework
        Map input records=29206691
        Map output records=146033455
        Map output bytes=12066006570
        Map output materialized bytes=12358073540
        Input split bytes=1860
        Combine input records=0
        Combine output records=0
        Reduce input groups=29206691
        Reduce shuffle bytes=12358073540
        Reduce input records=146033455
        Reduce output records=146033455
        Spilled Records=435469330
        Shuffled Maps =10
        Failed Shuffles=0
        Merged Map outputs=10
        GC time elapsed (ms)=11752
        CPU time spent (ms)=1070230
        Physical memory (bytes) snapshot=9382916096
        Virtual memory (bytes) snapshot=43484647424
        Total committed heap usage (bytes)=10108272640
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=1230455281            // bytes read from the input files
    File Output Format Counters
        Bytes Written=8171126646         // bytes written to the generated HFiles
17/03/22 19:30:13 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://user/my/cf/8d4d9058fe9d4dbfa7fd854d45002010 first=04528183988_01_00001 last=97477539821_01_00001    // first and last are the row-key range of the HFile
17/03/22 19:30:13 INFO client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
17/03/22 19:30:13 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x158b58cea130bc6
17/03/22 19:30:13 INFO zookeeper.ZooKeeper: Session: 0x158b58cea130bc6 closed
17/03/22 19:30:13 INFO zookeeper.ClientCnxn: EventThread shut down
17/03/22 19:30:14 INFO hadoop.HiveToHBaseMain: doBulkLoad to hbase success
17/03/22 19:30:14 INFO hadoop.HiveToHBaseStartUtils: System.exit(0) ---- success