golang etcd raft协议是怎样的
发表于:2025-11-15 作者:千家信息网编辑
千家信息网最后更新 2025年11月15日,今天就跟大家聊聊有关golang etcd raft协议是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。raft分布式一致性算法分布式存
千家信息网最后更新 2025年11月15日golang etcd raft协议是怎样的
今天就跟大家聊聊有关golang etcd raft协议是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题--如何保证多个副本的一致性?Raft算法把问题分解成了四个子问题:1. 领袖选举(leader election)、2. 日志复制(log replication)、3. 安全性(safety)4. 成员关系变化(membership changes)这几个子问题。源码gitee地址:https://gitee.com/ioly/learning.gooop
目标
根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 11)
虽然Leader State还有细节没处理完,但应该能启动并提供基本服务了
添加外围功能,为首次"点火"做准备:
config/tRaftConfig:从本地json文件读取集群节点配置,提供IRaftConfig/IRaftNodeConfig的实现
lsm/tRaftLSMImplement: 提供对顶层接口IRaftLSM的实现,将"配置/kv存储/节点通讯"三大块粘合起来
server/IRaftKVServer:server启动器接口
server/tRaftKVServer: server启动器的实现,监听raft rpc和kv rpc
config/tRaftConfig.go
从本地json文件读取集群节点配置,提供IRaftConfig/IRaftNodeConfig的实现
package configimport ( "encoding/json" "os")type tRaftConfig struct { ID string Nodes []*tRaftNodeConfig}type tRaftNodeConfig struct { ID string Endpoint string}func (me *tRaftConfig) GetID() string { return me.ID}func (me *tRaftConfig) GetNodes() []IRaftNodeConfig { a := make([]IRaftNodeConfig, len(me.Nodes)) for i,it := range me.Nodes { a[i] = it } return a}func (me *tRaftNodeConfig) GetID() string { return me.ID}func (me *tRaftNodeConfig) GetEndpoint() string { return me.Endpoint}func LoadJSONFile(file string) IRaftConfig { data, err := os.ReadFile(file) if err != nil { panic(err) } c := new(tRaftConfig) err = json.Unmarshal(data, c) if err != nil { panic(err) } return c}lsm/tRaftLSMImplement.go
提供对顶层接口IRaftLSM的实现,将"配置/kv存储/节点通讯"三大块粘合起来,并添加诊断日志。
package lsmimport ( "learning/gooop/etcd/raft/common" "learning/gooop/etcd/raft/config" "learning/gooop/etcd/raft/logger" "learning/gooop/etcd/raft/rpc" "learning/gooop/etcd/raft/rpc/client" "learning/gooop/etcd/raft/store" "sync")type tRaftLSMImplement struct { tEventDrivenModel mInitOnce sync.Once mConfig config.IRaftConfig mStore store.ILogStore mClientService client.IRaftClientService mState IRaftState}// trigger: init()// args: emptyconst meInit = "lsm.Init"// trigger: HandleStateChanged()// args: IRaftStateconst meStateChanged = "lsm.StateChnaged"func (me *tRaftLSMImplement) init() { me.mInitOnce.Do(func() { me.initEventHandlers() me.raise(meInit) })}func (me *tRaftLSMImplement) initEventHandlers() { // write only me.hookEventsForConfig() me.hookEventsForStore() me.hookEventsForPeerService() me.hookEventsForState()}func (me *tRaftLSMImplement) hookEventsForConfig() { me.hook(meInit, func(e string, args ...interface{}) { logger.Logf("tRaftLSMImplement.init, ConfigFile = %v", common.ConfigFile) me.mConfig = config.LoadJSONFile(common.ConfigFile) })}func (me *tRaftLSMImplement) hookEventsForStore() { me.hook(meInit, func(e string, args ...interface{}) { logger.Logf("tRaftLSMImplement.init, DataFile = %v", common.DataFile) err, db := store.NewBoltStore(common.DataFile) if err != nil { panic(err) } me.mStore = db })}func (me *tRaftLSMImplement) hookEventsForPeerService() { me.hook(meInit, func(e string, args ...interface{}) { me.mClientService = client.NewRaftClientService(me.mConfig) })}func (me *tRaftLSMImplement) hookEventsForState() { me.hook(meInit, func(e string, args ...interface{}) { me.mState = newFollowerState(me, me.mStore.LastCommittedTerm()) me.mState.Start() }) me.hook(meStateChanged, func(e string, args ...interface{}) { state := args[0].(IRaftState) logger.Logf("tRaftLSMImplement.StateChanged, %v", state.Role()) me.mState = state state.Start() })}func (me *tRaftLSMImplement) Config() config.IRaftConfig { return me.mConfig}func (me *tRaftLSMImplement) Store() store.ILogStore { return me.mStore}func (me *tRaftLSMImplement) HandleStateChanged(state IRaftState) { me.raise(meStateChanged, state)}func (me *tRaftLSMImplement) RaftClientService() client.IRaftClientService { return me.mClientService}func (me *tRaftLSMImplement) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error { state := me.mState e := state.Heartbeat(cmd, ret) logger.Logf("tRaftLSMImplement.Heartbeat, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e) return e}func (me *tRaftLSMImplement) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error { state := me.mState e := state.AppendLog(cmd, ret) logger.Logf("tRaftLSMImplement.AppendLog, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e) return e}func (me *tRaftLSMImplement) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error { state := me.mState e := state.CommitLog(cmd, ret) logger.Logf("tRaftLSMImplement.CommitLog, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e) return e}func (me *tRaftLSMImplement) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { state := me.mState e := state.RequestVote(cmd, ret) logger.Logf("tRaftLSMImplement.RequestVote, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e) return e}func (me *tRaftLSMImplement) ExecuteKVCmd(cmd *rpc.KVCmd, ret *rpc.KVRet) error { state := me.mState e := state.ExecuteKVCmd(cmd, ret) logger.Logf("tRaftLSMImplement.ExecuteKVCmd, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e) return e}func (me *tRaftLSMImplement) State() IRaftState { return me.mState}func NewRaftLSM() IRaftLSM { it := new(tRaftLSMImplement) it.init() return it}server/IRaftKVServer.go
server启动器接口
package servertype IRaftKVServer interface { BeginServeTCP(port int) error}server/tRaftKVServer.go
server启动器的实现,监听raft rpc和kv rpc
package serverimport ( "fmt" "learning/gooop/etcd/raft/lsm" rrpc "learning/gooop/etcd/raft/rpc" "learning/gooop/saga/mqs/logger" "net" "net/rpc" "time")type tRaftKVServer intfunc (me *tRaftKVServer) BeginServeTCP(port int) error { logger.Logf("tRaftKVServer.BeginServeTCP, starting, port=%v", port) // resolve address addy, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", port)) if err != nil { return err } // create raft lsm singleton raftLSM := lsm.NewRaftLSM() // register raft rpc server rserver := &RaftRPCServer { mRaftLSM : raftLSM, } err = rpc.Register(rserver) if err != nil { return err } // register kv rpc server kserver := &KVStoreRPCServer{ mRaftLSM: raftLSM, } err = rpc.Register(kserver) if err != nil { return err } inbound, err := net.ListenTCP("tcp", addy) if err != nil { return err } go rpc.Accept(inbound) logger.Logf("tRaftKVServer.BeginServeTCP, service ready at port=%v", port) return nil}// RaftRPCServer exposes a raft rpc servicetype RaftRPCServer struct { mRaftLSM lsm.IRaftLSM}// Heartbeat leader to followerfunc (me *RaftRPCServer) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error { e := me.mRaftLSM.Heartbeat(cmd, ret) logger.Logf("RaftRPCServer.Heartbeat, cmd=%v, ret=%v, e=%v", cmd, ret, e) return e}// AppendLog leader to followerfunc (me *RaftRPCServer) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error { e := me.mRaftLSM.AppendLog(cmd, ret) logger.Logf("RaftRPCServer.AppendLog, cmd=%v, ret=%v, e=%v", cmd, ret, e) return e}// CommitLog leader to followerfunc (me *RaftRPCServer) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error { e := me.mRaftLSM.CommitLog(cmd, ret) logger.Logf("RaftRPCServer.CommitLog, cmd=%v, ret=%v, e=%v", cmd, ret, e) return e}// RequestVote candidate to othersfunc (me *RaftRPCServer) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error { e := me.mRaftLSM.RequestVote(cmd, ret) logger.Logf("RaftRPCServer.RequestVote, cmd=%v, ret=%v, e=%v", cmd, ret, e) return e}// Ping to keep alivefunc (me *RaftRPCServer) Ping(cmd *rrpc.PingCmd, ret *rrpc.PingRet) error { ret.SenderID = me.mRaftLSM.Config().GetID() ret.Timestamp = time.Now().UnixNano() logger.Logf("RaftRPCServer.Ping, cmd=%v, ret=%v", cmd, ret) return nil}// KVStoreRPCServer expose a kv storage servicetype KVStoreRPCServer struct { mRaftLSM lsm.IRaftLSM}// ExecuteKVCmd leader to followerfunc (me *KVStoreRPCServer) ExecuteKVCmd(cmd *rrpc.KVCmd, ret *rrpc.KVRet) error { e := me.mRaftLSM.ExecuteKVCmd(cmd, ret) logger.Logf("KVStoreRPCServer.ExecuteKVCmd, cmd=%v, ret=%v, e=%v", cmd, ret, e) return e}(未完待续)
看完上述内容,你们对golang etcd raft协议是怎样的有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
存储
分布式
启动器
接口
节点
问题
配置
一致
内容
系统
一致性
三大
个子
副本
多个
文件
日志
目标
算法
通讯
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
云服务器和实体服务器安全性
中国矿业大学移动软件开发
寿光市科威网络技术有限公司
怪物猎人世界各服务器怎么解锁
java中如何轮训数据库
宝山区咨询软件开发平台资质
辽宁网咖网络技术有限公司
谷粒商城买服务器
游戏软件开发证书
网络安全项目预算
linx系统登录数据库
庆余年手游各个服务器
部门 管理 服务器
怎么查蓝阔服务器ip
我的世界1.8.9服务器生存
银币鉴定软件开发
sql数据库实现数据复制
ecs服务器需要注册域名吗
可信无线自组网络技术
网络安全考证指引
荒野的召唤服务器怎么搜索
戴尔服务器总代理
软件开发商务本
金华服务器机柜价格怎么样
来凤县天气预报软件开发
越南软件开发工资多少
软件开发工作量的考核
服务器网络用桥接还是nat
ibm 服务器 巡检
中国网络安全信息化战略