package cmd;
/**
* MapReduce 读取hdfs上的文件,
* 以HTable.put(put)的方式在map中完成数据写入,无reduce过程
*/
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBaseImport extends Configured implements Tool {
static final Log LOG = LogFactory.getLog(HBaseImport.class);
public static final String JOBNAME = "MRImport ";
public static class Map extends
Mapper<LongWritable, Text, NullWritable, NullWritable> {
Configuration configuration = null;
HTable xTable = null;
private boolean wal = true;
static long count = 0;
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
super.cleanup(context);
xTable.flushCommits();
xTable.close();
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String all[] = value.toString().split("/t");
Put put = null;
if (all.length == 2) {
put = new Put(Bytes.toBytes(all[0]));
put.add(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"),
Bytes.toBytes(all[1]));
}
if (!wal) {
put.setWriteToWAL(false);
}
xTable.put(put);
if ((++count % 100) == 0) {
context.setStatus(count + " DOCUMENTS done!");
context.progress();
System.out.println(count + " DOCUMENTS done!");
}
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
configuration = context.getConfiguration();
xTable = new HTable(configuration, "testKang");
xTable.setAutoFlush(false);
xTable.setWriteBufferSize(12 * 1024 * 1024);
wal = true;
}
}
@Override
public int run(String[] args) throws Exception {
String input = args[0];
Configuration conf = HBaseConfiguration.create(getConf());
conf.set("hbase.master", "m0:60000");
Job job = new Job(conf, JOBNAME);
job.setJarByClass(HBaseImport.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, input);
job.setOutputFormatClass(NullOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
int res = 1;
try {
res = ToolRunner.run(conf, new HBaseImport(), otherArgs);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(res);
}
}
package data2hbase;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class DataToHBase {
private static final String TABLE_NAME = "TrafficInfo";
private static final String FAMILY_NAME = "cf";
private static final String INPUT_PATH = "hdfs://hadoop.master:9000/traffic_in.dat";
// private static final String OUT_PATH = "hdfs://hadoop.master:/9000/traffic_out.dat";
public static void main(String[] args) throws Exception {
// 创建table,
Configuration conf = new Configuration();
// conf.set("hbase.rootdir", "hdfs://hadoop.master:9000/hbase");
//使用eclipse时必须添加这个,否则无法定位
// conf.set("hbase.zookeeper.quorum",
// "hadoop.master,hadoop.slave0,hadoop.slave1,hadoop.slave2");
Configuration cf = HBaseConfiguration.create(conf);
HBaseAdmin hbaseAdmin = new HBaseAdmin(cf);
boolean tableExists = hbaseAdmin.tableExists(TABLE_NAME);
if (tableExists) {
hbaseAdmin.disableTable(TABLE_NAME);
hbaseAdmin.deleteTable(TABLE_NAME);
System.err.println("............del table: " + TABLE_NAME);
}
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME.getBytes());
desc.addFamily(family);
hbaseAdmin.createTable(desc);
System.err.println(".................create table: " + TABLE_NAME);
// 1.1
conf = new Configuration();
// // 设置zookeeper
// conf.set("hbase.zookeeper.quorum",
// "hadoop.master,hadoop.slave0,hadoop.slave1,hadoop.slave2");
// 设置hbase表名称
conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
conf.set("dfs.socket.timeout", "180000");
Job job = new Job(conf, DataToHBase.class.getName());
job.setJarByClass(DataToHBase.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
// 1.2
job.setMapperClass(BatchImportMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 1.3
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
// 1.4
// job.setGroupingComparatorClass(cls);
// 1.5
// job.setCombinerClass(cls)
// 2.1
// 2.2
job.setReducerClass(BatchImportReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TableOutputFormat.class);
// 2.3
// FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
job.waitForCompletion(true);
// 只想批量导入
}
// static class BatchImportMapper extends TableMapper<Text, Text> {
// protected void map(
// org.apache.hadoop.hbase.io.ImmutableBytesWritable key,
// org.apache.hadoop.hbase.client.Result value,
// org.apache.hadoop.mapreduce.Mapper<org.apache.hadoop.hbase.io.ImmutableBytesWritable,
// org.apache.hadoop.hbase.client.Result, Text, Text>.Context context)
// throws java.io.IOException, InterruptedException {
// };
// }
static class BatchImportMapper extends
Mapper<LongWritable, Text, Text, Text> {
SimpleDateFormat simpleDataFormat = new SimpleDateFormat(
"yyyyMMddHHmmss");
protected void map(
LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context)
throws java.io.IOException, InterruptedException {
String[] split = value.toString().split("\t");
String time = split[0].trim();
System.err.println("=="+time+"==");
String formatDate = simpleDataFormat.format(new Date(Long
.parseLong(time)));
context.write(new Text(split[1]), new Text(split[1] + ":"
+ formatDate + "\t" + value.toString()));
};
}
static class BatchImportReducer extends TableReducer<Text, Text, Text> {
protected void reduce(
Text k2,
java.lang.Iterable<Text> v2s,
org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, org.apache.hadoop.io.Writable>.Context context)
throws java.io.IOException, InterruptedException {
for (Text text : v2s) {
String[] split = text.toString().split("\t");
String tableRowKey = split[0].trim();
String phoneNum = split[2];
String upPackNum = split[7];
String downPackNum = split[8];
String upPayLoad = split[9];
String downPayLoad = split[10];
String host = split[5];
Put put = new Put(tableRowKey.getBytes());
put.add(FAMILY_NAME.getBytes(), "phoneNum".getBytes(),
phoneNum.getBytes());
put.add(FAMILY_NAME.getBytes(), "upPackNum".getBytes(),
upPackNum.getBytes());
put.add(FAMILY_NAME.getBytes(), "downPackNum".getBytes(),
downPackNum.getBytes());
put.add(FAMILY_NAME.getBytes(), "upPayLoad".getBytes(),
upPayLoad.getBytes());
put.add(FAMILY_NAME.getBytes(), "downPayLoad".getBytes(),
downPayLoad.getBytes());
put.add(FAMILY_NAME.getBytes(), "host".getBytes(),
host.getBytes());
context.write(new Text(tableRowKey), put);
// HTable htable=new HTable(new Configuration(),TABLE_NAME);
// htable.put(put);
//
// context.write(new Text(tableRowKey), new Text(text));
}
};
}
}
分享到:
相关推荐
标题中的“MR程序Bulkload数据到hbase”指的是使用MapReduce(MR)程序批量加载(Bulkload)数据到HBase数据库的过程。MapReduce是Apache Hadoop框架中的一个关键组件,用于处理和生成大规模数据集。而HBase是一个...
本文将详细讨论如何使用Java编程语言实现从Hive到HBase的快速数据导入方案。 首先,Hive是一个基于Hadoop的数据仓库工具,它可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,适合大规模数据的离线...
标题中的“hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序”指的是一项数据处理任务,利用Hadoop的MapReduce框架,将关系型数据库(如Oracle和MySQL)中的数据高效地迁移至分布式存储系统HDFS(Hadoop ...
标题 "HDFS 通过 mapreduce 进行 HBase 导入导出" 涉及的是大数据处理领域中的两个重要组件——Hadoop Distributed File System (HDFS) 和 HBase,以及它们之间的数据交互。HDFS 是 Hadoop 的分布式文件系统,而 ...
2. 数据查询:通过MapReduce实现对HBase表的查询,可以在Map阶段进行过滤,Reduce阶段进行聚合操作。 3. 数据更新:在Map阶段定位到需要更新的行,然后在Reduce阶段完成更新操作。 4. 数据删除:Map阶段标识出需要...
总结,通过上述步骤,我们可以成功地将MySQL中的数据导入到HBase。在实际项目中,可能需要考虑更多因素,例如数据清洗、错误处理、性能优化等。此外,为了实现大规模数据迁移,可以考虑使用批处理或MapReduce等技术...
标题中的“hbase导入话单数据mapreduce函数实现执行过程实例”揭示了本文将探讨如何使用MapReduce在HBase中导入大数据,特别是话单记录。HBase是一个分布式、版本化的NoSQL数据库,常用于处理大规模的数据。...
mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载
基于MapReduce和HBase的海量网络数据处理 ...本文提出了一种基于MapReduce和HBase的海量网络数据处理系统,可以实现对海量网络数据的高效处理和分析,为大数据时代的网络数据处理提供了一种良好的解决方案。
在数据导入阶段,可以使用MapReduce将大量数据加载到HBase;在数据分析阶段,可以读取HBase中的数据进行复杂计算,然后将结果写回HBase或其他存储系统。这种结合使得HBase能够支持大规模的数据处理,同时也保留了其...
通过 Sqoop导出到Hbase,需要先将数据导入HDFS,再用Hbase的Import命令将数据加载到Hbase表中。 - Hadoop MapReduce:可以编写自定义的MapReduce作业,将RDBMS数据读取、转换并写入Hbase。这种方法灵活性高,但开发...
7. **监控和验证**:导入完成后,通过HBase的监控工具或自定义脚本检查导入结果,确保数据正确无误地导入到HBase。 在提供的"ImpDataToHbase"源代码中,我们可以看到这些步骤的具体实现。源代码可能包括了数据...
在MapReduce的基础上,可以实现对HBase数据库的数据操作,包括读取、写入和更新HBase表中的数据。 在使用MapReduce操作HBase时,可以通过Hadoop MapReduce框架提供的API与HBase数据库进行交互。这使得开发者可以在...
【标题】:“Hadoop MR(MapReduce)将文件转换为HFile并导入到HBase” 在大数据处理领域,Hadoop MapReduce是一种广泛使用的分布式计算框架,而HBase是基于Hadoop的数据存储系统,用于处理大规模的非结构化数据。...
1. **预分区**:在导入数据前,根据预计的数据量创建足够的区域(region),避免数据导入过程中动态分区导致的性能损耗。 2. **使用HFileOutputFormat**:HBase提供了HFileOutputFormat类,可以将数据直接写入HFile...
2. HBase到Hive:通过MapReduce任务,将HBase中的数据导入到Hive,创建Hive表并加载数据。也可以使用HBaseSerDe来解析HBase数据。 三、HBase和HDFS互导 1. HBase到HDFS:可以通过HBase的Export工具,将HBase表的...
例如,可以使用HBase的API创建和管理数据库,使用Hive的HQL进行数据分析,以及编写自定义的MapReduce作业来执行特定的数据处理任务。 总结来说,这个压缩包提供了Hadoop生态系统的几个关键组件的示例和库,对于学习...
在Java编程环境中,操作HBase并将其数据写入HDFS(Hadoop Distributed File System)是一项常见的任务,特别是在大数据处理和分析的场景下。本篇将详细介绍如何使用Java API实现这一功能,以及涉及到的关键技术和...
实验结果显示,随着Hadoop和HBase集群规模的扩大,图像数据的导入和检索速度显著提高。这种方法尤其适用于处理大量遥感图像,可以在普通X86架构的服务器上实现高效的数据管理和检索,而不需要大量的计算资源。 总结...
HBase提供了数据的批量导入操作,常见的工具有HBase自带的ImportTsv和CompleteBulkLoad工具,可以高效地将大量数据导入HBase表中。 ### 实验原理和环境 HBase是一种基于列存储的数据库,它适用于存储非结构化和半...