How RaftPeerSet Works in nacos and What It Does

Published: 2025-12-03. Author: 千家信息网 editors

This article takes a look at RaftPeerSet in nacos.

RaftPeerSet

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java

@Component
@DependsOn("serverListManager")
public class RaftPeerSet implements ServerChangeListener, ApplicationContextAware {

    @Autowired
    private ServerListManager serverListManager;

    private ApplicationContext applicationContext;

    private AtomicLong localTerm = new AtomicLong(0L);

    private RaftPeer leader = null;

    private Map<String, RaftPeer> peers = new HashMap<>();

    private Set<String> sites = new HashSet<>();

    private boolean ready = false;

    public RaftPeerSet() {
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @PostConstruct
    public void init() {
        serverListManager.listen(this);
    }

    public RaftPeer getLeader() {
        if (STANDALONE_MODE) {
            return local();
        }
        return leader;
    }

    public Set<String> allSites() {
        return sites;
    }

    public boolean isReady() {
        return ready;
    }

    public void remove(List<String> servers) {
        for (String server : servers) {
            peers.remove(server);
        }
    }

    public RaftPeer update(RaftPeer peer) {
        peers.put(peer.ip, peer);
        return peer;
    }

    public boolean isLeader(String ip) {
        if (STANDALONE_MODE) {
            return true;
        }

        if (leader == null) {
            Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
            return false;
        }

        return StringUtils.equals(leader.ip, ip);
    }

    public Set<String> allServersIncludeMyself() {
        return peers.keySet();
    }

    public Set<String> allServersWithoutMySelf() {
        Set<String> servers = new HashSet<String>(peers.keySet());

        // exclude myself
        servers.remove(local().ip);

        return servers;
    }

    public Collection<RaftPeer> allPeers() {
        return peers.values();
    }

    public int size() {
        return peers.size();
    }

    public RaftPeer decideLeader(RaftPeer candidate) {
        peers.put(candidate.ip, candidate);

        SortedBag ips = new TreeBag();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (RaftPeer peer : peers.values()) {
            if (StringUtils.isEmpty(peer.voteFor)) {
                continue;
            }

            ips.add(peer.voteFor);
            if (ips.getCount(peer.voteFor) > maxApproveCount) {
                maxApproveCount = ips.getCount(peer.voteFor);
                maxApprovePeer = peer.voteFor;
            }
        }

        if (maxApproveCount >= majorityCount()) {
            RaftPeer peer = peers.get(maxApprovePeer);
            peer.state = RaftPeer.State.LEADER;

            if (!Objects.equals(leader, peer)) {
                leader = peer;
                applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
                Loggers.RAFT.info("{} has become the LEADER", leader.ip);
            }
        }

        return leader;
    }

    public RaftPeer makeLeader(RaftPeer candidate) {
        if (!Objects.equals(leader, candidate)) {
            leader = candidate;
            applicationContext.publishEvent(new MakeLeaderEvent(this, leader));
            Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}",
                leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader));
        }

        for (final RaftPeer peer : peers.values()) {
            Map<String, String> params = new HashMap<>(1);
            if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
                try {
                    String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER);
                    HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}",
                                    response.getResponseBody(), peer.ip);
                                peer.state = RaftPeer.State.FOLLOWER;
                                return 1;
                            }

                            update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));

                            return 0;
                        }
                    });
                } catch (Exception e) {
                    peer.state = RaftPeer.State.FOLLOWER;
                    Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
                }
            }
        }

        return update(candidate);
    }

    public RaftPeer local() {
        RaftPeer peer = peers.get(NetUtils.localServer());
        if (peer == null && SystemUtils.STANDALONE_MODE) {
            RaftPeer localPeer = new RaftPeer();
            localPeer.ip = NetUtils.localServer();
            localPeer.term.set(localTerm.get());
            peers.put(localPeer.ip, localPeer);
            return localPeer;
        }
        if (peer == null) {
            throw new IllegalStateException("unable to find local peer: " + NetUtils.localServer() + ", all peers: "
                + Arrays.toString(peers.keySet().toArray()));
        }

        return peer;
    }

    public RaftPeer get(String server) {
        return peers.get(server);
    }

    public int majorityCount() {
        return peers.size() / 2 + 1;
    }

    public void reset() {
        leader = null;

        for (RaftPeer peer : peers.values()) {
            peer.voteFor = null;
        }
    }

    public void setTerm(long term) {
        localTerm.set(term);
    }

    public long getTerm() {
        return localTerm.get();
    }

    public boolean contains(RaftPeer remote) {
        return peers.containsKey(remote.ip);
    }

    //......
}

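  • RaftPeerSet provides methods such as remove, update, isLeader, allServersIncludeMyself, allServersWithoutMySelf, allPeers, decideLeader, makeLeader, majorityCount, and reset.

  • decideLeader first stores the candidate in peers, then iterates over peers and uses a TreeBag to count each peer.voteFor. When maxApproveCount is greater than or equal to majorityCount(), it marks the corresponding peer's state as RaftPeer.State.LEADER; if that peer differs from the current leader, it updates leader and publishes a LeaderElectFinishedEvent.

  • makeLeader checks whether candidate equals the current leader; if not, it sets leader to candidate and publishes a MakeLeaderEvent. It then iterates over peers and, for every peer other than candidate whose state is still LEADER, sends an API_GET_PEER request and updates the local copy of that peer from the response. If the request fails, it sets that peer's state to RaftPeer.State.FOLLOWER.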
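
The vote counting described for decideLeader can be sketched in isolation. The snippet below is a simplified stand-in, not the nacos class: VoteTallySketch and its method signatures are hypothetical, a plain HashMap replaces the TreeBag, and it returns null when no majority exists, whereas the real method returns whatever leader is currently recorded.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for the vote tally in decideLeader. */
final class VoteTallySketch {

    /** Same quorum rule as RaftPeerSet.majorityCount(): more than half of the peers. */
    static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /**
     * @param votes maps each peer's ip to the ip it voted for (null or empty means no vote yet)
     * @return the ip that holds a majority of votes, or null if nobody has one yet
     */
    static String decideLeader(Map<String, String> votes) {
        // Assumption: a HashMap counter is used here instead of commons-collections TreeBag.
        Map<String, Integer> counts = new HashMap<>();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (String voteFor : votes.values()) {
            if (voteFor == null || voteFor.isEmpty()) {
                continue; // this peer has not voted
            }
            int count = counts.merge(voteFor, 1, Integer::sum);
            if (count > maxApproveCount) {
                maxApproveCount = count;
                maxApprovePeer = voteFor;
            }
        }
        return maxApproveCount >= majorityCount(votes.size()) ? maxApprovePeer : null;
    }

    public static void main(String[] args) {
        Map<String, String> votes = new LinkedHashMap<>();
        votes.put("10.0.0.1", "10.0.0.1");
        votes.put("10.0.0.2", "10.0.0.1");
        votes.put("10.0.0.3", null);
        // Two of three peers voted for 10.0.0.1, which meets majorityCount(3) == 2.
        System.out.println(decideLeader(votes)); // prints 10.0.0.1
    }
}
```

With three peers the quorum is 2, so a candidate needs its own vote plus one approval; with four peers the quorum is 3.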

RaftCore.MasterElection

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java

    public class MasterElection implements Runnable {
        @Override
        public void run() {
            try {

                if (!peers.isReady()) {
                    return;
                }

                RaftPeer local = peers.local();
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

                if (local.leaderDueMs > 0) {
                    return;
                }

                // reset timeout
                local.resetLeaderDue();
                local.resetHeartbeatDue();

                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}", e);
            }

        }

        public void sendVote() {

            RaftPeer local = peers.get(NetUtils.localServer());
            Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
                JSON.toJSONString(getLeader()), local.term);

            peers.reset();

            local.term.incrementAndGet();
            local.voteFor = local.ip;
            local.state = RaftPeer.State.CANDIDATE;

            Map<String, String> params = new HashMap<>(1);
            params.put("vote", JSON.toJSONString(local));
            for (final String server : peers.allServersWithoutMySelf()) {
                final String url = buildURL(server, API_VOTE);
                try {
                    HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                                return 1;
                            }

                            RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);

                            Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));

                            peers.decideLeader(peer);

                            return 0;
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.warn("error while sending vote to server: {}", server);
                }
            }
        }
    }

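  • MasterElection.run counts local.leaderDueMs down by GlobalExecutor.TICK_PERIOD_MS on every tick and calls sendVote once it reaches zero or below. sendVote resets the peer set, increments the local term, votes for itself as a CANDIDATE, and calls peers.decideLeader(peer) for each vote request that succeeds in order to determine the leader.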
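
The countdown that triggers a vote round can be sketched on its own. Everything in this snippet is illustrative: ElectionTimerSketch is a hypothetical class, the three constants are assumed values, and the body of resetLeaderDue is not shown in the excerpt above, so the randomized timeout here is an assumption borrowed from common Raft practice.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch of the leader-timeout countdown driven by MasterElection.run(). */
final class ElectionTimerSketch {

    // Assumed values for illustration; the real constants live in GlobalExecutor.
    static final long TICK_PERIOD_MS = 500L;
    static final long LEADER_TIMEOUT_MS = 15_000L;
    static final long RANDOM_MS = 5_000L;

    long leaderDueMs = LEADER_TIMEOUT_MS;

    /** Assumption: the timeout is re-armed with random jitter so peers do not all time out together. */
    void resetLeaderDue() {
        leaderDueMs = LEADER_TIMEOUT_MS + ThreadLocalRandom.current().nextLong(0, RANDOM_MS);
    }

    /**
     * Called once per scheduler tick.
     *
     * @return true when the leader timeout has expired and a vote round should start
     */
    boolean tick() {
        leaderDueMs -= TICK_PERIOD_MS;
        if (leaderDueMs > 0) {
            return false; // the leader is still considered alive
        }
        resetLeaderDue(); // re-arm before voting, as run() does
        return true;
    }
}
```

A scheduler would call tick() at a fixed rate and start sending votes whenever it returns true; receiving a valid heartbeat would call resetLeaderDue() so that followers of a healthy leader never reach zero.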

RaftCore.receivedBeat

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java

@Component
public class RaftCore {

    //......

    public RaftPeer receivedBeat(JSONObject beat) throws Exception {
        final RaftPeer local = peers.local();
        final RaftPeer remote = new RaftPeer();
        remote.ip = beat.getJSONObject("peer").getString("ip");
        remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
        remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
        remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
        remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
        remote.voteFor = beat.getJSONObject("peer").getString("voteFor");

        if (remote.state != RaftPeer.State.LEADER) {
            Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
                remote.state, JSON.toJSONString(remote));
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }

        if (local.term.get() > remote.term.get()) {
            Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
                , remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
                + ", beat-to-term: " + local.term.get());
        }

        if (local.state != RaftPeer.State.FOLLOWER) {

            Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
            // mk follower
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
        }

        final JSONArray beatDatums = beat.getJSONArray("datums");
        local.resetLeaderDue();
        local.resetHeartbeatDue();

        peers.makeLeader(remote);

        Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());

        for (Map.Entry<String, Datum> entry : datums.entrySet()) {
            receivedKeysMap.put(entry.getKey(), 0);
        }

        // now check datums
        List<String> batch = new ArrayList<>();
        if (!switchDomain.isSendBeatOnly()) {
            int processedCount = 0;
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                    beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
            }
            for (Object object : beatDatums) {
                processedCount = processedCount + 1;

                JSONObject entry = (JSONObject) object;
                String key = entry.getString("key");
                final String datumKey;

                if (KeyBuilder.matchServiceMetaKey(key)) {
                    datumKey = KeyBuilder.detailServiceMetaKey(key);
                } else if (KeyBuilder.matchInstanceListKey(key)) {
                    datumKey = KeyBuilder.detailInstanceListkey(key);
                } else {
                    // ignore corrupted key:
                    continue;
                }

                long timestamp = entry.getLong("timestamp");

                receivedKeysMap.put(datumKey, 1);

                try {
                    if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
                        continue;
                    }

                    if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                        batch.add(datumKey);
                    }

                    if (batch.size() < 50 && processedCount < beatDatums.size()) {
                        continue;
                    }

                    String keys = StringUtils.join(batch, ",");

                    if (batch.size() <= 0) {
                        continue;
                    }

                    Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
                        , getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());

                    // update datum entry
                    String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
                    HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                return 1;
                            }

                            List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<JSONObject>>() {
                            });

                            for (JSONObject datumJson : datumList) {
                                OPERATE_LOCK.lock();
                                Datum newDatum = null;
                                try {

                                    Datum oldDatum = getDatum(datumJson.getString("key"));

                                    if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {
                                        Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                            datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);
                                        continue;
                                    }

                                    if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {
                                        Datum<Service> serviceDatum = new Datum<>();
                                        serviceDatum.key = datumJson.getString("key");
                                        serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                        serviceDatum.value =
                                            JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);
                                        newDatum = serviceDatum;
                                    }

                                    if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {
                                        Datum<Instances> instancesDatum = new Datum<>();
                                        instancesDatum.key = datumJson.getString("key");
                                        instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                        instancesDatum.value =
                                            JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);
                                        newDatum = instancesDatum;
                                    }

                                    if (newDatum == null || newDatum.value == null) {
                                        Loggers.RAFT.error("receive null datum: {}", datumJson);
                                        continue;
                                    }

                                    raftStore.write(newDatum);

                                    datums.put(newDatum.key, newDatum);
                                    notifier.addTask(newDatum.key, ApplyAction.CHANGE);

                                    local.resetLeaderDue();

                                    if (local.term.get() + 100 > remote.term.get()) {
                                        getLeader().term.set(remote.term.get());
                                        local.term.set(getLeader().term.get());
                                    } else {
                                        local.term.addAndGet(100);
                                    }

                                    raftStore.updateTerm(local.term.get());

                                    Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                        newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);

                                } catch (Throwable e) {
                                    Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
                                } finally {
                                    OPERATE_LOCK.unlock();
                                }
                            }
                            TimeUnit.MILLISECONDS.sleep(200);
                            return 0;
                        }
                    });

                    batch.clear();

                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
                }

            }

            List<String> deadKeys = new ArrayList<>();
            for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
                if (entry.getValue() == 0) {
                    deadKeys.add(entry.getKey());
                }
            }

            for (String deadKey : deadKeys) {
                try {
                    deleteDatum(deadKey);
                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
                }
            }

        }

        return local;
    }

    //......
}

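  • receivedBeat rejects a heartbeat whose sender is not in the LEADER state or whose term is older than the local term. Otherwise it switches the local peer to FOLLOWER if necessary, resets the local timeouts, and calls peers.makeLeader(remote) to update the leader.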
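
The checks that receivedBeat performs before it reaches makeLeader can be condensed into a small sketch. BeatCheckSketch, its nested Peer type, and acceptBeat are hypothetical names introduced for illustration; the datum synchronization and the actual makeLeader call are left out.

```java
/** Hypothetical sketch of the heartbeat checks that precede peers.makeLeader(remote). */
final class BeatCheckSketch {

    enum State { LEADER, FOLLOWER, CANDIDATE }

    /** Minimal stand-in for RaftPeer, holding only the fields these checks need. */
    static final class Peer {
        final String ip;
        State state;
        long term;
        String voteFor;

        Peer(String ip, State state, long term) {
            this.ip = ip;
            this.state = state;
            this.term = term;
        }
    }

    /**
     * Validates a heartbeat and steps the local peer down to FOLLOWER if needed.
     *
     * @return the ip of the accepted leader
     * @throws IllegalArgumentException if the sender is not a leader or its term is stale
     */
    static String acceptBeat(Peer local, Peer remote) {
        if (remote.state != State.LEADER) {
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }
        if (local.term > remote.term) {
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term
                + ", beat-to-term: " + local.term);
        }
        if (local.state != State.FOLLOWER) {
            // A candidate or former leader yields to the sender of the valid beat.
            local.state = State.FOLLOWER;
            local.voteFor = remote.ip;
        }
        return remote.ip; // the real method goes on to call peers.makeLeader(remote)
    }
}
```

Note that an equal term is accepted: only a strictly newer local term causes the heartbeat to be rejected.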

Summary

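  • RaftPeerSet provides methods such as remove, update, isLeader, allServersIncludeMyself, allServersWithoutMySelf, allPeers, decideLeader, makeLeader, majorityCount, and reset.

  • decideLeader first stores the candidate in peers, then iterates over peers and uses a TreeBag to count each peer.voteFor. When maxApproveCount is greater than or equal to majorityCount(), it marks the corresponding peer's state as RaftPeer.State.LEADER; if that peer differs from the current leader, it updates leader and publishes a LeaderElectFinishedEvent.

  • makeLeader checks whether candidate equals the current leader; if not, it sets leader to candidate and publishes a MakeLeaderEvent. It then iterates over peers and, for every peer other than candidate whose state is still LEADER, sends an API_GET_PEER request and updates the local copy of that peer from the response. If the request fails, it sets that peer's state to RaftPeer.State.FOLLOWER.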

