千家信息网

kubernetes中etcd增删改查的具体实现

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇内容主要讲解"kubernetes中etcd增删改查的具体实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"kubernetes中etcd增删改查的具
千家信息网最后更新 2025年12月03日kubernetes中etcd增删改查的具体实现

本篇内容主要讲解"kubernetes中etcd增删改查的具体实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"kubernetes中etcd增删改查的具体实现"吧!

kubernetes中基于etcd实现集中的数据存储,今天来学习下基于etcd如何实现数据读取一致性、更新一致性、事务的具体实现

1. 数据的存储与版本

1.1 数据存储的转换

在k8s中有部分数据的存储是需要经过处理之后才能存储的,比如secret这种加密的数据,既然要存储就至少包含两个操作,加密存储,解密读取,transformer就是为了完成该操作而实现的,其在进行etcd数据存储的时候回对数据进行加密,而在读取的时候,则会进行解密

1.2 资源版本revision

在etcd中进行修改(增删改)操作的时候,都会递增revision,而在k8s中也通过该值来作为k8s资源的ResourceVersion,该机制也是实现watch的关键机制,在操作etcd解码从etcd获取的数据的时候,会通过versioner组件来为资源动态的修改该值

1.3 数据模型的映射

将数据从etcd中读取后,数据本身就是一个字节数组,如何将对应的数据转换成我们真正的运行时对象呢?还记得我们之前的scheme与codec么,在这里我们知道对应的数据编码格式,也知道资源对象的类型,则通过codec、字节数组、目标类型,我们就可以完成对应数据的反射

2. 查询接口一致性

etcd中的数据写入是基于leader单点写入和集群quorum机制实现的,并不是一个强一致性的数据写入,则如果如果我们访问的节点不存在quorum的半数节点内,则可能造成短暂的数据不一致,针对一些强一致的场景,我们可以通过其revision机制来进行数据的读取, 保证我们读取到更新之后的数据

// 省略非核心代码func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {        // 获取key        getResp, err := s.client.KV.Get(ctx, key, s.getOps...)    // 检测当前版本,是否达到最小版本的        if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {                return err        }        // 执行数据转换        data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))        if err != nil {                return storage.NewInternalError(err.Error())        }        // 解码数据        return decode(s.codec, s.versioner, data, out, kv.ModRevision)}

3. 创建接口实现

创建一个接口数据则会首先进行资源对象的检查,避免重复创建对象,此时会先通过资源对象的version字段来进行初步检查,然后在利用etcd的事务机制来保证资源创建的原子性操作

// 省略非核心代码func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {        if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {                return errors.New("resourceVersion should not be set on objects to be created")        }        if err := s.versioner.PrepareObjectForStorage(obj); err != nil {                return fmt.Errorf("PrepareObjectForStorage failed: %v", err)        }        // 将数据编码        data, err := runtime.Encode(s.codec, obj)        if err != nil {                return err        }                // 转换数据        newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))        if err != nil {                return storage.NewInternalError(err.Error())        }        startTime := time.Now()    // 事务操作        txnResp, err := s.client.KV.Txn(ctx).If(                notFound(key), // 如果之前不存在 这里是利用的etcd的ModRevision即修改版本为0, 寓意着对应的key不存在        ).Then(                clientv3.OpPut(key, string(newData), opts...), // put修改数据        ).Commit()        metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)        if err != nil {                return err        }        if !txnResp.Succeeded {                return storage.NewKeyExistsError(key, 0)        }        if out != nil {        // 获取对应的Revision                putResp := txnResp.Responses[0].GetResponsePut()                return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)        }        return nil}func notFound(key string) clientv3.Cmp {        return clientv3.Compare(clientv3.ModRevision(key), "=", 0)}

4. 删除接口的实现

删除接口主要是通过CAS和事务机制来共同实现,确保在etcd不发生异常的情况,即使并发对同个资源来进行删除操作也能保证至少有一个节点成功

// 省略非核心代码func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {        startTime := time.Now()        // 获取当前的key的数据        getResp, err := s.client.KV.Get(ctx, key)        for {                // 获取当前的状态                origState, err := s.getState(getResp, key, v, false)                if err != nil {                        return err                }                txnResp, err := s.client.KV.Txn(ctx).If(                        clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), // 如果修改版本等于当前状态,就尝试删除                ).Then(                        clientv3.OpDelete(key), // 删除                ).Else(                        clientv3.OpGet(key),    // 获取                ).Commit()                if !txnResp.Succeeded {                        // 获取最新的数据重试事务操作                        getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())                        klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)                        continue                }                // 将最后一个版本的数据解码到out里面,然后返回                return decode(s.codec, s.versioner, origState.data, out, origState.rev)        }}

5. 更新接口的实现

