SparkGraphx计算指定节点的N度关系节点源码
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,直接上代码:package horizon.graphx.utilimport java.security.InvalidParameterExceptionimport horizon.graphx
千家信息网最后更新 2025年12月02日:Spark GraphX 计算指定节点的 N 度关系节点源码
直接上代码:
package horizon.graphx.util

import java.security.InvalidParameterException

import horizon.graphx.util.CollectionUtil.CollectionHelper
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
 * Created by yepei.ye on 2017/1/19.
 * Description: for a chosen set of vertices in a graph, computes their N-degree related vertices and
 * outputs, for every chosen vertex, the related vertex ids together with their path length.
 */
object GraphNdegUtil {

  /** Messages are only exchanged over an edge while both endpoints hold fewer entries than this. */
  val maxNDegVerticesCount = 10000

  /** Vertices whose degree exceeds this value are removed from the graph before propagation starts. */
  val maxDegree = 1000

  /**
   * Computes the N-degree relations of the chosen vertices from a plain edge list.
   *
   * Builds a graph from the edge tuples (vertex attribute 0, `EdgePartition2D` partitioning,
   * `MEMORY_AND_DISK_SER` storage) and delegates to the graph-based overload with the default filter.
   *
   * @param edges         (srcId, dstId) pairs describing the graph
   * @param choosedVertex ids of the vertices whose relations are wanted
   * @param degree        passed through to the graph-based overload; must be >= 1
   * @tparam ED unused edge type parameter, kept for source compatibility
   * @return for every chosen vertex, a map from path length to the vertex ids recorded at that length
   */
  def aggNdegreedVertices[ED: ClassTag](edges: RDD[(VertexId, VertexId)],
                                        choosedVertex: RDD[VertexId],
                                        degree: Int): VertexRDD[Map[Int, Set[VertexId]]] = {
    val simpleGraph = Graph.fromEdgeTuples(
      edges,
      0,
      Option(PartitionStrategy.EdgePartition2D),
      StorageLevel.MEMORY_AND_DISK_SER,
      StorageLevel.MEMORY_AND_DISK_SER)
    aggNdegreedVertices(simpleGraph, choosedVertex, degree)
  }

  /**
   * Same computation as [[aggNdegreedVertices]], but the resulting vertex ids are replaced by the
   * attributes those vertices carry in `graph`.
   *
   * @param graph         the graph to search
   * @param choosedVertex ids of the vertices whose relations are wanted
   * @param degree        passed through to [[aggNdegreedVertices]]; must be >= 1
   * @param sendFilter    predicate on (sender attribute, receiver attribute); a message is sent only
   *                      when it returns true
   * @return for every chosen vertex, a map from path length to the attributes of the related vertices
   */
  def aggNdegreedVerticesWithAttr[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
                                                              choosedVertex: RDD[VertexId],
                                                              degree: Int,
                                                              sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true): VertexRDD[Map[Int, Set[VD]]] = {
    val ndegs: VertexRDD[Map[Int, Set[VertexId]]] = aggNdegreedVertices(graph, choosedVertex, degree, sendFilter)

    // One record per (source vertex, related vertex, path length); the attribute is filled in by the join below.
    val flated: RDD[Ver[VD]] = ndegs
      .flatMap(e => e._2.flatMap(t => t._2.map(s => Ver(e._1, s, t._1, null.asInstanceOf[VD]))))
      .persist(StorageLevel.MEMORY_AND_DISK_SER)

    // Attach the attribute of every related vertex.
    val matched: RDD[Ver[VD]] = flated
      .map(e => (e.id, e))
      .join(graph.vertices)
      .map(e => e._2._1.copy(attr = e._2._2))
      .persist(StorageLevel.MEMORY_AND_DISK_SER)
    flated.unpersist(blocking = false)
    ndegs.unpersist(blocking = false)

    // Regroup by source vertex, then by path length.
    val grouped: RDD[(VertexId, Map[Int, Set[VD]])] = matched
      .map(e => (e.source, ArrayBuffer(e)))
      .reduceByKey(_ ++= _)
      .map(e => (e._1, e._2.map(t => (t.degree, Set(t.attr))).reduceByKey(_ ++ _).toMap))
    matched.unpersist(blocking = false)
    VertexRDD(grouped)
  }

