`
bit1129
  • 浏览: 1070738 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark五十】Spark读写MongoDB

 
阅读更多

 

 

1. Spark写入MongoDB的实例代码

如下代码实现将RDD写入到MongoDB的spark数据库的oc集合(Collection)中

 

 

package spark.examples.db

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkContext, SparkConf}
import com.mongodb.hadoop.MongoOutputFormat
import org.apache.spark.SparkContext._
import org.bson.BasicBSONObject

object SparkMongoDBIntegration {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkRDDCount").setMaster("local")
    val sc = new SparkContext(conf);
    val data = sc.parallelize(List(("Tom", 31), ("Jack", 22), ("Mary", 25)))
    val config = new Configuration()
    //spark.oc指的是,spark是MongoDB的数据库的名字,而ic表示数据库中的一个Collection
    config.set("mongo.input.uri", "mongodb://192.168.26.137:27017/spark.ic")
    config.set("mongo.output.uri", "mongodb://192.168.26.137:27017/spark.oc")
    //使用MongoOutputFormat将数据写入到MongoDB中
    val rdd = data.map((elem) => {
      val obj = new BasicBSONObject()
      obj.put("name", elem._1)
      obj.put("age", elem._2)
      //转换后的结果,KV对
      //第一个是BSON的ObjectId,插入时可以指定为null,MongoDB Driver在插入到MongoDB时,自动的生成
      //obj是BSONObject,是MongoDB Driver接收的插入对象
      (null, obj)
    })
    //RDD data is a KV pair,so it can use saveAsNewAPIHadoopFile
    rdd.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
  }
}

 

例子程序使用了MongoDB提供的Hadoop-MongoDB-Connector来实现将RDD写入到MongoDB中,注意的是,运行这个程序无需启动Hadoop(指的是HDFS),也就是把MongoDB作为一个外部数据源,通调用rdd.saveAsTextFile("file:///localfilesystemfile")的道理一样,通过这个Connector,MongoDB实现了作为HDFS格式兼容的数据源,因此Spark可以直接操作之。

 

2. 代码的依赖

上面的代码要运行,依赖两个jar包

  • MongodB-Hadoop-Connector
  • MongoDB Driver

分别可以从如下地址下载:https://github.com/plaa/mongo-spark和https://github.com/mongodb/mongo-java-driver/releases

 

3.关于mongo.output.uri和mongo.input.uri

它们的含义可以参见http://www.itpub.net/thread-1808762-1-1.html

启动mongo.output.uri的意思是指specify an output collection URI to write the output to. If using a sharded cluster, you can specify a space-delimited list of URIs referring to separate mongos instances and the output records will be written to them in round-robin fashion to improve throughput and avoid overloading a single mongos.

也就是说,mongo.output.uri是一个Spark输出数据的集合的uri,例子中"mongodb://192.168.26.137:27017/spark.ic",spark.ic的含义是spark是MongoDB的数据库的名字,而ic表示数据库中的一个Collection

 

4. 数据库和集合需要预先建立吗?

在上面的例子中,spark数据库和ic/oc集合都不需要预先建立,MongoDB Driver会首先判断,要写入数据的数据库和集合是否存在,如果不存在则会首先建立

 

5. 如何设置数据库的用户名密码?

在实际工作中,MongoDB的数据库都是有用户名密码的,也就是在指定mongo.output.uri和mongo.input.uri时,都要带着用户名和密码信息,这如何指定?

可以使用如下方式来指定:

 

 

mongodb://joe:123456@192.168.26.137:27017/spark.ic?readPreference=secondary
使用joe和123456作为用户名密码认证,同时指定读优先策略为secondary(从slave节点上读取)

 

 

参考:

http://codeforhire.com/2014/02/18/using-spark-with-mongodb/

http://my.oschina.net/1987times/blog/265483

https://github.com/plaa/mongo-spark

 

 

分享到:
评论

相关推荐

    Spark-Mongodb是一个库允许用户利用SparkSQL读写数据至MongoDB集合

    Spark-MongoDB库将这两者完美结合,实现了在Spark上对MongoDB数据集的高效读写。 首先,让我们详细了解一下如何在Spark中使用Spark-MongoDB库进行数据读取。要连接到MongoDB,你需要配置MongoDB的URI(包括数据库名...

    spark 连接 mongodb 使用例子

    本示例将讨论如何使用Java语言在Spark中连接MongoDB进行CRUD(创建、读取、更新和删除)操作,同时涉及到Spring框架和Maven构建工具。 首先,我们需要确保已安装并配置了Spark、Java、MongoDB以及Maven。在Java项目...

    spark连接mongodb

    标题中的"Spark连接MongoDB"指的是使用Apache Spark与MongoDB之间的数据交互,这通常涉及到大数据处理和NoSQL数据库的结合。Spark作为一个强大的分布式计算框架,能够高效地处理大规模数据,而MongoDB则是一个灵活、...

    spark 3.4.2 mongodb 整合依赖包

    现在,你可以使用`DataFrameReader`和`DataFrameWriter`来读写MongoDB数据了。例如,读取数据: ```scala val df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load() ``` 写入数据: ```scala ...

    MongoDB+Spark.pdf

    该连接器支持双向的数据读写操作,使得Spark可以直接访问MongoDB中的数据。在Apache Spark的执行环境中,连接器能够将MongoDB中的数据作为RDD(弹性分布式数据集)进行操作,反之,也可以将Spark的RDD保存回MongoDB...

    Spark MongoDB解决方案.pptx

    因此,引入MongoDB官方提供的`MongoDBSparkConnector`组件,能够实现Spark与MongoDB之间的高效双向读写,解决了这个问题。 **MongoDBSparkConnector** `MongoDBSparkConnector`使得Spark可以直接读取和写入MongoDB...

    MongoDB Spark - Mongo首席技術架構師唐建法

    4. 高性能:MongoDB在读写性能上有出色表现,尤其是对于大数据集,可以实现毫秒级的响应时间。 5. 全局部署:MongoDB支持全球部署,能够轻松地在多个数据中心之间进行数据复制和同步。 Spark是一个开源的集群计算...

    大数据课程的期末项目,基于spark、hadoop hdfs、mongodb,使用scala,进行电影推荐.zip

    该项目是一个大数据课程的期末项目,主要利用了Spark、Hadoop HDFS和MongoDB等技术,通过Scala编程语言来实现电影推荐系统。这个系统是基于大数据处理的,因此涉及到的知识点非常广泛,涵盖了分布式计算、数据存储、...

    基于MongoDB+Spark+ElasticSearch的电影推荐系统.zip

    《基于MongoDB+Spark+ElasticSearch的电影推荐系统》是一个集成大数据处理、推荐算法与实时检索的综合项目,适用于毕业设计、课程设计以及个人技能提升。在这个项目中,MongoDB作为非关系型数据库用于存储海量的电影...

    基于机器学习+Spark2.0+MongoDB实现的协同过滤推荐系统.zip

    - 高性能读写:推荐系统需要频繁地读取和更新用户行为数据,MongoDB的高性能读写能力可以满足这种需求。 - 灵活的数据模型:MongoDB的文档型数据模型允许快速地添加新属性,适应推荐系统的需求变化。 5. **项目...

    Spark整合Mongodb的方法

    Spark整合MongoDB是一种高效的数据处理方式,它结合了Spark的并行处理能力与MongoDB的非结构化数据存储优势。以下将详细介绍Spark和MongoDB的整合过程、关键知识点以及一个简单的示例代码。 **1. Spark简介** Spark...

    46488_Spark大数据技术与应用_习题数据和答案.rar

    - Spark与这些系统的集成方式,如Spark读写HDFS - Spark与NoSQL数据库的配合,如Cassandra、MongoDB - Spark与YARN、Mesos的资源管理机制 7. **第8章**:Spark项目实战与性能调优 - 如何设计Spark项目,包括...

    基于Spark+Scala+MongoDB的大数据实战,商品推荐系统设计与实现.zip

    通过MongoDB的API,我们可以方便地进行数据的读写操作,从而实现数据的实时更新和快速检索。 推荐系统的构建通常包括以下步骤: 1. **数据收集**:MongoDB用于存储用户购买历史、浏览记录、评价等多元数据。 2. **...

    基于Spark+Flask+Mongodb的在线电影推荐系统设计与实现.zip

    《基于Spark+Flask+MongoDB的在线电影推荐系统设计与实现》 在这个项目中,开发者巧妙地结合了Apache Spark、Python Flask框架以及MongoDB数据库,构建了一个实用的在线电影推荐系统。这样的组合使得系统在大数据...

    计算机课程毕设:基于机器学习+Spark2.0+MongoDB实现的协同过滤推荐系统.zip

    在这个项目中,Spark 2.0 被用作大数据处理框架,MongoDB 则作为非关系型数据库存储用户行为数据。 1. **Spark 2.0**: Spark 是一种快速、通用且可扩展的大数据处理框架,2.0 版本在前一版本的基础上进行了优化,...

Global site tag (gtag.js) - Google Analytics