- 浏览: 189681 次
- 性别:
- 来自: 杭州
博客专栏
-
Java技术分享
浏览量:0
文章分类
最新评论
-
masuweng:
学习了,学习了
mybatis是如何防止SQL注入的 -
somefuture:
终于知道了#$的区别
mybatis是如何防止SQL注入的 -
masuweng:
...
tomct处理请求的流程 -
zhp8341:
masuweng 写道寻求cas的更多例子, http://w ...
JUC之CAS -
臻是二哥:
java.util.concurrent包中到处都使用了CAS ...
JUC之CAS
Hadoop对关系数据库无非两种操作,即从关系数据库输入到HDFS和从HDFS输出到关系数据库。Hadoop中分别提供了DBInputFormat类和DBOutputFormat类,前者用于从关系数据库输入到HDFS,该类将关系数据库中的一条记录作为向Mapper输入的value值,后者用于将HDFS中的文件输出到关系数据库,该类将Reducer输出的key值存储到数据库。我们只要在主程序中设置job的输入输出格式为这两个类中的一种,就可以让Hadoop从关系数据库输入或者向关系数据库输出。
正如我上面提到的,我们在操作的过程中使用了“记录”这个对象,因此需要写一个类对应到关系数据库中我们要操作的那个表,这个类要实现DBWritable接口和Writable接口,具体参见HadoopAPI。
具体代码参见文档。
import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.lib.db.*; import java.sql.*; import java.io.*; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.*; import org.apache.hadoop.fs.Path; public class SDBConnInput { public static class CustomerRecord implements Writable,DBWritable{ String customerID; String customerName; String phoneNumber; public void readFields(ResultSet resultSet) throws SQLException{ customerID=resultSet.getString(1); customerName=resultSet.getString(2); phoneNumber=resultSet.getString(3); } public void write(PreparedStatement statement) throws SQLException{ statement.setString(1, customerID); statement.setString(2, customerName); statement.setString(3,phoneNumber); } public void readFields(DataInput in) throws IOException{ customerID=in.readUTF(); customerName=in.readUTF(); phoneNumber=in.readUTF(); } public void write(DataOutput out) throws IOException{ out.writeUTF(customerID); out.writeUTF(customerName); out.writeUTF(phoneNumber); } public void setCustomerID(String customerID){ this.customerID=customerID; } public void setCustomerName(String customerName){ this.customerName=customerName; } public void setPhoneNumber(String phoneNumber){ this.phoneNumber=phoneNumber; } public String toString(){ return this.customerID+","+this.customerName+","+this.phoneNumber; } } public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,CustomerRecord,LongWritable,Text>{ Text result= new Text(); public void map(LongWritable key, CustomerRecord value,OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException{ result.set(value.toString()); collector.collect(key, result); } } public static class ReducerClass extends MapReduceBase implements Reducer<LongWritable, Text,NullWritable,Text>{ public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<NullWritable,Text> output, Reporter reporter) throws IOException{ String str=""; while(values.hasNext()){ str+=values.next().toString(); } output.collect(null, new Text(str)); } } public static void main(String [] args) throws Exception{ /** * 从关系数据库读取数据到HDFS */ JobConf job = new JobConf(); job.setJarByClass(SDBConnInput.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.setInputFormat(DBInputFormat.class); FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/user/xuyizhen/out")); DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.0.25:3306/hadoop","root","1117"); String fieldNames []={"customerID","customerName","phoneNumber"}; DBInputFormat.setInput(job, CustomerRecord.class,"customers",null,"customerID", fieldNames); job.setMapperClass(MapperClass.class); job.setReducerClass(ReducerClass.class); JobClient.runJob(job); } }
import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.lib.db.*; import java.sql.*; import java.io.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.filecache.*; public class SDBConnOutput { public static class CustomerRecord implements Writable,DBWritable{ String customerID; String customerName; String phoneNumber; public void readFields(ResultSet resultSet) throws SQLException{ customerID=resultSet.getString(1); customerName=resultSet.getString(2); phoneNumber=resultSet.getString(3); } public void write(PreparedStatement statement) throws SQLException{ statement.setString(1, customerID); statement.setString(2, customerName); statement.setString(3,phoneNumber); } public void readFields(DataInput in) throws IOException{ customerID=in.readUTF(); customerName=in.readUTF(); phoneNumber=in.readUTF(); } public void write(DataOutput out) throws IOException{ out.writeUTF(customerID); out.writeUTF(customerName); out.writeUTF(phoneNumber); } public void setCustomerID(String customerID){ this.customerID=customerID; } public void setCustomerName(String customerName){ this.customerName=customerName; } public void setPhoneNumber(String phoneNumber){ this.phoneNumber=phoneNumber; } public String toString(){ return this.customerID+","+this.customerName+","+this.phoneNumber; } } public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,Text,CustomerRecord,Text>{ CustomerRecord customer=new CustomerRecord(); public void map(LongWritable key, Text value,OutputCollector<CustomerRecord,Text> collector, Reporter reporter) throws IOException{ String [] strs=value.toString().split(","); customer.setCustomerID(strs[0]); customer.setCustomerName(strs[1]); customer.setPhoneNumber(strs[2]); collector.collect( customer,value); } } /** *将HDFS中的文件输出到数据库 */ public static void main(String [] args) throws Exception{ /** * 从关系数据库读取数据到HDFS */ JobConf job = new JobConf(SDBConnInput.class); //DBOutputFormat类只会将MapReduce框架输出结果的K值输出到关系数据库中 job.setOutputFormat(DBOutputFormat.class); FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/user/xuyizhen/in/customer.txt")); DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.0.25:3306/hadoop","root","1117"); String fieldNames []={"customerID","customerName","phoneNumber"}; DBOutputFormat.setOutput(job, "customers", fieldNames); job.setMapperClass(MapperClass.class); job.setNumReduceTasks(0); JobClient.runJob(job); } }
注意:运行MapReduce时候报错:
java.io.IOException: com.mysql.jdbc.Driver
一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。
添加包有两种方式:
1.在每个节点下的${HADOOP_HOME}/lib下添加该包,然后重启集群,这是比较原始的方法。
2.把包传到集群上:hadoop fs -put mysql驱动jar包名称/lib,并且在提交job前,添加语句DistributedCache.addFileToClassPath(new Path("/lib/mysql驱动jar包名称"),conf);
以上方法使用与所有需要额外jar包的MapReduce代码。
- mysql-connector-java-5.1.22-bin.jar (813.4 KB)
- 下载次数: 3
发表评论
-
Hive安装
2014-11-21 20:43 1184Hive安装 hive是基于Hadoop的一个数据仓库工具 ... -
Pig安装
2014-11-13 16:20 1151Pig有两种使用模式:本地模式和MapReduce模式。 ... -
通过全局文件复制实现多数据源的Map端连接
2014-11-10 17:45 1356在DataJoin实现多数据源reduce端连接的过程中,连接 ... -
用DataJoin实现多数据源的Reduce端链接
2014-11-08 16:51 1372DataJoin是Hadoop处理多数据源问题的一个jar包, ... -
hadoop之用户定制
2014-11-04 09:30 1780Hadoop提供了9中内置数据类型,分别为: BooleanW ... -
使用复合键优化倒排索引
2014-11-03 11:18 1672巧用复合键优化倒排索引程序 之前写了一个倒排索引的程序,但 ... -
倒排索引
2014-10-31 11:49 1543倒排索引是文档检索系统中最常见的数据结构,被广泛的应用于搜索 ... -
Reducer多少个最佳
2014-10-29 20:29 911从MapReduce框架的执行流程,我们知道,输入文件会被分 ... -
从WordCount看MapReduce框架执行流程
2014-10-29 16:51 3593代码如下: import java.io.IOExcept ... -
第一个hadoop程序-WordCount
2014-10-28 20:46 1993首先说明一下环境:我在前面的博客中搭建的hadoop平台,具 ... -
HDFS可靠性措施
2014-10-27 08:21 2940HDFS可靠性措施 一、 ... -
Win7上的Eclipse3.3远程连接ubuntu14.04中的hadoop0.20.2
2014-10-24 19:15 1070Win7上的Eclipse3.3远程连 ... -
ubuntu14.04的hadoop环境搭建(全分布模式)
2014-10-20 10:53 1639hadoop0.20.2软件下载http://pan.ba ... -
ubuntu实现无密码登陆
2014-10-19 10:43 3517环境说明: 打在ubuntu系统的两台计算机mas ... -
Ubuntu14.04安装jdk1.7.0_71
2014-10-17 19:41 2811Ubuntu14.04安装jdk1.7.0_71 将位于~ ... -
ubuntu14.04设置静态IP
2014-09-19 21:48 6524最近在研究集群,于是弄了几台破电脑装ubuntu,结果ubun ...
相关推荐
为了更好地整合传统的关系型数据库与新兴的大数据平台,Hadoop与Oracle数据库之间的集成变得尤为重要。本文将详细介绍Hadoop与Oracle数据库集成的相关知识点,包括Hadoop与Oracle之间的几种主要集成方式及其应用场景...
DBInputFormat是一个用于读取关系数据库中的数据的InputFormat。我们需要在JobConf中设置DBInputFormat,并指定数据库的连接信息和要读取的表名和字段。 3. 在Eclipse中编译运行Hadoop程序。 在本文中,我们需要在...
Sqoop的安装与配置指南:从Hadoop到关系数据库的数据传输桥梁
6. 第三方数据交换工具:随着Hadoop的流行,市场上出现了多种第三方数据交换工具,这些工具在连接Hadoop与关系数据库方面提供了额外的支持和解决方案。 7. 数据交换工具的差异与不足:这些第三方数据交换工具各有...
Hadoop不仅仅是一种文件存储系统,还可以与传统的关系型数据库进行结合,实现数据的导入导出、实时查询等功能。例如,通过Hive(一个建立在Hadoop之上的数据仓库工具)可以方便地执行SQL查询操作,而HBase则提供了一...
分布式数据库Hive是大数据处理领域中的重要工具,它与Hadoop生态系统紧密相连,主要用于实现对大规模数据集的存储和查询。Hive构建在Hadoop的HDFS(分布式文件系统)之上,利用MapReduce进行分布式计算,同时引入了...
综上所述,Hadoop及其相关技术为处理海量数据提供了有效的解决方案,而基于Hadoop的分布式数据库系统模型能够满足云计算环境下对于大规模数据处理和分析的需求。这不仅对云计算技术的发展具有重要的推动作用,同时也...
Hadoop 可以在大量廉价的硬件设备组成的集群上运行应用程序,全面地将计算推向数据,在处理 ...本文对Hadoop 和关系型数据库进行了比较分析,讨论了将二者结 合构建海量数据分析系统的可行性,同时给出了实际的应用场景
【实验四:NoSQL和关系数据库的操作比较】 本实验旨在对比分析四种不同的数据库管理系统:MySQL、HBase、Redis和MongoDB。这些数据库在处理大数据时各有特点,理解它们的概念及不同点是实验的关键。 1. **MySQL**...
需要注意的是,尽管Hadoop与MySQL的整合可以提供灵活的数据处理方式,但是在选择使用关系数据库与Hadoop结合时,还需要考虑两者在设计理念上的不同。Hadoop通常用于大规模的数据批处理,而MySQL则更适合事务性强、...
E.F.Codd是这一阶段的代表人物,他发表《大型共享数据库数据的关系模型》论文,为关系数据库技术奠定了理论基础。 第三代数据库系统:新型数据库,代表系统有流数据库Auraro、列存储数据仓库C-Store、高性能OLTP...
- 比较了Hadoop与关系数据库管理系统(RDBMS)、网格计算和志愿计算等其他数据处理技术的区别。 - **Hadoop的历史** - 简述了Hadoop的发展历程。 - **Hadoop生态系统** - 讲解了Hadoop生态系统的组成及其相互之间...
携程集中式日志系统展示了传统数据库与大数据技术结合的实践,如Hbase和Hadoop的使用,体现了企业应对大数据挑战的策略和技术演进。 总体而言,携程的集中式日志系统不仅在技术层面体现了对日志管理和分析的创新,...
- **数据导出工具**:如Sqoop用于实现Hadoop与关系数据库之间的数据交换。 #### 五、搭建步骤 1. **环境准备**: - 选择合适的服务器作为集群节点。 - 安装必要的软件包,包括Linux操作系统、JDK等。 2. **...
将 Hadoop 与 MySQL 集成的关键在于使用 Hadoop 的 DBInputFormat 和 DBOutputFormat 类,它们允许你定义如何读取和写入数据库记录。在给定的代码中,我们看到有两个自定义类 `StudentRecord` 和 `TeacherRecord` ...
Hadoop可以在大量廉价的硬件设备组成的集群上运行应用程序,全面地将计算推向数据,在处理...本文对Hadoop和关系型数据库进行了比较分析,讨论了将二者结合构建海量数据分析系统的可行性,同时给出了实际的应用场景。
sqoop的安装与配置 第1章:什么是 Sqoop? Sqoop 是一种用于在 Hadoop 与关系型数据库...1.数据导入(Import):从关系数据库到 Hadoop。 2.数据导出(Export):从 Hadoop 到关系数据库。 第2章:安装 Sqoop 的步骤
非关系型数据库(NoSQL,Not Only SQL)是近年来在大数据处理、分布式系统等领域广泛应用的一种数据库类型,它与传统的关系型数据库(RDBMS)相比,具有更高的可扩展性、灵活性和性能。本实验报告主要关注两个知名的...
【大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第10期_MySQL关系数据库 共47页.pdf】 这份学习资料详细介绍了Hadoop集群和MySQL关系数据库的相关知识,旨在帮助读者深入理解大数据处理和云计算的实践...