用hadoop的API从数据库读取数据到HDFS
package com.avgdate; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class T extends Configured { static String driverClassName = "oracle.jdbc.driver.OracleDriver"; static String url = "jdbc:oracle:thin:@10.10.35.65:1521/crm_standby"; static String username = "crmdev"; static String password = "crmdev"; static class AccessRecord implements DBWritable, Writable { long clubid; String clubname; Date daytime; @Override public void write(PreparedStatement statement) throws SQLException { statement.setLong(1, clubid); statement.setString(2, clubname); statement.setDate(3, daytime); System.out.println("PreparedStatement"); } @Override public void readFields(ResultSet resultSet) throws SQLException { //(1) SQL 从数据库查询数据 this.clubid = resultSet.getLong(1); this.clubname=resultSet.getString(2); this.daytime=resultSet.getDate(3); System.out.println("ResultSet"); } @Override public void write(DataOutput out) throws IOException { //(2) Mapper 将查询的数据记录 输出到Combiner out.writeLong(clubid); Text.writeString(out,clubname); out.writeLong(daytime.getTime()); //以时间戳格式输出到hdfs System.out.println("DataOutput"); } @Override public void readFields(DataInput in) throws IOException { //(3) Reduce 从Combiner 读取数据 this.clubid = in.readLong(); this.clubname = Text.readString(in); this.daytime =new Date( in.readLong()); System.out.println("DataInput"); } public String toString() { //(4) 写入HDFS文件 System.out.println("==="); return new String(this.clubid+"\t"+this.clubname+"\t"+this.daytime); } } /** * @param args * @throws IOException */ public static void main(String[] args) throws Exception { Long timpt = new Date().getTime(); if (args.length != 2) { args = new String[2]; args[0] = " "; // 此参数无用 args[1] = "d:\\output\\db\\" + timpt; } Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, driverClassName, url, username, password); Job job = new Job(conf, "t"); // 设置map 和 reduce job.setInputFormatClass(DBInputFormat.class); DBInputFormat.setInput(job, AccessRecord.class, "SELECT clubid ,nickname,daytime FROM CUST_CLUB WHERE rownum<100", "SELECT count(clubid) FROM CUST_CLUB"); // DBInputFormat.setInput(job, AccessRecord.class, "CUST_CLUB", // "rownum<10", "clubid", new String[] { "clubid"}); // 设置输出类型 job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(AccessRecord.class); // 临时目录 Path path = new Path(args[1] + "\\tmp"); // 设置输入输出路径 FileOutputFormat.setOutputPath(job, path); // DBOutputFormat.setOutput(job, "tableName", "fieldNames"); job.waitForCompletion(true); } }
相关推荐
org.apache.hadoop.io 包定义了通用的 I/O API,用于读写各种数据对象,包括网络、数据库和文件等。org.apache.hadoop.ipc 包提供了网络服务端和客户端的工具,用于封装网络异步 I/O 操作。org.apache.hadoop.mapred...
- **HBase**:一个高性能的分布式列式数据库,基于HDFS构建,支持实时读写访问。 - **Pig**:一种高级的数据流语言和执行框架,简化了在Hadoop上的数据处理流程。 - **Hive**:建立在Hadoop之上的数据仓库工具,提供...
9. **Hadoop生态系统的扩展**:Hadoop不仅是MapReduce和HDFS,还包括许多相关的项目,如Hive(SQL-like查询)、Pig(数据分析)、Spark(快速大数据处理)和HBase(NoSQL数据库)。这些工具通常通过Hadoop API与...
1. **HDFS API集成**:为了操作HDFS,我们需要使用Hadoop的Java API。这包括创建HDFS客户端,连接NameNode,执行读写操作等。例如,`FileSystem`类用于打开、关闭和管理文件系统,`FSDataInputStream`和`...
10. **Sqoop**:用于在Hadoop和传统关系型数据库之间导入导出数据的工具,提供了批处理数据迁移的API。 掌握Hadoop 2.10.0中文版API意味着开发者能够熟练地在Hadoop平台上开发、部署和优化大数据处理应用,从而充分...
2. 从本地拷贝文件到HDFS:可以使用Hadoop的API将本地文件拷贝到HDFS中。 3. 创建某个HDFS文件:可以使用Hadoop的API创建一个新的HDFS文件。 4. 重命名某个HDFS文件名:可以使用Hadoop的API将一个HDFS文件重命名。 5...
总之,Hadoop 0.20.2 API文档是开发人员理解和使用Hadoop的关键资源,它详细阐述了如何利用Java接口来操作HDFS、执行MapReduce任务以及与其他组件交互。掌握这些知识,将使开发者能够有效地在分布式环境中处理大数据...
5. **remote_test**:这个可能涉及到远程Hadoop集群的测试代码,如提交MapReduce作业到远程集群运行,检查作业状态,或者对HDFS上的文件进行远程操作。 学习这些示例代码,不仅可以理解Hadoop生态的基本组件和它们...
在学习Hadoop-HDFS实践教程的过程中,初学者将能够了解到Hadoop与传统关系型数据库的差异,理解如何在Hadoop生态中使用各种数据处理技术,以及如何对大数据进行分析和挖掘。教程还将提供一些高级话题,例如如何使用...
通过学习,学员将能够独立完成Hadoop的安装、配置与管理,掌握在Hadoop、操作系统以及关系型数据库之间传递数据的技能,制定有效数据集成方案,并熟练向Hadoop提交作业以及监控作业运行状态。 【Hadoop API开发】 ...
Java API提供了访问HDFS的接口,例如`org.apache.hadoop.fs.FileSystem`类,可以用于读取、写入和管理文件系统中的文件。 2. **Hadoop MapReduce**:MapReduce是Hadoop用于并行处理和分析大数据的编程模型。在GROUP...
还有HBase,一个基于HDFS的NoSQL数据库,支持实时查询。 在这个压缩包中,"hadoop-master"可能是一个包含Hadoop源码、配置文件和示例的项目,这对于学习和理解Hadoop的工作原理非常有帮助。通过阅读和分析源码,...
本文档旨在帮助读者理解如何使用sqoop2的Java API将数据从Oracle数据库迁移至HDFS(Hadoop Distributed File System),同时分享了作者在实践中遇到的一些问题及解决方案。为确保能够顺利地运行示例代码,建议先按照...
标题 "Hadoop+HBase+Java API" 涉及到三个主要的开源技术:Hadoop、HBase以及Java API,这些都是大数据处理和存储领域的关键组件。以下是对这些技术及其结合使用的详细介绍: **Hadoop** 是一个分布式计算框架,由...
在搭建完hadoop大数据系统(CDH5.16.1)后,如何访问hdfs文件系统上的数据呢?那当然是通过构建maven项目 使用java api接口进行文件了。为此,特别进行了hdfs文件系统java api访问的整理。
**HBase** 是一个分布式的、面向列的NoSQL数据库,它构建在Hadoop之上,特别是HDFS之上。HBase提供了一个高效的、支持随机读写的平台,适用于需要频繁访问大量数据的应用场景。 - **解决的问题**:HBase解决了...
携程集中式日志系统的设计目标是支持可靠的消息多播(Reliable Multicast)、流式数据处理以及将数据落地到HBase/HDFS。系统的目标是支持开发者(Dev)和运维(Ops)两个方面的需求。 3. 用户行为追踪(User ...
HDFS的API允许开发者以简单的方式读写文件,这在处理大数据时非常关键。 接下来,Hadoop是HDFS的基础框架,它不仅包括HDFS,还包括MapReduce编程模型,用于并行处理数据。MapReduce将大型任务分解为小的Map任务和...