利用Hadoop API 从数据库中读出数据 简单处理 并写入数据库中
package dbio; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; public class DBIO extends Configured { static String driverClassName = "oracle.jdbc.driver.OracleDriver"; static String url = "jdbc:oracle:thin:@10.10.31.81:1521/oradev"; static String username = "scott"; static String password = "test001"; /** * 用户自定义对象 保存 * * @author Administrator * */ public static class AccessRecord implements WritableComparable<AccessRecord>, DBWritable { int prodid; // 商品编码 int price; // 商品价格 int count; // 商品销售数量 @Override public void write(PreparedStatement statement) throws SQLException { statement.setInt(1, prodid); statement.setInt(2, price); statement.setInt(3, count); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.prodid = resultSet.getInt(1); this.price = resultSet.getInt(2); this.count = resultSet.getInt(3); } /** * Set the prodId and price and count values. */ public void set(int prodid, int price, int count) { this.prodid = prodid; this.price = price; this.count = count; } public int getProdid() { return prodid; } public int getPrice() { return price; } public int getCount() { return count; } @Override // 反序列化,从流中的二进制转换成AccessRecord public void readFields(DataInput in) throws IOException { prodid = in.readInt(); price = in.readInt(); count = in.readInt(); } @Override // 序列化,将AccessRecord 转化成使用流传送的二进制 public void write(DataOutput out) throws IOException { out.writeInt(prodid); out.writeInt(price); out.writeInt(count); } @Override // key的比较 public int compareTo(AccessRecord o) { if (o.count == count) { if (o.count == 0) { return o.prodid - prodid; } return o.price - price; } return o.count - count; } // 新定义类应该重写的两个方法 @Override public int hashCode() { return count + prodid * 3; } @Override public boolean equals(Object right) { if (right == null) return false; if (this == right) return true; if (right instanceof AccessRecord) { AccessRecord r = (AccessRecord) right; return r.prodid == prodid && r.price == price && r.count == count; } else { return false; } } } static class MapDataBaseInHDFS extends Mapper<Object, AccessRecord, LongWritable , AccessRecord> { LongWritable writeKey = new LongWritable(); public void map(Object key, AccessRecord value, Context context) throws IOException, InterruptedException { context.write(writeKey,value ); // 输出到Reduce } } public static class ReduceHDFSInDataBase extends Reducer<LongWritable , AccessRecord,AccessRecord , LongWritable> { LongWritable writKey = new LongWritable(); public void reduce(LongWritable key, Iterable<AccessRecord> values, Context context) throws IOException, InterruptedException { for (AccessRecord writValue : values) { context.write(writValue, writKey); } } } /** * @param args * @throws IOException */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, driverClassName, url, username,password); Job job = new Job(conf, "dbio"); job.setJarByClass(DBIO.class); // 设置mapper 和 reduce job.setMapperClass(MapDataBaseInHDFS.class); job.setReducerClass(ReduceHDFSInDataBase.class); // 设置输出类型 // map 输出Key的类型 job.setMapOutputKeyClass(LongWritable.class); // map输出Value的类型 job.setMapOutputValueClass(AccessRecord.class); // rduce输出Key的类型 job.setOutputKeyClass(AccessRecord.class); // rduce输出Value的类型 job.setOutputValueClass(LongWritable.class); // 设置输入输出路径 job.setInputFormatClass(DBInputFormat.class); DBInputFormat.setInput(job, AccessRecord.class, "SELECT PRODID ,PRICE ,COUNT FROM TMP_HADOOP WHERE rownum<100", "SELECT count(PRODID) FROM TMP_HADOOP"); // DBInputFormat.setInput(job, AccessRecord.class, "CUST_CLUB", // "rownum<10", "PRODID", new String[] { "PRODID","PRICE","COUNT"}); job.setOutputFormatClass(DBOutputFormat.class); DBOutputFormat.setOutput(job, "TMP_HADOOP", new String[] { "PRODID","PRICE", "COUNT" }); job.waitForCompletion(true); } }
相关推荐
org.apache.hadoop.io 包定义了通用的 I/O API,用于读写各种数据对象,包括网络、数据库和文件等。org.apache.hadoop.ipc 包提供了网络服务端和客户端的工具,用于封装网络异步 I/O 操作。org.apache.hadoop.mapred...
10. **Sqoop**:用于在Hadoop和传统关系型数据库之间导入导出数据的工具,提供了批处理数据迁移的API。 掌握Hadoop 2.10.0中文版API意味着开发者能够熟练地在Hadoop平台上开发、部署和优化大数据处理应用,从而充分...
Hadoop API中文说明文档是针对Apache Hadoop框架的开发者指南,它详细解释了如何使用Hadoop的编程接口来处理大规模数据。Hadoop是开源的分布式计算框架,它允许在廉价硬件集群上存储和处理海量数据。这个文档对于...
你可以使用HBase的Java API或Hadoop的`TableOutputFormat`来写入数据。 3. **JDBC连接器**: 对于支持JDBC的任何关系型数据库,如MySQL、PostgreSQL等,可以通过Hadoop的`DBOutputFormat`将结果写入数据库。你需要...
Hadoop 0.20.2 API文档是开发者在使用开源分布式架构Hadoop时的重要参考资料。这个版本的API文档详细地介绍了如何利用Java语言来与Hadoop生态系统进行交互,为开发人员提供了丰富的功能和工具,以实现大规模数据处理...
基于springboot + Hadoop + Hive 的健身馆可视化分析平台源码+数据库 整合组件: HDFS MapReduce Hive Hadoop ###性别认为锻炼的重要性占比 饼图 ...
通过学习,学员将能够独立完成Hadoop的安装、配置与管理,掌握在Hadoop、操作系统以及关系型数据库之间传递数据的技能,制定有效数据集成方案,并熟练向Hadoop提交作业以及监控作业运行状态。 【Hadoop API开发】 ...
标题 "Hadoop+HBase+Java API" 涉及到三个主要的开源技术:Hadoop、HBase以及Java API,这些都是大数据处理和存储领域的关键组件。以下是对这些技术及其结合使用的详细介绍: **Hadoop** 是一个分布式计算框架,由...
对于写入数据库,通常在Reducer类的reduce()方法或cleanup()方法中进行,将处理后的数据转换为适合数据库存储的格式,然后通过JDBC API执行插入、更新或删除等操作。需要注意的是,由于MapReduce作业可能涉及大量的...
《HBase 0.98.1-hadoop2 API》是关于HBase数据库的一个特定版本的API参考文档,主要用于帮助开发者理解和使用这个基于Hadoop的数据存储系统。HBase是Apache软件基金会开发的一个开源NoSQL数据库,它构建在Hadoop...
携程集中式日志系统展示了传统数据库与大数据技术结合的实践,如Hbase和Hadoop的使用,体现了企业应对大数据挑战的策略和技术演进。 总体而言,携程的集中式日志系统不仅在技术层面体现了对日志管理和分析的创新,...
HBase是一种基于Hadoop的非关系型数据库,提供高可靠性、高性能、面向列的数据存储服务;而ZooKeeper则是协调分布式应用程序的服务,提供了一套简单的API来实现诸如命名服务、配置维护和组服务等功能。 #### 二、...
4. **Hadoop API**:学习使用Hadoop API进行数据读写和处理,例如FileSystem API用于文件操作,InputFormat和OutputFormat定义输入输出格式,Mapper和Reducer实现数据处理逻辑。 5. **MapReduce编程**:理解...
安装并配置好winutils后,Windows用户可以通过Hadoop的HDFS API访问HDFS,进行数据读写操作。 在大数据处理中,Hadoop的核心组件包括: 1. HDFS(Hadoop Distributed File System):分布式文件系统,能够将大规模...
HBase是一个基于Hadoop的分布式NoSQL数据库,专为大规模随机读写操作设计。HBase API提供了一套用于操作表格、行、列族和时间戳的接口,使开发人员能够轻松地存储和检索大量结构化和半结构化数据。HBase与Hadoop的...
Hadoop 2.2还支持Hadoop生态中的其他组件,如Hive(数据仓库工具)、Pig(数据流处理语言)、HBase(分布式数据库)等,提供了一站式的数据分析解决方案。 10. **开发实践**: 开发者在使用Hadoop 2.2 API时,...
Hadoop 2.2.0 不只是MapReduce和HDFS,还包括一系列生态系统项目,如HBase(分布式NoSQL数据库)、Hive(数据仓库工具)、Pig(数据流处理语言)、Oozie(工作流调度系统)和Zookeeper(分布式协调服务)。...