`

hadoop API 读/写数据库

 
阅读更多

利用Hadoop API 从数据库中读出数据 简单处理 并写入数据库中

package dbio;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBIO extends Configured {
	static String driverClassName = "oracle.jdbc.driver.OracleDriver";
	static String url = "jdbc:oracle:thin:@10.10.31.81:1521/oradev"; 
	static String username = "scott";
	static String password = "test001";

	/**
	 * 用户自定义对象 保存
	 * 
	 * @author Administrator
	 * 
	 */
	public static class AccessRecord implements
			WritableComparable<AccessRecord>, DBWritable {
		int prodid; // 商品编码
		int price; // 商品价格
		int count; // 商品销售数量

		@Override
		public void write(PreparedStatement statement) throws SQLException {
			statement.setInt(1, prodid);
			statement.setInt(2, price);
			statement.setInt(3, count);
		}

		@Override
		public void readFields(ResultSet resultSet) throws SQLException {
			this.prodid = resultSet.getInt(1);
			this.price = resultSet.getInt(2);
			this.count = resultSet.getInt(3);
		}

		/**
		 * Set the prodId and price and count values.
		 */
		public void set(int prodid, int price, int count) {
			this.prodid = prodid;
			this.price = price;
			this.count = count;
		}

		public int getProdid() {
			return prodid;
		}

		public int getPrice() {
			return price;
		}

		public int getCount() {
			return count;
		}

		@Override
		// 反序列化,从流中的二进制转换成AccessRecord 
		public void readFields(DataInput in) throws IOException {
			prodid = in.readInt();
			price = in.readInt();
			count = in.readInt();
		}

		@Override
		// 序列化,将AccessRecord 转化成使用流传送的二进制
		public void write(DataOutput out) throws IOException {
			out.writeInt(prodid);
			out.writeInt(price);
			out.writeInt(count);
		}

		@Override
		// key的比较
		public int compareTo(AccessRecord o) {
			if (o.count == count) {
				if (o.count == 0) {
					return o.prodid - prodid;
				}
				return o.price - price;
			}
			return o.count - count;
		}

		// 新定义类应该重写的两个方法
		@Override
		public int hashCode() {
			return count + prodid * 3;
		}

		@Override
		public boolean equals(Object right) {
			if (right == null)
				return false;
			if (this == right)
				return true;
			if (right instanceof AccessRecord) {
				AccessRecord r = (AccessRecord) right;
				return r.prodid == prodid && r.price == price
						&& r.count == count;
			} else {
				return false;
			}
		}
	}

	static class MapDataBaseInHDFS extends
			Mapper<Object, AccessRecord, LongWritable , AccessRecord> {
		
		LongWritable writeKey = new LongWritable();
		
		public void map(Object key, AccessRecord value, Context context)
				throws IOException, InterruptedException {
			context.write(writeKey,value ); // 输出到Reduce
		}
	}

	public static class ReduceHDFSInDataBase extends
			Reducer<LongWritable , AccessRecord,AccessRecord , LongWritable> {
		
		LongWritable writKey = new LongWritable();

		public void reduce(LongWritable key, Iterable<AccessRecord> values,
				Context context) throws IOException, InterruptedException {
			for (AccessRecord writValue : values) {
				context.write(writValue, writKey);
			}
		}
	}

	/**
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		DBConfiguration.configureDB(conf, driverClassName, url, username,password);
		
		Job job = new Job(conf, "dbio");
		job.setJarByClass(DBIO.class);
		
		// 设置mapper 和 reduce
		job.setMapperClass(MapDataBaseInHDFS.class);
		job.setReducerClass(ReduceHDFSInDataBase.class);
		
		// 设置输出类型 
		// map 输出Key的类型
		job.setMapOutputKeyClass(LongWritable.class);
		// map输出Value的类型
		job.setMapOutputValueClass(AccessRecord.class);
		
		// rduce输出Key的类型
		job.setOutputKeyClass(AccessRecord.class);
		// rduce输出Value的类型
		job.setOutputValueClass(LongWritable.class);
		
		// 设置输入输出路径
		job.setInputFormatClass(DBInputFormat.class);
		DBInputFormat.setInput(job, AccessRecord.class,
				"SELECT PRODID ,PRICE ,COUNT FROM TMP_HADOOP WHERE rownum<100",
				"SELECT count(PRODID) FROM TMP_HADOOP");
		// DBInputFormat.setInput(job, AccessRecord.class, "CUST_CLUB",
		// "rownum<10", "PRODID", new String[] { "PRODID","PRICE","COUNT"});

		job.setOutputFormatClass(DBOutputFormat.class);
		DBOutputFormat.setOutput(job, "TMP_HADOOP", new String[] { "PRODID","PRICE", "COUNT" });

		job.waitForCompletion(true);
	}
}

 

分享到:
评论

相关推荐

    HadoopAPI使用

    org.apache.hadoop.io 包定义了通用的 I/O API,用于读写各种数据对象,包括网络、数据库和文件等。org.apache.hadoop.ipc 包提供了网络服务端和客户端的工具,用于封装网络异步 I/O 操作。org.apache.hadoop.mapred...

    Hadoop 2.10.0中文版API

    10. **Sqoop**:用于在Hadoop和传统关系型数据库之间导入导出数据的工具,提供了批处理数据迁移的API。 掌握Hadoop 2.10.0中文版API意味着开发者能够熟练地在Hadoop平台上开发、部署和优化大数据处理应用,从而充分...

    hadoop-api中文说明文档

    Hadoop API中文说明文档是针对Apache Hadoop框架的开发者指南,它详细解释了如何使用Hadoop的编程接口来处理大规模数据。Hadoop是开源的分布式计算框架,它允许在廉价硬件集群上存储和处理海量数据。这个文档对于...

    hadoop 二次排序 插入数据库

    你可以使用HBase的Java API或Hadoop的`TableOutputFormat`来写入数据。 3. **JDBC连接器**: 对于支持JDBC的任何关系型数据库,如MySQL、PostgreSQL等,可以通过Hadoop的`DBOutputFormat`将结果写入数据库。你需要...

    Hadoop 0.20.2 API文档

    Hadoop 0.20.2 API文档是开发者在使用开源分布式架构Hadoop时的重要参考资料。这个版本的API文档详细地介绍了如何利用Java语言来与Hadoop生态系统进行交互,为开发人员提供了丰富的功能和工具,以实现大规模数据处理...

    基于springboot + Hadoop + Hive 的健身馆可视化分析平台源码+数据库

    基于springboot + Hadoop + Hive 的健身馆可视化分析平台源码+数据库 整合组件: HDFS MapReduce Hive Hadoop ###性别认为锻炼的重要性占比 饼图 ...

    完整版大数据云计算课程 Hadoop数据分析平台系列课程 Hadoop 05 Hadoop API开发 共32页.pptx

    通过学习,学员将能够独立完成Hadoop的安装、配置与管理,掌握在Hadoop、操作系统以及关系型数据库之间传递数据的技能,制定有效数据集成方案,并熟练向Hadoop提交作业以及监控作业运行状态。 【Hadoop API开发】 ...

    Hadoop+HBase+Java API

    标题 "Hadoop+HBase+Java API" 涉及到三个主要的开源技术:Hadoop、HBase以及Java API,这些都是大数据处理和存储领域的关键组件。以下是对这些技术及其结合使用的详细介绍: **Hadoop** 是一个分布式计算框架,由...

    18、MapReduce的计数器与通过MapReduce读取-写入数据库示例

    对于写入数据库,通常在Reducer类的reduce()方法或cleanup()方法中进行,将处理后的数据转换为适合数据库存储的格式,然后通过JDBC API执行插入、更新或删除等操作。需要注意的是,由于MapReduce作业可能涉及大量的...

    HBase 0.98.1-hadoop2 API

    《HBase 0.98.1-hadoop2 API》是关于HBase数据库的一个特定版本的API参考文档,主要用于帮助开发者理解和使用这个基于Hadoop的数据存储系统。HBase是Apache软件基金会开发的一个开源NoSQL数据库,它构建在Hadoop...

    2013年中国数据库大会-07-基于Hadoop的携程集中式日志及其周边生态系统介绍

    携程集中式日志系统展示了传统数据库与大数据技术结合的实践,如Hbase和Hadoop的使用,体现了企业应对大数据挑战的策略和技术演进。 总体而言,携程的集中式日志系统不仅在技术层面体现了对日志管理和分析的创新,...

    CentOS下Hadoop+Hbase+ZooKeeper分布式存储部署详解

    HBase是一种基于Hadoop的非关系型数据库,提供高可靠性、高性能、面向列的数据存储服务;而ZooKeeper则是协调分布式应用程序的服务,提供了一套简单的API来实现诸如命名服务、配置维护和组服务等功能。 #### 二、...

    hadoop 文档:Hadoop开发者下载

    4. **Hadoop API**:学习使用Hadoop API进行数据读写和处理,例如FileSystem API用于文件操作,InputFormat和OutputFormat定义输入输出格式,Mapper和Reducer实现数据处理逻辑。 5. **MapReduce编程**:理解...

    hadoop.zip hadoop2.7.1安装包

    安装并配置好winutils后,Windows用户可以通过Hadoop的HDFS API访问HDFS,进行数据读写操作。 在大数据处理中,Hadoop的核心组件包括: 1. HDFS(Hadoop Distributed File System):分布式文件系统,能够将大规模...

    Hadoop,Hbase,mahout三者兼容版本的API文档

    HBase是一个基于Hadoop的分布式NoSQL数据库,专为大规模随机读写操作设计。HBase API提供了一套用于操作表格、行、列族和时间戳的接口,使开发人员能够轻松地存储和检索大量结构化和半结构化数据。HBase与Hadoop的...

    hadoop2.2 api

    Hadoop 2.2还支持Hadoop生态中的其他组件,如Hive(数据仓库工具)、Pig(数据流处理语言)、HBase(分布式数据库)等,提供了一站式的数据分析解决方案。 10. **开发实践**: 开发者在使用Hadoop 2.2 API时,...

    hadoop2.2.0API

    Hadoop 2.2.0 不只是MapReduce和HDFS,还包括一系列生态系统项目,如HBase(分布式NoSQL数据库)、Hive(数据仓库工具)、Pig(数据流处理语言)、Oozie(工作流调度系统)和Zookeeper(分布式协调服务)。...

Global site tag (gtag.js) - Google Analytics