LogoLogo
  • README
  • 前端编程
    • 01 Node JS
    • 02-ES6详解
    • 03-NPM详解
    • 04-Babel详解
    • 05-前端模块化开发
    • 06-WebPack详解
    • 07-Vue详解
    • 08-Git详解
    • 09-微信小程序
  • 人工智能
    • 机器学习
      • 二次分配问题
      • 非负矩阵
      • 概率潜在语义分析
      • 概率图模型
      • 集成学习
      • 降维
      • 距离度量
      • 决策树
      • 逻辑回归
      • 马尔可夫决策过程
      • 马尔可夫链蒙特卡洛法
      • 朴素贝叶斯法
      • 谱聚类
      • 奇异值分解
      • 潜在狄利克雷分配
      • 潜在语义分析
      • 强化学习
      • 社区算法
      • 时间序列模型
      • 特征工程
      • 条件随机场
      • 图论基础
      • 线性分类
      • 线性回归
      • 信息论中的熵
      • 隐马尔科夫模型
      • 支持向量机
      • 主成分分析
      • EM算法
      • Hermite 矩阵的特征值不等式
      • k-means聚类
      • k近邻法
      • PageRank算法
    • 深度学习
      • Pytorch篇
        • 01-线性模型
        • 02-梯度下降法
        • 03-反向传播
        • 04-pytorch入门
        • 05-用pytorch实现线性回归
        • 06-logistic回归
        • 07-处理多维特征的输入
        • 08-加载数据集
        • 09-多分类问题
        • 10-卷积神经网络
        • 11-循环神经网络
    • 图神经网络
      • 图神经网络笔记01
        • 01-图(Graphs)的结构
        • 02-网络的性质和随机图模型
        • 03-网络工具
        • 04-网络中的主题和结构角色
        • 05-网络中的社区结构
      • 图神经网络笔记02
        • 01-深度学习引言
        • 02-神经网络基础
        • 03-卷积神经网络
        • 04-图信号处理与图卷积神经网络
        • 05-GNN的变体与框架-
        • [06-Google PPRGo 两分钟分类千万节点的最快GNN](人工智能/图神经网络/图神经网络笔记02/06-Google%20PPRGo 两分钟分类千万节点的最快GNN.md)
        • 07-序列模型
        • 08-变分自编码器
        • 09-对抗生成网络
  • 日常记录
    • 健身日记
    • 面经记录
    • 自动生成Summary文件
  • 实战项目
    • 谷粒商城
      • 00-项目概述
      • 01-分布式基础-全栈开发篇
      • 02-分布式高级-微服务架构篇
      • 03-高可用集群-架构师提升篇
  • 数据库
    • MySQL笔记
      • 01-MySQL基础篇
      • 02-MySQL架构篇
      • 03-MySQL索引及调优篇
      • 04-MySQL事务篇
      • 05-MySQL日志与备份篇
    • Redis笔记
      • 01-Redis基础篇
      • 02-Redis高级篇
    • 02-Redis篇
  • 算法笔记
    • 01-算法基础篇
    • 02-算法刷题篇
  • 职能扩展
    • 产品运营篇
  • Go编程
    • 01-Go基础
      • 01-Go基础篇
  • Java编程
    • 01-Java基础
      • 01-Java基础篇
      • 02-多线程篇
      • 03-注射与反解篇
      • 04-JUC并发编程篇
      • 05-JUC并发编程与源码分析
      • 06-JVM原理篇
      • 07-Netty原理篇
      • 08-设计模式篇
    • 02 Java Web
      • 01-Mybatis篇
      • 01-Mybatis篇(新版)
      • 02-Spring篇
      • 02-Spring篇(新版)
      • 03-SpringMVC篇
      • 04-MybatisPlus篇
    • 03-Java微服务
      • 01-SpringBoot篇
      • 01-SpringBoot篇(新版)
      • 02-SpringSecurity篇
      • 03-Shiro篇
      • 04-Swagger篇
      • 05-Zookeeper篇
      • 06-Dubbo篇
      • 07-SpringCloud篇
      • 08-SpringAlibaba篇
      • 09-SpringCloud篇(新版)
    • 04-Java中间件
      • 数据库篇
        • 01-分库分表概述
        • 02-MyCat篇
        • 03-MyCat2篇
        • 04-Sharding-jdbc篇
        • 05-ElasticSearch篇
      • 消息中间件篇
        • 01-MQ概述
        • 02-RabbitMQ篇
        • 03-Kafka篇
        • 04-RocketMQ篇
        • 05-Pulsar篇
    • 05-扩展篇
      • Dubbo篇
      • SpringBoot篇
      • SpringCloud篇
    • 06-第三方技术
      • 01-CDN技术篇
      • 02-POI技术篇
      • 03-第三方支付技术篇
      • 04-第三方登录技术篇
      • 05-第三方短信接入篇
      • 06-视频点播技术篇
      • 07-视频直播技术篇
    • 07-云原生
      • 01-Docker篇
      • 02-Kubernetes篇
      • 03-Kubesphere篇
  • Linux运维
    • 01-Linux篇
    • 02-Nginx篇
  • Python编程
    • 01-Python基础
      • 01.配置环境
      • 02.流程控制
      • 03.数值
      • 04.操作符
      • 05.列表
      • 06.元祖
      • 07.集合
      • 08.字典
      • 09.复制
      • 10.字符串
      • 11.函数
      • 12.常见内置函数
      • 13.变量
      • 14.异常和语法错误
      • 15.时间和日期
      • 16.正则表达式
    • 02 Python Web
      • flask篇
        • 01.前言
        • 02.路由
        • 03.模板
        • 04.视图进阶
        • 05.flask-sqlalchemy
        • 06.表单WTForms
        • 07.session与cookie
        • 08.上下文
        • 09.钩子函数
        • 10.flask 信号
        • 11.RESTFUL
        • 13.flask-mail
        • 14.flask+celery
        • 15.部署
        • 16.flask-login
        • 17.flask-cache
        • 18.flask-babel
        • 19.flask-dashed
        • 20.flask-pjax
        • 21.flask上传文件到第三方
        • 22.flask-restless
        • 23.flask-redis
        • 24.flask-flash
        • 25.消息通知
        • 26.分页
    • 03-Python数据分析
      • Matplotlib
      • Numpy
      • Pandas
      • Seaborn
    • 04-Python爬虫
      • 1.准备工作
      • 2.请求模块的使用
      • 3.解析模块的使用
      • 4.数据存储
      • 5.识别验证码
      • 6.爬取APP
      • 7.爬虫框架
      • 8.分布式爬虫
