`
sunwinner
  • 浏览: 203299 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Running MapReduce Job with HBase

 
阅读更多

Generally there are three different ways of interacting with HBase from a MapReduce application. HBase can be used as data source at the beginning of a job, as a data sink at the end of a job or as a shared resource.

  • HBase as a data source:  The following example using HBase as a MapReduce source in read-only manner. Specifically, there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper.
    package hbaseinaction;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    /**
     * HBase as a data source example, you can write your own code in map(...) to read
     * data from the HBase table specified during job initialization.
     * In this case, the table is your_hbase_table_name.
     * <p/>
     * User: George Sun
     * Date: 7/21/13
     * Time: 12:42 AM
     */
    public class HBaseAsDataSource extends Configured implements Tool {
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            Job job = new Job(config, "ExampleRead");
            job.setJarByClass(HBaseAsDataSource.class);     
    
            Scan scan = new Scan();
            // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCaching(500);
            // don't set to true for MR jobs
            scan.setCacheBlocks(false);
            // set other scan attrs here...
    
            TableMapReduceUtil.initTableMapperJob(
                    // input HBase table name
                    "your_hbase_table_name",
                    // Scan instance to control column family and attribute selection
                    scan,
                    MyMapper.class,   // mapper
                    null,             // mapper output key
                    null,             // mapper output value
                    job);
            // because we aren't emitting anything from mapper
            job.setOutputFormatClass(NullOutputFormat.class);
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new HBaseAsDataSource(), args);
            System.exit(exitCode);
        }
    
    
        public static class MyMapper extends TableMapper<Text, Text> {
    
            public void map(ImmutableBytesWritable row, Result result, Context context)
                    throws InterruptedException, IOException {
                // process data for the row from the Result instance.
                // For example, read data from HBase table, then populate it into HDFS.
            }
        }
    }
    
     
  • HBase as data sink:   Writing to a HBase table from MapReduce as a data sink is similar to reading from a table in terms of implementation. Of course you can use HBase as a data sink and use HBase as a data source at the same time. The following example using HBase both as a source and as a sink with MapReduce. This example will simply copy data from one table to another.
    package hbaseinaction;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    /**
     * HBase is used as data source as well as data sink. This MapReduce job will try to copy data from
     * the source table to the target table. Note that no reduce task needed.
     * <p/>
     * User: George Sun
     * Date: 7/21/13
     * Time: 12:55 AM
     */
    public class HBaseAsDataSourceSink extends Configured implements Tool {
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            Job job = new Job(config, "ExampleReadWrite");
            job.setJarByClass(HBaseAsDataSourceSink.class);
    
            Scan scan = new Scan();
            // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCaching(500);
            // don't set to true for MR jobs
            scan.setCacheBlocks(false);
            // set other scan attrs
    
            TableMapReduceUtil.initTableMapperJob(
                    // input table
                    "your_hbase_source_table",
                    // Scan instance to control CF and attribute selection
                    scan,
                    // mapper class
                    MyMapper.class,
                    // mapper output key
                    null,
                    // mapper output value
                    null,
                    job);
            TableMapReduceUtil.initTableReducerJob(
                    // output table
                    "your_hbase_target_table",
                    // reducer class
                    null,
                    job);
            job.setNumReduceTasks(0);// No reducer actually needed here
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new HBaseAsDataSourceSink(), args);
            System.exit(exitCode);
        }
    
        public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    
            public void map(ImmutableBytesWritable row, Result value, Context context)
                    throws IOException, InterruptedException {
    
                // this example is just copying the data from the source table...
                context.write(row, resultToPut(row, value));
            }
    
            private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
                Put put = new Put(key.get());
                for (KeyValue kv : result.raw()) {
                    put.add(kv);
                }
                return put;
            }
        }
    }
    
     
  • HBase used as shared resource to do map-side join:    As we know, HBase could be considered as a giant hable-table, it would be clear that HBase is a perfect condidate to be used within a map-side join.
  • HBase MapReduce read/write with multi-table output:  Leverage to MultiTableInputFormat and MultiTableOutputFormat shipped with HBase. Take MultiTableInputFormat as an example.
    List<Scan> scans = new ArrayList<Scan>();
      
    Scan scan1 = new Scan();
    scan1.setStartRow(firstRow1);
    scan1.setStopRow(lastRow1);
    scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
    scans.add(scan1);
    
    Scan scan2 = new Scan();
    scan2.setStartRow(firstRow2);
    scan2.setStopRow(lastRow2);
    scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
    scans.add(scan2);
    
    // Initialized with more than one scan to read data from multiple tables.
    TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class,
         IntWritable.class, job);
    You can find an example of MultiTableOutputFormat from this blog post:
  • There's also a few predefined MapReduce jobs ship with HBase under package org.apache.hadoop.hbase.mapreduce, such as Export to export data from HBase into HDFS, Import to import data from HDFS into HBase, CopyTable to copy data from one HBase table to another. You can explore their source code for more examples of using HBase from MapReduce. 

Important note from HBase in Action: Hadoop MapReduce assumes your map and reduce tasks are idempotent. This means the map and reduce tasks can be run any number of times with the same input and produce the same output. This allows MapReduce to provide fault tolerance in job execution and also take maximum advantage of cluster processing power. You must take care when performing stateful operations. HBase's Increment is an example of such a stateful operation.

 

So instead of incrementing a counter in mapper, a better approach is to emit ["counter", 1] pairs from each mapper. Failed tasks are recovered, and their ourput isn't double-counterd. Sum the pairs in reducer, and write out a single value from there. This also avoids an unduly high bunden being applied to the single machine hosting the invremented cell.

分享到:
评论

相关推荐

    mapreduce方式入库hbase hive hdfs

    mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载

    MapReduce输出至hbase共16页.pdf.zip

    《MapReduce输出至HBase详解》 MapReduce与HBase,两者都是大数据处理的重要组成部分,它们在大数据领域中各自扮演着关键角色。MapReduce作为分布式计算框架,擅长处理大规模数据的批处理任务;而HBase则是一个基于...

    基于MapReduce和HBase的海量网络数据处理.pdf

    基于MapReduce和HBase的海量网络数据处理 大数据时代,网络数据的处理和分析变得越来越重要。传统的网络数据处理模式已经无法满足大数据量的需求,需要寻求更高效的网络数据计算模式和数据存储模式。基于MapReduce...

    MapReduce on Hbase

    在使用MapReduce操作HBase时,可以通过Hadoop MapReduce框架提供的API与HBase数据库进行交互。这使得开发者可以在Hadoop集群上运行MapReduce作业,以批量处理存储在HBase中的大量数据。由于HBase和Hadoop都是基于...

    HDFS 通过mapreduce 进行 HBase 导入导出

    标题 "HDFS 通过 mapreduce 进行 HBase 导入导出" 涉及的是大数据处理领域中的两个重要组件——Hadoop Distributed File System (HDFS) 和 HBase,以及它们之间的数据交互。HDFS 是 Hadoop 的分布式文件系统,而 ...

    google三大论文 gfs bigtable mapreduce hadoop hdfs hbase的原型

    Google三大论文分别指的是《Google File System》、《MapReduce: Simplified Data Processing on Large Clusters》和《Bigtable: A Distributed Storage System for Structured Data》。这三篇论文详细介绍了Google...

    结合MapReduce和HBase的遥感图像并行分布式查询.pdf

    为实现海量遥感图像数据的高效处理和检索,本文提出了一种结合MapReduce和HBase的遥感图像并行分布式查询技术。通过“分层分块”的瓦片金字塔模型,该技术不仅优化了数据存储结构,而且借助于MapReduce框架的并行...

    Hadoop/HDFS/MapReduce/HBase

    对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看

    HBase MapReduce完整实例

    《HBase MapReduce实战详解》 在大数据处理领域,HBase和MapReduce是两个不可或缺的重要组件。HBase作为分布式列式存储系统,适用于大规模数据的实时读写操作;而MapReduce则是Apache Hadoop的核心组件之一,用于...

    通用MapReduce程序复制HBase表数据

    在本文中,我们将深入探讨如何使用通用MapReduce程序来复制HBase表数据。MapReduce是一种分布式计算模型,常用于处理大规模数据集,而HBase是一个分布式、列式存储的NoSQL数据库,适合处理大规模结构化数据。通过...

    HDFS+MapReduce+Hive+HBase十分钟快速入门.pdf

    HDFS+MapReduce+Hive+HBase十分钟快速入门.pdf

    HBase MapReduce完整实例.rar

    《HBase与MapReduce的深度整合实践》 在大数据处理领域,HBase和MapReduce是两个重要的技术组件。HBase,作为一个分布式、列式存储的NoSQL数据库,为大规模数据提供了高并发、低延迟的访问能力。而MapReduce,作为...

    HDFS+MapReduce+Hive+HBase十分钟快速入门

    HDFS+MapReduce+Hive+HBase十分钟快速入门,包括这几个部分的简单使用

    详解Hadoop核心架构HDFS+MapReduce+Hbase+Hive

    通过对Hadoop分布式计算平台最核心的分布式文件系统HDFS、MapReduce处理过程,以及数据仓库工具Hive和分布式数据库Hbase的介绍,基本涵盖了Hadoop分布式平台的所有技术核心。通过这一阶段的调研总结,从内部机理的...

    HDFS+MapReduce+Hive+HBase十分钟快速入门.zip_hbase_hdfs_hive_mapReduce

    在大数据处理领域,Hadoop生态系统中的HDFS(Hadoop Distributed File System)、MapReduce、Hive和HBase是四个至关重要的组件。本资料“HDFS+MapReduce+Hive+HBase十分钟快速入门”旨在帮助初学者迅速理解这些技术...

    hadoop基础,hdfs,hive,mapreduce,hbase

    hadoop基础,hdfs,hive,mapreduce,hbase

    InformationInteraction:信息交互MapReduce库-HBase + YARN(实施环境)

    在IT行业中,大数据处理是至关重要的,而MapReduce、HBase和YARN是这个领域中的核心组件。本文将深入探讨这些技术以及它们如何在实际环境中整合以实现高效的信息交互。 首先,让我们了解一下MapReduce。MapReduce是...

Global site tag (gtag.js) - Google Analytics