`
tenght
  • 浏览: 52080 次
社区版块
存档分类
最新评论

基于MapReduce的HBase开发(续)

 
阅读更多

示例

代码:

importjava.io.ByteArrayOutputStream;
importjava.io.DataOutputStream;
importjava.io.IOException;
importjava.util.HashMap;

importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.hbase.HBaseConfiguration;
importorg.apache.hadoop.hbase.HColumnDescriptor;
importorg.apache.hadoop.hbase.HTableDescriptor;
importorg.apache.hadoop.hbase.client.HBaseAdmin;
importorg.apache.hadoop.hbase.client.HTable;
importorg.apache.hadoop.hbase.client.Put;
importorg.apache.hadoop.hbase.client.Result;
importorg.apache.hadoop.hbase.client.Scan;
importorg.apache.hadoop.hbase.io.ImmutableBytesWritable;
importorg.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
importorg.apache.hadoop.hbase.mapreduce.TableInputFormat;
importorg.apache.hadoop.hbase.util.Base64;
importorg.apache.hadoop.hbase.util.Bytes;
importorg.apache.hadoop.io.Writable;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;

publicclassIndexBuilder{

 //索引表唯一的一列为 INDEX_ROW,其中 INDEX 为列族
 privatestaticfinalbyte[]INDEX_COLUMN=Bytes.toBytes("INDEX");
 privatestaticfinalbyte[]INDEX_QUALIFIER=Bytes.toBytes("ROW");

 //实现 Map 类
 publicstaticclassMapextends
  Mapper<ImmutableBytesWritable,Result,ImmutableBytesWritable,Writable>{

  //存储了“列名”到“表名——列名”的映射
  //前者用于获取某列的值,并作为索引表的键值;后者用户作为索引表的表名
  privateHashMap<byte[],ImmutableBytesWritable>indexes;
  privatebyte[]family;

  //实现 map 函数
  publicvoidmap(ImmutableBytesWritablekey,Resultvalue,
    Contextcontext)throwsIOException,InterruptedException{
   for(java.util.Map.Entry<byte[],ImmutableBytesWritable>index:indexes
     .entrySet()){
    //获取列名
    byte[]qualifier=index.getKey();
    //索引表的表名
    ImmutableBytesWritabletableName=index.getValue();
    //根据“列族:列名”获得元素值
    byte[]newValue=value.getValue(family,qualifier);

    if(newValue!=null){
     //以列值作为行健,在列“INDEX:ROW”中插入行健
     Putput=newPut(newValue);
     put.add(INDEX_COLUMN,INDEX_QUALIFIER,key.get());

     //在 tableName 表上执行 put
     //操作使用 MultipleOutputFormat 时,
     //第二个参数必须是 Put 和 Delete 类型
     context.write(tableName,put);
    }
   }
  }

  //setup为Mapper中的方法,该方法只在任务初始化时执行一次
  protectedvoidsetup(Contextcontext)throwsIOException,
    InterruptedException{
   Configurationconf=context.getConfiguration();

   //通过 Configuration.set()方法传递参数
   StringtableName=conf.get("index.tablename");
   String[]fields=conf.getStrings("index.fields");

   //fields 内为需要做索引的列名
   StringfamilyName=conf.get("index.familyname");
   family=Bytes.toBytes(familyName);

   //初始化 indexes 方法
   indexes=newHashMap<byte[],ImmutableBytesWritable>();

   for(Stringfield:fields){
    //如果给 name 做索引,则索引表的名称为“heroes‐name”
    indexes.put(Bytes.toBytes(field),
newImmutableBytesWritable(
      Bytes.toBytes(tableName+"‐"+field)));
   }
  }
 }

 //初始化示例数据表——“heroes”
 publicstaticvoidinitHBaseTable(Configurationconf,StringtableName)
   throwsIOException{
  //创建表描述
  HTableDescriptorhtd=newHTableDescriptor(tableName);
  //创建列族描述
  HColumnDescriptorcol=newHColumnDescriptor("info");

  htd.addFamily(col);

  HBaseAdminhAdmin=newHBaseAdmin(conf);

  if(hAdmin.tableExists(tableName)){
   System.out.println("该数据表已经存在,正在重新创建。");
   hAdmin.disableTable(tableName);
   hAdmin.deleteTable(tableName);
  }

  System.out.println("创建表:"+tableName);
  //创建表
  hAdmin.createTable(htd);
  HTabletable=newHTable(conf,tableName);
  System.out.println("向表中插入数据");
  //添加数据
  addRow(table,"1","info","name","peter");
  addRow(table,"1","info","email","peter@heroes.com");
  addRow(table,"1","info","power","absorbabilities");

  addRow(table,"2","info","name","hiro");
  addRow(table,"2","info","email","hiro@heroes.com");
  addRow(table,"2","info","power","bendtimeandspace");

  addRow(table,"3","info","name","sylar");
  addRow(table,"3","info","email","sylar@heroes.com");
  addRow(table,"3","info","power","hnowhowthingswork");

  addRow(table,"4","info","name","claire");
  addRow(table,"4","info","email","claire@heroes.com");
  addRow(table,"4","info","power","heal");

  addRow(table,"5","info","name","noah");
  addRow(table,"5","info","email","noah@heroes.com");
  addRow(table,"5","info","power","caththepeoplewithablities");
 }

 //添加一条数据
 privatestaticvoidaddRow(HTabletable,Stringrow,
StringcolumnFamily,Stringcolumn,Stringvalue)throwsIOException{
  Putput=newPut(Bytes.toBytes(row));
  //参数出分别:列族、列、值
  put.add(Bytes.toBytes(columnFamily),Bytes.toBytes(column),
    Bytes.toBytes(value));
  table.put(put);
 }

 //创建数据库表
 publicstaticvoidcreateIndexTable(Configurationconf,
StringtableName)throwsException{
  //新建一个数据库管理员
  HBaseAdminhAdmin=newHBaseAdmin(conf);

  if(hAdmin.tableExists(tableName)){
   System.out.println("该数据表已经存在,正在重新创建。");
   hAdmin.disableTable(tableName);
   hAdmin.deleteTable(tableName);
  }

  //新建一个表的描述
  HTableDescriptortableDesc=newHTableDescriptor(tableName);
  //在描述里添加列族
  tableDesc.addFamily(newHColumnDescriptor(INDEX_COLUMN));

  //根据配置好的描述建表
  hAdmin.createTable(tableDesc);
  System.out.println("创建"+tableName+"表成功");
 }

 publicstaticJobconfigureJob(Configurationconf,StringjobName)
   throwsIOException{
  Jobjob=newJob(conf,jobName);
  job.setJarByClass(IndexBuilder.class);

  //设置 Map 处理类
  job.setMapperClass(Map.class);

  //设置 Reduce 个数
  job.setNumReduceTasks(0);

  //设置输入和输出格式
  job.setInputFormatClass(TableInputFormat.class);
  job.setOutputFormatClass(MultiTableOutputFormat.class);

  returnjob;
 }

 privatestaticStringconvertScanToString(Scanscan)
throwsIOException{
  ByteArrayOutputStreamout=newByteArrayOutputStream();
  DataOutputStreamdos=newDataOutputStream(out);
  scan.write(dos);
  returnBase64.encodeBytes(out.toByteArray());
 }

 publicstaticvoidmain(String[]args)throwsException{
  Configurationconf=HBaseConfiguration.create();
  conf.set("hbase.zookeeper.quorum","master");
  conf.set("hbase.zookeeper.property.clientPort","2181");

  StringtableName="heroes";
  StringcolumnFamily="info";
  String[]fields={"name","power"};
  //第一步:初始化数据库表
  IndexBuilder.initHBaseTable(conf,tableName);

  //第二步:创建索引表
  for(Stringfield:fields){
   IndexBuilder.createIndexTable(conf,tableName+"‐"+field);
  }

  //第三步:进行 MapReduce 处理
  conf.set("mapred.job.tracker","master:9001");
  conf.set(TableInputFormat.SCAN,convertScanToString(newScan()));
  conf.set(TableInputFormat.INPUT_TABLE,tableName);
 //设置传递属性值
  conf.set("index.tablename",tableName);
  conf.set("index.familyname",columnFamily);
  conf.setStrings("index.fields",fields);

  Jobjob=IndexBuilder.configureJob(conf,"IndexBuilder");

  System.exit(job.waitForCompletion(true)?0:1);
 }
}
编译完成后,可在hbase shell下运行:list,查看所创建的表,其他命令来操作表,在此不再赘述。







分享到:
评论

相关推荐

    HBase MapReduce完整实例.rar

    HBase的核心特性包括强一致性的读写操作、水平扩展的架构以及基于行键的索引,这些特性使得它在大数据领域中独树一帜。 MapReduce是处理大数据的一种编程模型,它将复杂的计算任务分解为两个阶段:Map阶段和Reduce...

    结合MapReduce和HBase的遥感图像并行分布式查询.pdf

    为实现海量遥感图像数据的高效处理和检索,本文提出了一种结合MapReduce和HBase的遥感图像并行分布式查询技术。通过“分层分块”的瓦片金字塔模型,该技术不仅优化了数据存储结构,而且借助于MapReduce框架的并行...

    基于javaweb + mapreduce的小型电影推荐系统

    【描述】"基于mapreduce的小型电影推荐系统,使用javaweb的方式实现,包含数据集"暗示了项目结构和数据处理的两个关键点。首先,项目采用Java Web技术来设计用户交互的网页和服务器端处理程序,这可能包括Servlets、...

    HBase与MapReduce处理操作(基于JavaAPI)

    该案例中主要使用MapReduce作为处理组件进行数据处理,实现的案例有如通过javaapi实现hbase数据写入hdfs、hbase表数据复制到另一个表中等操作 对应(《HBase分布式存储系统应用》胡鑫喆 张志刚著)教材中案例

    MapReduce输出至hbase共16页.pdf.zip

    而HBase则是一个基于Hadoop的分布式数据库,提供高并发、低延迟的NoSQL存储解决方案。当MapReduce需要将处理结果存入HBase时,这种结合便能发挥出强大的效能。 一、MapReduce简介 MapReduce是由Google提出的分布式...

    基于hbase, mapreduce结合springMVC查询操作扫描操作,数据分析.zip

    "基于hbase, mapreduce结合springMVC查询操作扫描操作,数据分析.zip"这个压缩包文件,显然涉及到的是一个关于大数据处理和分析的项目,主要使用了HBase、MapReduce以及Spring MVC这些核心技术。下面我们将深入探讨...

    hbase 资源合集 hbase 企业应用开发实战 权威指南 hbase 实战 hbase 应用架构

    《HBase资源合集》包含了四本重量级的书籍,分别是《HBase企业应用开发实战》、《HBase权威指南》、《HBase实战》以及《HBase应用架构》。这些书籍深入浅出地探讨了HBase在大数据环境中的应用与开发,是学习和掌握...

    基于Python+SpringBoot+Vue+HDFS+MapReduce+HBase+Hive+Kafka+Sp.zip

    标题中的“基于Python+SpringBoot+Vue+HDFS+MapReduce+HBase+Hive+Kafka+Spark”提到了一系列技术,它们都是大数据处理、分布式系统和Web开发的重要组件。接下来,我们将深入探讨这些技术及其在实际项目中的应用。 ...

    中国HBase技术社区第4届-MeetUp-上海站_基于HBase实时数仓探索实践.pptx

    【描述】:本演讲主要介绍了上海久耶供应链在大数据平台中基于HBase实现的实时数仓的实践与探索,涵盖了从第一代离线数仓到第二代实时数仓的转变,以及业务场景、开发流程、集群调优监控等方面的内容,并分享了两个...

    Mapreduce实验报告.doc

    它将复杂的分布式系统操作抽象成简单的编程模型,使开发人员能够专注于编写Map和Reduce函数,从而实现大规模数据处理。 MapReduce的核心思想是“分而治之”(Divide and Conquer)。在处理海量数据时,首先将数据分解...

    HDFS+MapReduce+Hive+HBase十分钟快速入门.zip_hbase_hdfs_hive_mapReduce

    在大数据处理领域,Hadoop生态系统中的HDFS(Hadoop Distributed File System)、MapReduce、Hive和HBase是四个至关重要的组件。本资料“HDFS+MapReduce+Hive+HBase十分钟快速入门”旨在帮助初学者迅速理解这些技术...

    hbase官方开发参考手册

    HBase 官方开发参考手册是一份详尽的文档,主要面向开发人员和系统管理员,提供了关于 HBase 的配置、使用和优化等方面的知识。 ### HBase 系统架构 - **快速入门**:这部分内容介绍了如何快速启动一个单节点的 ...

    基于hadoop和hbase的分布式索引集群研究.pdf

    Hadoop是一个由Apache基金会开发的分布式系统基础架构,它由HDFS、MapReduce和HBase三个核心组件组成。HDFS是一种主从结构的分布式文件系统,它包含了管理元数据的NameNode和存储数据的DataNode。MapReduce是分布式...

    Hbase:HBase MapReduce投影

    5. **Java编程**:由于HBase MapReduce主要基于Java实现,所以开发过程中需要掌握Java编程。此外,理解HBase的数据模型和MapReduce的工作原理也是必不可少的。 6. **优化技巧**:为了提升性能,可以考虑以下策略: ...

    基于MapReduce的SQL查询优化分析.pdf

    【基于MapReduce的SQL查询优化分析】 SQL查询在传统的关系型数据库中被广泛使用,它是一种结构化查询语言,能够方便地执行数据的增、删、改、查操作。然而,在分布式数据库系统如HBase中,由于其底层依赖Hadoop的...

    基于HadoopHBase的一淘搜索离线系统PPT课件.pptx

    - **HBase扩展开发**:为了适应大规模数据处理,进行了定制化开发,如Load Balance插件、Region Split/Merge插件和工具,扩展了ThriftServer API,增强MapReduce库支持,增加更多Metrics指标,并开发了多种...

    基于hadoop+hbase+springboot实现分布式网盘系统.zip

    在构建分布式网盘系统时,通常会涉及到多个技术栈,如大数据处理框架Hadoop、分布式数据库HBase以及微服务开发框架Spring Boot。本项目“基于hadoop+hbase+springboot实现分布式网盘系统”旨在利用这些技术搭建一个...

    hbase1.2+java开发最小依赖jar包合集

    这个"**hbase1.2+java开发最小依赖jar包合集**"是为Java开发者提供的一套精简版的HBase开发环境,包含了进行HBase开发所必需的基础库。以下是关于HBase 1.2版本以及Java开发相关的知识点: 1. **HBase架构**:HBase...

    基于Hbase的海量视频存储简单模拟

    本文将深入探讨一个基于Hbase的海量视频存储简单模拟项目,旨在利用Hadoop和Hbase这两个强大的开源工具来解决这个问题。 首先,我们要理解Hadoop和Hbase的角色。Hadoop是分布式计算框架,其核心组件包括HDFS...

Global site tag (gtag.js) - Google Analytics