更新接口实现上与删除接口并无本质上的差别,但是如果多个节点同时进行更新,CAS并发操作必然会有一个节点成功,当发现已经有节点操作成功,则当前节点其实并不需要再做过多的操作,直接返回即可

// 省略非核心代码func (s *store) GuaranteedUpdate(        ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,        preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {        // 获取当前key的最新数据        getCurrentState := func() (*objState, error) {                startTime := time.Now()                getResp, err := s.client.KV.Get(ctx, key, s.getOps...)                metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)                if err != nil {                        return nil, err                }                return s.getState(getResp, key, v, ignoreNotFound)        }        // 获取当前数据        var origState *objState        var mustCheckData bool        if len(suggestion) == 1 && suggestion[0] != nil {                // 如果提供了建议的数据,则会使用,                origState, err = s.getStateFromObject(suggestion[0])                if err != nil {                        return err                }                //但是需要检测数据                mustCheckData = true        } else {                // 尝试重新获取数据                origState, err = getCurrentState()                if err != nil {                        return err                }        }        transformContext := authenticatedDataString(key)        for {                // 检查对象是否已经更新, 主要是通过检测uuid/revision来实现                if err := preconditions.Check(key, origState.obj); err != nil {                        // If our data is already up to date, return the error                        if !mustCheckData {                                return err                        }                        // 如果检查数据一致性错误,则需要重新获取                        origState, err = getCurrentState()                        if err != nil {                                return err                        }                        mustCheckData = false                        // Retry                        continue                }                // 删除当前的版本数据revision                ret, ttl, err := s.updateState(origState, tryUpdate)                if err != nil {                        // If our data is already up to date, return the error                        if !mustCheckData {                                return err                        }                        // It's possible we were working with stale data                        // Actually fetch                        origState, err = getCurrentState()                        if err != nil {                                return err                        }                        mustCheckData = false                        // Retry                        continue                }                // 编码数据                data, err := runtime.Encode(s.codec, ret)                if err != nil {                        return err                }                if !origState.stale && bytes.Equal(data, origState.data) {                        // 如果我们发现我们当前的数据与获取到的数据一致,则会直接跳过                        if mustCheckData {                                origState, err = getCurrentState()                                if err != nil {                                        return err                                }                                mustCheckData = false                                if !bytes.Equal(data, origState.data) {                                        // original data changed, restart loop                                        continue                                }                        }                        if !origState.stale {                // 直接返回数据                                return decode(s.codec, s.versioner, origState.data, out, origState.rev)                        }                }                // 砖汉数据                newData, err := s.transformer.TransformToStorage(data, transformContext)                if err != nil {                        return storage.NewInternalError(err.Error())                }                opts, err := s.ttlOpts(ctx, int64(ttl))                if err != nil {                        return err                }                trace.Step("Transaction prepared")                startTime := time.Now()                // 事务更新数据                txnResp, err := s.client.KV.Txn(ctx).If(                        clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),                ).Then(                        clientv3.OpPut(key, string(newData), opts...),                ).Else(                        clientv3.OpGet(key),                ).Commit()                metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)                if err != nil {                        return err                }                trace.Step("Transaction committed")                if !txnResp.Succeeded {                        // 重新获取数据                        getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())                        klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)                        origState, err = s.getState(getResp, key, v, ignoreNotFound)                        if err != nil {                                return err                        }                        trace.Step("Retry value restored")                        mustCheckData = false                        continue                }                // 获取put响应                putResp := txnResp.Responses[0].GetResponsePut()                return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)        }}

到此,相信大家对"kubernetes中etcd增删改查的具体实现"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

数据 一致 接口 版本 资源 存储 更新 事务 对象 机制 节点 一致性 代码 时候 非核 检查 成功 编码 保证 加密 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 星空之上网络安全 述职报告软件开发规划 centos设置数据库远程 游戏服务器维护中英文怎么说 网络技术学院毕业能干什么 数据库超市管理系统概述 铜陵市网络安全应急支撑单位 数据库的创建与管理实验总结 软件开发加什么 刚上线3天全服务器爆满 华为视频服务器端口 工业互联网物流科技 灵沃软件开发是培训机构吗 mysql初始数据库是干什么的 mac 网页代理服务器 西安新茂融软件开发有限公司 了解我国网络安全的现状 学位论文数据库有哪些 开源分布式数据库哪个好 戴尔服务器启动不连接网络 竹山良好软件开发服务保障 数据库主要功能不包括 哪里有服务好的软件开发 软件开发一般用多大内存合适 软件开发培训专科学校 松滋市软件开发项目管理 广东众商互联网科技有限公司 什么叫网络安全保护义务 数据库中检查视图是什么 关于自制软件开发的书
0