`
sunasheng
  • 浏览: 122831 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

mapreduce读取mysql

阅读更多
package com.sun.mysql;
import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
import java.sql.PreparedStatement;  
import java.sql.ResultSet;  
import java.sql.SQLException;  
import java.util.Iterator;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;  
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;  
import org.apache.hadoop.mapreduce.lib.db.DBWritable;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
 /**
  * 从mysql中读数据(结果存放在HDFS中)然后经mapreduce处理
  * @author asheng
  */
public class ReadDataFromMysql {  
    /**
     * 重写DBWritable
     * @author asheng
     * TblsRecord需要从mysql读取数据
     */
    public static class TblsRecord implements Writable, DBWritable 
    {  
            String tbl_name;  
            String tbl_type;  
            public TblsRecord() 
            {  


            }  
            @Override  
            public void write(PreparedStatement statement) throws SQLException 
            {
                    statement.setString(1, this.tbl_name);  
                    statement.setString(2, this.tbl_type);  
            }  
            @Override  
            public void readFields(ResultSet resultSet) throws SQLException 
            {  
                    this.tbl_name = resultSet.getString(1);  
                    this.tbl_type = resultSet.getString(2);  
            }  
            @Override  
            public void write(DataOutput out) throws IOException 
            {  
                    Text.writeString(out, this.tbl_name);  
                    Text.writeString(out, this.tbl_type);  
            }  
            @Override  
            public void readFields(DataInput in) throws IOException 
            {  
                    this.tbl_name = Text.readString(in);  
                    this.tbl_type = Text.readString(in);  
            }  
            public String toString() 
            {  
                return new String(this.tbl_name + " " + this.tbl_type);  
            }  
    }  
    /**
     * Mapper
     * @author asheng
     * 下面的类中的Mapper一定是包org.apache.hadoop.mapreduce.Mapper;下的
     */
public static class ConnMysqlMapper extends Mapper<LongWritable,TblsRecord,Text,Text>
 //TblsRecord是自定义的类型,也就是上面重写的DBWritable类
{  
        public void map(LongWritable key,TblsRecord values,Context context)throws IOException,
                                                                                                                InterruptedException 
        {  
               //只是将从数据库读取进来数据转换成Text类型然后输出给reduce
                context.write(new Text(values.tbl_name), new Text(values.tbl_type));  
        }  
}  
/**
 * Reducer
 * @author asheng
 * 下面的类中的Reducer一定是包org.apache.hadoop.mapreduce.Reducer;下的
 */
public static class ConnMysqlReducer extends Reducer<Text,Text,Text,Text> {  
        public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,
                                                                                                                InterruptedException 
        {  
               //循环遍历并写入相应的指定文件中
                for(Iterator<Text> itr = values.iterator();itr.hasNext();) 
                {  
                        context.write(key, itr.next());  
                }  
        }  
}  
public static void main(String[] args) throws Exception 
{  
        Configuration conf = new Configuration(); 

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://127.0.0.1:3306/mapreduce_test",                                                                                                                                                "root", "root");   
        Job job = new Job(conf,"test mysql connection");  
        job.setJarByClass(ReadDataFromMysql.class);  
          
        job.setMapperClass(ConnMysqlMapper.class);  
        job.setReducerClass(ConnMysqlReducer.class);  
          
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
          
        job.setInputFormatClass(DBInputFormat.class);  
        FileOutputFormat.setOutputPath(job, new Path("hdfs://127.0.0.1:9000/user/lxw/output/"));  
          
        //对应数据库中的列名  
        String[] fields = { "TBL_NAME", "TBL_TYPE" };   
        //setInput方法六个参数分别的含义:  
        //1.Job;2.Class<? extends DBWritable>按照什么类型读取的  
        //3.表名;4.where条件  
        //5.order by语句;6.列名所组成的数组 
        DBInputFormat.setInput(job, TblsRecord.class,"lxw_tabls", "TBL_NAME like 'lxy%'", "TBL_NAME", fields);    
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        //本程序表示从mysql数据库mapreduce_test的表lxw_tabls中查询处列TAB_NAME为lxy开头的数据并放入hdfs中
        //执行完后的查看bin/hadoop fs -cat /user/lxw/output/part-r-00000
/*结果
lxyae
lxyaccg
lxybf
         */
}  
}
/*
mysql> select * from lxw_tabls;
+----------+----------+
| TBL_NAME | TBL_TYPE |
+----------+----------+
| zhao     | a        |
| qian     | b        |
| sun      | c        |
| li       | d        |
| lxya     | e        |
| lxyb     | f        |
| lxyacc   | g        |
+----------+----------+
7 rows in set (0.00 sec)
*/

 

分享到:
评论

相关推荐

    18、MapReduce的计数器与通过MapReduce读取-写入数据库示例

    要实现MapReduce读取数据库,首先需要在Mapper类中加载数据库驱动并建立连接。然后,可以在map()方法中使用SQL查询获取所需数据。在Reduce阶段,可以对数据进行进一步处理和聚合,最后将结果写入到数据库中。 对于...

    mapreduce解析网络日志文件(或从mysql数据库获取记录)并计算相邻日志记录间隔时长

    然而,如果日志数据存储在MySQL数据库中,我们需要先通过MapReduce的InputFormat接口从数据库中读取数据。这可能涉及到使用Hadoop的JDBC InputFormat,允许MapReduce直接查询数据库。在Map阶段,我们可以直接处理...

    MapReduce实例

    在这个实例中,MapReduce从Hbase中读取数据,可能涉及到使用Hbase的API来扫描表,获取千万级别的记录。 接着,我们来到了**MapReduce** 阶段。MapReduce包含两个主要阶段:Map阶段和Reduce阶段。在Map阶段,原始...

    基于javaweb + mapreduce的小型电影推荐系统

    在实际项目中,数据预处理是一个重要步骤,可能会涉及到`small.csv`文件的读取和清洗。CSV(Comma Separated Values)是一种通用的数据格式,常用于存储表格数据。在这个项目中,它可能包含了用户ID、电影ID、评分等...

    mysql-connector-java-8.0.28.tar.gz

    通过MySQL Connector/J,Hive可以与MySQL数据库交互,将Hadoop集群处理的结果写入或读取MySQL,实现离线数据处理与实时查询的结合。 在"mysql-connector-java-8.0.28"目录下,用户通常会找到如下关键文件: 1. `...

    Hadoop集群之—MySQL关系数据库_V1.0

    当需要从数据库中输入数据时,MapReduce作业通常会包括一个Mapper类,它通过JDBC或其他连接方式读取MySQL数据库中的数据,并将其转换为适合于MapReduce处理的键值对形式。而当需要将数据输出到MySQL数据库时,...

    云应用系统开发第二次项目(mapreduce)

    该项目使用 CentOS-7 操作系统,已安装 JDK、Hadoop 平台和 MySQL 数据库平台,并完成 HBase 的安装。 MapReduce 是一个编程模型,用于大规模数据处理,它将复杂的数据处理任务分解成许多小任务,并将其分布式地...

    hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序

    标题中的“hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序”指的是一项数据处理任务,利用Hadoop的MapReduce框架,将关系型数据库(如Oracle和MySQL)中的数据高效地迁移至分布式存储系统HDFS(Hadoop ...

    基于Mysql的表转HBase小Demo

    2. **数据读取**:从Mysql中读取数据,这通常通过JDBC(Java Database Connectivity)实现,可以使用PreparedStatement执行SQL查询,获取需要转换的数据。 3. **数据写入HBase**:在HBase中写入数据需要使用HBase的...

    mysql-connector-5.1.39,配合sqoop1.4.7和hive2.1.1使用

    4. **数据分析**:在Hadoop集群上,Hive可以读取由Sqoop导入的MySQL数据进行大规模的分析工作,利用Hadoop的分布式计算能力处理大量数据,然后将结果通过Sqoop返回到MySQL,或者直接在MySQL中进行进一步的处理和展示...

    java代码将mysql表数据导入HBase表

    数据迁移的核心在于编写Java代码,将MySQL中的数据读取并写入到HBase中。这里我们使用JDBC连接MySQL,使用HBase Java API操作HBase。以下是一个简单的示例: ```java import org.apache.hadoop.conf.Configuration;...

    hive、Hbase、mysql的区别.docx

    Hive的设计初衷是为了简化大数据处理,通过将SQL转换为MapReduce任务,适合批处理和离线分析。 - Hbase是一个分布式、列式存储的NoSQL数据库,它构建在HDFS之上,特别适合实时或近实时的数据查询。Hbase提供了一种...

    mapreduce-db-operat:mapreduce实现数据从hdfs到mysql之间的相互传递

    本项目"mapreduce-db-operat"便是为了解决这一问题,通过MapReduce实现在HDFS和MySQL之间的数据传输。 首先,我们来看标题和描述中提到的关键知识点: 1. **MapReduce**:MapReduce由两个主要阶段组成——Map阶段...

    Hive、MySQL、HBase数据互导

    理解Hive如何将SQL查询转换为MapReduce作业在Hadoop上执行。 2. **关系数据库概念与基本原理**:掌握MySQL的表结构、索引、事务处理、ACID属性等,以及SQL的基本语法,如SELECT、INSERT、UPDATE、DELETE等。 3. **...

    apriori java 数据库读取

    标题中的“apriori java 数据库读取”意味着我们将讨论如何使用Java编程语言来实现Apriori算法,并从MySQL数据库中读取数据。在Java中实现Apriori,通常会涉及到以下关键步骤: 1. 数据库连接:首先,我们需要使用...

    hive-mysqlhive-mysql

    在Hive中,我们可以通过JDBC驱动程序将Hive查询结果写入MySQL,或者从MySQL读取数据到Hive。 2. **mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar**:这是一个RPM软件包集合,包含适用于Red Hat Enterprise Linux 7 (x86...

    mysql-connector-java-5.1.42

    它的功能丰富,支持多种存储引擎,如InnoDB(用于事务处理)和MyISAM(适合读取密集型应用)。MySQL在大数据处理方面也有所涉猎,但与Hive的结合更多体现在数据仓库和大数据分析场景。 Hive是Apache软件基金会的一...

    Storm定时匹配插入mysql,源数据录入hdfs

    这可能是通过Hadoop的MapReduce或者Hadoop的客户端API完成的。 5. **HDFS与Storm的结合**:虽然Storm通常处理实时数据流,但也可以与HDFS集成,从HDFS中读取历史数据进行批处理,或者将处理结果写入HDFS。这种方式...

    Talend学习笔记2——mysql文件导入到HDFS

    - **添加组件**:为了将 MySQL 数据导入到 HDFS,需要添加 `tMySQLInput` 组件来读取 MySQL 数据库中的数据,以及 `tHDFSOutput` 组件来将数据写入 HDFS。 - **配置组件**: - 在 `tMySQLInput` 组件中,需要指定要...

Global site tag (gtag.js) - Google Analytics