`

MapReduce牛逼(2)MR简单实现 导入数据到hbase例子

 
阅读更多
package cmd;

/**
 * MapReduce 读取hdfs上的文件,
 *  以HTable.put(put)的方式在map中完成数据写入,无reduce过程
 */
import java.io.IOException;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class HBaseImport extends Configured implements Tool {
	static final Log LOG = LogFactory.getLog(HBaseImport.class);
	public static final String JOBNAME = "MRImport ";

	public static class Map extends
			Mapper<LongWritable, Text, NullWritable, NullWritable> {
		Configuration configuration = null;
		HTable xTable = null;
		private boolean wal = true;
		static long count = 0;

		@Override
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			super.cleanup(context);
			xTable.flushCommits();
			xTable.close();
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String all[] = value.toString().split("/t");
			Put put = null;
			if (all.length == 2) {
				put = new Put(Bytes.toBytes(all[0]));
				put.add(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"),
						Bytes.toBytes(all[1]));
			}
			if (!wal) {
				put.setWriteToWAL(false);
			}
			xTable.put(put);
			if ((++count % 100) == 0) {
				context.setStatus(count + " DOCUMENTS done!");
				context.progress();
				System.out.println(count + " DOCUMENTS done!");
			}
		}

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			super.setup(context);
			configuration = context.getConfiguration();
			xTable = new HTable(configuration, "testKang");
			xTable.setAutoFlush(false);
			xTable.setWriteBufferSize(12 * 1024 * 1024);
			wal = true;
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		String input = args[0];
		Configuration conf = HBaseConfiguration.create(getConf());
		conf.set("hbase.master", "m0:60000");
		Job job = new Job(conf, JOBNAME);
		job.setJarByClass(HBaseImport.class);
		job.setMapperClass(Map.class);
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		TextInputFormat.setInputPaths(job, input);
		job.setOutputFormatClass(NullOutputFormat.class);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		int res = 1;
		try {
			res = ToolRunner.run(conf, new HBaseImport(), otherArgs);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.exit(res);
	}

}

package data2hbase;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class DataToHBase {
	private static final String TABLE_NAME = "TrafficInfo";
	private static final String FAMILY_NAME = "cf";
	private static final String INPUT_PATH = "hdfs://hadoop.master:9000/traffic_in.dat";
//	private static final String OUT_PATH = "hdfs://hadoop.master:/9000/traffic_out.dat";

	public static void main(String[] args) throws Exception {
		// 创建table,
		Configuration conf = new Configuration();
		
//		conf.set("hbase.rootdir", "hdfs://hadoop.master:9000/hbase");
		//使用eclipse时必须添加这个,否则无法定位
//		conf.set("hbase.zookeeper.quorum",
//		"hadoop.master,hadoop.slave0,hadoop.slave1,hadoop.slave2");
		Configuration cf = HBaseConfiguration.create(conf);
		HBaseAdmin hbaseAdmin = new HBaseAdmin(cf);

		boolean tableExists = hbaseAdmin.tableExists(TABLE_NAME);
		if (tableExists) {
			hbaseAdmin.disableTable(TABLE_NAME);
			hbaseAdmin.deleteTable(TABLE_NAME);
			System.err.println("............del table: " + TABLE_NAME);
		}

		HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
		HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME.getBytes());
		desc.addFamily(family);
		hbaseAdmin.createTable(desc);
		System.err.println(".................create table: " + TABLE_NAME);

		// 1.1
		conf = new Configuration();
//		// 设置zookeeper
//		conf.set("hbase.zookeeper.quorum",
//				"hadoop.master,hadoop.slave0,hadoop.slave1,hadoop.slave2");
		// 设置hbase表名称
		conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
		conf.set("dfs.socket.timeout", "180000");
		Job job = new Job(conf, DataToHBase.class.getName());
		job.setJarByClass(DataToHBase.class);
		job.setInputFormatClass(TextInputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
		// 1.2
		job.setMapperClass(BatchImportMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		// 1.3
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);
		// 1.4
		// job.setGroupingComparatorClass(cls);
		// 1.5
		// job.setCombinerClass(cls)

		// 2.1
		// 2.2
		job.setReducerClass(BatchImportReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(TableOutputFormat.class);

		// 2.3
//		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

		job.waitForCompletion(true);

		// 只想批量导入

	}

	// static class BatchImportMapper extends TableMapper<Text, Text> {
	// protected void map(
	// org.apache.hadoop.hbase.io.ImmutableBytesWritable key,
	// org.apache.hadoop.hbase.client.Result value,
	// org.apache.hadoop.mapreduce.Mapper<org.apache.hadoop.hbase.io.ImmutableBytesWritable,
	// org.apache.hadoop.hbase.client.Result, Text, Text>.Context context)
	// throws java.io.IOException, InterruptedException {
	// };
	// }

	static class BatchImportMapper extends
			Mapper<LongWritable, Text, Text, Text> {

		SimpleDateFormat simpleDataFormat = new SimpleDateFormat(
				"yyyyMMddHHmmss");

		protected void map(
				LongWritable key,
				Text value,
				org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context)
				throws java.io.IOException, InterruptedException {

			String[] split = value.toString().split("\t");
			String time = split[0].trim();
			System.err.println("=="+time+"==");
			String formatDate = simpleDataFormat.format(new Date(Long
					.parseLong(time)));
			context.write(new Text(split[1]), new Text(split[1] + ":"
					+ formatDate + "\t" + value.toString()));
		};
	}

	static class BatchImportReducer extends TableReducer<Text, Text, Text> {
		protected void reduce(
				Text k2,
				java.lang.Iterable<Text> v2s,
				org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, org.apache.hadoop.io.Writable>.Context context)
				throws java.io.IOException, InterruptedException {

			for (Text text : v2s) {
				String[] split = text.toString().split("\t");
				String tableRowKey = split[0].trim();
				String phoneNum = split[2];
				String upPackNum = split[7];
				String downPackNum = split[8];
				String upPayLoad = split[9];
				String downPayLoad = split[10];
				String host = split[5];

				Put put = new Put(tableRowKey.getBytes());
				put.add(FAMILY_NAME.getBytes(), "phoneNum".getBytes(),
						phoneNum.getBytes());
				put.add(FAMILY_NAME.getBytes(), "upPackNum".getBytes(),
						upPackNum.getBytes());
				put.add(FAMILY_NAME.getBytes(), "downPackNum".getBytes(),
						downPackNum.getBytes());
				put.add(FAMILY_NAME.getBytes(), "upPayLoad".getBytes(),
						upPayLoad.getBytes());
				put.add(FAMILY_NAME.getBytes(), "downPayLoad".getBytes(),
						downPayLoad.getBytes());
				put.add(FAMILY_NAME.getBytes(), "host".getBytes(),
						host.getBytes());

				context.write(new Text(tableRowKey), put);

				// HTable htable=new HTable(new Configuration(),TABLE_NAME);
				// htable.put(put);
				//
				// context.write(new Text(tableRowKey), new Text(text));
			}

		};
	}

}
分享到:
评论

相关推荐

    MR程序Bulkload数据到hbase

    标题中的“MR程序Bulkload数据到hbase”指的是使用MapReduce(MR)程序批量加载(Bulkload)数据到HBase数据库的过程。MapReduce是Apache Hadoop框架中的一个关键组件,用于处理和生成大规模数据集。而HBase是一个...

    java解决hive快速导数据到Hbase代码

    本文将详细讨论如何使用Java编程语言实现从Hive到HBase的快速数据导入方案。 首先,Hive是一个基于Hadoop的数据仓库工具,它可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,适合大规模数据的离线...

    hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序

    标题中的“hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序”指的是一项数据处理任务,利用Hadoop的MapReduce框架,将关系型数据库(如Oracle和MySQL)中的数据高效地迁移至分布式存储系统HDFS(Hadoop ...

    HDFS 通过mapreduce 进行 HBase 导入导出

    标题 "HDFS 通过 mapreduce 进行 HBase 导入导出" 涉及的是大数据处理领域中的两个重要组件——Hadoop Distributed File System (HDFS) 和 HBase,以及它们之间的数据交互。HDFS 是 Hadoop 的分布式文件系统,而 ...

    HBase MapReduce完整实例

    2. 数据查询:通过MapReduce实现对HBase表的查询,可以在Map阶段进行过滤,Reduce阶段进行聚合操作。 3. 数据更新:在Map阶段定位到需要更新的行,然后在Reduce阶段完成更新操作。 4. 数据删除:Map阶段标识出需要...

    java代码将mysql表数据导入HBase表

    总结,通过上述步骤,我们可以成功地将MySQL中的数据导入到HBase。在实际项目中,可能需要考虑更多因素,例如数据清洗、错误处理、性能优化等。此外,为了实现大规模数据迁移,可以考虑使用批处理或MapReduce等技术...

    hbase导入话单数据mapreduce函数实现执行过程实例(博客附件)

    标题中的“hbase导入话单数据mapreduce函数实现执行过程实例”揭示了本文将探讨如何使用MapReduce在HBase中导入大数据,特别是话单记录。HBase是一个分布式、版本化的NoSQL数据库,常用于处理大规模的数据。...

    mapreduce方式入库hbase hive hdfs

    mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载

    基于MapReduce和HBase的海量网络数据处理.pdf

    基于MapReduce和HBase的海量网络数据处理 ...本文提出了一种基于MapReduce和HBase的海量网络数据处理系统,可以实现对海量网络数据的高效处理和分析,为大数据时代的网络数据处理提供了一种良好的解决方案。

    HBase MapReduce完整实例.rar

    在数据导入阶段,可以使用MapReduce将大量数据加载到HBase;在数据分析阶段,可以读取HBase中的数据进行复杂计算,然后将结果写回HBase或其他存储系统。这种结合使得HBase能够支持大规模的数据处理,同时也保留了其...

    关系型数据库的数据导入Hbase

    通过 Sqoop导出到Hbase,需要先将数据导入HDFS,再用Hbase的Import命令将数据加载到Hbase表中。 - Hadoop MapReduce:可以编写自定义的MapReduce作业,将RDBMS数据读取、转换并写入Hbase。这种方法灵活性高,但开发...

    将hdfs上的文件导入hbase的源代码

    7. **监控和验证**:导入完成后,通过HBase的监控工具或自定义脚本检查导入结果,确保数据正确无误地导入到HBase。 在提供的"ImpDataToHbase"源代码中,我们可以看到这些步骤的具体实现。源代码可能包括了数据...

    MapReduce on Hbase

    在MapReduce的基础上,可以实现对HBase数据库的数据操作,包括读取、写入和更新HBase表中的数据。 在使用MapReduce操作HBase时,可以通过Hadoop MapReduce框架提供的API与HBase数据库进行交互。这使得开发者可以在...

    hadoop mr file2hfile2hbase

    【标题】:“Hadoop MR(MapReduce)将文件转换为HFile并导入到HBase” 在大数据处理领域,Hadoop MapReduce是一种广泛使用的分布式计算框架,而HBase是基于Hadoop的数据存储系统,用于处理大规模的非结构化数据。...

    hbase海量数据的全量导入方法

    1. **预分区**:在导入数据前,根据预计的数据量创建足够的区域(region),避免数据导入过程中动态分区导致的性能损耗。 2. **使用HFileOutputFormat**:HBase提供了HFileOutputFormat类,可以将数据直接写入HFile...

    hbase备份和数据恢复

    2. HBase到Hive:通过MapReduce任务,将HBase中的数据导入到Hive,创建Hive表并加载数据。也可以使用HBaseSerDe来解析HBase数据。 三、HBase和HDFS互导 1. HBase到HDFS:可以通过HBase的Export工具,将HBase表的...

    hadoop1.1.2操作例子 包括hbase hive mapreduce相应的jar包

    例如,可以使用HBase的API创建和管理数据库,使用Hive的HQL进行数据分析,以及编写自定义的MapReduce作业来执行特定的数据处理任务。 总结来说,这个压缩包提供了Hadoop生态系统的几个关键组件的示例和库,对于学习...

    java操作Hbase之从Hbase中读取数据写入hdfs中源码

    在Java编程环境中,操作HBase并将其数据写入HDFS(Hadoop Distributed File System)是一项常见的任务,特别是在大数据处理和分析的场景下。本篇将详细介绍如何使用Java API实现这一功能,以及涉及到的关键技术和...

    结合MapReduce和HBase的遥感图像并行分布式查询.pdf

    实验结果显示,随着Hadoop和HBase集群规模的扩大,图像数据的导入和检索速度显著提高。这种方法尤其适用于处理大量遥感图像,可以在普通X86架构的服务器上实现高效的数据管理和检索,而不需要大量的计算资源。 总结...

    HBase基本操作.pdf

    HBase提供了数据的批量导入操作,常见的工具有HBase自带的ImportTsv和CompleteBulkLoad工具,可以高效地将大量数据导入HBase表中。 ### 实验原理和环境 HBase是一种基于列存储的数据库,它适用于存储非结构化和半...

Global site tag (gtag.js) - Google Analytics