Celery

x33g5p2x  于2022-05-05 转载在 其他  
字(4.5k)|赞(0)|评价(0)|浏览(407)


Celery异步任务框架如何使用?看这里

Celery

官网

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度

  1. # 官网解释
  2. """
  3. Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
  4. """

Celery异步任务框架

  1. """
  2. 1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
  3. 2)celery服务为为其他项目服务提供异步解决任务需求的
  4. 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
  5. 人是一个独立运行的服务 | 医院也是一个独立运行的服务
  6. 正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
  7. 人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
  8. """

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务,比如每天数据统计

Celery的安装配置

安装:pip install celery

消息中间件:RabbitMQ/Redis

app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)

注意如果是windows平台还需要安装:pip install eventlet

两种celery任务结构:提倡用包管理,结构更清晰

方式一:简单使用

  1. # 第一步:定义一个py文件(名字随意,celery_task)
  2. """celery_task.py"""
  3. from celery import Celery
  4. backend = 'redis://127.0.0.1:6379/1' # 结果存储
  5. broker = 'redis://127.0.0.1:6379/2' # 消息中间件
  6. app = Celery(__name__,broker=broker,backend=backend) # __name__区分__main__
  7. # 被它修饰,就变成了celery的任务
  8. @app.task
  9. def add(a,b):
  10. return a+b
  11. # 第二步:提交任务(新建一个py文件:submit_task)
  12. """submit_task.py"""
  13. from celery_task import add
  14. # 异步调用
  15. # 只是把任务提交到了redis中,但是没有执行,返回一个唯一标识,后期使用唯一标识去看任务执行结果
  16. res=add.delay(33,41)
  17. print(res) # 2ddb35df-25f2-4f7c-8405-0bd7b1fa5645
  18. # 第三步:任务执行单元执行,使用命令启动worker
  19. 格式:celery -A 文件名 worker -l 日志输出级别 (win平台+-P eventlet
  20. celery -A celery_task worker -l info -P eventlet
  21. '''
  22. celery_task:py文件的名字
  23. -l info:日志输出级别是info
  24. -P eventlet 在win平台需要下载,pip install eventlet
  25. '''
  26. #如果队列里有任务,就会执行,如果没有任务,worker就等在这
  27. # 第四步:查询结果是否执行完成 get_result.py
  28. """get_result.py"""
  29. from celery_task import app
  30. from celery.result import AsyncResult
  31. id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
  32. if __name__ == '__main__':
  33. asy = AsyncResult(id=id, app=app)
  34. if asy.successful():
  35. result = asy.get()
  36. print(result)
  37. elif asy.failed():
  38. print('任务失败')
  39. elif asy.status == 'PENDING':
  40. print('任务等待中被执行')
  41. elif asy.status == 'RETRY':
  42. print('任务异常后正在重试')
  43. elif asy.status == 'STARTED':
  44. print('任务已经开始被执行')

方法二:包管理结构(推荐)

随便定义包名,但是包内必须要有celery.py

步骤

  • 创建包,包下写celery.py文件,文件内写celery任务
  1. from celery import Celery
  2. backend = 'redis://127.0.0.1:6379/1'
  3. broker = 'redis://127.0.0.1:6379/2'
  4. app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
  5. # include内写app管理的任务
  • 任意位置提交任务
  1. from celery_task.add_task import add
  2. from celery_task.celery import app
  3. res=add.delay(100,200)
  4. print(res)
  5. # 提交任务delay在任意位置提交就可以,只需将celery任务导过来即可
  • 启动worker包管理只需去包所在的根路径启动就可以了,不需要切换路径到包内去启动worker,因为包下有celery.py了
  1. scripts> celery -A celery_task worker -l info -P eventlet
  • 查看结果
  1. from celery_task import app
  2. from celery.result import AsyncResult
  3. id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
  4. if __name__ == '__main__':
  5. asy = AsyncResult(id=id, app=app)
  6. if asy.successful():
  7. result = asy.get()
  8. print(result)
  9. elif asy.failed():
  10. print('任务失败')
  11. elif asy.status == 'PENDING':
  12. print('任务等待中被执行')
  13. elif asy.status == 'RETRY':
  14. print('任务异常后正在重试')
  15. elif asy.status == 'STARTED':
  16. print('任务已经开始被执行')

Celery主要处理三种任务

异步任务,延迟任务,定时任务

delay提交异步任务

上面的示例就是

apply_async提交延迟任务

  1. # 其他不变,提交任务的时候,如下:
  2. from celery_task.user_task import add
  3. from datetime import datetime, timedelta
  4. eta = datetime.utcnow() + timedelta(seconds=10)
  5. # 参数传递需要使用args,传时间要使用时间对象eta,使用的是utc时间
  6. mul.apply_async(args=(20, 50), eta=eta)

beat_schedule提交定时任务

定时任务需要启动beatworker

  • beat负责提交定时任务
  • worker负责提交celery任务
  1. from celery import Celery
  2. backend = 'redis://127.0.0.1:6379/1'
  3. broker = 'redis://127.0.0.1:6379/2'
  4. app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
  5. # include内写app管理的任务
  6. # 时区
  7. app.conf.timezone='Asia/Shanghai'
  8. # 是否使用UTC
  9. app.conf.enable_utc=False
  10. #第一步:在celery.py中配置
  11. # celery任务的定时配置
  12. from datetime import timedelta
  13. from celery.schedules import crontab
  14. app.conf.beat_schedule = {
  15. '''
  16. 任务名:{
  17. task:任务
  18. schedule:时间
  19. args:参数(函数参数)
  20. }
  21. '''
  22. 'task-mul': {
  23. 'task': 'celery_task.user_task.mul',
  24. 'schedule': timedelta(seconds=3), # 3s后
  25. # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
  26. 'args': (3, 15),
  27. },
  28. 'task-add': {
  29. 'task': 'celery_task.home_task.add',
  30. 'schedule': timedelta(seconds=10), # 10s后
  31. # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
  32. 'args': (3, 5),
  33. },
  34. }
  35. #第二步:启动beat(beat负责定时提交任务)
  36. celery -A celery_task beat -l info
  37. # 第三步:启动worker,任务就会被worker执行了
  38. celery -A celery_task worker -l info -P eventlet

相关文章