django+celery项目结构
- project_name
  - app01
    - __init__.py
    - admin.py
    - views.py
    - models.py
    - tasks.py  #celery用来执行任务的文件,tasks里的任务由views函数去触发
    - urls.py
  - project_name
    - __init__.py  #初始化celery
    - celery.py  #定义celery实例
    - settings.py  #用来配置redis或rabbitmq地址
    - urls.py
    - views.py
    - wsgi.py
  - templates
  - static
  - manage.py
  - db.sqlite3
celery.py
# project_name/celery.py -- defines the Celery application instance.
from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
# Replace 'project_name' with the name of your own project package.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project_name.settings')

app = Celery('project_name')

# Load configuration from the Django settings object; `namespace` is the
# prefix that Celery-related settings carry in settings.py.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    """Print the request of the currently executing task (debugging aid)."""
    print('Request: {0!r}'.format(self.request))
project_name/__init__.py
# project_name/__init__.py -- exposes the Celery app at package import time.
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']
tasks.py
#!/usr/bin/env python#-*-coding:utf-8-*-from __future__ import absolute_import, unicode_literalsfrom celery import shared_taskimport subprocess@shared_taskdef add(x, y): return x + y@shared_taskdef mul(x, y): return x * y@shared_taskdef cmd_run(cmd): result = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) return result.stdout.read().decode("utf-8")
app01/views.py
from django.shortcuts import render, redirect
from django.http import HttpResponse, HttpResponseBadRequest
from celery.result import AsyncResult

from app01 import tasks  # the Celery tasks defined in app01/tasks.py


def test_celery(request):
    """Trigger the ``cmd_run`` task asynchronously and return its task id."""
    res = tasks.cmd_run.delay("ipconfig")
    # Calling res.get() here would wait for the task to finish and turn the
    # request into a synchronous one, so only the task id is returned.
    return HttpResponse(res.task_id)


def task_res(request):
    """Return the execution status of a previously submitted task.

    The task id is read from the ``task_id`` query-string parameter, e.g.
    ``?task_id=<id returned by test_celery>``. Responds with HTTP 400 when
    the parameter is missing or empty.
    """
    task_id = request.GET.get("task_id")
    if not task_id:
        return HttpResponseBadRequest("missing task_id")
    result = AsyncResult(id=task_id)
    return HttpResponse(result.status)
app01/urls.py
from django.conf.urls import url, include
from django.contrib import admin

from app01 import views

# URL routes: the Django admin site and the view that triggers a Celery task.
urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^test_celery/$', views.test_celery),
]
启动worker
celery -A project_name worker -l info -P eventlet
django&celery 定时任务
pip3 install django-celery-beat

## settings.py 里注册 django-celery-beat
INSTALLED_APPS = [
    'django_celery_beat',
]

python manage.py migrate

D:\django-project\wechat>python manage.py migrate
Operations to perform:
  Apply all migrations: auth, sessions, django_celery_beat, contenttypes, admin
Running migrations:
  Rendering model states... DONE
  Applying contenttypes.0001_initial... OK
  Applying auth.0001_initial... OK
  Applying admin.0001_initial... OK
  Applying admin.0002_logentry_remove_auto_add... OK
  Applying contenttypes.0002_remove_content_type_name... OK
  Applying auth.0002_alter_permission_name_max_length... OK
  Applying auth.0003_alter_user_email_max_length... OK
  Applying auth.0004_alter_user_username_opts... OK
  Applying auth.0005_alter_user_last_login_null... OK
  Applying auth.0006_require_contenttypes_0002... OK
  Applying auth.0007_alter_validators_add_error_messages... OK
  Applying django_celery_beat.0001_initial... OK
  Applying django_celery_beat.0002_auto_20161118_0346... OK
  Applying django_celery_beat.0003_auto_20161209_0049... OK
  Applying django_celery_beat.0004_auto_20170221_0000... OK
  Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
  Applying django_celery_beat.0006_auto_20180210_1226... OK
  Applying sessions.0001_initial... OK
登录后台
启动celery beat
celery -A project_name beat -l info -S django
每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到