1. Spark写入MongoDB的实例代码
如下代码实现将RDD写入到MongoDB的spark数据库的oc集合(Collection)中
package spark.examples.db import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkContext, SparkConf} import com.mongodb.hadoop.MongoOutputFormat import org.apache.spark.SparkContext._ import org.bson.BasicBSONObject object SparkMongoDBIntegration { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkRDDCount").setMaster("local") val sc = new SparkContext(conf); val data = sc.parallelize(List(("Tom", 31), ("Jack", 22), ("Mary", 25))) val config = new Configuration() //spark.oc指的是,spark是MongoDB的数据库的名字,而ic表示数据库中的一个Collection config.set("mongo.input.uri", "mongodb://192.168.26.137:27017/spark.ic") config.set("mongo.output.uri", "mongodb://192.168.26.137:27017/spark.oc") //使用MongoOutputFormat将数据写入到MongoDB中 val rdd = data.map((elem) => { val obj = new BasicBSONObject() obj.put("name", elem._1) obj.put("age", elem._2) //转换后的结果,KV对 //第一个是BSON的ObjectId,插入时可以指定为null,MongoDB Driver在插入到MongoDB时,自动的生成 //obj是BSONObject,是MongoDB Driver接收的插入对象 (null, obj) }) //RDD data is a KV pair,so it can use saveAsNewAPIHadoopFile rdd.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config) } }
例子程序使用了MongoDB提供的Hadoop-MongoDB-Connector来实现将RDD写入到MongoDB中,注意的是,运行这个程序无需启动Hadoop(指的是HDFS),也就是把MongoDB作为一个外部数据源,通调用rdd.saveAsTextFile("file:///localfilesystemfile")的道理一样,通过这个Connector,MongoDB实现了作为HDFS格式兼容的数据源,因此Spark可以直接操作之。
2. 代码的依赖
上面的代码要运行,依赖两个jar包
- MongodB-Hadoop-Connector
- MongoDB Driver
分别可以从如下地址下载:https://github.com/plaa/mongo-spark和https://github.com/mongodb/mongo-java-driver/releases
3.关于mongo.output.uri和mongo.input.uri
它们的含义可以参见http://www.itpub.net/thread-1808762-1-1.html
启动mongo.output.uri的意思是指specify an output collection URI to write the output to. If using a sharded cluster, you can specify a space-delimited list of URIs referring to separate mongos instances and the output records will be written to them in round-robin fashion to improve throughput and avoid overloading a single mongos.
也就是说,mongo.output.uri是一个Spark输出数据的集合的uri,例子中"mongodb://192.168.26.137:27017/spark.ic",spark.ic的含义是spark是MongoDB的数据库的名字,而ic表示数据库中的一个Collection
4. 数据库和集合需要预先建立吗?
在上面的例子中,spark数据库和ic/oc集合都不需要预先建立,MongoDB Driver会首先判断,要写入数据的数据库和集合是否存在,如果不存在则会首先建立
5. 如何设置数据库的用户名密码?
在实际工作中,MongoDB的数据库都是有用户名密码的,也就是在指定mongo.output.uri和mongo.input.uri时,都要带着用户名和密码信息,这如何指定?
可以使用如下方式来指定:
mongodb://joe:123456@192.168.26.137:27017/spark.ic?readPreference=secondary使用joe和123456作为用户名密码认证,同时指定读优先策略为secondary(从slave节点上读取)
参考:
http://codeforhire.com/2014/02/18/using-spark-with-mongodb/
http://my.oschina.net/1987times/blog/265483
https://github.com/plaa/mongo-spark
相关推荐
Spark-MongoDB库将这两者完美结合,实现了在Spark上对MongoDB数据集的高效读写。 首先,让我们详细了解一下如何在Spark中使用Spark-MongoDB库进行数据读取。要连接到MongoDB,你需要配置MongoDB的URI(包括数据库名...
本示例将讨论如何使用Java语言在Spark中连接MongoDB进行CRUD(创建、读取、更新和删除)操作,同时涉及到Spring框架和Maven构建工具。 首先,我们需要确保已安装并配置了Spark、Java、MongoDB以及Maven。在Java项目...
标题中的"Spark连接MongoDB"指的是使用Apache Spark与MongoDB之间的数据交互,这通常涉及到大数据处理和NoSQL数据库的结合。Spark作为一个强大的分布式计算框架,能够高效地处理大规模数据,而MongoDB则是一个灵活、...
现在,你可以使用`DataFrameReader`和`DataFrameWriter`来读写MongoDB数据了。例如,读取数据: ```scala val df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load() ``` 写入数据: ```scala ...
该连接器支持双向的数据读写操作,使得Spark可以直接访问MongoDB中的数据。在Apache Spark的执行环境中,连接器能够将MongoDB中的数据作为RDD(弹性分布式数据集)进行操作,反之,也可以将Spark的RDD保存回MongoDB...
因此,引入MongoDB官方提供的`MongoDBSparkConnector`组件,能够实现Spark与MongoDB之间的高效双向读写,解决了这个问题。 **MongoDBSparkConnector** `MongoDBSparkConnector`使得Spark可以直接读取和写入MongoDB...
4. 高性能:MongoDB在读写性能上有出色表现,尤其是对于大数据集,可以实现毫秒级的响应时间。 5. 全局部署:MongoDB支持全球部署,能够轻松地在多个数据中心之间进行数据复制和同步。 Spark是一个开源的集群计算...
该项目是一个大数据课程的期末项目,主要利用了Spark、Hadoop HDFS和MongoDB等技术,通过Scala编程语言来实现电影推荐系统。这个系统是基于大数据处理的,因此涉及到的知识点非常广泛,涵盖了分布式计算、数据存储、...
《基于MongoDB+Spark+ElasticSearch的电影推荐系统》是一个集成大数据处理、推荐算法与实时检索的综合项目,适用于毕业设计、课程设计以及个人技能提升。在这个项目中,MongoDB作为非关系型数据库用于存储海量的电影...
- 高性能读写:推荐系统需要频繁地读取和更新用户行为数据,MongoDB的高性能读写能力可以满足这种需求。 - 灵活的数据模型:MongoDB的文档型数据模型允许快速地添加新属性,适应推荐系统的需求变化。 5. **项目...
Spark整合MongoDB是一种高效的数据处理方式,它结合了Spark的并行处理能力与MongoDB的非结构化数据存储优势。以下将详细介绍Spark和MongoDB的整合过程、关键知识点以及一个简单的示例代码。 **1. Spark简介** Spark...
- Spark与这些系统的集成方式,如Spark读写HDFS - Spark与NoSQL数据库的配合,如Cassandra、MongoDB - Spark与YARN、Mesos的资源管理机制 7. **第8章**:Spark项目实战与性能调优 - 如何设计Spark项目,包括...
通过MongoDB的API,我们可以方便地进行数据的读写操作,从而实现数据的实时更新和快速检索。 推荐系统的构建通常包括以下步骤: 1. **数据收集**:MongoDB用于存储用户购买历史、浏览记录、评价等多元数据。 2. **...
《基于Spark+Flask+MongoDB的在线电影推荐系统设计与实现》 在这个项目中,开发者巧妙地结合了Apache Spark、Python Flask框架以及MongoDB数据库,构建了一个实用的在线电影推荐系统。这样的组合使得系统在大数据...
在这个项目中,Spark 2.0 被用作大数据处理框架,MongoDB 则作为非关系型数据库存储用户行为数据。 1. **Spark 2.0**: Spark 是一种快速、通用且可扩展的大数据处理框架,2.0 版本在前一版本的基础上进行了优化,...