`

Hadoop Map-Reduce编程

 
阅读更多
/*
MAP REDUCE 的计算框架

INPUT -> MAP-> COMBINER -> REDUCER -> OUTPUT

计算的每个步骤皆以KEY,VALUE键值对作为输入,输出参数。
参数的类型为HADOOP封装的类型,加快数据的网络传输。
在计算之前,先对数据进行分片,通常情况下,一个分片对应一个64M的数据块,每个分片对应一个TASK.
通过分片实现计算数据本地化,若一行记录被分成两个不同的数据块,则HADOOP会将另外一个数据块的
剩余记录读取到本地,形成一个分片。

INPUT: 数据的输入路径
MAP:   输入KEY参数为每行所在文件的偏移量,输入VALUE参数为每个内容。此步骤主要是做数据的预处理,挑选出需要处理的数据。
REDUCER: 输入参数为MAP的输出参数,对数据进行加工处理。
COMBINER: 减少MAP节点到REDUCER节点的传输数据量,而在MAP之后进行的分片内的数据计算处理。
OUTPUT:  数据的输出路径
//AVG的实现
--打包
javac -classpath ../hadoop-core-1.1.2.jar *.java

jar cvf ./WetherAvg.jar ./*.class
*/

bin/hadoop jar ./AvgTemperature.jar AvgTemperature ./in/sample.txt ./out10
打包后需注意把myclass的class文件删除掉。

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//Mapper 类是个泛型类,四个形参
//input key,input value,output key,output value
//
//
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
private static final int MISSING = 9999;
@Override
//Hadoop提供了一系列基础的类型,便于网络序列化传输longwriteable=long
//text=string
//Called once for each key/value pair in the input split. 
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { 
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
//map() method also provides an instance of Context to write the output to。
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//reducer函数也有四个形参用于指定输入和输出类型reduce的函数输入类型必须与map函数的输出类型匹配
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
//实现这个Iterable接口允许对象成为 "foreach" 语句的目标。
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int minValue = Integer.MAX_VALUE;
for (IntWritable value : values) {
minValue = Math.min(minValue, value.get());
}
//reducer() method also provides an instance of Context to write the output to。
context.write(key, new IntWritable(maxValue));
}
}


import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//A Job object forms the specification of the job and gives you control over how the job
//is run.
//When we run this job on a Hadoop cluster, we will package the code into a JAR
//file (which Hadoop will distribute around the cluster).
//
//
//
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));//define the input data path
FileOutputFormat.setOutputPath(job, new Path(args[1]));//define the output data path The directory shouldn’t exist before running the job
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);//Submit the job to the cluster and wait for it to finish. 
}
}
----------------------------------------------------------------------------------------
--实现平均天气稳定的代码
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text>
{
private static final int MISSING = 9999;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { 
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new Text(String.valueOf(airTemperature)));
}
}
}

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
int sum = 0,count = 0;
for (Text intvalue : values) {
count++;
sum += Integer.parseInt(intvalue.toString());
}
context.write(key, new Text(sum+","+count));
}
}

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AvgTemperatureReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
int sum = 0,count = 0;
for (Text value : values) {
String[] sp = value.toString().split(",");
sum += Integer.parseInt(sp[0]);
count += Integer.parseInt(sp[1]);
}
context.write(key, new Text((sum/count)+"")); 
}
}


import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AvgTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(AvgTemperature.class);
job.setJobName("Avg temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(AvgTemperatureMapper.class);
job.setCombinerClass(AvgTemperatureCombiner.class);
job.setReducerClass(AvgTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
----------------------------------------------------------------------------------------------------------------------------------/**  
 * Hadoop网络课程作业程序
 * 编写者:James
 */  

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Exercise_1 extends Configured implements Tool {  
  
  /**  
   * 计数器
   * 用于计数各种异常数据
   */  
  enum Counter 
  {
    LINESKIP,  //出错的行
  }
  
  /**  
   * MAP任务
   */  
  public static class Map extends Mapper<LongWritable, Text, NullWritable, Text> 
  {
    public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException 
    {
      String line = value.toString();        //读取源数据
      
      try
      {
        //数据处理
        String [] lineSplit = line.split(" ");
        String month = lineSplit[0];
        String time = lineSplit[1];
        String mac = lineSplit[6];

        /**  需要注意的部分     **/ 
        
        String name = context.getConfiguration().get("name");
        Text out = new Text(name + ' ' + month + ' ' + time + ' ' + mac);
        
        /**  需要注意的部分     **/ 
        
        
        context.write( NullWritable.get(), out);  //输出
      }
      catch ( java.lang.ArrayIndexOutOfBoundsException e )
      {
        context.getCounter(Counter.LINESKIP).increment(1);  //出错令计数器+1
        return;
      }
    }
  }


  @Override
  public int run(String[] args) throws Exception 
  {
    Configuration conf = getConf();
    
    /**  需要注意的部分     **/ 
 
    conf.set("name", args[2]);

    /**  需要注意的部分     **/ 

    Job job = new Job(conf, "Exercise_1");              //任务名
    job.setJarByClass(Exercise_1.class);              //指定Class
    
    FileInputFormat.addInputPath( job, new Path(args[0]) );      //输入路径
    FileOutputFormat.setOutputPath( job, new Path(args[1]) );    //输出路径
    
    job.setMapperClass( Map.class );                //调用上面Map类作为Map任务代码
    job.setOutputFormatClass( TextOutputFormat.class );
    job.setOutputKeyClass( NullWritable.class );          //指定输出的KEY的格式
    job.setOutputValueClass( Text.class );              //指定输出的VALUE的格式
    
    job.waitForCompletion(true);
    
    //输出任务完成情况
    System.out.println( "任务名称:" + job.getJobName() );
    System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
    System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
    System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
    System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );

    return job.isSuccessful() ? 0 : 1;
  }
  
  /**  
   * 设置系统说明
   * 设置MapReduce任务
   */  
  public static void main(String[] args) throws Exception 
  {
    
    //判断参数个数是否正确
    //如果无参数运行则显示以作程序说明
    if ( args.length != 3 )
    {
      System.err.println("");
      System.err.println("Usage: Test_1 < input path > < output path > < name >");
      System.err.println("Example: hadoop jar ~/Test_1.jar hdfs://localhost:9000/home/james/Test_1 hdfs://localhost:9000/home/james/output hadoop");
      System.err.println("Counter:");
      System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
      System.exit(-1);
    }
    
    //记录开始时间
    DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
    Date start = new Date();
    
    //运行任务
    int res = ToolRunner.run(new Configuration(), new Exercise_1(), args);

    //输出任务耗时
    Date end = new Date();
    float time =  (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
    System.out.println( "任务开始:" + formatter.format(start) );
    System.out.println( "任务结束:" + formatter.format(end) );
    System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" ); 

        System.exit(res);
  }
}
------------------------------------------------------------------------------------------------------------------------------------------------
/**  
 * Hadoop网络课程模板程序
 * 编写者:James
 */  

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
/**  
 * 有Reducer版本
 */  
public class Test_2 extends Configured implements Tool {  
  
  /**  
   * 计数器
   * 用于计数各种异常数据
   */  
  enum Counter 
  {
    LINESKIP,  //出错的行
  }
  
  /**  
   * MAP任务
   */  
  public static class Map extends Mapper<LongWritable, Text, Text, Text> 
  {
    public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException 
    {
      String line = value.toString();        //读取源数据
      
      try
      {
        //数据处理
        String [] lineSplit = line.split(" ");
        String anum = lineSplit[0];
        String bnum = lineSplit[1];
        
        context.write( new Text(bnum), new Text(anum) );  //输出
      }
      catch ( java.lang.ArrayIndexOutOfBoundsException e )
      {
        context.getCounter(Counter.LINESKIP).increment(1);  //出错令计数器+1
        return;
      }
    }
  }

  /**  
   * REDUCE任务
   */ 
  public static class Reduce extends Reducer<Text, Text, Text, Text> 
  {
    public void reduce ( Text key, Iterable<Text> values, Context context ) throws IOException, InterruptedException
    {
      String valueString;
      String out = "";
      String name = context.getConfiguration().get("name");
      
      for ( Text value : values )
      {
        valueString = value.toString();
        out += valueString + "|";
      }
      out+=name;
      
      context.write( key, new Text(out) );
    }
  }

  @Override
  public int run(String[] args) throws Exception 
  {
    Configuration conf = getConf();
    conf.set("name", args[2]);
    Job job = new Job(conf, "Test_2");                //任务名
    job.setJarByClass(Test_2.class);                //指定Class
    
    FileInputFormat.addInputPath( job, new Path(args[0]) );      //输入路径
    FileOutputFormat.setOutputPath( job, new Path(args[1]) );    //输出路径
    
    job.setMapperClass( Map.class );                //调用上面Map类作为Map任务代码
    job.setReducerClass ( Reduce.class );              //调用上面Reduce类作为Reduce任务代码
    job.setOutputFormatClass( TextOutputFormat.class );
    job.setOutputKeyClass( Text.class );              //指定输出的KEY的格式
    job.setOutputValueClass( Text.class );              //指定输出的VALUE的格式
    
    job.waitForCompletion(true);
    
    //输出任务完成情况
    System.out.println( "任务名称:" + job.getJobName() );
    System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
    System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
    System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
    System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );

    return job.isSuccessful() ? 0 : 1;
  }
  
  /**  
   * 设置系统说明
   * 设置MapReduce任务
   */  
  public static void main(String[] args) throws Exception 
  {
    
    //判断参数个数是否正确
    //如果无参数运行则显示以作程序说明
    if ( args.length != 2 )
    {
      System.err.println("");
      System.err.println("Usage: Test_2 < input path > < output path > ");
      System.err.println("Example: hadoop jar ~/Test_2.jar hdfs://localhost:9000/home/james/Test_2 hdfs://localhost:9000/home/james/output");
      System.err.println("Counter:");
      System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
      System.exit(-1);
    }
    
    //记录开始时间
    DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
    Date start = new Date();
    
    //运行任务
    int res = ToolRunner.run(new Configuration(), new Test_2(), args);

    //输出任务耗时
    Date end = new Date();
    float time =  (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
    System.out.println( "任务开始:" + formatter.format(start) );
    System.out.println( "任务结束:" + formatter.format(end) );
    System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" ); 

        System.exit(res);
  }
}
分享到:
评论

相关推荐

    Hadoop Map-Reduce教程

    Hadoop Map-Reduce 是一种编程模型,用于处理大规模数据集(通常为TB级或以上)。这种模型支持分布式计算,可以在成百上千台计算机上运行。Map-Reduce 通过将大数据集拆分成小数据集来实现并行处理,这些小数据集...

    Map-Reduce原理体系架构和工作机制,eclipse与Hadoop集群连接

    Map-Reduce是一种编程模型,用于处理大规模数据集(通常是TB级或更大),该模型可以在大量计算机(称为集群)上进行并行处理。Map-Reduce的设计初衷是为了简化大数据处理任务,通过将这些任务分解成两个阶段——Map...

    Hive - A Warehousing Solution Over a Map-Reduce.pdf

    然而,Map-Reduce编程模型本身较为底层,开发者需要编写大量的自定义程序来实现特定的功能,这不仅增加了开发的难度,还降低了代码的可维护性和复用性。 针对这些问题,Facebook的数据基础设施团队提出了一种名为...

    hadoop-eclipse-plugin-3.1.1.tar.gz

    使用Hadoop-Eclipse-Plugin时,建议遵循良好的编程习惯,如合理划分Mapper和Reducer的功能,优化数据处理流程,以及充分利用Hadoop的并行计算能力。同时,及时更新插件至最新版本,以获取最新的功能和修复。 通过...

    hadoop-common-2.6.0-bin-master.zip

    MapReduce则是一种编程模型,用于大规模数据集的并行计算,它将复杂计算任务分解为“映射”(map)和“化简”(reduce)两部分,便于分布式执行。 **Hadoop在Windows上的安装与配置** 在Windows上使用Hadoop通常...

    Hadoop-eclipse-plugin-2.7.6下载与说明

    3. **创建MapReduce项目**:在Eclipse中,你可以通过"File" -&gt; "New" -&gt; "Project" -&gt; "Map/Reduce Project"创建一个新的Hadoop项目。选择合适的Hadoop版本(这里是2.7.6),然后为项目命名。 4. **编写MapReduce...

    Hadoop示例程序WordCount运行及详解

    Hadoop平台上进行WordCount是非常重要的,掌握了WordCount可以更好地理解Hadoop的map-reduce编程模型。本文将详细讲解Hadoop平台上WordCount的运行和实现。 基于Hadoop的map-reduce模型,WordCount程序可以将输入的...

    hadoop-map-reduce-demo

    总结,`hadoop-map-reduce-demo`项目是一个理想的起点,通过它,你可以深入学习和掌握Hadoop MapReduce的核心概念和编程技巧。同时,Java作为主要的编程语言,对于理解分布式计算的实现细节至关重要。不断实践和探索...

    hadoop-eclipse-plugin-2.7.4

    接着,可以创建新的MapReduce项目,编写map和reduce函数,最后通过插件将程序打包并提交到Hadoop集群执行。 总之,Hadoop Eclipse Plugin 2.7.4是Hadoop开发者不可或缺的工具,它通过提供直观的图形界面和强大的...

    hadoop-eclipse2.7.1、hadoop-eclipse2.7.2、hadoop-eclipse2.7.3

    2. **项目创建与编辑**:安装插件后,Eclipse会新增Hadoop相关的项目类型,如"Hadoop Map/Reduce Project"。开发人员可以创建这样的项目,编写MapReduce程序。插件提供了对Hadoop API的自动补全和语法高亮,使得编码...

    hadoop-core-0.20.2 源码 hadoop-2.5.1-src.tar.gz 源码 hadoop 源码

    它将大型任务分解为许多小的Map任务和Reduce任务,这些任务在集群中的节点上并行执行。 3. **网络通信**:Hadoop使用`org.apache.hadoop.net`包中的类来处理网络通信,如`SocketServer`和`NetUtils`,它们负责节点...

    hadoop-3.3.0.tar.gz

    MapReduce则是一种编程模型,用于处理和生成大规模数据集,它将工作分解为“映射”(map)和“化简”(reduce)两个阶段,使得并行处理成为可能。 在Hadoop 3.3.0中,有一些值得注意的更新和改进: 1. **YARN增强*...

    hadoop-eclipse-plugin-2.7.1.jar

    4. 完成安装后,重启Eclipse,即可在"File" -&gt; "New"菜单中看到“Hadoop Map/Reduce Project”选项。 同时,`no.txt`文件可能是用于记录某些说明或注意事项的文本文件,但具体内容需要打开文件查看。在Hadoop开发中...

    Hadoop技术-MapReduce编程模型.pptx

    它的设计灵感来源于函数式编程语言中的映射(Map)和归约(Reduce)概念,使得开发者可以方便地编写分布式应用程序,处理存储在Hadoop分布式文件系统(HDFS)上的海量数据。 ### 1. MapReduce编程过程 MapReduce的...

    hadoop-2.8.3-eclipse-plugin插件包

    MapReduce分为Map阶段和Reduce阶段,Map将大任务拆分成小任务并行处理,Reduce则对Map的结果进行聚合。同时,理解Hadoop的配置文件和YARN资源调度器也是必要的。 总之,Hadoop-2.8.3-Eclipse-Plugin是Hadoop开发的...

    hadoop-2.6.0-cdh5.7.0.tar.gz

    它将大型任务拆分成可并行处理的小任务(map阶段),然后将结果合并(reduce阶段)。 3. **YARN (Yet Another Resource Negotiator)**:在Hadoop 2.x中,YARN成为资源管理器,负责调度集群资源,分离了数据处理和...

Global site tag (gtag.js) - Google Analytics