`

Hadoop MapReduce操作Hbase范例学习(TableMapReduceUtil)

 
阅读更多
Hbase里的数据量一般都小不了,因此MapReduce跟Hbase就成了天然的好搭档。

1.ZK授权表

首先一点来说,Hbase是强依赖于ZK的。博主所在的team,就经常出现ZK连接数太多被打爆然后Hbase挂了的情况。一般在访问Hbase表之前,需要通过访问ZK得到授权:

/**
     * 为hbase表授权。
     *
     * @param tableConfigKey 任意一个字符串。
     * @param tableName 需要授权的表名, scan涉及到的表不需要额外授权。
     * @param job 相关job。
     * @throws IOException
     */
    public static void initAuthentication(String tableConfigKey, String tableName, Job job) throws IOException {
        Configuration peerConf = job.getConfiguration();
        peerConf.set(tableConfigKey, tableName);
        ZKUtil.applyClusterKeyToConf(peerConf, tableName);
        if (User.isHBaseSecurityEnabled(peerConf)) {
            LOGGER.info("Obtaining remote user authentication token with table:{}", tableName);
            try {
                User.getCurrent().obtainAuthTokenForJob(peerConf, job);
            } catch (InterruptedException ex) {
                LOGGER.info("Interrupted obtaining remote user authentication token");
                LOGGER.error("Obtained remote user authentication token with table:{}, error:\n", tableName, ex);
                Thread.interrupted();
            }
            LOGGER.info("Obtained remote user authentication token with table:{}", tableName);
        }
    }


2.thrift对象转化

本例中操作的对象为XX表,Column Family为”P”, Qualifer 为”P”与”C”,里面对应的value都是thrift对象。其中”P”对应的thrift对象为:

struct UserProfile {
    1: optional byte sex; 
    2: optional i32 age;
    3: optional string phoneBrand;
    4: optional string locationProvince;
}

“C”对应的thrift对象为:

struct UserClickInfo {
    1: required i32 totolAck;
    2: required i32 totalClick;
    3: optional map<i64, map<string, i32>> ackClick;
}


这个时候我们就需要经常将Bytes转化为thrift对象,通用的方法为:

/**
     * convert byte array to thrift object.
     *
     * @param <T> type of thrift object.
     * @param thriftObj an thrift object.
     * @return byte array if convert succeeded, <code>null</code> if convert failed.
     * @throws TException
     */
    public static final <T extends TBase<T, ?>> T convertBytesToThriftObject(byte[] raw, T thriftObj) throws TException {
        if (ArrayUtils.isEmpty(raw)) {
            return null;
        }
        Validate.notNull(thriftObj, "thriftObj");

        TDeserializer serializer = new TDeserializer(new TBinaryProtocol.Factory());
        serializer.deserialize(thriftObj, raw);
        return thriftObj;
    }


3.Map阶段读取Hbase里的数据

在Map阶段对Hbase表扫描,得出数据

//输出的KV均为Text
    static class ReadMapper extends TableMapper<Text,Text> {

        @Override
        protected void map(ImmutableBytesWritable key, Result res, Context context) throws IOException,InterruptedException{
            String uuid = StringUtils.reverse(Bytes.toString(key.copyBytes()));
            if (res == null || res.isEmpty()) return;
            res.getFamilyMap(USER_FEATURE_COLUMN_FAMILY);


            for(KeyValue kv:res.list()) {
                String qualifier = Bytes.toString(kv.getQualifier());
                //String qualifier = kv.getKeyString();
                if(qualifier.equals("P")) {

                    try {
                        UserProfile userProfile = new UserProfile();
                        convertBytesToThriftObject(kv.getValue(), userProfile);
                        String profileRes = userProfile.getAge() + "," + userProfile.getSex() + ","
                                + userProfile.getPhoneBrand() + "," + userProfile.getLocationProvince();
                        context.write(new Text(uuid),new Text(profileRes));
                    } catch (Exception ex) {}
                }
                else if(qualifier.equals("C")) {
                    UserClickInfo userClickInfo = new UserClickInfo();
                    try {
                        convertBytesToThriftObject(kv.getValue(), userClickInfo);
                        Map<Long,Map<String,Integer>> resMap = userClickInfo.getAckClick();
                        for(Map.Entry<Long,Map<String,Integer>> entry:resMap.entrySet()) {
                            String appid = String.valueOf(entry.getKey());
                            int click = entry.getValue().get("click");
                            int ack = entry.getValue().get("ack");
                            String all = appid + "," + String.valueOf(click) + "," + String.valueOf(ack);
                            context.write(new Text(uuid),new Text(all));
                        }
                        int allClick = userClickInfo.getTotalClick();
                        int allAck = userClickInfo.getTotolAck();
                        String allNum = "999," + String.valueOf(allClick) + "," + String.valueOf(allAck);
                        context.write(new Text(uuid),new Text(allNum));
                    } catch (Exception ex) {}
                }
            }
        }
    }


4.run方法里配置相关驱动

run方法里需要配置一些相关的参数,保证任务的顺利进行。

其中, TableMapReduceUtil.addDependencyJars 方法添加了完成任务一些必要的类。

public int run(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();

        Job job = Job.getInstance(conf,"read_data_from_hbase");
        job.setJarByClass(ReadDataFromHbase.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(ReadReducer.class);
        job.setSpeculativeExecution(false);

        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),StringUtils.class, TimeUtils.class, Util.class,
                CompressionCodec.class, TStructDescriptor.class, ObjectMapper.class, CompressionCodecName.class, BytesInput.class);

        Scan scan = new Scan();
        //对整个CF扫描
        scan.addFamily(USER_FEATURE_COLUMN_FAMILY);

        String table = "XXX";
        initAuthentication(table,table,job);
        TableMapReduceUtil.initTableMapperJob(table,
                scan,
                ReadMapper.class,
                Text.class,
                Text.class,
                job);

        String output = "";
        FileSystem.get(job.getConfiguration()).delete(new Path(output), true);
        FileOutputFormat.setOutputPath(job,new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }


5.完整的代码

TableMapReduceUtil.initTableMapperJob把HBase表中的数据注入到Mapper中

package com.xiaomi.xmpush.mr_job_and_tools.task;

import com.twitter.elephantbird.thrift.TStructDescriptor;
import XXX.XXX.XXX.XXX.common.util.TimeUtils;
import XXX.XXX.XXX.thrift.UserClickInfo;
import XXX.XXX.XXX.thrift.UserProfile;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.format.CompressionCodec;
import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.parquet.org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Created by WangLei on 17-3-13.
 */
public class ReadDataFromHbase extends Configured implements Tool{

    private static final Logger LOGGER = LoggerFactory.getLogger(ReadDataFromHbase.class);
    public static final byte[] USER_FEATURE_COLUMN_FAMILY = Bytes.toBytes("P");

    /**
     * convert byte array to thrift object.
     *
     * @param <T> type of thrift object.
     * @param thriftObj an thrift object.
     * @return byte array if convert succeeded, <code>null</code> if convert failed.
     * @throws TException
     */
    public static final <T extends TBase<T, ?>> T convertBytesToThriftObject(byte[] raw, T thriftObj) throws TException {
        if (ArrayUtils.isEmpty(raw)) {
            return null;
        }
        Validate.notNull(thriftObj, "thriftObj");

        TDeserializer serializer = new TDeserializer(new TBinaryProtocol.Factory());
        serializer.deserialize(thriftObj, raw);
        return thriftObj;
    }


    /**
     * 为hbase表授权。
     *
     * @param tableConfigKey 任意一个字符串。
     * @param tableName 需要授权的表名, scan涉及到的表不需要额外授权。
     * @param job 相关job。
     * @throws IOException
     */
    public static void initAuthentication(String tableConfigKey, String tableName, Job job) throws IOException {
        Configuration peerConf = job.getConfiguration();
        peerConf.set(tableConfigKey, tableName);
        ZKUtil.applyClusterKeyToConf(peerConf, tableName);
        if (User.isHBaseSecurityEnabled(peerConf)) {
            LOGGER.info("Obtaining remote user authentication token with table:{}", tableName);
            try {
                User.getCurrent().obtainAuthTokenForJob(peerConf, job);
            } catch (InterruptedException ex) {
                LOGGER.info("Interrupted obtaining remote user authentication token");
                LOGGER.error("Obtained remote user authentication token with table:{}, error:\n", tableName, ex);
                Thread.interrupted();
            }
            LOGGER.info("Obtained remote user authentication token with table:{}", tableName);
        }
    }

    static class ReadMapper extends TableMapper<Text,Text> {

        @Override
        protected void map(ImmutableBytesWritable key, Result res, Context context) throws IOException,InterruptedException{
            String uuid = StringUtils.reverse(Bytes.toString(key.copyBytes()));
            if (res == null || res.isEmpty()) return;
            res.getFamilyMap(USER_FEATURE_COLUMN_FAMILY);


            for(KeyValue kv:res.list()) {
                String qualifier = Bytes.toString(kv.getQualifier());
                //String qualifier = kv.getKeyString();
                if(qualifier.equals("P")) {

                    try {
                        UserProfile userProfile = new UserProfile();
                        convertBytesToThriftObject(kv.getValue(), userProfile);
                        String profileRes = userProfile.getAge() + "," + userProfile.getSex() + ","
                                + userProfile.getPhoneBrand() + "," + userProfile.getLocationProvince();
                        context.write(new Text(uuid),new Text(profileRes));
                    } catch (Exception ex) {}
                }
                else if(qualifier.equals("C")) {
                    UserClickInfo userClickInfo = new UserClickInfo();
                    try {
                        convertBytesToThriftObject(kv.getValue(), userClickInfo);
                        Map<Long,Map<String,Integer>> resMap = userClickInfo.getAckClick();
                        for(Map.Entry<Long,Map<String,Integer>> entry:resMap.entrySet()) {
                            String appid = String.valueOf(entry.getKey());
                            int click = entry.getValue().get("click");
                            int ack = entry.getValue().get("ack");
                            String all = appid + "," + String.valueOf(click) + "," + String.valueOf(ack);
                            context.write(new Text(uuid),new Text(all));
                        }
                        int allClick = userClickInfo.getTotalClick();
                        int allAck = userClickInfo.getTotolAck();
                        String allNum = "999," + String.valueOf(allClick) + "," + String.valueOf(allAck);
                        context.write(new Text(uuid),new Text(allNum));
                    } catch (Exception ex) {}
                }
            }
        }
    }

    static class ReadReducer extends Reducer<Text,Text,Text,Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{
            List<String> resultList = new ArrayList<String>();
            for(Text each:values) {
                resultList.add(each.toString());
            }
            String res = StringUtils.join(resultList,":");
            context.write(key,new Text(res));
        }
    }

    @Override
    public int run(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();

        Job job = Job.getInstance(conf,"read_data_from_hbase");
        job.setJarByClass(ReadDataFromHbase.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(ReadReducer.class);
        job.setSpeculativeExecution(false);

        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),StringUtils.class, TimeUtils.class, Util.class,
                CompressionCodec.class, TStructDescriptor.class, ObjectMapper.class, CompressionCodecName.class, BytesInput.class);

        Scan scan = new Scan();
        //对整个CF扫描
        scan.addFamily(USER_FEATURE_COLUMN_FAMILY);

        String table = "XXX";
        initAuthentication(table,table,job);
        TableMapReduceUtil.initTableMapperJob(table,
                scan,
                ReadMapper.class,
                Text.class,
                Text.class,
                job);

        String output = "";
        FileSystem.get(job.getConfiguration()).delete(new Path(output), true);
        FileOutputFormat.setOutputPath(job,new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        System.exit(ToolRunner.run(new ReadDataFromHbase(),args));
    }
}


然后将代码打包,提交到集群上运行对应的shell脚本即可。

转自:http://www.tuicool.com/articles/u6z6Nvb
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    Hadoop/HDFS/MapReduce/HBase

    对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看

    Hadoop mapreduce实现wordcount

    【标题】Hadoop MapReduce 实现 WordCount ...通过理解和实践 Hadoop MapReduce 的 WordCount 示例,开发者可以快速掌握 MapReduce 的基本工作原理,为进一步学习和应用大数据处理技术打下坚实基础。

    基于MapReduce和HBase的海量网络数据处理.pdf

    系统的设计基于Hadoop和HBase的架构,充分利用了Hadoop的分布式文件系统HDFS和MapReduce的快速计算能力,以及HBase的高效数据存储和查询能力。该系统可以实现对海量网络数据的可靠存储、快速解析和高效查询。 系统...

    Hadoop MapReduce实战手册(完整版)

    总之,《Hadoop MapReduce实战手册》全面覆盖了MapReduce的基本概念、工作流程、编程模型以及在大数据处理中的实际应用,是学习和理解大数据处理技术的理想读物。通过深入阅读,读者可以提升在大数据环境下的编程和...

    大数据 hadoop mapreduce 词频统计

    【大数据Hadoop MapReduce词频统计】 大数据处理是现代信息技术领域的一个重要概念,它涉及到海量数据的存储、管理和分析。Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储大规模数据集。Hadoop的...

    Hadoop MapReduce Cookbook 源码

    通过本书的学习,读者不仅能掌握MapReduce的基本操作,还能了解到如何通过实践提升Hadoop系统的效率和稳定性。书中提供的源码对于理解MapReduce的工作流程至关重要,读者可以通过实际运行和修改这些代码,加深对概念...

    hadoop map reduce hbase 一人一档

    在大数据处理领域,Hadoop MapReduce、HBase和一人一档的概念构成了一个高效、可扩展的数据管理和分析系统。本文将深入探讨这些技术及其在实际应用中的结合。 标题“hadoop map reduce hbase 一人一档”揭示了这个...

    MapReduce on Hbase

    在使用MapReduce操作HBase时,可以通过Hadoop MapReduce框架提供的API与HBase数据库进行交互。这使得开发者可以在Hadoop集群上运行MapReduce作业,以批量处理存储在HBase中的大量数据。由于HBase和Hadoop都是基于...

    Hadoop3.1.1集成hbase2.1.1

    Hadoop和HBase都是开源的分布式大数据处理框架,Hadoop主要用于大数据的存储和处理,而HBase是一个构建在Hadoop之上的分布式、可扩展、非关系型的NoSQL数据库。Hadoop和HBase的集成允许HBase使用Hadoop的文件系统...

    HDFS 通过mapreduce 进行 HBase 导入导出

    标题 "HDFS 通过 mapreduce 进行 HBase 导入导出" 涉及的是大数据处理领域中的两个重要组件——Hadoop Distributed File System (HDFS) 和 HBase,以及它们之间的数据交互。HDFS 是 Hadoop 的分布式文件系统,而 ...

    hadoop+hbase集群搭建 详细手册

    本文将详细介绍如何搭建Hadoop+HBase集群,包括前提准备、机器集群结构分布、硬件环境、软件准备、操作步骤等。 一、前提准备 在搭建Hadoop+HBase集群之前,需要准备以下几个组件: 1. Hadoop:Hadoop是一个基于...

    hadoop,hbase,hive版本整合兼容性最全,最详细说明【适用于任何版本】

    在大数据领域中,Hadoop、HBase和Hive是重要的组件,它们通常需要协同工作以实现数据存储、管理和分析。随着各个软件的版本不断更新,确保不同组件之间的兼容性成为了一个挑战。本文将介绍Hadoop、HBase、Hive以及...

    Hadoop Hive与Hbase整合

    Hbase是一个分布式、面向列的NoSQL数据库,擅长处理大规模数据,而Hive则是一个基于Hadoop的数据仓库工具,擅长处理复杂的查询操作。通过整合Hive和Hbase,可以实现对大规模数据的高效查询和分析。 首先,需要将...

    利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎.zip

    利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎 ## 基本介绍 - InjectDriver 将本地的url注入到hbase数据库中等待下一步执行 - FetchDriver 负责抓取url对应的网页内容 - ParserUrlDriver 解析所抓取...

    详解Hadoop核心架构HDFS+MapReduce+Hbase+Hive

    通过对Hadoop分布式计算平台最核心的分布式文件系统HDFS、MapReduce处理过程,以及数据仓库工具Hive和分布式数据库Hbase的介绍,基本涵盖了Hadoop分布式平台的所有技术核心。通过这一阶段的调研总结,从内部机理的...

    10.Hadoop MapReduce教程1

    1. 复杂性:Hadoop MapReduce 的编程模型和执行机制相对复杂,需要一定的学习和实践经验。 2. 资源消耗:Hadoop MapReduce 需要大量的计算资源和存储资源,以支持大规模数据处理。 Hadoop MapReduce 是一种功能强大...

    基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip

    基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python...

    hadoop-2.7.2-hbase-jar.tar.gz

    集成Hadoop和HBase时,通常会将HBase的JAR包添加到Hadoop的类路径中,确保Hadoop集群能够识别并处理HBase的相关操作。这个过程可能涉及到配置Hadoop的环境变量,如HADOOP_CLASSPATH,以及修改HBase的配置文件,如...

Global site tag (gtag.js) - Google Analytics