`
zhangbaoming815
  • 浏览: 150150 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

使用MapReduce往Hbase插入数据

阅读更多
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.hbase.log.RecordParser;

public class HbaseInsertData {
	
	public static class HbaseMapper 
		extends Mapper<LongWritable, Text, Text, Text>{
		
		RecordParser parser = new RecordParser();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			parser.parse(value);
			String phone = parser.getPhone();
			int bloodPressure = parser.getBloodPressure();
			if(bloodPressure > 150) {
				context.write(new Text(phone), new Text(bloodPressure + ""));
			}
		}
	}
	
	public static class HbaseReducer
		extends TableReducer<Text, Text, ImmutableBytesWritable> {

		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Context context)
				throws IOException, InterruptedException {
			String value = values.iterator().next().toString();
			Put putRow = new Put(key.getBytes());
			putRow.add("f1".getBytes(), "qualifier".getBytes(), value.getBytes());
			
			context.write(new ImmutableBytesWritable(key.getBytes()), putRow);
		}
	}
	
	public static void main(String[] args) throws Exception{
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum.", "localhost");  //千万别忘记配置

		Job job = new Job(conf, "count");
		
		job.setJarByClass(HbaseInsertData.class);
		job.setMapperClass(HbaseMapper.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		Path in = new Path("hdfs://localhost:9000/input");
		FileInputFormat.addInputPath(job, in);
		
		TableMapReduceUtil.initTableReducerJob("tab1", HbaseReducer.class, job);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

 

解析的classRecordParser

import org.apache.hadoop.io.Text;

public class RecordParser {
	
	private String id;
	private String phone;
	private int bloodPressure;
	
	public void parse(String record) {
		String[] logs = record.split(",");
		id = logs[1];
		phone = logs[3];
		bloodPressure = Integer.parseInt(logs[4]);
	}
	
	public void parse(Text record) {
		this.parse(record.toString());
	}

	public String getId() {
		return id;
	}

	public String getPhone() {
		return phone;
	}

	public int getBloodPressure() {
		return bloodPressure;
	}
}
 
分享到:
评论

相关推荐

    可视化毕业设计:数据处理(MapReduce)+数据展示(hbase+java+echarts).zip

    主要涉及三个核心部分:数据处理使用MapReduce,数据存储使用HBase,以及数据展示借助Java和ECharts。下面将详细阐述这些关键技术及其在项目中的应用。 1. **MapReduce**: MapReduce是一种分布式计算模型,由...

    MR程序Bulkload数据到hbase

    标题中的“MR程序Bulkload数据到hbase”指的是使用MapReduce(MR)程序批量加载(Bulkload)数据到HBase数据库的过程。MapReduce是Apache Hadoop框架中的一个关键组件,用于处理和生成大规模数据集。而HBase是一个...

    hbase和hadoop数据块损坏处理

    * hbase org.apache.hadoop.hbase.mapreduce.Import &lt;outputdir&gt; &lt;tablename&gt;:将 HDFS 文件插入到 HBase 表中 * snapshot 'test', 'snap_test':创建名为 snap_test 的快照 * hbase org.apache.hadoop.hbase....

    HBase基本操作.pdf

    可以使用`put`命令插入数据时指定时间戳,如`put '表名', '行值', '列族:列名', '数据', 时间戳`。通过设置列族属性VERSIONS的值,可以控制每个列族的最大版本数。 #### 11. 数据批量导入 HBase提供了数据的批量...

    Hive、MySQL、HBase数据互导

    5. **HBase Java API编程**:掌握如何使用Java编程接口与HBase交互,包括创建表、插入数据、查询数据等操作。 6. **Eclipse开发工具使用**:熟悉Eclipse环境下的Java项目配置和调试,以便编写和测试HBase的Java程序...

    Hbase 安装与基本使用

    2. **插入数据**:插入数据通过`put '表名', '行键', '列族:列标识', '值'`完成,如`put 'users', 'user1', 'info:name', '张三'`。 3. **查询数据**:使用`get '表名', '行键'`获取行数据,`get 'users', 'user1'`...

    hbase用于查询客户端工具

    HBase Shell提供了创建表、删除表、插入数据、查询数据等基本操作,同时也支持复杂的条件查询和扫描。通过Shell,你可以直接执行HBase的API调用,进行调试和测试。 2. **HBase REST Gateway**:REST...

    【HBase企业应用开发】工作中自己总结的Hbase文档,非常全面!

    HBaseshell的基本用法包括创建表和列族、插入数据、按设计的表结构插入值、根据键值查询数据、扫描所有数据、删除指定数据、修改表结构、统计行数、执行disable和enable操作、表的删除以及hbaseshell脚本的使用。...

    Hadoop数据迁移--从Hadoop向HBase

    通过以上详细的分析和解释,我们可以清晰地了解到从Hadoop向HBase进行数据迁移的具体步骤和技术要点,包括MapReduce的使用、数据插入机制、HBase表的创建以及Mapper类的实现细节。这对于理解和实施Hadoop与HBase之间...

    hbase权威指南源码

    8. **MapReduce与HBase**:书中可能包括使用MapReduce与HBase进行批量数据处理的示例。 9. **HBase与Hadoop集成**:源码会演示如何将HBase与Hadoop生态系统(如HDFS、YARN)集成,进行数据导入导出。 10. **客户端...

    HBase官方指南——数据模型篇

    HBase是一种开源的非关系型分布式数据库(NoSQL),它运行在Hadoop文件系统之上,为大数据存储和处理提供了一...在实际应用中,设计合适的行键、合理使用列簇和版本管理以及恰当的命名空间规划是高效使用HBase的关键。

    hadoop1.1.2操作例子 包括hbase hive mapreduce相应的jar包

    在压缩包中的`hbase`文件可能包含了HBase的客户端库,用户可以利用这些库来创建表、插入数据、查询数据等。 **Hive**: Hive是Facebook开发的一个数据仓库工具,它建立在Hadoop之上,允许数据分析师使用SQL-like...

    HBase官方文档

    它支持创建表、插入数据、查询数据、删除数据等操作。熟悉HBaseShell是HBase日常使用的基础。 ### 数据模型 HBase的数据模型主要包括概念视图、物理视图、表、行、列族、Cells等。HBase采用的是列族式存储模型,每...

    MapReduce实例

    在这个实例中,MapReduce从Hbase中读取数据,可能涉及到使用Hbase的API来扫描表,获取千万级别的记录。 接着,我们来到了**MapReduce** 阶段。MapReduce包含两个主要阶段:Map阶段和Reduce阶段。在Map阶段,原始...

    大数据HBASE考题材料

    HBase中的批量加载通常使用MapReduce实现,这种方式可以有效地处理大量的数据导入操作。 #### 多项选择题解析 1. **HBase的特性** - HBase是开源的:虽然题目中提到“不是开源的”这个选项不正确,但HBase确实...

    HBase官方文档中文版-HBase手册中文版

    1. CRUD操作:创建表、删除表、插入数据、查询数据、更新数据、删除数据。 2. 扫描器(Scanner):用于批量读取数据,支持设定条件过滤。 3. 表的分区与负载均衡:通过调整Region大小和分布,实现集群负载均衡。 五...

    java代码将mysql表数据导入HBase表

    这段代码首先连接到MySQL数据库,读取`users`表的所有数据,然后逐条插入到HBase的`user_data`表中。注意,由于HBase不支持日期类型,我们通常将日期转换为长整型(以毫秒为单位)进行存储。 总结,通过上述步骤,...

    test_hbase - feed 插入,查询

    3. 插入数据:使用`Table.put()`方法,传入一个Put对象,包含行键、列族、列和值。 4. 提交更改:调用`Table.flushCommits()`确保更改被写入。 对于查询,HBase提供多种方式,包括单行查询、扫描和过滤。基础查询...

    HBase技术介绍.docx

    HBase提供了专门的TableInputFormat和TableOutputFormat,使得开发者能够将HBase表作为MapReduce作业的数据源和目标。 #### 六、HBase系统架构 - **Client**: 使用HBase RPC机制与HMaster和HRegionServer通信。 - ...

Global site tag (gtag.js) - Google Analytics