kubernetes中python api的二次封装
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,k8s python api 二次封装:pip install pprint kubernetes;import urllib3;from pprint import pprint;from kuberne……
千家信息网最后更新 2025年12月02日 · kubernetes中python api的二次封装
k8s python api二次封装
# Prerequisite (shell):  pip install kubernetes
# (pprint ships with the Python standard library and needs no installation.)
from os import path
from pprint import pprint

import urllib3
import yaml
from kubernetes import client


class K8sApi(object):
    """Convenience wrapper around the official Kubernetes Python client.

    Groups list/read/create/replace/delete helpers for deployments,
    namespaces, nodes, pods, services, ingresses and stateful sets behind a
    single object. Unless stated otherwise, every method returns the client
    library's response object unchanged.

    Manifest file names passed to the ``create_*`` / ``replace_*`` methods are
    resolved relative to the directory that contains this module.
    """

    def __init__(self, host="https://192.168.100.111:6443",
                 authorization="Bearer token", verify_ssl=False):
        """Build the API clients.

        Args:
            host: Base URL of the Kubernetes API server.
            authorization: Value stored under the ``authorization`` API key
                (``"Bearer <service-account-token>"``).
            verify_ssl: Whether to verify the API server's TLS certificate.

        The defaults reproduce the values that were previously hard-coded, so
        ``K8sApi()`` keeps working as before. To authenticate from a local
        kubeconfig instead, see ``kubernetes.config.load_kube_config()``.
        """
        if not verify_ssl:
            # SECURITY: certificate verification is off, so the connection is
            # open to man-in-the-middle attacks. Acceptable for a lab cluster
            # only; pass verify_ssl=True for anything else. The warning is
            # silenced so it does not flood the output on every request.
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        self.configuration = client.Configuration()
        self.configuration.host = host
        self.configuration.api_key['authorization'] = authorization
        self.configuration.verify_ssl = verify_ssl

        self.k8s_apps_v1 = client.AppsV1Api(client.ApiClient(self.configuration))
        self.Api_Instance = client.CoreV1Api(client.ApiClient(self.configuration))

        # Recent client releases no longer ship ExtensionsV1beta1Api; without
        # this fallback the constructor would raise AttributeError and break
        # every method, not only the ingress ones. NetworkingV1Api exposes the
        # same *_ingress method names.
        # NOTE(review): with the fallback, ingress manifests must use
        # apiVersion networking.k8s.io/v1 - confirm against your cluster.
        ingress_api_cls = getattr(client, "ExtensionsV1beta1Api", None)
        if ingress_api_cls is None:
            ingress_api_cls = client.NetworkingV1Api
        self.Api_Instance_Extensions = ingress_api_cls(client.ApiClient(self.configuration))

    def _load_manifest(self, file):
        """Parse the YAML manifest ``file`` located next to this module."""
        manifest_path = path.join(path.dirname(__file__), file)
        with open(manifest_path, encoding="utf-8") as f:
            return yaml.safe_load(f)

    # ------------------------------------------------------------------
    # Deployments
    # ------------------------------------------------------------------
    def list_deployment(self, namespace="default"):
        """List the deployments in ``namespace``."""
        return self.k8s_apps_v1.list_namespaced_deployment(namespace)

    def read_deployment(self, name="nginx-deployment", namespace="default"):
        """Read the deployment ``name`` from ``namespace``."""
        return self.k8s_apps_v1.read_namespaced_deployment(name, namespace)

    def create_deployment(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a deployment in ``namespace`` from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.k8s_apps_v1.create_namespaced_deployment(body=body, namespace=namespace)

    def replace_deployment(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        """Replace the deployment ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.k8s_apps_v1.replace_namespaced_deployment(name, namespace, body=body)

    def delete_deployment(self, name="nginx-deployment", namespace="default"):
        """Delete the deployment ``name`` from ``namespace``."""
        return self.k8s_apps_v1.delete_namespaced_deployment(name, namespace)

    # ------------------------------------------------------------------
    # Namespaces
    # ------------------------------------------------------------------
    def list_namespace(self):
        """List all namespaces."""
        return self.Api_Instance.list_namespace()

    def read_namespace(self, name="default"):
        """Read the namespace ``name``."""
        return self.Api_Instance.read_namespace(name)

    def create_namespace(self, file="pod-nginx.yaml"):
        """Create a namespace from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.create_namespace(body=body)

    def replace_namespace(self, file="pod-nginx.yaml", name="default"):
        """Replace the namespace ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.replace_namespace(name, body=body)

    def delete_namespace(self, name="default", namespace="default"):
        """Delete the namespace ``name``.

        ``namespace`` is ignored; it is accepted only so that existing
        callers that pass it keep working.
        """
        return self.Api_Instance.delete_namespace(name)

    # ------------------------------------------------------------------
    # Nodes
    # ------------------------------------------------------------------
    def list_node(self):
        """Summarise every node as a plain dict keyed by node name.

        Each value holds ``name``, ``status`` (``"Ready"`` or ``"NotReady"``),
        ``ip`` (first reported address, or ``None`` when the node reports
        none), ``kubelet_version`` and ``os_image``.
        """
        nodes = {}
        for node in self.Api_Instance.list_node().items:
            # Look the "Ready" condition up by type rather than by position:
            # the order of the conditions list is not something to rely on,
            # and the list may be missing on a node that has just registered.
            conditions = node.status.conditions or []
            is_ready = any(c.type == "Ready" and c.status == "True" for c in conditions)
            addresses = node.status.addresses or []
            nodes[node.metadata.name] = {
                "name": node.metadata.name,
                "status": "Ready" if is_ready else "NotReady",
                "ip": addresses[0].address if addresses else None,
                "kubelet_version": node.status.node_info.kubelet_version,
                "os_image": node.status.node_info.os_image,
            }
        return nodes

    # ------------------------------------------------------------------
    # Pods
    # ------------------------------------------------------------------
    def list_pod(self):
        """Map pod name to ``{"ip", "namespace"}`` across all namespaces.

        The result is keyed by pod name alone, so when two namespaces contain
        a pod with the same name only the one listed last is kept.
        """
        pods = self.Api_Instance.list_pod_for_all_namespaces().items
        return {
            pod.metadata.name: {"ip": pod.status.pod_ip, "namespace": pod.metadata.namespace}
            for pod in pods
        }

    def read_pod(self, name="nginx-pod", namespace="default"):
        """Read the pod ``name`` from ``namespace``."""
        return self.Api_Instance.read_namespaced_pod(name, namespace)

    def create_pod(self, file="pod-nginx.yaml", namespace="default"):
        """Create a pod in ``namespace`` from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.create_namespaced_pod(namespace, body=body)

    def replace_pod(self, file="pod-nginx.yaml", name="nginx-pod", namespace="default"):
        """Replace the pod ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.replace_namespaced_pod(name, namespace, body=body)

    def delete_pod(self, name="nginx-pod", namespace="default"):
        """Delete the pod ``name`` from ``namespace``."""
        return self.Api_Instance.delete_namespaced_pod(name, namespace)

    # ------------------------------------------------------------------
    # Services
    # ------------------------------------------------------------------
    def list_service(self):
        """List the services of all namespaces."""
        return self.Api_Instance.list_service_for_all_namespaces()

    def read_service(self, name="", namespace="default"):
        """Read the service ``name`` from ``namespace``."""
        return self.Api_Instance.read_namespaced_service(name, namespace)

    def create_service(self, file="service-nginx.yaml", namespace="default"):
        """Create a service in ``namespace`` from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.create_namespaced_service(namespace, body=body)

    def replace_service(self, file="pod-nginx.yaml", name="hequan", namespace="default"):
        """Replace the service ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.replace_namespaced_service(name, namespace, body=body)

    def delete_service(self, name="hequan", namespace="default"):
        """Delete the service ``name`` from ``namespace``."""
        return self.Api_Instance.delete_namespaced_service(name, namespace)

    # ------------------------------------------------------------------
    # Ingresses
    # ------------------------------------------------------------------
    def list_ingress(self):
        """List the ingresses of all namespaces."""
        return self.Api_Instance_Extensions.list_ingress_for_all_namespaces()

    def read_ingress(self, name="", namespace="default"):
        """Read the ingress ``name`` from ``namespace``."""
        return self.Api_Instance_Extensions.read_namespaced_ingress(name, namespace)

    def create_ingress(self, file="ingress-nginx.yaml", namespace="default"):
        """Create an ingress in ``namespace`` from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance_Extensions.create_namespaced_ingress(namespace, body=body)

    def replace_ingress(self, name="", file="ingress-nginx.yaml", namespace="default"):
        """Replace the ingress ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance_Extensions.replace_namespaced_ingress(
            name=name, namespace=namespace, body=body)

    def delete_ingress(self, name="hequan", namespace="default"):
        """Delete the ingress ``name`` from ``namespace``."""
        return self.Api_Instance_Extensions.delete_namespaced_ingress(name, namespace)

    # ------------------------------------------------------------------
    # Stateful sets
    # ------------------------------------------------------------------
    def list_stateful(self):
        """List the stateful sets of all namespaces."""
        return self.k8s_apps_v1.list_stateful_set_for_all_namespaces()

    def read_stateful(self, name="nginx-deployment", namespace="default"):
        """Read the stateful set ``name`` from ``namespace``."""
        return self.k8s_apps_v1.read_namespaced_stateful_set(name, namespace)

    def create_stateful(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a stateful set in ``namespace`` from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.k8s_apps_v1.create_namespaced_stateful_set(body=body, namespace=namespace)

    def replace_stateful(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        """Replace the stateful set ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.k8s_apps_v1.replace_namespaced_stateful_set(name, namespace, body=body)

    def delete_stateful(self, name="nginx-deployment", namespace="default"):
        """Delete the stateful set ``name`` from ``namespace``."""
        return self.k8s_apps_v1.delete_namespaced_stateful_set(name, namespace)


if __name__ == '__main__':
    def test():
        """Smoke test: print the deployments of the default namespace."""
        obj = K8sApi()
        ret = obj.list_deployment()
        pprint(ret)

    test()
封装
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
三维管理软件开发
重庆邮电网络安全读研怎样
查询保存数据库文件sql
华为云服务器容量价格
软件开发规范文件
安徽新鱼网络技术有限公司
数据库root 管理
分布式数据库具有什么特征
上海客户管理软件开发
服务器代码管理软件
用友t3数据库服务怎么启动
图书馆数据库 百度百科
root服务器有哪些
张家口服务器机柜公司
中国互联网管理服务器
网络技术方面的杂志
上海对日软件开发好找吗
丽水工业软件开发教程
软件开发绩效是什么
网络安全设计规划文档
周村食品管理软件开发
网络安全 运营商
关于网络安全的画报
福建漳州嘉通网络技术
网络安全工作机构实施方案
弹性云服务器绑定域名
跨数据库查询工具
电子竞技软件开发
网络安全活动周活动方案
七日杀开服务器吃内存吗