由 GitBook 提供支持
在本页
  • 安装
  • 相关文档
  • 关系
  • 配置
  • 可见性超时
  • 结果
  • 简单测试
  • 运行 Celery 职程(Worker)服务
  • 调用任务
  • flask-celery
  • 最简示例
  • 常见用途

这有帮助吗?

在GitHub上编辑
  1. Python编程
  2. 02 Python Web
  3. flask篇

14.flask+celery

上一页13.flask-mail下一页15.部署

最后更新于3年前

这有帮助吗?

安装

  • 模块

pip install celery
pip install eventlet
pip install -U "celery[redis]"
pip install redis

在Windows操作系统上,还需要安装另外一个东西,eventlet

  • redis数据库

image-20200905165531410

相关文档

关系

  • task,任务

  • broker(中间人),存储任务的队列(借助redis实现)

  • worker:真正执行任务的工作者

  • backend:用来存储任务执行后的结果

配置

想要在flask中使用celery,可以配合redis数据库,redis配置如下:

app.conf.broker_url = 'redis://localhost:6379/0'

其中redis数据库链接格式为:

redis://:password@hostname:port/db_number

若没有密码,则为:

redis://hostname:port/db_number

多个redis链接

app.conf.broker_url = 'redis://localhost:6379/0;redis://localhost:6378/0'

