0%

SPARK GRAPHX 学习

SPARK GRAPHX 学习

Pregel的计算模型

Pregel计算模型三要素:

  1. 作用于每个顶点的处理逻辑 vertexProgram
  2. 消息发送,用于相邻节点间的通讯 sendMessage
  3. 消息合并逻辑 messageCombining

Pregel API

​ 图是固有的递归数据结构,因为顶点的属性取决于其邻居的属性,而邻居的属性又取决于邻居的属性。因此,许多重要的图形算法迭代地重新计算每个顶点的属性直到达到一个定点条件。已经提出了一系列图平行抽象来表达这些迭代算法。GraphX公开了Pregel API的变体。

​ 在高层次上,GraphX中的Pregel操作符是一种限制于图形拓扑的批量同步并行消息抽象 。Pregel算子在一系列超级步骤中执行,其中顶点从前一个超级步骤接收入站消息的总和,计算顶点属性的新值,然后在下一个超级步骤中将消息发送到相邻顶点。与Pregel不同,消息作为边三元组的函数并行计算,消息计算可以访问源和目标顶点属性。不接收消息的顶点在超级步骤内跳过。Pregel运算符终止迭代,并在没有剩余消息时返回最终图形。

请注意,与更多标准Pregel实现不同,GraphX中的顶点只能将消息发送到相邻的顶点,并且使用用户定义的消息传递函数并行完成消息构造。这些约束允许在GraphX中进行额外的优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class GraphOps[VD, ED] {
def pregel[A]
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// 为每个顶点初始化消息
var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
// 第一次调用消息传递
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
// 循环直到没有消息或者达到最大迭代次数
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// 接收消息更新顶点信息
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// 发送消息, 跳过没有接收到消息的边
// 我们必须缓存消息,以便下次迭代时能够使用到
messages = g.mapReduceTriplets(
sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
activeMessages = messages.count()
i += 1
}
g
}
}

请注意,Pregel有两个参数列表(即,graph.pregel(list1)(list2))。第一个参数列表包含配置参数,包括初始消息,最大迭代次数以及发送消息的边缘方向(默认沿着边缘)。第二个参数列表包含接收消息(顶点程序vprog),计算消息(sendMsg)和组合消息的用户定义函数 mergeMsg

例子-使用Pregel计算最短路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))

PageRank 算法

什么是PageRank

PageRank是Google专有的算法,用于衡量特定网页相对于搜索引擎索引中其他网页而言的重要程度。他由Larry Page 和 Sergey Brin在20世纪90年代后期发明。PageRank实现将连接价值作为排名因素。

PageRank的核心思想

”在互联网上,如果一个网页被很多其它网页所链接,说明它受到普遍的承认和依赖,那么它的排名就很高。“ (摘自数学之美第10章)

Spark PageRank代码走读

Spark verison: 2.1

Scala version: 2.11

1
org.apache.spark.graphx.lib.PageRank.scala