`
superlxw1234
  • 浏览: 552278 次
  • 性别: Icon_minigender_1
  • 来自: 西安
博客专栏
Bd1c0a0c-379a-31a8-a3b1-e6401e2f1523
Hive入门
浏览量:44539
社区版块
存档分类
最新评论

MapReduce直接连接Mysql获取数据

阅读更多

Mysql中数据:

 

mysql> select * from lxw_tbls;
+---------------------+----------------+
| TBL_NAME            | TBL_TYPE       |
+---------------------+----------------+
| lxw_test_table      | EXTERNAL_TABLE |
| lxw_t               | MANAGED_TABLE  |
| lxw_t1              | MANAGED_TABLE  |
| tt                  | MANAGED_TABLE  |
| tab_partition       | MANAGED_TABLE  |
| lxw_hbase_table_1   | MANAGED_TABLE  |
| lxw_hbase_user_info | MANAGED_TABLE  |
| t                   | EXTERNAL_TABLE |
| lxw_jobid           | MANAGED_TABLE  |
+---------------------+----------------+
9 rows in set (0.01 sec)

mysql> select * from lxw_tbls where TBL_NAME like 'lxw%' order by TBL_NAME;
+---------------------+----------------+
| TBL_NAME            | TBL_TYPE       |
+---------------------+----------------+
| lxw_hbase_table_1   | MANAGED_TABLE  |
| lxw_hbase_user_info | MANAGED_TABLE  |
| lxw_jobid           | MANAGED_TABLE  |
| lxw_t               | MANAGED_TABLE  |
| lxw_t1              | MANAGED_TABLE  |
| lxw_test_table      | EXTERNAL_TABLE |
+---------------------+----------------+
6 rows in set (0.00 sec)

 

MapReduce程序代码,ConnMysql.java:

 

package com.lxw.study;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ConnMysql {
        
        private static Configuration conf = new Configuration();
        
        static {
                conf.addResource(new Path("F:/lxw-hadoop/hdfs-site.xml"));
                conf.addResource(new Path("F:/lxw-hadoop/mapred-site.xml"));
                conf.addResource(new Path("F:/lxw-hadoop/core-site.xml"));
                conf.set("mapred.job.tracker", "10.133.103.21:50021");
        }
        
        public static class TblsRecord implements Writable, DBWritable {
                String tbl_name;
                String tbl_type;

                public TblsRecord() {

                }

                @Override
                public void write(PreparedStatement statement) throws SQLException {
                        // TODO Auto-generated method stub
                        statement.setString(1, this.tbl_name);
                        statement.setString(2, this.tbl_type);
                }

                @Override
                public void readFields(ResultSet resultSet) throws SQLException {
                        // TODO Auto-generated method stub
                        this.tbl_name = resultSet.getString(1);
                        this.tbl_type = resultSet.getString(2);
                }

                @Override
                public void write(DataOutput out) throws IOException {
                        // TODO Auto-generated method stub
                        Text.writeString(out, this.tbl_name);
                        Text.writeString(out, this.tbl_type);
                }

                @Override
                public void readFields(DataInput in) throws IOException {
                        // TODO Auto-generated method stub
                        this.tbl_name = Text.readString(in);
                        this.tbl_type = Text.readString(in);
                }

                public String toString() {
                        return new String(this.tbl_name + " " + this.tbl_type);
                }

        }

        public static class ConnMysqlMapper extends Mapper<LongWritable,TblsRecord,Text,Text> {
                public void map(LongWritable key,TblsRecord values,Context context) 
                                throws IOException,InterruptedException {
                        context.write(new Text(values.tbl_name), new Text(values.tbl_type));
                }
        }
        
        public static class ConnMysqlReducer extends Reducer<Text,Text,Text,Text> {
                public void reduce(Text key,Iterable<Text> values,Context context) 
                                throws IOException,InterruptedException {
                        for(Iterator<Text> itr = values.iterator();itr.hasNext();) {
                                context.write(key, itr.next());
                        }
                }
        }
        
        public static void main(String[] args) throws Exception {
                Path output = new Path("/user/lxw/output/");
                
                FileSystem fs = FileSystem.get(URI.create(output.toString()), conf);
                if (fs.exists(output)) {
                        fs.delete(output);
                }
                
                //mysql的jdbc驱动
                DistributedCache.addFileToClassPath(new Path(  
                          "hdfs://hd022-test.nh.sdo.com/user/liuxiaowen/mysql-connector-java-5.1.13-bin.jar"), conf);  
                
                DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",  
                          "jdbc:mysql://10.133.103.22:3306/hive", "hive", "hive");  
                
                Job job = new Job(conf,"test mysql connection");
                job.setJarByClass(ConnMysql.class);
                
                job.setMapperClass(ConnMysqlMapper.class);
                job.setReducerClass(ConnMysqlReducer.class);
                
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                
                job.setInputFormatClass(DBInputFormat.class);
                FileOutputFormat.setOutputPath(job, output);
                
                //列名
                String[] fields = { "TBL_NAME", "TBL_TYPE" }; 
                //六个参数分别为:
                //1.Job;2.Class<? extends DBWritable>
                //3.表名;4.where条件
                //5.order by语句;6.列名
                DBInputFormat.setInput(job, TblsRecord.class,
                     "lxw_tbls", "TBL_NAME like 'lxw%'", "TBL_NAME", fields);  
                
                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
        
}

 

运行结果:

 

[lxw@hd025-test ~]$ hadoop fs -cat /user/lxw/output/part-r-00000
lxw_hbase_table_1       MANAGED_TABLE
lxw_hbase_user_info     MANAGED_TABLE
lxw_jobid       MANAGED_TABLE
lxw_t   MANAGED_TABLE
lxw_t1  MANAGED_TABLE
lxw_test_table  EXTERNAL_TABLE

 

0
1
分享到:
评论
2 楼 freeluotao 2016-04-18  
       public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.name = Text.readString(in);
        }

        public void write(DataOutput out) throws IOException {
            out.writeInt(this.id);
            Text.writeString(out, this.name);
        }
版主,请教一下,上面两个方法中遇到时间字段怎么处理啊,请有空解答一下,谢谢!
1 楼 88548886 2014-06-30  
请问你这样的写的mr能直接在eclipse里运行吗,我把你的demo运行以后提示
14/06/30 16:42:34 ERROR security.UserGroupInformation: PriviledgedActionException as:Administrator (auth:SIMPLE) cause:java.io.IOException: Call to dn001/192.168.1.171:50090 failed on local exception: java.io.EOFException
Exception in thread "main" java.io.IOException: Call to dn001/192.168.1.171:50090 failed on local exception: java.io.EOFException

也就是说在liunx运行可以取到linux用户,而在windows下用eclipse运行取得的用是administrator,涉及到权限,所以无法运行,问题是我的权限已经关了.
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>

相关推荐

    mysql数据抽取,自动生成hive建表语句

    1. **连接MySQL**:使用Java或Python等编程语言,通过JDBC或其他库连接到MySQL数据库,获取表信息。 2. **解析表结构**:查询MySQL的元数据信息,如字段名、字段类型、字段长度、是否为主键等。 3. **映射数据类型...

    18、MapReduce的计数器与通过MapReduce读取-写入数据库示例

    MapReduce是一种分布式计算模型,由Google开发,广泛应用于大数据处理。在MapReduce中,计数器(Counter)是一个非常重要的工具,它允许开发者在MapReduce作业执行过程中收集和跟踪各种统计信息,帮助理解和优化程序...

    hive3.1.2+mysql驱动.zip

    使用MySQL驱动,Hive可以连接到MySQL服务器来获取和存储元数据。这提供了更强大的元数据管理功能,比如高可用性、数据备份和恢复,同时也便于多用户环境下的权限管理和访问控制。在Hive 3.1.2中,正确配置MySQL驱动...

    mysql经典50题_大数据_mysql经典50题_mysql经典五十题_hive_

    MySQL是世界上最受欢迎的关系型数据库管理系统之一,而Hive则是大数据处理领域的重要工具,主要用于结构化数据的查询、分析和管理。这两个技术在后端开发和大数据处理中扮演着至关重要的角色。下面,我们将深入探讨...

    mysql2hbase.7z

    资源中的“mysql2hbase.jar”很可能就是一个使用Java编写的工具,它可能包含了连接MySQL和HBase的代码,实现了从MySQL到HBase的数据迁移过程。这个jar包可以直接在Linux环境中运行,这意味着它可能封装了所有必要的...

    大数据入门HIVE和MySQL安装包

    在大数据领域,Hive和MySQL是两种非常重要的数据存储和管理工具。Hive作为一个数据仓库工具,能够将结构化的数据文件映射为一张数据库表,并提供SQL(HQL)查询功能,适合处理大规模的数据集。而MySQL则是一种关系型...

    scribe+hadoop+log4j+hive+mysql

    对于 MySQL,还需要安装相应的 JDBC 驱动程序以便其他应用程序能够连接到 MySQL 数据库。 #### 七、综合运用 - **数据收集与处理**:通过使用 Scribe 收集日志数据,并将其存储在 HDFS 中。然后使用 MapReduce 或 ...

    CDH和Mysql安装脚本

    2. **依赖安装**:安装必要的库和依赖,比如libmysqlclient-dev,以便于编译和连接MySQL。 3. **MySQL安装**:下载并安装MySQL服务器,包括设置root用户的密码、配置文件等。 4. **初始化数据库**:创建必要的...

    MySQL,大数据,Python(文章分享).zip

    总的来说,MySQL提供了可靠的数据库服务,大数据技术解决了海量数据的处理问题,而Python则作为连接这两者的强大工具,实现了数据的获取、处理和分析。这三者之间的紧密协作,极大地推动了现代信息技术的发展。

    java DMS数据采集系统

    它提供了丰富的类库和API,支持多线程、网络通信和数据库连接,使得数据的获取、处理和存储变得更加便捷。 二、数据采集 1. 网络爬虫:Java可以构建高效的数据爬取模块,通过HTTP/HTTPS协议抓取网页信息,如Jsoup库...

    23-Sqoop数据导入导出1

    1. **JDBC检查**:在开始导入前,Sqoop通过JDBC连接数据库,获取表结构和列信息,如数据类型,将其映射为Java类型。 2. **代码生成器**: Sqoop根据表信息生成Java类,用于存储从数据库抽取的记录。 3. **...

    大数据实践-sqoop数据导入导出.doc

    2. Sqoop会读取HDFS中的数据,创建对应的SQL语句,并通过MapReduce作业执行,将数据写入MySQL。 ### 四、创建job 创建Sqoop作业是为了持久化导入或导出的配置,这样可以在将来重复使用而无需每次都手动输入参数。...

    基于Spark的大数据分析平台的设计与实现

    4. **数据可视化**:利用数据可视化工具,例如Tableau,连接MySQL数据库,创建图表展示用户行为模式。 #### 五、总结 本文介绍了基于Spark的大数据分析平台的设计与实现过程。通过使用Scala编程语言、Spark SQL以及...

    kittle连接hive需要的jar包

    因此,Kettle需要这个驱动来连接到存储Hive元数据的MySQL数据库,以便获取表定义和其他相关信息。 5. **lib文件夹**: 压缩包中的"lib"文件夹通常包含所有这些必要的JAR包。在Kettle中,这些JAR包需要被添加到...

    华为数据湖治理中心-数据湖治理中心

    在数据接入阶段,DGC 提供了丰富的数据源支持,包括但不限于 Hadoop 分布式文件系统(HDFS)、华为云MRS(MapReduce Service)的Hive、数据仓库服务(DWS)以及关系型数据库如MySQL等。这使得企业能够便捷地将来自...

    Linux下MySQL-Hive.rar

    配置Hive连接到MySQL的Metastore服务,以便Hive可以查询和管理在MySQL中的元数据。 6. **Hive SQL与数据处理**: - SQL语法:学习HQL(Hive Query Language),它是类似SQL的语言,用于查询HDFS上的数据。 - 分区...

    基于Hadoop的气象数据分析 毕业论文.docx

    该系统基于Hadoop Mapreduce,使用swing编写前端,Hadoop集群作为后台,MySQL作为数据库,通过网页爬虫技术从天气网(www.tianqi.com)获取天气数据信息,并进行分析和存储,然后将文本描述的天气信息量化成天气等级...

    海量数据匹配 数据库

    描述中提到“可以直接运行,海量数据读取、匹配和查询”,这可能指的是一个已经封装好的解决方案或工具,它能够快速处理数据读取、匹配和查询任务,无需用户进行繁琐的底层实现。完成资源的有效控制意味着系统具有...

    六、数据处理.docx

    这可能涉及到数据库连接、数据融合或者数据匹配算法,以解决不同数据源之间的不一致性问题。 5. 数据存储:数据处理过程中,需要合适的存储解决方案来管理大量数据。常见的数据存储技术有关系型数据库(如MySQL)、...

    大数据课程-Hadoop集群程序设计与开发-10.Sqoop数据迁移_lk_edit.pptx

    Sqoop的工作原理是利用MapReduce的并行计算能力,通过JDBC连接器与各种RDBMS进行交互,确保数据迁移的效率和容错性。 #### Sqoop导入原理 在导入数据时,Sqoop首先通过JDBC检查目标数据库表,获取列信息和SQL数据...

Global site tag (gtag.js) - Google Analytics