千家信息网

如何理解kubernetes数据卷管理的源码

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇文章给大家分享的是有关如何理解kubernetes数据卷管理的源码,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。概述volume是k
千家信息网最后更新 2025年12月02日如何理解kubernetes数据卷管理的源码

本篇文章给大家分享的是有关如何理解kubernetes数据卷管理的源码,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

概述

volume是k8s中很重要的一个环节,主要用来存储k8s中pod生产的一些系统或者业务数据。k8s在kubelet中提供了volume管理的逻辑

源码分析

首先是kubelet启动方法

func main() {       s := options.NewKubeletServer()       s.AddFlags(pflag.CommandLine)       flag.InitFlags()       logs.InitLogs()       defer logs.FlushLogs()       verflag.PrintAndExitIfRequested()       if err := app.Run(s, nil); err != nil {              fmt.Fprintf(os.Stderr, "error: %v\n", err)              os.Exit(1)       }}

很容易发现run方法中包含了kubelet所有重要信息

func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {                        //配置验证            ...       if kubeDeps == nil {              ...              kubeDeps, err = UnsecuredKubeletDeps(s)              ...       }       //初始化cAdvisor以及containerManager等管理器       ...       if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {              return err       }       ...}

里面有两个与volume管理相关的重要方法

  • UnsecuredKubeletDeps:会初始化docker client、网络管理插件、数据管理插件等系统核心组件,因为不方便对外部开放,所以命名为unsecure。其中我们需要关注的是它对volume plugin的初始化操作

     func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) {            ...                return &kubelet.KubeletDeps{                        Auth:               nil,                         CAdvisorInterface:  nil,                         Cloud:              nil,                         ContainerManager:   nil,                        DockerClient:       dockerClient,                        KubeClient:         nil,                        ExternalKubeClient: nil,                        Mounter:            mounter,                        NetworkPlugins:     ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir),                        OOMAdjuster:        oom.NewOOMAdjuster(),                        OSInterface:        kubecontainer.RealOS{},                        Writer:             writer,                        VolumePlugins:      ProbeVolumePlugins(s.VolumePluginDir),                        TLSOptions:         tlsOptions,                }, nil        }


    在初始化volume plugin的时候会传递VolumePluginDir作为自定义plugin的路径,默认路径为**/usr/libexec/kubernetes/kubelet-plugins/volume/exec/**

           func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin {                allPlugins := []volume.VolumePlugin{}                allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, empty_dir.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(volume.VolumeConfig{})...)                allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(volume.VolumeConfig{})...)                allPlugins = append(allPlugins, secret.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, glusterfs.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, quobyte.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, cephfs.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, downwardapi.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(pluginDir)...)                allPlugins = append(allPlugins, azure_file.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, configmap.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, photon_pd.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)                return allPlugins        }


    可以观察到众多插件中,有一个名为flexvolume,只有这个插件带有参数pluginDir,说明只有这个插件支持自定义实现。具体kubelet怎么和这些插件交互,以及这些插件提供哪些接口,还需要继续阅读代码

  • RunKubelet:这才是kubelet服务的启动方法,其中最重要的功能都藏在startKubelet中

        func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {                //初始化启动器                ...                if runOnce {                        if _, err := k.RunOnce(podCfg.Updates()); err != nil {                                return fmt.Errorf("runonce failed: %v", err)                        }                        glog.Infof("Started kubelet %s as runonce", version.Get().String())                } else {                        startKubelet(k, podCfg, kubeCfg, kubeDeps)                        glog.Infof("Started kubelet %s", version.Get().String())                }                return nil        }


    startKubelet包含两个环节

           func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) {                // 同步pod信息                go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)                // 启动kubelet服务                if kubeCfg.EnableServer {                        go wait.Until(func() {                                k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)                        }, 0, wait.NeverStop)                }                if kubeCfg.ReadOnlyPort > 0 {                        go wait.Until(func() {                                k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))                        }, 0, wait.NeverStop)                }        }


    跟踪同步pod信息的Run方法,会追查到这段代码

     func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {            ...                go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)                if kl.kubeClient != nil {                        //同步node信息                        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)                }                // 同步pod信息                kl.pleg.Start()                kl.syncLoop(updates, kl)        }


    kl.volumeManager是kubelet进行数据卷管理的核心接口

        type VolumeManager interface {                Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})                WaitForAttachAndMount(pod *v1.Pod) error                GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap                GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64                GetVolumesInUse() []v1.UniqueVolumeName                ReconcilerStatesHasBeenSynced() bool                VolumeIsAttached(volumeName v1.UniqueVolumeName) bool                MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)        }


    VolumeManager的Run会执行一个异步循环,当pod被调度到该node,它会检查该pod所申请的所有volume,根据这些volume与pod的关系做attach/detach/mount/unmount操作

        func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {                defer runtime.HandleCrash()                go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)                glog.V(2).Infof("The desired_state_of_world populator starts")                glog.Infof("Starting Kubelet Volume Manager")                go vm.reconciler.Run(stopCh)                <-stopCh                glog.Infof("Shutting down Kubelet Volume Manager")        }


    其中重点关注的地方是vm.desiredStateOfWorldPopulator.Runvm.reconciler.Run这两个方法。在介绍这两个方法之前,需要补充一个关键信息,这也是理解这两个方法的关键信息。

    kubelet管理volume的方式基于两个不同的状态:

    理解了这两个状态,就能大概知道vm.desiredStateOfWorldPopulator.Run这个方法是干什么的呢。很明显,它就是根据从apiserver同步到的pod信息,来更新DesiredStateOfWorld。另外一个方法vm.reconciler.Run,是预期状态和实际状态的协调者,它负责将实际状态调整成与预期状态。预期状态的更新实现,以及协调者具体如何协调,需要继续阅读代码才能理解

    追踪vm.desiredStateOfWorldPopulator.Run,我们发现这段逻辑

           func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {                for _, pod := range dswp.podManager.GetPods() {                        if dswp.isPodTerminated(pod) {                                continue                        }                        dswp.processPodVolumes(pod)                }        }


    kubelet会同步新增的pod到desiredStateOfWorldPopulator的podManager中。这段代码就是轮询其中非结束状态的pod,并交给desiredStateOfWorldPopulator处理

      func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {                ...                for _, podVolume := range pod.Spec.Volumes {                        volumeSpec, volumeGidValue, err :=                                dswp.createVolumeSpec(podVolume, pod.Namespace)                        if err != nil {                                glog.Errorf(                                        "Error processing volume %q for pod %q: %v",                                        podVolume.Name,                                        format.Pod(pod),                                        err)                                continue                        }                        _, err = dswp.desiredStateOfWorld.AddPodToVolume(                                uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)                        if err != nil {                                glog.Errorf(                                        "Failed to add volume %q (specName: %q) for pod %q to desiredStateOfWorld. err=%v",                                        podVolume.Name,                                        volumeSpec.Name(),                                        uniquePodName,                                        err)                        }                        glog.V(10).Infof(                                "Added volume %q (volSpec=%q) for pod %q to desired state.",                                podVolume.Name,                                volumeSpec.Name(),                                uniquePodName)                }                dswp.markPodProcessed(uniquePodName)        }


    desiredStateOfWorldPopulator并不处理很重的逻辑,只是作为一个代理,将控制某个pod预期状态的逻辑交付给desiredStateOfWorld,并标记为已处理

         func (dsw *desiredStateOfWorld) AddPodToVolume(                podName types.UniquePodName,                pod *v1.Pod,                volumeSpec *volume.Spec,                outerVolumeSpecName string,                volumeGidValue string) (v1.UniqueVolumeName, error) {                ...                dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{                        podName:             podName,                        pod:                 pod,                        spec:                volumeSpec,                        outerVolumeSpecName: outerVolumeSpecName,                }                return volumeName, nil        }


    这段逻辑中,我们忽略了前面一系列预处理操作,直接关注最核心的地方:确定预期状态的方式就是,用一个映射表结构,绑定volume到pod之间的关系,这个关系表就是绑定关系的参考依据

    看完了desiredStateOfWorldPopulator的处理逻辑,接着进入另一个核心接口reconciler。它才是volume manager中最重要的控制器

    追踪reconciler的Run方法,我们定位到最核心的一段代码

           func (rc *reconciler) reconcile() {                //umount                for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {                        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {                                ...                                err := rc.operationExecutor.UnmountVolume(                                        mountedVolume.MountedVolume, rc.actualStateOfWorld)                                ...                        }                }                // attach/mount                for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {                        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)                        volumeToMount.DevicePath = devicePath                        if cache.IsVolumeNotAttachedError(err) {                                ...                                err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)                                ...                        } else if !volMounted || cache.IsRemountRequiredError(err) {                                ...                                err := rc.operationExecutor.MountVolume(                                        rc.waitForAttachTimeout,                                        volumeToMount.VolumeToMount,                                        rc.actualStateOfWorld)                                ...                        }                }                //detach/unmount                for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {                        if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&                                !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {                                if attachedVolume.GloballyMounted {                                        ...                                        err := rc.operationExecutor.UnmountDevice(                                                attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)                                        ...                                } else {                                        ...                                        err := rc.operationExecutor.DetachVolume(                                                        attachedVolume.AttachedVolume, false,rc.actualStateOfWorld)                                        ...                                }                        }                }        }


    我略去了多余的代码,保留最核心的部分。这段控制逻辑就是一个协调器,具体要做的事情就是,根据实际状态与预期状态的差异,做协调操作

    如果采用自定义的flexvolume插件,上述这些方法会对插件中实现的方法进行系统调用

    flex volume提供的lvm插件。如果需要支持mount和unmount操作,可以在这个脚本中补充

         #!/bin/bash        # Copyright 2015 The Kubernetes Authors.        #        # Licensed under the Apache License, Version 2.0 (the "License");        # you may not use this file except in compliance with the License.        # You may obtain a copy of the License at        #        #     http://www.apache.org/licenses/LICENSE-2.0        #        # Unless required by applicable law or agreed to in writing, software        # distributed under the License is distributed on an "AS IS" BASIS,        # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.        # See the License for the specific language governing permissions and        # limitations under the License.        # Notes:        #  - Please install "jq" package before using this driver.        usage() {                err "Invalid usage. Usage: "                err "\t$0 init"                err "\t$0 attach  "                err "\t$0 detach  "                err "\t$0 waitforattach  "                err "\t$0 mountdevice   "                err "\t$0 unmountdevice "                err "\t$0 isattached  "                exit 1        }        err() {                echo -ne $* 1>&2        }        log() {                echo -ne $* >&1        }        ismounted() {                MOUNT=`findmnt -n ${MNTPATH} 2>/dev/null | cut -d' ' -f1`                if [ "${MOUNT}" == "${MNTPATH}" ]; then                        echo "1"                else                        echo "0"                fi        }        getdevice() {                VOLUMEID=$(echo ${JSON_PARAMS} | jq -r '.volumeID')                VG=$(echo ${JSON_PARAMS}|jq -r '.volumegroup')                # LVM substitutes - with --                VOLUMEID=`echo $VOLUMEID|sed s/-/--/g`                VG=`echo $VG|sed s/-/--/g`                DMDEV="/dev/mapper/${VG}-${VOLUMEID}"                echo ${DMDEV}        }        attach() {                JSON_PARAMS=$1                SIZE=$(echo $1 | jq -r '.size')                DMDEV=$(getdevice)                if [ ! -b "${DMDEV}" ]; then                        err "{\"status\": \"Failure\", \"message\": \"Volume ${VOLUMEID} does not exist\"}"                        exit 1                fi                log "{\"status\": \"Success\", \"device\":\"${DMDEV}\"}"                exit 0        }        detach() {                log "{\"status\": \"Success\"}"                exit 0        }        waitforattach() {                shift                attach $*        }        domountdevice() {                MNTPATH=$1                DMDEV=$2                FSTYPE=$(echo $3|jq -r '.["kubernetes.io/fsType"]')                if [ ! -b "${DMDEV}" ]; then                        err "{\"status\": \"Failure\", \"message\": \"${DMDEV} does not exist\"}"                        exit 1                fi                if [ $(ismounted) -eq 1 ] ; then                        log "{\"status\": \"Success\"}"                        exit 0                fi                VOLFSTYPE=`blkid -o udev ${DMDEV} 2>/dev/null|grep "ID_FS_TYPE"|cut -d"=" -f2`                if [ "${VOLFSTYPE}" == "" ]; then                        mkfs -t ${FSTYPE} ${DMDEV} >/dev/null 2>&1                        if [ $? -ne 0 ]; then                                err "{ \"status\": \"Failure\", \"message\": \"Failed to create fs ${FSTYPE} on device ${DMDEV}\"}"                                exit 1                        fi                fi                mkdir -p ${MNTPATH} &> /dev/null                mount ${DMDEV} ${MNTPATH} &> /dev/null                if [ $? -ne 0 ]; then                        err "{ \"status\": \"Failure\", \"message\": \"Failed to mount device ${DMDEV} at ${MNTPATH}\"}"                        exit 1                fi                log "{\"status\": \"Success\"}"                exit 0        }        unmountdevice() {                MNTPATH=$1                if [ ! -d ${MNTPATH} ]; then                        log "{\"status\": \"Success\"}"                        exit 0                fi                if [ $(ismounted) -eq 0 ] ; then                        log "{\"status\": \"Success\"}"                        exit 0                fi                umount ${MNTPATH} &> /dev/null                if [ $? -ne 0 ]; then                        err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at ${MNTPATH}\"}"                        exit 1                fi                log "{\"status\": \"Success\"}"                exit 0        }        isattached() {                log "{\"status\": \"Success\", \"attached\":true}"                exit 0        }        op=$1        if [ "$op" = "init" ]; then                log "{\"status\": \"Success\"}"                exit 0        fi        if [ $# -lt 2 ]; then                usage        fi        shift        case "$op" in                attach)                        attach $*                        ;;                detach)                        detach $*                        ;;                waitforattach)                        waitforattach $*                        ;;                mountdevice)                        domountdevice $*                        ;;                unmountdevice)                        unmountdevice $*                        ;;                isattached)                        isattached $*                        ;;                *)                        log "{ \"status\": \"Not supported\" }"                        exit 0        esac        exit 1


    值得注意的是,为什么会有两次mount操作,一次mountdevice,一次mount。分别是做什么的?

    其实k8s提供的volume管理方式是,一个volume可以被多个pod挂载,如果某个device需要作为多个pod的volume,就需要多次挂载。但是device只能被挂载一次。所以,k8s采用的方式是,先用mountdevice将device挂载到一个全局目录,然后这个全局目录就可以被多次挂载到pod的卷目录。如此一来,就能完成多pod挂载同一个volume

    • AttachVolume:调用attach

    • DetachVolume:调用detach

    • MountVolume:调用mountdevice,mount

    • UnmountVolume:调用unmount

    • UnmountDevice:调用umountdevice

    • volume和pod的预期状态不存在绑定关系,则detach volume,并对pod和volume执行unmount操作

    • volume和pod的预期状态存在绑定关系,则attach volume,并对pod和volume执行mount操作

    • DesiredStateOfWorld:预期中,pod对volume的使用情况,简称预期状态。当pod.yaml定制好volume,并提交成功,预期状态就已经确定

    • ActualStateOfWorld:实际中,pod对voluem的使用情况,简称实际状态。实际状态是kubelet的后台线程监控的结果

    • 不断同步apiserver的pod信息,根据新增、删除的pod对volume状态进行同步更新

    • 启动服务,监听controller manager的请求。其中controller manager可以辅助kubelet管理volume,用户也可以选择禁用controller manager的管理

只有理解了volume manager的代码,在使用它提供的volume plugin或者实现自定义flex volume plugin时才能驾轻就熟。以上代码,都是基于k8s v1.6.6版本

以上就是如何理解kubernetes数据卷管理的源码,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

0