使用celery怎么实现集群管理
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇文章给大家分享的是有关使用celery怎么实现集群管理,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。架构:这里作为例子的celery
千家信息网最后更新 2025年12月03日使用celery怎么实现集群管理
本篇文章给大家分享的是有关使用celery怎么实现集群管理,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
架构:

这里作为例子的celery app为myapp:
root@workgroup0:~/celeryapp# ls myappagent.py celery.py config.py __init__.pyroot@workgroup0:~/celeryapp#
公用代码部分:
celery.py:(备注:172.16.77.175是任务发布节点的ip地址)
from __future__ import absolute_importfrom celery import Celeryapp = Celery('myapp', broker='amqp://guest@172.16.77.175//', backend='amqp://guest@172.16.77.175//', include=['myapp.agent'])app.config_from_object('myapp.config')if __name__ == '__main__': app.start()config.py:
from __future__ import absolute_importfrom kombu import Queue,Exchangefrom datetime import timedeltaCELERY_TASK_RESULT_EXPIRES=3600CELERY_TASK_SERIALIZER='json'CELERY_ACCEPT_CONTENT=['json']CELERY_RESULT_SERIALIZER='json'CELERY_DEFAULT_EXCHANGE = 'agent'CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'CELERT_QUEUES = ( Queue('machine1',exchange='agent',routing_key='machine1'), Queue('machine2',exchange='agent',routing_key='machine2'),)__init__.py:(空白)
任务发布节点的agent.py:
from __future__ import absolute_importfrom myapp.celery import app@app.taskdef add(x,y): return {'the value is ':str(x+y)}@app.taskdef writefile(): out=open('/tmp/data.txt','w') out.write('hello'+'\n') out.close()@app.taskdef mul(x,y): return x*y@app.taskdef xsum(numbers): return sum(numbers)@app.taskdef getl(stri): return getlength(stri)def getlength(stri): return len(stri)docker1上的agent.py:
from __future__ import absolute_importfrom myapp.celery import app@app.taskdef add(x,y): return {'value':str(x+y),'node_name':'docker1'} #增加了node_name用来识别节点@app.taskdef writefile(): out=open('/tmp/data.txt','w') out.write('hello'+'\n') out.close()@app.taskdef mul(x,y): return x*y@app.taskdef xsum(numbers): return sum(numbers)@app.taskdef getl(stri): return getlength(stri)def getlength(stri): return len(stri)docker2上的:
from __future__ import absolute_importfrom myapp.celery import app@app.taskdef add(x,y): return {'value':str(x+y),'node_name':'docker2'}@app.taskdef writefile(): out=open('/tmp/data.txt','w') out.write('hello'+'\n') out.close()@app.taskdef mul(x,y): return x*y@app.taskdef xsum(numbers): return sum(numbers)@app.taskdef getl(stri): return getlength(stri)def getlength(stri): return len(stri)在这个例子中我只测试add()函数:
在docker1节点上启动worker:(用-Q指定监听的queue)
root@workgroup1:~/celeryapp# celery -A myapp worker -l info -Q machine1/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which isabsolutely not recommended!Please specify a different user using the -u option.User information: uid=0 euid=0 gid=0 egid=0 uid=uid, euid=euid, gid=gid, egid=egid, -------------- celery@workgroup1.hzg.com v3.1.17 (Cipater)---- **** ----- --- * *** * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty-- * - **** --- - ** ---------- [config]- ** ---------- .> app: myapp:0x7f472d73f190- ** ---------- .> transport: amqp://guest:**@172.16.77.175:5672//- ** ---------- .> results: amqp://guest@172.16.77.175//- *** --- * --- .> concurrency: 1 (prefork)-- ******* ---- --- ***** ----- [queues] -------------- .> machine1 exchange=machine1(direct) key=machine1 [tasks] . myapp.agent.add . myapp.agent.getl . myapp.agent.mul . myapp.agent.writefile . myapp.agent.xsum[2015-10-18 15:07:51,313: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//[2015-10-18 15:07:51,340: INFO/MainProcess] mingle: searching for neighbors[2015-10-18 15:07:52,372: INFO/MainProcess] mingle: sync with 1 nodes[2015-10-18 15:07:52,374: INFO/MainProcess] mingle: sync complete[2015-10-18 15:07:52,423: WARNING/MainProcess] celery@workgroup1.hzg.com ready.
启动docker2上的worker:
root@workgroup2:~/celeryapp# celery -A myapp worker -l info -Q machine2/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which isabsolutely not recommended!Please specify a different user using the -u option.User information: uid=0 euid=0 gid=0 egid=0 uid=uid, euid=euid, gid=gid, egid=egid, -------------- celery@workgroup2.hzg.com v3.1.18 (Cipater)---- **** ----- --- * *** * -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty-- * - **** --- - ** ---------- [config]- ** ---------- .> app: myapp:0x7f708cb8ec10- ** ---------- .> transport: amqp://guest:**@172.16.77.175:5672//- ** ---------- .> results: amqp://guest@172.16.77.175//- *** --- * --- .> concurrency: 1 (prefork)-- ******* ---- --- ***** ----- [queues] -------------- .> machine2 exchange=machine2(direct) key=machine2 [tasks] . myapp.agent.add . myapp.agent.getl . myapp.agent.mul . myapp.agent.writefile . myapp.agent.xsum[2015-10-18 15:08:52,114: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672//[2015-10-18 15:08:52,144: INFO/MainProcess] mingle: searching for neighbors[2015-10-18 15:08:53,174: INFO/MainProcess] mingle: sync with 1 nodes[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.
在任务发布节点发布一个计算任务给docker1:
root@workgroup0:~/celeryapp# lsdefault.etcd hots.sh hotswap.py myapp myapp1tmp people.db resp sora test.pyroot@workgroup0:~/celeryapp# pythonPython 2.7.6 (default, Mar 22 2014, 22:59:56) [GCC 4.8.2] on linux2Type "help", "copyright", "credits" or "license" for more information.>>> from myapp.agent import add>>> res = add.apply_async(args=[122,34],queue='machine1',routing_key='machine1')>>> res.get(){u'value': u'156', u'node_name': u'docker1'}用get()可以看到来自docker1的返回,再看看docker1的显示:
[2015-10-18 15:11:51,217: INFO/MainProcess] Task myapp.agent.add[c487a9a2-e5cc-462b-a131-784b363a1952] succeeded in 0.03602907s: {'value': '156', 'node_name': 'docker1'}至于docker2,一点没动:
[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.
发布一个任务给docker2:
>>> res = add.apply_async(args=[1440,900],queue='machine2',routing_key='machine2')>>> res.get(){u'value': u'2340', u'node_name': u'docker2'}>>>以上就是使用celery怎么实现集群管理,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
任务
节点
集群
管理
例子
更多
知识
篇文章
部分
实用
代码
函数
地址
备注
就是
工作会
文章
架构
看吧
知识点
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
steam安全服务器封禁这么办
南京通用软件开发代理商
魔力宝贝30龙之传说服务器
计算机网络安全与防护闫宏生
智邮网络技术有限公司怎么样
软件开发岗位逻辑笔试题
网络安全知识手抄报的内容
服务器运维平均工资
网络安全演讲稿500字可复制
国安泰数据库能查什么
200万条数据使用什么数据库
山东龙芯服务器价钱
spring数据库框架
360快搜网络技术有限公司
网络安全评价方案设计
服务器可以加几个路由器
永宁县软件开发技术
怀孕去服务器机房
岳阳软件开发培训有哪些
深圳美约网络技术怎么样
服务器退信
重新加载数据库怎么设置
软件开发现状调研报告内容
一家软件开发公司需要哪些人
当前网络技术发展缺陷
判断数据库类型
法迅网络技术
网络安全最高的路由器
数据库修改连接密码
华为数据库调研报告