pycharm 运行celery_Flask+Celery 挖坑与填坑之旅
任务背景emmmm,最近沉迷学习无法自拔,不过就在昨天收到了一个新需求,因为目前程序的某些地方计算所需的时间太太太太久了,导致让用户等待返回结果的时间太久,严重影响了用户体验急需搞定这件事情,不出意外这个任务又又又落到了我的身上。用户使用逻辑与后端处理逻辑图用户与后端交互方式从图中可以发现第二步会消耗大量的时间,并且在经过计算后仅仅会给用户返回一个id,用户需要携带此id再次发起请求获取结果,所以
任务背景
emmmm,最近沉迷学习无法自拔,不过就在昨天收到了一个新需求,因为目前程序的某些地方计算所需的时间太太太太久了,导致让用户等待返回结果的时间太久,严重影响了用户体验急需搞定这件事情,不出意外这个任务又又又落到了我的身上。
用户使用逻辑与后端处理逻辑图
从图中可以发现第二步会消耗大量的时间,并且在经过计算后仅仅会给用户返回一个id,用户需要携带此id再次发起请求获取结果,所以根据逻辑图可以确定后端处理文件这件事可以异步进行,没必要叫客户等待,那如何做到这个功能呢?就要用到万能的Celery了[celery的详细用户可以直接查询官网]
简单说一下Celery到底是一个什么东西
Celery是一个简单,灵活且可靠的分布式系统,是一个任务队列,可以处理大量消息,着重于实时处理,同时还支持任务调度,最主要的是Celery是开源免费且好用的~
项目背景及环境
- 是一个提供文件解析的项目
- 用的是Python语言进行编写的 版本为3.6.10
- 框架使用的是Flask框架 版本为0.11.1
需要新安装的模块有
redis
pip install redis==2.10.6 # 高版本redis会引发一个报错
Celery
pip install celery==3.1.24
可能遇到的报错
AttributeError: 'str' object has no attribute 'items'
降低redis版本 pip install redis==2.10.6
前置条件
已经安装redis
示例程序结构及功能
功能是非阻塞发邮件
项目目录结构
|-flask_email_celery
| |-app
| | |-utils
| | | |-__init__.py # 1 号 __init__ 文件
| | | |-send_email.py
| | |-views
| | | |-__init__.py # 2 号__init__ 文件
| | | |-email_send_cle.py
| | |-__init__.py # 3 号__init__ 文件
| | |-worker.py
| |-run.py
| |-settings.py
目录结构截图
源代码汇总
settings.py
# -*- encoding: utf-8 -*-
"""
@version : 3.6
@File : settings.py
@Time : 2020/7/30 15:26
@Author : xxx
@Software: PyCharm
@desc: 配置文件
"""
# celery 配置CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_SERIALIZER = 'json'
# flask 配置SECRET_KEY = "xxxxxxxxxxx"
run.py
# -*- encoding: utf-8 -*-
"""
@version : 3.6
@File : run.py
@Time : 2020/7/30 15:25
@Author : xxx
@Software: PyCharm
@desc:
"""
from app import create_app
app = create_app()
if __name__ == "__main__":
print(app.url_map)
app.run()
app/worker.py
# -*- encoding: utf-8 -*-
"""
@version : 3.6
@File : worker.py
@Time : 2020/7/30 10:33
@Author : xxxx
@Software: PyCharm
@desc: 异步主要控制
"""
from app import celery
from app.utils.send_email import EmailSend
@celery.task()
def send_email(to, header, content):
email = EmailSend()
email.send_text_email(to, header, content)
app/__init__.py
# -*- encoding: utf-8 -*-
"""
@version : 3.6
@File : __init__.py.py
@Time : 2020/7/30 15:21
@Author : xxx
@Software: PyCharm
@desc:
"""
from flask import Flask
from celery import Celery
import settingscelery = Celery(__name__, broker=settings.CELERY_BROKER_URL)def create_app(): app = Flask(__name__) app.config.from_object(settings) celery.conf.update(app.config) from .views.email_send_cle import email_sends
app.register_blueprint(email_sends) return app
app/utils/send_email.py
"""
@version: 3.6
@author: xxx
@file: send_email.py
@time: 2019/10/10 10:57
@desc: 统一发邮件
"""
import smtplibfrom email.mime.text import MIMETextimport loggingclass EmailSend(object): def __init__(self):
self.logging = logging.getLogger('Waring')
self.email_host = 'smtp.xxx.com' # host
self.email_port = 'xxx' # 端口号
self.email_pass = 'xxxx' # smtp密码
self.from_addr = "xxx@163.com" # 用来发送邮件的邮箱
def send_text_email(self, to_addrs, subject, content):
self.logging.warning('send_text_email is willed Discard')
self.logging.error('send_text_email is None')
message_text = MIMEText(content, 'plain', 'utf8')
message_text['From'] = self.from_addr
message_text['To'] = to_addrs
message_text['Subject'] = subject
try:
# 在创建客户端对象的同时,连接到邮箱服务器。 client = smtplib.SMTP_SSL(host=self.email_host, port=self.email_port)
login_result = client.login(self.from_addr, self.email_pass)
print(login_result) # (235, b'Authentication successful')
if login_result and login_result[0] == 235:
print('Successful login')
client.sendmail(self.from_addr, to_addrs, message_text.as_string())
print('Successful mail delivery')
else:
print('Mail sending exception:', login_result[0], login_result[1])
except Exception as e:
self.logging.error('Connecting Mailbox Server Exception:{}'.format(e))
def send_image_email(self):
pass def send_word_email(self):
pass def send_video_email(self):
pass
app/views/email_send_cle.py
# -*- encoding: utf-8 -*-
"""
@version : 3.6
@File : email_send_cle.py
@Time : 2020/7/30 15:34
@Author : xxx
@Software: PyCharm
@desc:
"""
from flask import Blueprint, request, jsonify
from app.worker import send_email
email_sends = Blueprint('email_sends', __name__)
@email_sends.route('/testemail', methods=['POST'])
def reset_password(): email = request.form['email']
content = request.form['content']
header = request.form['header']
send_email.delay(email, header, content) return jsonify(code=200, message=u"异步邮件发送成功")
启动及运行方法
1、首先将虚拟环境到安装有celery与flask的python环境下
source activate 虚拟环境名
2、启动redis
redis-server.exe
3、命令行模式下启动flask项目
python run.py
4、启动celery
celery worker -A app.worker:celery --loglevel=info
5、发起一个请求使用postman
结语
这就是一个比较简单的应用celery的项目,期间遇到过很多的坑,包括循环引用,找不到模块等等,简直欲仙欲死,经过一天多的时间搞定了这个需求,希望对诸位有所帮助,如果有什么问题或者好的办法欢迎交流。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)