MySQL slow query and error log analysis
Reviewing and alerting on MySQL slow query and error logs is cumbersome: the slow query alerts we had only reported the number of slow queries. We therefore built a program that alerts on and analyzes the slow query log.
On the backend, the Filebeat log shipper forwards the log files to a Redis database (Filebeat outputs to Elasticsearch by default, so the Redis output has to be configured explicitly). A timer runs the collector once per minute.
vi /etc/filebeat/filebeat.yml
filebeat.prospectors:
- paths:
    - /data/mysql/xxx/tmp/slow.log
  document_type: syslog
  fields:
    app: mysql_slowlog
    port: xxx
    ip: xxxx
  scan_frequency: 30s
  tail_files: true
  multiline.pattern: '^\#\ Time'
  multiline.negate: true
  multiline.match: after

output.redis:
  enabled: true
  hosts: ["IP:port"]
  port: 2402
  key: filebeat
  keys:
    - key: "%{[fields.app]}"
      mapping:
        "mysql_slowlog": "mysql_slowlog"
        "mysql_errorlog": "mysql_errorlog"
  db: 0
  datatype: list

logging.to_files: true
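Before wiring up the full collector, it helps to confirm what Filebeat actually pushes into the Redis list. A minimal sketch, assuming the placeholder Redis host/port from the config above; it only peeks at the fields the collector script below relies on:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Quick check of one Filebeat event in the Redis list (host/port are placeholders).
import json
import redis

r = redis.Redis(host='xxx', port=2402, db=0)
raw = r.lindex('mysql_slowlog', 0)  # peek at the oldest queued event without removing it
if raw is None:
    print('mysql_slowlog list is empty')
else:
    event = json.loads(raw)
    # These are exactly the fields the collector script reads.
    print(event['beat']['hostname'])
    print(event['fields']['ip'], event['fields']['port'])
    print(event['message'])  # the multiline slow log entry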
On the monitoring host, the data is read from Redis, parsed with regular expressions, and written to a MySQL database.
vi /data/mysql_slowLog.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import json
import pymysql
import re
import time
import threading

# redis connect info
redisHost = 'xxx'
redisPort = 2402
redisDB = '0'
redisKey = 'mysql_slowlog'

# mysql connect info
mysqlHost = 'xxx'
mysqlPort = 2001
# mysqlPort = 23306
mysqlUser = ''
mysqlPasswd = ''
# mysqlPasswd = 'open'
mysqlDB = ''
mysqlTablePrefix = 'mysql_slowlog_'
collectStep = 60
def time_log():
    return '[' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ']'


def gather_log(redisConn):
    # Drain the Filebeat list from Redis and parse each slow log entry.
    data_list = []
    logList = []
    keyState = redisConn.exists(redisKey)
    if keyState:
        logLen = redisConn.llen(redisKey)
        if logLen > 0:
            # Rename the list first so Filebeat keeps pushing to the original key
            # while this batch is being processed.
            redisKeyNew = redisKey + '-bak'
            redisConn.renamenx(redisKey, redisKeyNew)
            logList = redisConn.lrange(redisKeyNew, 0, logLen)
            redisConn.delete(redisKeyNew)
    if len(logList) > 0:
        for item in logList:
            data_dict = {}
            slowLogJson = json.loads(item)
            data_dict['hostname'] = slowLogJson['beat']['hostname']
            data_dict['ip'] = slowLogJson['fields']['ip']
            data_dict['port'] = slowLogJson['fields']['port']
            logContent = slowLogJson['message']
            # Regex patterns for the slow log header lines
            timeRe = r'# Time: (.*)\n# User@Host:'
            userRe = r'# User@Host:.*\[(.*?)\]\s+@ '
            hostRe = r'# User@Host: .*\[(.*?)\] Id:'
            schemaRe = r'# Schema:\s+(.*?)\s+Last_errno:'
            queryRe = r'# Query_time:\s+(.*?)\s+Lock_time:'
            locklRe = r'# Query_time:.*?Lock_time:\s+(.*?)\s+Rows_sent:'
            rowsRe = r'# Query_time:.*?Lock_time:.*?Rows_sent:\s+(\d+)\s+Rows_examined:'
            bytesRe = r'# Bytes_sent:\s+(\d+)'
            timestampRe = r'SET\s+timestamp=(.*?);'
            commandRe = r'SET\s+timestamp=.*?;\n(.*?)(?=$)'
            if re.findall(timeRe, logContent):
                # "# Time: YYMMDD hh:mm:ss" -> "YYYY-MM-DD hh:mm:ss"
                data_dict['sys_time'] = u'20' + re.findall(timeRe, logContent)[0]
                data_dict['sys_time'] = data_dict['sys_time'][:4] + '-' + data_dict['sys_time'][4:6] + '-' + data_dict['sys_time'][6:]
                data_dict['cli_user'] = re.findall(userRe, logContent)[0]
                data_dict['cli_ip'] = re.findall(hostRe, logContent)[0]
                data_dict['schema'] = re.findall(schemaRe, logContent)[0]
                data_dict['query_time'] = re.findall(queryRe, logContent)[0]
                data_dict['lock_time'] = re.findall(locklRe, logContent)[0]
                data_dict['rows_sent'] = re.findall(rowsRe, logContent)[0]
                data_dict['bytes_sent'] = re.findall(bytesRe, logContent)[0]
                data_dict['timestamp'] = re.findall(timestampRe, logContent)[0]
                data_dict['command'] = re.findall(commandRe, logContent, re.M)[0]
                data_list.append(data_dict)
    return data_list


def send_data(data, mysql_pool):
    # Write the parsed rows into the monthly mysql_slowlog_YYYYMM table.
    mysqlTableDate = time.strftime('%Y%m', time.localtime(time.time()))
    mysqlTable = mysqlTablePrefix + mysqlTableDate
    cursor = mysql_pool.cursor()
    data_list = []
    createTableSql = "create table mysql_slowlog_000000 (`id` int(11) NOT NULL AUTO_INCREMENT," \
                     "hostname varchar(64) NOT NULL," \
                     "ip varchar(20) NOT NULL," \
                     "port int(11) NOT NULL," \
                     "sys_time datetime NOT NULL," \
                     "cli_user varchar(32) NOT NULL," \
                     "cli_ip varchar(32) NOT NULL," \
                     "`schema` varchar(32) NOT NULL," \
                     "query_time float(6,3) NOT NULL," \
                     "lock_time float(6,3) NOT NULL," \
                     "rows_sent int(11) NOT NULL," \
                     "bytes_sent int(11) NOT NULL," \
                     "`timestamp` varchar(40) NOT NULL," \
                     "command varchar(2048) DEFAULT NULL," \
                     "PRIMARY KEY (`id`)," \
                     "KEY `idx_slowlog_000000_user` (`cli_user`)," \
                     "KEY `idx_slowlog_000000_query_time` (`query_time`)," \
                     "KEY `idx_slowlog_000000_timestamp` (`timestamp`)) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8"
    createTableSql = createTableSql.replace('000000', mysqlTableDate)
    # Create the slow log table for the current month if it does not exist yet
    try:
        cursor.execute("show tables like '%s'" % mysqlTable)
        res = cursor.fetchone()
        if not res:
            cursor.execute(createTableSql)
            mysql_pool.commit()
    except Exception as e:
        print(time_log() + 'Error:', e)
        mysql_pool.rollback()
        mysql_pool.close()
        return
    slowLogInsertSql = "insert into %s" % mysqlTable + "(hostname," \
                       "ip," \
                       "port," \
                       "sys_time," \
                       "cli_user," \
                       "cli_ip," \
                       "`schema`," \
                       "query_time," \
                       "lock_time," \
                       "rows_sent," \
                       "bytes_sent," \
                       "`timestamp`," \
                       "command) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    if len(data) > 0:
        for item in data:
            row = (item['hostname'].encode('utf-8'),
                   item['ip'].encode('utf-8'),
                   item['port'],
                   item['sys_time'].encode('utf-8'),
                   item['cli_user'].encode('utf-8'),
                   item['cli_ip'].encode('utf-8'),
                   item['schema'].encode('utf-8'),
                   item['query_time'].encode('utf-8'),
                   item['lock_time'].encode('utf-8'),
                   item['rows_sent'].encode('utf-8'),
                   item['bytes_sent'].encode('utf-8'),
                   item['timestamp'].encode('utf-8'),
                   pymysql.escape_string(item['command']).encode('utf-8'))
            data_list.append(row)
        print(len(data_list))
        # Insert the collected slow log rows in one batch
        try:
            cursor.executemany(slowLogInsertSql, data_list)
            mysql_pool.commit()
            mysql_pool.close()
        except Exception as e:
            print(time_log() + 'Error:', e)
            mysql_pool.rollback()
            mysql_pool.close()
    else:
        print(time_log() + 'No data')
        mysql_pool.close()


def main():
    try:
        redis_pool = redis.ConnectionPool(host=redisHost, port=redisPort, db=redisDB)
        redisConn = redis.Redis(connection_pool=redis_pool)
    except Exception:
        print(time_log() + 'Error! Can not connect to redis!')
    try:
        mysql_pool = pymysql.connect(host=mysqlHost, port=mysqlPort, user=mysqlUser, password=mysqlPasswd, db=mysqlDB)
    except Exception:
        print(time_log() + 'Error! Can not connect to mysql!')
    print(time_log())
    data = gather_log(redisConn)
    send_data(data, mysql_pool)
    print(time_log())
    # Schedule the next collection run
    timeSchedule = collectStep
    global timer
    timer = threading.Timer(timeSchedule, main)
    timer.start()


if __name__ == '__main__':
    timer = threading.Timer(1, main)
    timer.start()
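With the collector filling the monthly mysql_slowlog_YYYYMM tables, alerts richer than a plain slow query count become simple SQL. A minimal sketch, assuming placeholder connection details and a hypothetical 5-second threshold, that flags statements recorded during the last collection interval:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Alerting sketch (not part of the collector above): flag statements slower than
# a threshold in the last minute. Connection details and the threshold are placeholders.
import time
import pymysql

ALERT_THRESHOLD = 5.0  # seconds, hypothetical value

table = 'mysql_slowlog_' + time.strftime('%Y%m')
conn = pymysql.connect(host='xxx', port=2001, user='xxx', password='xxx', db='xxx')
try:
    with conn.cursor() as cursor:
        cursor.execute(
            "select ip, port, cli_user, `schema`, query_time, command from " + table +
            " where sys_time >= now() - interval 1 minute and query_time >= %s",
            (ALERT_THRESHOLD,))
        for ip, port, user, schema, query_time, command in cursor.fetchall():
            # Replace the print with a mail/IM notification in a real deployment.
            print('[slowlog alert] %s:%s %s@%s %.3fs %s'
                  % (ip, port, user, schema, float(query_time), (command or '')[:200]))
finally:
    conn.close()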
The frontend uses Django to display the slow query data, and every week the slow queries belonging to each business line are sent to the relevant developers.
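The Django side is not shown in the article; the following is only a rough sketch of a view that lists the current month's rows, assuming Django's default database points at the monitoring MySQL instance and that the app and template names are made up for illustration:

# views.py -- illustrative sketch only; app, template and URL wiring are assumptions
import time

from django.db import connection
from django.shortcuts import render


def slowlog_list(request):
    # Read the current month's table created by the collector.
    table = 'mysql_slowlog_' + time.strftime('%Y%m')
    with connection.cursor() as cursor:
        cursor.execute(
            "select sys_time, cli_user, `schema`, query_time, rows_sent, command "
            "from " + table + " order by sys_time desc limit 100")
        rows = cursor.fetchall()
    return render(request, 'slowlog/list.html', {'rows': rows})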
The MySQL error log is processed in the same way.
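The error log pipeline reuses the same Filebeat → Redis → Python chain under the mysql_errorlog key; only the regular expressions change, since error log lines carry a timestamp, a severity and a message instead of the slow log header block. A rough parsing sketch, assuming MySQL 5.7-style error log lines (the pattern may need adapting to the server version in use):

# -*- coding: utf-8 -*-
# Error log parsing sketch; assumes MySQL 5.7-style lines such as
# "2025-11-19T10:00:00.123456Z 0 [ERROR] InnoDB: ..." and may need adapting.
import re

errorLineRe = re.compile(
    r'^(?P<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z?)\s+'
    r'(?P<thread>\d+)\s+\[(?P<level>Note|Warning|ERROR)\]\s+(?P<message>.*)$')


def parse_error_line(line):
    m = errorLineRe.match(line)
    return m.groupdict() if m else None


if __name__ == '__main__':
    sample = '2025-11-19T10:00:00.123456Z 0 [ERROR] InnoDB: Unable to lock ./ibdata1'
    print(parse_error_line(sample))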