任务背景

emmmm,最近沉迷学习无法自拔,不过就在昨天收到了一个新需求,因为目前程序的某些地方计算所需的时间太太太太久了,导致让用户等待返回结果的时间太久,严重影响了用户体验急需搞定这件事情,不出意外这个任务又又又落到了我的身上。

ed1ccf1412ad0c2eff12f35532f01b08.png

用户使用逻辑与后端处理逻辑图

e2f1c53ed85e72f01ad21c76dcabbb4d.png
用户与后端交互方式

从图中可以发现第二步会消耗大量的时间,并且在经过计算后仅仅会给用户返回一个id,用户需要携带此id再次发起请求获取结果,所以根据逻辑图可以确定后端处理文件这件事可以异步进行,没必要叫客户等待,那如何做到这个功能呢?就要用到万能的Celery了[celery的详细用户可以直接查询官网]

简单说一下Celery到底是一个什么东西

Celery是一个简单,灵活且可靠的分布式系统,是一个任务队列,可以处理大量消息,着重于实时处理,同时还支持任务调度,最主要的是Celery是开源免费且好用的~

项目背景及环境

  1. 是一个提供文件解析的项目
  2. 用的是Python语言进行编写的 版本为3.6.10
  3. 框架使用的是Flask框架 版本为0.11.1

需要新安装的模块有

redis
pip install redis==2.10.6  # 高版本redis会引发一个报错
Celery
pip install celery==3.1.24

可能遇到的报错

AttributeError: 'str' object has no attribute 'items'
降低redis版本  pip install redis==2.10.6

前置条件

已经安装redis

示例程序结构及功能

功能是非阻塞发邮件

项目目录结构
|-flask_email_celery
|    |-app
|    |    |-utils
|    |    |    |-__init__.py  # 1 号 __init__ 文件
|    |    |    |-send_email.py
|    |    |-views
|    |    |    |-__init__.py  # 2 号__init__ 文件
|    |    |    |-email_send_cle.py
|    |    |-__init__.py  # 3 号__init__ 文件
|    |    |-worker.py
|    |-run.py
|    |-settings.py

目录结构截图

104f0d6ae4b6230d250659b2a70a1112.png

源代码汇总

settings.py

# -*- encoding: utf-8 -*-
"""
@version : 3.6
@File    : settings.py
@Time    : 2020/7/30 15:26
@Author  : xxx
@Software: PyCharm
@desc: 配置文件
"""
# celery 配置CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_SERIALIZER = 'json'
# flask 配置SECRET_KEY = "xxxxxxxxxxx"

run.py

# -*- encoding: utf-8 -*-
"""
@version : 3.6
@File    : run.py
@Time    : 2020/7/30 15:25
@Author  : xxx
@Software: PyCharm
@desc: 
"""
from app import create_app
app = create_app()
if __name__ == "__main__":
    print(app.url_map)
    app.run()

app/worker.py

# -*- encoding: utf-8 -*-
"""
@version : 3.6
@File    : worker.py
@Time    : 2020/7/30 10:33
@Author  : xxxx
@Software: PyCharm
@desc: 异步主要控制
"""
from app import celery
from app.utils.send_email import EmailSend
@celery.task()
def send_email(to, header, content):
    email = EmailSend()
    email.send_text_email(to, header, content)

app/__init__.py

# -*- encoding: utf-8 -*-
"""
@version : 3.6
@File    : __init__.py.py
@Time    : 2020/7/30 15:21
@Author  : xxx
@Software: PyCharm
@desc: 
"""
from flask import Flask
from celery import Celery
import settingscelery = Celery(__name__, broker=settings.CELERY_BROKER_URL)def create_app():    app = Flask(__name__)    app.config.from_object(settings)    celery.conf.update(app.config)    from .views.email_send_cle import email_sends
    app.register_blueprint(email_sends)    return app

app/utils/send_email.py

"""
@version: 3.6
@author: xxx
@file: send_email.py
@time: 2019/10/10 10:57
@desc: 统一发邮件
"""
import smtplibfrom email.mime.text import MIMETextimport loggingclass EmailSend(object):    def __init__(self):
        self.logging = logging.getLogger('Waring')
        self.email_host = 'smtp.xxx.com'  # host
        self.email_port = 'xxx'  # 端口号
        self.email_pass = 'xxxx'  # smtp密码
        self.from_addr = "xxx@163.com"  # 用来发送邮件的邮箱
    def send_text_email(self, to_addrs, subject, content):
        self.logging.warning('send_text_email is willed Discard')
        self.logging.error('send_text_email is None')
        message_text = MIMEText(content, 'plain', 'utf8')
        message_text['From'] = self.from_addr
        message_text['To'] = to_addrs
        message_text['Subject'] = subject
        try:
            # 在创建客户端对象的同时,连接到邮箱服务器。            client = smtplib.SMTP_SSL(host=self.email_host, port=self.email_port)
            login_result = client.login(self.from_addr, self.email_pass)
            print(login_result)            #  (235, b'Authentication successful')
            if login_result and login_result[0] == 235:
                print('Successful login')
                client.sendmail(self.from_addr, to_addrs, message_text.as_string())
                print('Successful mail delivery')
            else:
                print('Mail sending exception:', login_result[0], login_result[1])
        except Exception as e:
            self.logging.error('Connecting Mailbox Server Exception:{}'.format(e))
    def send_image_email(self):
        pass    def send_word_email(self):
        pass    def send_video_email(self):
        pass

app/views/email_send_cle.py

# -*- encoding: utf-8 -*-
"""
@version : 3.6
@File    : email_send_cle.py
@Time    : 2020/7/30 15:34
@Author  : xxx
@Software: PyCharm
@desc: 
"""
from flask import Blueprint, request, jsonify
from app.worker import send_email
email_sends = Blueprint('email_sends', __name__)
@email_sends.route('/testemail', methods=['POST'])
def reset_password():    email = request.form['email']
    content = request.form['content']
    header = request.form['header']
    send_email.delay(email, header, content)    return jsonify(code=200, message=u"异步邮件发送成功")

启动及运行方法

1、首先将虚拟环境到安装有celery与flask的python环境下

source activate 虚拟环境名

2、启动redis

redis-server.exe

f46bc4543422d43e07a03a4d9e973abc.png
redis-server启动样子

3、命令行模式下启动flask项目

python run.py

4、启动celery

celery worker -A app.worker:celery --loglevel=info

128e2b73ceefcca8e027568c6bd37364.png
启动会出现标志性的代码

5、发起一个请求使用postman

2f3b75265f97ad3b757b5fd95206a8a2.png
使用postman发起请求

结语

这就是一个比较简单的应用celery的项目,期间遇到过很多的坑,包括循环引用,找不到模块等等,简直欲仙欲死,经过一天多的时间搞定了这个需求,希望对诸位有所帮助,如果有什么问题或者好的办法欢迎交流。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