`
wx1569466809
  • 浏览: 72077 次
文章分类
社区版块
存档分类
最新评论

Spark图计算API简介

 
阅读更多

一、属性操作符:

graph中提供了对vertex,edge和triplet的map操作,类似于RDD中的map操作:

def mapVertices[VD2](map:(VertexId, VD)=> VD2): Graph[VD2, ED]

def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]

def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

使用这些方法不会改变图的结构,所以这些操作符可以利用原有的图的structural indicies。所以不要用graph.vertices.map的方法来实现同样的操作。

mapEdges: transform each edge attribute in the graph using the map function.

实例:注意在mapEdges中使用的函数里,输入参数x是一个Edge对象,返回对象则是Edge的属性对象。在例子中,属性对象的类型并没有改变,(都是String)但属性的值有所变化。也可以变成其它的类型的对象。

val sheyouGraph = graph.mapEdges(x => {if("roommate".equals(x.attr)) "sheyou" else x.attr})

 

mapVertices: transform each vertex attribute in the graph using the map function

跟mapEdges类似,mapVerticies中传入的对象也是Vertex的实例化对象,返回值也是顶点的属性对象:

val oneAttrGraph = graph.mapVertices((id, attr) => {attr._1+ " is:"+attr._2})

mapTriplets: Transforms each edge attribute using the map function, passing it the adjacent(临近的) vertex attributes as well.

也就是在mapTriplets中,与mapEdges不同的地方仅仅在于可以使用的作为map条件的东西多了邻近的顶点的属性,最终改变的东西仍然是edge的属性。如果转换中不需要根据顶点的属性,就直接用mapEdges就行了。

什么是Triplet:

Triplet的全称是EdgeTriplet,继承自Edge,所代表的entity是:An edge along with the vertex attributes of its neighboring vertices. 一个EdgeTriplet中包含srcId, dstId, attr(继承自Edge)和srcAttr和dstAttr五个属性。

graph.mapTriplets(triplet => {.....})

二、Structural Operators:

1. subgraph:

方法的定义:

def subgraph(

    epred: EdgeTriplet[VD, ED] => Boolean = (x => true),

    vpred: (VertexId, VD) => Boolean = ((v, d) => true)

): Graph[VD, ED]

返回的对象是一个图,图中包含着的顶点和边分别要满足vpred和epred两个函数。(要注意,顶点和边是完全不同的概念,如果一个边被砍掉了,这个边关联的两个顶点并不会受影响)

要注意,在图里,如果一个顶点没了,其对应的边也就没了,但边没了之后,点不会受影响。

所以,subgraph一般用于:restrict the graph to the vertices and edges of interest或者eliminate broken links.

2. joinVertices/outerJoinVerticies:

有时候需要从外部的RDD中跟Graph做数据的连接操作。例如:外部的user属性想要跟现有的graph做一个合并,或者想把图的顶点的属性从一个图迁移到另一个图中。这些可以用join来完成。

def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]

def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED]

joinVertices: 将顶点跟输入的RDD[(VertexId, U)]做关联,返回一个新的图。新的图的属性的类型跟原图是一样的,但值可以改变;在mapFunc中,可以使用原来的图的顶点属性和输入的RDD的顶点属性U来计算新的顶点属性。输入的RDD中每个vertex最多只能有一个vertex。如果原图在input table中没有对应的entry,则原来的属性不做改变。

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]

从函数的定义可以看出,该操作不会改变vertex的属性的类型,但值是可以改变的。比如first name需要加上last name。

事实上,joinVerticies方法的实现中就使用了outerJoinVerticies方法:

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD) : Graph[VD, ED] = {

    val uf = (id: VertexId, data: VD, o: Option[U]) => {

        o match {

            case Some(u) => mapFunc(id, data, u) case None => data

        }

    }

    graph.outerJoinVertices(table)(uf)

}

outerJoinVertices与joinVertices很类似,但在map方法中可以修改vertex的属性类型。由于并非所有的图的顶点都一定能跟传入的RDD匹配上,所以定义mapFunc的时候使用了option选项。对于joinVerticies方法,如果某个顶点没有跟传入的RDD匹配上,就直接用原有的值。因为joinVerticies并不改变顶点的数据类型(有没有忘了option跟Some、None之间的爱恨情仇?使用Option的时候一定离不开match,要注意match的语法)。

val outDegGraph = graph.outDegrees

val degGraph = graph.outerJoinVertices(outDegGraph){    

    (id, oldAttr, outDeg) => {    

        outDeg match{

            case Some(outDeg) => outDegcase None => 0

        }

    }

}

3. aggregateMessages(原来的名字叫做mapReduceTriplets):

如果需要将顶点跟其邻居的信息集成起来,可以使用aggregateMessages方法。比如,想要知道有多少人follow了一个用户,或者follow用户的平均年龄。

函数的定义:

def aggregateMessages[A: ClassTag](

    sendMsg: EdgeContext[VD, ED, A] => Unit,

    mergeMessage: (A, A) => A,

    tripletFields: TripletFields = TripletFields.ALL

): VertexRDD[A]

跟mapReduceTriplets的定义很类似:

def mapReduceTriplets[Msg](

    map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],

    reduce: (Msg, Msg) => Msg

):VertexRDD[Msg]

VertexRDD继承了RDD[(VertexId, VD)],本身自带的泛型VD。

API:aggregate values from the neighboring edges and vertices of each vertex.

方法中的sendMsg是在图中的每个边都会被调用的,用于将message发送给相邻顶点。

mergeMsg用于将sendMsg中发送给同一个顶点的message做组合。

tripletFields: 那些fields可以用于EdgeContext中,可用的值包括TripletFields.None, TripletFields.EdgeOnly, TripletFields.Src, TripletFields.Dst, TripletFields.ALL。默认为ALL,也就是所有的信息都要用,如果只需要用部分数据,可以单独选择部分属性发送,可以提升计算效率。

其中,Src和Dst分别会将source和destination field进行传递,而且都会添加edge fields:

如果TripletFields中传入的资源少了,也就是在sendMsg中需要使用到的信息并没有包含在TripletFields中,可能会报空指针异常。

使用实例:计算graph的出度或入度:

val inDeg: RDD[(VertexId, Int)] = graph.aggregateMessages[Int](edgeContext => edgeContext.sendToDst(1), _+_)

由于这里我们并没有用到edgeContext中的任何属性,所以其实也可以在参数中添加TripletFields.None,从而提高一点执行效率:

graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _+_ , TripletFields.None)

TIPS: 什么是EdgeContext:EdgeContext中会将source和destination属性以及Edge属性都暴露出来,包含sendToSrc和sendToDst来将信息发送给source和destination属性。

4. reverse: return a new graph with all edge directions reversed.

调用方法:

val reverseGraph = graph.reverse

5. mask

mask用于创建一个子图,子图中包含在输入的图中也包含的顶点和边。该方法通常跟subgraph方法一起使用,来根据另一个关联的图来过滤当前的图中展示的数据。

6. groupEdges:

groupEdges用于将多重图中的相同的顶点之间的边做合并(除了属性其实没其他可以合并的)。

def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

7. 一组collect:

graph中有多个collect算法,包括collectEdges, collectNeighbours, collectNeighbourIds等

collectEdges: return an RDD that contains for each vertex its local edges

8. Pregel API:

图本身就是内在的递归的数据结构,因为一个顶点的属性可能依赖于其neighbor,而neighbor的属性又依赖于他们的neighbour。所以很多重要的图算法都会迭代计算每个顶点的属性,直到达到一个稳定状态。

GraphX中的Pregel操作符是一个批量同步并行(bulk-synchronous parallel message abstraction)的messaging abstraction,用于图的拓扑结构(topology of the graph)。The Pregel operator executes in a series of super steps in whichvertices receive the sum of their inbound messagesfrom the previous super step,compute a new valuefor the vertex property, and thensend messages to neighboring verticesin the next super step. Message是作为edge triplet的一个函数并行计算的,message的计算可以使用source和dest顶点的属性。没有收到message的顶点在super step中被跳过。迭代会在么有剩余的信息之后停止,并返回最终的图。

pregel的定义:

def pregel[A]

    (initialMsg: A,//在第一次迭代中每个顶点获取的起始

    msgmaxIter: Int = Int.MaxValue,//迭代计算的次数

    activeDir: EdgeDirection = EdgeDirection.Out

)(

    vprog: (VertexId, VD, A) => VD,//顶点的计算函数,在每个顶点运行,根据顶点的ID,属性和获取的inbound message来计算顶点的新属性值。顶一次迭代的时候,inbound message为initialMsg,且每个顶点都会执行一遍该函数。以后只有上次迭代中接收到信息的顶点会执行。

    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],//应用于顶点的出边(out edges)用于接收顶点发出的信息

    mergeMsg: (A, A) => A//合并信息的算法

)

算法实现的大致过程:

var g = mapVertices((vid, vdata) => vprog(vid, vdata, initMsg)).cache //第一步是根据initMsg在每个顶点执行一次vprog算法,从而每个顶点的属性都会迭代一次。

var messages = g.mapReduceTriplets(sendMsg, mergeMsg)

var messagesCount = messages.count

var i = 0

while(activeMessages > 0 && i < maxIterations){

    g = g.joinVertices(messages)(vprog).cache

    val oldMessages = messages

    messages = g.mapReduceTriplets(

        sendMsg,

        mergeMsg,

        Some((oldMessages, activeDirection))

    ).cache()

    activeMessages = messages.count

    i += 1

}

g

pregel算法的一个实例:将图跟一些一些初始的score做关联,然后将顶点分数根据出度大小向外发散,并自己保留一份:

//将图中顶点添加上该顶点的出度属性

val graphWithDegree = graph.outerJoinVertices(graph.outDegrees){

    case (vid, name, deg) => (name, deg match {

        case Some(deg) => deg+0.0

        case None => 1.25}

    )

}//将图与初始分数做关联

val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){

    case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))

}

graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))//将图与初始分数做关联

val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){

    case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))

}

graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))

算法的第一步:将0.0(也就是传入的初始值initMsg)跟各个顶点的值相加(还是原来的值),然后除以顶点的出度。这一步很重要,不能忽略。 并且在设计的时候也要考虑结果会不会被这一步所影响。

9. 计算图的度、入度和出度:

graph.degrees

graph.outDegrees

graph.inDegrees

返回的对象是VertexRDD[Int]

注意的是返回的RDD对象中,度为0的顶点并不包含在内。

10. filter方法:先计算一些用于过滤的值(preprocess),然后在使用predicate进行过滤。

def filter[VD2: ClassTag, ED2: ClassTag](

    preprocess: Graph[VD, ED] => Graph[VD2, ED2],

    epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,

    vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true

):Graph[VD, ED] = {

    graph.mask(

        preprocess(graph).subgraph(epred, vpred)

    )

}

preprocess:a function to compute new vertex and edge data before filtering

要注意最后返回的图跟传入的图的顶点和边的属性类型是一样的。

该方法可以用于在不改变顶点和边的属性值(要注意的是,在preprocess中,使用graph的时候可能会有类似于修改graph操作的api调用,但在调用的过程中,graph本身的值不会发生改变。比如在下边的例子的中,graph做了一个跟其degree关联的操作,但graph本身的值没有任何变化)的情况下对图进行基于某些属性的过滤。这些属性的值可以是计算得来的。例如,删除图中没有出度的顶点:

graph.filter(

    graph => {

        val degrees: VertexRDD[Int] = graph.outDegrees

        graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}

    },

    vpred = (vid: VertexId, deg:Int) => deg > 0

)

11. groupEdges:合并两个顶点中的多条边称为一条边。要获取正确的结果,graph必须调用partitionBy来做partition。这是因为该操作假定需要一起合并的边都分布在同一个partition上。所以在调用groupEdges之前必须调用partitionBy。

def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED],也就是对边的属性做合并操作。

partitionBy: repartitions the edges in the graph according to partitionStrategy.

def partitionBy(partitionStrtegy: PartitionStrategy): Graph[VD, ED]

12. convertToCanonicalEdges:将双向变转化为单向边。

具体的算法是:将所有边都转化成srcId小于dstId的边,然后合并多余的边。

二、Graph Builders:

http://spark.apache.org/docs/latest/graphx-programming-guide.html#graph_builders

GraphX提供了一组使用vertex和edge的集合来构建一个图的方法。这些Graph Builder默认不会对边做repartition,边一般留在其原来的默认的partition中,例如其原来的HDFS的block。

1. GraphLoader.edgeListFile:

用于从一组edge(每个edge中包括简单的source id和destination id)中来构建一个graph,自动创建其中涉及的顶点,顶点和边的属性都设置为1。

def edgeListFile(

    sc: SparkContext,

    path: String,

    canonicalOrientation: Boolean = false,

    minEdgePartitions: Int = 1,

    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[Int, Int]

canonicalOrientation参数可以强制让边按照srcId

2. Graph.apply:使用顶点和边的RDD对象来创建一个图。重复的顶点被任意抛弃,edgeRDD中有而verticiesRDD中没有的顶点会被赋予一个默认的属性值。

def apply[VD, ED](

    vertices: RDD[(VertexId, VD)],

    edges: RDD[Edge[ED]],

    defaultVertexAttr: VD = null,

    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[VD, ED]

3. Graph.fromEdges: 从单独的一个边的RDD中构建一个图。自动创建边中使用的顶点,并赋予默认值。

def fromEdges[VD, ED](

    edges: RDD[Edge[ED]],

    defaultValue: VD,

    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[VD, ED]

4. Graph.fromEdgeTuples: 使用一个edge tuple的RDD创建图。边的值默认为1,顶点自动创建并赋予默认值。该方法也支持对边的deduplication(也就是去重)。如果发现多个相同的边,就将他们合并,属性值计算他们的和。或者将重复的边当做多条边。

def fromEdgeTuples[VD](

    rawEdges: RDD[(VertexId, VertexId)],

    defaultValue: VD,

    uniqueEdges: Option[PartitionStrategy] = None,

    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

): Graph[VD, Int]

 

转载于:https://my.oschina.net/u/3658076/blog/3076981

分享到:
评论

相关推荐

    spark 2.0.1 JavaAPI

    5. **GraphX**:用于处理图形数据,提供了图计算的API。 在Java API中,主要涉及以下关键概念和类: 1. **SparkConf**:配置Spark应用的基本设置,如appName、master URL等。 2. **SparkContext**:Spark应用的...

    spark_API文档

    在实际应用中,开发人员通常会结合使用Spark API、Scala编程技巧以及第三方库,例如Breeze用于数值计算,或者Algebird和Tungsten用于优化计算性能。通过深入理解Spark API,开发者能够构建出高效、可扩展的大数据...

    spark2.1.0 JAVA API

    Spark 2.1.0是Apache Spark的一个重要版本,它为大数据处理提供了高效、易用的计算框架。在Java API方面,Spark提供了丰富的类库,使得开发者能够利用Java语言便捷地构建分布式数据处理应用。本篇文章将深入探讨...

    Spark2.0.2API

    Spark 2.0.2 的 API 提供了对数据处理、机器学习、图形处理和SQL查询的支持,极大地简化了分布式计算的工作流程。 首先,Spark的核心组件是RDD(Resilient Distributed Datasets),这是一个弹性分布式数据集,可以...

    spark-scala-api

    Spark Scala API 是一个用于大数据处理的强大工具,它结合了Apache Spark的高性能计算框架与Scala编程语言的简洁性和表达力。这个zip压缩包很可能是包含了Spark的Scala开发接口及相关示例,便于开发者在Scala环境中...

    Spark 1.0.0 API (java)

    Spark 1.0.0版本是其发展中的一个重要里程碑,为开发者提供了强大的分布式计算能力,特别是对于Java开发者而言,Spark提供了丰富的Java API,使得在Java环境中进行大数据处理变得更加便捷。 ### 1. Spark核心概念 ...

    spark-2.1.0 api文档

    5. **GraphX**:Spark的图计算框架,用于处理图形数据,提供了图的创建、操作和分析功能,适用于社交网络分析、推荐系统等领域。 6. **Spark R**:Spark R是Spark与R语言的集成,允许用户在Spark集群上运行R代码,...

    基于Spark REST Api实现spark的任务创建、查询、终止等+源代码+文档说明

    - 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! &lt;项目介绍&gt; 1、该资源内项目代码都经过测试运行成功,...

    spark_api_1.3.1

    在 Spark 1.3.1 中,GraphX 进一步强化了图的创建、转换和分析功能,支持图的遍历和属性图计算。 总的来说,Spark API 1.3.1 为开发者提供了一个全面的大数据处理工具箱,它在批处理、流处理、SQL 查询和机器学习等...

    Spark api chm格式下载.rar

    5. **GraphX**:用于图计算的API,可以处理图形数据并支持图形算法。 在使用Spark API时,开发者会接触到的关键概念有: - **RDD(Resilient Distributed Dataset)**:弹性分布式数据集,是Spark的基础数据抽象,...

    Spark 1.0.2 API (Java)

    这个API提供了在Java中使用Spark的强大工具,使得数据处理、分析和并行计算变得更加高效和便捷。Spark作为一个分布式计算框架,其核心特性在于内存计算,允许数据在内存中快速迭代,从而显著提高了大规模数据处理的...

    电影评分数据汇总(使用spark2.4+scala, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    【资源说明】 1、该资源内项目代码都是经过测试运行成功,功能正常的情况下才上传的,请放心下载使用。...电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip

    Spark实战高手之路-第5章Spark API编程动手实战(1)

    ### Spark实战高手之路-第5章Spark API编程动手实战(1) #### 一、基础知识概述 **Spark**作为一项先进的大数据处理技术,在云计算领域占据着举足轻重的地位。本书《Spark实战高手之路》旨在帮助读者从零开始,...

    spark2.1.0.chm(spark java API)

    《Spark 2.1.0 Java API 深度解析》 Spark,作为一个分布式计算框架,因其高效、灵活和易用的特性,在大数据处理领域深受青睐。Spark 2.1.0版本对Java API进行了全面优化,使得Java开发者能够更加便捷地利用Spark...

    springboot与spark整合开发, 练习spark api

    接下来,Spark作为一个强大的分布式计算系统,提供了一整套API用于处理大规模数据,包括SQL查询、机器学习、图计算等。Spark的核心特性是内存计算,它能够将数据缓存到内存中,从而提高了数据处理的速度。 整合...

    讯飞python操控大模型lc-sp-sparkAPI-1709535448185.zip

    标题中的“讯飞python操控大模型lc-sp-sparkAPI-1709535448185.zip”表明这是一个与科大讯飞相关的项目,它涉及到使用Python语言来控制大型机器学习模型,并且可能利用了Spark API。这个项目可能是一个数据处理或...

    spring boot + scala + spark http驱动spark计算

    标题中的“spring boot + scala + spark http驱动spark计算”揭示了一个使用现代技术栈构建的数据处理系统。这个系统基于Spring Boot框架来提供HTTP服务,利用Scala作为编程语言,并借助Apache Spark进行大数据计算...

    spark图计算应用解析

    ### Spark图计算应用解析 #### 一、SparkGraphX概述 **1.1 什么是SparkGraphX** SparkGraphX是Apache Spark生态系统中的一个重要组件,它为处理大规模图数据提供了高效的支持。SparkGraphX的设计旨在利用Spark的...

    Spark 1.0.2 API (Scala)

    理解并熟练运用这些API,能够帮助开发者充分利用Spark的强大功能,处理大规模数据,实现快速计算。通过掌握RDD、DataFrame、Spark Streaming等核心概念,以及相应的转换和行动操作,开发者可以灵活地构建数据处理...

    Spark python API 函数调用学习

    Spark Python API,通常被称为PySpark,是Apache Spark与Python编程语言的接口,使得开发者能够利用Python的简洁性和Spark的强大计算能力处理大规模数据。PySpark在大数据处理领域具有广泛的应用,尤其适合于数据...

Global site tag (gtag.js) - Google Analytics