Commit 7711c00e by Brian Wilson

Pull task_queue.py methods out from tasks.py, to represent API calls

from client.  Tasks.py remains the task implementations running on the
celery worker.

In particular, move status message generation out of task thread to client side.
parent d503e331
...@@ -8,22 +8,23 @@ from django.db import models ...@@ -8,22 +8,23 @@ from django.db import models
class Migration(SchemaMigration): class Migration(SchemaMigration):
def forwards(self, orm): def forwards(self, orm):
# Adding model 'CourseTaskLog' # Adding model 'CourseTaskLog'
db.create_table('courseware_coursetasklog', ( db.create_table('courseware_coursetasklog', (
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('task_name', self.gf('django.db.models.fields.CharField')(max_length=50, db_index=True)),
('course_id', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)), ('course_id', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)),
('student', self.gf('django.db.models.fields.related.ForeignKey')(related_name='+', null=True, to=orm['auth.User'])), ('student', self.gf('django.db.models.fields.related.ForeignKey')(related_name='+', null=True, to=orm['auth.User'])),
('task_name', self.gf('django.db.models.fields.CharField')(max_length=50, db_index=True)),
('task_args', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)), ('task_args', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)),
('task_id', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)), ('task_id', self.gf('django.db.models.fields.CharField')(max_length=255, db_index=True)),
('task_status', self.gf('django.db.models.fields.CharField')(max_length=50, null=True, db_index=True)), ('task_state', self.gf('django.db.models.fields.CharField')(max_length=50, null=True, db_index=True)),
('task_progress', self.gf('django.db.models.fields.CharField')(max_length=1024, null=True, db_index=True)),
('requester', self.gf('django.db.models.fields.related.ForeignKey')(related_name='+', to=orm['auth.User'])), ('requester', self.gf('django.db.models.fields.related.ForeignKey')(related_name='+', to=orm['auth.User'])),
('created', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, null=True, db_index=True, blank=True)), ('created', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, null=True, db_index=True, blank=True)),
('updated', self.gf('django.db.models.fields.DateTimeField')(auto_now=True, db_index=True, blank=True)), ('updated', self.gf('django.db.models.fields.DateTimeField')(auto_now=True, db_index=True, blank=True)),
)) ))
db.send_create_signal('courseware', ['CourseTaskLog']) db.send_create_signal('courseware', ['CourseTaskLog'])
def backwards(self, orm): def backwards(self, orm):
# Deleting model 'CourseTaskLog' # Deleting model 'CourseTaskLog'
db.delete_table('courseware_coursetasklog') db.delete_table('courseware_coursetasklog')
...@@ -76,7 +77,8 @@ class Migration(SchemaMigration): ...@@ -76,7 +77,8 @@ class Migration(SchemaMigration):
'task_args': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}), 'task_args': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}),
'task_id': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}), 'task_id': ('django.db.models.fields.CharField', [], {'max_length': '255', 'db_index': 'True'}),
'task_name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}), 'task_name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}),
'task_status': ('django.db.models.fields.CharField', [], {'max_length': '50', 'null': 'True', 'db_index': 'True'}), 'task_progress': ('django.db.models.fields.CharField', [], {'max_length': '1024', 'null': 'True', 'db_index': 'True'}),
'task_state': ('django.db.models.fields.CharField', [], {'max_length': '50', 'null': 'True', 'db_index': 'True'}),
'updated': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'db_index': 'True', 'blank': 'True'}) 'updated': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'db_index': 'True', 'blank': 'True'})
}, },
'courseware.offlinecomputedgrade': { 'courseware.offlinecomputedgrade': {
......
...@@ -271,12 +271,27 @@ class CourseTaskLog(models.Model): ...@@ -271,12 +271,27 @@ class CourseTaskLog(models.Model):
perform course-specific work. perform course-specific work.
Examples include grading and regrading. Examples include grading and regrading.
""" """
task_name = models.CharField(max_length=50, db_index=True)
course_id = models.CharField(max_length=255, db_index=True) course_id = models.CharField(max_length=255, db_index=True)
student = models.ForeignKey(User, null=True, db_index=True, related_name='+') # optional: None = task applies to all students student = models.ForeignKey(User, null=True, db_index=True, related_name='+') # optional: None = task applies to all students
task_name = models.CharField(max_length=50, db_index=True)
task_args = models.CharField(max_length=255, db_index=True) task_args = models.CharField(max_length=255, db_index=True)
task_id = models.CharField(max_length=255, db_index=True) # max_length from celery_taskmeta task_id = models.CharField(max_length=255, db_index=True) # max_length from celery_taskmeta
task_status = models.CharField(max_length=50, null=True, db_index=True) # max_length from celery_taskmeta task_state = models.CharField(max_length=50, null=True, db_index=True) # max_length from celery_taskmeta
task_progress = models.CharField(max_length=1024, null=True, db_index=True)
requester = models.ForeignKey(User, db_index=True, related_name='+') requester = models.ForeignKey(User, db_index=True, related_name='+')
created = models.DateTimeField(auto_now_add=True, null=True, db_index=True) created = models.DateTimeField(auto_now_add=True, null=True, db_index=True)
updated = models.DateTimeField(auto_now=True, db_index=True) updated = models.DateTimeField(auto_now=True, db_index=True)
def __repr__(self):
return 'CourseTaskLog<%r>' % ({
'task_name': self.task_name,
'course_id': self.course_id,
'student': self.student.username,
'task_args': self.task_args,
'task_id': self.task_id,
'task_state': self.task_state,
'task_progress': self.task_progress,
},)
def __unicode__(self):
return unicode(repr(self))
import json
import logging
from django.http import HttpResponse
from celery.result import AsyncResult
from celery.states import READY_STATES
from courseware.models import CourseTaskLog
from courseware.tasks import regrade_problem_for_all_students
from xmodule.modulestore.django import modulestore
# define different loggers for use within tasks and on client side
log = logging.getLogger(__name__)
def get_running_course_tasks(course_id):
course_tasks = CourseTaskLog.objects.filter(course_id=course_id)
# exclude(task_state='SUCCESS').exclude(task_state='FAILURE').exclude(task_state='REVOKED')
for state in READY_STATES:
course_tasks = course_tasks.exclude(task_state=state)
return course_tasks
def _task_is_running(course_id, task_name, task_args, student=None):
runningTasks = CourseTaskLog.objects.filter(course_id=course_id, task_name=task_name, task_args=task_args)
if student is not None:
runningTasks = runningTasks.filter(student=student)
for state in READY_STATES:
runningTasks = runningTasks.exclude(task_state=state)
return len(runningTasks) > 0
def submit_regrade_problem_for_all_students(request, course_id, problem_url):
# check arguments: in particular, make sure that problem_url is defined
# (since that's currently typed in). If the corresponding module descriptor doesn't exist,
# an exception should be raised. Let it continue to the caller.
modulestore().get_instance(course_id, problem_url)
# TODO: adjust transactions so that one request will not be about to create an
# entry while a second is testing to see if the entry exists. (Need to handle
# quick accidental double-clicks when submitting.)
# check to see if task is already running
task_name = 'regrade'
if _task_is_running(course_id, task_name, problem_url):
# TODO: figure out how to return info that it's already running
raise Exception("task is already running")
# Create log entry now, so that future requests won't
tasklog_args = {'course_id': course_id,
'task_name': task_name,
'task_args': problem_url,
'task_state': 'QUEUING',
'requester': request.user}
course_task_log = CourseTaskLog.objects.create(**tasklog_args)
# At a low level of processing, the task currently fetches some information from the web request.
# This is used for setting up X-Queue, as well as for tracking.
# An actual request will not successfully serialize with json or with pickle.
# TODO: we can just pass all META info as a dict.
request_environ = {'HTTP_USER_AGENT': request.META['HTTP_USER_AGENT'],
'REMOTE_ADDR': request.META['REMOTE_ADDR'],
'SERVER_NAME': request.META['SERVER_NAME'],
'REQUEST_METHOD': 'GET',
# 'HTTP_X_FORWARDED_PROTO': request.META['HTTP_X_FORWARDED_PROTO'],
}
# Submit task:
task_args = [request_environ, course_id, problem_url]
result = regrade_problem_for_all_students.apply_async(task_args)
# Put info into table with the resulting task_id.
course_task_log.task_state = result.state
course_task_log.task_id = result.id
course_task_log.save()
return course_task_log
def course_task_log_status(request, task_id=None):
"""
This returns the status of a course-related task as a JSON-serialized dict.
"""
output = {}
if task_id is not None:
output = _get_course_task_log_status(task_id)
elif 'task_id' in request.POST:
task_id = request.POST['task_id']
output = _get_course_task_log_status(task_id)
elif 'task_ids[]' in request.POST:
tasks = request.POST.getlist('task_ids[]')
for task_id in tasks:
task_output = _get_course_task_log_status(task_id)
if task_output is not None:
output[task_id] = task_output
# TODO decide whether to raise exception if bad args are passed.
# May be enough just to return an empty output.
return HttpResponse(json.dumps(output, indent=4))
def _get_course_task_log_status(task_id):
"""
Get the status for a given task_id.
Returns a dict, with the following keys:
'task_id'
'task_state'
'in_progress': boolean indicating if the task is still running.
'task_traceback': optional, returned if task failed and produced a traceback.
If task doesn't exist, returns None.
"""
# First check if the task_id is known
try:
course_task_log_entry = CourseTaskLog.objects.get(task_id=task_id)
except CourseTaskLog.DoesNotExist:
# TODO: log a message here
return None
output = {}
# if the task is already known to be done, then there's no reason to query
# the underlying task:
if course_task_log_entry.task_state not in READY_STATES:
# we need to get information from the task result directly now.
# Just create the result object.
result = AsyncResult(task_id)
if result.traceback is not None:
output['task_traceback'] = result.traceback
if result.state == "PROGRESS":
# construct a status message directly from the task result's metadata:
if hasattr(result, 'result') and 'current' in result.result:
fmt = "Attempted {attempted} of {total}, {action_name} {updated}"
message = fmt.format(attempted=result.result['attempted'],
updated=result.result['updated'],
total=result.result['total'],
action_name=result.result['action_name'])
output['message'] = message
log.info("progress: {0}".format(message))
for name in ['attempted', 'updated', 'total', 'action_name']:
output[name] = result.result[name]
else:
log.info("still making progress... ")
# update the entry if the state has changed:
if result.state != course_task_log_entry.task_state:
course_task_log_entry.task_state = result.state
course_task_log_entry.save()
output['task_id'] = course_task_log_entry.task_id
output['task_state'] = course_task_log_entry.task_state
output['in_progress'] = course_task_log_entry.task_state not in READY_STATES
if course_task_log_entry.task_progress is not None:
output['task_progress'] = course_task_log_entry.task_progress
if course_task_log_entry.task_state == 'SUCCESS':
succeeded, message = _get_task_completion_message(course_task_log_entry)
output['message'] = message
output['succeeded'] = succeeded
return output
def _get_task_completion_message(course_task_log_entry):
"""
Construct progress message from progress information in CourseTaskLog entry.
Returns (boolean, message string) duple.
"""
succeeded = False
if course_task_log_entry.task_progress is None:
log.warning("No task_progress information found for course_task {0}".format(course_task_log_entry.task_id))
return (succeeded, "No status information available")
task_progress = json.loads(course_task_log_entry.task_progress)
action_name = task_progress['action_name']
num_attempted = task_progress['attempted']
num_updated = task_progress['updated']
# num_total = task_progress['total']
if course_task_log_entry.student is not None:
if num_attempted == 0:
msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'."
elif num_updated == 0:
msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!"
else:
succeeded = True
msg = "Problem successfully {action} for student '{student}' and problem '{problem}'"
elif num_attempted == 0:
msg = "Unable to find any students with submissions to be {action} for problem '{problem}'."
elif num_updated == 0:
msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!"
elif num_updated == num_attempted:
succeeded = True
msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!"
elif num_updated < num_attempted:
msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!"
# Update status in task result object itself:
message = msg.format(action=action_name, updated=num_updated, attempted=num_attempted,
student=course_task_log_entry.student, problem=course_task_log_entry.task_args)
return (succeeded, message)
import json import json
import logging #import logging
from time import sleep from time import sleep
from django.contrib.auth.models import User from django.contrib.auth.models import User
import mitxmako.middleware as middleware
from django.http import HttpResponse
# from django.http import HttpRequest
from django.test.client import RequestFactory from django.test.client import RequestFactory
from celery import task, current_task from celery import task, current_task
from celery.result import AsyncResult from celery.signals import worker_ready
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
import mitxmako.middleware as middleware
from courseware.models import StudentModule, CourseTaskLog from courseware.models import StudentModule, CourseTaskLog
from courseware.model_data import ModelDataCache from courseware.model_data import ModelDataCache
from courseware.module_render import get_module from courseware.module_render import get_module
from xmodule.modulestore.django import modulestore from xmodule.modulestore.django import modulestore
from xmodule.modulestore.exceptions import ItemNotFoundError, InvalidLocationError #from xmodule.modulestore.exceptions import ItemNotFoundError, InvalidLocationError
import track.views import track.views
# define different loggers for use within tasks and on client side # define different loggers for use within tasks and on client side
logger = get_task_logger(__name__) task_log = get_task_logger(__name__)
log = logging.getLogger(__name__) # log = logging.getLogger(__name__)
@task @task
def waitawhile(value): def waitawhile(value):
for i in range(value): for i in range(value):
sleep(1) # in seconds sleep(1) # in seconds
logger.info('Waited {0} seconds...'.format(i)) task_log.info('Waited {0} seconds...'.format(i))
current_task.update_state(state='PROGRESS', current_task.update_state(state='PROGRESS',
meta={'current': i, 'total': value}) meta={'current': i, 'total': value})
...@@ -41,19 +40,35 @@ def waitawhile(value): ...@@ -41,19 +40,35 @@ def waitawhile(value):
class UpdateProblemModuleStateError(Exception): class UpdateProblemModuleStateError(Exception):
pass pass
#def get_module_descriptor(course_id, module_state_key):
# """Return module descriptor for requested module, or None if not found."""
# try:
# module_descriptor = modulestore().get_instance(course_id, module_state_key)
# except ItemNotFoundError:
# pass
# except InvalidLocationError:
# pass
# return module_descriptor
# except ItemNotFoundError:
# msg = "Couldn't find problem with that urlname."
# except InvalidLocationError:
# msg = "Couldn't find problem with that urlname."
# if module_descriptor is None:
# msg = "Couldn't find problem with that urlname."
# if not succeeded:
# current_task.update_state(
# meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total})
# The task should still succeed, but should have metadata indicating
# that the result of the successful task was a failure. (It's not
# the queue that failed, but the task put on the queue.)
def _update_problem_module_state(request, course_id, problem_url, student, update_fcn, action_name, filter_fcn): def _update_problem_module_state(request, course_id, module_state_key, student, update_fcn, action_name, filter_fcn):
''' '''
Performs generic update by visiting StudentModule instances with the update_fcn provided Performs generic update by visiting StudentModule instances with the update_fcn provided
If student is None, performs update on modules for all students on the specified problem If student is None, performs update on modules for all students on the specified problem
''' '''
module_state_key = problem_url
# TODO: store this in the task state, not as a separate return value.
# (Unless that's not what the task state is intended to mean. The task can successfully
# complete, as far as celery is concerned, but have an internal status of failed.)
succeeded = False
# add hack so that mako templates will work on celery worker server: # add hack so that mako templates will work on celery worker server:
# The initialization of Make templating is usually done when Django is # The initialization of Make templating is usually done when Django is
# initialize middleware packages as part of processing a server request. # initialize middleware packages as part of processing a server request.
...@@ -61,24 +76,11 @@ def _update_problem_module_state(request, course_id, problem_url, student, updat ...@@ -61,24 +76,11 @@ def _update_problem_module_state(request, course_id, problem_url, student, updat
# called. So we look for the result: the defining of the lookup paths # called. So we look for the result: the defining of the lookup paths
# for templates. # for templates.
if 'main' not in middleware.lookup: if 'main' not in middleware.lookup:
task_log.info("Initializing Mako middleware explicitly")
middleware.MakoMiddleware() middleware.MakoMiddleware()
# find the problem descriptor, if any: # find the problem descriptor:
try: module_descriptor = modulestore().get_instance(course_id, module_state_key)
module_descriptor = modulestore().get_instance(course_id, module_state_key)
succeeded = True
except ItemNotFoundError:
msg = "Couldn't find problem with that urlname."
except InvalidLocationError:
msg = "Couldn't find problem with that urlname."
if module_descriptor is None:
msg = "Couldn't find problem with that urlname."
# if not succeeded:
# current_task.update_state(
# meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total})
# The task should still succeed, but should have metadata indicating
# that the result of the successful task was a failure. (It's not
# the queue that failed, but the task put on the queue.)
# find the module in question # find the module in question
succeeded = False succeeded = False
...@@ -97,54 +99,67 @@ def _update_problem_module_state(request, course_id, problem_url, student, updat ...@@ -97,54 +99,67 @@ def _update_problem_module_state(request, course_id, problem_url, student, updat
num_updated = 0 num_updated = 0
num_attempted = 0 num_attempted = 0
num_total = len(modules_to_update) # TODO: make this more efficient. Count()? num_total = len(modules_to_update) # TODO: make this more efficient. Count()?
def get_task_progress():
progress = {'action_name': action_name,
'attempted': num_attempted,
'updated': num_updated,
'total': num_total,
}
return progress
task_log.info("Starting to process task {0}".format(current_task.request.id))
for module_to_update in modules_to_update: for module_to_update in modules_to_update:
num_attempted += 1 num_attempted += 1
# try: # There is no try here: if there's an error, we let it throw, and the task will
# be marked as FAILED, with a stack trace.
if update_fcn(request, module_to_update, module_descriptor): if update_fcn(request, module_to_update, module_descriptor):
# If the update_fcn returns true, then it performed some kind of work.
num_updated += 1 num_updated += 1
# if there's an error, just let it throw, and the task will
# be marked as FAILED, with a stack trace.
# except UpdateProblemModuleStateError as e:
# something bad happened, so exit right away
# return (succeeded, e.message)
# update task status: # update task status:
current_task.update_state(state='PROGRESS', # TODO: decide on the frequency for updating this:
meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total}) # -- it may not make sense to do so every time through the loop
# -- may depend on each iteration's duration
current_task.update_state(state='PROGRESS', meta=get_task_progress())
sleep(5) # in seconds
# Done with looping through all modules, so just return final statistics:
# TODO: these messages should be rendered at the view level -- move them there!
# if student is not None:
# if num_attempted == 0:
# msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'."
# elif num_updated == 0:
# msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!"
# else:
# succeeded = True
# msg = "Problem successfully {action} for student '{student}' and problem '{problem}'"
# elif num_attempted == 0:
# msg = "Unable to find any students with submissions to be {action} for problem '{problem}'."
# elif num_updated == 0:
# msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!"
# elif num_updated == num_attempted:
# succeeded = True
# msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!"
# elif num_updated < num_attempted:
# msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!"
#
# # Update status in task result object itself:
# msg = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, student=student, problem=module_state_key)
task_progress = get_task_progress() # succeeded=succeeded, message=msg)
current_task.update_state(state='PROGRESS', meta=task_progress)
# Update final progress in course task table as well:
# The actual task result state is updated by celery when this task completes, and thus
# clobbers any custom metadata. So if we want any such status to persist, we have to
# write it to the CourseTaskLog instead.
task_log.info("Finished processing task, updating CourseTaskLog entry")
course_task_log_entry = CourseTaskLog.objects.get(task_id=current_task.request.id)
course_task_log_entry.task_progress = json.dumps(task_progress)
course_task_log_entry.save()
# done with looping through all modules, so just return final statistics:
if student is not None:
if num_attempted == 0:
msg = "Unable to find submission to be {action} for student '{student}' and problem '{problem}'."
elif num_updated == 0:
msg = "Problem failed to be {action} for student '{student}' and problem '{problem}'!"
else:
succeeded = True
msg = "Problem successfully {action} for student '{student}' and problem '{problem}'"
elif num_attempted == 0:
msg = "Unable to find any students with submissions to be {action} for problem '{problem}'."
elif num_updated == 0:
msg = "Problem failed to be {action} for any of {attempted} students for problem '{problem}'!"
elif num_updated == num_attempted:
succeeded = True
msg = "Problem successfully {action} for {attempted} students for problem '{problem}'!"
elif num_updated < num_attempted:
msg = "Problem {action} for {updated} of {attempted} students for problem '{problem}'!"
msg = msg.format(action=action_name, updated=num_updated, attempted=num_attempted, student=student, problem=module_state_key)
# update status in task result object itself:
current_task.update_state(state='DONE',
meta={'attempted': num_attempted, 'updated': num_updated, 'total': num_total,
'succeeded': succeeded, 'message': msg})
# and update status in course task table as well:
# TODO: figure out how this is legal. The actual task result
# status is updated by celery when this task completes, and is
# presumably going to clobber this custom metadata. So if we want
# any such status to persist, we have to write it to the CourseTaskLog instead.
# course_task_log_entry = CourseTaskLog.objects.get(task_id=current_task.id)
# course_task_log_entry.task_status = ...
# return (succeeded, msg)
return succeeded return succeeded
...@@ -193,7 +208,7 @@ def _regrade_problem_module_state(request, module_to_regrade, module_descriptor) ...@@ -193,7 +208,7 @@ def _regrade_problem_module_state(request, module_to_regrade, module_descriptor)
# and load something they shouldn't have access to. # and load something they shouldn't have access to.
msg = "No module {loc} for student {student}--access denied?".format(loc=module_state_key, msg = "No module {loc} for student {student}--access denied?".format(loc=module_state_key,
student=student) student=student)
log.debug(msg) task_log.debug(msg)
raise UpdateProblemModuleStateError(msg) raise UpdateProblemModuleStateError(msg)
if not hasattr(instance, 'regrade_problem'): if not hasattr(instance, 'regrade_problem'):
...@@ -205,11 +220,11 @@ def _regrade_problem_module_state(request, module_to_regrade, module_descriptor) ...@@ -205,11 +220,11 @@ def _regrade_problem_module_state(request, module_to_regrade, module_descriptor)
result = instance.regrade_problem() result = instance.regrade_problem()
if 'success' not in result: if 'success' not in result:
# don't consider these fatal, but false means that the individual call didn't complete: # don't consider these fatal, but false means that the individual call didn't complete:
log.debug("error processing regrade call for problem {loc} and student {student}: " task_log.debug("error processing regrade call for problem {loc} and student {student}: "
"unexpected response {msg}".format(msg=result, loc=module_state_key, student=student)) "unexpected response {msg}".format(msg=result, loc=module_state_key, student=student))
return False return False
elif result['success'] != 'correct' and result['success'] != 'incorrect': elif result['success'] != 'correct' and result['success'] != 'incorrect':
log.debug("error processing regrade call for problem {loc} and student {student}: " task_log.debug("error processing regrade call for problem {loc} and student {student}: "
"{msg}".format(msg=result['success'], loc=module_state_key, student=student)) "{msg}".format(msg=result['success'], loc=module_state_key, student=student))
return False return False
else: else:
...@@ -245,7 +260,7 @@ def regrade_problem_for_student(request, course_id, problem_url, student_identif ...@@ -245,7 +260,7 @@ def regrade_problem_for_student(request, course_id, problem_url, student_identif
'task_name': 'regrade', 'task_name': 'regrade',
'task_args': problem_url, 'task_args': problem_url,
'task_id': task_id, 'task_id': task_id,
'task_status': result.state, 'task_state': result.state,
'requester': request.user} 'requester': request.user}
CourseTaskLog.objects.create(**tasklog_args) CourseTaskLog.objects.create(**tasklog_args)
...@@ -253,9 +268,7 @@ def regrade_problem_for_student(request, course_id, problem_url, student_identif ...@@ -253,9 +268,7 @@ def regrade_problem_for_student(request, course_id, problem_url, student_identif
@task @task
def _regrade_problem_for_all_students(request_environ, course_id, problem_url): def regrade_problem_for_all_students(request_environ, course_id, problem_url):
# request = HttpRequest()
# request.META.update(request_environ)
factory = RequestFactory(**request_environ) factory = RequestFactory(**request_environ)
request = factory.get('/') request = factory.get('/')
action_name = 'regraded' action_name = 'regraded'
...@@ -265,101 +278,6 @@ def _regrade_problem_for_all_students(request_environ, course_id, problem_url): ...@@ -265,101 +278,6 @@ def _regrade_problem_for_all_students(request_environ, course_id, problem_url):
update_fcn, action_name, filter_fcn) update_fcn, action_name, filter_fcn)
def regrade_problem_for_all_students(request, course_id, problem_url):
# Figure out (for now) how to serialize what we need of the request. The actual
# request will not successfully serialize with json or with pickle.
# Maybe we can just pass all META info as a dict.
request_environ = {'HTTP_USER_AGENT': request.META['HTTP_USER_AGENT'],
'REMOTE_ADDR': request.META['REMOTE_ADDR'],
'SERVER_NAME': request.META['SERVER_NAME'],
'REQUEST_METHOD': 'GET',
# 'HTTP_X_FORWARDED_PROTO': request.META['HTTP_X_FORWARDED_PROTO'],
}
# Submit task. Then put stuff into table with the resulting task_id.
task_args = [request_environ, course_id, problem_url]
result = _regrade_problem_for_all_students.apply_async(task_args)
task_id = result.id
tasklog_args = {'course_id': course_id,
'task_name': 'regrade',
'task_args': problem_url,
'task_id': task_id,
'task_status': result.state,
'requester': request.user}
course_task_log = CourseTaskLog.objects.create(**tasklog_args)
return course_task_log
def course_task_log_status(request, task_id=None):
"""
This returns the status of a course-related task as a JSON-serialized dict.
"""
output = {}
if task_id is not None:
output = _get_course_task_log_status(task_id)
elif 'task_id' in request.POST:
task_id = request.POST['task_id']
output = _get_course_task_log_status(task_id)
elif 'task_ids[]' in request.POST:
tasks = request.POST.getlist('task_ids[]')
for task_id in tasks:
task_output = _get_course_task_log_status(task_id)
output[task_id] = task_output
# TODO else: raise exception?
return HttpResponse(json.dumps(output, indent=4))
def _get_course_task_log_status(task_id):
course_task_log_entry = CourseTaskLog.objects.get(task_id=task_id)
# TODO: error handling if it doesn't exist...
def not_in_progress(entry):
# TODO: do better than to copy list from celery.states.READY_STATES
return entry.task_status in ['SUCCESS', 'FAILURE', 'REVOKED']
# if the task is already known to be done, then there's no reason to query
# the underlying task:
if not_in_progress(course_task_log_entry):
output = {
'task_id': course_task_log_entry.task_id,
'task_status': course_task_log_entry.task_status,
'in_progress': False
}
return output
# we need to get information from the task result directly now.
result = AsyncResult(task_id)
output = {
'task_id': result.id,
'task_status': result.state,
'in_progress': True
}
if result.traceback is not None:
output['task_traceback'] = result.traceback
if result.state == "PROGRESS":
if hasattr(result, 'result') and 'current' in result.result:
log.info("still waiting... progress at {0} of {1}".format(result.result['current'],
result.result['total']))
output['current'] = result.result['current']
output['total'] = result.result['total']
else:
log.info("still making progress... ")
if result.successful():
value = result.result
output['value'] = value
# update the entry if necessary:
if course_task_log_entry.task_status != result.state:
course_task_log_entry.task_status = result.state
course_task_log_entry.save()
return output
def _reset_problem_attempts_module_state(request, module_to_reset, module_descriptor): def _reset_problem_attempts_module_state(request, module_to_reset, module_descriptor):
# modify the problem's state # modify the problem's state
# load the state json and change state # load the state json and change state
...@@ -420,3 +338,16 @@ def _delete_problem_state_for_all_students(request, course_id, problem_url): ...@@ -420,3 +338,16 @@ def _delete_problem_state_for_all_students(request, course_id, problem_url):
update_fcn = _delete_problem_module_state update_fcn = _delete_problem_module_state
return _update_problem_module_state_for_all_students(request, course_id, problem_url, return _update_problem_module_state_for_all_students(request, course_id, problem_url,
update_fcn, action_name) update_fcn, action_name)
@worker_ready.connect
def initialize_middleware(**kwargs):
# The initialize Django middleware - some middleware components
# are initialized lazily when the first request is served. Since
# the celery workers do not serve request, the components never
# get initialized, causing errors in some dependencies.
# In particular, the Mako template middleware is used by some xmodules
task_log.info("Initializing all middleware from worker_ready.connect hook")
from django.core.handlers.base import BaseHandler
BaseHandler().load_middleware()
...@@ -10,9 +10,9 @@ import os ...@@ -10,9 +10,9 @@ import os
import re import re
import requests import requests
from requests.status_codes import codes from requests.status_codes import codes
import urllib #import urllib
from collections import OrderedDict from collections import OrderedDict
from time import sleep #from time import sleep
from StringIO import StringIO from StringIO import StringIO
...@@ -25,7 +25,8 @@ from mitxmako.shortcuts import render_to_response ...@@ -25,7 +25,8 @@ from mitxmako.shortcuts import render_to_response
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from courseware import grades from courseware import grades
from courseware import tasks #from courseware import tasks # for now... should remove once things are in queue instead
from courseware import task_queue
from courseware.access import (has_access, get_access_group_name, from courseware.access import (has_access, get_access_group_name,
course_beta_test_group_name) course_beta_test_group_name)
from courseware.courses import get_course_with_access from courseware.courses import get_course_with_access
...@@ -176,12 +177,12 @@ def instructor_dashboard(request, course_id): ...@@ -176,12 +177,12 @@ def instructor_dashboard(request, course_id):
datatable['title'] = 'List of students enrolled in {0}'.format(course_id) datatable['title'] = 'List of students enrolled in {0}'.format(course_id)
track.views.server_track(request, 'list-students', {}, page='idashboard') track.views.server_track(request, 'list-students', {}, page='idashboard')
elif 'Test Celery' in action: # elif 'Test Celery' in action:
args = (10,) # args = (10,)
result = tasks.waitawhile.apply_async(args, retry=False) # result = tasks.waitawhile.apply_async(args, retry=False)
task_id = result.id # task_id = result.id
celery_ajax_url = reverse('celery_ajax_status', kwargs={'task_id': task_id}) # celery_ajax_url = reverse('celery_ajax_status', kwargs={'task_id': task_id})
msg += '<p>Celery Status for task ${task}:</p><div class="celery-status" data-ajax_url="${url}"></div><p>Status end.</p>'.format(task=task_id, url=celery_ajax_url) # msg += '<p>Celery Status for task ${task}:</p><div class="celery-status" data-ajax_url="${url}"></div><p>Status end.</p>'.format(task=task_id, url=celery_ajax_url)
elif 'Dump Grades' in action: elif 'Dump Grades' in action:
log.debug(action) log.debug(action)
...@@ -217,13 +218,13 @@ def instructor_dashboard(request, course_id): ...@@ -217,13 +218,13 @@ def instructor_dashboard(request, course_id):
elif "Regrade ALL students' problem submissions" in action: elif "Regrade ALL students' problem submissions" in action:
problem_url = request.POST.get('problem_to_regrade', '') problem_url = request.POST.get('problem_to_regrade', '')
try: try:
course_task_log_entry = tasks.regrade_problem_for_all_students(request, course_id, problem_url) course_task_log_entry = task_queue.submit_regrade_problem_for_all_students(request, course_id, problem_url)
if course_task_log_entry is None:
msg += '<font="red">Failed to create a background task for regrading "{0}".</font>'.format(problem_url)
except Exception as e: except Exception as e:
log.error("Encountered exception from regrade: {0}", e) log.error("Encountered exception from regrade: {0}".format(e))
# check that a course_task_log entry was created: msg += '<font="red">Failed to create a background task for regrading "{0}": {1}.</font>'.format(problem_url, e)
if course_task_log_entry is None:
msg += '<font="red">Failed to create a background task for regrading "{0}".</font>'.format(problem_url)
elif "Reset student's attempts" in action or "Delete student state for problem" in action: elif "Reset student's attempts" in action or "Delete student state for problem" in action:
# get the form data # get the form data
unique_student_identifier = request.POST.get('unique_student_identifier', '') unique_student_identifier = request.POST.get('unique_student_identifier', '')
...@@ -645,7 +646,7 @@ def instructor_dashboard(request, course_id): ...@@ -645,7 +646,7 @@ def instructor_dashboard(request, course_id):
msg += "<br/><font color='orange'>Grades from %s</font>" % offline_grades_available(course_id) msg += "<br/><font color='orange'>Grades from %s</font>" % offline_grades_available(course_id)
# generate list of pending background tasks # generate list of pending background tasks
course_tasks = CourseTaskLog.objects.filter(course_id = course_id).exclude(task_status='SUCCESS').exclude(task_status='FAILURE') course_tasks = task_queue.get_running_course_tasks(course_id)
#---------------------------------------- #----------------------------------------
# context for rendering # context for rendering
...@@ -1205,99 +1206,99 @@ def dump_grading_context(course): ...@@ -1205,99 +1206,99 @@ def dump_grading_context(course):
return msg return msg
def old1testcelery(request): #def old1testcelery(request):
""" # """
A Simple view that checks if the application can talk to the celery workers # A Simple view that checks if the application can talk to the celery workers
""" # """
args = ('ping',) # args = ('ping',)
result = tasks.echo.apply_async(args, retry=False) # result = tasks.echo.apply_async(args, retry=False)
value = result.get(timeout=0.5) # value = result.get(timeout=0.5)
output = { # output = {
'task_id': result.id, # 'task_id': result.id,
'value': value # 'value': value
} # }
return HttpResponse(json.dumps(output, indent=4)) # return HttpResponse(json.dumps(output, indent=4))
#
#
def old2testcelery(request): #def old2testcelery(request):
""" # """
A Simple view that checks if the application can talk to the celery workers # A Simple view that checks if the application can talk to the celery workers
""" # """
args = (10,) # args = (10,)
result = tasks.waitawhile.apply_async(args, retry=False) # result = tasks.waitawhile.apply_async(args, retry=False)
while not result.ready(): # while not result.ready():
sleep(0.5) # in seconds # sleep(0.5) # in seconds
if result.state == "PROGRESS": # if result.state == "PROGRESS":
if hasattr(result, 'result') and 'current' in result.result: # if hasattr(result, 'result') and 'current' in result.result:
log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total'])) # log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total']))
else: # else:
log.info("still making progress... ") # log.info("still making progress... ")
if result.successful(): # if result.successful():
value = result.result # value = result.result
output = { # output = {
'task_id': result.id, # 'task_id': result.id,
'value': value # 'value': value
} # }
return HttpResponse(json.dumps(output, indent=4)) # return HttpResponse(json.dumps(output, indent=4))
#
#
def testcelery(request): #def testcelery(request):
""" # """
A Simple view that checks if the application can talk to the celery workers # A Simple view that checks if the application can talk to the celery workers
""" # """
args = (10,) # args = (10,)
result = tasks.waitawhile.apply_async(args, retry=False) # result = tasks.waitawhile.apply_async(args, retry=False)
task_id = result.id # task_id = result.id
# return the task_id to a template which will set up an ajax call to # # return the task_id to a template which will set up an ajax call to
# check the progress of the task. # # check the progress of the task.
return testcelery_status(request, task_id) # return testcelery_status(request, task_id)
# return mitxmako.shortcuts.render_to_response('celery_ajax.html', { ## return mitxmako.shortcuts.render_to_response('celery_ajax.html', {
# 'element_id': 'celery_task' ## 'element_id': 'celery_task'
# 'id': self.task_id, ## 'id': self.task_id,
# 'ajax_url': reverse('testcelery_ajax'), ## 'ajax_url': reverse('testcelery_ajax'),
# }) ## })
#
#
def testcelery_status(request, task_id): #def testcelery_status(request, task_id):
result = tasks.waitawhile.AsyncResult(task_id) # result = tasks.waitawhile.AsyncResult(task_id)
while not result.ready(): # while not result.ready():
sleep(0.5) # in seconds # sleep(0.5) # in seconds
if result.state == "PROGRESS": # if result.state == "PROGRESS":
if hasattr(result, 'result') and 'current' in result.result: # if hasattr(result, 'result') and 'current' in result.result:
log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total'])) # log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total']))
else: # else:
log.info("still making progress... ") # log.info("still making progress... ")
if result.successful(): # if result.successful():
value = result.result # value = result.result
output = { # output = {
'task_id': result.id, # 'task_id': result.id,
'value': value # 'value': value
} # }
return HttpResponse(json.dumps(output, indent=4)) # return HttpResponse(json.dumps(output, indent=4))
#
#
def celery_task_status(request, task_id): #def celery_task_status(request, task_id):
# TODO: determine if we need to know the name of the original task, # # TODO: determine if we need to know the name of the original task,
# or if this could be any task... Sample code seems to indicate that # # or if this could be any task... Sample code seems to indicate that
# we could just include the AsyncResult class directly, i.e.: # # we could just include the AsyncResult class directly, i.e.:
# from celery.result import AsyncResult. # # from celery.result import AsyncResult.
result = tasks.waitawhile.AsyncResult(task_id) # result = tasks.waitawhile.AsyncResult(task_id)
#
output = { # output = {
'task_id': result.id, # 'task_id': result.id,
'state': result.state # 'state': result.state
} # }
#
if result.state == "PROGRESS": # if result.state == "PROGRESS":
if hasattr(result, 'result') and 'current' in result.result: # if hasattr(result, 'result') and 'current' in result.result:
log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total'])) # log.info("still waiting... progress at {0} of {1}".format(result.result['current'], result.result['total']))
output['current'] = result.result['current'] # output['current'] = result.result['current']
output['total'] = result.result['total'] # output['total'] = result.result['total']
else: # else:
log.info("still making progress... ") # log.info("still making progress... ")
#
if result.successful(): # if result.successful():
value = result.result # value = result.result
output['value'] = value # output['value'] = value
#
return HttpResponse(json.dumps(output, indent=4)) # return HttpResponse(json.dumps(output, indent=4))
...@@ -74,18 +74,14 @@ ...@@ -74,18 +74,14 @@
var task_id = name; var task_id = name;
var task_dict = response[task_id]; var task_dict = response[task_id];
// this should be a dict of properties for this task_id // this should be a dict of properties for this task_id
var in_progress = task_dict.in_progress if (task_dict.in_progress === true) {
if (in_progress === true) {
something_in_progress = true; something_in_progress = true;
} }
// find the corresponding entry, and update it: // find the corresponding entry, and update it:
selector = '[data-task-id="' + task_id + '"]'; entry = $(_this.element).find('[data-task-id="' + task_id + '"]');
entry = $(_this.element).find(selector); entry.find('.task-state').text(task_dict.task_state)
var task_status_el = entry.find('.task-status'); var progress_value = task_dict.message || '';
task_status_el.text(task_dict.task_status) entry.find('.task-progress').text(progress_value);
var task_progress_el = entry.find('.task-progress');
var progress_value = task_dict.task_progress || '';
task_progress_el.text(progress_value);
} }
} }
if (something_in_progress) { if (something_in_progress) {
...@@ -491,7 +487,7 @@ function goto( mode) ...@@ -491,7 +487,7 @@ function goto( mode)
##----------------------------------------------------------------------------- ##-----------------------------------------------------------------------------
## Output tasks in progress ## Output tasks in progress
%if course_tasks is not None: %if course_tasks is not None and len(course_tasks) > 0:
<p>Pending Course Tasks</p> <p>Pending Course Tasks</p>
<div id="task-progress-wrapper"> <div id="task-progress-wrapper">
<table class="stat_table"> <table class="stat_table">
...@@ -503,7 +499,7 @@ function goto( mode) ...@@ -503,7 +499,7 @@ function goto( mode)
<th>Requester</th> <th>Requester</th>
<th>Submitted</th> <th>Submitted</th>
<th>Last Update</th> <th>Last Update</th>
<th>Task Status</th> <th>Task State</th>
<th>Task Progress</th> <th>Task Progress</th>
</tr> </tr>
%for tasknum, course_task in enumerate(course_tasks): %for tasknum, course_task in enumerate(course_tasks):
...@@ -515,7 +511,7 @@ function goto( mode) ...@@ -515,7 +511,7 @@ function goto( mode)
<td>${course_task.requester}</td> <td>${course_task.requester}</td>
<td>${course_task.created}</td> <td>${course_task.created}</td>
<td><div class="task-updated">${course_task.updated}</div></td> <td><div class="task-updated">${course_task.updated}</div></td>
<td><div class="task-status">${course_task.task_status}</div></td> <td><div class="task-state">${course_task.task_state}</div></td>
<td><div class="task-progress">unknown</div></td> <td><div class="task-progress">unknown</div></td>
</tr> </tr>
%endfor %endfor
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment