`
guoyunsky
  • 浏览: 854424 次
  • 性别: Icon_minigender_1
  • 来自: 上海
博客专栏
3d3a22a0-f00f-3227-8d03-d2bbe672af75
Heritrix源码分析
浏览量:206253
Group-logo
SQL的MapReduce...
浏览量:0
社区版块
存档分类
最新评论

Hadoop MapReduce 学习笔记(七) MapReduce在多字段/列基础上实现类似SQL的max和min

 
阅读更多

   本博客属原创文章,转载请注明出处:http://guoyunsky.iteye.com/blog/1233733

 

      

       请先阅读:           

           1.Hadoop MapReduce 学习笔记(一) 序言和准备

           2.Hadoop MapReduce 学习笔记(二) 序言和准备 2

           3.Hadoop MapReduce 学习笔记(三) MapReduce实现类似SQL的SELECT MAX(ID)

           4.Hadoop MapReduce 学习笔记(四) MapReduce实现类似SQL的SELECT MAX(ID) 2 一些改进

                 5.Hadoop MapReduce 学习笔记(五) MapReduce实现类似SQL的max和min

                 6.Hadoop MapReduce 学习笔记(六) MapReduce实现类似SQL的max和min  正确写法

 

    下一篇: Hadoop MapReduce 学习笔记(八) MapReduce实现类似SQL的order by/排序

 

        Hadoop MapReduce 学习笔记(六) MapReduce实现类似SQL的max和min  正确写法 只是一列,如序言说的,一张表中有多个列呢?比如想找出序言中USER表最大和最小ID的用户数据,类似SQL:

 

 SELECT * FROM USER WHERE ID=MAX(ID) OR ID= MIN(ID);

      还是贴上代码吧,这里引入的概念是自己实现Hadoop的输入输出.Hadoop自己的是IntWritalbe,Text等,有如Java的int,String.但我们想实现自己的类呢.请看代码吧:

 

     1.相对Hadoop来说,自己的输入输出类:

package com.guoyun.hadoop.mapreduce.study;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
/**
 * 多列数据,这里格式是:frameworkName(String)  number(int)
 * 等同于数据表
 * CREATE TABLE TABLE_NAME(
 *  FRAMEWORK_NAME VARCHAR(32),
 *  NUMBER INT
 * )
 */
public  class MultiColumnWritable implements  WritableComparable{
  protected String frameworkName="";
  protected long number=-1;
  
  public String getFrameworkName() {
    return frameworkName;
  }

  public void setFrameworkName(String frameworkName) {
    this.frameworkName = frameworkName;
  }

  public long getNumber() {
    return number;
  }

  public void setNumber(long number) {
    this.number = number;
  }

  public MultiColumnWritable() {
    super();
  }

  @Override
  public int compareTo(Object obj) {
    int result=-1;
    if(obj instanceof MultiColumnWritable){
      MultiColumnWritable mcw=(MultiColumnWritable)obj;
      if(mcw.getNumber()<this.getNumber()){
        result =1;
      }else if(mcw.getNumber()==this.getNumber()){
        result=0;
      }
    }
    return result;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    frameworkName=in.readUTF();
    number=in.readLong();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(frameworkName);
    out.writeLong(number);
  }

  @Override
  public String toString() {
    return frameworkName+"\t"+number;
  }
  
}

 

  2.获得最大和最小值

package com.guoyun.hadoop.mapreduce.study;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 或得最大和最小值,类似SQL:SELECT * FROM TABLE WHERE NUMBER=MAX(NUMBER) OR NUMBER=MIN(NUMBER)
 * 这里有多列数据,但只取其中一列的最大和最小
 * 如果想对其中几列取最大最小值,请自己实现 @MultiColumnWritable
 */
public class GetMaxAndMinValueMultiMapReduceTest extends MyMapReduceMultiColumnTest {
  
  public static final Logger log=LoggerFactory.getLogger(GetMaxAndMinValueMultiMapReduceTest.class);

  public GetMaxAndMinValueMultiMapReduceTest(long dataLength) throws Exception {
    super(dataLength);
    // TODO Auto-generated constructor stub
  }

  public GetMaxAndMinValueMultiMapReduceTest(String outputPath) throws Exception {
    super(outputPath);
    // TODO Auto-generated constructor stub
  }

  public GetMaxAndMinValueMultiMapReduceTest(long dataLength, String inputPath,
      String outputPath) throws Exception {
    super(dataLength, inputPath, outputPath);
    // TODO Auto-generated constructor stub
  }
  
  
  public static class MyCombiner
    extends Reducer<Text,MultiColumnWritable,Text,MultiColumnWritable>{
    private final Text maxValueKey=new Text("maxValue");
    private final Text minValueKey=new Text("minValue");
    
    @Override
    public void reduce(Text key, Iterable<MultiColumnWritable> values,Context context)
        throws IOException, InterruptedException {
      log.debug("begin to combine");
      long maxValue=Long.MIN_VALUE;
      String maxFrameworkName="";
      long minValue=Long.MAX_VALUE;
      String minFrameworkName="";
      long valueTmp=0;
      String nameTmp="";
      MultiColumnWritable writeValue=new MultiColumnWritable();
      
      for(MultiColumnWritable value:values){
        valueTmp=value.getNumber();
        nameTmp=value.getFrameworkName();
        
        // 其实可以用他们的compare方法
        if(valueTmp>maxValue){
          maxValue=valueTmp;
          maxFrameworkName=nameTmp;
        }else if(valueTmp<minValue){
          minValue=valueTmp;
          minFrameworkName=nameTmp;
        }
      }
      
      writeValue.setFrameworkName(maxFrameworkName);
      writeValue.setNumber(maxValue);
      context.write(maxValueKey, writeValue);
      writeValue.setFrameworkName(minFrameworkName);
      writeValue.setNumber(minValue);
      context.write(minValueKey, writeValue);
    } 
    
  }
  
  
  /**
   * Reduce,to get the max value
   */
  public static class MyReducer 
    extends Reducer<Text,MultiColumnWritable,Text,MultiColumnWritable>{
    private final Text maxValueKey=new Text("maxValue");
    private final Text minValueKey=new Text("minValue");
      
    @Override
    public void run(Context context) throws IOException, InterruptedException {
      long maxValue=Long.MIN_VALUE;
      long minValue=Long.MAX_VALUE;
      long tmpValue=0;
      String tmpFrameworkName="";
      String tmpKey="";
      String maxFrameworkName="";
      String minFrameworkName="";
      MultiColumnWritable writeValue=new MultiColumnWritable(); 
      MultiColumnWritable tmpWrite=null;
      
      try {
        setup(context);
   
        while(context.nextKey()){
          tmpKey=context.getCurrentKey().toString();
          tmpWrite=(MultiColumnWritable)context.getCurrentValue();
          tmpValue=tmpWrite.getNumber();
          tmpFrameworkName=tmpWrite.getFrameworkName();
          
          if(tmpKey.equals("maxValue")){
            if(tmpValue>maxValue){
              maxValue=tmpValue;
              maxFrameworkName=tmpFrameworkName;
            }
          }else if(tmpKey.equals("minValue")){
            if(tmpValue<minValue){
              minValue=tmpValue;
              minFrameworkName=tmpFrameworkName;
            }
          }
        }
        
        writeValue.setFrameworkName(maxFrameworkName);
        writeValue.setNumber(maxValue);
        context.write(maxValueKey, writeValue);
        writeValue.setFrameworkName(minFrameworkName);
        writeValue.setNumber(minValue);
        context.write(minValueKey, writeValue);
      } catch (Exception e) {
        log.debug(e.getMessage());
      }finally{
        cleanup(context);
      }
       
    }

    @Override
    protected void cleanup(Context context) throws IOException,
        InterruptedException {
      // TODO Auto-generated method stub
      super.cleanup(context);
    }

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      // TODO Auto-generated method stub
      super.setup(context);
    }
    
  }
  
  /**
   * @param args
   */
  public static void main(String[] args) {
    MyMapReduceTest mapReduceTest=null;
    Configuration conf=null;
    Job job=null;
    FileSystem fs=null;
    Path inputPath=null;
    Path outputPath=null;
    long begin=0;
    String input="testDatas/mapreduce/MRInput_Multi_getMaxAndMin";
    String output="testDatas/mapreduce/MROutput_Multi_getMaxAndMin";
    
    
    try {
      mapReduceTest=new GetMaxAndMinValueMultiMapReduceTest(2000000,input,output);
      
      inputPath=new Path(mapReduceTest.getInputPath());
      outputPath=new Path(mapReduceTest.getOutputPath());
      
      conf=new Configuration();
      job=new Job(conf,"getMaxAndMinValueMulti");
      
      fs=FileSystem.getLocal(conf);
      if(fs.exists(outputPath)){
        if(!fs.delete(outputPath,true)){
          System.err.println("Delete output file:"+mapReduceTest.getOutputPath()+" failed!");
          return;
        }
      }
      
      
      job.setJarByClass(GetMaxAndMinValueMultiMapReduceTest.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(MultiColumnWritable.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(MultiColumnWritable.class);
      job.setMapperClass(MultiSupMapper.class);
      job.setCombinerClass(MyCombiner.class);
      job.setReducerClass(MyReducer.class);
      
      job.setNumReduceTasks(2);
      
      FileInputFormat.addInputPath(job, inputPath);
      FileOutputFormat.setOutputPath(job, outputPath);
      
      begin=System.currentTimeMillis();
      job.waitForCompletion(true);
      
      System.out.println("===================================================");
      if(mapReduceTest.isGenerateDatas()){
        System.out.println("The maxValue is:"+mapReduceTest.getMaxValue());
        System.out.println("The minValue is:"+mapReduceTest.getMinValue());
      }
      System.out.println("Spend time:"+(System.currentTimeMillis()-begin));
      // Spend time:13361
      
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    

  }
  

}

 

 

更多技术文章、感悟、分享、勾搭,请用微信扫描:

0
1
分享到:
评论

相关推荐

    Hadoop mapreduce实现wordcount

    【标题】Hadoop MapReduce 实现 WordCount ...通过理解和实践 Hadoop MapReduce 的 WordCount 示例,开发者可以快速掌握 MapReduce 的基本工作原理,为进一步学习和应用大数据处理技术打下坚实基础。

    Hadoop MapReduce实现tfidf源码

    在大数据处理领域,Hadoop MapReduce是一种广泛应用的分布式计算框架,它使得在大规模数据集上进行并行计算成为可能。本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document ...

    大数据 hadoop mapreduce 词频统计

    在这个过程中,Hadoop MapReduce通过并行化处理和容错机制,能够高效地处理大规模数据,即使在硬件故障的情况下也能确保数据完整性。同时,MapReduce的编程模型相对简单,使得开发者能够专注于业务逻辑,而不是底层...

    Hadoop MapReduce Cookbook 源码

    《Hadoop MapReduce Cookbook 源码》是一本专注于实战的书籍,旨在帮助读者通过具体的例子深入理解并掌握Hadoop MapReduce技术。MapReduce是大数据处理领域中的核心组件,尤其在处理大规模分布式数据集时,它的重要...

    Hadoop MapReduce实战手册(完整版)

    总之,《Hadoop MapReduce实战手册》全面覆盖了MapReduce的基本概念、工作流程、编程模型以及在大数据处理中的实际应用,是学习和理解大数据处理技术的理想读物。通过深入阅读,读者可以提升在大数据环境下的编程和...

    Hadoop mapreduce 实现KMeans

    在压缩包文件“CH 10.1 - KMeans”中,可能包含了关于如何在 Hadoop MapReduce 上实现 KMeans 的具体代码示例、步骤指南或者理论讲解。这些资源可以帮助你更深入地理解如何将这两个技术结合,以便在大规模数据集上...

    python hadoop mapreduce 相似用户|mapreduce.rar

    在大数据处理领域,Python、Hadoop MapReduce是两个非常重要的工具。本文将深入探讨如何使用Python来编写Hadoop MapReduce程序,以实现微博关注者之间的相似用户分析。这个任务的关键在于理解并应用分布式计算原理,...

    Hadoop mapreduce 实现MatrixMultiply矩阵相乘

    本主题将深入探讨如何使用Hadoop MapReduce来实现MatrixMultiply,即矩阵相乘,这是一个基础且重要的数学运算,尤其在数据分析、机器学习以及高性能计算中有着广泛应用。 首先,理解矩阵相乘的基本原理至关重要。在...

    hadoop mapreduce编程实战

    Hadoop MapReduce 编程实战 ...通过了解 MapReduce 编程基础、MapReduce 项目实践、MapReduce 编程模型、Deduplication、MAC 地址统计和计数器的使用,我们可以更好地掌握 Hadoop MapReduce 的编程技术。

    Hadoop之MapReduce编程实例完整源码

    一个自己写的Hadoop MapReduce实例源码,网上看到不少网友在学习MapReduce编程,但是除了wordcount范例外实例比较少,故上传自己的一个。包含完整实例源码,编译配置文件,测试数据,可执行jar文件,执行脚本及操作...

    Java操作Hadoop Mapreduce基本实践源码

    在Java中实现MapReduce程序,我们需要创建两个类:`Mapper`和`Reducer`,分别对应Map和Reduce阶段的逻辑。此外,还需要配置Job对象,指定输入输出路径、Mapper和Reducer类以及其他的Hadoop配置参数。 例如,一个...

    Hadoop MapReduce v2 Cookbook, 2nd Edition-Packt Publishing(2015) 高清完整版PDF下载

    ### Hadoop MapReduce V2 知识点概览 #### 一、Hadoop MapReduce V2 生态系统介绍 ...通过本书的学习,读者不仅可以了解Hadoop MapReduce V2的基本原理,还可以学习到如何在实际项目中有效利用这一强大的工具。

    mapreduce在hadoop实现词统计和列式统计

    在这个场景中,我们将讨论如何使用Hadoop的MapReduce来实现词统计和列式统计。 **一、MapReduce原理** MapReduce的工作流程主要包括三个主要阶段:Map、Shuffle(排序)和Reduce。在Map阶段,输入数据被分割成多个...

    hadoop mapreduce helloworld 能调试

    在大数据处理领域,Hadoop MapReduce 是一个至关重要的框架,它允许开发者编写分布式应用程序来处理海量数据。"Hadoop MapReduce HelloWorld 能调试" 的主题意味着我们将深入理解如何设置、运行以及调试 MapReduce ...

    基于Apriori算法的频繁项集Hadoop mapreduce

    在大数据处理领域,Apriori算法与Hadoop MapReduce的结合是实现大规模数据挖掘的关键技术之一。Apriori算法是一种经典的关联规则学习算法,用于发现数据集中频繁出现的项集,进而挖掘出有趣的关联规则。而Hadoop ...

    Hadoop mapreduce 实现MR_DesicionTreeBuilder 决策树

    在大数据处理领域,Hadoop MapReduce 是一种广泛使用的计算框架,尤其在处理大规模数据集时。决策树(Decision Tree)是一种流行的机器学习算法,常用于分类和回归问题。本项目结合了两者,实现了一个名为 MR_...

    Hadoop MapReduce.pdf

    MapReduce的核心思想是通过将大数据集分割成小块,并在多个计算机节点上进行并行处理,从而实现高效的数据处理。 #### 二、MapReduce工作流程详解 1. **输入切分**:Hadoop将输入文件分割成固定大小的块,每个块...

    Hadoop MapReduce.md

    本章介绍了 Hadoop MapReduce,同时发现它有以下缺点: 1、程序设计模式不容易使用,而且 Hadoop 的 Map Reduce API 太过低级,很难提高开发者的效率。 2、有运行效率问题,MapReduce 需要将中间产生的数据保存到...

    Hadoop_MapReduce教程.doc

    【Hadoop MapReduce教程】 Hadoop MapReduce是一种分布式计算框架,设计用于处理和存储大量数据。这个框架使得开发者能够编写应用程序来处理PB级别的数据,即使是在由数千台廉价硬件组成的集群上。MapReduce的核心...

Global site tag (gtag.js) - Google Analytics