  /**
   * Propagates vertex ids along the edges of `graph` and returns, for each chosen vertex, the ids it
   * has collected grouped by path length. The vertex's own id is recorded under path length 0.
   *
   * The propagation runs `degree + 1` message rounds. Each round is materialised before the previous
   * round's graph is released, so this method triggers Spark jobs itself and returns an already
   * computed, persisted result.
   *
   * @param graph         the graph to search; vertices with more than [[maxDegree]] edges are dropped
   * @param choosedVertex ids of the vertices whose relations are wanted
   * @param degree        number of degrees to explore; must be >= 1
   * @param sendFilter    predicate on (sender attribute, receiver attribute); a message is sent only
   *                      when it returns true
   * @return for every chosen vertex still present in the graph, a map from path length to vertex ids
   * @throws java.security.InvalidParameterException if `degree` is smaller than 1
   */
  def aggNdegreedVertices[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
                                                      choosedVertex: RDD[VertexId],
                                                      degree: Int,
                                                      sendFilter: (VD, VD) => Boolean = (_: VD, _: VD) => true
                                                     ): VertexRDD[Map[Int, Set[VertexId]]] = {
    if (degree < 1) {
      throw new InvalidParameterException("度参数错误:" + degree)
    }
    val initVertex = choosedVertex.map(e => (e, true)).persist(StorageLevel.MEMORY_AND_DISK_SER)

    var g: Graph[DegVertex[VD], Int] = graph
      .outerJoinVertices(graph.degrees)((_, old, deg) => (deg.getOrElse(0), old))
      .subgraph(vpred = (_, a) => a._1 <= maxDegree) // drop very high-degree vertices
      .outerJoinVertices(initVertex)((id, old, hasReceivedMsg) => {
        // Only the chosen vertices start in the "init" state, i.e. ready to trigger messages.
        DegVertex(old._2, hasReceivedMsg.getOrElse(false), ArrayBuffer((id, 0)))
      })
      .mapEdges(_ => 0) // simplify the edge attribute
      .cache()

    for (_ <- 0 to degree) {
      val prevG = g
      // Send this round's messages.
      val messages: VertexRDD[ArrayBuffer[(VertexId, Int)]] = prevG
        .aggregateMessages[ArrayBuffer[(VertexId, Int)]](sendMsg(_, sendFilter), (a, b) => reduceVertexIds(a ++ b))
        .persist(StorageLevel.MEMORY_AND_DISK_SER)
      g = prevG.outerJoinVertices(messages)((vid, old, msg) => msg match {
        case Some(received) => updateVertexByMsg(vid, old, received)
        case None => old.copy(init = false)
      }).cache()

      // Unpersisting an RDD that was never computed just clears its storage level, so the new graph
      // must be materialised first; otherwise every round would be recomputed from scratch later.
      g.vertices.count()
      g.edges.count()

      messages.unpersist(blocking = false)
      prevG.unpersistVertices(blocking = false)
      prevG.edges.unpersist(blocking = false)
    }

    // NOTE(review): this unpersists an RDD owned by the caller - confirm that callers expect it.
    // It is done only after initVertex has been computed from it by the loop above.
    choosedVertex.unpersist(blocking = false)

    val maped = g.vertices
      .join(initVertex)
      .mapValues(e => sortResult(e._1))
      .persist(StorageLevel.MEMORY_AND_DISK_SER)
    // Materialise the result while its inputs are still cached, then release them.
    maped.count()
    initVertex.unpersist()
    g.unpersist(blocking = false)
    VertexRDD(maped)
  }

  /** Flat record: vertex `id` (carrying `attr`) is related to `source` with path length `degree`. */
  private case class Ver[VD: ClassTag](source: VertexId, id: VertexId, degree: Int, attr: VD = null.asInstanceOf[VD])

