Flask项目接入celery方案
python 项目中使用常celery 做函数异步,提高接口可用性以及并发量,本文档主要整理 Flask项目中接入celery的方案
版本兼容性
Celery version 5.0.5 runs on,
-----on 16 Dec 2020
- Python (3.6, 3.7, 3.8)
框架兼容性
框架 | 插件 |
---|---|
Django | not needed |
Pyramid | pyramid_celery |
Pylons | celery-pylons |
Flask | not needed |
web2py | web2py-celery |
Tornado | tornado-celery |
使用celery 异步
celery 具有以下特性使其成为python 项目中的优选:
Highly Available:
工作进程和客户端将在连接丢失或失败时自动重试
Fast
一个Celery进程每分钟可以处理数百万个任务,往返延迟为亚毫秒
Flexible
几乎celery的每个部分都可以单独扩展或使用
前期准备-安装celery相关配置
celery 可以使用redis ,RabbitMQ,Amazon SQS等多种方式作为消息代理,下面演示使用redis 的方案
建议使用redis 作为broker 使用celery前确保有redis存在
pip install -U Celery # 安装celery
创建实例
from celery import Celery
app = Celery('tasks', broker='redis://localhost')
业务解耦-视图中引入celery
1 定义celery 任务
# tasks.py
from celery import Celery
import requests
app = Celery('tasks', broker='redis://localhost')
@app.task
def net_io(url):
requests.get(url)
return
2 flask项目 中 调用celery任务
# main.py
from flask import Flask, request, jsonify
from tasks import net_io
app = Flask(__name__)
@app.route("/send_req", methods=['POST'])
def send_req():
params = request.form.to_dict()
url = params.get("url")
task_id = net_io.delay(url)
return jsonify({"msg": "ok", "code": 0})
if __name__ == '__main__':
app.run()
运行celery 服务
$ celery -A tasks worker --loglevel=INFO
启动falsk 项目
$ python main.py
测试一下
可以看到使用前后接口性能出现明显差异-----且耗时越大的操作使用celery性能差异会越大
celery 监控
celery 作为一个独立于falsk 的进程 自然也需要对其监控
保存celery 执行结果
配置结果储存
可以通过配置参数把celery 执行的任务情况存储下来
app.conf.result_backend='redis://localhost//1'
# 除了redis外该配置也可以选择 SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ/AMQP) 等等
保存结果查看
通过celery 已以回的task_id 为key 可以在redis找到celery 执行 每一个任务状态和结果存储为json 字符串格式如下:
"{\"status\": \"SUCCESS\", \"result\": {\"msg\": \"no no no\"}, \"traceback\": null, \"children\": [], \"date_done\": \"2021-02-03T07:32:08.884642\", \"task_id\": \"be39c6bd-2493-4732-94e2-cd4b2c444522\"}"
celery 服务状态监控
celery 自带的命令也为我们提供了实例状态的查看功能,也可通过这个命令编写定时任务脚本去监控服务状态:
celery -A tasks status # 查看实例运行状态
-----------------------
-> celery@SZMCS1311: OK # 实例运行中
-----------------------------
No nodes replied within time constraint # 关闭已实例