`
臻是二哥
  • 浏览: 189698 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
博客专栏
Group-logo
Java技术分享
浏览量:0
社区版块
存档分类
最新评论

Hadoop与关系数据库

 
阅读更多

Hadoop对关系数据库无非两种操作,即从关系数据库输入到HDFS和从HDFS输出到关系数据库。Hadoop中分别提供了DBInputFormat类和DBOutputFormat类,前者用于从关系数据库输入到HDFS,该类将关系数据库中的一条记录作为向Mapper输入的value值,后者用于将HDFS中的文件输出到关系数据库,该类将Reducer输出的key值存储到数据库。我们只要在主程序中设置job的输入输出格式为这两个类中的一种,就可以让Hadoop从关系数据库输入或者向关系数据库输出。
正如我上面提到的,我们在操作的过程中使用了“记录”这个对象,因此需要写一个类对应到关系数据库中我们要操作的那个表,这个类要实现DBWritable接口和Writable接口,具体参见HadoopAPI。
具体代码参见文档。
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.db.*;
import java.sql.*;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;

public class SDBConnInput {
	public static class CustomerRecord implements Writable,DBWritable{
		String customerID;
		String customerName;
		String phoneNumber;
		public void readFields(ResultSet resultSet)  throws SQLException{
			customerID=resultSet.getString(1);
			customerName=resultSet.getString(2);
			phoneNumber=resultSet.getString(3);
			}
		public void write(PreparedStatement statement)  throws SQLException{
			statement.setString(1, customerID);
			statement.setString(2, customerName);
			statement.setString(3,phoneNumber);
		}
			 
		 public void readFields(DataInput in) throws IOException{
			 customerID=in.readUTF();
			 customerName=in.readUTF();
			 phoneNumber=in.readUTF();
		 }
		 public void write(DataOutput out) throws IOException{
			 out.writeUTF(customerID);
			 out.writeUTF(customerName);
			 out.writeUTF(phoneNumber);
		 }
		 public void setCustomerID(String customerID){
			 this.customerID=customerID;
		 }
		 public void setCustomerName(String customerName){
			 this.customerName=customerName;			 
		 }
		 public void setPhoneNumber(String phoneNumber){
			 this.phoneNumber=phoneNumber;
		 }
		 public String toString(){
			 return this.customerID+","+this.customerName+","+this.phoneNumber; 
		 }
	}
	public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,CustomerRecord,LongWritable,Text>{
		Text result= new Text();
		 public void map(LongWritable key, CustomerRecord value,OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException{
			result.set(value.toString());
			collector.collect(key, result);
		}
	}
	public static class ReducerClass extends MapReduceBase implements Reducer<LongWritable, Text,NullWritable,Text>{
	    public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<NullWritable,Text> output, Reporter reporter) throws IOException{
	    	 String str="";
	    	 while(values.hasNext()){
	    		  str+=values.next().toString();
	    	 }
	    	 output.collect(null, new Text(str));	
	  }
    }
	public static void main(String [] args) throws Exception{
		/**
		 * 从关系数据库读取数据到HDFS
		 */
		JobConf job = new JobConf();
		job.setJarByClass(SDBConnInput.class);
	    job.setOutputKeyClass(LongWritable.class);
	    job.setOutputValueClass(Text.class);
	    job.setInputFormat(DBInputFormat.class);
	    FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/user/xuyizhen/out"));
	    DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
				"jdbc:mysql://192.168.0.25:3306/hadoop","root","1117");
	    String fieldNames []={"customerID","customerName","phoneNumber"};
	    DBInputFormat.setInput(job, CustomerRecord.class,"customers",null,"customerID", fieldNames);
	    job.setMapperClass(MapperClass.class);
	    job.setReducerClass(ReducerClass.class);
	    JobClient.runJob(job);
	}
}

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.db.*;
import java.sql.*;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.*;

public class SDBConnOutput {
	public static class CustomerRecord implements Writable,DBWritable{
		String customerID;
		String customerName;
		String phoneNumber;
		public void readFields(ResultSet resultSet)  throws SQLException{
			customerID=resultSet.getString(1);
			customerName=resultSet.getString(2);
			phoneNumber=resultSet.getString(3);
			}
		public void write(PreparedStatement statement)  throws SQLException{
			statement.setString(1, customerID);
			statement.setString(2, customerName);
			statement.setString(3,phoneNumber);
		}
			 
		 public void readFields(DataInput in) throws IOException{
			 customerID=in.readUTF();
			 customerName=in.readUTF();
			 phoneNumber=in.readUTF();
		 }
		 public void write(DataOutput out) throws IOException{
			 out.writeUTF(customerID);
			 out.writeUTF(customerName);
			 out.writeUTF(phoneNumber);
		 }
		 public void setCustomerID(String customerID){
			 this.customerID=customerID;
		 }
		 public void setCustomerName(String customerName){
			 this.customerName=customerName;			 
		 }
		 public void setPhoneNumber(String phoneNumber){
			 this.phoneNumber=phoneNumber;
		 }
		 public String toString(){
			 return this.customerID+","+this.customerName+","+this.phoneNumber; 
		 }
	}
	public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,Text,CustomerRecord,Text>{
		CustomerRecord customer=new CustomerRecord();
		 public void map(LongWritable key, Text value,OutputCollector<CustomerRecord,Text> collector, Reporter reporter)  throws IOException{
			 String [] strs=value.toString().split(",");
			customer.setCustomerID(strs[0]);
			customer.setCustomerName(strs[1]);
			customer.setPhoneNumber(strs[2]);
			collector.collect( customer,value);
		}
		
	}
	/**
	*将HDFS中的文件输出到数据库
	*/
	public static void main(String [] args) throws Exception{
	
		
		/**
		 * 从关系数据库读取数据到HDFS
		 */
		
		JobConf job = new JobConf(SDBConnInput.class);
		//DBOutputFormat类只会将MapReduce框架输出结果的K值输出到关系数据库中
	    job.setOutputFormat(DBOutputFormat.class);
	    FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/user/xuyizhen/in/customer.txt"));
	    DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
				"jdbc:mysql://192.168.0.25:3306/hadoop","root","1117");
	    String fieldNames []={"customerID","customerName","phoneNumber"};
	    DBOutputFormat.setOutput(job, "customers", fieldNames);
	    job.setMapperClass(MapperClass.class);
	    job.setNumReduceTasks(0);
	    JobClient.runJob(job);
	}
}

注意:运行MapReduce时候报错:
java.io.IOException: com.mysql.jdbc.Driver
一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。
添加包有两种方式:
1.在每个节点下的${HADOOP_HOME}/lib下添加该包,然后重启集群,这是比较原始的方法。
2.把包传到集群上:hadoop fs -put mysql驱动jar包名称/lib,并且在提交job前,添加语句DistributedCache.addFileToClassPath(new Path("/lib/mysql驱动jar包名称"),conf);
以上方法使用与所有需要额外jar包的MapReduce代码。
3
2
分享到:
评论

相关推荐

    Hadoop与 Oracle 数据库集成.pdf

    为了更好地整合传统的关系型数据库与新兴的大数据平台,Hadoop与Oracle数据库之间的集成变得尤为重要。本文将详细介绍Hadoop与Oracle数据库集成的相关知识点,包括Hadoop与Oracle之间的几种主要集成方式及其应用场景...

    hadoop与mysql数据库相连读出数据.pdf

    DBInputFormat是一个用于读取关系数据库中的数据的InputFormat。我们需要在JobConf中设置DBInputFormat,并指定数据库的连接信息和要读取的表名和字段。 3. 在Eclipse中编译运行Hadoop程序。 在本文中,我们需要在...

    Sqoop的安装与配置指南:从Hadoop到关系数据库的数据传输桥梁

    Sqoop的安装与配置指南:从Hadoop到关系数据库的数据传输桥梁

    基于Hadoop的大规模数据交换的研究

    6. 第三方数据交换工具:随着Hadoop的流行,市场上出现了多种第三方数据交换工具,这些工具在连接Hadoop与关系数据库方面提供了额外的支持和解决方案。 7. 数据交换工具的差异与不足:这些第三方数据交换工具各有...

    基于Hadoop云的数据库营销海量数据处理与挖掘的研究.docx

    Hadoop不仅仅是一种文件存储系统,还可以与传统的关系型数据库进行结合,实现数据的导入导出、实时查询等功能。例如,通过Hive(一个建立在Hadoop之上的数据仓库工具)可以方便地执行SQL查询操作,而HBase则提供了一...

    分布式数据库Hive笔记_HDFS_Hadoop_分布式数据库

    分布式数据库Hive是大数据处理领域中的重要工具,它与Hadoop生态系统紧密相连,主要用于实现对大规模数据集的存储和查询。Hive构建在Hadoop的HDFS(分布式文件系统)之上,利用MapReduce进行分布式计算,同时引入了...

    基于Hadoop的分布式数据库系统.pdf

    综上所述,Hadoop及其相关技术为处理海量数据提供了有效的解决方案,而基于Hadoop的分布式数据库系统模型能够满足云计算环境下对于大规模数据处理和分析的需求。这不仅对云计算技术的发展具有重要的推动作用,同时也...

    基于Hadoop 及关系型数据库的海量数据分析研究

    Hadoop 可以在大量廉价的硬件设备组成的集群上运行应用程序,全面地将计算推向数据,在处理 ...本文对Hadoop 和关系型数据库进行了比较分析,讨论了将二者结 合构建海量数据分析系统的可行性,同时给出了实际的应用场景

    实验四:NoSQL和关系数据库的操作比较

    【实验四:NoSQL和关系数据库的操作比较】 本实验旨在对比分析四种不同的数据库管理系统:MySQL、HBase、Redis和MongoDB。这些数据库在处理大数据时各有特点,理解它们的概念及不同点是实验的关键。 1. **MySQL**...

    Hadoop集群之—MySQL关系数据库_V1.0

    需要注意的是,尽管Hadoop与MySQL的整合可以提供灵活的数据处理方式,但是在选择使用关系数据库与Hadoop结合时,还需要考虑两者在设计理念上的不同。Hadoop通常用于大规模的数据批处理,而MySQL则更适合事务性强、...

    大数据概述包括: 大数据绪论,Hadoop简介,数据库技术历史和发展,分布式计算架构

    E.F.Codd是这一阶段的代表人物,他发表《大型共享数据库数据的关系模型》论文,为关系数据库技术奠定了理论基础。 第三代数据库系统:新型数据库,代表系统有流数据库Auraro、列存储数据仓库C-Store、高性能OLTP...

    Hadoop权威指南第三版(英文版)

    - 比较了Hadoop与关系数据库管理系统(RDBMS)、网格计算和志愿计算等其他数据处理技术的区别。 - **Hadoop的历史** - 简述了Hadoop的发展历程。 - **Hadoop生态系统** - 讲解了Hadoop生态系统的组成及其相互之间...

    2013年中国数据库大会-07-基于Hadoop的携程集中式日志及其周边生态系统介绍

    携程集中式日志系统展示了传统数据库与大数据技术结合的实践,如Hbase和Hadoop的使用,体现了企业应对大数据挑战的策略和技术演进。 总体而言,携程的集中式日志系统不仅在技术层面体现了对日志管理和分析的创新,...

    基于Hadoop的云计算试验平台搭建研究.docx

    - **数据导出工具**:如Sqoop用于实现Hadoop与关系数据库之间的数据交换。 #### 五、搭建步骤 1. **环境准备**: - 选择合适的服务器作为集群节点。 - 安装必要的软件包,包括Linux操作系统、JDK等。 2. **...

    hadoop与mysql数据库的那点事(1)

    将 Hadoop 与 MySQL 集成的关键在于使用 Hadoop 的 DBInputFormat 和 DBOutputFormat 类,它们允许你定义如何读取和写入数据库记录。在给定的代码中,我们看到有两个自定义类 `StudentRecord` 和 `TeacherRecord` ...

    基于Hadoop及关系型数据库的海量数据分析研究

    Hadoop可以在大量廉价的硬件设备组成的集群上运行应用程序,全面地将计算推向数据,在处理...本文对Hadoop和关系型数据库进行了比较分析,讨论了将二者结合构建海量数据分析系统的可行性,同时给出了实际的应用场景。

    sqoop的安装与配置

    sqoop的安装与配置 第1章:什么是 Sqoop? Sqoop 是一种用于在 Hadoop 与关系型数据库...1.数据导入(Import):从关系数据库到 Hadoop。 2.数据导出(Export):从 Hadoop 到关系数据库。 第2章:安装 Sqoop 的步骤

    山东大学非关系数据库实验报告

    非关系型数据库(NoSQL,Not Only SQL)是近年来在大数据处理、分布式系统等领域广泛应用的一种数据库类型,它与传统的关系型数据库(RDBMS)相比,具有更高的可扩展性、灵活性和性能。本实验报告主要关注两个知名的...

    大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第10期_MySQL关系数据库 共47页.pdf

    【大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第10期_MySQL关系数据库 共47页.pdf】 这份学习资料详细介绍了Hadoop集群和MySQL关系数据库的相关知识,旨在帮助读者深入理解大数据处理和云计算的实践...

Global site tag (gtag.js) - Google Analytics