<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>0.96.2-hadoop2</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>0.96.2-hadoop2</version> </dependency> </dependencies>
package com.abloz.hbase; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapred.TableInputFormat; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.apache.hadoop.mapred.lib.db.DBOutputFormat; import org.apache.hadoop.mapred.lib.db.DBWritable; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @SuppressWarnings("deprecation") public class CopyToMysql extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(CopyToMysql.class); public static final String driverClassName = "com.mysql.jdbc.Driver"; public static final String URL = "jdbc:mysql://Hadoop48/toplists"; public static final String USERNAME = "root";//mysql username public static final String PASSWORD = "";//mysql password private static final String tableName="myaward"; private Connection connection; public static class AwardInfoRecord implements Writable, DBWritable { String userid; String nick; String loginid; public AwardInfoRecord() { } public void readFields(DataInput in) throws IOException { this.userid = Text.readString(in); this.nick = Text.readString(in); this.loginid = Text.readString(in); } public void write(DataOutput out) throws IOException { Text.writeString(out,this.userid); Text.writeString(out, this.nick); Text.writeString(out, this.loginid); } public void readFields(ResultSet result) throws SQLException { this.userid = result.getString(1); this.nick = result.getString(2); this.loginid = result.getString(3); } public void write(PreparedStatement stmt) throws SQLException { stmt.setString(1, this.userid); stmt.setString(2, this.nick); stmt.setString(3, this.loginid); } public String toString() { return new String(this.userid + " " + this.nick +" " +this.loginid); } } public static Configuration conf; public static class MyMapper extends MapReduceBase implements Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, ImmutableBytesWritable> { @Override public void map(ImmutableBytesWritable key, Result rs, OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> output, Reporter report) throws IOException { String rowkey = new String(key.get()); String userid = new String(rs.getValue("info".getBytes(), "UserId".getBytes())); String nick = new String(rs.getValue("info".getBytes(), "nickName".getBytes()),HConstants.UTF8_ENCODING); String loginid = new String(rs.getValue("info".getBytes(), "loginId".getBytes())); output.collect(new ImmutableBytesWritable(userid.getBytes()),new ImmutableBytesWritable((nick+","+loginid).getBytes())); //LOG.info("map: userid:"+userid+",nick:"+nick); } @Override public void configure(JobConf job) { super.configure(job); } } public static class MyReducer extends MapReduceBase implements Reducer<ImmutableBytesWritable, ImmutableBytesWritable,AwardInfoRecord, Text>{ @Override public void configure(JobConf job) { super.configure(job); } @Override public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> it, OutputCollector<AwardInfoRecord, Text> output, Reporter report) throws IOException { AwardInfoRecord record = new AwardInfoRecord(); record.userid=new String(key.get()); String info = new String(it.next().get()); record.nick = new String(info.split(",")[0]); record.loginid = new String(info.split(",")[1]); //LOG.debug("reduce: userid:"+record.userid+",nick:"+record.nick); output.collect(record, new Text()); } } public static void main(String[] args) throws Exception { conf = HBaseConfiguration.create(); int ret = ToolRunner.run(conf, new CopyToMysql(), args); System.exit(ret); } @Override public int run(String[] args) throws Exception { createConnection(driverClassName, URL); JobControl control = new JobControl("mysql"); JobConf job = new JobConf(conf,CopyToMysql.class); job.setJarByClass(CopyToMysql.class); String fromTable = "award"; job.set("mapred.input.dir", fromTable); job.set("hbase.mapred.tablecolumns", "info:UserId info:nickName info:loginId"); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(ImmutableBytesWritable.class); job.setInputFormat(TableInputFormat.class); DBConfiguration.configureDB(job, driverClassName, URL, USERNAME, PASSWORD); String[] fields = {"userid","nick","loginid"}; DBOutputFormat.setOutput(job, tableName, fields); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setNumReduceTasks(1); Job controlJob = new Job(job); control.addJob(controlJob); //JobClient.runJob(job); //control.run(); Thread theController = new Thread(control); theController.start(); //final while(!control.allFinished()){ Thread.sleep(3000); System.out.print("."); } control.stop(); System.out.println(); LOG.info("job end!"); return 0; } //connect private void createConnection(String driverClassName, String url) throws Exception { Class.forName(driverClassName); connection = DriverManager.getConnection(url,USERNAME,PASSWORD); connection.setAutoCommit(false); } //create table fast private void createTable(String tableName) throws SQLException { String createTable = "CREATE TABLE " +tableName+ " (userid VARCHAR(9) NOT NULL," + " nick VARCHAR(20) NOT NULL, " + " loginid VARCHAR(20) NOT NULL, " + " PRIMARY KEY (userid, caldate))"; Statement st = connection.createStatement(); try { st.executeUpdate(createTable); connection.commit(); } catch (Exception e) { LOG.warn("table '"+tableName+"' is already exist! so we do anything"); } finally { st.close(); } } //init // private void initialize() throws Exception { // if(!this.initialized) { // createConnection(driverClassName, URL); //// dropTables(tableName); // createTable(tableName); // System.out.println("------------------create ----------------------"); // this.initialized = true; // } // } }
更简单的代码:
package com.my.hbase; /** * Created by foreverhui on 2015/1/16. */ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; public class FromHBaseToMysqlExample { public static class HBaseMapper extends TableMapper<ImmutableBytesWritable, Text>{ @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { for(Cell kv:value.rawCells()){ //Text out=new Text(Bytes.toString(kv.getFamilyArray())+"|"+Bytes.toString(kv.getQualifierArray())+"|"+Bytes.toString(kv.getValueArray())); String primaryKey=Bytes.toString(kv.getRowArray()); String dataRow=Bytes.toString(kv.getValueArray()); //todo //解析 dataRow insert into mysql //context.write(new ImmutableBytesWritable(kv.getRowArray()), out); } } } /** * @param args */ public static void main(String[] args)throws Exception { Configuration conf=HBaseConfiguration.create(); conf.set("from.table", "testtable"); //conf.set("family", "family1"); Job job=new Job(conf,"hbase to hbase"); job.setJarByClass(FromHBaseToMysqlExample.class); TableMapReduceUtil.initTableMapperJob(conf.get("from.table"), new Scan(), HBaseMapper.class,ImmutableBytesWritable.class, Text.class, job); System.exit(job.waitForCompletion(true)?0:1); } }
相关推荐
数据迁移的核心在于编写Java代码,将MySQL中的数据读取并写入到HBase中。这里我们使用JDBC连接MySQL,使用HBase Java API操作HBase。以下是一个简单的示例: ```java import org.apache.hadoop.conf.Configuration;...
- 编写代码,使用Table和Put对象将数据从本地文件读取并写入到HBase表中。 - 编译并运行Java程序,完成数据导入。 在整个过程中,确保所有组件的版本兼容,例如HBase与Hadoop、Sqoop与Hadoop之间的版本匹配。同时...
在这个Demo中,你将找到如何设计Dao层来实现Mysql到HBase的数据迁移。 1. **数据模型转换**:Mysql使用表格形式存储数据,每行数据有固定的列,而HBase则以行键(Row Key)、列族(Column Family)、列限定符...
标题中的“hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序”指的是一项数据处理任务,利用Hadoop的MapReduce框架,将关系型数据库(如Oracle和MySQL)中的数据高效地迁移至分布式存储系统HDFS(Hadoop ...
### HBase海量数据全量导入方法详解 在大数据领域,HBase作为一款分布式、...通过深入理解HBase的数据结构和Hadoop生态的集成方式,我们可以更好地设计和实施大数据导入方案,充分发挥HBase在大数据处理领域的优势。
关系型数据库(如MySQL、Oracle、SQL Server等)与非关系型数据库(如Hbase)在数据存储和处理上有着显著的区别。关系型数据库遵循ACID(原子性、一致性、隔离性和持久性)原则,适合结构化数据的存储,而Hbase作为...
2. **HBaseBulkLoad**:HBase提供的工具,允许大量数据通过HFile格式批量加载,提高效率。 3. **MapReduce**:Hadoop的并行处理框架,可用于分布式数据转换和加载。 4. **HBase Coprocessors**:可以在HBase服务器...
在电影推荐项目中,这些步骤都会涉及,例如从各种来源收集用户数据,清洗去除异常值,存储到Hadoop系统,通过MapReduce进行分析,最后将结果展示给用户。 8. **数据挖掘**:在推荐系统中,数据挖掘是寻找用户兴趣...
- HBase主要通过Java API或类似HBase shell的工具进行操作,查询功能相对简单,但在大数据处理中可以配合MapReduce或Spark进行高效计算。 6. 应用场景: - RDBMS适用于需要事务处理、复杂查询和数据一致性的业务...
这个示例,"MR_HBase-Hadoop中的MapReduce使用示例,输入(DBInputFormat),输出(DBOutputFormat)",主要展示了如何利用MapReduce与HBase进行交互,进行数据的读取和写入。下面将详细介绍相关的知识点。 1. **...
本文主要介绍了一种针对海量天文数据的分布式MySQL锥形检索的研究方法,使用数据库中间件技术,通过分库分表的方式将数据存储到分布式数据库集群中,并通过动态索引工具建立伪球面索引,以此来满足锥形检索的需求。...
【标题】:“中国HBase技术社区第4届-MeetUp-上海站_基于HBase实时数仓探索实践.pptx...通过上述实践,上海久耶供应链成功地运用HBase和相关技术构建了实时数仓,满足了互联网行业的实时数据需求,提高了业务决策效率。
本程序的标题是"读写数据库数据的mr程序",这意味着我们将探讨如何利用MapReduce来从MySQL数据库读取数据,并将这些数据存储到HBASE这种分布式数据库中。这个过程通常被称为ETL(Extract, Transform, Load)操作,是...
2. **MySQL到HBase**: - 配置HBase:确保HBase运行正常,创建相应的表结构。 - 导入数据:使用Sqoop命令,指定HBase的表名和列族,将MySQL数据导入HBase。 3. **MySQL到Hive**: - 配置Hive:创建与MySQL表结构...
而当需要将数据输出到MySQL数据库时,MapReduce作业则可能包含一个Reducer类,它将处理后的结果转换为适合存储到MySQL的数据格式,并写入数据库中。 文档还提到了一些实用的参考链接和编辑信息,以及对作者的支持...
- 数据导入HBase:通过Java API编写MapReduce程序,Map阶段清洗数据,Reducer阶段将数据写入HBase。注意,如果数据导入不完整,可能是因为rowkey设计问题,可以考虑使用HBase的export工具进行导入。 - 数据导入...
MapReduce可以用来批量导入数据到HBase,或者对HBase中的数据进行批处理分析。在代码中,可能会有相关的示例展示如何使用MapReduce与HBase API交互,进行数据的读取、写入和更新。 最后,提到的关系型数据库通常指...
2. Sqoop会读取HDFS中的数据,创建对应的SQL语句,并通过MapReduce作业执行,将数据写入MySQL。 ### 四、创建job 创建Sqoop作业是为了持久化导入或导出的配置,这样可以在将来重复使用而无需每次都手动输入参数。...
这种方式下,Hive首先执行MapReduce作业生成结果,然后通过HBase的API将结果写入相应的表中。这有助于提高数据的读取速度,尤其是在需要频繁查询同一组数据的情况下。 综上所述,Hive作为一种基于Hadoop的数据仓库...
"大数据76道面试题及答案" 本文档包含了76道大数据...Sqoop在导入数据到MySQL中,如何不重复导入数据,可以通过增加参数来实现。如果存在数据问题,Sqoop可以通过 FAILED java.util.NoSuchElementException来处理。