mongodb MapReduce使用初步
摘自:http://www.kafka0102.com/2010/09/329.html
最近在做搜索的查询日志的统计分析,对每一条查询统计日志,我将其解析出来后以特定字段格式存在mongodb中,定时调度做些统计分析。其中有个需求是,统计某个时间段(每天、每周、每月)各个query的查询次数,展示上就是热门查询query了。考虑到处理的数据量不会很大,解决方法也可以简单来之。我现在使用的方法就是mongodb的MapReduce功能,其实这个需求也可以认为是个group操作,而mongodb的group功能就是基于MapReduce的,但group对结果集的大小是有限制的。本文就针对一个示例介绍一下mongodb MapReduce功能。
语法介绍
MapReduce是mongodb中的一个Command,它的语法格式如下:
db.runCommand(
{ mapreduce : <collection>,
map : <mapfunction>,
reduce : <reducefunction>
[, query : <query filter object>]
[, sort : <sort the query. useful for optimization>]
[, limit : <number of objects to return from collection>]
[, out : <output-collection name>]
[, keeptemp: <true|false>]
[, finalize : <finalizefunction>]
[, scope : <object where fields go into javascript global scope >]
[, verbose : true]
}
);
对于该Command,必有的3个参数我就不解释了。对于可选参数,这里简要说明如下:
(1) query是很常用的,它用来在map阶段过滤查询条件的以限定MapReduce操作的记录范围。
(2) 和query相关的还有sort和limit,我起初以为它俩是用在reduce阶段,实际上和query一起用在map阶段。
(3) mongodb默认是创建一个临时的collection存储MapReduce结果,当客户端连接关闭或者显示使用collection.drop(),这个临时的collection会被删除掉。这也就说,默认的keeptemp是false,如果keeptemp为true,那么结果collection就是永久的。当然,生成的collection名称并不友好,所以可以指定out表明永久存储的collection的名称(这时不需要再指定keeptemp)。当指定out时,并不是将执行结果直接存储到out,而是同样到临时collection,之后如果out存在则drop掉,最后rename临时collection为out。
(4) finalize:当MapReduce完成时应用到所有结果上,通常不怎么使用。
(5) verbose:提供执行时间的统计信息。
执行结果的格式如下:
{ result : <collection_name>,
counts : {
input : <number of objects scanned>,
emit : <number of times emit was called>,
output : <number of items in output collection>
} ,
timeMillis : <job_time>,
ok : <1_if_ok>,
[, err : <errmsg_if_error>]
}
更常用的MapReduce命令的helper是:
db.collection.mapReduce(mapfunction,reducefunction[,options]);
map函数的定义如下,map函数内使用this来操作当前行表示的对象,并且需要使用emit(key,value)方法来向reduce提供参数:
function map(void) -> void
reduce函数定义如下,reduce的key就是emit(key,value)的key,value_array是同个key对应的多个value数组:
function reduce(key, value_array) -> value
MapReduce得到的collection的格式是{“_id”:key,”value”:}。
应用示例
这里给出一个假想的无意义的示例,主要是为了说明mongodb MapReduce的使用。每条记录的schema是{“query”:,”cnt”:,”year”:,”month”=>}。这个schema的cnt是不需要的,因为每条查询query的cnt都是1,但这里想要稍微复杂一些条件。下面是可在mongodb shell中执行的MapReduce脚本。
map = function() {
emit(this.query, this.cnt);
};
reduce = function(key , vals) {
var sum = 0;
for(var i in vals) {
sum += vals[i];
}
return sum;
};
res = db.log_info.mapReduce(map,reduce,{"query":{"year":2010}});
执行结果如下:
{
"result" : "tmp.mr.mapreduce_1284794393_2",
"timeMillis" : 72,
"counts" : {
"input" : 1000,
"emit" : 1000,
"output" : 113
},
"ok" : 1,
}
对于”result”,它是生成的临时collection名称,这个名称的命名规则是:”tmp.mr.mapreduce_”+time(0)+”_”+(jobNumber++)
执行db[res.result].find()得到:
{ "_id" : "a", "value" : 521 }
{ "_id" : "aa", "value" : 128 }
{ "_id" : "aaa", "value" : 40 }
{ "_id" : "aaaa", "value" : 4 }
{ "_id" : "aaab", "value" : 9 }
{ "_id" : "aaac", "value" : 13 }
{ "_id" : "aab", "value" : 45 }
{ "_id" : "aaba", "value" : 5 }
{ "_id" : "aabb", "value" : 14 }
{ "_id" : "aabc", "value" : 20 }
{ "_id" : "aac", "value" : 39 }
{ "_id" : "aaca", "value" : 6 }
{ "_id" : "aacb", "value" : 2 }
{ "_id" : "aacc", "value" : 5 }
{ "_id" : "ab", "value" : 65 }
{ "_id" : "aba", "value" : 37 }
{ "_id" : "abaa", "value" : 12 }
{ "_id" : "abab", "value" : 13 }
{ "_id" : "abac", "value" : 10 }
{ "_id" : "abb", "value" : 42 }
Java客户端API使用
和JS脚本一样,mongodb Java客户端提供了两个MapReduce接口,分别是:
public MapReduceOutput mapReduce( String map , String reduce , String outputCollection , DBObject query );
public MapReduceOutput mapReduce( DBObject command );
MapReduceOutput实现如下:
public class MapReduceOutput {
MapReduceOutput( DBCollection from , BasicDBObject raw ){
_collname = raw.getString( "result" );
_coll = from._db.getCollection( _collname );
_counts = (BasicDBObject)raw.get( "counts" );
}
public DBCursor results(){
return _coll.find();
}
public void drop(){
_coll.drop();
}
public DBCollection getOutputCollection(){
return _coll;
}
final String _collname;
final DBCollection _coll;
final BasicDBObject _counts;
}
所以,可以调用MapReduceOutput.results()得到DBCursor做后续处理,比如在我的应用场景里,根据value值做降序排序并取limit 1000以得到最热门的一些query。
由于JavaScript引擎设计上的限制,当前的mongodb MapReduce还只是单线程执行,mongodb也在计划解决这个问题。如果需要多线程处理,可以考虑shard或者在客户端代码控制处理。
分享到:
相关推荐
了解和掌握MongoDB的安装、配置以及基本操作,如数据插入、查询、更新和删除,是使用MongoDB的基础。同时,熟悉其复制集和分片集群的设置,有助于构建高可用和可扩展的数据库架构。此外,学习MongoDB的聚合框架和...
"大数据管理实验之三Hadoop基础命令与编程初步.docx"则深入到Hadoop的使用层面,包括HDFS的基本命令,如上传、下载、查看文件等,以及编写MapReduce程序的初步概念,如Mapper和Reducer的工作原理,以及使用Java API...
在MapReduce的Shuffle过程中,每个Map任务有一个缓存,分区默认使用哈希函数,但Shuffle过程不会改变最终结果。若要实现Storm中Tuple的随机分发,应采用ShuffleGrouping。SQL Azure的Client Layer负责将用户请求转化...
10. 可扩展的并行处理:MapReduce是处理大规模数据集的一种方式,书中将使用Hadoop作为实例来介绍MapReduce基础和可扩展的处理。 11. 使用Hive和Pig分析大数据:Hive和Pig是构建在Hadoop之上的高级抽象,用于简化大...
OpenNotes自己整理,总结的...11 锈2018 Centos 7 MySQL Redis MongoDB 码头工人大数据Hadoop 2.6.5 MapReduce 蜂巢1.2.1 HBase 0.98 卡夫卡2.10语言特性初步编程鸭子模型猴子补丁赛顿Pythonic泛型模版生命周期所有权
例如,使用Spark SQL进行结构化数据处理,或者使用Pandas库对数据进行初步分析。 3. 数据存储:大数据项目通常使用分布式数据库系统,如HBase、Cassandra或MongoDB,用于存储非结构化或半结构化的数据;而关系型...
- **日志处理:** 大量的日志数据可以通过Hadoop进行初步清洗和聚合,然后再存储到Hbase中供后续分析使用。 通过以上对Hadoop+Hbase搭建云存储的关键知识点的梳理,我们可以清晰地了解到这套技术组合在实际应用中...
3. **大数据处理**:Hadoop和Spark是当时大数据处理的重要工具,对应的源码可能涉及到MapReduce算法实现或Spark的DataFrame和RDD操作。 4. **云计算**:2013年,AWS(亚马逊网络服务)和OpenStack正逐步流行,源码...
交互式数据查询技术则适合以用户为中心的分析需求,NoSQL数据存储是其基础,比如Hive、HBase、MongoDB等。这种技术的特点是灵活、直观,并且便于控制,适合在数据分析中需要高度互动的场景。 在网络安全分析中,...
此外,NoSQL数据库如MongoDB、Cassandra等,因其非关系型、高并发的特性,也被广泛应用在物联网大数据场景中。 接着,数据处理和分析是大数据价值体现的关键步骤。MapReduce是Hadoop框架下的并行计算模型,适用于...
3. **ODS(操作数据存储)层**:此层提供了一个临时区域,用于存储经过初步处理的数据,以快速响应业务运营查询。 4. **DWM(数据仓库模型)层**:这里的数据经过了更复杂的转换和规范化,以适应特定的业务需求和...
### NoSQL基础知识详解 #### 一、引言与概述 随着互联网技术的飞速发展,海量数据处理...通过对NoSQL的基本概念、关键技术以及常见类型的深入理解,可以帮助我们在实际工作中更好地选择和使用合适的NoSQL解决方案。
- **数据库选择**:根据数据类型及应用场景选择合适的关系型或非关系型数据库,如MySQL、MongoDB等。 - **数据质量管理**:建立数据清洗规则,定期执行数据清理任务,保证数据质量。 ##### 2.3 数据处理与分析 - *...
MongoDB是一种NoSQL数据库,适合处理非结构化和半结构化数据,与传统的关系型数据库如MySQL相比,它在大数据场景下更显优势。Anaconda则是一个广泛使用的Python数据分析环境,集成了众多数据科学所需的库和工具,...
同时,数据的存储和处理可能需要使用到数据库管理系统,如MySQL、MongoDB等,或者大数据处理框架如Hadoop和Spark。 在编程方面,Python和R语言是常用的数据分析工具,它们有丰富的库支持数据处理和建模,如Pandas、...
标题"百度大数据云计算研发笔试卷-综合文档"暗示了这份资料是百度公司在进行大数据和云计算研发岗位招聘时所使用的笔试题目集。这类笔试通常涵盖多个技术领域,旨在测试候选人在大数据处理、云计算技术以及相关研发...
3. **数据存储**:选择HDFS、NoSQL数据库(如MongoDB、Cassandra)或关系型数据库(如MySQL)作为数据存储介质。 4. **数据服务**:开发RESTful API或使用消息队列(如RabbitMQ)等方式提供数据访问服务。 #### ...