`

hadoop API 读数据库到HDFS

 
阅读更多

 

用hadoop的API从数据库读取数据到HDFS

 

package com.avgdate;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class T extends Configured {

	static String	driverClassName	= "oracle.jdbc.driver.OracleDriver";
	static String	url				= "jdbc:oracle:thin:@10.10.35.65:1521/crm_standby"; 
	static String	username		= "crmdev";
	static String	password		= "crmdev";

static class AccessRecord implements DBWritable, Writable {  
  long clubid;
  String clubname;
  Date daytime;
  
  @Override
  public void write(PreparedStatement statement) throws SQLException {
   statement.setLong(1, clubid);
   statement.setString(2, clubname);
   statement.setDate(3, daytime);
   System.out.println("PreparedStatement");
  }
  @Override
  public void readFields(ResultSet resultSet) throws SQLException { //(1) SQL 从数据库查询数据
   this.clubid = resultSet.getLong(1);
   this.clubname=resultSet.getString(2);
   this.daytime=resultSet.getDate(3);
   System.out.println("ResultSet");
  }
  @Override
  public void write(DataOutput out) throws IOException { //(2) Mapper 将查询的数据记录 输出到Combiner
   out.writeLong(clubid);
   Text.writeString(out,clubname);
   out.writeLong(daytime.getTime()); //以时间戳格式输出到hdfs
   System.out.println("DataOutput");
  }
  @Override
  public void readFields(DataInput in) throws IOException { //(3) Reduce 从Combiner 读取数据
   this.clubid = in.readLong();
   this.clubname = Text.readString(in);
   this.daytime =new Date( in.readLong());
   System.out.println("DataInput");
  }
  public String toString() { //(4) 写入HDFS文件
   System.out.println("===");
   return new String(this.clubid+"\t"+this.clubname+"\t"+this.daytime);
  }
 }

	/**
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws Exception {
		Long timpt = new Date().getTime();
		if (args.length != 2) {
			args = new String[2];
			args[0] = " "; // 此参数无用
			args[1] = "d:\\output\\db\\" + timpt;
		}

		Configuration conf = new Configuration();
		DBConfiguration.configureDB(conf, driverClassName, url, username, password);

		Job job = new Job(conf, "t");

		// 设置map 和 reduce

		job.setInputFormatClass(DBInputFormat.class);
		
		DBInputFormat.setInput(job, AccessRecord.class, "SELECT  clubid ,nickname,daytime FROM CUST_CLUB WHERE rownum<100", "SELECT count(clubid) FROM CUST_CLUB");
//		DBInputFormat.setInput(job, AccessRecord.class, "CUST_CLUB",
//				"rownum<10", "clubid", new String[] { "clubid"});
		// 设置输出类型
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(AccessRecord.class);

		// 临时目录
		Path path = new Path(args[1] + "\\tmp");
		// 设置输入输出路径
		FileOutputFormat.setOutputPath(job, path);
		
//		DBOutputFormat.setOutput(job, "tableName", "fieldNames");

		job.waitForCompletion(true);

	}

}

 

 

分享到:
评论

相关推荐

    HadoopAPI使用

    org.apache.hadoop.io 包定义了通用的 I/O API,用于读写各种数据对象,包括网络、数据库和文件等。org.apache.hadoop.ipc 包提供了网络服务端和客户端的工具,用于封装网络异步 I/O 操作。org.apache.hadoop.mapred...

    Hadoop中的HDFS和Mapreduce

    - **HBase**:一个高性能的分布式列式数据库,基于HDFS构建,支持实时读写访问。 - **Pig**:一种高级的数据流语言和执行框架,简化了在Hadoop上的数据处理流程。 - **Hive**:建立在Hadoop之上的数据仓库工具,提供...

    hadoop-api中文说明文档

    9. **Hadoop生态系统的扩展**:Hadoop不仅是MapReduce和HDFS,还包括许多相关的项目,如Hive(SQL-like查询)、Pig(数据分析)、Spark(快速大数据处理)和HBase(NoSQL数据库)。这些工具通常通过Hadoop API与...

    JavaWeb操作hadoop2.6 HDFS,从页面上传,下载,列表展示的demo

    1. **HDFS API集成**:为了操作HDFS,我们需要使用Hadoop的Java API。这包括创建HDFS客户端,连接NameNode,执行读写操作等。例如,`FileSystem`类用于打开、关闭和管理文件系统,`FSDataInputStream`和`...

    Hadoop 2.10.0中文版API

    10. **Sqoop**:用于在Hadoop和传统关系型数据库之间导入导出数据的工具,提供了批处理数据迁移的API。 掌握Hadoop 2.10.0中文版API意味着开发者能够熟练地在Hadoop平台上开发、部署和优化大数据处理应用,从而充分...

    基于Hadoop应用开发的例子(新手入门宝典)

    2. 从本地拷贝文件到HDFS:可以使用Hadoop的API将本地文件拷贝到HDFS中。 3. 创建某个HDFS文件:可以使用Hadoop的API创建一个新的HDFS文件。 4. 重命名某个HDFS文件名:可以使用Hadoop的API将一个HDFS文件重命名。 5...

    Hadoop 0.20.2 API文档

    总之,Hadoop 0.20.2 API文档是开发人员理解和使用Hadoop的关键资源,它详细阐述了如何利用Java接口来操作HDFS、执行MapReduce任务以及与其他组件交互。掌握这些知识,将使开发者能够有效地在分布式环境中处理大数据...

    Hadoop_MapReduce_HDFS示例代码

    5. **remote_test**:这个可能涉及到远程Hadoop集群的测试代码,如提交MapReduce作业到远程集群运行,检查作业状态,或者对HDFS上的文件进行远程操作。 学习这些示例代码,不仅可以理解Hadoop生态的基本组件和它们...

    Hadoop-HDFS-实践教程

    在学习Hadoop-HDFS实践教程的过程中,初学者将能够了解到Hadoop与传统关系型数据库的差异,理解如何在Hadoop生态中使用各种数据处理技术,以及如何对大数据进行分析和挖掘。教程还将提供一些高级话题,例如如何使用...

    完整版大数据云计算课程 Hadoop数据分析平台系列课程 Hadoop 05 Hadoop API开发 共32页.pptx

    通过学习,学员将能够独立完成Hadoop的安装、配置与管理,掌握在Hadoop、操作系统以及关系型数据库之间传递数据的技能,制定有效数据集成方案,并熟练向Hadoop提交作业以及监控作业运行状态。 【Hadoop API开发】 ...

    java 从hadoop hdfs读取文件 进行groupby并显示为条形图

    Java API提供了访问HDFS的接口,例如`org.apache.hadoop.fs.FileSystem`类,可以用于读取、写入和管理文件系统中的文件。 2. **Hadoop MapReduce**:MapReduce是Hadoop用于并行处理和分析大数据的编程模型。在GROUP...

    基于Java的Hadoop核心功能实现。包括HDFS及MapReduce等.zip

    还有HBase,一个基于HDFS的NoSQL数据库,支持实时查询。 在这个压缩包中,"hadoop-master"可能是一个包含Hadoop源码、配置文件和示例的项目,这对于学习和理解Hadoop的工作原理非常有帮助。通过阅读和分析源码,...

    sqoop2 java API从oracle导数据到HDFS开发总结

    本文档旨在帮助读者理解如何使用sqoop2的Java API将数据从Oracle数据库迁移至HDFS(Hadoop Distributed File System),同时分享了作者在实践中遇到的一些问题及解决方案。为确保能够顺利地运行示例代码,建议先按照...

    Hadoop+HBase+Java API

    标题 "Hadoop+HBase+Java API" 涉及到三个主要的开源技术:Hadoop、HBase以及Java API,这些都是大数据处理和存储领域的关键组件。以下是对这些技术及其结合使用的详细介绍: **Hadoop** 是一个分布式计算框架,由...

    HDFS文件系统JAVA api访问接口(基于hadoop大数据平台)

    在搭建完hadoop大数据系统(CDH5.16.1)后,如何访问hdfs文件系统上的数据呢?那当然是通过构建maven项目 使用java api接口进行文件了。为此,特别进行了hdfs文件系统java api访问的整理。

    Hadoop Hive HBase Spark Storm概念解释

    **HBase** 是一个分布式的、面向列的NoSQL数据库,它构建在Hadoop之上,特别是HDFS之上。HBase提供了一个高效的、支持随机读写的平台,适用于需要频繁访问大量数据的应用场景。 - **解决的问题**:HBase解决了...

    2013年中国数据库大会-07-基于Hadoop的携程集中式日志及其周边生态系统介绍

    携程集中式日志系统的设计目标是支持可靠的消息多播(Reliable Multicast)、流式数据处理以及将数据落地到HBase/HDFS。系统的目标是支持开发者(Dev)和运维(Ops)两个方面的需求。 3. 用户行为追踪(User ...

    大数据 hdfs hadoop hbase jmeter

    HDFS的API允许开发者以简单的方式读写文件,这在处理大数据时非常关键。 接下来,Hadoop是HDFS的基础框架,它不仅包括HDFS,还包括MapReduce编程模型,用于并行处理数据。MapReduce将大型任务分解为小的Map任务和...

Global site tag (gtag.js) - Google Analytics