SPARK GRAPHX 学习
Pregel的计算模型
Pregel计算模型三要素:
- 作用于每个顶点的处理逻辑 vertexProgram
- 消息发送,用于相邻节点间的通讯 sendMessage
- 消息合并逻辑 messageCombining
Pregel API
图是固有的递归数据结构,因为顶点的属性取决于其邻居的属性,而邻居的属性又取决于邻居的属性。因此,许多重要的图形算法迭代地重新计算每个顶点的属性直到达到一个定点条件。已经提出了一系列图平行抽象来表达这些迭代算法。GraphX公开了Pregel API的变体。
在高层次上,GraphX中的Pregel操作符是一种限制于图形拓扑的批量同步并行消息抽象 。Pregel算子在一系列超级步骤中执行,其中顶点从前一个超级步骤接收入站消息的总和,计算顶点属性的新值,然后在下一个超级步骤中将消息发送到相邻顶点。与Pregel不同,消息作为边三元组的函数并行计算,消息计算可以访问源和目标顶点属性。不接收消息的顶点在超级步骤内跳过。Pregel运算符终止迭代,并在没有剩余消息时返回最终图形。
请注意,与更多标准Pregel实现不同,GraphX中的顶点只能将消息发送到相邻的顶点,并且使用用户定义的消息传递函数并行完成消息构造。这些约束允许在GraphX中进行额外的优化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| class GraphOps[VD, ED] { def pregel[A] (initialMsg: A, maxIter: Int = Int.MaxValue, activeDir: EdgeDirection = EdgeDirection.Out) (vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache() var messages = g.mapReduceTriplets(sendMsg, mergeMsg) var activeMessages = messages.count() var i = 0 while (activeMessages > 0 && i < maxIterations) { g = g.joinVertices(messages)(vprog).cache() val oldMessages = messages messages = g.mapReduceTriplets( sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() activeMessages = messages.count() i += 1 } g } }
|
请注意,Pregel有两个参数列表(即,graph.pregel(list1)(list2)
)。第一个参数列表包含配置参数,包括初始消息,最大迭代次数以及发送消息的边缘方向(默认沿着边缘)。第二个参数列表包含接收消息(顶点程序vprog
),计算消息(sendMsg
)和组合消息的用户定义函数 mergeMsg
。
例子-使用Pregel计算最短路径
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import org.apache.spark.graphx.{Graph, VertexId} import org.apache.spark.graphx.util.GraphGenerators
val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble) val sourceId: VertexId = 42
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( (id, dist, newDist) => math.min(dist, newDist), triplet => { if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, (a, b) => math.min(a, b) ) println(sssp.vertices.collect.mkString("\n"))
|
PageRank是Google专有的算法,用于衡量特定网页相对于搜索引擎索引中其他网页而言的重要程度。他由Larry Page 和 Sergey Brin在20世纪90年代后期发明。PageRank实现将连接价值作为排名因素。
”在互联网上,如果一个网页被很多其它网页所链接,说明它受到普遍的承认和依赖,那么它的排名就很高。“ (摘自数学之美第10章)
Spark verison: 2.1
Scala version: 2.11
1
| org.apache.spark.graphx.lib.PageRank.scala
|