千家信息网

Giraph源码分析(八)—— 统计每个SuperStep中参与计算的顶点数目

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,作者|白松目的:科研中,需要分析在每次迭代过程中参与计算的顶点数目,来进一步优化系统。比如,在SSSP的compute()方法最后一行,都会把当前顶点voteToHalt,即变为InActive状态。
千家信息网最后更新 2025年12月03日Giraph源码分析(八)—— 统计每个SuperStep中参与计算的顶点数目

作者|白松

目的:科研中,需要分析在每次迭代过程中参与计算的顶点数目,来进一步优化系统。比如,在SSSP的compute()方法最后一行,都会把当前顶点voteToHalt,即变为InActive状态。所以每次迭代完成后,所有顶点都是InActive状态。在大同步后,收到消息的顶点会被激活,变为Active状态,然后调用顶点的compute()方法。本文的目的就是统计每次迭代过程中,参与计算的顶点数目。下面附上SSSP的compute()方法:

@Override  public void compute(Iterable messages) {    if (getSuperstep() == 0) {      setValue(new DoubleWritable(Double.MAX_VALUE));    }    double minDist = isSource() ? 0d : Double.MAX_VALUE;    for (DoubleWritable message : messages) {      minDist = Math.min(minDist, message.get());    }    if (minDist < getValue().get()) {      setValue(new DoubleWritable(minDist));      for (Edge edge : getEdges()) {        double distance = minDist + edge.getValue().get();        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));      }    }    //把顶点置为InActive状态    voteToHalt();  }

附:giraph中算法的终止条件是:没有活跃顶点且worker间没有消息传递。

hama-0.6.0中算法的终止条件只是:判断是否有活跃顶点。不是真正的pregel思想,半成品。

修改过程如下:

  1. org.apache.giraph.partition. PartitionStats 类

添加变量和方法,用来统计每个Partition在每个超步中参与计算的顶点数目。添加的变量和方法如下:

/** computed vertices in this partition */private long computedVertexCount=0;/*** Increment the computed vertex count by one.*/public void incrComputedVertexCount() {    ++ computedVertexCount;}/** * @return the computedVertexCount */public long getComputedVertexCount() {    return computedVertexCount;}

修改readFields()和write()方法,每个方法追加最后一句。当每个Partition计算完成后,会把自己的computedVertexCount发送给Master,Mater再读取汇总。

