千家信息网

How to Implement Maximal Clique Mining on a Graph with Spark GraphX

Published: 2025-12-02 Author: 千家信息网 editorial staff
Last updated: 2025-12-02

This article looks at how to implement maximal clique mining on a graph with Spark GraphX. Many readers may not be familiar with the topic, so the main points are summarized below.

Spark GraphX does not ship a maximal clique mining algorithm.

The maximal clique algorithms in common use today are serial, and they are based on the Bron–Kerbosch algorithm.
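For readers who have not met Bron–Kerbosch before, the following is a minimal sketch of the basic (non-pivoting) variant in plain Java. It is an illustration only, not the implementation used later in this article; the class name `BasicBronKerbosch` and the helper `adjacency` are made up for this example, and the edge list is the undirected, de-duplicated form of the sample data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical illustration class: basic Bron–Kerbosch without pivoting.
class BasicBronKerbosch {

    // Builds a symmetric adjacency map from undirected edges {a, b}.
    static Map<Integer, Set<Integer>> adjacency(int[][] edges) {
        Map<Integer, Set<Integer>> adj = new TreeMap<>();
        for (int[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new TreeSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new TreeSet<>()).add(e[0]);
        }
        return adj;
    }

    // Returns every maximal clique (of any size) of the graph.
    static List<Set<Integer>> maximalCliques(Map<Integer, Set<Integer>> adj) {
        List<Set<Integer>> out = new ArrayList<>();
        expand(new TreeSet<>(), new TreeSet<>(adj.keySet()), new TreeSet<>(), adj, out);
        return out;
    }

    // r: current clique, p: candidates that can still extend r,
    // x: vertices already explored (used to reject non-maximal cliques).
    private static void expand(Set<Integer> r, Set<Integer> p, Set<Integer> x,
                               Map<Integer, Set<Integer>> adj, List<Set<Integer>> out) {
        if (p.isEmpty() && x.isEmpty()) {
            if (!r.isEmpty()) {
                out.add(new TreeSet<>(r)); // nothing can extend r, so it is maximal
            }
            return;
        }
        for (Integer v : new ArrayList<>(p)) { // iterate over a copy, p is modified below
            Set<Integer> nv = adj.get(v);
            Set<Integer> r2 = new TreeSet<>(r);
            r2.add(v);
            Set<Integer> p2 = new TreeSet<>(p);
            p2.retainAll(nv);
            Set<Integer> x2 = new TreeSet<>(x);
            x2.retainAll(nv);
            expand(r2, p2, x2, adj, out);
            p.remove(v); // v is done: move it from the candidates to the excluded set
            x.add(v);
        }
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1}, {0, 2}, {0, 3}, {1, 2}, {2, 3}, {2, 6},
                         {3, 4}, {3, 5}, {4, 5}, {4, 6}, {5, 6}, {6, 7}};
        for (Set<Integer> clique : maximalCliques(adjacency(edges))) {
            if (clique.size() >= 3) { // same size filter as count=3 in the config
                System.out.println(clique);
            }
        }
    }
}
```

The recursion is inherently sequential within one graph, which is why the approach below parallelizes across connected components rather than inside the algorithm.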

####Approach:####

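Spark GraphX provides a connected components algorithm. Connected components and maximal cliques are both concepts on undirected graphs, and every maximal clique lies entirely inside a single connected component.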

Use Spark GraphX to find the connected components, then run a serial maximal clique algorithm on each component separately (pseudo-parallelization).
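The grouping step can be illustrated without Spark. The sketch below is an assumption-laden stand-in, not the GraphX code: it uses a union-find structure to label each vertex with the smallest vertex id in its component (the same labelling convention GraphX's `connectedComponents` uses) and then buckets the edges per component, so that each bucket could be handed to a serial clique finder. The names `ComponentGrouping`, `find`, and `edgesByComponent` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class: group edges by connected component.
class ComponentGrouping {

    // Union-find lookup with path halving; unseen vertices start as their own root.
    static long find(Map<Long, Long> parent, long v) {
        parent.putIfAbsent(v, v);
        long p = parent.get(v);
        while (p != v) {
            long grandparent = parent.get(p);
            parent.put(v, grandparent); // shorten the path while walking up
            v = grandparent;
            p = parent.get(v);
        }
        return v;
    }

    // Returns componentId -> edges, where componentId is the smallest vertex id
    // in the component.
    static Map<Long, List<long[]>> edgesByComponent(long[][] edges) {
        Map<Long, Long> parent = new HashMap<>();
        for (long[] e : edges) {
            long a = find(parent, e[0]);
            long b = find(parent, e[1]);
            if (a != b) {
                // Always attach the larger root to the smaller one, so the root
                // of a component is its minimum vertex id.
                parent.put(Math.max(a, b), Math.min(a, b));
            }
        }
        Map<Long, List<long[]>> groups = new TreeMap<>();
        for (long[] e : edges) {
            groups.computeIfAbsent(find(parent, e[0]), k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    public static void main(String[] args) {
        long[][] edges = {{0, 1}, {1, 2}, {6, 7}, {5, 7}, {5, 6}, {9, 10}};
        for (Map.Entry<Long, List<long[]>> g : edgesByComponent(edges).entrySet()) {
            System.out.println("component " + g.getKey() + ": " + g.getValue().size() + " edges");
        }
    }
}
```

Because no clique can span two components, the buckets are independent and can be processed on different executors.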

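In a densely connected graph the components can be very large, and the serial maximal clique algorithm will still take a long time on them. Pruning is used here to reduce the amount of data before the clique search, but for large graphs the room for optimization is limited.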
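The pruning idea rests on a simple observation: a vertex that belongs to a clique of `count` vertices must have at least `count - 1` neighbours, so any vertex with a lower degree can be dropped, and dropping it may lower the degree of others. The sketch below replays that loop on an in-memory edge list. It mirrors the stopping rule described by the `numIter` and `percent` settings, but the class name `DegreePruning` and the method `prune` are invented for this example and the Spark broadcast mechanics are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration class: iterative degree-based pruning.
class DegreePruning {

    // edges: undirected, de-duplicated pairs {a, b}.
    // count: minimum clique size; numIter: last allowed round index;
    // percent: stop once a round keeps at least this share of the previous round's vertices.
    static List<long[]> prune(List<long[]> edges, int count, int numIter, int percent) {
        List<long[]> current = new ArrayList<>(edges);
        int previousKept = 0;
        for (int iter = 0; iter <= numIter; iter++) {
            // Degree of every vertex in the remaining graph.
            Map<Long, Integer> degree = new HashMap<>();
            for (long[] e : current) {
                degree.merge(e[0], 1, Integer::sum);
                degree.merge(e[1], 1, Integer::sum);
            }
            // Keep only vertices that could still be part of a clique of size `count`.
            Set<Long> kept = new HashSet<>();
            for (Map.Entry<Long, Integer> d : degree.entrySet()) {
                if (d.getValue() >= count - 1) {
                    kept.add(d.getKey());
                }
            }
            // Keep only edges whose two endpoints both survived.
            List<long[]> next = new ArrayList<>();
            for (long[] e : current) {
                if (kept.contains(e[0]) && kept.contains(e[1])) {
                    next.add(e);
                }
            }
            current = next;
            // Stop when this round removed too little to be worth another pass.
            if (previousKept != 0 && kept.size() >= (long) previousKept * percent / 100) {
                break;
            }
            previousKept = kept.size();
        }
        return current;
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(
            new long[]{0, 1}, new long[]{0, 2}, new long[]{0, 3}, new long[]{1, 2},
            new long[]{2, 3}, new long[]{2, 6}, new long[]{3, 4}, new long[]{3, 5},
            new long[]{4, 5}, new long[]{4, 6}, new long[]{5, 6}, new long[]{6, 7});
        // Vertex 7 has degree 1 and is removed together with the edge 6-7.
        System.out.println(prune(edges, 3, 50, 90).size() + " edges remain");
    }
}
```

Pruning only removes vertices that cannot be in a large enough clique; it does not bound the size of what is left, which is why a dense graph still leaves a large component for the serial step.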

A truly parallel maximal clique algorithm is still something to look forward to.

####Configuration file:####

    graph_data_path=hdfs://localhost/graph_data
    out_path=hdfs://localhost/clique
    ck_path=hdfs://localhost/checkpoint
    # Number of pruning iterations
    numIter=50
    # Minimum number of vertices in a maximal clique
    count=3
    # Maximal clique algorithm: 1 = the author's own implementation, 2 = jgrapht
    algorithm=2
    # Vertices left after a pruning round, as a percentage of the previous round.
    # If 90% of the data is still left after a round, pruning is no longer efficient.
    percent=90
    spark.master=local
    spark.app.name=graph
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    spark.yarn.executor.memoryOverhead=20480
    spark.yarn.driver.memoryOverhead=20480
    spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
    spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
    spark.driver.maxResultSize=10g
    spark.default.parallelism=60
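As a rough sketch of how such a file is consumed, the example below loads properties with `java.util.Properties`, reads a numeric setting, and collects every key that starts with `spark` (the keys the driver forwards to `SparkConf`). It also shows why explanatory remarks belong on their own `#` lines: `Properties.load` treats everything after `=` as the value, so a remark on the same line would end up inside the number. The class `ConfigSketch` and its methods are hypothetical, and the configuration is parsed from a string rather than from `/config.properties`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical illustration class: reading the configuration shown above.
class ConfigSketch {

    // Parses .properties text; a StringReader stands in for the real resource stream.
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return p;
    }

    // Collects the entries that would be copied into SparkConf.
    static Map<String, String> sparkSettings(Properties p) {
        Map<String, String> out = new TreeMap<>();
        for (String key : p.stringPropertyNames()) {
            if (key.startsWith("spark")) {
                out.put(key, p.getProperty(key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties ok = parse("# pruning iterations\nnumIter=50\nspark.master=local\nspark.app.name=graph\n");
        System.out.println(Integer.parseInt(ok.getProperty("numIter")));
        System.out.println(sparkSettings(ok));

        // A remark on the same line becomes part of the value and breaks parseInt.
        Properties bad = parse("numIter=50      pruning iterations\n");
        try {
            Integer.parseInt(bad.getProperty("numIter"));
        } catch (NumberFormatException ex) {
            System.out.println("not a number: [" + bad.getProperty("numIter") + "]");
        }
    }
}
```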

####Sample data:####

    {"src":"0","dst":"1"}
    {"src":"0","dst":"2"}
    {"src":"0","dst":"3"}
    {"src":"1","dst":"0"}
    {"src":"2","dst":"1"}
    {"src":"3","dst":"5"}
    {"src":"4","dst":"6"}
    {"src":"5","dst":"4"}
    {"src":"6","dst":"5"}
    {"src":"3","dst":"2"}
    {"src":"2","dst":"3"}
    {"src":"6","dst":"4"}
    {"src":"3","dst":"4"}
    {"src":"4","dst":"3"}
    {"src":"2","dst":"6"}
    {"src":"6","dst":"2"}
    {"src":"6","dst":"7"}
    {"src":"7","dst":"6"}
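The sample lists several edges in both directions (for example 0→1 and 1→0), while cliques are defined on undirected graphs. A minimal sketch of the normalization step, assuming one JSON object per record with `src` before `dst` exactly as in the sample: each pair is ordered as (smaller id, larger id) and duplicates are dropped. The class `EdgeNormalizer` and the regular expression are made up for this illustration; a real job would use a JSON reader instead.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration class: turn directed records into unique undirected edges.
class EdgeNormalizer {

    // Matches {"src":"<digits>","dst":"<digits>"}; assumes this exact field order.
    private static final Pattern EDGE = Pattern.compile(
        "\\{\\s*\"src\"\\s*:\\s*\"(\\d+)\"\\s*,\\s*\"dst\"\\s*:\\s*\"(\\d+)\"\\s*\\}");

    static Set<List<Long>> normalize(String records) {
        Set<List<Long>> edges = new LinkedHashSet<>();
        Matcher m = EDGE.matcher(records);
        while (m.find()) {
            long a = Long.parseLong(m.group(1));
            long b = Long.parseLong(m.group(2));
            // Order the endpoints so that 0->1 and 1->0 collapse into one edge.
            edges.add(Arrays.asList(Math.min(a, b), Math.max(a, b)));
        }
        return edges;
    }

    public static void main(String[] args) {
        String records = "{\"src\":\"0\",\"dst\":\"1\"}\n"
                       + "{\"src\":\"1\",\"dst\":\"0\"}\n"
                       + "{\"src\":\"7\",\"dst\":\"6\"}";
        System.out.println(normalize(records));
    }
}
```

Applied to the full sample, the 18 records collapse to 12 undirected edges.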

####Sample graph:####

####Output:####

    0,1,2
    0,2,3
    3,4,5
    4,5,6

####Code implementation:####

    import java.util
    import java.util.Properties

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.{SparkConf, SparkContext}
    import org.jgrapht.alg.BronKerboschCliqueFinder
    import org.jgrapht.graph.{DefaultEdge, SimpleGraph}

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    object ApplicationTitan {
      def main(args: Array[String]) {
        val prop = new Properties()
        prop.load(getClass.getResourceAsStream("/config.properties"))

        val graph_data_path = prop.getProperty("graph_data_path")
        val out_path = prop.getProperty("out_path")
        val ck_path = prop.getProperty("ck_path")
        val count = Integer.parseInt(prop.getProperty("count"))
        val numIter = Integer.parseInt(prop.getProperty("numIter"))
        val algorithm = Integer.parseInt(prop.getProperty("algorithm"))
        val percent = Integer.parseInt(prop.getProperty("percent"))
        val conf = new SparkConf()
        try {
          // Delete the output of any previous run
          Runtime.getRuntime.exec("hdfs dfs -rm -r " + out_path)
          // Runtime.getRuntime.exec("cmd.exe /C rd /s /q " + out_path)
        } catch {
          case ex: Exception =>
            ex.printStackTrace(System.out)
        }

        // Forward every spark.* entry of the properties file to SparkConf
        prop.stringPropertyNames().asScala.foreach(s => {
          if (s.startsWith("spark")) {
            conf.set(s, prop.getProperty(s))
          }
        })
        conf.registerKryoClasses(Array(getClass))
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        sc.setCheckpointDir(ck_path)
        val sqlc = new SQLContext(sc)
        try {
          val e_df = sqlc.read
            // .json(graph_data_path)
            .parquet(graph_data_path)

          // Normalize every edge to (smaller id, larger id) and drop duplicates.
          // The pattern assumes the columns arrive in the order (dst, src).
          // Going through .rdd keeps this an RDD on both Spark 1.x and 2.x.
          var e_rdd = e_df.rdd
            .mapPartitions(it => {
              it.map({
                case Row(dst: String, src: String) =>
                  val src_long = src.toLong
                  val dst_long = dst.toLong
                  if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long)
              })
            }).distinct()
          e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

          var bc: Broadcast[Set[Long]] = null
          var iter = 0
          var bc_size = 0

          // Pruning: repeatedly drop vertices whose degree is below count - 1
          while (iter <= numIter) {
            val temp = e_rdd
              .flatMap(x => List((x._1, 1), (x._2, 1)))
              .reduceByKey((x, y) => x + y)
              .filter(x => x._2 >= count - 1)
              .mapPartitions(it => it.map(x => x._1))
            val bc_value = temp.collect().toSet
            bc = sc.broadcast(bc_value)
            e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2))
            e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
            iter += 1
            // Stop once a round keeps at least `percent`% of the previous round's vertices
            if (bc_size != 0 && bc_value.size >= bc_size * percent / 100) {
              println("total iter : " + iter)
              iter = Int.MaxValue
            }
            bc_size = bc_value.size
          }

          // Build the graph
          val edge: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2)))
          val graph = Graph.fromEdges(edge, 0, StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)

          // Connected components: (vertex id, component id)
          val cc = graph.connectedComponents().vertices
          cc.persist(StorageLevel.MEMORY_AND_DISK_SER)

          cc.join(e_rdd)
            // Key each edge by a random one-digit salt + component id to spread large components
            .mapPartitions(it => it.map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2))))
            .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2)
            // Strip the salt and merge the partial edge lists of each component
            .mapPartitions(it => it.map(x => (x._1.substring(1), x._2)))
            .aggregateByKey(List[(Long, Long)]())((list1, list2) => list1 ::: list2, (list3, list4) => list3 ::: list4)
            .filter(x => x._2.size >= count - 1)
            .flatMap(x => {
              if (algorithm == 1)
                find(x, count)
              else
                find2(x, count)
            })
            // One clique per output line, vertices separated by commas
            .mapPartitions(it => it.map(set => set.asScala.mkString(",")))
            // .coalesce(1)
            .saveAsTextFile(out_path)
        } catch {
          case ex: Exception =>
            ex.printStackTrace(System.out)
        }
        sc.stop()
      }

      // The author's own maximal clique implementation (CliqueFinder)
      def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = {
        println(x._1 + "|s|" + x._2.size)
        println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
        val neighbors = new util.HashMap[String, util.Set[String]]
        val finder = new CliqueFinder(neighbors, count)
        // Build a symmetric adjacency map from the component's edge list
        x._2.foreach(r => {
          val v1 = r._1.toString
          val v2 = r._2.toString
          if (neighbors.containsKey(v1)) {
            neighbors.get(v1).add(v2)
          } else {
            val temp = new util.HashSet[String]()
            temp.add(v2)
            neighbors.put(v1, temp)
          }
          if (neighbors.containsKey(v2)) {
            neighbors.get(v2).add(v1)
          } else {
            val temp = new util.HashSet[String]()
            temp.add(v1)
            neighbors.put(v2, temp)
          }
        })
        println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
        finder.findMaxCliques().asScala
      }

      // The maximal clique algorithm from jgrapht
      def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = {
        println(x._1 + "|s|" + x._2.size)
        println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
        val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
        x._2.foreach(r => {
          val v1 = r._1.toString
          val v2 = r._2.toString
          to_clique.addVertex(v1)
          to_clique.addVertex(v2)
          to_clique.addEdge(v1, v2)
        })
        val finder = new BronKerboschCliqueFinder(to_clique)
        val list = finder.getAllMaximalCliques.asScala
        // Keep only cliques with at least `count` vertices
        var result = Set[util.Set[String]]()
        list.foreach(x => {
          if (x.size() >= count)
            result = result + x
        })
        println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
        result
      }
    }
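The two `aggregateByKey` calls in the driver implement a salted, two-stage grouping: each edge is first keyed by a random digit plus its component id, so the edges of one huge component are spread over up to ten keys, and only afterwards are the partial lists merged under the plain component id. The sketch below reproduces that idea with in-memory maps; it is an illustration under that reading of the code, and `SaltedGrouping` and `group` are names invented here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical illustration class: two-stage grouping with a one-digit salt.
class SaltedGrouping {

    // rows: {componentId, src, dst}. Returns componentId (as a string) -> edges {src, dst}.
    static Map<String, List<long[]>> group(List<long[]> rows, Random rnd) {
        // Stage 1: group by "<salt><componentId>", where salt is a single digit 0-9.
        Map<String, List<long[]>> partial = new HashMap<>();
        for (long[] r : rows) {
            String saltedKey = rnd.nextInt(10) + Long.toString(r[0]);
            partial.computeIfAbsent(saltedKey, k -> new ArrayList<>()).add(new long[]{r[1], r[2]});
        }
        // Stage 2: drop the first character (the salt) and merge the partial lists.
        Map<String, List<long[]>> merged = new TreeMap<>();
        for (Map.Entry<String, List<long[]>> e : partial.entrySet()) {
            merged.computeIfAbsent(e.getKey().substring(1), k -> new ArrayList<>()).addAll(e.getValue());
        }
        return merged;
    }

    public static void main(String[] args) {
        List<long[]> rows = Arrays.asList(
            new long[]{0, 0, 1}, new long[]{0, 0, 2}, new long[]{0, 1, 2},
            new long[]{5, 5, 6}, new long[]{5, 6, 7});
        for (Map.Entry<String, List<long[]>> e : group(rows, new Random()).entrySet()) {
            System.out.println("component " + e.getKey() + ": " + e.getValue().size() + " edges");
        }
    }
}
```

The salt only affects how the work is distributed in the first stage; the final grouping is the same whatever digits are drawn.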

// The author's own implementation of the maximal clique algorithm

    import java.util.*;

    /**
     * @author mopspecial@gmail.com
     * @date 2017/7/31
     */
    public class CliqueFinder {
        private Map<String, Set<String>> neighbors;
        private Set<String> nodes;
        private Set<Set<String>> maxCliques = new HashSet<>();
        private Integer minSize;

        public CliqueFinder(Map<String, Set<String>> neighbors, Integer minSize) {
            this.neighbors = neighbors;
            this.nodes = neighbors.keySet();
            this.minSize = minSize;
        }

        private void bk3(Set<String> clique, List<String> candidates, List<String> excluded) {
            if (candidates.isEmpty() && excluded.isEmpty()) {
                if (!clique.isEmpty() && clique.size() >= minSize) {
                    maxCliques.add(clique);
                }
                return;
            }

            for (String s : degeneracy_order(candidates)) {
                List<String> new_candidates = new ArrayList<>(candidates);
                new_candidates.retainAll(neighbors.get(s));

                List<String> new_excluded = new ArrayList<>(excluded);
                new_excluded.retainAll(neighbors.get(s));
                Set<String> nextClique = new HashSet<>(clique);
                nextClique.add(s);
                bk2(nextClique, new_candidates, new_excluded);
                candidates.remove(s);
                excluded.add(s);
            }
        }

        private void bk2(Set<String> clique, List<String> candidates, List<String> excluded) {
            if (candidates.isEmpty() && excluded.isEmpty()) {
                if (!clique.isEmpty() && clique.size() >= minSize) {
                    maxCliques.add(clique);
                }
                return;
            }
            String pivot = pick_random(candidates);
            if (pivot == null) {
                pivot = pick_random(excluded);
            }
            List<String> tempc = new ArrayList<>(candidates);
            tempc.removeAll(neighbors.get(pivot));

            for (String s : tempc) {
                List<String> new_candidates = new ArrayList<>(candidates);
                new_candidates.retainAll(neighbors.get(s));

                List<String> new_excluded = new ArrayList<>(excluded);
                new_excluded.retainAll(neighbors.get(s));
                Set<String> nextClique = new HashSet<>(clique);
                nextClique.add(s);
                bk2(nextClique, new_candidates, new_excluded);
                candidates.remove(s);
                excluded.add(s);
            }
        }

        private List<String> degeneracy_order(List<String> innerNodes) {
            List<String> result = new ArrayList<>();
            Map<String, Integer> deg = new HashMap<>();
            for (String node : innerNodes) {
                deg.put(node, neighbors.get(node).size());
            }
            while (!deg.isEmpty()) {
                Integer min = Collections.min(deg.values());
                String minKey = null;
                for (String key : deg.keySet()) {
                    if (deg.get(key).equals(min)) {
                        minKey = key;
                        break;
                    }
                }
                result.add(minKey);
                deg.remove(minKey);
                for (String k : neighbors.get(minKey)) {
                    if (deg.containsKey(k)) {
                        deg.put(k, deg.get(k) - 1);
                    }
                }
            }
            return result;
        }

        private String pick_random(List<String> random) {
            if (random != null && !random.isEmpty()) {
                return random.get(0);
            } else {
                return null;
            }
        }

        public Set<Set<String>> findMaxCliques() {
            this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>());
            return maxCliques;
        }

        public static void main(String[] args) {
            Map<String, Set<String>> neighbors = new HashMap<>();
            neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3")));
            neighbors.put("1", new HashSet<>(Arrays.asList("0", "2")));
            neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6")));
            neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5")));
            neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6")));
            neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6")));
            neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5")));
            neighbors.put("7", new HashSet<>(Arrays.asList("6")));
            CliqueFinder finder = new CliqueFinder(neighbors, 3);
            finder.bk3(new HashSet<>(), new ArrayList<>(neighbors.keySet()), new ArrayList<>());
            System.out.println(finder.maxCliques);
        }
    }

