- 浏览: 602080 次
- 性别:
- 来自: 厦门
文章分类
- 全部博客 (669)
- oracle (36)
- java (98)
- spring (48)
- UML (2)
- hibernate (10)
- tomcat (7)
- 高性能 (11)
- mysql (25)
- sql (19)
- web (42)
- 数据库设计 (4)
- Nio (6)
- Netty (8)
- Excel (3)
- File (4)
- AOP (1)
- Jetty (1)
- Log4J (4)
- 链表 (1)
- Spring Junit4 (3)
- Autowired Resource (0)
- Jackson (1)
- Javascript (58)
- Spring Cache (2)
- Spring - CXF (2)
- Spring Inject (2)
- 汉字拼音 (3)
- 代理模式 (3)
- Spring事务 (4)
- ActiveMQ (6)
- XML (3)
- Cglib (2)
- Activiti (15)
- 附件问题 (1)
- javaMail (1)
- Thread (19)
- 算法 (6)
- 正则表达式 (3)
- 国际化 (2)
- Json (3)
- EJB (3)
- Struts2 (1)
- Maven (7)
- Mybatis (7)
- Redis (8)
- DWR (1)
- Lucene (2)
- Linux (73)
- 杂谈 (2)
- CSS (13)
- Linux服务篇 (3)
- Kettle (9)
- android (81)
- protocol (2)
- EasyUI (6)
- nginx (2)
- zookeeper (6)
- Hadoop (41)
- cache (7)
- shiro (3)
- HBase (12)
- Hive (8)
- Spark (15)
- Scala (16)
- YARN (3)
- Kafka (5)
- Sqoop (2)
- Pig (3)
- Vue (6)
- sprint boot (19)
- dubbo (2)
- mongodb (2)
最新评论
Hbase里的数据量一般都小不了,因此MapReduce跟Hbase就成了天然的好搭档。
1.ZK授权表
首先一点来说,Hbase是强依赖于ZK的。博主所在的team,就经常出现ZK连接数太多被打爆然后Hbase挂了的情况。一般在访问Hbase表之前,需要通过访问ZK得到授权:
2.thrift对象转化
本例中操作的对象为XX表,Column Family为”P”, Qualifer 为”P”与”C”,里面对应的value都是thrift对象。其中”P”对应的thrift对象为:
“C”对应的thrift对象为:
这个时候我们就需要经常将Bytes转化为thrift对象,通用的方法为:
3.Map阶段读取Hbase里的数据
在Map阶段对Hbase表扫描,得出数据
4.run方法里配置相关驱动
run方法里需要配置一些相关的参数,保证任务的顺利进行。
其中, TableMapReduceUtil.addDependencyJars 方法添加了完成任务一些必要的类。
5.完整的代码
TableMapReduceUtil.initTableMapperJob把HBase表中的数据注入到Mapper中
然后将代码打包,提交到集群上运行对应的shell脚本即可。
转自:http://www.tuicool.com/articles/u6z6Nvb
1.ZK授权表
首先一点来说,Hbase是强依赖于ZK的。博主所在的team,就经常出现ZK连接数太多被打爆然后Hbase挂了的情况。一般在访问Hbase表之前,需要通过访问ZK得到授权:
/** * 为hbase表授权。 * * @param tableConfigKey 任意一个字符串。 * @param tableName 需要授权的表名, scan涉及到的表不需要额外授权。 * @param job 相关job。 * @throws IOException */ public static void initAuthentication(String tableConfigKey, String tableName, Job job) throws IOException { Configuration peerConf = job.getConfiguration(); peerConf.set(tableConfigKey, tableName); ZKUtil.applyClusterKeyToConf(peerConf, tableName); if (User.isHBaseSecurityEnabled(peerConf)) { LOGGER.info("Obtaining remote user authentication token with table:{}", tableName); try { User.getCurrent().obtainAuthTokenForJob(peerConf, job); } catch (InterruptedException ex) { LOGGER.info("Interrupted obtaining remote user authentication token"); LOGGER.error("Obtained remote user authentication token with table:{}, error:\n", tableName, ex); Thread.interrupted(); } LOGGER.info("Obtained remote user authentication token with table:{}", tableName); } }
2.thrift对象转化
本例中操作的对象为XX表,Column Family为”P”, Qualifer 为”P”与”C”,里面对应的value都是thrift对象。其中”P”对应的thrift对象为:
struct UserProfile { 1: optional byte sex; 2: optional i32 age; 3: optional string phoneBrand; 4: optional string locationProvince; }
“C”对应的thrift对象为:
struct UserClickInfo { 1: required i32 totolAck; 2: required i32 totalClick; 3: optional map<i64, map<string, i32>> ackClick; }
这个时候我们就需要经常将Bytes转化为thrift对象,通用的方法为:
/** * convert byte array to thrift object. * * @param <T> type of thrift object. * @param thriftObj an thrift object. * @return byte array if convert succeeded, <code>null</code> if convert failed. * @throws TException */ public static final <T extends TBase<T, ?>> T convertBytesToThriftObject(byte[] raw, T thriftObj) throws TException { if (ArrayUtils.isEmpty(raw)) { return null; } Validate.notNull(thriftObj, "thriftObj"); TDeserializer serializer = new TDeserializer(new TBinaryProtocol.Factory()); serializer.deserialize(thriftObj, raw); return thriftObj; }
3.Map阶段读取Hbase里的数据
在Map阶段对Hbase表扫描,得出数据
//输出的KV均为Text static class ReadMapper extends TableMapper<Text,Text> { @Override protected void map(ImmutableBytesWritable key, Result res, Context context) throws IOException,InterruptedException{ String uuid = StringUtils.reverse(Bytes.toString(key.copyBytes())); if (res == null || res.isEmpty()) return; res.getFamilyMap(USER_FEATURE_COLUMN_FAMILY); for(KeyValue kv:res.list()) { String qualifier = Bytes.toString(kv.getQualifier()); //String qualifier = kv.getKeyString(); if(qualifier.equals("P")) { try { UserProfile userProfile = new UserProfile(); convertBytesToThriftObject(kv.getValue(), userProfile); String profileRes = userProfile.getAge() + "," + userProfile.getSex() + "," + userProfile.getPhoneBrand() + "," + userProfile.getLocationProvince(); context.write(new Text(uuid),new Text(profileRes)); } catch (Exception ex) {} } else if(qualifier.equals("C")) { UserClickInfo userClickInfo = new UserClickInfo(); try { convertBytesToThriftObject(kv.getValue(), userClickInfo); Map<Long,Map<String,Integer>> resMap = userClickInfo.getAckClick(); for(Map.Entry<Long,Map<String,Integer>> entry:resMap.entrySet()) { String appid = String.valueOf(entry.getKey()); int click = entry.getValue().get("click"); int ack = entry.getValue().get("ack"); String all = appid + "," + String.valueOf(click) + "," + String.valueOf(ack); context.write(new Text(uuid),new Text(all)); } int allClick = userClickInfo.getTotalClick(); int allAck = userClickInfo.getTotolAck(); String allNum = "999," + String.valueOf(allClick) + "," + String.valueOf(allAck); context.write(new Text(uuid),new Text(allNum)); } catch (Exception ex) {} } } } }
4.run方法里配置相关驱动
run方法里需要配置一些相关的参数,保证任务的顺利进行。
其中, TableMapReduceUtil.addDependencyJars 方法添加了完成任务一些必要的类。
public int run(String[] args) throws Exception{ Configuration conf = HBaseConfiguration.create(); Job job = Job.getInstance(conf,"read_data_from_hbase"); job.setJarByClass(ReadDataFromHbase.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(ReadReducer.class); job.setSpeculativeExecution(false); TableMapReduceUtil.addDependencyJars(job.getConfiguration(),StringUtils.class, TimeUtils.class, Util.class, CompressionCodec.class, TStructDescriptor.class, ObjectMapper.class, CompressionCodecName.class, BytesInput.class); Scan scan = new Scan(); //对整个CF扫描 scan.addFamily(USER_FEATURE_COLUMN_FAMILY); String table = "XXX"; initAuthentication(table,table,job); TableMapReduceUtil.initTableMapperJob(table, scan, ReadMapper.class, Text.class, Text.class, job); String output = ""; FileSystem.get(job.getConfiguration()).delete(new Path(output), true); FileOutputFormat.setOutputPath(job,new Path(output)); return job.waitForCompletion(true) ? 0 : 1; }
5.完整的代码
TableMapReduceUtil.initTableMapperJob把HBase表中的数据注入到Mapper中
package com.xiaomi.xmpush.mr_job_and_tools.task; import com.twitter.elephantbird.thrift.TStructDescriptor; import XXX.XXX.XXX.XXX.common.util.TimeUtils; import XXX.XXX.XXX.thrift.UserClickInfo; import XXX.XXX.XXX.thrift.UserProfile; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.Validate; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.format.CompressionCodec; import org.apache.parquet.format.Util; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shaded.parquet.org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * Created by WangLei on 17-3-13. */ public class ReadDataFromHbase extends Configured implements Tool{ private static final Logger LOGGER = LoggerFactory.getLogger(ReadDataFromHbase.class); public static final byte[] USER_FEATURE_COLUMN_FAMILY = Bytes.toBytes("P"); /** * convert byte array to thrift object. * * @param <T> type of thrift object. * @param thriftObj an thrift object. * @return byte array if convert succeeded, <code>null</code> if convert failed. * @throws TException */ public static final <T extends TBase<T, ?>> T convertBytesToThriftObject(byte[] raw, T thriftObj) throws TException { if (ArrayUtils.isEmpty(raw)) { return null; } Validate.notNull(thriftObj, "thriftObj"); TDeserializer serializer = new TDeserializer(new TBinaryProtocol.Factory()); serializer.deserialize(thriftObj, raw); return thriftObj; } /** * 为hbase表授权。 * * @param tableConfigKey 任意一个字符串。 * @param tableName 需要授权的表名, scan涉及到的表不需要额外授权。 * @param job 相关job。 * @throws IOException */ public static void initAuthentication(String tableConfigKey, String tableName, Job job) throws IOException { Configuration peerConf = job.getConfiguration(); peerConf.set(tableConfigKey, tableName); ZKUtil.applyClusterKeyToConf(peerConf, tableName); if (User.isHBaseSecurityEnabled(peerConf)) { LOGGER.info("Obtaining remote user authentication token with table:{}", tableName); try { User.getCurrent().obtainAuthTokenForJob(peerConf, job); } catch (InterruptedException ex) { LOGGER.info("Interrupted obtaining remote user authentication token"); LOGGER.error("Obtained remote user authentication token with table:{}, error:\n", tableName, ex); Thread.interrupted(); } LOGGER.info("Obtained remote user authentication token with table:{}", tableName); } } static class ReadMapper extends TableMapper<Text,Text> { @Override protected void map(ImmutableBytesWritable key, Result res, Context context) throws IOException,InterruptedException{ String uuid = StringUtils.reverse(Bytes.toString(key.copyBytes())); if (res == null || res.isEmpty()) return; res.getFamilyMap(USER_FEATURE_COLUMN_FAMILY); for(KeyValue kv:res.list()) { String qualifier = Bytes.toString(kv.getQualifier()); //String qualifier = kv.getKeyString(); if(qualifier.equals("P")) { try { UserProfile userProfile = new UserProfile(); convertBytesToThriftObject(kv.getValue(), userProfile); String profileRes = userProfile.getAge() + "," + userProfile.getSex() + "," + userProfile.getPhoneBrand() + "," + userProfile.getLocationProvince(); context.write(new Text(uuid),new Text(profileRes)); } catch (Exception ex) {} } else if(qualifier.equals("C")) { UserClickInfo userClickInfo = new UserClickInfo(); try { convertBytesToThriftObject(kv.getValue(), userClickInfo); Map<Long,Map<String,Integer>> resMap = userClickInfo.getAckClick(); for(Map.Entry<Long,Map<String,Integer>> entry:resMap.entrySet()) { String appid = String.valueOf(entry.getKey()); int click = entry.getValue().get("click"); int ack = entry.getValue().get("ack"); String all = appid + "," + String.valueOf(click) + "," + String.valueOf(ack); context.write(new Text(uuid),new Text(all)); } int allClick = userClickInfo.getTotalClick(); int allAck = userClickInfo.getTotolAck(); String allNum = "999," + String.valueOf(allClick) + "," + String.valueOf(allAck); context.write(new Text(uuid),new Text(allNum)); } catch (Exception ex) {} } } } } static class ReadReducer extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{ List<String> resultList = new ArrayList<String>(); for(Text each:values) { resultList.add(each.toString()); } String res = StringUtils.join(resultList,":"); context.write(key,new Text(res)); } } @Override public int run(String[] args) throws Exception{ Configuration conf = HBaseConfiguration.create(); Job job = Job.getInstance(conf,"read_data_from_hbase"); job.setJarByClass(ReadDataFromHbase.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(ReadReducer.class); job.setSpeculativeExecution(false); TableMapReduceUtil.addDependencyJars(job.getConfiguration(),StringUtils.class, TimeUtils.class, Util.class, CompressionCodec.class, TStructDescriptor.class, ObjectMapper.class, CompressionCodecName.class, BytesInput.class); Scan scan = new Scan(); //对整个CF扫描 scan.addFamily(USER_FEATURE_COLUMN_FAMILY); String table = "XXX"; initAuthentication(table,table,job); TableMapReduceUtil.initTableMapperJob(table, scan, ReadMapper.class, Text.class, Text.class, job); String output = ""; FileSystem.get(job.getConfiguration()).delete(new Path(output), true); FileOutputFormat.setOutputPath(job,new Path(output)); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception{ System.exit(ToolRunner.run(new ReadDataFromHbase(),args)); } }
然后将代码打包,提交到集群上运行对应的shell脚本即可。
转自:http://www.tuicool.com/articles/u6z6Nvb
发表评论
文章已被作者锁定,不允许评论。
-
Hadoop namenode的fsimage与editlog详解
2017-05-19 10:04 1198Namenode主要维护两个文件,一个是fsimage,一个是 ... -
Hadoop HBase建表时预分区(region)的方法学习
2017-05-15 11:18 1199如果知道Hbase数据表的key的分布情况,就可以在建表的时候 ... -
Hadoop HBase行健(rowkey)设计原则学习
2017-05-15 10:34 1130Hbase是三维有序存储的,通过rowkey(行键),colu ... -
Hadoop HBase中split原理学习
2017-05-12 13:38 2286在Hbase中split是一个很重 ... -
Hadoop HBase中Compaction原理学习
2017-05-12 10:34 1004HBase Compaction策略 RegionServer ... -
Hadoop HBase性能优化学习
2017-05-12 09:15 691一、调整参数 入门级的调优可以从调整参数开始。投入小,回报快 ... -
Hadoop 分布式文件系统学习
2017-05-10 15:34 509一. 分布式文件系统 分布式文件系统,在整个分布式系统体系中处 ... -
Hadoop MapReduce处理wordcount代码分析
2017-04-28 14:25 598package org.apache.hadoop.exa ... -
Hadoop YARN完全分布式配置学习
2017-04-26 10:27 579版本及配置简介 Java: J ... -
Hadoop YARN各个组件和流程的学习
2017-04-24 19:04 653一、基本组成结构 * 集 ... -
Hadoop YARN(Yet Another Resource Negotiator)详细解析
2017-04-24 18:30 1164带有 MapReduce 的 Apache Had ... -
Hive 注意事项与扩展特性
2017-04-06 19:31 7571. 使用HIVE注意点 字符集 Hadoop和Hive都 ... -
Hive 元数据和QL基本操作学习整理
2017-04-06 14:36 1041Hive元数据库 Hive将元数据存储在RDBMS 中,一般常 ... -
Hive 文件压缩存储格式(STORED AS)
2017-04-06 09:35 2329Hive文件存储格式包括以下几类: 1.TEXTFILE ... -
Hive SQL自带函数总结
2017-04-05 19:25 1146字符串长度函数:length ... -
Hive 连接查询操作(不支持IN查询)
2017-04-05 19:16 734CREATE EXTERNAL TABLE IF NOT ... -
Hive优化学习(join ,group by,in)
2017-04-05 18:48 1820一、join优化 Join ... -
Hive 基础知识学习(语法)
2017-04-05 15:51 907一.Hive 简介 Hive是基于 Hadoop 分布式文件 ... -
Hive 架构与基本语法(OLAP)
2017-04-05 15:16 1260Hive 是什么 Hive是建立在Hadoop上的数据仓库基础 ... -
Hadoop MapReduce将HDFS文本数据导入HBase
2017-03-24 11:13 1226HBase本身提供了很多种数据导入的方式,通常有两种常用方式: ...
相关推荐
对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看
【标题】Hadoop MapReduce 实现 WordCount ...通过理解和实践 Hadoop MapReduce 的 WordCount 示例,开发者可以快速掌握 MapReduce 的基本工作原理,为进一步学习和应用大数据处理技术打下坚实基础。
系统的设计基于Hadoop和HBase的架构,充分利用了Hadoop的分布式文件系统HDFS和MapReduce的快速计算能力,以及HBase的高效数据存储和查询能力。该系统可以实现对海量网络数据的可靠存储、快速解析和高效查询。 系统...
总之,《Hadoop MapReduce实战手册》全面覆盖了MapReduce的基本概念、工作流程、编程模型以及在大数据处理中的实际应用,是学习和理解大数据处理技术的理想读物。通过深入阅读,读者可以提升在大数据环境下的编程和...
【大数据Hadoop MapReduce词频统计】 大数据处理是现代信息技术领域的一个重要概念,它涉及到海量数据的存储、管理和分析。Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储大规模数据集。Hadoop的...
通过本书的学习,读者不仅能掌握MapReduce的基本操作,还能了解到如何通过实践提升Hadoop系统的效率和稳定性。书中提供的源码对于理解MapReduce的工作流程至关重要,读者可以通过实际运行和修改这些代码,加深对概念...
在大数据处理领域,Hadoop MapReduce、HBase和一人一档的概念构成了一个高效、可扩展的数据管理和分析系统。本文将深入探讨这些技术及其在实际应用中的结合。 标题“hadoop map reduce hbase 一人一档”揭示了这个...
在使用MapReduce操作HBase时,可以通过Hadoop MapReduce框架提供的API与HBase数据库进行交互。这使得开发者可以在Hadoop集群上运行MapReduce作业,以批量处理存储在HBase中的大量数据。由于HBase和Hadoop都是基于...
Hadoop和HBase都是开源的分布式大数据处理框架,Hadoop主要用于大数据的存储和处理,而HBase是一个构建在Hadoop之上的分布式、可扩展、非关系型的NoSQL数据库。Hadoop和HBase的集成允许HBase使用Hadoop的文件系统...
标题 "HDFS 通过 mapreduce 进行 HBase 导入导出" 涉及的是大数据处理领域中的两个重要组件——Hadoop Distributed File System (HDFS) 和 HBase,以及它们之间的数据交互。HDFS 是 Hadoop 的分布式文件系统,而 ...
本文将详细介绍如何搭建Hadoop+HBase集群,包括前提准备、机器集群结构分布、硬件环境、软件准备、操作步骤等。 一、前提准备 在搭建Hadoop+HBase集群之前,需要准备以下几个组件: 1. Hadoop:Hadoop是一个基于...
在大数据领域中,Hadoop、HBase和Hive是重要的组件,它们通常需要协同工作以实现数据存储、管理和分析。随着各个软件的版本不断更新,确保不同组件之间的兼容性成为了一个挑战。本文将介绍Hadoop、HBase、Hive以及...
Hbase是一个分布式、面向列的NoSQL数据库,擅长处理大规模数据,而Hive则是一个基于Hadoop的数据仓库工具,擅长处理复杂的查询操作。通过整合Hive和Hbase,可以实现对大规模数据的高效查询和分析。 首先,需要将...
利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎 ## 基本介绍 - InjectDriver 将本地的url注入到hbase数据库中等待下一步执行 - FetchDriver 负责抓取url对应的网页内容 - ParserUrlDriver 解析所抓取...
通过对Hadoop分布式计算平台最核心的分布式文件系统HDFS、MapReduce处理过程,以及数据仓库工具Hive和分布式数据库Hbase的介绍,基本涵盖了Hadoop分布式平台的所有技术核心。通过这一阶段的调研总结,从内部机理的...
1. 复杂性:Hadoop MapReduce 的编程模型和执行机制相对复杂,需要一定的学习和实践经验。 2. 资源消耗:Hadoop MapReduce 需要大量的计算资源和存储资源,以支持大规模数据处理。 Hadoop MapReduce 是一种功能强大...
基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python...
集成Hadoop和HBase时,通常会将HBase的JAR包添加到Hadoop的类路径中,确保Hadoop集群能够识别并处理HBase的相关操作。这个过程可能涉及到配置Hadoop的环境变量,如HADOOP_CLASSPATH,以及修改HBase的配置文件,如...