
HBase data import code

 

   Generate HFile files with a Hadoop MapReduce job, then import them into the HBase table with BulkLoad.

 

 

  Mapper class:

   

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveToHbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    private static final Logger logger = LoggerFactory.getLogger(HiveToHbaseMapper.class);
    public static final long timeStamp = System.currentTimeMillis();
    private String dataSeperator = null;
    private String columnFamily = null;
    private Map<String, Integer> columnMap = null;
    private int rowKeyIndex = 0;

    @Override
    public void setup(Context context) {
        Configuration configuration = context.getConfiguration(); // read job parameters

        dataSeperator = configuration.get("data.seperator");
        columnFamily = configuration.get("column.family");

        // "column.map" is a comma-separated list of columnName=fieldIndex pairs;
        // the special name "rowKey" marks the field used as the HBase row key.
        String columnMapStr = configuration.get("column.map");
        String[] columnSplit = columnMapStr.split(",");

        Map<String, Integer> columnMap = new HashMap<String, Integer>();
        for (int i = 0; i < columnSplit.length; i++) {
            String[] temp = columnSplit[i].split("=");
            String columnName = temp[0];
            Integer columnIndex = Integer.parseInt(temp[1]);
            if (columnName.equalsIgnoreCase("rowKey")) {
                this.rowKeyIndex = columnIndex;
            } else {
                columnMap.put(columnName, columnIndex);
            }
        }
        this.columnMap = columnMap;
        logger.info("data.seperator:" + dataSeperator);
        logger.info("column.family:" + columnFamily);
        logger.info("column.map:" + columnMap);
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            // Split one line of the Hive export into fields.
            String[] values = value.toString().split(dataSeperator);
            String rowKeyStr = values[rowKeyIndex];
            byte[] rowKeyBytes = Bytes.toBytes(rowKeyStr);
            byte[] columnFamilyBytes = Bytes.toBytes(columnFamily);
            ImmutableBytesWritable rowKey = new ImmutableBytesWritable(rowKeyBytes);
            // Sanity check from the original job, which expected exactly 5 mapped columns besides the row key.
            if (columnMap.size() != 5) {
                throw new RuntimeException(columnMap.toString());
            }

            // Emit one KeyValue per mapped column, all sharing the same row key and timestamp.
            Iterator<Map.Entry<String, Integer>> iterator = this.columnMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Integer> columnEntry = iterator.next();
                String columnName = columnEntry.getKey();
                Integer columnIndex = columnEntry.getValue();
                String columnValue = values[columnIndex];
                KeyValue kv = new KeyValue(rowKeyBytes, columnFamilyBytes, Bytes.toBytes(columnName), timeStamp, Bytes.toBytes(columnValue));
                context.write(rowKey, kv);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
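
The Mapper reads its settings from the job Configuration. A minimal sketch of the values it expects, assuming a tab-delimited Hive export with a row-key field plus five data columns (the delimiter, column family, and column names below are illustrative, not the original job's values):

Configuration conf = HBaseConfiguration.create();
conf.set("data.seperator", "\t");        // field delimiter of the exported Hive files
conf.set("column.family", "cf");         // target column family
// "rowKey" marks the row-key field; the other entries map column name -> field index
conf.set("column.map", "rowKey=0,name=1,age=2,city=3,phone=4,email=5");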

 

   Creating the job:

    

 private Configuration getConfiguration(String interfaceId) throws Exception {
        InterfaceConfig interfaceConfig = this.getInterfaceConfigList().getHiveToHBaseHConfig(interfaceId);
        HBaseConfig hBaseConfig = this.gethBaseConfig();
        Configuration configuration = hBaseConfig.getConfiguration();
        configuration.set(tableNameVar, interfaceConfig.getHBaseTableName());
        configuration.set(dataSpliterVar, interfaceConfig.getDataSpliter());
        configuration.set(columnFamilyVar, interfaceConfig.builderColumnFamily());
        configuration.set(columnMapVar, interfaceConfig.getColumnMap());
        configuration.set(inputPathVar, interfaceConfig.builderInputPath());
        configuration.set(outputPathVar, interfaceConfig.builderOutputPath());
        return configuration;
    }

    private void showConfig(Configuration configuration) {
        logger.info("Configuration:[" + tableNameVar + ":{}]", configuration.get(tableNameVar));
        logger.info("Configuration:[" + dataSpliterVar + ":{}]", configuration.get(dataSpliterVar));
        logger.info("Configuration:[" + columnFamilyVar + ":{}]", configuration.get(columnFamilyVar));
        logger.info("Configuration:[" + columnMapVar + ":{}]", configuration.get(columnMapVar));
        logger.info("Configuration:[" + inputPathVar + ":{}]", configuration.get(inputPathVar));
        logger.info("Configuration:[" + outputPathVar + ":{}]", configuration.get(outputPathVar));
    }

    public boolean start(String interfaceId) throws Throwable {
        Configuration configuration = this.getConfiguration(interfaceId);
        showConfig(configuration);
        String outputPath = configuration.get(outputPathVar);
        String tableName = configuration.get(tableNameVar);

        Job job = Job.getInstance(configuration, "Bulk Loading HBaseConfig Table");
        job.setJobName("hive to hbase");
        job.setJarByClass(HiveToHbaseMapper.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);   // output key class
        job.setOutputValueClass(KeyValue.class);                // output value class
        job.setMapperClass(HiveToHbaseMapper.class);            // map function
        job.setReducerClass(KeyValueSortReducer.class);         // sorts KeyValues before they are written to HFiles

        FileInputFormat.addInputPaths(job, configuration.get(inputPathVar)); // input path

        Path output = new Path(URI.create(outputPath));
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(output)) {
            boolean result = fs.delete(output, true); // delete the output path if it already exists
            logger.info("delete hdfs path:{},result={}", outputPath, result);
        }
        FileOutputFormat.setOutputPath(job, output); // output path

        HTable table = new HTable(configuration, tableName);
        // configureIncrementalLoad sets the output format and a total-order partitioner
        // so that the generated HFiles line up with the table's regions.
        HFileOutputFormat.configureIncrementalLoad(job, table);
        // On newer HBase versions use HFileOutputFormat2 with a RegionLocator instead:
        // Connection connection = ConnectionFactory.createConnection(configuration);
        // HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf(tableName)));

        job.waitForCompletion(true);
        showConfig(configuration);

        if (job.isSuccessful()) {
            logger.info("map reduce generate HFile success[HFilePath:{}]", outputPath);
            RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(output, true);
            while (remoteIterator.hasNext()) {
                LocatedFileStatus locatedFileStatus = remoteIterator.next();
                String filePath = locatedFileStatus.getPath().toUri().getPath();
                long fileLength = locatedFileStatus.getLen();
                logger.info("HFile:[filePath:{}],[fileLength:{}]", filePath, fileLength);
            }
            logger.info("doBulkLoad to hbase table:{}", table.getName());
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
            try {
                loader.doBulkLoad(output, table);
            } catch (Throwable e) {
                logger.error("doBulkLoad to hbase fail ", e);
                throw e;
            }
            logger.info("doBulkLoad to hbase success ");
            return true;
        } else {
            logger.info("map reduce generate HFile fail ");
            return false;
        }
    }
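
The *Var constants and the InterfaceConfig/HBaseConfig helpers referenced above are not shown in the post. A plausible sketch of the constant definitions (the three property names read by HiveToHbaseMapper.setup() are fixed by that code; the remaining names are assumptions):

    private static final String tableNameVar    = "hbase.table.name";  // assumed property name
    private static final String dataSpliterVar  = "data.seperator";    // must match the key read in setup()
    private static final String columnFamilyVar = "column.family";     // must match the key read in setup()
    private static final String columnMapVar    = "column.map";        // must match the key read in setup()
    private static final String inputPathVar    = "input.path";        // assumed property name
    private static final String outputPathVar   = "output.path";       // assumed property name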

 

   Runtime environment: a Hadoop cluster.

   Package the program as a jar.

   Run command: hadoop jar [program jar] [arguments]
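
   For example, assuming the job is packaged as hive-to-hbase.jar with its main class set in the jar manifest, and the interface id is passed as the only argument (both the jar name and the id below are placeholders):

   hadoop jar hive-to-hbase.jar interface001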

   Log output:

   

17/03/22 19:24:43 INFO mapreduce.Job: Job job_1488875331552_130336 running in uber mode : false
17/03/22 19:24:43 INFO mapreduce.Job:  map 0% reduce 0%
17/03/22 19:24:53 INFO mapreduce.Job:  map 10% reduce 0%
17/03/22 19:24:54 INFO mapreduce.Job:  map 27% reduce 0%
17/03/22 19:24:57 INFO mapreduce.Job:  map 30% reduce 0%
17/03/22 19:25:00 INFO mapreduce.Job:  map 33% reduce 0%
17/03/22 19:25:02 INFO mapreduce.Job:  map 35% reduce 0%
17/03/22 19:25:03 INFO mapreduce.Job:  map 41% reduce 0%
17/03/22 19:25:05 INFO mapreduce.Job:  map 42% reduce 0%
17/03/22 19:25:06 INFO mapreduce.Job:  map 44% reduce 3%
17/03/22 19:25:08 INFO mapreduce.Job:  map 45% reduce 3%
17/03/22 19:25:09 INFO mapreduce.Job:  map 49% reduce 3%
17/03/22 19:25:11 INFO mapreduce.Job:  map 50% reduce 3%
17/03/22 19:25:12 INFO mapreduce.Job:  map 55% reduce 3%
17/03/22 19:25:14 INFO mapreduce.Job:  map 57% reduce 3%
17/03/22 19:25:15 INFO mapreduce.Job:  map 59% reduce 3%
17/03/22 19:25:17 INFO mapreduce.Job:  map 60% reduce 3%
17/03/22 19:25:18 INFO mapreduce.Job:  map 64% reduce 3%
17/03/22 19:25:21 INFO mapreduce.Job:  map 67% reduce 3%
17/03/22 19:25:23 INFO mapreduce.Job:  map 68% reduce 3%
17/03/22 19:25:24 INFO mapreduce.Job:  map 70% reduce 3%
17/03/22 19:25:27 INFO mapreduce.Job:  map 73% reduce 3%
17/03/22 19:25:30 INFO mapreduce.Job:  map 77% reduce 3%
17/03/22 19:25:33 INFO mapreduce.Job:  map 82% reduce 3%
17/03/22 19:25:35 INFO mapreduce.Job:  map 83% reduce 3%
17/03/22 19:25:36 INFO mapreduce.Job:  map 87% reduce 3%
17/03/22 19:25:38 INFO mapreduce.Job:  map 88% reduce 3%
17/03/22 19:25:39 INFO mapreduce.Job:  map 92% reduce 3%
17/03/22 19:25:41 INFO mapreduce.Job:  map 93% reduce 3%
17/03/22 19:25:42 INFO mapreduce.Job:  map 96% reduce 3%
17/03/22 19:25:43 INFO mapreduce.Job:  map 96% reduce 10%
17/03/22 19:25:44 INFO mapreduce.Job:  map 97% reduce 10%
17/03/22 19:25:45 INFO mapreduce.Job:  map 98% reduce 10%
17/03/22 19:25:46 INFO mapreduce.Job:  map 98% reduce 17%
17/03/22 19:25:47 INFO mapreduce.Job:  map 99% reduce 17%
17/03/22 19:25:51 INFO mapreduce.Job:  map 100% reduce 17%
17/03/22 19:25:52 INFO mapreduce.Job:  map 100% reduce 30%
17/03/22 19:25:58 INFO mapreduce.Job:  map 100% reduce 67%
17/03/22 19:26:11 INFO mapreduce.Job:  map 100% reduce 68%
17/03/22 19:26:20 INFO mapreduce.Job:  map 100% reduce 69%
17/03/22 19:26:29 INFO mapreduce.Job:  map 100% reduce 70%
17/03/22 19:26:38 INFO mapreduce.Job:  map 100% reduce 71%
17/03/22 19:26:47 INFO mapreduce.Job:  map 100% reduce 72%
17/03/22 19:26:53 INFO mapreduce.Job:  map 100% reduce 73%
17/03/22 19:27:02 INFO mapreduce.Job:  map 100% reduce 74%
17/03/22 19:27:08 INFO mapreduce.Job:  map 100% reduce 75%
17/03/22 19:27:17 INFO mapreduce.Job:  map 100% reduce 76%
17/03/22 19:27:23 INFO mapreduce.Job:  map 100% reduce 77%
17/03/22 19:27:32 INFO mapreduce.Job:  map 100% reduce 78%
17/03/22 19:27:41 INFO mapreduce.Job:  map 100% reduce 79%
17/03/22 19:27:47 INFO mapreduce.Job:  map 100% reduce 80%
17/03/22 19:27:53 INFO mapreduce.Job:  map 100% reduce 81%
17/03/22 19:28:03 INFO mapreduce.Job:  map 100% reduce 82%
17/03/22 19:28:09 INFO mapreduce.Job:  map 100% reduce 83%
17/03/22 19:28:15 INFO mapreduce.Job:  map 100% reduce 84%
17/03/22 19:28:24 INFO mapreduce.Job:  map 100% reduce 85%
17/03/22 19:28:30 INFO mapreduce.Job:  map 100% reduce 86%
17/03/22 19:28:39 INFO mapreduce.Job:  map 100% reduce 87%
17/03/22 19:28:45 INFO mapreduce.Job:  map 100% reduce 88%
17/03/22 19:28:51 INFO mapreduce.Job:  map 100% reduce 89%
17/03/22 19:29:00 INFO mapreduce.Job:  map 100% reduce 90%
17/03/22 19:29:06 INFO mapreduce.Job:  map 100% reduce 91%
17/03/22 19:29:15 INFO mapreduce.Job:  map 100% reduce 92%
17/03/22 19:29:21 INFO mapreduce.Job:  map 100% reduce 93%
17/03/22 19:29:27 INFO mapreduce.Job:  map 100% reduce 94%
17/03/22 19:29:36 INFO mapreduce.Job:  map 100% reduce 95%
17/03/22 19:29:42 INFO mapreduce.Job:  map 100% reduce 96%
17/03/22 19:29:48 INFO mapreduce.Job:  map 100% reduce 97%
17/03/22 19:29:57 INFO mapreduce.Job:  map 100% reduce 98%
17/03/22 19:30:03 INFO mapreduce.Job:  map 100% reduce 99%
17/03/22 19:30:09 INFO mapreduce.Job:  map 100% reduce 100%
17/03/22 19:30:13 INFO mapreduce.Job: Job job_1488875331552_130336 completed successfully
17/03/22 19:30:13 INFO mapreduce.Job: Counters: 51
	File System Counters
		FILE: Number of bytes read=24493297777
		FILE: Number of bytes written=36853210071
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1230457141
		HDFS: Number of bytes written=8171126646
		HDFS: Number of read operations=35
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=3
	Job Counters 
		Launched map tasks=10
		Launched reduce tasks=1
		Other local map tasks=7
		Data-local map tasks=1
		Rack-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=538122
		Total time spent by all reduces in occupied slots (ms)=630690
		Total time spent by all map tasks (ms)=538122
		Total time spent by all reduce tasks (ms)=315345
		Total vcore-seconds taken by all map tasks=538122
		Total vcore-seconds taken by all reduce tasks=315345
		Total megabyte-seconds taken by all map tasks=1102073856
		Total megabyte-seconds taken by all reduce tasks=1291653120
	Map-Reduce Framework
		Map input records=29206691
		Map output records=146033455
		Map output bytes=12066006570
		Map output materialized bytes=12358073540
		Input split bytes=1860
		Combine input records=0
		Combine output records=0
		Reduce input groups=29206691
		Reduce shuffle bytes=12358073540
		Reduce input records=146033455
		Reduce output records=146033455
		Spilled Records=435469330
		Shuffled Maps =10
		Failed Shuffles=0
		Merged Map outputs=10
		GC time elapsed (ms)=11752
		CPU time spent (ms)=1070230
		Physical memory (bytes) snapshot=9382916096
		Virtual memory (bytes) snapshot=43484647424
		Total committed heap usage (bytes)=10108272640
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1230455281    // bytes read from the input
	File Output Format Counters 
		Bytes Written=8171126646    // bytes written to the generated HFiles
 

17/03/22 19:30:13 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://user/my/cf/8d4d9058fe9d4dbfa7fd854d45002010 first=04528183988_01_00001 last=97477539821_01_00001
// first and last are the row-key range covered by the HFile

17/03/22 19:30:13 INFO client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
17/03/22 19:30:13 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x158b58cea130bc6
17/03/22 19:30:13 INFO zookeeper.ZooKeeper: Session: 0x158b58cea130bc6 closed
17/03/22 19:30:13 INFO zookeeper.ClientCnxn: EventThread shut down
17/03/22 19:30:14 INFO hadoop.HiveToHBaseMain: doBulkLoad to hbase success 
17/03/22 19:30:14 INFO hadoop.HiveToHBaseStartUtils: System.exit(0) ----   success
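
As a quick sanity check after the bulk load, one of the row keys reported in the first=.../last=... log line can be read back through the HBase client API. A minimal sketch, assuming hbase-site.xml is on the client classpath; the table name is a placeholder for the actual target table:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyBulkLoad {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("my_table"))) { // placeholder table name
            // Read back the first row key reported by LoadIncrementalHFiles above.
            Result result = table.get(new Get(Bytes.toBytes("04528183988_01_00001")));
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneQualifier(cell)) + " = "
                        + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}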

 
