`

MapReduce高级编程——自定义InputFormat

 
阅读更多

0、测试集样例

ball, 3.5, 12.7, 9.0
car, 15, 23.76, 42.23
device, 0.0, 12.4, -67.1
 

1、测试Point3D InputFormat

import java.io.IOException;
import java.net.URI;

import javax.xml.soap.Text;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * desc:Custom Data Types <code>TestPoint3DInputFormat</code>
 * 
 * @author chenwq
 */
public class TestPoint3DInputFormat {
	 /**
     * @param args
     * @throws IOException 
     * @throws ClassNotFoundException 
     * @throws InterruptedException 
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // TODO Auto-generated method stub
        System.out.println("hello,chenwq!");
        Job job=new Job();
        Configuration conf=new Configuration();
        FileSystem fs=FileSystem.get(URI.create(args[1]), conf);
        fs.delete(new Path(args[1]));
        job.setJobName("测试MyInputFormat程序。。。。。");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(Point3DinputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Point3D.class);
        job.setMapperClass(Point3DMapper.class);
        job.setNumReduceTasks(0);
        job.waitForCompletion(false);
    }
}

 

2、自定义类型Point3D必须实现WritableComparable接口,才能在Hadoop环境中传输

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * desc:Custom Data Types <code>Point</code>
 * 
 * @author chenwq
 */
public class Point3D implements WritableComparable {
	public float x;
	public float y;
	public float z;

	public Point3D(float x, float y, float z) {
		this.x = x;
		this.y = y;
		this.z = z;
	}

	public Point3D() {
		this(0.0f, 0.0f, 0.0f);
	}

	public void set(float x, float y, float z) {
		this.x = x;
		this.y = y;
		this.z = z;
	}

	public void write(DataOutput out) throws IOException {
		out.writeFloat(x);
		out.writeFloat(y);
		out.writeFloat(z);
	}

	public void readFields(DataInput in) throws IOException {
		x = in.readFloat();
		y = in.readFloat();
		z = in.readFloat();
	}

	public String toString() {
		return Float.toString(x) + ", " + Float.toString(y) + ", "
				+ Float.toString(z);
	}

	public float distanceFromOrigin() {
		return (float) Math.sqrt(x * x + y * y + z * z);
	}

	public int compareTo(Object other) {
		float myDistance = this.distanceFromOrigin();
		float otherDistance = ((Point3D) other).distanceFromOrigin();

		return Float.compare(myDistance, otherDistance);
	}

	public boolean equals(Object o) {
		Point3D other = (Point3D) o;
		if (!(other instanceof Point3D)) {
			return false;
		}

		return this.x == other.x && this.y == other.y && this.z == other.z;
	}

	public int hashCode() {
		return Float.floatToIntBits(x) ^ Float.floatToIntBits(y)
				^ Float.floatToIntBits(z);
	}

}

 3、自定义Point3DInputFormat类型,供MapReduce编程模型使用

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class Point3DinputFormat extends FileInputFormat<Text, Point3D> {
    
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // TODO Auto-generated method stub
        return false;
    }
    @Override
    public RecordReader<Text, Point3D> createRecordReader(InputSplit inputsplit,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        return new objPosRecordReader();
    }
    public static class objPosRecordReader extends RecordReader<Text,Point3D>{

        public LineReader in;
        public Text lineKey;
        public Point3D lineValue;
        public StringTokenizer token=null;
        
        public Text line;
      
        @Override
        public void close() throws IOException {
            // TODO Auto-generated method stub
            
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            //lineKey.set(token.nextToken());
            return lineKey;
        }

        @Override
        public Point3D getCurrentValue() throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            return lineValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public void initialize(InputSplit input, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            FileSplit split=(FileSplit)input;
            Configuration job=context.getConfiguration();
            Path file=split.getPath();
            FileSystem fs=file.getFileSystem(job);
            
            FSDataInputStream filein=fs.open(file);
            in=new LineReader(filein,job);
            
            line=new Text();
            lineKey=new Text();
            lineValue=new Point3D();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            int linesize=in.readLine(line);
            if(linesize==0)
                return false;
            
            String[] pieces = line.toString().split(",");
    		if(pieces.length != 4){
    			throw new IOException("Invalid record received");
    		}
    		
    		// try to parse floating point components of value
    		float fx, fy, fz;
    		try{
    			fx = Float.parseFloat(pieces[1].trim());
    			fy = Float.parseFloat(pieces[2].trim());
    			fz = Float.parseFloat(pieces[3].trim());
    		}catch(NumberFormatException nfe){
    			throw new IOException("Error parsing floating poing value in record");
    		}
            lineKey.set(pieces[0]);
            
            lineValue.set(fx, fy, fz);
            
            return true;
        }
    }
}

 

4、编写Mapper类,这里仅仅测试自定义类型Point3D的InputFormat,不需要Reducer

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class Point3DMapper extends Mapper<Text, Point3D, Text, Point3D>{
	protected void map(Text key, Point3D value, Context context) throws IOException, InterruptedException{
		context.write(key, value);
	}
}
 
2
0
分享到:
评论

相关推荐

    自定义MapReduce的InputFormat

    5. **配置和使用自定义InputFormat**:在你的MapReduce作业中,通过设置`job.setInputFormatClass()`方法指定你的自定义InputFormat类。同时,如果需要,你还可以在JobConf中添加额外的配置参数来指导InputFormat和...

    Hadoop高级编程——构建与实现大数据解决方案.rar

    本压缩包“Hadoop高级编程——构建与实现大数据解决方案”将深入探讨如何利用Hadoop进行高效的数据操作,构建实际的大数据解决方案。 一、Hadoop概述 Hadoop是由Apache基金会开发的开源项目,主要由Hadoop ...

    Hadoop高级编程- 构建与实现大数据解决方案

    本文将深入探讨“Hadoop高级编程——构建与实现大数据解决方案”这一主题,旨在帮助读者掌握如何利用Hadoop构建实际的大数据项目。 首先,我们要理解Hadoop的基础架构。Hadoop由两个主要组件构成:Hadoop ...

    大数据实验 实验五:MapReduce 初级编程实践

    大数据实验 实验五:MapReduce 初级编程实践

    实验项目 MapReduce 编程

    实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...

    大数据实验5实验报告:MapReduce 初级编程实践

    【MapReduce初级编程实践】是大数据处理中的一项基础任务,主要应用于大规模数据集的并行计算。在这个实验中,我们关注的是如何利用MapReduce来实现文件的合并与去重操作。MapReduce是一种分布式计算模型,由Google...

    mapreduce wc单词计数 自定义分区 自定义排序实现

    在这个特定的案例中,我们不仅实现了基本的WordCount功能,还扩展了MapReduce的能力,通过自定义分区和自定义排序来优化数据处理流程。 首先,基础的`WordCount`实现,通常包含以下四个步骤: 1. **Map阶段**:...

    大数据实验四-MapReduce编程实践

    ### 大数据实验四-MapReduce编程实践 #### 一、实验内容与目的 ##### 实验内容概述 本次实验的主要内容是使用MapReduce框架来实现WordCount词频统计功能,即统计HDFS(Hadoop Distributed File System)系统中多个...

    【MapReduce篇03】MapReduce之InputFormat数据输入1

    在MapReduce编程模型中,InputFormat是至关重要的组件,它负责将存储在HDFS(Hadoop Distributed File System)上的数据转化为可以被MapTask处理的键值对。本文将深入讲解MapReduce的InputFormat,特别是默认的...

    MapReduce模型--自定义数据类型

    自定义数据类型在MapReduce中扮演着重要角色,尤其在处理不同类型数据和复杂数据结构时显得尤为重要。 首先,了解Java和Hadoop之间的基本字段映射关系是理解自定义数据类型的基础。例如,在Java中常用的String类型...

    Hadoop高级编程之构建与实现大数据解决方案

    3. **MapReduce编程**:理解MapReduce的工作原理,编写Map函数和Reduce函数,处理键值对数据,以及如何定义InputFormat和OutputFormat。 4. **YARN资源调度**:YARN作为新一代的资源管理器,负责任务调度和集群资源...

    MapReduce高级编程之本地聚集与Combinner

    在分布式计算领域,MapReduce是一种广泛使用的编程模型,主要用于处理和生成大规模数据集。本篇文章将深入探讨MapReduce中的本地聚集(Local Aggregation)和Combinner的概念,这两个特性是优化MapReduce性能的关键...

    MapReduce案例编程开发,详情参考压缩文件展开图

    MapReduce编程的高级特性,如自定义分区、排序和分组,以及计数器的使用,也是提高数据处理能力的关键。在开发中,开发者往往需要根据具体的业务需求,灵活地应用这些高级特性。比如,自定义分区可以控制Map输出键值...

    MapReduce操作实例-倒排索引.pdf

    MapReduce是Apache Hadoop框架下的并行计算模型,用于处理和生成大数据集。在这个实例中,我们将详细探讨如何使用MapReduce实现倒排索引。 首先,我们来看`Mapper`类。`InvertedIndexMapper`是Map阶段的核心,它...

    大数据技术原理及应用课实验5 :MapReduce初级编程实践

    **大数据技术原理及应用——MapReduce初级编程实践** MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。在这个实验中,我们将学习如何利用MapReduce编程解决实际问题,包括数据去重...

    MapReduce操作实例-数据去重.pdf

    MapReduce是分布式计算的一种编程模型,常用于处理大规模数据集。在这个实例中,我们看到的是一个基于MapReduce的数据去重操作,这个操作在大数据处理中非常常见,尤其是当处理的数据源包含重复记录时。下面将详细...

    Hadoop之MapReduce编程实例完整源码

    一个自己写的Hadoop MapReduce实例源码,网上看到不少网友在学习MapReduce编程,但是除了wordcount范例外实例比较少,故上传自己的一个。包含完整实例源码,编译配置文件,测试数据,可执行jar文件,执行脚本及操作...

    云计算技术之————MapReduce

    "云计算技术之————MapReduce" 云计算技术中的 MapReduce 是一种分布式编程模型,用于大规模群组中的海量数据处理。MapReduce 由 Google 公司的 Jeffrey Dean 和 Sanjay Ghemawat 开发,旨在解决大规模数据处理...

Global site tag (gtag.js) - Google Analytics