`

mapreduce Bet

 
阅读更多
import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class BET {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, Text>{
   
    private final static IntWritable one = new IntWritable(1);
    private final static Text creditBalance = new Text("creditBalance");
    private final static Text betString = new Text("betString");
    private Text word = new Text();
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
//    value =new Text("hello zhrg bey zhrg") ;
     StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
      //  context.write(word, one);
      }
    FileSplit fs = (FileSplit) context.getInputSplit(); 
        Path grandpa = fs.getPath().getParent().getParent(); 
        String date = grandpa.getName(); 
        if(date == null) 
            throw new RuntimeException("The input path is not right. No date field can be extracted!"); 
   
        word.set("creditBalance1creditBalance2:"+key);
    context.write(creditBalance,word);
    word.set("betString1betString2:"+key);
    context.write(betString,word);
    }
  }
  //对每一个数字进行汇总求和 
  public static class SumCombiner extends 
          Reducer<Text,Text,Text,Text> { 
      private LongWritable result = new LongWritable(); 
      private Text k = new Text("midsum"); 
      private Text text = new Text();
      public void reduce(Text key, Iterable<Text> values,
              Context context
              ) throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();
              for(Text tx:values){
              sb.append(tx.toString());
              }
              text.set(sb.toString());
              context.write(key, text);
              } 
  }
 
  public static class IntSumReducer
       extends Reducer<Text,Text,Text,Text> {
    private IntWritable result = new IntWritable();

    private Text text = new Text();
    public void reduce(Text key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
     /* for (IntWritable val : values) {
        sum += val.get();
      }*/
     // result.set(sum);

      StringBuffer sb = new StringBuffer();
      for(Text tx:values){
      sb.append(tx.toString());
      }
      text.set(sb.toString());
      context.write(key, text);
    }
  }

  public static void main(String[] args) throws Exception {
//  Path path = new Path("/hadoop/file01");
    Configuration conf = new Configuration();
   
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    HDFS_File fu = new HDFS_File();
    String src = "/install/sumnum.txt";
    // HadoopFileUtil hu = new HadoopFileUtil();
//    String src = "/hadoop/files02";
    /*String dst = "hdfs://s21:9000/user/sumnum";
    if(!fu.CheckFileExist(conf, dst))
    fu.PutFile(conf, src, dst);*/
   // fu.putFileFormLocal(conf,"/install/sumnum.txt", otherArgs[0] );
  /*  String str = otherArgs[1]+"/sum_its4";
    if(fu.CheckFileExist(conf, str))
    fu.DelFile(conf, str, true);
    str = otherArgs[1]+"/_SUCCESS";
    if(fu.CheckFileExist(conf, str));
    fu.DelFile(conf, str, true);
    str = otherArgs[1];
    if(fu.CheckFileExist(conf, str));
    fu.DelFile(conf, str, true);*/
    Job job = new Job(conf, "word count");
    job.setJarByClass(BET.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(SumCombiner.class); 
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
//    job.setOutputFormatClass(coAuOutputFormat.class); 
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
    System.out.println("ok");
  }
}
分享到:
评论

相关推荐

    实验项目 MapReduce 编程

    实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...

    mapreduce mapreduce mapreduce

    MapReduce是一种分布式计算模型,由Google开发,用于处理和生成大量数据。这个模型主要由两个主要阶段组成:Map(映射)和Reduce(规约)。MapReduce的核心思想是将复杂的大规模数据处理任务分解成一系列可并行执行...

    基于MapReduce实现决策树算法

    基于MapReduce实现决策树算法的知识点 基于MapReduce实现决策树算法是一种使用MapReduce框架来实现决策树算法的方法。在这个方法中,主要使用Mapper和Reducer来实现决策树算法的计算。下面是基于MapReduce实现决策...

    MapReduce基础.pdf

    ### MapReduce基础知识详解 #### 一、MapReduce概述 **MapReduce** 是一种编程模型,最初由Google提出并在Hadoop中实现,用于处理大规模数据集的分布式计算问题。该模型的核心思想是将复杂的大型计算任务分解成较...

    Hadoop mapreduce实现wordcount

    【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...

    mapreduce项目 数据清洗

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。它将复杂的并行计算任务分解成两个主要阶段:Map(映射)和Reduce(化简)。在这个"MapReduce项目 数据清洗"中,我们将探讨...

    学生mapreduce成绩分析

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。这个模型将复杂的计算任务分解成两个主要阶段:Map(映射)和Reduce(化简),使得在大规模分布式环境下处理大数据变得可能...

    MapReduce 设计模式

    MapReduce是一种编程模型,用于大规模数据集的并行运算。它最初由Google提出,其后发展为Apache Hadoop项目中的一个核心组件。在这一框架下,开发者可以创建Map函数和Reduce函数来处理数据。MapReduce设计模式是对...

    hadoop mapreduce编程实战

    Hadoop MapReduce 编程实战 Hadoop MapReduce 是大数据处理的核心组件之一,它提供了一个编程模型和软件框架,用于大规模数据处理。下面是 Hadoop MapReduce 编程实战的知识点总结: MapReduce 编程基础 ...

    基于MapReduce的Apriori算法代码

    基于MapReduce的Apriori算法代码 基于MapReduce的Apriori算法代码是一个使用Hadoop MapReduce框架实现的关联规则挖掘算法,称为Apriori算法。Apriori算法是一种经典的关联规则挖掘算法,用于发现事务数据库中频繁...

    MapReduce发明人关于MapReduce的介绍

    ### MapReduce:大规模数据处理的简化利器 #### 引言:MapReduce的诞生与使命 在MapReduce问世之前,Google的工程师们,包括其发明者Jeffrey Dean和Sanjay Ghemawat,面临着一个共同的挑战:如何高效地处理海量...

    Hadoop原理与技术MapReduce实验

    (2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...

    大数据 hadoop mapreduce 词频统计

    【大数据Hadoop MapReduce词频统计】 大数据处理是现代信息技术领域的一个重要概念,它涉及到海量数据的存储、管理和分析。Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储大规模数据集。Hadoop的...

    【MapReduce篇07】MapReduce之数据清洗ETL1

    MapReduce之数据清洗ETL详解 MapReduce是一种基于Hadoop的分布式计算框架,广泛应用于大数据处理领域。数据清洗(Data Cleaning)是数据处理过程中非常重要的一步,旨在清洁和转换原始数据,使其更加可靠和有用。...

    MapReduce实现join连接

    简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接

    大数据实验四-MapReduce编程实践

    ### 大数据实验四-MapReduce编程实践 #### 一、实验内容与目的 ##### 实验内容概述 本次实验的主要内容是使用MapReduce框架来实现WordCount词频统计功能,即统计HDFS(Hadoop Distributed File System)系统中多个...

    kmeans(mapreduce)

    这个过程可以视为“更新”步骤,但因为MapReduce模型不支持原地更新,所以需要再次运行MapReduce作业,将新的质心作为输入,开始下一轮迭代。 4. **迭代过程**:重复上述过程,直到质心不再明显变化或者达到预设的...

    MapReduce的实现细节

    ### MapReduce的实现细节 #### 一、MapReduce框架概述 MapReduce是一种广泛应用于大数据处理领域的分布式编程模型,最初由Google提出并在其内部系统中得到广泛应用。随着开源社区的发展,尤其是Apache Hadoop项目...

    Mapreduce实验报告.doc

    MapReduce是一种分布式计算模型,由Google提出,用于处理和生成大规模数据集。它将复杂的分布式系统操作抽象成简单的编程模型,使开发人员能够专注于编写Map和Reduce函数,从而实现大规模数据处理。 MapReduce的...

Global site tag (gtag.js) - Google Analytics