`

第一个MapReduce任务

    博客分类:
  • java
阅读更多

    前两天在公司内网上搭了个2个节点hadoop集群,暂时没有多大实际意义,仅用作自己的测试。遇到的问题在阿里巴巴这位仁兄的《Hadoop集群配置和使用技巧 》都有提到的。也遇到了reduce任务卡住的问题,只需要在每个节点的/etc/hosts将集群中的机器都配置上即可解决。
   今天将一个日志统计任务用Hadoop MapReduce框架重新实现了一次,数据量并不大,每天分析一个2G多的日志文件罢了。先前是用Ruby配合cat、grep命令搞定,运行一次在 50多秒左右,如果纯粹采用Ruby的话CPU占用率非常高而且慢的无法忍受,利用IO.popen调用linux的cat、grep命令先期处理就好多 了。看看这个MapReduce任务:

public class GameCount extends Configured implements
        org.apache.hadoop.util.Tool {
    public static class MapClass extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        private Pattern pattern;

        public void configure(JobConf job) {
            String gameName = job.get("mapred.mapper.game");
            pattern = Pattern.compile("play\\sgame\\s" + gameName
                    + ".*uid=(\\d+),score=(-?\\d+),money=(-?\\d+)");
        }

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String text = value.toString();
            Matcher matcher = pattern.matcher(text);
            int total = 0; // 总次数
            while (matcher.find()) {
                int record = Integer.parseInt(matcher.group(2));
                output.collect(new Text(matcher.group(1)), new IntWritable(
                        record));
                total += 1;
            }
            output.collect(new Text("total"), new IntWritable(total));
        }
    }

    public static class ReduceClass extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }

    }

    static int printUsage() {
        System.out
                .println("gamecount [-m <maps>] [-r <reduces>] <input> <output> <gamename>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), GameCount.class);
        conf.setJobName("gamecount");

       conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(MapClass.class);
        conf.setCombinerClass(ReduceClass.class);
        conf.setReducerClass(ReduceClass.class);

        List<String> other_args = new ArrayList<String>();
        for (int i = 0; i < args.length; ++i) {
            try {
                if ("-m".equals(args[i])) {
                    conf.setNumMapTasks(Integer.parseInt(args[++i]));
                } else if ("-r".equals(args[i])) {
                    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else {
                    other_args.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of "
                        + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                return printUsage();
            }
        }
        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 3) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 2.");
            return printUsage();
        }
        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        conf.set("mapred.mapper.game", args[2]);
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        int res = ToolRunner.run(new Configuration(), new GameCount(), args);
        System.out.println("running time:" + (System.nanoTime() - start)
                / 1000000 + " ms");
        System.exit(res);
    }

}
 

    代码没啥好解释的,就是分析类似"play game DouDiZhu result:uid=1871653,score=-720,money=0"这样的字符串,分析每天玩家玩游戏的次数、分数等。打包成GameCount.jar,执行:

 

hadoop jar GameCount.jar test.GameCount /usr/logs/test.log /usr/output DouDiZhu
 


   统计的运行时间在100多秒,适当增加map和reduce任务个数没有多大改善,不过CPU占用率还是挺高的。

分享到:
评论

相关推荐

    MapReduce的小应用

    在实现过程中,需要注意的是,第二个MapReduce任务的输入目录是第一个任务的输出目录。使用JobControl类可以管理这两个job的顺序执行。在Reduce阶段,为了避免自动对相同key的字段相加,collect语句应放置在循环内部...

    MapReduce的两个简单例子

    首先,我们来看第一个例子:WordCount。这是一个非常经典的MapReduce示例,它的目标是统计文本中每个单词出现的次数。在这个过程中,Map阶段的主要任务是对输入的每行文本进行拆分,生成键值对(单词,1),而Reduce...

    mapreduce mapreduce mapreduce

    Map阶段是数据处理的第一步,它接收输入数据,将其分割成多个键值对(key-value pairs),然后对每个键值对应用用户定义的函数(称为Mapper)。Mapper通常用于过滤、转换或对原始数据进行初步分析。Map阶段的结果是...

    基于MapReduce的基于用户的协同过滤算法代码及其使用

    `UserCF1`类是第一个MapReduce任务的具体实现。它继承自`Configured`类并实现了`Tool`接口,这意味着它可以直接作为一个Hadoop工具来运行。 - **Mapper1**:Mapper阶段的主要功能是从输入的数据中提取有用的信息。...

    mapreduce项目 数据清洗

    MapReduce通常与Hadoop生态系统一起使用,Hadoop提供了一个分布式文件系统(HDFS)来存储大数据,以及YARN资源管理器来协调计算任务。在这个项目中,数据可能存储在HDFS上,由YARN调度执行MapReduce作业。 5. **...

    第4讲_分布式计算框架mapreduce.pdf

    Map阶段是MapReduce程序的第一阶段,输入是Split切片,输出是中间计算结果。Map阶段由若干Map任务组成,任务数量由Split数量决定。Map任务将中间结果写入专用内存缓冲区Buffer,进行Partition和Sort。 Reduce阶段是...

    Hadoop学习全程记录-在Eclipse中运行第一个MapReduce程序.docx

    在本文中,我们将深入探讨如何在Eclipse环境中编写并运行你的第一个MapReduce程序,这是一个针对Hadoop初学者的教程。Hadoop是一个开源框架,用于处理和存储大量数据,而MapReduce是Hadoop的核心计算模型,它将...

    java大数据作业_5Mapreduce、数据挖掘

    在给定的例子中,输入数据为一系列整数对,我们需要按第一个数字排序,然后在相同的第一数字下,按第二个数字排序。在Map阶段,键可以是原始数字对,值是数字对本身。在Reduce阶段,使用自定义Comparator对相同键的...

    MapReduce进阶

    Map阶段是MapReduce处理流程的第一步,主要由用户自定义的map函数实现。在这个阶段,输入数据会被拆分成多个小块,每个小块称为一个split。然后,对于每个split,map函数会被调用来处理这个数据块,将输入的键值对...

    MapReduce求平均值示例程序

    首先,Map阶段是MapReduce工作流程的第一步。在这个阶段,原始数据被分割成多个小块(split),每个split由一个map任务处理。在示例程序中,map函数会接收这些数据块,并进行预处理。为了计算平均值,我们需要统计每...

    Hadoop MapReduce

    第一个版本提供了源代码、使用方法和逐步解释,而第二个版本在此基础上进一步展示了如何运行和展示亮点。 最后,文档是由Apache软件基金会发布的,它全面描述了所有面向用户方面的Hadoop MapReduce框架,并作为一个...

    MapReduce基础

    MapReduce的第一步是映射。在这个阶段,输入数据被分成若干个小块,每个小块由一个Map函数处理。Map函数接受输入数据的一个键值对,并输出一系列新的键值对。这些键值对随后会被发送到Reduce阶段进行进一步处理。 *...

    拓思爱诺大数据-第二次作业MapReduce编程

    在Map阶段,程序会读取输入的文本文件,并将每个单词与一个计数1进行配对。然后,在Reduce阶段,相同的单词会被归并,其对应的计数被求和,从而得到每个单词的总出现次数。 接下来是flowcount流量统计程序,这可能...

    mapreduce程序

    Map阶段是MapReduce工作流程的第一步,它接收输入数据集,并将其分割成多个小的数据块,每个数据块由一个Map任务处理。Map函数通常用于对原始数据进行预处理,如解析、过滤和转换。在这个阶段,数据本地化策略确保...

    第四章Mapreduce.pdf

    4. 编程模型限制:MapReduce模型仅包含一个Map阶段和一个Reduce阶段,复杂业务逻辑可能需要多个MapReduce作业串行执行。 【MapReduce核心思想】 MapReduce的基本思想是将大规模数据处理分解为两个主要步骤:Map和...

    Mapreduce原理

    - **Map阶段**:数据处理的第一步是将原始数据切分为多个块,这些块被称为“split”。Map函数接收这些split作为输入,并对其进行处理,将数据转换为键值对形式。Map函数的主要目的是对输入数据进行初步处理和过滤,...

    MapReduce实现矩阵相乘算法

    如果矩阵A为m×n维,矩阵B为n×p维,它们可以相乘得到一个m×p维的矩阵C,其中每个元素C[i][j]是通过计算A的第i行与B的第j列对应元素的乘积之和得到的。 在MapReduce框架下,这个过程可以分为三个主要步骤:Map、...

    MapReduce天气源数据和计算类

    1. **Mapper**:Mapper阶段是MapReduce工作流程的第一步,负责接收输入数据并生成中间键值对。在这个天气数据分析的上下文中,Mapper可能会读取每一天的天气记录,从中提取出日期(key)和对应日期的最高、最低温度...

    细细品味Hadoop_Hadoop集群(第9期)_MapReduce初级案例

    这个“细细品味Hadoop_Hadoop集群(第9期)_MapReduce初级案例”主题聚焦于Hadoop集群的核心组件——MapReduce,这是一种分布式计算模型,对于处理海量数据具有重要作用。 MapReduce的工作原理可以分为两个主要阶段...

Global site tag (gtag.js) - Google Analytics