import java.io.File;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import org.apache.hadoop.examples.EJob;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
public class ReadDB {
public static class Map extends MapReduceBase implements
Mapper<LongWritable, StudentRecord, LongWritable, Text> {
// map
public void map(LongWritable key, StudentRecord value,
OutputCollector<LongWritable, Text> collector, Reporter reporter)
throws IOException {
collector.collect(new LongWritable(value.id),
new Text(value.toString()));
}
}
//reducer
public static class Reduce extends MapReduceBase implements Reducer<LongWritable,Text,LongWritable,Text>{
@Override
public void reduce(LongWritable key, Iterator<Text> value,
OutputCollector<LongWritable,Text> collector, Reporter reporter)
throws IOException {
// TODO Auto-generated method stub
while (value.hasNext()){
collector.collect(key,value.next());
}
}
}
public static class StudentRecord implements Writable, DBWritable {
public int id;
public String name;
public String sex;
public int age;
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.name = Text.readString(in);
this.sex = Text.readString(in);
this.age = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.id);
Text.writeString(out, this.name);
Text.writeString(out, this.sex);
out.writeInt(this.age);
}
@Override
public void readFields(ResultSet result) throws SQLException {
this.id = result.getInt(1);
this.name = result.getString(2);
this.sex = result.getString(3);
this.age = result.getInt(4);
}
@Override
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
stmt.setString(3, this.sex);
stmt.setInt(4, this.age);
}
@Override
public String toString() {
return new String("学号" + this.id + "_姓名:" + this.name
+ "_性别:"+ this.sex + "_年龄:" + this.age);
}
}
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(ReadDB.class);
//设置地址
conf.set("fs.default.name", "hdfs://192.168.71.128:9000");
conf.set("mapred.job.tracker", "192.168.71.128:9001");
conf.set("dfs.permissions","false");
File jarFile = EJob.createTempJar("bin");
EJob.addClasspath("/usr/hadoop/conf");
ClassLoader classLoader = EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
conf.setJar(jarFile.toString());
DistributedCache.addFileToClassPath(new Path(
"/usr/hadoop/lib/mysql-connector-java-5.1.18-bin.jar"), conf);
Class.forName("com.mysql.jdbc.Driver");
// 设置map和reduce类
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
// 设置数据库
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.71.128:3306/song", "root", "mysql");
// 设置表字段
String[] fields = { "id", "name", "sex", "age" };
DBInputFormat.setInput(conf, StudentRecord.class, "student", null,"id", fields);
// 设置输入类型
conf.setInputFormat(DBInputFormat.class);
//conf.setMapOutputKeyClass(Text.class);
//conf.setMapOutputValueClass(LongWritable.class);
// 设置输出类型
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
// 输出路径
FileOutputFormat.setOutputPath(conf, new Path("rdb_out"));
JobClient.runJob(conf);
}
}
//需要把mysql驱动包添加到工程中
分享到:
相关推荐
接下来,我们将讨论如何通过MapReduce与数据库交互,尤其是MySQL数据库。在大数据场景下,有时需要将MapReduce处理的结果存储到关系型数据库中,或者从数据库中读取数据进行处理。Hadoop提供了JDBC(Java Database ...
6. **JDBC(Java Database Connectivity)**:为了将MapReduce处理后的数据写入MySQL,我们需要使用JDBC驱动程序,它是Java连接数据库的标准API。通过建立数据库连接,执行SQL语句,可以将数据插入或更新到MySQL表中...
然而,如果日志数据存储在MySQL数据库中,我们需要先通过MapReduce的InputFormat接口从数据库中读取数据。这可能涉及到使用Hadoop的JDBC InputFormat,允许MapReduce直接查询数据库。在Map阶段,我们可以直接处理...
通过MySQL Connector/J,Hive可以与MySQL数据库交互,将Hadoop集群处理的结果写入或读取MySQL,实现离线数据处理与实时查询的结合。 在"mysql-connector-java-8.0.28"目录下,用户通常会找到如下关键文件: 1. `...
当需要从数据库中输入数据时,MapReduce作业通常会包括一个Mapper类,它通过JDBC或其他连接方式读取MySQL数据库中的数据,并将其转换为适合于MapReduce处理的键值对形式。而当需要将数据输出到MySQL数据库时,...
标题中的“apriori java 数据库读取”意味着我们将讨论如何使用Java编程语言来实现Apriori算法,并从MySQL数据库中读取数据。在Java中实现Apriori,通常会涉及到以下关键步骤: 1. 数据库连接:首先,我们需要使用...
在这个实例中,我们看到MapReduce被用来从Hbase数据库中提取海量数据,对其进行处理,然后将统计结果存储到MySQL数据库中。这个过程涉及到大数据处理的核心技术,下面我们将深入探讨这些知识点。 首先,**Hbase** ...
3. **Hive与MySQL交互**:虽然Hive不直接支持MySQL作为元数据存储,但通过配置,可以在Hive Metastore中使用MySQL数据库存储表元数据,如表名、字段名、分区信息等。这有助于在多用户环境中提高性能和并发性。 4. *...
1. **数据导入**:Sqoop 可以将结构化的数据从关系型数据库管理系统(RDBMS)如 MySQL、Oracle 等导入到 HDFS,然后可以进一步使用 MapReduce 或 Hive 进行分析和处理。 2. **数据导出**:反之,Sqoop 也可以将 HDFS...
- 编写代码,使用Table和Put对象将数据从本地文件读取并写入到HBase表中。 - 编译并运行Java程序,完成数据导入。 在整个过程中,确保所有组件的版本兼容,例如HBase与Hadoop、Sqoop与Hadoop之间的版本匹配。同时...
电影推荐网站是一个基于Hadoop生态系统的大数据项目,它利用了HBase和MySQL数据库,并通过协同过滤算法为用户提供个性化的电影推荐。在这个项目中,我们主要关注以下几个关键知识点: 1. **Hadoop生态**:Hadoop是...
- **添加组件**:为了将 MySQL 数据导入到 HDFS,需要添加 `tMySQLInput` 组件来读取 MySQL 数据库中的数据,以及 `tHDFSOutput` 组件来将数据写入 HDFS。 - **配置组件**: - 在 `tMySQLInput` 组件中,需要指定要...
在处理海量数据时,关系型数据库如MySQL、PostgreSQL,或者非关系型数据库如MongoDB、HBase,都会被广泛使用。这些数据库系统通常具备强大的索引机制和查询语言,如SQL,来支持高效的查询操作。 接着,我们来看看...
本程序的标题是"读写数据库数据的mr程序",这意味着我们将探讨如何利用MapReduce来从MySQL数据库读取数据,并将这些数据存储到HBASE这种分布式数据库中。这个过程通常被称为ETL(Extract, Transform, Load)操作,是...
关系型数据库(如MySQL、Oracle、SQL Server等)与非关系型数据库(如Hbase)在数据存储和处理上有着显著的区别。关系型数据库遵循ACID(原子性、一致性、隔离性和持久性)原则,适合结构化数据的存储,而Hbase作为...
标题中的“hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序”指的是一项数据处理任务,利用Hadoop的MapReduce框架,将关系型数据库(如Oracle和MySQL)中的数据高效地迁移至分布式存储系统HDFS(Hadoop ...
在这个版本中,开发人员可以利用Java语言方便地与MySQL数据库进行交互,实现数据的存取、查询、更新和删除等操作。MySQL Connector/J遵循JDBC API标准,使得它能够在各种Java环境中无缝运行,包括Java SE(Standard ...
这段代码首先连接到MySQL数据库,读取`users`表的所有数据,然后逐条插入到HBase的`user_data`表中。注意,由于HBase不支持日期类型,我们通常将日期转换为长整型(以毫秒为单位)进行存储。 总结,通过上述步骤,...
- **关系数据库**: MySQL、SQL Server 和 Oracle 均属于关系数据库,它们采用 SQL 作为标准查询语言,支持 ACID 特性(原子性、一致性、隔离性和持久性)。 - **非关系数据库**: 指的是不采用表格形式来组织数据的...
同时,这两个类也实现了 `readFields(ResultSet)` 和 `write(PreparedStatement)` 方法,分别用于从数据库结果集读取数据和向数据库准备语句写入数据。这样,Hadoop 可以通过这些类直接与 MySQL 进行交互,无需额外...