基于Spark的GraphX.pptx
1. Property Graph:用户定义的有向图,图中的每个顶点和每条边都附加一个用户定义的对象,允许在两个顶点之间并行存在多条边。每个顶点都具有一个64位的唯一标识(VertexID),GraphX并不强制VertexID有序。每条边则由起始和终止VertexID标识。
Graph具有两个参数化的类型:Vertex(VD)和Edge(ED),分别对应附加在顶点和边上的对象。当VD和ED为基本的数据类型时,Graph将把它们保存在数组中。
Graph和RDD一样(spark的基本数据类型,Resilient Distributed Dataset),创建之后不可再改变,分布式存储在集群上,并且具有容错能力。对图中结构和值的改变,都将需要产生一个新的Graph对象,新的Graph将与之前的Graph共享大部分数据结构。Graph通过顶点分割方法,分割在不同的机器上。任何数据分片所在机器的失败都将引发该数据分片在其它机器上重新创建。
逻辑上Graph包含VertexRDD和EdgeRDD,即:
class Graph[VD,ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED,VD]
}
其中,VertexRDD[VD]和EdgeRDD[ED,VD]分别是RDD[VertexID,VD]和RDD[Edge[ED]]经过优化(extends)后的版本,提供了图计算相关功能,并做了内部优化。
2. Graph类的成员变量
class Graph[VD, ED] {
//Graph的基本信息:边数,顶点数,入度,出度,度
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
//Graph的顶点RDD,边RDD,以及三元组RDD
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED, VD]
val triplets: RDD[EdgeTriplet[VD, ED]]
}
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
上面的每个操作都将改变Graph中vertex和edge特性,并产生一个新的Graph
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
reverse操作返回一个愿图中边的方向反转的新图。由于该操作没有改变顶点和边的特性,所以不需要数据的移动
subgraph操作返回连接的点和边满足vpred和epred构成的子图
mask操作返回两个图相交的子图,groupEdges操作合并重复的边
5. 连接操作
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]}
joinVertices操作,连接顶点和输入的RDD,然后对连接得到的顶点,应用用户定义的map函数,若在RDD中没有匹配连接的顶点,则保持顶点原有的值不变
outerjoinVertices类似于joinVertices,只是用户定义的map函数应用于所有的顶点,且可以改变顶点的类型
其中f(a)(b)的写法类似于f(a,b),只是参数b的类型取决于a
6. 邻域聚合
GraphX中,经过深度优化的核心聚合操作是mapReduceTriplets
class Graph[VD, ED] {
def mapReduceTriplets[A](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduce: (A, A) => A)
: VertexRDD[A]}
mapReduceTriplets接收一个用户定义的map函数,应用于Graph的每一个三元组,产生消息(message)給三元组中的任意顶点。为了便于预先聚合优化,暂时只支持給其中一个顶点发送消息。随后,用户定义的reduce函数结合发送给每一个顶点的消息。最终返回VertexRDD[A],没有收到消息的顶点不包含在该结果之中
mapRedeceTriplets还包含一个可选的参数:activeSetOpt,指定执行map操作的顶点集合
7. 在spark中,RDD默认是不会一直保存在内存中的,为了避免重复计算,需要显式的指定:Graph.cache(),显式指定保存在内存中的RDD只有在系统内存不足时,才会强制采用LRU(least recently uesd)方式调出内存。然而,对于迭代计算则应该uncaching迭代产生的中间数据,因此,在进行图的迭代计算时,推荐采用Pregel API,它会自动的unpersist不需要的中间结果。
8. GraphX Pregel API
图天然就是一个递归的数据结构,图中顶点的特性取决于它们邻域顶点的特性,反过来又影响其邻域顶点的特性。因此,很多重要的图算法都需要迭代计算每个顶点的特性,直到收敛。GraphX提供类似于Pregel的操作,其是Google Pregel和GraphLab框架抽象的结合。
class GraphOps[VD, ED] {
def pregel[A]
(initialMsg: A, //初始消息,最大迭代次数,消息传递方向
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
var i = 0
while (activeMessages > 0 && i < maxIterations) {
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
activeMessages = messages.count()
i += 1
}
g
}
}
9. 创建Graph
GraphX提供了根据顶点和边RDD或者从磁盘上创建图的方法。默认情况下,图构建器不会重新分割图的边,即边将留在它们起始分片所在机器。然而,Graph.groupEdges要求将图重新分片,因为该操作假设相同的边处在相同的分片中。所以需先调用Graph.partitionBy操作。
GraphLoader.edgeListFile操作,从磁盘加载图,解析sourceVD destinationVD,跳过#开始的注释行,顶点值默认为1
10.VertexRDD和EdgeRDD
GraphX提供Graph的VertexRDD和EdgeRDD,由于GraphX对顶点和边的数据结构进行了优化,因此还提供一些额外的功能。Vertex[A]继承自RDD[VertexID,A],并且约束VertexID只能出现一次,采用哈希表的方式存储顶点属性A。EdgeRDD继承自RDD[Edge[ED]]依据策略PartitionStrategy,将边保存在分块中。在每个分块中,边的结构和属性保存在不同的结构中。
相关推荐
《基于Spark-Graphx的大规模用户图计算和应用》是一份深入探讨如何使用Apache Spark的GraphX组件进行大规模用户图计算的完整高清资料。Spark作为一个快速、通用且可扩展的数据处理框架,其GraphX模块为处理图形数据...
5. **图并行计算**:基于Spark的分布式计算能力,GraphX可以并行处理大规模图数据,充分利用集群资源,提高计算效率。其数据分区策略确保了计算的高效性和可扩展性。 6. **图挖掘应用**:在社交网络分析中,GraphX...
在GraphX之前,存在两种主要的图计算框架:Pregel和GraphLab。Pregel框架基于BSP(Bulk Synchronous Parallel)模型,适用于大规模图数据的处理,而GraphLab则提供了一个更为高级的编程抽象。 二、GraphX简介 ...
本书旨在详细介绍Apache Spark中的图计算框架GraphX。随着大数据分析需求的增长,图计算作为一种处理复杂数据关系的有效手段,逐渐成为研究热点。在这一背景下,《Spark GraphX in Action》这本书的出现显得尤为及时...
在图计算的背景下,GraphX是一个并行计算框架,它继承了Pregel、Giraph、GraphLab等系统的特性,这些系统都提供了用于简化基于图的编程的API。GraphX的出现是为了解决多个图计算系统并存时带来的学习、部署与管理的...
《基于Spark图计算的社会网络分析系统的设计和实现——顶点分析》这篇论文主要探讨了如何利用Spark的图计算能力来设计和实现一个用于社会网络分析的系统,特别是针对顶点分析这一关键环节。社会网络分析(SNA)在...
### 大数据处理与Spark计算框架 #### 一、大数据生态系统及Spark的作用 **大数据处理流程**主要包括以下几个步骤: 1. **数据采集**:通过各种传感器、设备、社交媒体等方式收集原始数据。 2. **数据存储**:将...
"基于Spark平台支持空间数据管理的图计算框架的研究"这一主题深入探讨了如何借助Apache Spark的强大能力,构建一个适用于处理空间数据的图计算框架。 Apache Spark是一个开源的大数据处理框架,以其高效、易用和多...
1. **Spark核心概念**:Spark的核心组件包括Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图计算)。项目可能重点使用了Spark Core和Spark Streaming,用于数据处理和实时流计算。 2. **...
dga-graphX软件包包含使用GraphX框架在Spark上构建的几种预构建的可执行图算法。 前提条件 [Spark]( )0.9.0或更高版本 [graphX]( ) [Gradle]( ) 建造 如有必要,请编辑build.gradle文件以设置您的spark和...
GraphX是Spark生态中的一个重要组件,它是基于Spark构建的,用于大规模图计算。GraphX提供了丰富的图数据操作符,能在多个分布式集群上进行图计算运算,并且拥有丰富的API接口。GraphX在大规模图数据处理方面具有...
Spark的核心组件包括Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX,这些组件相互协作,可以支持批处理、交互式查询、实时流处理和图形计算等多种应用场景。 在这个电影点评系统中,Spark ...
总结来说,基于Spark GraphX的神经网络构建利用了Spark的并行计算能力和GraphX的图数据结构,能够处理大规模的神经网络模型。这种方式不仅提高了计算效率,还能方便地扩展到分布式环境。然而,需要注意的是,虽然...
基于Spark-GraphX的大规模加权图最短路径查询方法(LSGSP-SG)的核心在于分布式计算。具体步骤包括: 1. **初始化阶段**:加载经过分割和标记的大规模图数据到Spark集群中。每个计算节点负责处理分配给它的子图数据...
Spark GraphX提供了一个高效的图处理API,允许开发者在Spark的分布式内存计算框架上进行图计算。它不仅支持静态图的处理,还能够处理动态图,适应不断变化的数据网络。在GraphX中,图被表示为一个弹性分布式数据集...
相较于基于磁盘的计算框架,Spark在内存计算方面具有明显优势。 4. 提出的优化算法GXDSGC(GraphX-based Distributed Structural Graph Clustering) 为了克服Hadoop MapReduce框架在执行图聚类时遇到的性能瓶颈,...
- **Spark GraphX**:处理图数据,适用于社交网络分析等场景,但在这个项目中可能未涉及。 4. **数据处理流程**:项目可能包含数据的抓取、清洗、转换、聚合等步骤,这些通常在Spark的DataFrame或RDD(弹性分布式...