Django框架集成Celery异步-【2】:django集成celery,拿来即用,可用操作django的orm等功能-CSDN博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

一、项目结构和依赖

study_celery

          | --user

                |-- models.py

                |--views.py

                |--urls.py

          |--celery_task

                  |--__init__.py

                  |--async_task.py

                  |-- celery.py

                  | --check_task.py

                  | --config.py

                  | --scheduler_task.py

        | --study_celery

                | --settings.py

        | --manage.py

依赖redis数据库
 

redis==4.6.0
Django==3.2
django-redis==5.3.0
celery==5.3.1

二、celery框架配置详情

1、配置文件

config.py

from celery.schedules import crontab
from datetime import timedelta
'''
参数解析
accept_content允许的内容类型/序列化程序的白名单如果收到不在此列表中的消息则该消息将被丢弃并出现错误默认只为json
task_serializer标识要使用的默认序列化方法的字符串默认值为json
result_serializer结果序列化格式默认值为json
timezone配置Celery以使用自定义时区
enable_utc启用消息中的日期和时间将转换为使用 UTC 时区与timezone连用当设置为 false 时将使用系统本地时区。
result_expires 异步任务结果存活时长
beat_schedule设置定时任务
'''
#手动注册celery的异步任务将所有celery异步任务所在的模块找到写成字符串
task_module = [
    'celery_task.async_task',  # 写任务模块导入路径该模块主要写异步任务的方法
    'celery_task.scheduler_task',  # 写任务模块导入路径该模块主要写定时任务的方法
]

#celery的配置
config = {
    "broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时123456是密码
    "result_backend" : 'redis://127.0.0.1:6379/1',
    "task_serializer" : 'json',
    "result_serializer" : 'json',
    "accept_content" : ['json'],
    "timezone" : 'Asia/Shanghai',
    "enable_utc" : False,
    "result_expires" : 1*60*60,
    "beat_schedule" : { #定时任务配置
            # 名字随意命名
            'add-func-30-seconds': {
                # 执行add_task下的addy函数
                'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径from celery_task.scheduler_task import add_func
                # 每10秒执行一次
                'schedule': timedelta(seconds=30),
                # add函数传递的参数
                'args': (10, 21)
            },
            # 名字随意起
            'add-func-5-minutes': {
                'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径from celery_task.scheduler_task import add_func
                # crontab不传的参数默认就是每的意思比如这里是每年每月每日每天每小时的5分执行该任务
                'schedule': crontab(minute='5'),  # 之前时间点执行每小时的第5分钟执行任务, 改成小时分钟秒 就是每天的哪个小时哪分钟哪秒钟执行
                'args': (19, 22)  # 定时任务需要的参数
            },
            # 缓存用户数据到cache中
            'cache-user-func': {
                'task': 'celery_task.scheduler_task.cache_user_func',
                # 导入任务函数from celery_task.scheduler_task import cache_user_func
                'schedule': timedelta(minutes=1),  # 每1分钟执行一次将用户消息缓存到cache中
            }
        }
}

2、celery对象创建

celery.py

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
from .config import config,task_module

# 生成celery对象'task'相当于key用于区分celery对象
# broker是指定消息处理backend是指定结果后端的存储位置 include参数需要指定任务模块
app = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
app.conf.update(**config)

3、异步任务模块

async_task.py

'1、因为需要用到django中的内容所以需要配置django环境'
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")#根目录下study_celery/settings.py
import django
django.setup()

# 导入celery对象app
from celery_task.celery import app
# 导入django自带的发送邮件模块
from django.core.mail import send_mail
import threading
from study_celery import settings
from apps.user.models import UserModel
'''
2、异步任务
不保存函数返回值的@app.task(ignore_result=True)
保存函数返回值的任务@app.task
'''