URL 的所有配置都可以自定义配置的,默认使用的是 localhost 的 6379 端口中 0 数据库。( Redis 默认有 16 个数据库)

可见性超时

可见性超时为将消息重新下发给另外一个程序之前等待确认的任务秒数。请注意查看下面的。

可以通过 broker_transport_options 选项进行修改:

app.conf.broker_transport_options = {'visibility_timeout': 3600} # 一个小时

默认的可见性超时时间为1个小时。

结果

如果想保存任务执行返回结果保存到Redis,需要进行以下配置:

app.conf.result_backend = 'redis://localhost:6379/0'

简单测试

  • 用法

celery = Celery("tasks",
                broker="redis://118.24.128.18:6379/0",
                backend="redis://118.24.128.18:6379/0")
存储结果的位置:backend
  1. 第一个参数为当前模块的名称

  2. 第二个参数为中间人(Broker)的链接 URL

  3. 第三个参数backend为存储结果的位置

  • 创建task.py

from celery import Celery
import time

celery = Celery("tasks",
                broker="redis://localhost:6379/0",
                backend="redis://localhost:6379/0")

@celery.task
def send_mail(a,b):
    time.sleep(5000)
    return a+b

可以看见send_mail()中设置等待5000秒,通常这个函数会运行很久,毫无体验感,但是为了获取体验感使用celery进行加速。

运行 Celery 职程(Worker)服务

现在可以使用 worker 参数进行执行刚刚创建职程(Worker):

celery -A task worker --loglevel=info

格式:

celery -A 文件名称 worker --loglevel=info

调用任务

需要调用我们创建的实例任务,可以通过 delay() 进行调用。

delay() 是 apply_async() 的快捷方法,可以更好的控制任务的执行:

>>> from task import *
>>> r = add.delay(4,5)
>>> r
<AsyncResult: 1d616754-2447-43df-a1a4-1368ce01a86b>

如果出现以下报错:

ValueError: not enough values to unpack (expected 3, got 0)

解决

celery -A your_app_name worker --pool=solo -l info

其中mymodule为文件名称

ready() 可以检测是否已经处理完毕:

>>> r.ready()
True

整个任务执行过程为异步的,如果一直等待任务完成,会将异步调用转换为同步调用:

>>> r.get()
9
>>> r.get(timeout=1)

如果运行出现如下错误:

如果任务出现异常,get() 会再次引发异常,可以通过 propagate 参数进行覆盖:

>>> r.get(propagate=False)

如果任务出现异常,可以通过以下命令进行回溯:

>>> r.traceback

flask-celery

最简示例

通过上面的步骤,下面即是在 Flask 中使用 Celery 的最简示例:

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

celery = make_celery()

@celery.task()
def add(a, b):
    return a + b

if __name__ == "__main__":
    app.run()

在后台调用:

>>> from app import *
>>> result = add.delay(4,5)
>>> result.wait()

运行 Celery 职程

通过动手得知如上.wait() 永远不会实际地返回。这是因为需要运行 Celery。可以这样把 Celery 以职程运行:

celery -A your_app_name.celery worker --pool=solo --loglevel=info

实例:

celery -A app.celery worker --pool=solo --loglevel=info

而在linux中,这样使用

celery -A app.celery worker --pool=eventlet --loglevel=info

这次结果就有了。

>>> from app import *
>>> result = add.delay(4,5)
>>> result.wait()
9

常见用途

  • 邮件发送,由于可能在发送邮件时,由于网络等其他原因,导致发送时间过长,从而时用户体验感低,因此配合celery使用,就能够让等待时间降低,从而提高用户体验感。

img
image-20200905171339504
image-20200905173728940
image-20200905174318246
image-20200905173537139

celery文档
flask celery文档
redis desktop manager