本例中操作的对象为XX表,Column Family为”P”, Qualifer 为”P”与”C”,里面对应的value都是thrift对象。其中”P”对应的thrift对象为:
其中, TableMapReduceUtil.addDependencyJars 方法添加了完成任务一些必要的类。
/** * 为hbase表授权。 * * @param tableConfigKey 任意一个字符串。 * @param tableName 需要授权的表名, scan涉及到的表不需要额外授权。 * @param job 相关job。 * @throws IOException */ public static void initAuthentication(String tableConfigKey, String tableName, Job job) throws IOException { Configuration peerConf = job.getConfiguration(); peerConf.set(tableConfigKey, tableName); ZKUtil.applyClusterKeyToConf(peerConf, tableName); if (User.isHBaseSecurityEnabled(peerConf)) { LOGGER.info("Obtaining remote user authentication token with table:{}", tableName); try { User.getCurrent().obtainAuthTokenForJob(peerConf, job); } catch (InterruptedException ex) { LOGGER.info("Interrupted obtaining remote user authentication token"); LOGGER.error("Obtained remote user authentication token with table:{}, error:\n", tableName, ex); Thread.interrupted(); } LOGGER.info("Obtained remote user authentication token with table:{}", tableName); } }
本例中操作的对象为XX表,Column Family为”P”, Qualifer 为”P”与”C”,里面对应的value都是thrift对象。其中”P”对应的thrift对象为:
struct UserProfile { 1: optional byte sex; 2: optional i32 age; 3: optional string phoneBrand; 4: optional string locationProvince; }
struct UserClickInfo { 1: required i32 totolAck; 2: required i32 totalClick; 3: optional map<i64, map<string, i32>> ackClick; }
/** * convert byte array to thrift object. * * @param <T> type of thrift object. * @param thriftObj an thrift object. * @return byte array if convert succeeded, <code>null</code> if convert failed. * @throws TException */ public static final <T extends TBase<T, ?>> T convertBytesToThriftObject(byte[] raw, T thriftObj) throws TException { if (ArrayUtils.isEmpty(raw)) { return null; } Validate.notNull(thriftObj, "thriftObj"); TDeserializer serializer = new TDeserializer(new TBinaryProtocol.Factory()); serializer.deserialize(thriftObj, raw); return thriftObj; }
//输出的KV均为Text static class ReadMapper extends TableMapper<Text,Text> { @Override protected void map(ImmutableBytesWritable key, Result res, Context context) throws IOException,InterruptedException{ String uuid = StringUtils.reverse(Bytes.toString(key.copyBytes())); if (res == null || res.isEmpty()) return; res.getFamilyMap(USER_FEATURE_COLUMN_FAMILY); for(KeyValue kv:res.list()) { String qualifier = Bytes.toString(kv.getQualifier()); //String qualifier = kv.getKeyString(); if(qualifier.equals("P")) { try { UserProfile userProfile = new UserProfile(); convertBytesToThriftObject(kv.getValue(), userProfile); String profileRes = userProfile.getAge() + "," + userProfile.getSex() + "," + userProfile.getPhoneBrand() + "," + userProfile.getLocationProvince(); context.write(new Text(uuid),new Text(profileRes)); } catch (Exception ex) {} } else if(qualifier.equals("C")) { UserClickInfo userClickInfo = new UserClickInfo(); try { convertBytesToThriftObject(kv.getValue(), userClickInfo); Map<Long,Map<String,Integer>> resMap = userClickInfo.getAckClick(); for(Map.Entry<Long,Map<String,Integer>> entry:resMap.entrySet()) { String appid = String.valueOf(entry.getKey()); int click = entry.getValue().get("click"); int ack = entry.getValue().get("ack"); String all = appid + "," + String.valueOf(click) + "," + String.valueOf(ack); context.write(new Text(uuid),new Text(all)); } int allClick = userClickInfo.getTotalClick(); int allAck = userClickInfo.getTotolAck(); String allNum = "999," + String.valueOf(allClick) + "," + String.valueOf(allAck); context.write(new Text(uuid),new Text(allNum)); } catch (Exception ex) {} } } } }
其中, TableMapReduceUtil.addDependencyJars 方法添加了完成任务一些必要的类。
public int run(String[] args) throws Exception{ Configuration conf = HBaseConfiguration.create(); Job job = Job.getInstance(conf,"read_data_from_hbase"); job.setJarByClass(ReadDataFromHbase.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(ReadReducer.class); job.setSpeculativeExecution(false); TableMapReduceUtil.addDependencyJars(job.getConfiguration(),StringUtils.class, TimeUtils.class, Util.class, CompressionCodec.class, TStructDescriptor.class, ObjectMapper.class, CompressionCodecName.class, BytesInput.class); Scan scan = new Scan(); //对整个CF扫描 scan.addFamily(USER_FEATURE_COLUMN_FAMILY); String table = "XXX"; initAuthentication(table,table,job); TableMapReduceUtil.initTableMapperJob(table, scan, ReadMapper.class, Text.class, Text.class, job); String output = ""; FileSystem.get(job.getConfiguration()).delete(new Path(output), true); FileOutputFormat.setOutputPath(job,new Path(output)); return job.waitForCompletion(true) ? 0 : 1; }
package com.xiaomi.xmpush.mr_job_and_tools.task; import com.twitter.elephantbird.thrift.TStructDescriptor; import XXX.XXX.XXX.XXX.common.util.TimeUtils; import XXX.XXX.XXX.thrift.UserClickInfo; import XXX.XXX.XXX.thrift.UserProfile; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.Validate; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.format.CompressionCodec; import org.apache.parquet.format.Util; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shaded.parquet.org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * Created by WangLei on 17-3-13. */ public class ReadDataFromHbase extends Configured implements Tool{ private static final Logger LOGGER = LoggerFactory.getLogger(ReadDataFromHbase.class); public static final byte[] USER_FEATURE_COLUMN_FAMILY = Bytes.toBytes("P"); /** * convert byte array to thrift object. * * @param <T> type of thrift object. * @param thriftObj an thrift object. * @return byte array if convert succeeded, <code>null</code> if convert failed. * @throws TException */ public static final <T extends TBase<T, ?>> T convertBytesToThriftObject(byte[] raw, T thriftObj) throws TException { if (ArrayUtils.isEmpty(raw)) { return null; } Validate.notNull(thriftObj, "thriftObj"); TDeserializer serializer = new TDeserializer(new TBinaryProtocol.Factory()); serializer.deserialize(thriftObj, raw); return thriftObj; } /** * 为hbase表授权。 * * @param tableConfigKey 任意一个字符串。 * @param tableName 需要授权的表名, scan涉及到的表不需要额外授权。 * @param job 相关job。 * @throws IOException */ public static void initAuthentication(String tableConfigKey, String tableName, Job job) throws IOException { Configuration peerConf = job.getConfiguration(); peerConf.set(tableConfigKey, tableName); ZKUtil.applyClusterKeyToConf(peerConf, tableName); if (User.isHBaseSecurityEnabled(peerConf)) { LOGGER.info("Obtaining remote user authentication token with table:{}", tableName); try { User.getCurrent().obtainAuthTokenForJob(peerConf, job); } catch (InterruptedException ex) { LOGGER.info("Interrupted obtaining remote user authentication token"); LOGGER.error("Obtained remote user authentication token with table:{}, error:\n", tableName, ex); Thread.interrupted(); } LOGGER.info("Obtained remote user authentication token with table:{}", tableName); } } static class ReadMapper extends TableMapper<Text,Text> { @Override protected void map(ImmutableBytesWritable key, Result res, Context context) throws IOException,InterruptedException{ String uuid = StringUtils.reverse(Bytes.toString(key.copyBytes())); if (res == null || res.isEmpty()) return; res.getFamilyMap(USER_FEATURE_COLUMN_FAMILY); for(KeyValue kv:res.list()) { String qualifier = Bytes.toString(kv.getQualifier()); //String qualifier = kv.getKeyString(); if(qualifier.equals("P")) { try { UserProfile userProfile = new UserProfile(); convertBytesToThriftObject(kv.getValue(), userProfile); String profileRes = userProfile.getAge() + "," + userProfile.getSex() + "," + userProfile.getPhoneBrand() + "," + userProfile.getLocationProvince(); context.write(new Text(uuid),new Text(profileRes)); } catch (Exception ex) {} } else if(qualifier.equals("C")) { UserClickInfo userClickInfo = new UserClickInfo(); try { convertBytesToThriftObject(kv.getValue(), userClickInfo); Map<Long,Map<String,Integer>> resMap = userClickInfo.getAckClick(); for(Map.Entry<Long,Map<String,Integer>> entry:resMap.entrySet()) { String appid = String.valueOf(entry.getKey()); int click = entry.getValue().get("click"); int ack = entry.getValue().get("ack"); String all = appid + "," + String.valueOf(click) + "," + String.valueOf(ack); context.write(new Text(uuid),new Text(all)); } int allClick = userClickInfo.getTotalClick(); int allAck = userClickInfo.getTotolAck(); String allNum = "999," + String.valueOf(allClick) + "," + String.valueOf(allAck); context.write(new Text(uuid),new Text(allNum)); } catch (Exception ex) {} } } } } static class ReadReducer extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{ List<String> resultList = new ArrayList<String>(); for(Text each:values) { resultList.add(each.toString()); } String res = StringUtils.join(resultList,":"); context.write(key,new Text(res)); } } @Override public int run(String[] args) throws Exception{ Configuration conf = HBaseConfiguration.create(); Job job = Job.getInstance(conf,"read_data_from_hbase"); job.setJarByClass(ReadDataFromHbase.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(ReadReducer.class); job.setSpeculativeExecution(false); TableMapReduceUtil.addDependencyJars(job.getConfiguration(),StringUtils.class, TimeUtils.class, Util.class, CompressionCodec.class, TStructDescriptor.class, ObjectMapper.class, CompressionCodecName.class, BytesInput.class); Scan scan = new Scan(); //对整个CF扫描 scan.addFamily(USER_FEATURE_COLUMN_FAMILY); String table = "XXX"; initAuthentication(table,table,job); TableMapReduceUtil.initTableMapperJob(table, scan, ReadMapper.class, Text.class, Text.class, job); String output = ""; FileSystem.get(job.getConfiguration()).delete(new Path(output), true); FileOutputFormat.setOutputPath(job,new Path(output)); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception{ System.exit(ToolRunner.run(new ReadDataFromHbase(),args)); } }
■ Hadoop系统的安装和操作管理 ■ 大数据分布式文件系统HDFS ■ Hadoop MapReduce并行编程模型、框架与编程接口 ■ 分布式数据表HBase ■ 分布式数据仓库Hive ■ Intel Hadoop系统优化与功能增强 ■ MapReduce 基础...
- **气象数据集分析**:使用一个具体的气象数据集作为例子,展示了如何使用Hadoop进行数据分析,包括使用Unix工具进行初步处理,然后利用Hadoop MapReduce完成进一步的数据分析。 - **分布化**:介绍MapReduce如何...
Hadoop分布式系统通过集群的方式运行,能够处理大量数据,并允许用户通过Hadoop的MapReduce编程范例创建并执行应用程序。Hadoop系统包含以下几个核心组件: 1. Hadoop分布式文件系统(HDFS):HDFS是一个高度容错性...
【描述】:本文档提供了2019年关于Hadoop的开题报告的范例,适合指导进行Hadoop相关研究的学生进行开题报告的撰写。 【标签】:“互联网” 【内容摘要】: Hadoop是一个开源框架,主要解决海量数据的存储和处理...