`

第一个MapReduce任务

    博客分类:
  • java
阅读更多

    前两天在公司内网上搭了个2个节点hadoop集群,暂时没有多大实际意义,仅用作自己的测试。遇到的问题在阿里巴巴这位仁兄的《Hadoop集群配置和使用技巧 》都有提到的。也遇到了reduce任务卡住的问题,只需要在每个节点的/etc/hosts将集群中的机器都配置上即可解决。
   今天将一个日志统计任务用Hadoop MapReduce框架重新实现了一次,数据量并不大,每天分析一个2G多的日志文件罢了。先前是用Ruby配合cat、grep命令搞定,运行一次在 50多秒左右,如果纯粹采用Ruby的话CPU占用率非常高而且慢的无法忍受,利用IO.popen调用linux的cat、grep命令先期处理就好多 了。看看这个MapReduce任务:

public class GameCount extends Configured implements
        org.apache.hadoop.util.Tool {
    public static class MapClass extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        private Pattern pattern;

        public void configure(JobConf job) {
            String gameName = job.get("mapred.mapper.game");
            pattern = Pattern.compile("play\\sgame\\s" + gameName
                    + ".*uid=(\\d+),score=(-?\\d+),money=(-?\\d+)");
        }

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String text = value.toString();
            Matcher matcher = pattern.matcher(text);
            int total = 0; // 总次数
            while (matcher.find()) {
                int record = Integer.parseInt(matcher.group(2));
                output.collect(new Text(matcher.group(1)), new IntWritable(
                        record));
                total += 1;
            }
            output.collect(new Text("total"), new IntWritable(total));
        }
    }

    public static class ReduceClass extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }

    }

    static int printUsage() {
        System.out
                .println("gamecount [-m <maps>] [-r <reduces>] <input> <output> <gamename>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), GameCount.class);
        conf.setJobName("gamecount");

       conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(MapClass.class);
        conf.setCombinerClass(ReduceClass.class);
        conf.setReducerClass(ReduceClass.class);

        List<String> other_args = new ArrayList<String>();
        for (int i = 0; i < args.length; ++i) {
            try {
                if ("-m".equals(args[i])) {
                    conf.setNumMapTasks(Integer.parseInt(args[++i]));
                } else if ("-r".equals(args[i])) {
                    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else {
                    other_args.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of "
                        + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                return printUsage();
            }
        }
        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 3) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 2.");
            return printUsage();
        }
        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        conf.set("mapred.mapper.game", args[2]);
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        int res = ToolRunner.run(new Configuration(), new GameCount(), args);
        System.out.println("running time:" + (System.nanoTime() - start)
                / 1000000 + " ms");
        System.exit(res);
    }

}
 

    代码没啥好解释的,就是分析类似"play game DouDiZhu result:uid=1871653,score=-720,money=0"这样的字符串,分析每天玩家玩游戏的次数、分数等。打包成GameCount.jar,执行:

 

hadoop jar GameCount.jar test.GameCount /usr/logs/test.log /usr/output DouDiZhu
 


   统计的运行时间在100多秒,适当增加map和reduce任务个数没有多大改善,不过CPU占用率还是挺高的。

分享到:
评论

相关推荐

    第4讲_分布式计算框架mapreduce.pdf

    Map阶段是MapReduce程序的第一阶段,输入是Split切片,输出是中间计算结果。Map阶段由若干Map任务组成,任务数量由Split数量决定。Map任务将中间结果写入专用内存缓冲区Buffer,进行Partition和Sort。 Reduce阶段是...

    Hadoop学习全程记录-在Eclipse中运行第一个MapReduce程序.docx

    在本文中,我们将深入探讨如何在Eclipse环境中编写并运行你的第一个MapReduce程序,这是一个针对Hadoop初学者的教程。Hadoop是一个开源框架,用于处理和存储大量数据,而MapReduce是Hadoop的核心计算模型,它将...

    第四章Mapreduce.pdf

    4. 编程模型限制:MapReduce模型仅包含一个Map阶段和一个Reduce阶段,复杂业务逻辑可能需要多个MapReduce作业串行执行。 【MapReduce核心思想】 MapReduce的基本思想是将大规模数据处理分解为两个主要步骤:Map和...

    配置MapReduce环境.pdf

    创建一个Maven项目是开发MapReduce程序的第一步。在Eclipse中,选择`File &gt; New &gt; Maven Project`,然后选择`Quick Start`模板。填写`Group Id`和`Artifact Id`,这两个标识符将用于唯一识别你的项目。接着,你需要...

    云应用系统开发第二次项目(mapreduce)

    1. 使用 Maven 创建项目:使用 Maven 创建一个新的项目,并配置 pom.xml 文件以便使用 Hadoop 和 MapReduce。 2. 配置 pom.xml:在 pom.xml 文件中添加 Hadoop 和 MapReduce 的依赖项,以便使用相关的类库。 3. 导入...

    MapReduce操作实例-数据去重.pdf

    这是MapReduce工作流程中的第一步,它接收键值对(`LongWritable key, Text value`)作为输入,这里`key`通常是文件块的偏移量,`value`是该位置的行文本。Mapper的主要任务是处理输入数据并生成中间键值对。在这个...

    Chapter7-厦门大学-林子雨-大数据技术原理与应用-第7讲-MapReduce(中国大学MOOC2018年春季学期)1

    MapReduce是大数据技术中的一个核心组件,它是一种分布式并行编程模型,能够处理大量数据。下面是关于MapReduce的知识点: 1. 概述 MapReduce是一种分布式并行编程模型,能够处理大量数据。它由Google公司提出,...

    mapreduce重点笔记.pdf

    在上述示例中,我们定义了一个PairWritable类,它不仅实现了序列化和反序列化,还实现了Comparable接口,使得第一列按字典顺序排序,第一列相同时,第二列按升序排序。 6. MapReduce框架结构: MapReduce的执行...

    【MapReduce篇08】MapReduce优化1

    确保硬件的正常运行是优化的第一步。例如,CPU应有足够的计算能力处理Map和Reduce任务,内存要足够大以存储中间结果,磁盘健康状况好能快速读写数据,而网络带宽要充足以支持数据传输。 I/O操作是MapReduce程序的另...

    第8节、MapReduce1

    本节将详细探讨MapReduce的概念、运行流程以及通过一个简单的WordCount程序案例来理解其实现。 一、概念阐释 MapReduce的核心思想是将大规模数据处理任务分解成两个主要阶段:Map阶段和Reduce阶段。Map阶段将原始...

    mapreduce_canopy

    Canopy算法首先对数据点进行两遍扫描,第一遍使用T1,第二遍使用T2。在每遍扫描中,数据点被分配到最近的Canopy,如果一个数据点已经属于某个Canopy,但与另一个Canopy的距离小于当前阈值,那么它会被重新分配。通过...

    mogodb mapreduce方法

    然后,它遍历`values`数组,累加`count`的值,同时保持`age`的值为第一个文档的`age`。最后,返回聚合后的`reduced`对象。 `finalize`函数是可选的,它在`reduce`函数完成后对最终结果进行进一步的处理。在这个例子...

    MapReduce计算模型详讲(结合源码深入解读)

    4. Map阶段:Map阶段是MapReduce模型的第一阶段,负责将输入数据分割成小块,并对每个小块进行处理。Map阶段的输出结果是一个键值对序列。 5. Reduce阶段:Reduce阶段是MapReduce模型的第二阶段,负责将Map阶段的...

    十分钟掌握MapReduce精髓

    - 映射(Map)是MapReduce的第一步,它接收输入数据集,并将其分割成多个键值对(key-value pairs)。这些键值对是处理的基本单元,每个键值对代表一个独立的计算任务。例如,在文本处理中,键可能是单词,值可能是...

    第三次试验-MapReduce1

    **第三次试验 - MapReduce1 实验详解** MapReduce 是一种分布式计算模型,广泛应用于大数据处理,尤其是在Hadoop框架中。本实验旨在通过实际操作,帮助学生掌握MapReduce的基本编程技巧,以及利用它来解决常见的...

    Mapreduce#文档.docx

    并行计算的第一个重要问题是如何划分计算任务或者计算数据以 便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖 关系的数据无法进行并行计算!  构建抽象模型:Map 和 Reduce ...

    第9节、MapReduce练习1

    在这个实例中,我们将看到如何利用MapReduce计算每个用户(通过电话号码标识)的上行流量(upFlow)和下行流量(downFlow)的总和。 首先,我们来看原始数据。假设数据是以文本格式存储,每一行代表一个用户的流量...

    第5章大数据技术教程-MapReduce运行原理及Yarn介绍.docx

    MapReduce1 的运行调度流程如下:首先客户端启动提交一个作业,向 JobTracker 请求一个 Job ID,将运行作业所需要的资源文件复制到 HDFS 上,包括 MapReduce 程序打包的 JAR 文件、配置文件和 Job 配置信息。...

    理论部分-Hadoop MapReduce1

    1. Map 阶段:这是并行计算的第一步,它接收输入数据,并将其拆分成键值对的形式。在提供的 WordCount 示例中,`TokenizerMapper` 类实现了 Map 功能。它遍历文本行,使用 `StringTokenizer` 分割单词,并为每个单词...

    MapReduce中文版.pdf

    1. **数据分区**:输入数据被分割成多个块,每个块分配给一个Map任务处理。 2. **映射阶段**:Map任务在本地处理输入键值对,生成中间键值对。 3. **排序阶段**:所有相同中间键的值被归并和排序,准备进行Reduce...

Global site tag (gtag.js) - Google Analytics