怎么用python+Element实现主机Host操作
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要讲解了"怎么用python+Element实现主机Host操作",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么用python+Eleme
千家信息网最后更新 2025年12月01日怎么用python+Element实现主机Host操作
这篇文章主要讲解了"怎么用python+Element实现主机Host操作",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么用python+Element实现主机Host操作"吧!
1.前端HTML和CSS
<%inherit file="/base.html"/>{{ scope.row.is_monitored ? "移除监控":"加入监控" }} 查看性能 查看状态
2.前端JS
3.Django代码
urls.py文件内容
from django.conf.urls import patternsfrom home_application.temp import views as temp_viewfrom home_application.job import views as job_viewfrom home_application.host import views as host_viewfrom home_application.exam import views as exam_viewurlpatterns = patterns( 'home_application.views', (r'^$', job_view.job), (r'^dev-guide/$', 'dev_guide'), (r'^contactus/$', 'contactus'), (r'^api/test/$', 'test'), (r'^temp/$', 'temp'), (r'^host/$', host_view.host), (r'^status/$', host_view.status), (r'^host_view/$', host_view.HostView.as_view()), (r'^get_all_hosts/$', host_view.get_all_hosts), (r'^get_perform_data/$', host_view.get_perform_data), ...)
host\views.py文件内容
import jsonfrom django.views.generic import Viewfrom django.views.decorators.csrf import csrf_exemptfrom django.utils.decorators import method_decoratorfrom django.http import JsonResponsefrom common.mymako import render_mako_contextfrom home_application.models import Host, LoadDatafrom home_application.utils.job_api import FastJobApiperform_script = """#!/bin/bashMEMORY=$(free -m | awk 'NR==2{printf "%.2f%%",$3*100/$2}')DISK=$(df -h| awk '$NF=="/"{printf "%s",$5}')CPU=$(top -bn1 | grep load | awk '{printf "%.2f%%",$(NF-2)}')DATE=$(date "+%Y-%m-%d %H:%M:%S")echo -e "$DATE|$MEMORY|$DISK|$CPU""""def host(request): return render_mako_context(request, '/home_application/host.html')def status(request): return render_mako_context(request, "/home_application/status.html")class CsrfExemptView(View): @method_decorator(csrf_exempt) def dispatch(self, request, *args, **kwargs): return super(CsrfExemptView, self).dispatch(request, *args, **kwargs)def get_all_hosts(request): from home_application.utils.cc_by_request import cc_search_host res_data = cc_search_host().get("info") for host in res_data: bk_host_innerip = host.get("host", {}).get("bk_host_innerip") bk_host_id = host.get("host", {}).get("bk_host_id") bk_host_name = host.get("host", {}).get("bk_host_name") bk_os_name = host.get("host", {}).get("bk_os_name") bk_biz_id = host.get("biz", [])[0].get("bk_biz_id") bk_biz_name = host.get("biz", [])[0].get("bk_biz_name") cloud_name = host.get("host", {}).get("bk_cloud_id")[0].get("bk_inst_name") cloud_id = host.get("host", {}).get("bk_cloud_id")[0].get("bk_inst_id") host_obj = { "ip": bk_host_innerip, "bk_biz_id": bk_biz_id, "os_name": bk_os_name, "host_name": bk_host_name, "bk_biz_name": bk_biz_name, "cloud_id": cloud_id, "cloud_name": cloud_name, } Host.objects.update_or_create(host_id=bk_host_id, defaults=host_obj) return JsonResponse({"result": res_data})@csrf_exemptdef get_perform_data(request): data = json.loads(request.body) host_id = data.get("host_id") try: obj = LoadData.objects.filter(host_id=host_id).last() if obj: res_data = { "cpu": obj.cpu, "mem": obj.mem, "disk": obj.disk, } return JsonResponse({"result": True, "data": res_data}) bk_biz_id = int(data.get("bk_biz_id")) ip = data.get("ip") cloud_id = data.get("cloud_id") # res_data = execute_script_and_return(request,ip_list, bk_biz_id, perform_script) obj = FastJobApi(bk_biz_id, cloud_id, ip, perform_script) job_res = obj.execute_script_and_return() if job_res: log_content = job_res[0].get("log_content") res_data = { "result": True, "cpu": log_content.split("|")[3], "mem": log_content.split("|")[1], "disk": log_content.split("|")[2], } return JsonResponse({"result": True, "data": res_data}) return JsonResponse({"result": False}) except Exception: return JsonResponse({"result": False})class HostView(CsrfExemptView): def get(self, request, *args, **kwargs): try: host_query = Host.objects.all() except Exception: return JsonResponse({"result": False}) search_biz_id = request.GET.get("search_biz_id") query_str = request.GET.get("query_str") if search_biz_id: host_query = host_query.filter(bk_biz_id=search_biz_id) if query_str: host_query = host_query.filter(ip__in=query_str.split(",")) host_query = host_query[:30] if host_query.count() > 10 else host_query res_data = [i.to_dict() for i in host_query] return JsonResponse({"result": True, "data": res_data}) def post(self, request, *args, **kwargs): try: data = json.loads(request.body) host_id = data.get("host_id") is_monitored = data.get("is_monitored") if is_monitored: Host.objects.filter(host_id=host_id).update(is_monitored=False) else: Host.objects.filter(host_id=host_id).update(is_monitored=True) return JsonResponse({"result": True}) except Exception: return JsonResponse({"result": False})models.py文件内容
from django.db import modelsfrom home_application.utils.parse_time import parse_datetime_to_timestrclass Host(models.Model): host_id = models.IntegerField(u"主机ID", primary_key=True, unique=True) ip = models.CharField(u"IP地址", max_length=32, blank=True, null=True) bk_biz_id = models.CharField(u"业务ID", max_length=16, blank=True, null=True) bk_biz_name = models.CharField(u"业务名称", max_length=512, blank=True, null=True) os_name = models.CharField(u"系统名", max_length=128, blank=True, null=True) host_name = models.CharField(u"主机名", max_length=128, blank=True, null=True) cloud_id = models.IntegerField(u"云区域ID", blank=True, null=True) cloud_name = models.CharField(u"云区域名称", max_length=32, blank=True, null=True) is_monitored = models.BooleanField(u"是否已监控", default=False) def to_dict(self): return { "host_id": self.host_id, "ip": self.ip, "pk": self.pk, "bk_biz_id": self.bk_biz_id, "bk_biz_name": self.bk_biz_name, "os_name": self.os_name, "host_name": self.host_name, "cloud_name": self.cloud_name, "cloud_id": self.cloud_id, "is_monitored": self.is_monitored, "mem_use": "-", "cpu_use": "-", "disk_use": "-", } def get_load_data(self): load_query = LoadData.objects.filter(host_id=self.pk).order_by("create_time") return [i.to_dict() for i in load_query]class LoadData(models.Model): host_id = models.IntegerField(u"主机ID", default=0) cpu = models.IntegerField(u"CPU使用率", default=0) mem = models.IntegerField(u"内存使用率", default=0) disk = models.IntegerField(u"硬盘使用率", default=0) create_time = models.DateTimeField(u"创建时间", auto_now_add=True) def to_dict(self): return { "host_id": self.host_id, "cpu": self.cpu, "mem": self.mem, "disk": self.disk, "create_time": parse_datetime_to_timestr(self.create_time) }FastJobApi部分代码
import base64import timefrom conf.default import APP_ID, APP_TOKENfrom blueking.component.shortcuts import get_client_by_user, get_client_by_requestcount = 0class FastJobApi(object): def __init__(self, bk_biz_id, bk_cloud_id, ip_list, script_content): self.client = get_client_by_user('admin') self.username = "admin" self.biz_id = bk_biz_id self.script_content = script_content self.ip_list = [{"bk_cloud_id": bk_cloud_id, "ip": i} for i in ip_list.split(",")] def _fast_execute_script(self, execute_account="root", param_content='', script_timeout=1000): """ 快速执行脚本 :param execute_account: 执行脚本的账户名 :param param_content: 执行脚本的参数 :param script_timeout: 脚本执行超时时间 :return: job_instance_id """ kwargs = { "bk_app_code": APP_ID, "bk_app_secret": APP_TOKEN, "bk_biz_id": self.biz_id, "bk_username": self.username, "script_content": base64.b64encode(self.script_content), "ip_list": self.ip_list, "script_type": 1, "account": execute_account, "script_param": base64.b64encode(param_content), "script_timeout": script_timeout } result = self.client.job.fast_execute_script(kwargs) if result["result"]: return result.get("data").get("job_instance_id") return False def _get_job_instance_status(self, task_id): """ 获取脚本执行状态 :param task_id: 执行脚本的 job_instance_id :return: """ global count count += 1 # 查询执行状态 resp = self.client.job.get_job_instance_status( bk_username=self.username, bk_biz_id=self.biz_id, job_instance_id=task_id ) if resp.get('data').get('is_finished'): count = 0 return True elif not resp.get('data').get('is_finished') and count <= 5: time.sleep(2) return self._get_job_instance_status(task_id) else: count = 0 return False def _get_job_instance_log(self, task_id): """ 查询作业日志 :param task_id: 执行脚本的 job_instance_id :return: """ if self._get_job_instance_status(task_id): resp = self.client.job.get_job_instance_log( job_instance_id=task_id, bk_biz_id=self.biz_id, bk_username='admin' ) return resp['data'][0]['step_results'][0]['ip_logs'] def execute_script_and_return(self): """ 执行脚本并获取脚本执行结果 :return: 脚本执行结果 """ job_instance_id = self._fast_execute_script() if job_instance_id: ip_logs = self._get_job_instance_log(job_instance_id) return ip_logs实现效果
感谢各位的阅读,以上就是"怎么用python+Element实现主机Host操作"的内容了,经过本文的学习后,相信大家对怎么用python+Element实现主机Host操作这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
主机
脚本
监控
内容
查询
业务
状态
队列
使用率
情况
文件
系统
学习
成功
代码
前端
区域
名称
时间
模板
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
三级网络技术视频教材
尤唯网络技术
数据库最多能存多少列
软件开发项目及价格
学校秋天网络安全会议
抖音小程序用什么软件开发
开源软件开发多少钱
数据库中字段的显示标题
机关网络安全管理责任制度
数据库mysql期末考试题
社交 数据库
中国统计年检数据库
大多数中小软件开发项目
数据库插入数据和字符串
大学有什么专业学软件开发
南宁软件开发工资待遇
tp的数据库日志在哪里看
软件开发维护叫什么部门
腾讯云 社交网络安全
创新创业软件开发问题
安徽专业软件开发如何收费
摩拜单车连不上服务器
芜湖宦勇网络技术有限公司
网络安全论文格式模板
上海一站式网络技术
将列表中的中文存入sql数据库
兴乐网络技术有限公司
企业网络安全经费
维护移动网络安全
每台计算机都有数据库吗