`
evans_he
  • 浏览: 12384 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

MapReduce读取数据

阅读更多

        MapReduce处理数据的基本原则之一就是将输入数据分割成片(split),按片读取数据,每个分片交由一个Mapper去做处理。注意,输入分片只是一种逻辑划分,有别于hdfs的数据分块(block),数据分块是数据的物理划分。InputFormat类抽象出了数据分片和读取这两个操作,具体实现交由子类去完成,除了hadoop默认提供的一些子类之外,我们可以自己根据实际需要进行扩展。

        下图列出了涉及MapReduce读取数据的几个核心类以及常见的几种扩展。

        如图所示,InputFormat类抽象了两个方法,创建分片的getSplit( )和创建数据读取工具的createRecordReader( ),可以扩展InputFormat重写这两个方法来实现不同数据源读取数据,或者采用不同的方式读取数据。

        InputSplit表示数据的逻辑分片,最常见的是用于表示文本文件分片的FileSplit类,该类扩展了InputSpit,包含了文件的路径、分片起始位置在源文件中的字节偏移量、分片的字节长度以及分片所属文件块存在数据节点信息。用于分片信息在客户端提交作业时会被序列化到文件然后提交,并且在作业执行中会被反序列化,所以FileSplit还实现了Writable接口,实现了write(DataOutput out)和readFields(DataInput in)两个方法。

       正真为MapReduce提供数据输入的是RecordReader类,给它分配一个分片,它就从数据源中读取分片指定的数据段、并将数据组织成指定的数据结构。

        hadoop默认提供的一些数据读取类基本可以满足多数需求,特殊情况下我们也可以自己扩展。以下用一个简单了例子介绍扩展方式。

        扩展的目的是从多个不同类型的数据库(mysql、oracle、db2等)、或者多张表读取数据。代码如下:

public class MultiTableInputSplit extends DBInputSplit implements Writable{
	
	//查询sql
	private String inputSql;
	//数据库链接url
	private String dbConUrl;
	//数据库用户名
	private String userName;
	private String passWord;
	//数据库类型
	private String dbType;
	
	//作业执行过程中通过反射创建实例,所以必须有无参构造函数
	public MultiTableInputSplit(){	
	}
	
	public MultiTableInputSplit(long start, long end, String intputSql, 
			String dbConUrl, String userName, String passWord, String dbType) {
		super(start, end);
		this.inputSql = intputSql;
		this.dbConUrl = dbConUrl;
		this.userName = userName;
		this.passWord = passWord;
		this.dbType = dbType;
	}
	
	@Override
	public void write(DataOutput output) throws IOException {
		super.write(output);
		output.writeUTF(inputSql);
		output.writeUTF(dbConUrl);
		output.writeUTF(userName);
		output.writeUTF(passWord);
		output.writeUTF(dbType);
	}
	
	@Override
	public void readFields(DataInput input) throws IOException {
		super.readFields(input);
		this.inputSql = input.readUTF();
		this.dbConUrl = input.readUTF();
		this.userName = input.readUTF();
		this.passWord = input.readUTF();
		this.dbType = input.readUTF();
	}
	//get、set等方法省略
}

 

  MultiInputSplit 类间接扩展了InputSplit类,添加了数据库连接信息和查询数据所使用的sql语句。

public class MultiTableInputFormat extends InputFormat<LongWritable, MapDBWritable>{

	/**
	 * 单表分片
	 * sqlInfo字符串的格式为:单个sqlInfo里面各个属性之间用"##"分隔,如:
	 *    dbType##driver##url##user##password##sql##counts
	 * 多个sqlInfo之间用"#_#"分隔,如:
	 *    sqlInfo1#_#sqlInfo2
	 */
	@Override
	public List<InputSplit> getSplits(JobContext job) throws IOException {
		List<InputSplit> splits = new ArrayList<InputSplit>();
		String inputQuery = job.getConfiguration().get("sqlInfo");
		String[] inputQueryArr = inputQuery.split("#_#");
		for(String sql : inputQueryArr){
			getSplit(sql, job, splits);
		}
		return splits;
	}

	@Override
	public RecordReader<LongWritable, MapDBWritable> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		try {
			return new MultiTableRecordReader((MultiTableInputSplit)split, 
					context.getConfiguration());
		} catch (SQLException ex) {
			throw new IOException(ex.getMessage());
		}
	}

	/**
	 * 可以根据表的数量、表的大小控制分片的数量
	 */
	private int getSplitCount(String[] sqlInfo) {
		return 1;  //简单实现
	}
	
	/**
	 * 计算分片的大小
	 */
	private int getSplitSize(String[] sqlInfo){
		return 100000;   //简单实现
	}

	public void getSplit(String inputQuery, JobContext job, List<InputSplit> splits){
		String[] sql = inputQuery.split("##");
		int recordCount = Integer.parseInt(sql[6]);
		long splitCount = getSplitCount(sql);
		long splitSize = getSplitSize(sql);
		for (int i = 0; i < splitCount; i++) {
			InputSplit split;
			if (i + 1 == splitCount) {
				split = new MultiTableInputSplit(i * splitSize, recordCount + 1, 
						sql[5], sql[2], sql[3], sql[4], sql[0]);   
			} else {
				split = new MultiTableInputSplit(i * splitSize, i * splitSize + splitSize, 
						sql[5], sql[2], sql[3], sql[4], sql[0]);   
			}
			splits.add(split);
		}
	}
}

  

class MultiTableRecordReader extends RecordReader<LongWritable, MapDBWritable> {

	protected ResultSet results = null;

	protected DBInputFormat.DBInputSplit split;

	protected long pos = 0;

	protected LongWritable key = null;

	protected MapDBWritable value = null;
	
	protected String dbType;
	
	protected Connection connection;
	
	protected Statement statement;

	public MultiTableRecordReader(DBInputFormat.DBInputSplit split, Configuration conf)
			throws SQLException {
		this.split = split;
		initConnection();   //初始化数据库链接
	}
	
	@Override
	public boolean nextKeyValue() throws IOException {
		try {
			if (key == null) {
				key = new LongWritable();
			}
			if (value == null) {
				value = new MapDBWritable();
			}
			if (null == this.results) {
				this.results = executeQuery(getSelectQuery());
			}
			if (!results.next()) {
				return false;
			}
			key.set(pos + split.getStart());
			value.readFields(results);
			pos++;
		} catch (SQLException e) {
			throw new IOException("SQLException in nextKeyValue", e);
		}
		return true;
	}
	
	@Override
	public LongWritable getCurrentKey() {
		return key;
	}

	@Override
	public MapDBWritable getCurrentValue() {
		return value;
	}

	@Override
	public float getProgress() throws IOException {
		return pos / (float) split.getLength();
	}
	
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
	}
	
	/**
	 * 根据不同的数据库类型实现不同的分页查询
	 */
	protected String getSelectQuery() {
		StringBuilder query = new StringBuilder();
		try {
			DBInputFormat.DBInputSplit split = getSplit();
			MultiTableInputSplit extSplit = (MultiTableInputSplit) split;
			if (extSplit.getLength() > 0 && extSplit.getStart() >= 0) {
				query.append(extSplit.getInputSql());
				String dbType = ((MultiTableInputSplit)split).getDbType();
				if(dbType.equalsIgnoreCase("TERADATA")){
					query.append(" QUALIFY ROW_NUM>=").append(split.getStart())
					.append(" AND ROW_NUM<").append(split.getEnd());
				} if(dbType.equalsIgnoreCase("ORACLE")){
					query.append(" WHERE ROW_NUM>=").append(extSplit.getStart())
					.append(" AND ROW_NUM<").append(extSplit.getEnd());
				} else{
					query.append(" LIMIT ").append(extSplit.getStart())
					.append(" ,").append(extSplit.getEnd());
				}
			}
		} catch (IOException ex) {
			ex.printStackTrace();
		}
		return query.toString();
	}

	public void initConnection() throws SQLException{
		MultiTableInputSplit sp = (MultiTableInputSplit)split;
		String conUrl = sp.getDbConUrl();
		String userName = sp.getUserName();
		String passWord = sp.getPassWord();
		connection = DriverManager.getConnection(conUrl, userName, passWord);
		statement= connection.createStatement();
	}

	@Override
	public void close() throws IOException {
		try {
			if (null != results) {
				results.close();
			}
			if (null != statement) {
				statement.close();
			}
			if (null != connection) {
				connection.close();
			}
		} catch (SQLException e) {
			throw new IOException(e.getMessage());
		}
	}

	protected ResultSet executeQuery(String query) throws SQLException {
		return statement.executeQuery(query);
	}

	public Connection getConnection() {
		return connection;
	}

	public DBInputFormat.DBInputSplit getSplit() {
		return split;
	}
	
	protected void setStatement(PreparedStatement stmt) {
		this.statement = stmt;
	}
}

  

public class MapDBWritable implements DBWritable{

	/**
	 * 表中的列名称--->列值
	 */
	private Map<String, Object> values = null;

	/**
	 * 表中的列名称
	 */
	private String[] colNames;

	/**
	 * 列名称--->字段数据类型
	 */
	private Map<String, String> colType;

	public MapDBWritable(){
	}

	public void readFields(ResultSet resultSet) throws SQLException {
		ResultSetMetaData meta = resultSet.getMetaData();
		int count = meta.getColumnCount();
		
		colNames = new String[count];
		values = new HashMap<String, Object>(count);
		colType = new HashMap<String, String>(count);
		
		for (int i = 0; i < count; i++) {
			String colName = meta.getColumnName(i + 1);
			colNames[i] = colName;
			values.put(colName, resultSet.getString(colName));
			colType.put(colName, meta.getColumnTypeName(i + 1));
		}
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		for (Map.Entry<String, Object> entry : this.values.entrySet()) {
			builder.append(entry.getKey() + "->" + entry.getValue());
			builder.append(";");
		}
		return builder.substring(0, builder.length() - 1);
	}

	public void write(PreparedStatement preparedstatement) throws SQLException {

	}

	public Map<String, Object> getValues() {
		return values;
	}

	public void setValues(Map<String, Object> values) {
		this.values = values;
	}

	public String[] getColNames() {
		return colNames;
	}

	public void setColNames(String[] colNames) {
		this.colNames = colNames;
	}

	public Map<String, String> getColType() {
		return colType;
	}

	public void setColType(Map<String, String> colType) {
		this.colType = colType;
	}
}

 MapDBWritable从ResultSet中读取列的类型信息保存在以列名为key类型为value的map中,读取列值保存在以列名为key列值为value的map中,Mapper的输入就为MapDBWritable。

通过扩展以上几个类就可以从多个数据库、多个表读取数据了。

  • 大小: 226.7 KB
分享到:
评论

相关推荐

    18、MapReduce的计数器与通过MapReduce读取-写入数据库示例

    在大数据场景下,有时需要将MapReduce处理的结果存储到关系型数据库中,或者从数据库中读取数据进行处理。Hadoop提供了JDBC(Java Database Connectivity)接口,使得MapReduce作业能够与数据库进行连接和操作。 要...

    MapReduce读取单词个数.rar

    总结一下,这个“MapReduce读取单词个数”的案例是一个典型的Hadoop MapReduce应用,它展示了如何利用分布式计算处理大量文本数据,统计其中的单词出现频率。整个流程涉及Maven项目构建、生成jar包、上传到Hadoop...

    【MapReduce篇07】MapReduce之数据清洗ETL1

    MapReduce之数据清洗ETL详解 MapReduce是一种基于Hadoop的分布式计算框架,广泛应用于大数据处理领域。数据清洗(Data Cleaning)是数据处理过程中非常重要的一步,旨在清洁和转换原始数据,使其更加可靠和有用。...

    mapreduce案例测试数据emp.csv

    mapreduce案例测试数据

    MapReduce数据分析实战

    MapReduce是一种编程模型,用于处理和生成大数据集的并行运算。它由Google提出,并被Apache Hadoop框架广泛采用。MapReduce模型将复杂的数据处理过程分解为两个阶段:Map(映射)和Reduce(归约)。简单来说,Map...

    mapreduce气象数据(用于测试)

    MapReduce是Apache Hadoop的核心组件之一,主要用于处理和分析大规模数据。在这个名为“mapreduce气象数据(用于测试)”的压缩包中,我们有一个可能是用于教学目的的数据集,旨在帮助初学者理解如何在Hadoop环境下...

    基于MapReduce+Pandas的电影排名与推荐以及数据分析与可视化展示

    Pandas可以对数据进行读取、处理和分析,实现数据的快速处理和分析。 电影排名和推荐系统的开发可以分为以下几步: 1. 数据导入:使用Pandas从电影数据库中导入电影数据。 2. 数据预处理:使用Pandas对电影数据...

    java大数据作业_5Mapreduce、数据挖掘

    【Java大数据作业_5Mapreduce、数据挖掘】的课后作业涵盖了多个MapReduce和大数据处理的关键知识点,包括日志分析、Job执行模式、HBase的相关类、容量调度配置、MapReduce流程以及二次排序算法。下面将对这些内容...

    MapReduce--->实现简单的数据清洗需要的数据文件

    MapReduce--->实现简单的数据清洗需要的数据文件

    MapReduce数据统计简单实例

    在MapReduce程序的最后,Reducer产生的输出会被写入到HDFS(Hadoop Distributed File System),然后可以读取这些数据并生成图表或报表,以便直观地展示统计结果。 总结来说,这个实例涵盖了MapReduce的基本用法,...

    0324大数据代码与数据_JAVA大数据_文本分析_运用MapReduce做数据分析_

    在大数据处理领域,Java语言因其稳定性和可扩展性,成为了实现MapReduce算法的首选工具。MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。在这个项目"0324大数据代码与数据_JAVA...

    MapReduce天气源数据和计算类

    本案例中,我们关注的是如何使用MapReduce处理1901年和1902年的天气源数据,找出这两年内每天的最大温度和最低温度。 首先,`MrTemperature.java`是实现MapReduce任务的主要Java源代码文件。在这个文件中,我们将...

    MapReduce学习笔记,亲自测试写出来的,1000分都不贵

    - **Map 程序调用用户 map() 方法**:每当 Map 程序读取一行数据时,就会调用用户的 `map()` 方法,并将这一行数据的起始偏移量作为 key,数据内容作为 value 传入。 - **Reduce 程序调用用户 reduce() 方法**:...

    MapReduce分析Youtube数据内含源码以及说明书可以自己运行复现.zip

    1. 数据读取:从HDFS(Hadoop分布式文件系统)中读取原始YouTube数据。 2. 数据切分:将大文件分割成多个键值对,键可能是视频ID或时间戳,值可能包含用户行为的具体信息。 3. 数据映射:对每个键值对执行自定义逻辑...

    MapReduce计算模式详解

    3. **MapTask**:MapTask根据InputFormat读取数据,执行Map函数,生成中间键值对。 4. **Shuffle阶段**:MapTask产生的中间结果会按照键值进行分区和排序,然后传输给ReduceTask。 5. **ReduceTask**:ReduceTask...

    mapreduce框架学习之天气统计

    1. 读取每一行数据,解析出日期和其他气象指标。 2. 将日期与对应的温度、湿度打包成键值对。 3. 输出键值对,准备进入Shuffle和Sort阶段。 Shuffle和Sort阶段是MapReduce流程中的一个重要环节,它确保相同键的值会...

    mapreduce在hadoop实现词统计和列式统计

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。Hadoop是Apache开源项目,它实现了MapReduce模型,为大数据处理提供了可靠的分布式计算框架。在这个场景中,我们将讨论如何...

    基于MapReduce的招聘数据清洗项目(免费提供源码)

    首先,Map阶段读取原始招聘数据,对每条记录进行初步处理,如去除空白行和格式化日期。接下来,Reduce阶段合并和去重数据,确保每个职位信息唯一且完整。该过程大幅提升了数据处理效率,尤其适用于分布式计算环境。 ...

    mapreduce案例文本文件.zip

    MapReduce是一种分布式编程模型,由Google在2004年提出,主要用于处理和生成大规模数据集。这个模型将复杂的并行计算分解为两个主要阶段:Map(映射)和Reduce(化简)。在Hadoop框架中,MapReduce被广泛应用于大...

Global site tag (gtag.js) - Google Analytics