如何使用golang etcd raft协议
发表于:2025-11-16 作者:千家信息网编辑
千家信息网最后更新 2025年11月16日,本篇内容介绍了"如何使用golang etcd raft协议"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有
千家信息网最后更新 2025年11月16日如何使用golang etcd raft协议
本篇内容介绍了"如何使用golang etcd raft协议"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题--如何保证多个副本的一致性?Raft算法把问题分解成了四个子问题:1. 领袖选举(leader election)、2. 日志复制(log replication)、3. 安全性(safety)4. 成员关系变化(membership changes)这几个子问题。源码gitee地址:https://gitee.com/ioly/learning.gooop
目标
根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 10)
添加put/get/del kv键值对的rpc接口
继续完善Leader状态的raft协议响应
设计
rpc/IKVStoreRPC: kv操作的rpc接口
store/IKVStore: kv操作的持久化接口
stoer/ILogStore: 从IKVStore继承,以支持kv持久化
lsm/IRaftState: 继承rpc.IKVStoreRPC接口,以支持kv操作
lsm/tLeaderState: 初步实现Leader状态的raft协议处理,事件驱动的逻辑编排,读写分离的字段管理。
rpc/IKVStoreRPC.go
kv操作的rpc接口
package rpctype IKVStoreRPC interface { ExecuteKVCmd(cmd *KVCmd, ret *KVRet) error}type KVCmd struct { OPCode KVOPCode Key []byte Content []byte}type KVOPCode intconst ( KVGet KVOPCode = iota KVPut KVOPCode = iota KVDel KVOPCode = iota)type KVRet struct { Code KVRetCode Key []byte Content []byte}type KVRetCode intconst ( KVOk KVRetCode = iota KVKeyNotFound KVRetCode = iota KVInternalError KVRetCode = iota)store/IKVStore.go
kv操作的持久化接口
package storetype IKVStore interface { Get(key []byte) (error, []byte) Put(key []byte, content []byte) error Del(key []byte) error}stoer/ILogStore.go
从IKVStore继承,以支持kv持久化
package storeimport ( "learning/gooop/etcd/raft/model")type ILogStore interface { IKVStore LastAppendedTerm() int64 LastAppendedIndex() int64 LastCommittedTerm() int64 LastCommittedIndex() int64 Append(entry *model.LogEntry) error Commit(index int64) error GetLog(index int64) (error, *model.LogEntry)}lsm/IRaftState.go
继承rpc.IKVStoreRPC接口,以支持kv操作
package lsmimport ( "learning/gooop/etcd/raft/roles" "learning/gooop/etcd/raft/rpc")type IRaftState interface { rpc.IRaftRPC rpc.IKVStoreRPC Role() roles.RaftRole Start()}lsm/tLeaderState.go
初步实现Leader状态的raft协议处理,事件驱动的逻辑编排,读写分离的字段管理。
package lsmimport ( "errors" "learning/gooop/etcd/raft/config" "learning/gooop/etcd/raft/model" "learning/gooop/etcd/raft/roles" "learning/gooop/etcd/raft/rpc" "learning/gooop/etcd/raft/store" "learning/gooop/etcd/raft/timeout" "sync" "time")// tLeaderState presents a leader nodetype tLeaderState struct { tEventDrivenModel context iRaftStateContext mInitOnce sync.Once mStartOnce sync.Once // update: leInit / leLeaderHeartbeat mTerm int64 // update: leInit / leDisposing mDisposedFlag bool // update: leVoteToCandidate mVotedTerm int64 mVotedCandidateID string mVotedTimestamp int64}// trigger: init()// args: emptyconst leInit = "leader.init"// trigger: Start()// args: emptyconst leStart = "leader.Start"// trigger: whenNewLeaderAnnouncedThenSwitchToFollower// args: emptyconst leDiposing = "leader.Disposing"// trigger : Heartbeat() / AppendLog()// args: term int64const leNewLeaderAnnounced = "leader.NewLeaderAnnounced"// trigger: RequestVote()// args: *rpc.RequestVoteCmdconst leBeforeRequestVote = "leader.BeforeRequestVote"// trigger:// args: *rpc.RequestVoteCmdconst leVoteToCandidate = "leader.VoteToCandidate"// trigger: handleHeartbeat()// args: term int64const leHeartbeatRejected = "leader.HeartbeatRejected"func newLeaderState(ctx iRaftStateContext, term int64) IRaftState { it := new(tLeaderState) it.init(ctx, term) return it}func (me *tLeaderState) init(ctx iRaftStateContext, term int64) { me.mInitOnce.Do(func() { me.context = ctx me.mTerm = term me.initEventHandlers() me.raise(leInit) })}func (me *tLeaderState) initEventHandlers() { // write only logic me.hookEventsForDisposedFlag() me.hookEventsForVotedTerm() // read only logic me.hook(leStart, me.whenStartThenBeginHeartbeatToOthers) me.hook(leNewLeaderAnnounced, me.whenNewLeaderAnnouncedThenSwitchToFollower) me.hook(leHeartbeatRejected, me.whenHeartbeatRejectedThenSwitchToFollower)}func (me *tLeaderState) hookEventsForDisposedFlag() { me.hook(leInit, func(e string, args ...interface{}) { me.mDisposedFlag = false }) me.hook(leDiposing, func(e string, args ...interface{}) { me.mDisposedFlag = true })}func (me *tLeaderState) hookEventsForVotedTerm() { me.hook(leBeforeRequestVote, func(e string, args ...interface{}) { // check last vote timeout if me.mVotedTerm == 0 { return } if time.Duration(time.Now().UnixNano() - me.mVotedTimestamp)*time.Nanosecond >= timeout.ElectionTimeout { me.mVotedTerm = 0 me.mVotedTimestamp = 0 me.mVotedCandidateID = "" } }) me.hook(leVoteToCandidate, func(e string, args ...interface{}) { // after vote to candidate cmd := args[0].(*rpc.RequestVoteCmd) me.mVotedTerm = cmd.Term me.mVotedCandidateID = cmd.CandidateID me.mVotedTimestamp = time.Now().UnixNano() })}func (me *tLeaderState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error { // check term if cmd.Term <= me.mTerm { ret.Code = rpc.HBTermMismatch return nil } // new leader me.raise(leNewLeaderAnnounced, cmd.Term) // return ok ret.Code = rpc.HBOk return nil}func (me *tLeaderState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error { // check term if cmd.Term <= me.mTerm { ret.Code = rpc.ALTermMismatch return nil } // new leader me.raise(leNewLeaderAnnounced, cmd.Term) // return ok ret.Code = rpc.ALInternalError return nil}func (me *tLeaderState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error { // just ignore ret.Code = rpc.CLInternalError return nil}func (me *tLeaderState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { me.raise(leBeforeRequestVote, cmd) // check voted term if cmd.Term < me.mVotedTerm { ret.Code = rpc.RVTermMismatch return nil } if cmd.Term == me.mVotedTerm { if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID { // already vote another ret.Code = rpc.RVVotedAnother return nil } else { // already voted ret.Code = rpc.RVOk return nil } } if cmd.Term > me.mVotedTerm { // new term, check log if cmd.LastLogIndex >= me.context.Store().LastCommittedIndex() { // good log me.raise(leVoteToCandidate, cmd) ret.Code = rpc.RVOk } else { // bad log ret.Code = rpc.RVLogMismatch } return nil } // should not reach here ret.Code = rpc.RVTermMismatch return nil}func (me *tLeaderState) Role() roles.RaftRole { return roles.Leader}func (me *tLeaderState) Start() { me.mStartOnce.Do(func() { me.raise(leStart) })}func (me *tLeaderState) whenStartThenBeginHeartbeatToOthers(_ string, _ ...interface{}) { go func() { for !me.mDisposedFlag { _ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error { return me.handleHeartbeat(client) }) time.Sleep(timeout.HeartbeatInterval) } }()}func (me *tLeaderState) boardcast(action func(config.IRaftNodeConfig, rpc.IRaftRPC) error) error { for _,it := range me.context.Config().Nodes() { if it.ID() == me.context.Config().ID() { continue } e := me.context.RaftClientService().Using(it.ID(), func(client rpc.IRaftRPC) error { return action(it, client) }) if e != nil { return e } } return nil}func (me *tLeaderState) handleHeartbeat(client rpc.IRaftRPC) error { cmd := new(rpc.HeartbeatCmd) cmd.Term = me.mTerm cmd.LeaderID = me.context.Config().ID() ret := new(rpc.HeartbeatRet) e := client.Heartbeat(cmd, ret) if e != nil { return e } switch ret.Code { case rpc.HBTermMismatch: me.raise(leHeartbeatRejected, ret.Term) break } return nil}func (me *tLeaderState) whenNewLeaderAnnouncedThenSwitchToFollower(_ string, args ...interface{}) { me.raise(leDiposing) term := args[0].(int64) me.context.HandleStateChanged(newFollowerState(me.context, term))}func (me *tLeaderState) whenHeartbeatRejectedThenSwitchToFollower(_ string, args ...interface{}) { me.raise(leDiposing) term := args[0].(int64) me.context.HandleStateChanged(newFollowerState(me.context, term))}func (me *tLeaderState) ExecuteKVCmd(cmd *rpc.KVCmd, ret *rpc.KVRet) error { switch cmd.OPCode { case rpc.KVGet: return me.handleKVGet(cmd, ret) case rpc.KVPut: return me.handleKVPut(cmd, ret) case rpc.KVDel: return me.handleKVDel(cmd, ret) } return nil}func (me *tLeaderState) handleKVGet(cmd *rpc.KVCmd, ret *rpc.KVRet) error { e, v := me.context.Store().Get(cmd.Key) if e != nil { ret.Code = rpc.KVInternalError return e } ret.Code = rpc.KVOk ret.Content = v return nil}func (me *tLeaderState) handleKVPut(cmd *rpc.KVCmd, ret *rpc.KVRet) error { kvcmd := new(store.PutCmd) kvcmd.Key = cmd.Key kvcmd.Value = cmd.Content // create/append/commit log e := me.broadcastKVCmd(kvcmd, ret) if e != nil { return e } // apply cmd return me.context.Store().Put(cmd.Key, cmd.Content)}func (me *tLeaderState) handleKVDel(cmd *rpc.KVCmd, ret *rpc.KVRet) error { kvcmd := new(store.DelCmd) kvcmd.Key = cmd.Key // create/append/commit log e := me.broadcastKVCmd(kvcmd, ret) if e != nil { return e } // apply cmd return me.context.Store().Put(cmd.Key, cmd.Content)}func (me *tLeaderState) broadcastKVCmd(cmd store.IKVCmd, ret *rpc.KVRet) error { // create log st := me.context.Store() log := new(model.LogEntry) log.Term = me.mTerm log.Index = st.LastCommittedIndex() + 1 log.PrevTerm = st.LastCommittedTerm() log.PrevIndex = st.LastCommittedIndex() log.Command = cmd.Marshal() // append log e := st.Append(log) if e != nil { ret.Code = rpc.KVInternalError return e } // ask other nodes to append log alcmd := new(rpc.AppendLogCmd) alcmd.Term = me.mTerm alcmd.LeaderID = me.context.Config().ID() alcmd.Entry = log sumOk := []int{ 0 } _ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error { alret := new(rpc.AppendLogRet) e := client.AppendLog(alcmd, alret) if e != nil { return e } switch alret.Code { case rpc.ALOk: sumOk[0]++ break case rpc.ALTermMismatch: // todo: fixme break case rpc.ALIndexMismatch: // todo: fixme break } return nil }) // wait for most nodes if sumOk[0] >= len(me.context.Config().Nodes()) / 2 { // commit log clcmd := new(rpc.CommitLogCmd) clcmd.LeaderID = me.context.Config().ID() clcmd.Term = me.mTerm clcmd.Index = log.Index _ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error { ret := new(rpc.CommitLogRet) e := client.CommitLog(clcmd, ret) if e != nil { return e } switch ret.Code { case rpc.CLInternalError: // todo: fixme break case rpc.CLLogNotFound: // todo: fixme break case rpc.CLOk: return nil } return nil }) // ok return nil } else { return gErrorCannotReachAgreement }}var gErrorCannotReachAgreement = errors.New("cannot reach agreement")"如何使用golang etcd raft协议"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
接口
分布式
问题
支持
一致
状态
系统
存储
一致性
个子
事件
内容
副本
多个
字段
更多
目标
知识
算法
逻辑
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
重点人力资源数据库
山西华为服务器虚拟化建设
creo如何设置服务器管理
什么是数据库查询网页
网络安全法模板
佛山app软件开发订制
软件开发外包效果
汽车租赁系统数据库
提供下载文件的服务器
天远之星连接不到服务器
成都软件开发 网站
网络安全关键词有哪些
dl388服务器售后
购买网络安全公开课视频
软件开发是什么工作
重庆市电信网络安全宣传周
我的世界 搭建pe服务器
java数据库结构
海南耀天网络技术公司
服务器功率多少瓦
华三5900服务器
数据库技术的大量应用
软件开发过程中的质量问题
河南妙趣网络技术有限公司官方
网络技术中关键路线
服务器做渲染设置
网络安全感后感1000
数据库王珊第二章答案
网络工程网络安全方向好吗
我的世界自由度很高的服务器