package com.dt.spark.topn; import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class TopN { /** * * 从所有订单日志中获取Top 5订单及付款金额 * @author yuming * @ail: ymzhang@foxmail.com * @weibo: http://www.weibo.com/yumzhang */ public static class ForTopNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { int[] tops; int length; @Override protected void setup(Context context) throws IOException, InterruptedException { length = context.getConfiguration().getInt("topN", 5); tops = new int[length + 1]; } /** * 在Map阶段各个Map分别计算自己的Top N,减少网络传输的压力 * 减少数据量,提高Reduce处理的效率,防止少海量数据的OOM问题 */ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] data = value.toString().split(","); if (data != null && 4 == data.length) { int cost = Integer.valueOf(data[2]); tops[0] = cost; Arrays.sort(tops); //正向排序 } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (int i = 0; i < tops.length; i++) { context.write(new IntWritable(tops[i]), new IntWritable(tops[i])); } } } public static class ForTopNReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { int[] tops; int length; @Override protected void setup(Context context) throws IOException, InterruptedException { length = context.getConfiguration().getInt("topN", 5); //default get Top 5 tops = new int[length + 1]; } public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { tops[0] = key.get(); Arrays.sort(tops); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (int i = length; i > 0; i--) { //对已经排序好的数组输出 context.write(new IntWritable(length - i + 1), new IntWritable(tops[i])); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setInt("topN", 5); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: TopN Rank <in> [<in>...] <out>"); System.exit(2); } // set job Job job = new Job(conf, "Sorted TopN Application"); job.setJarByClass(TopN.class); // set Map、Combine and Reduce class job.setMapperClass(ForTopNMapper.class); // job.setCombinerClass(ForSortReducer .class); job.setReducerClass(ForTopNReducer.class); // set input output data format job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); // set path for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } /** * dataTopN1.txt Id,custmId,pay,productId 1,9819,100,121 2,8918,2000,111 3,2813,1234,22 4,9100,10,1101 5,3210,490,111 6,1298,28,1211 7,1010,281,90 8,1818,9000,20 ---------------- dataTopN.txt 10,2222,10,1000 11,9321,1000,293 12,3881,999,328 13,8328,1000,66 */
一图顶千言:
充分利用Map的集群效应,在Map阶段将处理各自处理自己的Top N,然后将数据输出给Reducer,提高集群利用率,防止OOM发生,可能还有更优算法,如果哪位有也可以回复我。
相关推荐
1. **前N个最大/最小值的获取**:对于较小的N值,可以采用优先队列或堆数据结构来高效实现,时间复杂度为O(n log N),其中n为总数据量。 2. **双路归并排序**:适用于较大的N值,通过两路归并的方法逐步合并排序...
4. MapReduce设计模式在构建MapReduce应用程序中的应用。 5. Secondary Sort问题的定义、成因、解决方案以及Hadoop和Spark框架下的实现方法。 6. 如何编写MapReduce程序中的核心函数:map()和reduce(),及其在具体...
本话题主要探讨如何利用MapReduce来实现物品协同过滤算法(Item-based Collaborative Filtering, 简称ItemCF),这是一种推荐系统中常用的算法。我们将深入理解ItemCF的原理,以及如何将其与MapReduce相结合。 **...
- **哈希表**:利用哈希表(hash map)来存储查询及其出现次数,以快速获取每个查询的频率。 - **Trie树**:Trie树是一种树形结构,特别适合于字符串的检索,可以用来统计不同查询的频率。 例如,在一个每条记录大小...
海量数据处理面试题主要考察的是数据处理能力、算法理解、分布式计算原理以及高效存储策略。以下是对这些面试题的详细解答: 1. 提取出某日访问百度次数最多的 IP: 这题通常需要使用流式处理,如 MapReduce 或者 ...
- **实现**: Map阶段将数据划分并发送到不同的节点进行处理,Reduce阶段汇总结果。 **7. 数据压缩技术** - **适用场景**: 减少存储和传输成本。 - **实现**: 使用如gzip、bzip2等压缩工具对数据进行压缩。 **8. ...
首先,通过Map阶段将日志中的IP与访问次数对应起来,然后在Reduce阶段对相同IP的访问次数进行求和,最后找出访问次数最多的IP。 2. **检索串频率统计** 这个问题涉及到搜索引擎的查询分析。可以使用Trie树或者...
与之相伴的是监控数据的快速增长,如何从海量监控数据中挖掘出有效信息已经成为现代舰船远程监控系统的瓶颈问题。为了解决这个问题,本文提出了一种基于现有的开源云平台的远程实时监控系统架构,并针对监控系统架构...
这些面试题目聚焦于大数据量和海量数据的处理,涵盖了各种挑战,包括数据过滤、去重、排序、频率统计和热门元素提取。以下是对这些题目的详细解析和相关知识点: 1. **URL共现问题**:这是一个典型的集合交集问题,...
随着互联网数据量的急速增长,如何在海量的RDF数据中实现快速准确的关键词搜索显得尤为重要。 文章提出了KDSOS(Keyword Distributed Search with Ontology Subgraph)算法,这是面向大规模RDF数据的分布式搜索算法...
在当前大数据时代,推荐系统已经成为互联网服务中不可或缺的一部分,而协同过滤(Collaborative Filtering, CF)是推荐系统中常用的一种算法。本篇将探讨如何在分布式计算框架Hadoop上,利用项目K临近(K-Nearest ...
在这个项目中,我们将深入探讨如何利用Hadoop的分布式计算能力来解决大规模数据处理问题,以及如何在Java环境下实现好友推荐算法。 首先,Hadoop是Apache软件基金会开发的一个开源框架,它允许对大型数据集进行...
MapReduce则是一种编程模型,用于大规模数据集的并行计算,它将复杂计算任务拆分成“映射”(map)和“化简”(reduce)两部分,便于在集群中并行执行。 二、电影推荐系统原理 电影推荐系统是通过分析用户的历史...
- **案例分析**:通过实际案例,如WordCount、TopN分析等,详细介绍MapReduce的具体实现方法。 2. **Hadoop API使用** - **Java API**:介绍如何使用Java API来开发Hadoop应用程序,包括如何创建Job、设定输入...
MapReduce将大规模数据处理分解为两个主要步骤:Map(映射)和Reduce(规约),使得并行处理数据成为可能,从而在处理海量数据时表现出高效能。 【数据收集与分布式文件系统】 在这个研究中,提出了一种基于...
它分为两个阶段:Map(映射)和Reduce(规约)。 - **YARN(Yet Another Resource Negotiator)**:资源管理器,负责分配和管理集群中的资源。 - **Hive**:数据仓库工具,提供SQL查询功能,使得处理Hadoop数据变得...