`
Chrro
  • 浏览: 9924 次
  • 性别: Icon_minigender_1
  • 来自: 沈阳
社区版块
存档分类
最新评论

mapreduce--读取mysql数据库数据

阅读更多
import java.io.File;
import java.io.IOException;

import java.io.DataInput;

import java.io.DataOutput;



import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;
import java.util.Iterator;



import org.apache.hadoop.examples.EJob;
import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.lib.db.DBWritable;

import org.apache.hadoop.mapred.lib.db.DBInputFormat;

import org.apache.hadoop.mapred.lib.db.DBConfiguration;



public class ReadDB {



    public static class Map extends MapReduceBase implements

            Mapper<LongWritable, StudentRecord, LongWritable, Text> {



        // map

        public void map(LongWritable key, StudentRecord value,

        OutputCollector<LongWritable, Text> collector, Reporter reporter)

                throws IOException {

            collector.collect(new LongWritable(value.id),

                    new Text(value.toString()));

        }



    }
    //reducer
       public static class Reduce extends MapReduceBase implements Reducer<LongWritable,Text,LongWritable,Text>{
@Override
public void reduce(LongWritable key, Iterator<Text> value,
OutputCollector<LongWritable,Text> collector, Reporter reporter)
throws IOException {
// TODO Auto-generated method stub
while (value.hasNext()){
collector.collect(key,value.next());
}
}
      
       }



    public static class StudentRecord implements Writable, DBWritable {

        public int id;

        public String name;

        public String sex;

        public int age;



        @Override

        public void readFields(DataInput in) throws IOException {

            this.id = in.readInt();

            this.name = Text.readString(in);

            this.sex = Text.readString(in);

            this.age = in.readInt();

        }



        @Override

        public void write(DataOutput out) throws IOException {

            out.writeInt(this.id);

            Text.writeString(out, this.name);

            Text.writeString(out, this.sex);

            out.writeInt(this.age);

        }



        @Override

        public void readFields(ResultSet result) throws SQLException {

            this.id = result.getInt(1);

            this.name = result.getString(2);

            this.sex = result.getString(3);

            this.age = result.getInt(4);

        }



        @Override

        public void write(PreparedStatement stmt) throws SQLException {

            stmt.setInt(1, this.id);

            stmt.setString(2, this.name);

            stmt.setString(3, this.sex);

            stmt.setInt(4, this.age);

        }



        @Override

        public String toString() {

            return new String("学号" + this.id + "_姓名:" + this.name

                    + "_性别:"+ this.sex + "_年龄:" + this.age);

        }

    }



    @SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {



        JobConf conf = new JobConf(ReadDB.class);
        //设置地址
       conf.set("fs.default.name", "hdfs://192.168.71.128:9000");
        conf.set("mapred.job.tracker", "192.168.71.128:9001");
        conf.set("dfs.permissions","false");
            

        File jarFile = EJob.createTempJar("bin");
   
    EJob.addClasspath("/usr/hadoop/conf");
    
    ClassLoader classLoader = EJob.getClassLoader();
    
    Thread.currentThread().setContextClassLoader(classLoader);

conf.setJar(jarFile.toString());


DistributedCache.addFileToClassPath(new Path(

         "/usr/hadoop/lib/mysql-connector-java-5.1.18-bin.jar"), conf);
Class.forName("com.mysql.jdbc.Driver");

       
        // 设置map和reduce类
       
        conf.setMapperClass(Map.class);

        conf.setReducerClass(Reduce.class);
       
        // 设置数据库
       
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",

            "jdbc:mysql://192.168.71.128:3306/song", "root", "mysql");


       
        // 设置表字段

        String[] fields = { "id", "name", "sex", "age" };

        DBInputFormat.setInput(conf, StudentRecord.class, "student", null,"id", fields);

        // 设置输入类型

        conf.setInputFormat(DBInputFormat.class);

        //conf.setMapOutputKeyClass(Text.class);
        //conf.setMapOutputValueClass(LongWritable.class);
        // 设置输出类型

        conf.setOutputKeyClass(LongWritable.class);

        conf.setOutputValueClass(Text.class);


        // 输出路径

        FileOutputFormat.setOutputPath(conf, new Path("rdb_out"));

        JobClient.runJob(conf);

    }

}
//需要把mysql驱动包添加到工程中
分享到:
评论

相关推荐

    18、MapReduce的计数器与通过MapReduce读取-写入数据库示例

    接下来,我们将讨论如何通过MapReduce与数据库交互,尤其是MySQL数据库。在大数据场景下,有时需要将MapReduce处理的结果存储到关系型数据库中,或者从数据库中读取数据进行处理。Hadoop提供了JDBC(Java Database ...

    mapreduce-db-operat:mapreduce实现数据从hdfs到mysql之间的相互传递

    6. **JDBC(Java Database Connectivity)**:为了将MapReduce处理后的数据写入MySQL,我们需要使用JDBC驱动程序,它是Java连接数据库的标准API。通过建立数据库连接,执行SQL语句,可以将数据插入或更新到MySQL表中...

    mapreduce解析网络日志文件(或从mysql数据库获取记录)并计算相邻日志记录间隔时长

    然而,如果日志数据存储在MySQL数据库中,我们需要先通过MapReduce的InputFormat接口从数据库中读取数据。这可能涉及到使用Hadoop的JDBC InputFormat,允许MapReduce直接查询数据库。在Map阶段,我们可以直接处理...

    mysql-connector-java-8.0.28.tar.gz

    通过MySQL Connector/J,Hive可以与MySQL数据库交互,将Hadoop集群处理的结果写入或读取MySQL,实现离线数据处理与实时查询的结合。 在"mysql-connector-java-8.0.28"目录下,用户通常会找到如下关键文件: 1. `...

    Hadoop集群之—MySQL关系数据库_V1.0

    当需要从数据库中输入数据时,MapReduce作业通常会包括一个Mapper类,它通过JDBC或其他连接方式读取MySQL数据库中的数据,并将其转换为适合于MapReduce处理的键值对形式。而当需要将数据输出到MySQL数据库时,...

    apriori java 数据库读取

    标题中的“apriori java 数据库读取”意味着我们将讨论如何使用Java编程语言来实现Apriori算法,并从MySQL数据库中读取数据。在Java中实现Apriori,通常会涉及到以下关键步骤: 1. 数据库连接:首先,我们需要使用...

    MapReduce实例

    在这个实例中,我们看到MapReduce被用来从Hbase数据库中提取海量数据,对其进行处理,然后将统计结果存储到MySQL数据库中。这个过程涉及到大数据处理的核心技术,下面我们将深入探讨这些知识点。 首先,**Hbase** ...

    mysql-connector-5.1.39,配合sqoop1.4.7和hive2.1.1使用

    3. **Hive与MySQL交互**:虽然Hive不直接支持MySQL作为元数据存储,但通过配置,可以在Hive Metastore中使用MySQL数据库存储表元数据,如表名、字段名、分区信息等。这有助于在多用户环境中提高性能和并发性。 4. *...

    Hadoop深入浅出之Sqoop介绍.pptx

    1. **数据导入**:Sqoop 可以将结构化的数据从关系型数据库管理系统(RDBMS)如 MySQL、Oracle 等导入到 HDFS,然后可以进一步使用 MapReduce 或 Hive 进行分析和处理。 2. **数据导出**:反之,Sqoop 也可以将 HDFS...

    Hive、MySQL、HBase数据互导

    - 编写代码,使用Table和Put对象将数据从本地文件读取并写入到HBase表中。 - 编译并运行Java程序,完成数据导入。 在整个过程中,确保所有组件的版本兼容,例如HBase与Hadoop、Sqoop与Hadoop之间的版本匹配。同时...

    电影推荐网站(基于hadoop生态的大数据项目,使用hbase和MySQL数据库,利用协同过滤算法给出用户电影推荐).zip

    电影推荐网站是一个基于Hadoop生态系统的大数据项目,它利用了HBase和MySQL数据库,并通过协同过滤算法为用户提供个性化的电影推荐。在这个项目中,我们主要关注以下几个关键知识点: 1. **Hadoop生态**:Hadoop是...

    Talend学习笔记2——mysql文件导入到HDFS

    - **添加组件**:为了将 MySQL 数据导入到 HDFS,需要添加 `tMySQLInput` 组件来读取 MySQL 数据库中的数据,以及 `tHDFSOutput` 组件来将数据写入 HDFS。 - **配置组件**: - 在 `tMySQLInput` 组件中,需要指定要...

    海量数据匹配 数据库

    在处理海量数据时,关系型数据库如MySQL、PostgreSQL,或者非关系型数据库如MongoDB、HBase,都会被广泛使用。这些数据库系统通常具备强大的索引机制和查询语言,如SQL,来支持高效的查询操作。 接着,我们来看看...

    读写数据库数据的mr程序

    本程序的标题是"读写数据库数据的mr程序",这意味着我们将探讨如何利用MapReduce来从MySQL数据库读取数据,并将这些数据存储到HBASE这种分布式数据库中。这个过程通常被称为ETL(Extract, Transform, Load)操作,是...

    关系型数据库的数据导入Hbase

    关系型数据库(如MySQL、Oracle、SQL Server等)与非关系型数据库(如Hbase)在数据存储和处理上有着显著的区别。关系型数据库遵循ACID(原子性、一致性、隔离性和持久性)原则,适合结构化数据的存储,而Hbase作为...

    hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序

    标题中的“hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序”指的是一项数据处理任务,利用Hadoop的MapReduce框架,将关系型数据库(如Oracle和MySQL)中的数据高效地迁移至分布式存储系统HDFS(Hadoop ...

    mysql-connector-java-5.1.42

    在这个版本中,开发人员可以利用Java语言方便地与MySQL数据库进行交互,实现数据的存取、查询、更新和删除等操作。MySQL Connector/J遵循JDBC API标准,使得它能够在各种Java环境中无缝运行,包括Java SE(Standard ...

    java代码将mysql表数据导入HBase表

    这段代码首先连接到MySQL数据库,读取`users`表的所有数据,然后逐条插入到HBase的`user_data`表中。注意,由于HBase不支持日期类型,我们通常将日期转换为长整型(以毫秒为单位)进行存储。 总结,通过上述步骤,...

    nosql分布式数据库期末考试题.docx

    - **关系数据库**: MySQL、SQL Server 和 Oracle 均属于关系数据库,它们采用 SQL 作为标准查询语言,支持 ACID 特性(原子性、一致性、隔离性和持久性)。 - **非关系数据库**: 指的是不采用表格形式来组织数据的...

    hadoop与mysql数据库的那点事(1)

    同时,这两个类也实现了 `readFields(ResultSet)` 和 `write(PreparedStatement)` 方法,分别用于从数据库结果集读取数据和向数据库准备语句写入数据。这样,Hadoop 可以通过这些类直接与 MySQL 进行交互,无需额外...

Global site tag (gtag.js) - Google Analytics