python项目接入celery方案


Flask项目接入celery方案

python 项目中使用常celery 做函数异步,提高接口可用性以及并发量,本文档主要整理 Flask项目中接入celery的方案

版本兼容性

Celery version 5.0.5 runs on,

-----on 16 Dec 2020

  • Python (3.6, 3.7, 3.8)

框架兼容性

框架 插件
Django not needed
Pyramid pyramid_celery
Pylons celery-pylons
Flask not needed
web2py web2py-celery
Tornado tornado-celery

使用celery 异步

celery 具有以下特性使其成为python 项目中的优选:

Highly Available:

工作进程和客户端将在连接丢失或失败时自动重试

Fast

一个Celery进程每分钟可以处理数百万个任务,往返延迟为亚毫秒

Flexible

几乎celery的每个部分都可以单独扩展或使用

前期准备-安装celery相关配置

celery 可以使用redis ,RabbitMQ,Amazon SQS等多种方式作为消息代理,下面演示使用redis 的方案

建议使用redis 作为broker 使用celery前确保有redis存在

pip install -U Celery # 安装celery

创建实例

from celery import Celery

app = Celery('tasks', broker='redis://localhost')

业务解耦-视图中引入celery

1 定义celery 任务

# tasks.py
from celery import Celery
import requests
app = Celery('tasks', broker='redis://localhost')

@app.task
def net_io(url):
    requests.get(url)
    return

2 flask项目 中 调用celery任务

# main.py
from flask import Flask, request, jsonify

from tasks import net_io

app = Flask(__name__)


@app.route("/send_req", methods=['POST'])
def send_req():
    params = request.form.to_dict()

    url = params.get("url")
    task_id = net_io.delay(url)
    return jsonify({"msg": "ok", "code": 0})


if __name__ == '__main__':
    app.run()

运行celery 服务

$ celery -A tasks worker --loglevel=INFO

启动falsk 项目

$ python main.py

测试一下

可以看到使用前后接口性能出现明显差异-----且耗时越大的操作使用celery性能差异会越大

celery 监控

celery 作为一个独立于falsk 的进程 自然也需要对其监控

保存celery 执行结果

配置结果储存

可以通过配置参数把celery 执行的任务情况存储下来

app.conf.result_backend='redis://localhost//1' 
# 除了redis外该配置也可以选择 SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ/AMQP) 等等
保存结果查看

通过celery 已以回的task_id 为key 可以在redis找到celery 执行 每一个任务状态和结果存储为json 字符串格式如下:

"{\"status\": \"SUCCESS\", \"result\": {\"msg\": \"no no no\"}, \"traceback\": null, \"children\": [], \"date_done\": \"2021-02-03T07:32:08.884642\", \"task_id\": \"be39c6bd-2493-4732-94e2-cd4b2c444522\"}"
celery 服务状态监控

celery 自带的命令也为我们提供了实例状态的查看功能,也可通过这个命令编写定时任务脚本去监控服务状态:

celery -A tasks status # 查看实例运行状态 
-----------------------
->  celery@SZMCS1311: OK # 实例运行中
-----------------------------
No nodes replied within time constraint # 关闭已实例