`
tenght
  • 浏览: 52081 次
社区版块
存档分类
最新评论

基于MapReduce的HBase开发

 
阅读更多
在伪分布式模式和全分布式模式下 HBase 是架构在 HDFS 上的,因此完全可以将MapReduce 编程框架和 HBase 结合起来使用。也就是说,将 HBase 作为底层“存储结构”,MapReduce 调用 HBase 进行特殊的处理,这样能够充分结合 HBase 分布式大型数据库和MapReduce 并行计算的优点。

相对应MapReduce的hbase实现类:

1)InputFormat 类:HBase 实现了 TableInputFormatBase 类,该类提供了对表数据的大部分操作,其子类 TableInputFormat 则提供了完整的实现,用于处理表数据并生成键值对。TableInputFormat 类将数据表按照 Region 分割成 split,既有多少个 Regions 就有多个splits。然后将 Region 按行键分成<key,value>对,key 值对应与行健,value 值为该行所包含的数据。
2)Mapper 类和 Reducer 类:HBase 实现了 TableMapper 类和 TableReducer 类,其中TableMapper 类并没有具体的功能,只是将输入的<key,value>对的类型分别限定为 Result 和ImmutableBytesWritable。IdentityTableMapper 类和 IdentityTableReducer 类则是上述两个类的具体实现,其和 Mapper 类和 Reducer 类一样,只是简单地将<key,value>对输出到下一个阶段。

3)OutputFormat 类:HBase 实现的 TableOutputFormat 将输出的<key,value>对写到指定的 HBase 表中,该类不会对 WAL(Write-Ahead Log)进行操作,即如果服务器发生
故障将面临丢失数据的风险。可以使用 MultipleTableOutputFormat 类解决这个问题,该类可以对是否写入 WAL 进行设置。

代码:

importjava.io.IOException;
importjava.util.Iterator;
importjava.util.StringTokenizer;

importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.hbase.HBaseConfiguration;
importorg.apache.hadoop.hbase.HColumnDescriptor;
importorg.apache.hadoop.hbase.HTableDescriptor;
importorg.apache.hadoop.hbase.client.HBaseAdmin;
importorg.apache.hadoop.hbase.client.Put;
importorg.apache.hadoop.hbase.mapreduce.TableOutputFormat;
importorg.apache.hadoop.hbase.mapreduce.TableReducer;
importorg.apache.hadoop.hbase.util.Bytes;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.LongWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.io.NullWritable;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;

publicclassWordCountHBase{

 //实现 Map 类
 publicstaticclassMapextends
   Mapper<LongWritable,Text,Text,IntWritable>{
  privatefinalstaticIntWritableone=newIntWritable(1);
  privateTextword=newText();

  publicvoidmap(LongWritablekey,Textvalue,Contextcontext)
    throwsIOException,InterruptedException{
   StringTokenizeritr=newStringTokenizer(value.toString());
   while(itr.hasMoreTokens()){
    word.set(itr.nextToken());
    context.write(word,one);
   }
  }
 }

 //实现 Reduce 类
 publicstaticclassReduceextends
   TableReducer<Text,IntWritable,NullWritable>{

  publicvoidreduce(Textkey,Iterable<IntWritable>values,
    Contextcontext)throwsIOException,InterruptedException{

   intsum=0;

   Iterator<IntWritable>iterator=values.iterator();
   while(iterator.hasNext()){
    sum+=iterator.next().get();
   }

   //Put 实例化,每个词存一行
   Putput=newPut(Bytes.toBytes(key.toString()));
   //列族为 content,列修饰符为 count,列值为数目
   put.add(Bytes.toBytes("content"),Bytes.toBytes("count"),
     Bytes.toBytes(String.valueOf(sum)));

   context.write(NullWritable.get(),put);
  }
 }

 //创建 HBase 数据表
 publicstaticvoidcreateHBaseTable(StringtableName)
throwsIOException{
  //创建表描述
  HTableDescriptorhtd=newHTableDescriptor(tableName);
  //创建列族描述
  HColumnDescriptorcol=newHColumnDescriptor("content");
  htd.addFamily(col);

  //配置 HBase
  Configurationconf=HBaseConfiguration.create();

  conf.set("hbase.zookeeper.quorum","master");
  conf.set("hbase.zookeeper.property.clientPort","2181");
  HBaseAdminhAdmin=newHBaseAdmin(conf);

  if(hAdmin.tableExists(tableName)){
   System.out.println("该数据表已经存在,正在重新创建。");
   hAdmin.disableTable(tableName);
   hAdmin.deleteTable(tableName);
  }

  System.out.println("创建表:"+tableName);
  hAdmin.createTable(htd);
 }

 publicstaticvoidmain(String[]args)throwsException{
  StringtableName="wordcount";
  //第一步:创建数据库表
  WordCountHBase.createHBaseTable(tableName);

  //第二步:进行 MapReduce 处理
  //配置 MapReduce
  Configurationconf=newConfiguration();
  //这几句话很关键
  conf.set("mapred.job.tracker","master:9001");
  conf.set("hbase.zookeeper.quorum","master");
  conf.set("hbase.zookeeper.property.clientPort","2181");
  conf.set(TableOutputFormat.OUTPUT_TABLE,tableName);

  Jobjob=newJob(conf,"NewWordCount");
  job.setJarByClass(WordCountHBase.class);

  //设置 Map 和 Reduce 处理类
  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  //设置输出类型
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);

  //设置输入和输出格式
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TableOutputFormat.class);

  //设置输入目录
  FileInputFormat.addInputPath(job,newPath("hdfs://master:9000/in/"));
  System.exit(job.waitForCompletion(true)?0:1);

 }
}

常见错误及解决方法:

1、java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.TableOutputFormat

错误输出节选:

13/09/10 21:14:01 INFO mapred.JobClient: Running job: job_201308101437_0016
13/09/10 21:14:02 INFO mapred.JobClient:  map 0% reduce 0%
13/09/10 21:14:16 INFO mapred.JobClient: Task Id : attempt_201308101437_0016_m_000007_0, Status : FAILED
java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.TableOutputFormat
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:849)
	at org.apache.hadoop.mapreduce.JobContext.getOutputFormatClass(JobContext.java:235)
	at org.apache.hadoop.mapred.Task.initialize(Task.java:513)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:353)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
	at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.TableOutputFormat
	at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:249)
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:802)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:847)
	... 8 more

错误原因:

相关的类文件没有引入到 Hadoop 集群上。

解决步骤:

步骤一、停止HBase数据库:

[hadoop@master bin]$ stop-hbase.sh 
stopping hbase............
master: stopping zookeeper.
[hadoop@master bin]$ jps
16186 Jps
26186 DataNode
26443 TaskTracker
26331 JobTracker
26063 NameNode
停止Hadoop集群:

[hadoop@master bin]$ stop-all.sh 
Warning: $HADOOP_HOME is deprecated.

stopping jobtracker
master: Warning: $HADOOP_HOME is deprecated.
master: 
master: stopping tasktracker
node1: Warning: $HADOOP_HOME is deprecated.
node1: 
node1: stopping tasktracker
stopping namenode
master: Warning: $HADOOP_HOME is deprecated.
master: 
master: stopping datanode
node1: Warning: $HADOOP_HOME is deprecated.
node1: stopping datanode
node1: 
node1: Warning: $HADOOP_HOME is deprecated.
node1: 
node1: stopping secondarynamenode
[hadoop@master bin]$ jps
16531 Jps

步骤二、需要配置 Hadoop 集群中每台机器,在 hadoop 目录的 conf 子目录中,找 hadoop-env.sh文件,并添加如下内容:

# set hbase environment
export HBASE_HOME=/opt/modules/hadoop/hbase/hbase-0.94.11-security
export HADOOP_CLASSPATH=$HBASE_HOME/hbase-0.94.11-security.jar:$HBASE_HOME/hbase-0.94.11-security-tests.jar:$HBASE_HOME/conf:$HBASE_HOME/lib/zookeeper-3.4.5.jar
步骤三、重新启动集群和hbase数据库。

2、Error: java.lang.ClassNotFoundException: com.google.protobuf.Message

错误输出节选:

2013-09-12 12:38:57,833 INFO  mapred.JobClient (JobClient.java:monitorAndPrintJob(1363)) -  map 0% reduce 0%
2013-09-12 12:39:12,490 INFO  mapred.JobClient (JobClient.java:monitorAndPrintJob(1392)) - Task Id : attempt_201309121232_0001_m_000007_0, Status : FAILED
Error: java.lang.ClassNotFoundException: com.google.protobuf.Message
	at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
错误原因:

明显,没找到protobuf-java-2.4.0a.jar包,将该包路径加入hadoop-env.sh中。



分享到:
评论

相关推荐

    HBase MapReduce完整实例.rar

    HBase的核心特性包括强一致性的读写操作、水平扩展的架构以及基于行键的索引,这些特性使得它在大数据领域中独树一帜。 MapReduce是处理大数据的一种编程模型,它将复杂的计算任务分解为两个阶段:Map阶段和Reduce...

    结合MapReduce和HBase的遥感图像并行分布式查询.pdf

    为实现海量遥感图像数据的高效处理和检索,本文提出了一种结合MapReduce和HBase的遥感图像并行分布式查询技术。通过“分层分块”的瓦片金字塔模型,该技术不仅优化了数据存储结构,而且借助于MapReduce框架的并行...

    基于javaweb + mapreduce的小型电影推荐系统

    【描述】"基于mapreduce的小型电影推荐系统,使用javaweb的方式实现,包含数据集"暗示了项目结构和数据处理的两个关键点。首先,项目采用Java Web技术来设计用户交互的网页和服务器端处理程序,这可能包括Servlets、...

    MapReduce输出至hbase共16页.pdf.zip

    而HBase则是一个基于Hadoop的分布式数据库,提供高并发、低延迟的NoSQL存储解决方案。当MapReduce需要将处理结果存入HBase时,这种结合便能发挥出强大的效能。 一、MapReduce简介 MapReduce是由Google提出的分布式...

    HBase与MapReduce处理操作(基于JavaAPI)

    该案例中主要使用MapReduce作为处理组件进行数据处理,实现的案例有如通过javaapi实现hbase数据写入hdfs、hbase表数据复制到另一个表中等操作 对应(《HBase分布式存储系统应用》胡鑫喆 张志刚著)教材中案例

    基于hbase, mapreduce结合springMVC查询操作扫描操作,数据分析.zip

    "基于hbase, mapreduce结合springMVC查询操作扫描操作,数据分析.zip"这个压缩包文件,显然涉及到的是一个关于大数据处理和分析的项目,主要使用了HBase、MapReduce以及Spring MVC这些核心技术。下面我们将深入探讨...

    基于Python+SpringBoot+Vue+HDFS+MapReduce+HBase+Hive+Kafka+Sp.zip

    标题中的“基于Python+SpringBoot+Vue+HDFS+MapReduce+HBase+Hive+Kafka+Spark”提到了一系列技术,它们都是大数据处理、分布式系统和Web开发的重要组件。接下来,我们将深入探讨这些技术及其在实际项目中的应用。 ...

    hbase 资源合集 hbase 企业应用开发实战 权威指南 hbase 实战 hbase 应用架构

    此外,还讲解了HBase的高级特性,如MapReduce、Hive、Pig等与HBase的集成,以及如何进行复杂的查询操作。 《HBase实战》是一本实践导向的书籍,通过实际的项目案例,展示了HBase在不同场景下的应用,如实时分析、...

    中国HBase技术社区第4届-MeetUp-上海站_基于HBase实时数仓探索实践.pptx

    【描述】:本演讲主要介绍了上海久耶供应链在大数据平台中基于HBase实现的实时数仓的实践与探索,涵盖了从第一代离线数仓到第二代实时数仓的转变,以及业务场景、开发流程、集群调优监控等方面的内容,并分享了两个...

    Mapreduce实验报告.doc

    它将复杂的分布式系统操作抽象成简单的编程模型,使开发人员能够专注于编写Map和Reduce函数,从而实现大规模数据处理。 MapReduce的核心思想是“分而治之”(Divide and Conquer)。在处理海量数据时,首先将数据分解...

    基于MapReduce的SQL查询优化分析.pdf

    【基于MapReduce的SQL查询优化分析】 SQL查询在传统的关系型数据库中被广泛使用,它是一种结构化查询语言,能够方便地执行数据的增、删、改、查操作。然而,在分布式数据库系统如HBase中,由于其底层依赖Hadoop的...

    基于hadoop和hbase的分布式索引集群研究.pdf

    Hadoop是一个由Apache基金会开发的分布式系统基础架构,它由HDFS、MapReduce和HBase三个核心组件组成。HDFS是一种主从结构的分布式文件系统,它包含了管理元数据的NameNode和存储数据的DataNode。MapReduce是分布式...

    基于HadoopHBase的一淘搜索离线系统PPT课件.pptx

    - **HBase扩展开发**:为了适应大规模数据处理,进行了定制化开发,如Load Balance插件、Region Split/Merge插件和工具,扩展了ThriftServer API,增强MapReduce库支持,增加更多Metrics指标,并开发了多种...

    HDFS+MapReduce+Hive+HBase十分钟快速入门.zip_hbase_hdfs_hive_mapReduce

    在大数据处理领域,Hadoop生态系统中的HDFS(Hadoop Distributed File System)、MapReduce、Hive和HBase是四个至关重要的组件。本资料“HDFS+MapReduce+Hive+HBase十分钟快速入门”旨在帮助初学者迅速理解这些技术...

    hbase官方开发参考手册

    HBase 官方开发参考手册是一份详尽的文档,主要面向开发人员和系统管理员,提供了关于 HBase 的配置、使用和优化等方面的知识。 ### HBase 系统架构 - **快速入门**:这部分内容介绍了如何快速启动一个单节点的 ...

    Hbase:HBase MapReduce投影

    5. **Java编程**:由于HBase MapReduce主要基于Java实现,所以开发过程中需要掌握Java编程。此外,理解HBase的数据模型和MapReduce的工作原理也是必不可少的。 6. **优化技巧**:为了提升性能,可以考虑以下策略: ...

    基于hadoop+hbase+springboot实现分布式网盘系统.zip

    在构建分布式网盘系统时,通常会涉及到多个技术栈,如大数据处理框架Hadoop、分布式数据库HBase以及微服务开发框架Spring Boot。本项目“基于hadoop+hbase+springboot实现分布式网盘系统”旨在利用这些技术搭建一个...

    基于Hbase的海量视频存储简单模拟

    本文将深入探讨一个基于Hbase的海量视频存储简单模拟项目,旨在利用Hadoop和Hbase这两个强大的开源工具来解决这个问题。 首先,我们要理解Hadoop和Hbase的角色。Hadoop是分布式计算框架,其核心组件包括HDFS...

    hbase1.2+java开发最小依赖jar包合集

    这个"**hbase1.2+java开发最小依赖jar包合集**"是为Java开发者提供的一套精简版的HBase开发环境,包含了进行HBase开发所必需的基础库。以下是关于HBase 1.2版本以及Java开发相关的知识点: 1. **HBase架构**:HBase...

Global site tag (gtag.js) - Google Analytics