`
liyonghui160com
  • 浏览: 777114 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hbase通过 Mapreduce 写入数据到Mysql

阅读更多

 

 

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.96.2-hadoop2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>0.96.2-hadoop2</version>
        </dependency>
    </dependencies>

 

 

 

package com.abloz.hbase;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
 
import java.util.Iterator;
 
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
@SuppressWarnings("deprecation")
public class CopyToMysql extends Configured implements Tool {
	private static final Log LOG = LogFactory.getLog(CopyToMysql.class);
	public static final String driverClassName = "com.mysql.jdbc.Driver";
	public static final String URL = "jdbc:mysql://Hadoop48/toplists";
	public static final String USERNAME = "root";//mysql username
	public static final String PASSWORD = "";//mysql password
	private static final String tableName="myaward";
 
	private Connection connection;
 
 
	   public static class AwardInfoRecord implements Writable,  DBWritable {
	     String userid;
 
	     String nick;
	     String loginid;
	     public AwardInfoRecord() {
 
	     }
	     public void readFields(DataInput in) throws IOException {
		        this.userid = Text.readString(in);
		        this.nick = Text.readString(in);
		        this.loginid = Text.readString(in);
		     }
		     public void write(DataOutput out) throws IOException {
		        Text.writeString(out,this.userid);
		        Text.writeString(out, this.nick);
		        Text.writeString(out, this.loginid);
		     }
		     public void readFields(ResultSet result) throws SQLException {
		        this.userid = result.getString(1);
		        this.nick = result.getString(2);
		        this.loginid = result.getString(3);
		     }
		     public void write(PreparedStatement stmt) throws SQLException {
		        stmt.setString(1, this.userid);
 
		        stmt.setString(2, this.nick);
		        stmt.setString(3, this.loginid);
		     }
	     public String toString() {
	        return new String(this.userid + " " +  this.nick +" " +this.loginid);
	     }
 
 
	   }
	public static Configuration conf;
 
	public static class MyMapper extends MapReduceBase implements Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, ImmutableBytesWritable> {
		@Override
		public void map(ImmutableBytesWritable key,	Result rs,
				OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> output, Reporter report) throws IOException
				{
 
				String rowkey = new String(key.get());
 
				String userid = new String(rs.getValue("info".getBytes(), "UserId".getBytes()));
				String nick = new String(rs.getValue("info".getBytes(), "nickName".getBytes()),HConstants.UTF8_ENCODING);
				String loginid = new String(rs.getValue("info".getBytes(), "loginId".getBytes()));
 
				output.collect(new ImmutableBytesWritable(userid.getBytes()),new ImmutableBytesWritable((nick+","+loginid).getBytes()));
 
				//LOG.info("map: userid:"+userid+",nick:"+nick);
			}
 
		@Override
		public void configure(JobConf job) {
			super.configure(job);
		}
 
	}
	public static class MyReducer extends MapReduceBase implements Reducer<ImmutableBytesWritable, ImmutableBytesWritable,AwardInfoRecord, Text>{
 
		@Override
		public void configure(JobConf job) {
 
			super.configure(job);
		}
 
		@Override
		public void reduce(ImmutableBytesWritable key,
				Iterator<ImmutableBytesWritable> it,
				OutputCollector<AwardInfoRecord, Text> output, Reporter report)
				throws IOException {
			AwardInfoRecord record = new AwardInfoRecord();
			record.userid=new String(key.get());
			String info = new String(it.next().get());
			record.nick = new String(info.split(",")[0]);
			record.loginid = new String(info.split(",")[1]);
			//LOG.debug("reduce: userid:"+record.userid+",nick:"+record.nick);
			output.collect(record, new Text());
		}
 
	}
 
 
	public static void main(String[] args) throws Exception {
		conf = HBaseConfiguration.create();
 
		int ret = ToolRunner.run(conf, new CopyToMysql(), args);
		System.exit(ret);
 
	}
 
	@Override
	public int run(String[] args) throws Exception {
 
		createConnection(driverClassName, URL);
 
		JobControl control = new JobControl("mysql");
		JobConf job = new JobConf(conf,CopyToMysql.class);
 
		job.setJarByClass(CopyToMysql.class);
		String fromTable = "award";
 
		job.set("mapred.input.dir", fromTable);
 
		job.set("hbase.mapred.tablecolumns", "info:UserId info:nickName info:loginId");
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(ImmutableBytesWritable.class);
		job.setInputFormat(TableInputFormat.class);
		DBConfiguration.configureDB(job, driverClassName, URL, USERNAME, PASSWORD);
		String[] fields = {"userid","nick","loginid"};
		DBOutputFormat.setOutput(job, tableName, fields);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		job.setNumReduceTasks(1);
 
		Job controlJob = new Job(job);
 
		control.addJob(controlJob);
 
		//JobClient.runJob(job);
 
		//control.run();
		Thread theController = new Thread(control);
		theController.start();
		//final
		while(!control.allFinished()){
			Thread.sleep(3000);
			System.out.print(".");
		}
		control.stop();
 
		System.out.println();
		LOG.info("job end!");
		return 0;
	}
 
	//connect
	private void createConnection(String driverClassName, String url) throws Exception {
		    Class.forName(driverClassName);
		    connection = DriverManager.getConnection(url,USERNAME,PASSWORD);
		    connection.setAutoCommit(false);
		  }
 
	//create table fast
	private void createTable(String tableName) throws SQLException {
 
	    String createTable = 
	      "CREATE TABLE " +tableName+
	      " (userid  VARCHAR(9) NOT NULL," +
 
	            " nick VARCHAR(20) NOT NULL, " +
	            " loginid VARCHAR(20) NOT NULL, " +
	            " PRIMARY KEY (userid, caldate))";
	    Statement st = connection.createStatement();
	    try {
	      st.executeUpdate(createTable);
	      connection.commit();
	    } catch (Exception e) {
	    	LOG.warn("table '"+tableName+"' is already exist! so we do anything");
		}
	    finally {
	      st.close();
	    }
	  }
	//init 
//	private void initialize() throws Exception {
//		    if(!this.initialized) {
//		      createConnection(driverClassName, URL);
////		      dropTables(tableName);
//		      createTable(tableName);
//		      System.out.println("------------------create ----------------------");
//		      this.initialized = true;  
//		    }
//	}
}

 

更简单的代码:

 

package com.my.hbase;

/**
 * Created by foreverhui on 2015/1/16.
 */

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class FromHBaseToMysqlExample {
    public static class HBaseMapper extends TableMapper<ImmutableBytesWritable, Text>{

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                           Context context) throws IOException, InterruptedException {
            for(Cell kv:value.rawCells()){
                //Text out=new Text(Bytes.toString(kv.getFamilyArray())+"|"+Bytes.toString(kv.getQualifierArray())+"|"+Bytes.toString(kv.getValueArray()));
                String primaryKey=Bytes.toString(kv.getRowArray());
                String dataRow=Bytes.toString(kv.getValueArray());
                //todo
                //解析 dataRow insert into mysql
                //context.write(new ImmutableBytesWritable(kv.getRowArray()), out);
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args)throws Exception {
        Configuration conf=HBaseConfiguration.create();
        conf.set("from.table", "testtable");
        //conf.set("family", "family1");
        Job job=new Job(conf,"hbase to hbase");
        job.setJarByClass(FromHBaseToMysqlExample.class);
        TableMapReduceUtil.initTableMapperJob(conf.get("from.table"), new Scan(), HBaseMapper.class,ImmutableBytesWritable.class, Text.class, job);
        System.exit(job.waitForCompletion(true)?0:1);


    }

}

 

分享到:
评论

相关推荐

    java代码将mysql表数据导入HBase表

    数据迁移的核心在于编写Java代码,将MySQL中的数据读取并写入到HBase中。这里我们使用JDBC连接MySQL,使用HBase Java API操作HBase。以下是一个简单的示例: ```java import org.apache.hadoop.conf.Configuration;...

    Hive、MySQL、HBase数据互导

    - 编写代码,使用Table和Put对象将数据从本地文件读取并写入到HBase表中。 - 编译并运行Java程序,完成数据导入。 在整个过程中,确保所有组件的版本兼容,例如HBase与Hadoop、Sqoop与Hadoop之间的版本匹配。同时...

    基于Mysql的表转HBase小Demo

    在这个Demo中,你将找到如何设计Dao层来实现Mysql到HBase的数据迁移。 1. **数据模型转换**:Mysql使用表格形式存储数据,每行数据有固定的列,而HBase则以行键(Row Key)、列族(Column Family)、列限定符...

    hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序

    标题中的“hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序”指的是一项数据处理任务,利用Hadoop的MapReduce框架,将关系型数据库(如Oracle和MySQL)中的数据高效地迁移至分布式存储系统HDFS(Hadoop ...

    hbase海量数据的全量导入方法

    ### HBase海量数据全量导入方法详解 在大数据领域,HBase作为一款分布式、...通过深入理解HBase的数据结构和Hadoop生态的集成方式,我们可以更好地设计和实施大数据导入方案,充分发挥HBase在大数据处理领域的优势。

    关系型数据库的数据导入Hbase

    关系型数据库(如MySQL、Oracle、SQL Server等)与非关系型数据库(如Hbase)在数据存储和处理上有着显著的区别。关系型数据库遵循ACID(原子性、一致性、隔离性和持久性)原则,适合结构化数据的存储,而Hbase作为...

    java2hbase.rar

    2. **HBaseBulkLoad**:HBase提供的工具,允许大量数据通过HFile格式批量加载,提高效率。 3. **MapReduce**:Hadoop的并行处理框架,可用于分布式数据转换和加载。 4. **HBase Coprocessors**:可以在HBase服务器...

    电影推荐网站(基于hadoop生态的大数据项目,使用hbase和MySQL数据库,利用协同过滤算法给出用户电影推荐).zip

    在电影推荐项目中,这些步骤都会涉及,例如从各种来源收集用户数据,清洗去除异常值,存储到Hadoop系统,通过MapReduce进行分析,最后将结果展示给用户。 8. **数据挖掘**:在推荐系统中,数据挖掘是寻找用户兴趣...

    Difference between HBase and RDBMS

    - HBase主要通过Java API或类似HBase shell的工具进行操作,查询功能相对简单,但在大数据处理中可以配合MapReduce或Spark进行高效计算。 6. 应用场景: - RDBMS适用于需要事务处理、复杂查询和数据一致性的业务...

    hadoop中Map-Reduce使用示例,输入(DBInputFormat),输出(DBOu-MR_HBase.zip

    这个示例,"MR_HBase-Hadoop中的MapReduce使用示例,输入(DBInputFormat),输出(DBOutputFormat)",主要展示了如何利用MapReduce与HBase进行交互,进行数据的读取和写入。下面将详细介绍相关的知识点。 1. **...

    面向海量天文数据的分布式MySQL锥形检索研究.pdf

    本文主要介绍了一种针对海量天文数据的分布式MySQL锥形检索的研究方法,使用数据库中间件技术,通过分库分表的方式将数据存储到分布式数据库集群中,并通过动态索引工具建立伪球面索引,以此来满足锥形检索的需求。...

    中国HBase技术社区第4届-MeetUp-上海站_基于HBase实时数仓探索实践.pptx

    【标题】:“中国HBase技术社区第4届-MeetUp-上海站_基于HBase实时数仓探索实践.pptx...通过上述实践,上海久耶供应链成功地运用HBase和相关技术构建了实时数仓,满足了互联网行业的实时数据需求,提高了业务决策效率。

    读写数据库数据的mr程序

    本程序的标题是"读写数据库数据的mr程序",这意味着我们将探讨如何利用MapReduce来从MySQL数据库读取数据,并将这些数据存储到HBASE这种分布式数据库中。这个过程通常被称为ETL(Extract, Transform, Load)操作,是...

    23-Sqoop数据导入导出1

    2. **MySQL到HBase**: - 配置HBase:确保HBase运行正常,创建相应的表结构。 - 导入数据:使用Sqoop命令,指定HBase的表名和列族,将MySQL数据导入HBase。 3. **MySQL到Hive**: - 配置Hive:创建与MySQL表结构...

    Hadoop集群之—MySQL关系数据库_V1.0

    而当需要将数据输出到MySQL数据库时,MapReduce作业则可能包含一个Reducer类,它将处理后的结果转换为适合存储到MySQL的数据格式,并写入数据库中。 文档还提到了一些实用的参考链接和编辑信息,以及对作者的支持...

    日志分析1

    - 数据导入HBase:通过Java API编写MapReduce程序,Map阶段清洗数据,Reducer阶段将数据写入HBase。注意,如果数据导入不完整,可能是因为rowkey设计问题,可以考虑使用HBase的export工具进行导入。 - 数据导入...

    mr程序代码

    MapReduce可以用来批量导入数据到HBase,或者对HBase中的数据进行批处理分析。在代码中,可能会有相关的示例展示如何使用MapReduce与HBase API交互,进行数据的读取、写入和更新。 最后,提到的关系型数据库通常指...

    大数据实践-sqoop数据导入导出.doc

    2. Sqoop会读取HDFS中的数据,创建对应的SQL语句,并通过MapReduce作业执行,将数据写入MySQL。 ### 四、创建job 创建Sqoop作业是为了持久化导入或导出的配置,这样可以在将来重复使用而无需每次都手动输入参数。...

    Hadoop数据仓库工具hive介绍.pdf

    这种方式下,Hive首先执行MapReduce作业生成结果,然后通过HBase的API将结果写入相应的表中。这有助于提高数据的读取速度,尤其是在需要频繁查询同一组数据的情况下。 综上所述,Hive作为一种基于Hadoop的数据仓库...

    大数据 76 道面试题及答案.docx

    "大数据76道面试题及答案" 本文档包含了76道大数据...Sqoop在导入数据到MySQL中,如何不重复导入数据,可以通过增加参数来实现。如果存在数据问题,Sqoop可以通过 FAILED java.util.NoSuchElementException来处理。

Global site tag (gtag.js) - Google Analytics