python3多进程和协程处理MySQL数据讲义
发表于:2025-11-09 作者:千家信息网编辑
千家信息网最后更新 2025年11月09日,下文内容主要给大家带来python3多进程和协程处理MySQL数据讲义,这里所讲到的知识,与书籍略有不同,都是专业技术人员在与用户接触过程中,总结出来的,具有一定的经验分享价值,希望给广大读者带来帮助
千家信息网最后更新 2025年11月09日python3多进程和协程处理MySQL数据讲义
下文内容主要给大家带来python3多进程和协程处理MySQL数据讲义,这里所讲到的知识,与书籍略有不同,都是专业技术人员在与用户接触过程中,总结出来的,具有一定的经验分享价值,希望给广大读者带来帮助。
python3的多进程 + 协程处理MySQL的数据,主要逻辑是拉取MySQL的数据,然后使用flashtext匹配关键字,在存回MySQL,代码如下(async_mysql.py):
import timeimport asyncioimport randomfrom concurrent.futures import ProcessPoolExecutor as Poolimport aiomysqlfrom flashtext import KeywordProcessorimport clickclass AttrDict(dict): """可以用"."获取属性,没有该属性时返回None的字典""" def __getattr__(self, name): try: return self[name] except KeyError: return None def __setattr__(self, name, value): self[name] = valueclass AttrDictCursor(aiomysql.DictCursor): """继承aiomysql的字典cursor""" dict_type = AttrDictclass MultiProcessMysql(object): """用多进程和协程处理MySQL数据""" def __init__(self, workers=2, pool=10, start=0, end=2000): """第一段的参数需要跟随需求变动""" self.host = "192.168.0.34" self.port = 3306 self.user = "root" self.password = "root" self.db = "mydb" self.origin_table = "judgment_main_etl" # main self.dest_table = "laws_finance1" self.s_sql = f"select uuid, court_idea, judge_result, reason, plt_claim, dft_rep, crs_exm from {self.origin_table} where %s<=id and id<%s;" self.i_sql = f"insert into {self.dest_table} (uuid, title, reason, keyword) values (%s, %s, %s, %s)" self.pool = pool # 协程数和MySQL连接数 self.aionum = self.pool self.step = 2000 # 一次性从MySQL拉取的行数 self.workers = workers # 进程数 self.start = start # MySQL开始的行数 self.end = end # MySQL结束的行数 self.keyword = ['非法经营支付业务', '网络洗钱', '资金池', '支付牌照', '清洁算', '网络支付', '网上支付', '移动支付', '聚合支付', '保本保息', '担保交易', '供应链金融', '网贷', '网络借贷', '网络投资', '虚假标的', '自融', '资金池', '关联交易', '庞氏骗局', '网络金融理财', '线上投资理财', '互联网私募', '互联网股权', '非法集资', '合同欺诈', '众筹投资', '股权转让', '互联网债权转让', '资本自融', '投资骗局', '洗钱', '非法集资', '网络传销', '虚拟币泡沫', '网络互助金融', '金融欺诈', '网上银行', '信用卡盗刷', '网络钓鱼', '信用卡信息窃取', '网上洗钱', '洗钱诈骗', '数字签名更改', '支付命令窃取', '金融诈骗', '引诱投资', '隐瞒项目信息', '风险披露', '夸大收益', '诈骗保险金', '非法经营保险业务', '侵占客户资金', '征信报告窃取', '金融诈骗', '破坏金融管理'] self.kp = KeywordProcessor() # flashtext是一个文本匹配包,在关键词数量大时速度远大于re self.kp.add_keywords_from_list(self.keyword) async def createMysqlPool(self, loop): """每个进程要有独立的pool,所以不绑定self""" pool = await aiomysql.create_pool( loop=loop, host=self.host, port=self.port, user=self.user, password=self.password, db=self.db, maxsize=self.pool, charset='utf8', cursorclass=AttrDictCursor ) return pool def cutRange(self, start, end, times): """将数据区间分段""" partition = (end - start) // times ranges = [] tmp_end = start while tmp_end < end: tmp_end += partition # 剩下的不足以再分 if (end - tmp_end) < partition: tmp_end = end ranges.append((start, tmp_end)) start = tmp_end return ranges async def findKeyword(self, db, start, end): """从MySQL数据中匹配出关键字""" # 随机休息一定时间,防止数据同时到达,同时处理, 应该是一部分等待,一部分处理 await asyncio.sleep(random.random() * self.workers * 2) print("coroutine start") async with db.acquire() as conn: async with conn.cursor() as cur: while start < end: tmp_end = start + self.step if tmp_end > end: tmp_end = end print("aio start: %s, end: %s" % (start, tmp_end)) # <=id 和 id< await cur.execute(self.s_sql, (start, tmp_end)) datas = await cur.fetchall() uuids = [] for data in datas: if data: for key in list(data.keys()): if not data[key]: data.pop(key) keyword = self.kp.extract_keywords( " ".join(data.values())) if keyword: keyword = ' '.join(set(keyword)) # 对关键字去重 # print(keyword) uuids.append( (data.uuid, data.title, data.reason, keyword)) await cur.executemany(self.i_sql, uuids) await conn.commit() start = tmp_end def singleProcess(self, start, end): """单个进程的任务""" loop = asyncio.get_event_loop() # 为每个进程创建一个pool db = loop.run_until_complete(asyncio.ensure_future( self.createMysqlPool(loop))) tasks = [] ranges = self.cutRange(start, end, self.aionum) print(ranges) for start, end in ranges: tasks.append(self.findKeyword(db, start, end)) loop.run_until_complete(asyncio.gather(*tasks)) def run(self): """多进程跑""" tasks = [] ranges = self.cutRange(self.start, self.end, self.workers) start_time = time.time() with Pool(max_workers=self.workers) as executor: for start, end in ranges: print("processor start: %s, end: %s" % (start, end)) tasks.append(executor.submit(self.singleProcess, start, end)) for task in tasks: task.result() print("total time: %s" % (time.time() - start_time))@click.command(help="运行")@click.option("-w", "--workers", default=2, help="进程数")@click.option('-p', "--pool", default=10, help="协程数")@click.option('-s', '--start', default=0, help='MySQL开始的id')@click.option('-e', "--end", default=2640000, help="MySQL结束的id")def main(workers, pool, start, end): mp = MultiProcessMysql(workers=workers, pool=pool, start=start, end=end) if workers * pool > 100: if not click.confirm('MySQL连接数超过100(%s),确认吗?' % (workers * pool)): return mp.run()if __name__ == "__main__": main()运行如下:$ python3 async_mysql.py -w 2 # 可以指定其他参数,也可使用默认值
对于以上关于python3多进程和协程处理MySQL数据讲义,如果大家还有更多需要了解的可以持续关注我们的行业推新,如需获取专业解答,可在官网联系售前售后的,希望该文章可给大家带来一定的知识更新。
数据
网络
金融
支付
进程
处理
投资
诈骗
互联网
关键
资金
互联
讲义
专业
业务
信息
信用
信用卡
关键字
参数
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库PC版
服务器磁盘阵列代号
盘古软件开发有限公司在业界地位
信阳科奇网络技术有限公司
关于数据库建表的要求
数据库erp应用教程
长城服务器怎么安装麒麟系统
世界三大生物信息学数据库
熙美网络技术
深挖网络安全市场
为什么会出现服务器无响应
服务器中的文件打不开怎么办
河北星车网络技术有限公司
安装数据库提示实例名已存在
乡镇网络安全保障工作措施
卸载rsat服务器管理器
网络安全工程师技术面试题
广安数据库项目
安装游戏提示无法连接服务器
中兴服务器系统管理
软件开发零基础入门mac
coreboot 服务器
网络安全问题的讨论和对策
互联网科技软件排行
美国的网络安全发
南通市苏通网络技术有限公司
龙华网络安全建设找哪家
命令符怎么打开数据库
用友t3数据库没有自动启动
网络技术平台