import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import com.hbase.log.RecordParser;
public class HbaseInsertData {
public static class HbaseMapper
extends Mapper<LongWritable, Text, Text, Text>{
RecordParser parser = new RecordParser();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value);
String phone = parser.getPhone();
int bloodPressure = parser.getBloodPressure();
if(bloodPressure > 150) {
context.write(new Text(phone), new Text(bloodPressure + ""));
}
}
}
public static class HbaseReducer
extends TableReducer<Text, Text, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
String value = values.iterator().next().toString();
Put putRow = new Put(key.getBytes());
putRow.add("f1".getBytes(), "qualifier".getBytes(), value.getBytes());
context.write(new ImmutableBytesWritable(key.getBytes()), putRow);
}
}
public static void main(String[] args) throws Exception{
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum.", "localhost"); //千万别忘记配置
Job job = new Job(conf, "count");
job.setJarByClass(HbaseInsertData.class);
job.setMapperClass(HbaseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
Path in = new Path("hdfs://localhost:9000/input");
FileInputFormat.addInputPath(job, in);
TableMapReduceUtil.initTableReducerJob("tab1", HbaseReducer.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
解析的classRecordParser
import org.apache.hadoop.io.Text;
public class RecordParser {
private String id;
private String phone;
private int bloodPressure;
public void parse(String record) {
String[] logs = record.split(",");
id = logs[1];
phone = logs[3];
bloodPressure = Integer.parseInt(logs[4]);
}
public void parse(Text record) {
this.parse(record.toString());
}
public String getId() {
return id;
}
public String getPhone() {
return phone;
}
public int getBloodPressure() {
return bloodPressure;
}
}
分享到:
相关推荐
主要涉及三个核心部分:数据处理使用MapReduce,数据存储使用HBase,以及数据展示借助Java和ECharts。下面将详细阐述这些关键技术及其在项目中的应用。 1. **MapReduce**: MapReduce是一种分布式计算模型,由...
标题中的“MR程序Bulkload数据到hbase”指的是使用MapReduce(MR)程序批量加载(Bulkload)数据到HBase数据库的过程。MapReduce是Apache Hadoop框架中的一个关键组件,用于处理和生成大规模数据集。而HBase是一个...
* hbase org.apache.hadoop.hbase.mapreduce.Import <outputdir> <tablename>:将 HDFS 文件插入到 HBase 表中 * snapshot 'test', 'snap_test':创建名为 snap_test 的快照 * hbase org.apache.hadoop.hbase....
可以使用`put`命令插入数据时指定时间戳,如`put '表名', '行值', '列族:列名', '数据', 时间戳`。通过设置列族属性VERSIONS的值,可以控制每个列族的最大版本数。 #### 11. 数据批量导入 HBase提供了数据的批量...
5. **HBase Java API编程**:掌握如何使用Java编程接口与HBase交互,包括创建表、插入数据、查询数据等操作。 6. **Eclipse开发工具使用**:熟悉Eclipse环境下的Java项目配置和调试,以便编写和测试HBase的Java程序...
2. **插入数据**:插入数据通过`put '表名', '行键', '列族:列标识', '值'`完成,如`put 'users', 'user1', 'info:name', '张三'`。 3. **查询数据**:使用`get '表名', '行键'`获取行数据,`get 'users', 'user1'`...
HBase Shell提供了创建表、删除表、插入数据、查询数据等基本操作,同时也支持复杂的条件查询和扫描。通过Shell,你可以直接执行HBase的API调用,进行调试和测试。 2. **HBase REST Gateway**:REST...
HBaseshell的基本用法包括创建表和列族、插入数据、按设计的表结构插入值、根据键值查询数据、扫描所有数据、删除指定数据、修改表结构、统计行数、执行disable和enable操作、表的删除以及hbaseshell脚本的使用。...
通过以上详细的分析和解释,我们可以清晰地了解到从Hadoop向HBase进行数据迁移的具体步骤和技术要点,包括MapReduce的使用、数据插入机制、HBase表的创建以及Mapper类的实现细节。这对于理解和实施Hadoop与HBase之间...
8. **MapReduce与HBase**:书中可能包括使用MapReduce与HBase进行批量数据处理的示例。 9. **HBase与Hadoop集成**:源码会演示如何将HBase与Hadoop生态系统(如HDFS、YARN)集成,进行数据导入导出。 10. **客户端...
HBase是一种开源的非关系型分布式数据库(NoSQL),它运行在Hadoop文件系统之上,为大数据存储和处理提供了一...在实际应用中,设计合适的行键、合理使用列簇和版本管理以及恰当的命名空间规划是高效使用HBase的关键。
在压缩包中的`hbase`文件可能包含了HBase的客户端库,用户可以利用这些库来创建表、插入数据、查询数据等。 **Hive**: Hive是Facebook开发的一个数据仓库工具,它建立在Hadoop之上,允许数据分析师使用SQL-like...
它支持创建表、插入数据、查询数据、删除数据等操作。熟悉HBaseShell是HBase日常使用的基础。 ### 数据模型 HBase的数据模型主要包括概念视图、物理视图、表、行、列族、Cells等。HBase采用的是列族式存储模型,每...
在这个实例中,MapReduce从Hbase中读取数据,可能涉及到使用Hbase的API来扫描表,获取千万级别的记录。 接着,我们来到了**MapReduce** 阶段。MapReduce包含两个主要阶段:Map阶段和Reduce阶段。在Map阶段,原始...
HBase中的批量加载通常使用MapReduce实现,这种方式可以有效地处理大量的数据导入操作。 #### 多项选择题解析 1. **HBase的特性** - HBase是开源的:虽然题目中提到“不是开源的”这个选项不正确,但HBase确实...
1. CRUD操作:创建表、删除表、插入数据、查询数据、更新数据、删除数据。 2. 扫描器(Scanner):用于批量读取数据,支持设定条件过滤。 3. 表的分区与负载均衡:通过调整Region大小和分布,实现集群负载均衡。 五...
这段代码首先连接到MySQL数据库,读取`users`表的所有数据,然后逐条插入到HBase的`user_data`表中。注意,由于HBase不支持日期类型,我们通常将日期转换为长整型(以毫秒为单位)进行存储。 总结,通过上述步骤,...
3. 插入数据:使用`Table.put()`方法,传入一个Put对象,包含行键、列族、列和值。 4. 提交更改:调用`Table.flushCommits()`确保更改被写入。 对于查询,HBase提供多种方式,包括单行查询、扫描和过滤。基础查询...
HBase提供了专门的TableInputFormat和TableOutputFormat,使得开发者能够将HBase表作为MapReduce作业的数据源和目标。 #### 六、HBase系统架构 - **Client**: 使用HBase RPC机制与HMaster和HRegionServer通信。 - ...