`
sunasheng
  • 浏览: 123887 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

Mapreduce构建Hbase索引

阅读更多

该博客已经完全转移到http://sunhs.me

 

中并增加更多新的技术内容(hadoop为

 

主),欢迎访问!

package test;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.GenericOptionsParser;

public class IndexBuilder {
	// 索引表唯一的一列为INDEX:ROW,其中INDEX为列族
	public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");
	public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");

	public static class Map
			extends
			Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {
		private byte[] family;
		// 存储了“列名”到“表名-列名”的映射
		// 前者用于获取某列的值,并作为索引表的键值,后者用于作为表的表名
		private HashMap<byte[], ImmutableBytesWritable> indexes;

		@Override
		protected void map(ImmutableBytesWritable rowKey, Result result,
				Context context) throws IOException, InterruptedException {
			for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes
					.entrySet()) {
				byte[] qualifier = index.getKey();// 获得列名
				ImmutableBytesWritable tableName = index.getValue();// 索引表的表名
				byte[] value = result.getValue(family, qualifier);// 根据“列族:列名”获得元素的值
				if (value != null) {
					// 以列值作为行键,在列“INDEX:ROW”中插入行键
					Put put = new Put(value);
					put.add(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get());
					// 在tableName表上执行put操作
					// 使用MultiOutputFormat时,第二个参数必须是Put或者Delete类型
					context.write(tableName, put);
				}
			}
		}

		/**
		 * setup为Mapper中的方法,该方法只在任务初始化时执行一次
		 */
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			Configuration configuration = context.getConfiguration();
			String tableName = configuration.get("index.tablename");
			String[] fields = configuration.getStrings("index.fields");
			// fields内为需要做索引的列名
			String familyName = configuration.get("index.familyname");
			family = Bytes.toBytes(familyName);

			// 初始化indexes方法
			indexes = new HashMap<byte[], ImmutableBytesWritable>();
			for (String field : fields) {
				// 如果给name做索引,则索引表的名称为“heroes-name”
				indexes.put(Bytes.toBytes(field), new ImmutableBytesWritable(
						Bytes.toBytes(tableName + "-" + field)));
			}
		}
	}

	public static Job configureJob(Configuration conf, String[] args)
			throws IOException {
		String tableName = args[0];
		String columnFamily = args[1];
		System.out.println("****" + tableName);
		// 通过Configuration.set()方法传递参数
		conf.set(TableInputFormat.SCAN,
				TableMapReduceUtil.convertScanToString(new Scan()));
		conf.set(TableInputFormat.INPUT_TABLE, tableName);
		conf.set("index.tablename", tableName);
		conf.set("index.familyname", columnFamily);
		String[] fields = new String[args.length - 2];
		for (int i = 0; i < fields.length; i++) {
			fields[i] = args[i + 2];
		}
		conf.setStrings("index.fields", fields);
		conf.set("index.familyname", "attributes");

		// 配置任务的运行参数
		Job job = new Job(conf, tableName);
		job.setJarByClass(IndexBuilder.class);
		job.setMapperClass(Map.class);
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TableInputFormat.class);
		job.setOutputFormatClass(MultiTableOutputFormat.class);
		return job;
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		if (otherArgs.length < 3) {
			System.err.println("Only " + otherArgs.length
					+ " arguments supplied, required: 3");
			System.err
					.println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
			System.exit(-1);
		}
		Job job = configureJob(conf, otherArgs);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
// 运行:
// 如果要对heroes中的name和email列构建索引,则运行参数设为heroes info name email

 

分享到:
评论

相关推荐

    MapReduce on Hbase

    1. 准备阶段:配置HBase的环境,包括设置HBase的表结构、索引和数据模型。 2. Map阶段:定义Map函数,该函数将从HBase表中读取数据,并对数据进行预处理,形成键值对(key-value pairs)。 3. Shuffle阶段:...

    google mapreduce bigtable hbase 论文中文版

    2. **广泛的适用性**:从Web索引到Google Earth等不同需求的应用都能够使用Bigtable进行数据存储。 3. **高性能与可扩展性**:Bigtable不仅支持高吞吐量的批处理任务,还能够即时响应用户请求,提供快速的数据访问...

    HBase MapReduce完整实例.rar

    HBase的核心特性包括强一致性的读写操作、水平扩展的架构以及基于行键的索引,这些特性使得它在大数据领域中独树一帜。 MapReduce是处理大数据的一种编程模型,它将复杂的计算任务分解为两个阶段:Map阶段和Reduce...

    HBase二级索引

    3. 实时索引构建:另外一种创建二级索引的方式是利用MapReduce或者Spark等大数据处理框架,通过批处理的方式创建索引。这种方法在数据量不是特别大时,可以快速生成二级索引,但是在数据量巨大并且实时性要求很高的...

    基于Hbase的大数据查询优化

    类比于传统型数据库里的一些查询方式,本文对Hbase的存储原理进行了研究,借助分布式计算框架Mapreduce在Hbase上构建了二级索引,就可以对表进行有针对性的定位和高效率的查找,同时也减轻zookeeper服务对资源调度的压力...

    基于hadoop和hbase的分布式索引集群研究.pdf

    通过对Hadoop和HBase的理解和运用,可以构建一个高效率的分布式索引集群,用以支持大数据环境下的搜索引擎需求。这种集群利用分布式计算和存储的优势,通过高效的分布式倒排索引算法处理索引任务,并将索引表存储在...

    hbase二级索引

    为了实现事务,通常需要借助于如HBase的Coprocessor机制或者外部系统如Hadoop的HDFS和MapReduce。 在HBase的二级索引实现中,一种常见的方式是使用 Coprocessor。Coprocessor 是 HBase 内置的一种扩展机制,允许...

    Hadoop集群上基于HBase的大数据索引构建

    使用场景和目的:本案例适用于企业环境中大规模日志记录搜索应用,或者任何其他需要高效文本匹配的情景,旨在帮助技术人员了解构建大规模数据库索引所涉及的关键技术和最佳实践。 其它相关信息:报告里不仅有详细的...

    google三大论文 gfs bigtable mapreduce hadoop hdfs hbase的原型

    - HBase 是一个基于Bigtable 构建的分布式列式存储系统,用于存储大规模结构化数据。 - 它提供了类似于Bigtable 的数据模型,但增加了更多的特性和功能,以适应更广泛的应用场景。 综上所述,Google三大论文...

    大数据技术 Hadoop开发者第二期 MapReduce HDFS Hive Mahout HBase 共64页.pdf

    - 应用 MapReduce 框架进行数据处理和索引构建。 - **关键点**: - 改写 Nutch 的 segment 文件存储接口以支持自定义爬虫需求。 - 识别并解决 Nutch 中 MapReduce 应用的特殊点。 #### 三、支持自定义爬虫的 ...

    Hbase二级索引与JOIN

    - **解决方法**:通常通过构建额外的数据结构来模拟二级索引,例如使用MapReduce作业预处理数据,创建辅助表等。 - **JOIN操作**:由于HBase的设计初衷是为了支持简单的键值查询,因此其并不直接支持JOIN操作。 - ...

    InformationInteraction:信息交互MapReduce库-HBase + YARN(实施环境)

    这种模型非常适合处理海量数据,如日志分析、搜索引擎索引构建等。 HBase,全称为Apache HBase,是一款基于谷歌Bigtable设计的开源分布式数据库,运行在Hadoop之上。它是一个NoSQL数据库,提供实时读写访问,支持大...

    大数据HBASE考题材料

    HBase构建在Hadoop的HDFS之上,这意味着HBase利用HDFS来存储其底层数据,从而获得HDFS提供的高可靠性和高容错性。 4. **消息通信机制** HBase使用Apache Zookeeper来提供消息通信机制,这包括协调服务、命名服务...

    HBase@不睡觉书副本.rar

    此外,还讨论了HBase的索引、过滤器、MapReduce与HBase的集成、HBase的数据备份和恢复等高级主题,以满足不同层次读者的需求。 书中还特别强调了HBase在实际项目中的应用案例,例如在互联网日志分析、物联网数据...

    hbase-1.0.1.1-bin.tar.gz.zip

    - 分布式存储:HBase构建在Hadoop之上,利用HDFS作为底层存储,能够自动分散数据到集群的各个节点。 - 列族存储:不同于关系型数据库,HBase以列族为单位存储数据,每个列族包含一系列相关的列。 - 行键(Row Key...

    大数据技术 Hadoop开发者第二期 Nutch MapReduce HDFS Hive Mahout HBase 共64页.r

    1. **Nutch**:Nutch是一款开源的网络爬虫项目,它主要用于抓取互联网上的网页并进行索引,是构建搜索引擎的基础。Nutch与Hadoop结合,利用其分布式存储和计算能力,可以高效地处理大量网页数据,实现大规模的网页...

    HBase大数据.zip

    - **搜索引擎索引**:构建搜索索引,实现快速查询。 《HBase实战+权威指南》这本书很可能详细涵盖了这些知识点,并通过实例和最佳实践来帮助读者深入理解和应用HBase。对于希望在大数据领域中利用HBase解决存储和...

    hbase详解,apache_hbase_reference_guide

    Apache HBase是一个开源、非关系型、分布式存储系统,它是Apache Software Foundation下的Hadoop项目的一...通过这份指南,用户可以构建、管理和优化运行HBase的环境,并充分利用其在大数据存储和分析方面的强大功能。

Global site tag (gtag.js) - Google Analytics