  /**
   * Merges a received message into a vertex: every received path length is increased by one, the
   * entries are merged with the ones already held (smallest path length per id wins), and `init` is
   * set according to whether the message was non-empty.
   */
  private def updateVertexByMsg[VD: ClassTag](vertexId: VertexId,
                                              oldAttr: DegVertex[VD],
                                              msg: ArrayBuffer[(VertexId, Int)]): DegVertex[VD] = {
    val addOne = msg.map(e => (e._1, e._2 + 1))
    val newMsg = reduceVertexIds(oldAttr.degVertices ++ addOne)
    oldAttr.copy(init = msg.nonEmpty, degVertices = newMsg)
  }

  /** Regroups the (vertex id, path length) entries of a vertex into path length -> set of vertex ids. */
  private def sortResult[VD: ClassTag](degs: DegVertex[VD]): Map[Int, Set[VertexId]] =
    degs.degVertices.map(e => (e._2, Set(e._1))).reduceByKey(_ ++ _).toMap

  /**
   * Per-vertex propagation state.
   *
   * @param attr        the original vertex attribute
   * @param init        whether the vertex is a chosen vertex or received a message in the last round
   * @param degVertices collected (vertex id, path length) entries
   */
  case class DegVertex[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])

  /** Same shape as [[DegVertex]]; not referenced inside this object. */
  case class VertexDegInfo[VD: ClassTag](var attr: VD, init: Boolean = false, degVertices: ArrayBuffer[(VertexId, Int)])

  /**
   * Message-sending function for one edge. Messages flow in both directions, each direction guarded
   * by `sendFilter(sender attribute, receiver attribute)`.
   *
   * Any exception is logged together with the edge context and then rethrown.
   */
  private def sendMsg[VD: ClassTag](e: EdgeContext[DegVertex[VD], Int, ArrayBuffer[(VertexId, Int)]],
                                    sendFilter: (VD, VD) => Boolean): Unit = {
    try {
      val src = e.srcAttr
      val dst = e.dstAttr
      // Exchange only when at least one endpoint is in the ready (init) state, neither endpoint has
      // reached the size cap, and the two endpoints do not already hold the same set of ids.
      if (src.degVertices.size < maxNDegVerticesCount && (src.init || dst.init) && dst.degVertices.size < maxNDegVerticesCount && !isAttrSame(src, dst)) {
        if (sendFilter(src.attr, dst.attr)) {
          e.sendToDst(reduceVertexIds(src.degVertices))
        }
        // Sender is dst and receiver is src here, so the filter must see (dst, src).
        if (sendFilter(dst.attr, src.attr)) {
          e.sendToSrc(reduceVertexIds(dst.degVertices))
        }
      }
    } catch {
      case ex: Exception =>
        println(s"==========error found: exception:${ex.getMessage}," +
          s"edgeTriplet:(srcId:${e.srcId},srcAttr:(${e.srcAttr.attr},${e.srcAttr.init},${e.srcAttr.degVertices.size}))," +
          s"dstId:${e.dstId},dstAttr:(${e.dstAttr.attr},${e.dstAttr.init},${e.dstAttr.degVertices.size}),attr:${e.attr}")
        ex.printStackTrace()
        throw ex
    }
  }

  /** Keeps a single entry per vertex id, retaining the smallest path length. */
  private def reduceVertexIds(ids: ArrayBuffer[(VertexId, Int)]): ArrayBuffer[(VertexId, Int)] =
    ArrayBuffer() ++= ids.reduceByKey(Math.min)

  /** True when both vertices have the same `init` flag and hold the same set of vertex ids. */
  private def isAttrSame[VD: ClassTag](a: DegVertex[VD], b: DegVertex[VD]): Boolean =
    a.init == b.init && allKeysAreSame(a.degVertices, b.degVertices)

  /** True when both buffers contain exactly the same, non-empty set of vertex ids. */
  private def allKeysAreSame(a: ArrayBuffer[(VertexId, Int)], b: ArrayBuffer[(VertexId, Int)]): Boolean = {
    val aKeys = a.map(e => e._1).toSet
    val bKeys = b.map(e => e._1).toSet
    aKeys.nonEmpty && aKeys == bKeys
  }
}
// Note: sortResult calls reduceByKey on a collection of type Traversable[(K, V)]. That method is a
// custom extension and has to be imported before use; its code is as follows:
/**
 * Created by yepei.ye on 2016/12/21.
 * Description: extension methods that add `reduceByKey`-style operations to in-memory collections
 * of key/value pairs.
 */
