对于每个从事数据技术的人才而言,mapreduce都不陌生,简单而言就是一种大数据计算、分类、分析的一个编程模式,目前很多大数据存储平台都支持mapreduce,比如hadoop、hbase等等,也有很多采用了类似于mapreduce的算法的其他数据计算平台,关于mapreduce的原理不再赘言,我们直接来了解mongodb中如何使用mapreduce。
Mongodb中提供了2种对大数据计算的方式:aggregate和mapreduce,严格意义上说mapreduce也是一种“aggregation”算法,只是这两种方式使用起来差距甚远,设计理念也不同,所以我们分开来介绍,aggregate请参考【aggreate】,本文重点介绍mapreduce。
mapreduce有2个阶段:map和reduce;mapper处理每个document,然后emits一个或者多个objects,object为key-value对;reducer将map操作的结果进行联合操作(combine)。此外mapreduce还可以有一个finalize阶段,这是可选的,它可以调整reducer计算的结果。在进行mapreduce之前,mongodb支持使用query来筛选文档,也支持sort排序和limit。
mapreduce使用javascript语法编写,其内部也是基于javascript V8引擎解析并执行,javascript语言的灵活性也让mapreduce可以处理更加复杂的业务场景;当然这相对于aggreation pipleine而言,意味着需要书写大量的脚本,而且调试也将更加困难。(调试可以基于javascript调试,成功后再嵌入到mongodb中)我们来展示一个例子:
我们想统计每个品类下商品的总量,SQL语义为:
SELECT COUNT(1) FROM products group by category_id
原始数据格式:
{_id:1,"categoryId" : 1 , "productId":100010,"amount" : 200} {_id:1,"categoryId" : 1, "productId": 100011,"amount" : 300}
mapreducey语句范例:
db.products.mapReduce( function(){ emit(this.categoryId,1); }, function(key,values){ return Array.sum(values); }, { out:"result001", } )
JAVA代码示例:
String mapper = "function() {" + "emit(this.categoryId,1);" + "}"; String reducer = "function(key,values){" + "return Array.sum(values);" + "}"; collection.mapReduce(mapper, reducer) .databaseName("test") //结果保存在”test“数据库中,默认为当前数据库 .collectionName("result") //指定结果保存的collection,默认为inline模式,不保存,直接通过cursor输出 .action(MapReduceAction.REPLACE) //结果保存方式,如果”result“表存在,则替换。 .nonAtomic(false) //是否为”非原子性“ .sharded(false) //结果collection是否为sharded .maxTime(180,TimeUnit.SECONDS) //mapreduce执行的最大时间 .iterator() //触发执行,这句话千万别忘了,否则不会执行 .close(); //我们不需要cursor,则直接关闭 //此后即可通过result collection获取数据统计的结果
统计结果示例:
{ "_id" : 1000, "value" : 100 } { "_id" : 1001, "value" : 100 } { "_id" : 1002, "value" : 100 } { "_id" : 1003, "value" : 100 } { "_id" : 1004, "value" : 100 }
通过此例,我们简单的了解了mapreduce的编程方式,其实也是很简单的,如果你从事过javascript开发,你会发现书写mapper和reducer脚本也并不困难,况且javascript内置的大量函数库也可以提供便利。
一个mapreduce就像aggregation中的一次group,如果你的统计结果需要多次group,那么可能需要多个mapreduce;这和aggregate pipleline相比就没有那么简便了。但是mapreduce可以写的非常复杂,处理一些比较棘手的业务性逻辑,这在aggregate中反而不行,因为aggregate支持的“计算表达式”比较有限,和javascript的方法库相比相差太远。mapreduce可以读取sharding collection中的数据,而且可以将output结果输出到sharding collection中,但是aggregate不支持这个特性。
一、编程语法
在mongodb中,mapreduce除了包含mapper和reducer之外,还包含其他的一些选项,不过整体遵循mapreduce的规则:
1、map:javascript方法,此方法中可以使用emit(key,value),一次map调用中允许返回调用多次emit(也可以不调用),它不需要返回值;其中key用来分组,value将来会被传递给reducer用于“聚合计算”。每条document都会调用一次map方法。
mapper中输入的是当前document,可以通过this.<filedName>来获取字段的值。mapper应该是封闭的,它不能访问外部资源,比如collection、database,不能修改外部的值,但允许访问“scope”中的变量。emit的值不能大于16M,即document最大的尺寸,否则mongodb将会抛出错误。
function() { this.items.forEach(function(item) {emit(item.sku,1);});//多次emit }
2、reduce:javascript方法,此方法接收key和values两个参数,经过mapper处理和“归并之后”,一个key将会对应一组values(分组,key:values),此values将会在reduce中进行“聚合计算”,比如:sum、平均数、数据分拣等等。
reducer和mapper一样是封闭的,它内部不允许访问database、collection等外部资源,不能修改外部值,但可以访问“scope”中的变量;如果一个key只有一个value,那么mongodb就不会调用reduce方法。可能一个key对应的values条数很多,将会调用多次reduce,即前一次reduce的结果可能被包含在values中再次传递给reduce方法,这也要求,reduce返回的结果需要value的结构保持一致。同样,reduce返回的数据尺寸不能大于8M(document最大尺寸的一半,因为reduce的结果可能会作为input再次reduce)。
//mapper function() { emit(this.categoryId,{'count' : 1}); } //reducer function(key,values) { var current = {'count' : 0}; values.forEach(function(item) { current.count += item.count;}); return current; }
此外reduce内的算法需要是幂等的,且与输入values的顺序无关关的,因为即使相同的input文档,也无法保证map-reduce的每个过程都是逐字节相同的,但应该确保计算的结果是一致的。
3、out:document结构,包含一些配置选项;用于指定reduce的结果最终如何保存。可以将结果以inline的方式直接输出(cursor),或者写入一个collection中。
out : { <action> : <collectionName> [,db:<dbName>] [,sharded:<boolean>] [,nonAtomic:<boolean>] }
out方式默认为inline,即不保存数据,而是返回一个cursor,客户端直接读取数据即可。
<action>表示如果保存结果的<collection>已经存在时,将如何处理:1)replace:替换,替换原collection中的内容;先将数据保存在临时collection,此后rename,再将旧collection删除 2)merge:将结果与原有内容合并,如果原有文档中持有相同的key(即_id字段),则直接覆盖原值 3)reduce:将结果与原有内容合并,如果原有稳重中有相同的key,则将新值、旧值合并后再次应用reduce方法,并将得到的值覆盖原值(对于“用户留存”、“数据增量统计”非常有用)。
db:结果数据保存在哪个database中,默认为当前db;开发者可能为了进一步使用数据,将统计结果统一放在单独的database中。
sharded:输出结果的collection将使用sharding模式,使用_id作为shard key;不过首先需要开发者对<collection>所在的database开启sharding,否则将无法执行。
nonAtomic:“非原子性”,仅对“merge”和“replace”有效,控制output collection,默认为false,即“原子性”;即mapreduce在输出阶段将会对output collection所在的数据库加锁,直到输出结束,可能会性能会有影响;如果为true,则不会对db加锁,其他客户端可以读取到output collection的中间状态数据。我们通常将ouput collection单独放在一个db中,和application数据分离开,而且nonAtomic为false,我们也不希望用户读到“中间状态数据”。
可以通过指定“out:{inline : 1}”将输出结果保存在内存中,并返回一个cursor,客户端可以直接读取即可。
4、query:筛选文档,只需要将符合条件的documents传递给mapper。
5、sort:对刷选之后的文档排序,然后才传递给mapper。如果根据map的key进行排序,则可以减少reduce的操作次数。排序必须能够使用index。
6、limit:限定输入到map的文档条数。
7、finalize:终结操作,在输出之前调整reduce的结果。它和map、reduce一样,也是一个javascript方法,接收key和value,其中value为reduce输出结果,finalize方法中可以修改value的值作为最终的输出结果:
function(key,value) { var final = {count : 0,key:""}; final.key = key; return final; }
8、scope:document结构,保存一些global级别的变量值,它们可以在map、reduce、finalize中被访问。
//java代码 Document scope = new Document("off",0.5); String finalize = "function(key,value) {" + "return value.count * off;" + "}";
9、jsMode:可选值为true或者false;表示是否将map执行的中间结果数据有javascript对象转换成BSON对象,默认为false。false表示,在mapper中emit最终输出的是javascript对象,因为是javascript引擎处理的,不过mapper 可能产生大量的数据,这些数据将会被保存在临时的存储中(collection),所以需要将javascript对象转换成BSON;在reduce阶段,这些BSON结果在被转换成javascript对象,传递给reduce方法,转换意味着性能消耗和慢速,它解决的问题就是“临时存储”以适应较大数据集的数据分析。如果为true,将不会进行类型转换,数据被暂存在内存中,reduce阶段直接使用mapper的结果即可,但是key的个数不能超过50W个。在production环境中,此值建议为false。
在map、reduce、finalize中,我们自始至终都不要尝试去改变key的值,这会导致错误。
二、基本原理
1、过程与原理
mongodb 2.4之后,可以多线程运行javascript操作,即允许多个mapreduce同时并行执行;不过在2.4之前,则是单线程运行,即每个mongod上同时只能有一个mapreduce运行,所以并发能力很弱,建议开发者升级到最新的版本。
mapreuce可以从sharding collection中读取数据,也可以将结果写入到sharding collection中;但是aggregate则不能将结果写入到sharding collection中。
1)mongos接收到mapreduce的操作请求后,根据query条件,将map-reduce任务发给持有数据的shards(sharding collection将会被分裂成多个chunks并分布在多个shards中,shard即为mongod节点)。
2)每个shards都依次执行mapper和reducer,并将结果写入到本地的临时collection中,结果数据是根据_id(即reducer的key)正序排列。
3)当所有的shards都reduce完成之后,将各自结果数据中_id的最大值和最小值(即min、max key)返回给mongos。
4)mongos负责shuffle和partition,将所有shards反馈的min、max key进行汇总,并将整个key区间分成多个partitions,每个partition包含[min,max]区间,此后mongos将partiton信息封装在finalReduce指令中并发给每个shard,最终每个shard都会收到一个特定的partition的任务;partition不会重叠。
5)此后每个shard将与其他所有的shards建立链接,根据partition信息,从min到max,遍历每个key。对于任何一个key,当前shard都将从其他shards获取此key的所有数据,然后后执行reduce和finalize方法,每个key可能会执行多次reduce,这取决于values的条数,但是finalize只会执行一次,最终将此key的finalize的结果通过本地方式写入sharding collection中。
6)当所有的shards都处理完毕后,mongos将处理结果返回给客户端(inline)。
在mapper时,mapper的结果首先放在内存中,当内存的数据量达到阀值会将执行一次reduce并写入临时文件。
------------ | mongos | ------------ | mapreduce | ------------ ------------ | shard | | shard | ------------ ------------ | | map+redue map+reduce | | ------------ ------------ | _chunk | | _chunk | ------------ ------------ | | [min1~max1] [min2~max2] | ------------ | mongos | ------------ | partitions | [min1'~max1'] [min1'~max2'] | | finalReduce finalReduce | | ------------ ------------ | _chunk | <--read--> | _chunk | ------------ ------------ | | finalize finalize -->write once for each key | | write write -->locally write | ------------ | sharding | ------------
在mongodb2.0版本中,finalReduce步骤将有mongos执行,即所有的shards执行完mapreduce之后,由mongos负责读取所有shards的结果并在本地进行merge和sort,然后执行finalReduce和finalize过程,这中方式并不会带来太大问题,消耗内存也小,但是会增加mongos的负担,影响应用数据的并发能力。经过2.4版本改进后(上述),性能得到很大的提高。
在mapreduce执行过程中,将会阻止balancer“迁移”chunks(sharding balancer机制);对于输出结果是sharding collection,在finalReduce期间,其chunks也不会被迁移,直到运行结束。主要是为了避免并发操作带来的问题。
2、并发性
我们都知道,mongodb中所有的读写操作都会加锁(意向锁),mapreduce也不例外。mapreduce涉及到mapper、reducer,中间过程还会将数据写入临时的collection中,最终将finalize数据写入output collection。read阶段将会使用读锁(读取chunks中的数据),每处理100条documents后重新获取锁(yields)。写入临时collectin使用写锁,这个不会涉及到锁的竞争,因为临时collection只对自己可见。在创建output collection时会对DB加写锁,如果output collection已经存在,且action为“replace”时,则会获取一个global级别的写锁,此时将会阻塞mongod上的所有操作(影响很大),主要是为了让数据结果为atomic;如果action为“merge”或者“reduce”,且“nonAtomic”为true是,只会在每次写入数据时才会获取写锁,这对性能几乎没有影响。
如果你每次mapreduce的结果都会写入到一个新的collection,则建议将“nonAtomic”设置为true,可以避免这个全局锁的问题。参见源码【mongo/db/commons/mr.cpp】
由上述原理可知,在mapper阶段为了压缩内存数据,在写入临时文件时也会执行一次reducer,在reduce阶段如果某个key对象的values较多,也会多次执行reducer;这也要求reducer返回结果必须和mapper emit的结果类型保持一致,同时保证reducer中的算法是幂等的,即多次运行结果应该一致,不依赖于reducer执行的次数和传递mapper结果的顺序。
三、其他
1、mapreduce脚本是基于javascript,为了便于运行和维护,我们通常将这些脚本保存在db中,比如mysql或者mongodb中,在运行mapreduce是从db中获取即可,然后传递给mapreduce方法。
2、一个mapreduce,只能执行产生一个“group”结果,如果你的数据分析中,需要多次group,那么可能需要将开发多个mapreduce,并将它们串联起来。(需要开发一个管理任务有向图的工具)。
相关推荐
MongoDB中的MapReduce是一种分布式计算模型,用于处理和分析海量数据。MapReduce包含两个主要阶段:Map阶段和Reduce阶段,这两个阶段共同实现了数据处理的并行化,从而提高处理效率。 Map阶段的主要任务是将原始...
与关系型数据库不同的是,MongoDB支持文档级别的更新操作(in-place update),以及自动的分片(Auto-sharding)和MapReduce计算模式。 8. MongoDB数据模型: MongoDB使用的是集合(Collection)作为存储数据的容器...
第2章 MongoDB基本原理与安装 2.1 数据库结构 2.2 文档 2.3 集合 2.3.1 集合的无模式 2.3.2 集合的命名 2.4 MongoDB数据类型 2.4.1 基本数据类型 2.4.2 数字类型 2.4.3 日期类型 2.4.4 ...
通过阅读和实践这些源码,读者能够加深对MongoDB工作原理的理解,掌握在实际开发中如何高效地使用MongoDB来处理和管理数据。无论是开发新项目还是优化现有系统,熟悉MongoDB的这些关键知识点都将大大提升工作效率。
6. 新增功能:介绍自第1版以来MongoDB新增的重要特性,如Change Streams、Transactions等。 7. MongoDB的运维:包含备份、恢复、日志分析和故障排查等运维实践。 8. 社区和生态:介绍MongoDB社区、工具生态系统和...
银河麒麟服务器操作系统-Mongodb适配手册 银河麒麟服务器操作系统-Mongodb...通过本手册,读者可以了解银河麒麟服务器操作系统和Mongodb软件的基础知识,并且能更好地理解Mongodb在银河麒麟服务器操作系统中的应用。
- 内存管理:了解工作集原理,确保数据能完全驻留在内存中以提高读写速度。 3. **数据安全** - 认证与授权:启用身份验证,使用角色基础的权限管理,确保数据访问的安全性。 - SSL/TLS加密:启用SSL连接,保护...
- MapReduce:了解如何使用MongoDB的MapReduce功能进行大数据处理和分析。 3. **性能优化** - 索引:学习创建和管理索引,理解不同类型的索引(如单键、复合、地理空间和文本索引)以及它们对查询性能的影响。 -...
6. MapReduce与聚合框架:解释MapReduce在MongoDB中的应用,用于大规模数据处理和分析。对比MapReduce和聚合框架,讨论各自的适用场景。 7. GridFS:介绍MongoDB的文件存储系统GridFS,用于存储和检索大型文件,如...
本学习资料包含了"Mongodb权威指南"和"Mongodb实战班克"两部分,旨在帮助读者全面深入地理解MongoDB的原理与实践应用。 "Mongodb权威指南"是一本详尽的参考书籍,涵盖了MongoDB的基础概念到高级特性。在书中,你...
2. 复制集部署、测试、工作原理:复制集是MongoDB中保证数据一致性的主要方式。 3. 分片集群的理论、搭建过程、测试结果、优势剖析、分片键选择和集群管理:分片是MongoDB中实现数据水平扩展的关键技术。 五、...
学习这两本书,你将能够掌握MongoDB的核心原理,理解其工作方式,并具备设计、部署和管理MongoDB实例的能力。通过实践书中的示例,你将能够熟练地运用MongoDB进行数据存储、查询优化和应用开发。此外,这些书籍也将...
此外,书中可能会介绍MongoDB的高级特性,如TTL(Time-to-Live)索引用于自动删除过期数据,以及地理空间索引用于地理位置相关的查询。 在实际开发中,MongoDB的查询语言(MQL)是另一个关键知识点,它基于JSON,...
此外,书中还涉及了MongoDB的高级特性,如聚合框架、MapReduce功能,以及如何进行性能调优。这些特性让MongoDB不仅仅是一个简单的文档存储,还能进行复杂的数据分析和处理。书中还讲解了MongoDB的命令行工具和驱动...
MongoDB和Hadoop是当前大数据处理领域中的两个关键工具,它们各自在数据存储与处理方面发挥着重要作用。这里,我们分别深入探讨这两个技术的核心概念、功能以及如何在实际应用中利用它们。 首先,MongoDB是一种...
第2章 MongoDB基本原理与安装 2.1 数据库结构 2.2 文档 2.3 集合 2.3.1 集合的无模式 2.3.2 集合的命名 2.4 MongoDB数据类型 2.4.1 基本数据类型 2.4.2 数字类型 2.4.3 日期类型 2.4.4 ...
### Java面试MongoDB知识点梳理 #### 一、MongoDB简介 **MongoDB**是由C++语言编写的,属于一种基于...了解MongoDB的基本原理和特性不仅有助于开发者更好地利用这一工具,也有助于他们在面试中展现自己的专业知识。
分布式数据库原理与应用 分布式数据库原理 MongoDB是面向文档的NoSQL数据库,具有高可靠性、支持分片、面向文档等特性。分布式数据库原理是指将数据分布式存储在多个服务器上,以提高数据的可靠性和可扩展性。 ...
以上内容总结了《MongoDB学习手册》的主要知识点,涵盖了MongoDB的基础概念、特性、应用场景、数据类型、索引、主从同步、分片和集群、基本操作、Shell控制台使用方法以及安全与认证等方面,为读者提供了全面的学习...
【大数据技术原理与应用】是厦门大学的一门专业选修课程,由林子雨教授编纂大纲,旨在为学生提供大数据技术的基础知识和实践经验。这门课程以构建知识体系、阐述基本原理、引导初级实践和了解相关应用为教学目标,为...