@Overridepublic void readFields(DataInput input) throws IOException {    partitionId = input.readInt();    vertexCount = input.readLong();    finishedVertexCount = input.readLong();    edgeCount = input.readLong();    messagesSentCount = input.readLong();    //添加下条语句    computedVertexCount=input.readLong();}@Overridepublic void write(DataOutput output) throws IOException {    output.writeInt(partitionId);    output.writeLong(vertexCount);    output.writeLong(finishedVertexCount);    output.writeLong(edgeCount);    output.writeLong(messagesSentCount);    //添加下条语句    output.writeLong(computedVertexCount);}
  1. org.apache.giraph.graph. GlobalStats 类

    添加变量和方法,用来统计每个超步中参与计算的顶点总数目,包含每个Worker上的所有Partitions。

 /** computed vertices in this partition   *  Add by BaiSong   */  private long computedVertexCount=0;     /**     * @return the computedVertexCount     */    public long getComputedVertexCount() {        return computedVertexCount;    }

修改addPartitionStats(PartitionStats partitionStats)方法,增加统计computedVertexCount功能。

/**  * Add the stats of a partition to the global stats.  *  * @param partitionStats Partition stats to be added.  */  public void addPartitionStats(PartitionStats partitionStats) {    this.vertexCount += partitionStats.getVertexCount();    this.finishedVertexCount += partitionStats.getFinishedVertexCount();    this.edgeCount += partitionStats.getEdgeCount();    //Add by BaiSong,添加下条语句    this.computedVertexCount+=partitionStats.getComputedVertexCount(); }

当然为了Debug方便,也可以修改该类的toString()方法(可选),修改后的如下:

public String toString() {        return "(vtx=" + vertexCount + ", computedVertexCount="                + computedVertexCount + ",finVtx=" + finishedVertexCount                + ",edges=" + edgeCount + ",msgCount=" + messageCount                + ",haltComputation=" + haltComputation + ")";    }
  1. org.apache.giraph.graph. ComputeCallable

添加统计功能。在computePartition()方法中,添加下面一句。

if (!vertex.isHalted()) {        context.progress();        TimerContext computeOneTimerContext = computeOneTimer.time();        try {            vertex.compute(messages);        //添加下面一句,当顶点调用完compute()方法后,就把该Partition的computedVertexCount加1            partitionStats.incrComputedVertexCount();        } finally {           computeOneTimerContext.stop();        }……
  1. 添加Counters统计,和我的博客Giraph源码分析(七)-- 添加消息统计功能 类似,此处不再详述。添加的类为:org.apache.giraph.counters.GiraphComputedVertex,下面附上该类的源码:
package org.apache.giraph.counters;import java.util.Iterator;import java.util.Map;import org.apache.hadoop.mapreduce.Mapper.Context;import com.google.common.collect.Maps;/** * Hadoop Counters in group "Giraph Messages" for counting every superstep * message count. */public class GiraphComputedVertex extends HadoopCountersBase {    /** Counter group name for the giraph Messages */    public static final String GROUP_NAME = "Giraph Computed Vertex";    /** Singleton instance for everyone to use */    private static GiraphComputedVertex INSTANCE;    /** superstep time in msec */    private final Map superstepVertexCount;    private GiraphComputedVertex(Context context) {        super(context, GROUP_NAME);        superstepVertexCount = Maps.newHashMap();    }    /**     * Instantiate with Hadoop Context.     *      * @param context     *            Hadoop Context to use.     */    public static void init(Context context) {        INSTANCE = new GiraphComputedVertex(context);    }    /**     * Get singleton instance.     *      * @return singleton GiraphTimers instance.     */    public static GiraphComputedVertex getInstance() {        return INSTANCE;    }    /**     * Get counter for superstep messages     *      * @param superstep     * @return     */    public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {        GiraphHadoopCounter counter = superstepVertexCount.get(superstep);        if (counter == null) {            String counterPrefix = "Superstep: " + superstep+" ";            counter = getCounter(counterPrefix);            superstepVertexCount.put(superstep, counter);        }        return counter;    }    @Override    public Iterator iterator() {        return superstepVertexCount.values().iterator();    }}
  1. 实验结果,运行程序后。会在终端输出每次迭代参与计算的顶点总数目。 测试SSSP(SimpleShortestPathsVertex类),输入图中共有9个顶点和12条边。输出结果如下:

上图测试中,共有6次迭代。红色框中,显示出了每次迭代过冲参与计算的顶点数目,依次是:9,4,4,3,4,0

解释:在第0个超步,每个顶点都是活跃的,所有共有9个顶点参与计算。在第5个超步,共有0个顶点参与计算,那么就不会向外发送消息,加上每个顶点都是不活跃的,所以算法迭代终止。

【阅读更多文章请访问数澜社区】

顶点 方法 统计 迭代 数目 消息 状态 功能 变量 算法 语句 过程 加下 源码 分析 总数 条件 目的 结果 测试 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 手机做ftp服务器 最新全球网络安全产业峰会 鸟培训 网络安全 网络安全高手 宜阳租房软件开发 我的世界怎么成为服务器管理 数据库数据第一行 服务器调协安全接入失败 数据库课程资源百度云 2021校园网络安全画 sql数据库新建 数据库热图是什么意思 灵璧县网络技术及信息安全工程师 平板升级无法连接服务器 深圳软件开发销售业务 计算机网络技术开设课程 网络安全风险特点和危害 昆山世硕有软件开发的职位吗 福建数据库空投箱销售 济南传世网络技术有限公司 vc中如何读取数据库 津南区互联网软件开发价格走势 软件开发专业能考一级建造师吗 数通互联网科技有限公司 sql数据库安装无法继续 中学网络安全教育主题班会资料 百旺系统升级后提示数据库 服务器证书无效或不存在怎么办 软件开发企业所得税加计扣除 使用阿里云服务器搭建个人网站
0