
HBase MapReduce Integration

 

Delete HBase rows example

The MRDeleteRows job can be driven either by an explicit start/stop row key, or by an application ID plus a start/stop time that are converted into row-key boundaries:

 

$hadoop jar ./sponge-hserver.jar com.citi.sponge.mapreduce.MRDeleteRows -Dtable="elf_log" -DstartKey="10000:1365663164575:88888:testhome" -DstopKey="10000:1365663164575:88890:testhome" -Dquorum="vm-15c2-3bbf.nam.nsroot.net,vm-ab1f-dd21.nam.nsroot.net,vm-cb03-2277.nam.nsroot.net" 

 
$hadoop jar ./sponge-hserver.jar com.citi.sponge.mapreduce.MRDeleteRows -Dtable="elf_log" -Dappid="10000" -DstartTime="2010-01-01-01-01" -DstopTime="2014-01-01-01-01" -Dquorum="vm-15c2-3bbf.nam.nsroot.net,vm-ab1f-dd21.nam.nsroot.net,vm-cb03-2277.nam.nsroot.net"
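
To sanity-check a range before or after running the delete job, a small client-side scan over the same boundaries shows how many rows fall inside it. This is a hypothetical sketch (not part of the job): it uses the plain HTable client API of the same HBase 0.90-era release the code below targets, with the table name, quorum and keys copied from the commands above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckDeleteRange {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum",
				"vm-15c2-3bbf.nam.nsroot.net,vm-ab1f-dd21.nam.nsroot.net,vm-cb03-2277.nam.nsroot.net");
		HTable table = new HTable(conf, "elf_log");
		Scan scan = new Scan();
		scan.setStartRow(Bytes.toBytes("10000:1365663164575:88888:testhome"));
		scan.setStopRow(Bytes.toBytes("10000:1365663164575:88890:testhome"));
		ResultScanner scanner = table.getScanner(scan);
		long count = 0;
		for (Result r : scanner) {
			count++;
		}
		scanner.close();
		table.close();
		// After a successful delete job this prints 0.
		System.out.println("rows in range: " + count);
	}
}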
 

 

import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MRDeleteRows extends Configured implements Tool {
	String startRowKey;
	String stopRowKey;
	String quorum;
	String table; 
	
	String startTime;
	String stopTime;
	String appID;
	
	public String getStartTime() {
		return startTime;
	}

	public String getStopTime() {
		return stopTime;
	}

	public String getAppID() {
		return appID;
	}

	public String getQuorum() {
		return quorum;
	}

	public String getStartRowKey() {
		return startRowKey;
	}

	public String getStopRowKey() {
		return stopRowKey;
	}
	
	public String getTable() {
		return table;
	}


	@Override
	public int run(String[] args) throws Exception {
		// ToolRunner/GenericOptionsParser has already copied the -Dname=value
		// arguments into the configuration, so they can be read back directly.
		Configuration conf = getConf();
		this.startRowKey = conf.get("startKey");
		this.stopRowKey = conf.get("stopKey");
		this.quorum = conf.get("quorum");
		this.table = conf.get("table");
		this.startTime = conf.get("startTime");
		this.stopTime = conf.get("stopTime");
		this.appID = conf.get("appid");
		return 0;
	}
	
	/**
	 * Converts an application ID plus a "yyyy-MM-dd-HH-mm" timestamp into the
	 * "appId:epochMillis" prefix used as a scan boundary.
	 */
	static String getRowKey(String appID, String time) {
		DateFormat df = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
		Date date = null;
		try {
			date = df.parse(time);
		} catch (ParseException e) {
			System.out.println("Please supply dates in the format yyyy-MM-dd-HH-mm");
			System.exit(1);
		}
		return appID + ":" + date.getTime();
	}
	 

	/**
	 * Emits a Delete for every row returned by the scan. The deletes are
	 * applied back to the source table by the TableOutputFormat that
	 * initTableReducerJob installs.
	 */
	static class DeleteMapper extends
			TableMapper<ImmutableBytesWritable, Delete> {

		@Override
		public void map(ImmutableBytesWritable row, Result value,
				Context context) throws IOException {
			ImmutableBytesWritable userKey = new ImmutableBytesWritable(row.get());
			try {
				// Delete the entire row identified by the scanned row key.
				Delete delete = new Delete(row.get());
				context.write(userKey, delete);
			} catch (InterruptedException e) {
				throw new IOException(e);
			}
		}
	}
	 

	public static void main(String[] args) throws Exception {
		MRDeleteRows deleteElf = new MRDeleteRows();
		// ToolRunner parses the -D options and hands them to run(), which
		// copies them into the fields read below.
		ToolRunner.run(deleteElf, args);

		Configuration config = HBaseConfiguration.create();
		config.set("hbase.zookeeper.quorum", deleteElf.getQuorum());
		Job job = new Job(config, "DeleteHbaseRowkeys");
		job.setJarByClass(MRDeleteRows.class);

		Scan scan = new Scan();
		System.out.println("quorum: " + deleteElf.getQuorum());
		System.out.println("table: " + deleteElf.getTable());
		if (deleteElf.getStartRowKey() != null && deleteElf.getStopRowKey() != null) {
			// Mode 1: explicit start/stop row keys.
			System.out.println("startkey: " + deleteElf.getStartRowKey());
			System.out.println("stopkey: " + deleteElf.getStopRowKey());
			scan.setStartRow(deleteElf.getStartRowKey().getBytes());
			scan.setStopRow(deleteElf.getStopRowKey().getBytes());
		} else if (deleteElf.getAppID() != null && deleteElf.getStartTime() != null
				&& deleteElf.getStopTime() != null) {
			// Mode 2: application ID plus time range, converted to row-key boundaries.
			System.out.println("AppID: " + deleteElf.getAppID());
			System.out.println("start time: " + deleteElf.getStartTime());
			System.out.println("stop time: " + deleteElf.getStopTime());
			scan.setStartRow(getRowKey(deleteElf.getAppID(), deleteElf.getStartTime()).getBytes());
			scan.setStopRow(getRowKey(deleteElf.getAppID(), deleteElf.getStopTime()).getBytes());
		} else {
			// Refuse to run without a range; an unbounded scan would delete the whole table.
			System.err.println("Usage: set either -DstartKey/-DstopKey or -Dappid/-DstartTime/-DstopTime");
			System.exit(1);
		}

		scan.setCacheBlocks(false);

		// The mapper scans the range and emits Delete objects; initTableReducerJob
		// with a null reducer only installs TableOutputFormat so the deletes are
		// written back to the same table.
		TableMapReduceUtil.initTableMapperJob(deleteElf.getTable(), scan,
				DeleteMapper.class, ImmutableBytesWritable.class, Delete.class,
				job);
		TableMapReduceUtil.initTableReducerJob(deleteElf.getTable(), null, job);

		boolean b = job.waitForCompletion(true);
		if (!b) {
			throw new IOException("error with job!");
		}
	}

}
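
A note on the time-based boundaries: getRowKey() only produces the "appId:epochMillis" prefix rather than a full "appId:millis:counter:hostname" key like the ones the loader below writes. That still works as a scan start boundary because HBase compares row keys byte by byte, so the shorter prefix sorts before every full key that extends it. A hypothetical check (class name and key values are illustrative only):

import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyOrderCheck {
	public static void main(String[] args) {
		// Prefix of the form produced by getRowKey() ...
		byte[] startBoundary = Bytes.toBytes("10000:1365663164575");
		// ... sorts before a full row key stored for the same appId and timestamp,
		// so a scan starting at the prefix includes that row.
		byte[] storedKey = Bytes.toBytes("10000:1365663164575:88888:testhome");
		System.out.println(Bytes.compareTo(startBoundary, storedKey) < 0); // prints true
	}
}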

 

HBase Loader MapReduce Example

import java.io.IOException;
import java.util.Calendar;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Sample Uploader MapReduce
 * <p>
 * This is EXAMPLE code. You will need to change it to work for your context.
 * <p>
 * Uses the {@link TableReducer} setup only to configure the TableOutputFormat;
 * the data is written to HBase directly from the map. Change the InputFormat
 * to suit your data. In this example each line of the input log file becomes
 * one row: the line itself is stored in the content family and the system
 * information passed on the command line is stored in the sysInfo family.
 * <p>
 * The table and the column families we insert into (sysInfo and content) must
 * preexist.
 * <p>
 * There is no reducer in this example, as one is not necessary and would add
 * significant overhead. If you need to massage the data before inserting it
 * into HBase, you can do that in the map as well.
 * <p>
 * Start the MR job as follows (the jar is whatever you package this class into):
 * 
 * <pre>
 * ./bin/hadoop jar yourjar.jar BulkLoaderToHbase <input> <tablename> <appId> <env> <hostname> <logpath> <logFileName> <logType>
 * </pre>
 * <p>
 * This code was written against HBase 0.21 trunk.
 * 
 * Before running this job, make sure HADOOP_CLASSPATH is set; it must include
 * zookeeper.jar and hbase-0.90.4-cdh3u3.jar.
 */
public class BulkLoaderToHbase {

	private static final String NAME = "BulkLoaderToHbase";
	private static byte[] SYSINFO;
	private static byte[] CONTENT;
	private static byte[] APP_ID;
	private static byte[] ENV;
	private static byte[] HOSTNAME;
	private static byte[] BODY;
	private static byte[] LOG_FILE_NAME;
	private static byte[] LOG_TYPE;
	private static byte[] LOG_FILE_PATH;
	
	private static byte[] appId_v;
	private static byte[] env_v;
	private static byte[] hostname_v;
	private static byte[] logPath_v;
	private static byte[] logFileName_v;
	private static byte[] logType_v;
	
	// Counter appended to the row key so that lines written within the same
	// millisecond on the same host do not collide.
	private static long nano = 0;
	
	static class Uploader extends
			Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

		private long checkpoint = 100;
		private long count = 0;

		@Override
		public void map(LongWritable key, Text line, Context context)
				throws IOException {

			// Row key format: appId:epochMillis:counter:hostname, matching the keys
			// the MRDeleteRows job above scans for.
			Calendar cal = Calendar.getInstance();
			String rowkey = Bytes.toString(appId_v) + ":" + cal.getTimeInMillis() + ":" + (nano++) + ":" + Bytes.toString(hostname_v);
			byte[] rowKeyValue = Bytes.toBytes(rowkey);
			Put put = new Put(rowKeyValue);

			put.add(SYSINFO, APP_ID, appId_v);
			put.add(SYSINFO, ENV, env_v);
			put.add(SYSINFO, HOSTNAME, hostname_v);
			// Text.getBytes() exposes the backing buffer, which may be longer than
			// the valid data, so copy the exact line contents instead.
			put.add(CONTENT, BODY, Bytes.toBytes(line.toString()));
			put.add(CONTENT, LOG_FILE_PATH, logPath_v);
			put.add(CONTENT, LOG_FILE_NAME, logFileName_v);
			put.add(CONTENT, LOG_TYPE, logType_v);

			// Uncomment below to disable WAL. This will improve performance but
			// means
			// you will experience data loss in the case of a RegionServer
			// crash.
			// put.setWriteToWAL(false);

			try {
				context.write(new ImmutableBytesWritable(rowKeyValue), put);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			// Set status every checkpoint lines
			if (++count % checkpoint == 0) {
				context.setStatus("Emitting Put " + count);
			}
		}
	}

	/**
	 * Job configuration.
	 */
	public static Job configureJob(Configuration conf, String[] args)
			throws IOException {
		
	    SYSINFO = Bytes.toBytes("sysInfo");
	    CONTENT = Bytes.toBytes("content");
	    APP_ID = Bytes.toBytes("appId");
	    ENV = Bytes.toBytes("env");
	    HOSTNAME = Bytes.toBytes("hostName");
	    BODY = Bytes.toBytes("body");
	    LOG_FILE_PATH = Bytes.toBytes("logFilePath");
	    LOG_FILE_NAME = Bytes.toBytes("logFileName");
	    LOG_TYPE = Bytes.toBytes("logType");
		
		Path inputPath = new Path(args[0]);
		String tableName = args[1];
		appId_v = Bytes.toBytes(args[2]);
		env_v = Bytes.toBytes(args[3]);
		hostname_v = Bytes.toBytes(args[4]);
		logPath_v = Bytes.toBytes(args[5]);
		logFileName_v = Bytes.toBytes(args[6]);
		logType_v = Bytes.toBytes(args[7]);
		
		
		Job job = new Job(conf, NAME + "_" + tableName);
		job.setJarByClass(Uploader.class);
		FileInputFormat.setInputPaths(job, inputPath);
		job.setInputFormatClass(TextInputFormat.class);
		job.setMapperClass(Uploader.class);
		// No reducers. Just write straight to table. Call initTableReducerJob
		// because it sets up the TableOutputFormat.
		TableMapReduceUtil.initTableReducerJob(tableName, null, job);
		job.setNumReduceTasks(0);
		return job;
	}

	/**
	 * Main entry point.
	 * 
	 * @param args
	 *            The command line parameters.
	 * @throws Exception
	 *             When running the job fails.
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		if (otherArgs.length != 8) {
			System.err
					.println("Wrong number of arguments: " + otherArgs.length);
			System.err.println("Usage: " + NAME + " <input> <tablename> <appId> <env> <hostname> <logpath> <logFileName> <logType>");
			System.exit(-1);
		}
		Job job = configureJob(conf, otherArgs);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
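
For consistency with the delete examples above, a hypothetical invocation might look like the following; the jar and package name are assumed to match the sponge-hserver build used earlier, and the argument values after the table name are placeholders. Note that, unlike the delete job, this loader does not take a -Dquorum option: it picks up the ZooKeeper quorum from the hbase-site.xml on the classpath via HBaseConfiguration.create().

$hadoop jar ./sponge-hserver.jar com.citi.sponge.mapreduce.BulkLoaderToHbase /tmp/app.log elf_log 10000 UAT testhome /var/log/app app.log server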

 