object CollectionUtil {

  // Imported locally so that this object compiles on its own.
  import scala.collection.mutable.ArrayBuffer
  import scala.reflect.ClassTag

  /**
   * Adds reduceByKey-related methods to any collection of type Traversable[(K, V)].
   *
   * @param collection the pairs to operate on
   * @param kt         class tag of the key type (kept for source compatibility)
   * @param vt         class tag of the value type (kept for source compatibility)
   * @tparam K key type
   * @tparam V value type
   */
  implicit class CollectionHelper[K, V](collection: Traversable[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {

    /**
     * Groups the pairs by key and combines the values of each group with `f`.
     *
     * @param f combines two values that share a key
     * @return one pair per distinct key; empty when the collection is empty
     */
    def reduceByKey(f: (V, V) => V): Traversable[(K, V)] =
      // Groups produced by groupBy are never empty, so reduce is safe. The pattern is purely
      // structural: no runtime type test is made on the key, so null keys are handled as well.
      collection.groupBy(_._1).map { case (_, values) =>
        values.reduce((a, b) => (a._1, f(a._2, b._2)))
      }

    /**
     * Like [[reduceByKey]], but additionally returns the elements that were reduced away.
     *
     * For each combination step the value reported as reduced is the right operand when the
     * combined value equals the left operand, and the left operand otherwise.
     *
     * @param f combines two values that share a key
     * @return (reduced collection, pairs that were reduced away)
     */
    def reduceByKeyWithReduced(f: (V, V) => V)(implicit kt: ClassTag[K], vt: ClassTag[V]): (Traversable[(K, V)], Traversable[(K, V)]) = {
      val reduced = ArrayBuffer.empty[(K, V)]
      val newSeq = collection.groupBy(_._1).map { case (_, values) =>
        values.reduce { (a, b) =>
          val newValue: V = f(a._2, b._2)
          val reducedValue: V = if (newValue == a._2) b._2 else a._2
          reduced += ((a._1, reducedValue))
          (a._1, newValue)
        }
      }
      (newSeq, reduced)
    }
  }
}
// Summary
以上就是本文关于 Spark GraphX 计算指定节点的 N 度关系节点源码的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:《浅谈七种常见的Hadoop和Spark项目案例》、《Spark的广播变量和累加器使用方法代码示例》、《Spark入门简介》等。有什么问题请留言,小编会及时回复大家的。
节点
方法
代码
消息
类型
源码
使用方法
元素
兴趣
内容
变量
只有
同时
就是
属性
常见
朋友
案例
状态
示例
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
aix重启需要关闭数据库吗
1.9.4服务器地址
武汉东浦软件开发工资待遇
郑州翼云软件开发
西双版纳电信服务器地址
四大数据库的异同点
服务器搜索360就闪退
戴尔笔记本做软件开发哪个好
广州专科软件开发专业
小米服务器安装网卡
开世览文数据库
中国第一大互联网科技公司
纯软件开发的出路
csgo无法使用安全服务器
软件开发工具包的简称
网络安全智能家居
校园网络安全论文主要工作
数据库人民
中华数据库水稻品种
云时代竟抢服务器
加强网络安全工作落实
灰度服务器
逻辑服务器
易点缘网络技术
python比数据库难吗
数据库索引重复报警
网络安全融入思想政治的必要性
郑州网络技术公司有哪些
网络技术春考可以报考哪些专业
网络安全暨保密工作培训会