#没有返回值禁用掉结果后端
@app.task
def send_email_task(email,code):  # 此时可以直接传邮箱还能减少一次数据库的IO操作
    '''
    :param email: 接收消息的邮箱用户的邮箱
    :return:
    '''
    # 启用线程发送邮件此处最好加线程池
    t = threading.Thread(
        target=send_mail,
        args=(
            "登录前获取的验证码",  # 邮件标题
            '点击该邮件激活你的账号否则无法登陆',  # 给html_message参数传值后该参数信息失效
            settings.EMAIL_HOST_USER,  # 用于发送邮件的邮箱地址
            [email],  # 接收邮件的邮件地址可以写多个
         ),
         # html_message中定义的字符串即HTML格式的信息可以在一个html文件中写好复制出来放在该字符串中
         kwargs={
                'html_message': f"<p></p> <p>验证码{code}</p>"
         }
        )
    t.start()
    return {'email':email,'code':code}

@app.task
def search_user_task():
    users = UserModel.objects.all()
    lis = []
    for user in users:
        dic = {'id':user.id,'name':user.name}
        lis.append(dic)
    return {'users':lis}

4、定时任务模块

scheduler_task.py

'1、因为需要用到django中的内容所以需要配置django环境 '
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")#根目录下study_celery/settings.py
import django
django.setup()

from celery_task.celery import app
from apps.user.views import models as user_models
from django.core.cache import cache
import time
from django.forms import model_to_dict

'2、定时任务'
#有返回值返回值可以从结果后端中获取
@app.task
def add_func(a,b):
    print('执行了加法函数')
    cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a+b})
    return a+b

#不需要返回值禁用掉结果后端
@app.task(ignore_result=True)
def cache_user_func():
    user = user_models.UserModel.objects.all()
    user_dict = {}
    for obj in user:
        user_dict[obj.account] = model_to_dict(obj)
    cache.set('all-user-data',user_dict,timeout=35*60)



5、检测任务完成状态

check_task.py

from celery.result import AsyncResult
from celery_task.celery import app
'''验证任务的执行状态的'''

def check_task_status(task_id):
    '''
    任务的执行状态
        PENDING :等待执行
        STARTED :开始执行
        RETRY   :重新尝试执行
        SUCCESS :执行成功
        FAILURE :执行失败
    :param task_id:
    :return:
    '''
    result = AsyncResult(id=task_id, app=app)
    dic = {
        'type':result.status,
        'msg':'',
        'data':'',
        'code':400
    }
    if result.status == 'PENDING':
       dic['msg'] = '任务等待中'
    elif result.status == 'STARTED':
        dic['msg'] = '任务开始执行'
    elif result.status == 'RETRY':
        dic['msg']='任务重新尝试执行'
    elif result.status =='FAILURE':
        dic['msg'] = '任务执行失败了'
    elif result.status == 'SUCCESS':
        result = result.get()
        dic['msg'] = '任务执行成功'
        dic['data'] = result
        dic['code'] = 200
        # result.forget() # 将结果删除
        # async.revoke(terminate=True)  # 无论现在是什么时候都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢那么就可以终止。
    return dic


三、django项目的配置

1、settings.py

模块注册

#app注册
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'user.apps.UserConfig',
]

#cache缓存
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/2",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 1000}
            # "PASSWORD": "123",
        },
        'TIMEOUT':30*60 #缓存过期时间
    }
}

#邮件配置
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com'  # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = 'xxxx@qq.com'  # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = 'xxx'  # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件收件人处就会这样显示
# DEFAULT_FROM_EMAIL = '2333<'1234567890@qq.com>'
EMAIL_USE_SSL = True   # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的即只能有一个为 True

2、关于视图函数的方法

1、获取cache对象,操作cache

from django.core.cache import cache

cache.set(key,value)

cache.get(key)

2、执行异步任务

from celery_task.async_task import send_email_task

res = send_email_task.delay('xxx@qq.com','23456')

task_id = res.id #获取异步任务的id通过该id可用获取任务的运行状态

from celery_task.async_task import search_user_task
res = search_user_task.delay()

四、启动项目项目根目录下执行

django项目

python manage.py runserver

celery框架

#windows系统

celery -A celery_task.celery worker -l info  -P  eventlet

#linux系统

celery -A celery_task.celery worker -l info 

定时任务启动

celery -A celery_task beat -l info

五、码云地址

django配置celery: django配置使用celerydjango使用celerydjango+celeryicon-default.png?t=N7T8https://gitee.com/liuhaizhang/django-configuration-celery

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: go

“Django框架集成Celery异步-【2】:django集成celery,拿来即用,可用操作django的orm等功能-CSDN博客” 的相关文章