千家信息网

异步redis队列实现 数据入库的方法

发表于:2025-11-06 作者:千家信息网编辑
千家信息网最后更新 2025年11月06日,业务需求app客户端向服务端接口发送来json 数据 每天 发一次 清空缓存后会再次发送出问题之前业务逻辑:php 接口 首先将 json 转为数组 去重 在一张大表中插入不存在的数据该用户已经存在
千家信息网最后更新 2025年11月06日异步redis队列实现 数据入库的方法

业务需求

app客户端向服务端接口发送来json 数据 每天 发一次 清空缓存后会再次发送

出问题之前业务逻辑:

php 接口 首先将 json 转为数组 去重 在一张大表中插入不存在的数据

该用户已经存在 和新增的id

入另一种详情表

问题所在:

当用户因特殊情况清除缓存 导致app 发送json串 入库并发高 导致CPU 暴增到88% 并且居高不下

优化思路:

1、异步队列处理

2、redis 过滤(就是只处理当天第一次请求)

3、redis 辅助存储app名称(验证过后批量插入数据app名称表中)

4、拼接插入的以及新增的如详细表中

解决办法:

1、接口修改 redis 过滤 + 如list队列 并将结果存入redis中

首先 redis将之前的历史数据放在redis 哈希里面 中文为键名 id 为键值

table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');  // $str = '';  // foreach ($app_name as $key => $val) {  //  $str.= "{$val} {$key} ";  // }  // $redis->hmset('app_name', $app_name);  // echo $str;exit;  $result = $request->input('res');  $list = json_decode($result, true);  if (empty ($list) || !is_array($list)) {   return json_encode(['result' => 'ERROR', 'msg' => 'parameter error']);  }  $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;  $data['time'] = date('Y-m-d');  $redis_key = 'log_app:'.$data['time'];  //redis 过滤  $redis = Redis::connection('web_active');  //redis 键值过期设置  if (empty($redis->exists($redis_key))) {   $redis->hset($redis_key, 1, 'start');   $redis->EXPIREAT($redis_key, strtotime($data['time'].'+2 day'));  }  //值确定  if ($redis->hexists($redis_key, $data['uid'])) {   return json_encode(['result' => 'SUCCESS']);  } else {   //推入队列   $redis->hset($redis_key, $data['uid'], $result);   $redis->rpush('log_app_list', $data['time'] . ':' . $data['uid']);   return json_encode(['result' => 'SUCCESS']);  } }}

2、php 脚本循环 监控redis 队列 执行逻辑 防止内存溢出

mget 获取该用户的app id 不存在就会返回null

通过判断null 运用redis 新值作为自增id指针 将null 补齐 之后批量入mysql 并跟新redis 哈希 和指针值 并入库 详情表

table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');   // $redis->hmset('app_name', $app_name);   // exit;    while(1) {   $redis = Redis::connection('web_active');   //队列名称   $res = $redis->lpop('log_app_list');    //开关按钮   $lock = $redis->get('log_app_lock');   if (!empty($res)) {    list($date,$uid) = explode(':',$res);    $result = $redis->hget('log_app:'.$date, $uid);    if (!empty($result)) {      $table_name = 'app_total'.date('Ym');      $list = json_decode($result, true);      $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;      $data['sex'] = isset($list['sex']) ? $list['sex'] : '' ;      $data['device'] = isset($list['device']) ? $list['device'] : '' ;      $data['appList'] = isset($list['list']) ? $list['list'] : '' ;      //数据去重 flip比unique更节约性能      $data['appList'] = array_flip($data['appList']);      $data['appList'] = array_flip($data['appList']);      $data['time'] = date('Y-m-d');      //app应用过滤      $app_res = $redis->hmget('app_name', $data['appList']);      //新增加app数组      $new_app = [];      //mysql 入库数组      $mysql_new_app = [];   //获取当前redis 自增指针   $total = $redis->get('app_name_total');   foreach ($app_res as $key =>& $val) {    if (is_null($val)) {     $total += 1;     $new_app[$data['appList'][$key]] = $total;      $val = $total;     array_push($mysql_new_app,['id' => $total, 'appName'=> $data['appList'][$key]]);    }   }   if (count($new_app)){    $str = "INSERT IGNORE INTO app_set_name (id,appName) values";    foreach ($new_app as $key => $val) {    $str.= "(".$val.",'".$key."'),";    }    $str = trim($str, ',');    //$mysql_res = DB::connection('phpLog')->table('app_set_name')->insert($mysql_new_app);    $mysql_res = DB::connection('phpLog')->statement($str);    if ($mysql_res) {     // 设置redis 指针     $redis->set('app_name_total', $total);     // redis 数据入库     $redis->hmset('app_name', $new_app);    }  }    // 详情数据入库    $data['appList'] = implode(',', $app_res);      //app统计入库      DB::connection('phpLog')->statement("INSERT IGNORE INTO ".$table_name." (uid,sex,device,`time`,appList)   values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['appList']."')");      //log 记录 当文件达到123MB的时候产生内存保错 所有这个地方可是利用日志切割 或者 不写入 日志      Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' success '.$result."\n");   } else {   Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' error '.$result."\n");  }   }   //执行间隔   sleep(1);   //结束按钮   if ($lock == 2) {    exit;   }   //内存检测   if(memory_get_usage()>1000*1024*1024){    exit('内存溢出');//大于100M内存退出程序,防止内存泄漏被系统杀死导致任务终端   }  } }}

3、执定 定时任务监控脚本执行情况

crontab -e/2 * * * * /bin/bash /usr/local/nginx/html/test.sh 1>>/usr/local/nginx/html/log.log 2>&1

test.sh 内容 (查看执行命令返回的进程id 如果没有就执行命令开启)

#!/bin/bashalive=`ps -ef | grep AppTotal | grep -v grep | awk '{print $2}'`if [ ! $alive ]then /usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &fi

记得授权哦 chmod +x test.sh

笔者用的laravel 框架 将命令激活丢入后台

执行命令

 /usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &

完事直接 ctrl -c 结束就行 命令以在后台运行 可以用shell 中的命令查看进程id

这样就实现队列异步入库

还有很多问题需要优化!!大致功能已经实现!!!!!!

优化完成后cpu

以上这篇异步redis队列实现 数据入库的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。

数据 队列 内存 命令 指针 名称 数组 用户 详情 问题 业务 任务 内容 后台 就是 情况 按钮 接口 日志 缓存 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 从数据库查取数据用poi写入 嘉兴物联网智慧工厂软件开发 杭州众栋网络技术有限公司 体供给金融信息数据库会坐牢吗 泊头软件开发项目管理在线咨询 网络技术工程师是做什么的 韶关手机游戏软件开发分析 兴城天气预报软件开发 超凡先锋连接服务器时闪退 梦幻手游新服务器好玩吗 钉钉可以后台管理服务器 商丘想象力网络技术有限公司 软件开发 季度计划表 网络安全等保测评表 山东省浪潮存储服务器哪家好 快速汇总多表数据库 数据库技术应用困境 战地1怎么创服务器 恒生互联网科技股票有哪些 csgo怎么进入社区服务器列表 邓霞软件开发 杭州众栋网络技术有限公司 php 显示数据库图片 河南 网络安全与 幼儿园网络安全整改落实情况汇报 旅游网站设计数据库报告 网络安全保障是 泰拉瑞亚服务器在哪儿弄 如何成为一名软件开发项目经理 网络安全法全称
0