python是怎么实现skywalking的trace模块过滤和报警
发表于:2025-11-09 作者:千家信息网编辑
千家信息网最后更新 2025年11月09日,python是怎么实现skywalking的trace模块过滤和报警,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。skywalkin
千家信息网最后更新 2025年11月09日python是怎么实现skywalking的trace模块过滤和报警
')def send_mail(receiver): """ 发送报错接口邮件 :return: """ server = "mail.test.com" sender = "sa@test.com" sender_pwd = "1qaz@WSX" send_addr = "sa@test.com" receiver = receiver with open("mail.html","r",encoding="utf-8") as f: content = f.read() if re.search("",content) == None: print("无报错接口!",content) return 0 print("邮件前",content) msg_mail = MIMEText(content,"html","utf-8") msg_mail["Subject"] = "Skywalking报错接口统计" msg_mail["From"] = sender msg_mail["To"] = receiver server_obj = smtplib.SMTP_SSL(server) server_obj.connect(server,465) server_obj.login(sender,sender_pwd) server_obj.sendmail(send_addr,receiver,msg_mail.as_string())if __name__ == "__main__": # 设定查询时间间隔,默认900s(15min) end_time = time.time() start_time = end_time - 900 start_time=time.strftime("%Y-%m-%d %H%M",time.localtime(start_time)) end_time = time.strftime("%Y-%m-%d %H%M", time.localtime(end_time)) print(start_time) print(end_time) sw_url = "http://172.16.53.232:9412/graphql" # skywalking的前端服务的地址和端口 per_page_size = 5000 #指定一次获取endpoint接口的数目 trace_detail_addr = "127.0.0.1:5000" #指定查询指定trace_id详细日志 receiver = "shy@test.com" #报警邮件接收人地址 trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr) send_mail(receiver) # interface_filter() # interface_content_filter("3c4212dd2dd548d394ba312c4619405d.104.16390380592724487")
python是怎么实现skywalking的trace模块过滤和报警,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
skywalking本身的报警功能,用起来视乎不是特别好用,目前想实现对skywalking的trace中的错误接口进行过滤并报警通知管理员和开发。所以自己就用python对skywalking做了二次数据清洗实现。项目方在了自己了github(https://github.com/shygit-dev/skywalking-cli-python)上了,有兴趣的同学可以做二次改造,共同学习。下面简单列出了代码内容:
sw-trace.py
#!/usr/bin/env python# _*_ coding: utf-8 _*_# Tile:# Author:shyimport requestsimport timeimport smtplibfrom email.mime.text import MIMETextimport redef interface_content_filter(trace_id): ''' 对详细日志内容(业务逻辑报错)进行过滤 :param trace_id: :return: 【1|0】 ''' url = "http://172.16.53.232:50001/query" params = { "trace_id": trace_id } detail_trace_id_log = requests.request(method="GET",url=url,params=params) detail_trace_id_log = detail_trace_id_log.text print(detail_trace_id_log) print(type(detail_trace_id_log)) with open("blackname_keyword_list","r",encoding="utf-8") as f: for line in f: print(line) result = re.search(line.strip(),detail_trace_id_log) print(result) if result != None: print("哥们匹配到日志黑名单关键字了:%s" % line) return 0 print("提示:%s不在关键字黑名单中" % trace_id) return 1def interface_filter(endpointName): """ 设置接口黑名单 :param endpointName: :return: 【1|0】 """ endpointName = re.sub("\(|\)",".",endpointName) with open("blackname_list","r",encoding="utf-8") as f: bn_list = f.read() match_result = re.search(endpointName.strip(),bn_list) if match_result == None: print("提示:接口不存在黑名单中") return 1 print("提示:接口在黑名单中") return 0def trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr): """ skywalking trace功能对错误接口进行过滤,默认最大一次获取2000条数据,每分钟执行一次 :param start_time: :param end_time: :return: """ url = sw_url data = { "query": "query queryTraces($condition: TraceQueryCondition) {\n data: queryBasicTraces(condition: $condition) {\n traces {\n key: segmentId\n endpointNames\n duration\n start\n isError\n traceIds\n }\n total\n }}", "variables": { "condition": { "queryDuration": { "start": start_time, #"2021-12-07 1734" "end": end_time, "step": "MINUTE" }, "traceState": "ERROR", "paging": { "pageNum": 1, "pageSize": per_page_size, "needTotal": "true" }, "queryOrder": "BY_START_TIME" # "traceId": "b669d0069be84fce82261901de412e7c.430.16388637511348105" } } } result = requests.request(method="post",url=url,json=data) i = 0 # print(result.content) # print(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:]))))) with open("mail.html","w",encoding="utf-8") as f: f.write('Title 最近15分钟统计:| 持续时长 | 接口名称 | 追踪ID | |
|---|---|---|---|
| %s | %s | %s | %s |
sw-trace-id.py
#!/usr/bin/env python# _*_ coding: utf-8 _*_# Tile:# Author:shyimport requestsimport timefrom flask import Flask,requestapp = Flask(__name__)@app.route("/query",methods=["get"])def trace_id_query(): """ 查询指定trace_id详细日志信息 :return: f.read() """ trace_id = request.args.get("trace_id") url="http://172.16.53.232:9412/graphql" # url="http://skywalking.roulw.com/graphql" data = { "query": "query queryTrace($traceId: ID!) {\n trace: queryTrace(traceId: $traceId) {\n spans {\n traceId\n segmentId\n spanId\n parentSpanId\n refs {\n traceId\n parentSegmentId\n parentSpanId\n type\n }\n serviceCode\n serviceInstanceName\n startTime\n endTime\n endpointName\n type\n peer\n component\n isError\n layer\n tags {\n key\n value\n }\n logs {\n time\n data {\n key\n value\n }\n }\n }\n }\n }", "variables": { "traceId": trace_id } } result = requests.request(method="post",url=url,json=data) with open("detail_log", "w", encoding="utf-8") as f: f.write("生产Skywalking报错接口跟踪日志日志:
") for trace_id in result.json()["data"]["trace"]["spans"]: if trace_id["isError"]: # print(trace_id) print("服务名称:%s\n" % trace_id["serviceCode"], "开始时间:%s\n" % trace_id["startTime"], "接口名称:%s\n" % trace_id["endpointName"], "peer名称:%s\n" % trace_id["peer"], "tags名称:%s\n" % trace_id["tags"], "详细日志:%s" % trace_id["logs"]) content = "服务名称:%s
开始时间:%s
接口名称:%s
peer名称:%s
tags名称:%s" % (trace_id["serviceCode"],trace_id["startTime"],trace_id["endpointName"],trace_id["peer"],trace_id["tags"]) with open("detail_log","a",encoding="utf-8") as f: f.write(content) f.write("
********详细日志**********
") for logs in trace_id["logs"]: for log in logs["data"]: if log["key"] == "message": print(log["value"]) with open("detail_log","a",encoding="utf-8") as f: f.write(log["value"]) # return log["value"] elif log["key"] == "stack": print(log["value"]) with open("detail_log","a",encoding="utf-8") as f: f.write(log["value"]) with open("detail_log", "a", encoding="utf-8") as f: f.write("
========下一个接口信息=========
") with open("detail_log","r",encoding="utf-8") as f: return f.read()if __name__ == "__main__": # trace_id = "14447ae7199c40a2b9862411daba180b.2142.16388920322367785" # trace_id_query(trace_id) app.run()关于python是怎么实现skywalking的trace模块过滤和报警问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
接口
utf-8
名称
黑名单
黑名
日志
报警
时间
关键
关键字
功能
内容
哥们
邮件
问题
提示
服务
查询
模块
信息
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
中央企业网络安全现状
黎川软件开发项目管理
当前主流软件开发工具及优缺点
合肥老表网络技术
网络安全工作基本情况
迅雷网络技术深圳招聘
亳州app软件开发费用
数据库保护的内容有哪些问题
软件开发答辩问题及回答
网络安全马甲
边缘智能服务器
萤石云时间服务器
在线网络技术(北京)有限公司
服务器安全远程桌面
襄阳巅峰金耀网络技术有限公司
江苏网络服务器机柜按需定制
网络技术分支介绍
使命召唤不同服务器可以一起玩吗
日本2013年网络安全战略
北京铁路网络安全知识答题答案
渝北区提供网络技术服务活动
电脑邮箱连接服务器失败
中国ai服务器总数
网络安全马甲
闪电公司网络安全
百变网络技术服务部
数据库误删了 怎么解决
分盘直销软件开发
蜀山剑侠传 服务器
画面云管理服务器作用