`
ganliang13
  • 浏览: 252786 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于mapreduce hbase操作血的教训

阅读更多

最近在做一些基于mapreduce 操作hbase 表的工作,碰到了几个问题。

 

一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即hbase提供的HFileOutputFormat类。

它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。

 

1.reduce 在写的时候由于词排序问题导致程序运行异常。

java.io.IOException: Added a key not lexically larger than previous key=\x00\x04r100\x02f1c100\x00\x00\x01?'c \x1E\x04, lastkey=\x00\x03r99\x02f1c99\x00\x00\x01?'c \x1E\x04
        at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:207)
        at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:324)
        at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:289)
        at org.apache.hadoop.hbase.regionserver.StoreFile$Writer.append(StoreFile.java:1197)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:168)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:124)
        at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:587)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at com.gump.test.HBaseHFileReducer.reduce(HBaseHFileReducer.java:21)
        at com.gump.test.HBaseHFileReducer.reduce(HBaseHFileReducer.java:1)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Unknown Source)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)

    这个问题,是由于hbase的row key 是基于词典排序,比如说reduce 写入 hbase 的row key 顺序是,r10,r11,r00,则会报上述异常。考虑到map有排序功能,于是将其rowkey作为map的输出key.

2.无论是map还是reduce作为最终的输出结果,输出的key和value的类型应该是:<ImmutableBytesWritable, KeyValue> 或者< ImmutableBytesWritable, Put>。否则报这样的错误:


java.lang.IllegalArgumentException: Can’t read partitions file

Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable

3.reduce 的任务槽数为1.

   a. 进入hbase shell > create 't1','f1'

   b.运行类

package com.gump.test; 
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
public class HbaseHFileDriver { 
    public static void main(String[] args) throws IOException, 
            InterruptedException, ClassNotFoundException { 
         
        Configuration conf = new Configuration(); 
 
        Job job = new Job(conf, "ganliang"); 
        job.setJarByClass(HbaseHFileDriver.class); 
 
        job.setMapperClass(HBaseHFileMapper.class); 
        job.setReducerClass(HBaseHFileReducer.class); 
        
        job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
        job.setMapOutputValueClass(Text.class); 
        
        FileInputFormat.addInputPath(job, new Path(args[0])); 
        HFileOutputFormat.setOutputPath(job, new Path(args[1])); 
 
        Configuration HBASE_CONFIG = new Configuration();  
        HBASE_CONFIG.set("hbase.zookeeper.quorum","bfdbjc2:2181,bfdbjc3:2181,bfdbjc4:2181");  
        HBASE_CONFIG.set("hbase.rootdir", "hdfs://bfdbjc1:12000/hbase");  
        HBASE_CONFIG.set("zookeeper.znode.parent","/hbase");
        String tableName = "t1"; 
        HTable htable = new HTable(HBASE_CONFIG, tableName); 
        HFileOutputFormat.configureIncrementalLoad(job, htable); 
 
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

 c.Mapper

package com.gump.test;

import java.io.IOException;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class HBaseHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> { 
    private ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(); 
    @Override 
    protected void map(LongWritable key, Text value, 
            org.apache.hadoop.mapreduce.Mapper.Context context) 
            throws IOException, InterruptedException { 
    	String rowkey = value.toString().split(":")[0];
        immutableBytesWritable.set(Bytes.toBytes(rowkey)); 
        context.write(immutableBytesWritable, value); 
        System.out.println(rowkey+" "+value);
    } 
} 

 d.reducer

package com.gump.test;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class HBaseHFileReducer extends
		Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
	protected void reduce(ImmutableBytesWritable key, Iterable<Text> values,
			Context context) throws IOException, InterruptedException {
		String value = "";
		while (values.iterator().hasNext()) {
			value = values.iterator().next().toString();
			if (value != null && !"".equals(value)) {
				KeyValue kv = createKeyValue(value.toString());
				if (kv != null){
					context.write(key, kv);
				}
			}
		}
	}

	private KeyValue createKeyValue(String str) {
		
		String[] strs = str.split(":");
		if (strs.length < 4)
			return null;
		String row = strs[0];
		String family = strs[1];
		String qualifier = strs[2];
		String value = strs[3];
		System.out.println(strs[0]+" "+strs[1]+" "+strs[2]+" "+strs[3]);
		return new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family),
				Bytes.toBytes(qualifier), System.currentTimeMillis(),
				Bytes.toBytes(value));
		
	}
}

    d:导入

     hadoop jar /usr/local/hbase-0.94.6.1/hbase-0.94.6.1.jar  completebulkload /user/work/b2_output/ t1

    e:数据

     r10:f1:c10:value10
     r99:f1:c99:value99
     r100:f1:c100:value100
     r101:f1:c101:value101

分享到:
评论

相关推荐

    基于MapReduce和HBase的海量网络数据处理.pdf

    基于MapReduce和HBase的海量网络数据处理 大数据时代,网络数据的处理和分析变得越来越重要。传统的网络数据处理模式已经无法满足大数据量的需求,需要寻求更高效的网络数据计算模式和数据存储模式。基于MapReduce...

    HDFS 通过mapreduce 进行 HBase 导入导出

    总结而言,"HDFS 通过 mapreduce 进行 HBase 导入导出" 是大数据处理中的关键操作,涉及 HDFS、HBase 和 MapReduce 三个核心组件的协同工作。通过熟练掌握这一过程,可以有效地管理和利用大数据,满足实时查询和分析...

    基于MapReduce实现决策树算法

    基于MapReduce实现决策树算法的知识点 基于MapReduce实现决策树算法是一种使用MapReduce框架来实现决策树算法的方法。在这个方法中,主要使用Mapper和Reducer来实现决策树算法的计算。下面是基于MapReduce实现决策...

    mapreduce方式入库hbase hive hdfs

    mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载

    MapReduce on Hbase

    在使用MapReduce操作HBase时,可以通过Hadoop MapReduce框架提供的API与HBase数据库进行交互。这使得开发者可以在Hadoop集群上运行MapReduce作业,以批量处理存储在HBase中的大量数据。由于HBase和Hadoop都是基于...

    基于MapReduce的商品推荐算法.zip

    《基于MapReduce的商品推荐算法详解》 在大数据时代,如何有效地利用海量用户行为数据,为用户提供个性化推荐,已经成为电商、社交媒体等领域的核心竞争力之一。基于MapReduce的商品推荐算法,正是解决这一问题的...

    HBase与MapReduce处理操作(基于JavaAPI)

    该案例中主要使用MapReduce作为处理组件进行数据处理,实现的案例有如通过javaapi实现hbase数据写入hdfs、hbase表数据复制到另一个表中等操作 对应(《HBase分布式存储系统应用》胡鑫喆 张志刚著)教材中案例

    基于MapReduce+Pandas的电影排名与推荐以及数据分析与可视化展示

    基于MapReduce+Pandas的电影排名与推荐以及数据分析与可视化展示 数据科学与大数据技术领域中,电影排名与推荐系统的开发是非常重要的一部分。该系统可以通过对电影数据的分析和处理,提供电影排名和推荐服务,满足...

    基于MapReduce的Apriori算法代码

    基于MapReduce的Apriori算法代码 基于MapReduce的Apriori算法代码是一个使用Hadoop MapReduce框架实现的关联规则挖掘算法,称为Apriori算法。Apriori算法是一种经典的关联规则挖掘算法,用于发现事务数据库中频繁...

    Hadoop/HDFS/MapReduce/HBase

    对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看

    HBase MapReduce完整实例.rar

    HBase的核心特性包括强一致性的读写操作、水平扩展的架构以及基于行键的索引,这些特性使得它在大数据领域中独树一帜。 MapReduce是处理大数据的一种编程模型,它将复杂的计算任务分解为两个阶段:Map阶段和Reduce...

    结合MapReduce和HBase的遥感图像并行分布式查询.pdf

    作者提出了一种基于“分层分块”的瓦片金字塔模型,通过MapReduce进行并行构建,并利用HBase实现分布式检索。 MapReduce是一种分布式计算框架,它将大规模数据处理任务分解为多个子任务,这些子任务可以在分布式...

    HBase MapReduce完整实例

    2. 数据查询:通过MapReduce实现对HBase表的查询,可以在Map阶段进行过滤,Reduce阶段进行聚合操作。 3. 数据更新:在Map阶段定位到需要更新的行,然后在Reduce阶段完成更新操作。 4. 数据删除:Map阶段标识出需要...

    基于mapreduce的聚类算法研究

    基于MapReduce的聚类算法研究 MapReduce是一种分布式计算模型,基于Hadoop的云计算环境下,可以实现高效的数据处理和分析。本研究主要聚焦于基于MapReduce的聚类算法研究,旨在实现高效的聚类算法在Hadoop集群上...

    基于mapreduce的小型电影推荐系统

    【标题】:“基于MapReduce的小型电影推荐系统” 【描述】:“这个项目是利用MapReduce框架,结合JavaWeb技术,构建的一个小型电影推荐系统。它旨在利用用户的历史观影行为,通过数据分析来为用户推荐最可能感兴趣...

    基于MapReduce分析的招聘信息大数据可视化系统.zip

    《基于MapReduce分析的招聘信息大数据可视化系统》 在当今数据驱动的时代,招聘信息的大数据分析与可视化已经成为企业决策的重要工具。本文将深入探讨一个基于Hadoop的MapReduce实现的招聘信息大数据处理和可视化...

    基于MapReduce的图算法

    基于MapReduce的图算法

Global site tag (gtag.js) - Google Analytics