Commit a8c3e910 by Brian Wilson

Handle failure task_output that won't fit in the model column.

parent eb1fe899
......@@ -22,10 +22,10 @@ from lxml import etree
from xml.sax.saxutils import unescape
from copy import deepcopy
from .correctmap import CorrectMap
from capa.correctmap import CorrectMap
import capa.inputtypes as inputtypes
import capa.customrender as customrender
from .util import contextualize_text, convert_files_to_filenames
from capa.util import contextualize_text, convert_files_to_filenames
import capa.xqueue_interface as xqueue_interface
# to be replaced with auto-registering
......@@ -43,8 +43,8 @@ response_properties = ["codeparam", "responseparam", "answer", "openendedparam"]
# special problem tags which should be turned into innocuous HTML
html_transforms = {'problem': {'tag': 'div'},
"text": {'tag': 'span'},
"math": {'tag': 'span'},
'text': {'tag': 'span'},
'math': {'tag': 'span'},
}
# These should be removed from HTML output, including all subelements
......@@ -284,20 +284,15 @@ class LoncapaProblem(object):
permits rescoring to be complete when the rescoring call returns.
"""
return all('filesubmission' not in responder.allowed_inputfields for responder in self.responders.values())
# for responder in self.responders.values():
# if 'filesubmission' in responder.allowed_inputfields:
# return False
#
# return True
def rescore_existing_answers(self):
'''
"""
Rescore student responses. Called by capa_module.rescore_problem.
'''
"""
return self._grade_answers(None)
def _grade_answers(self, student_answers):
'''
"""
Internal grading call used for checking new 'student_answers' and also
rescoring existing student_answers.
......@@ -309,13 +304,13 @@ class LoncapaProblem(object):
For rescoring, `student_answers` is None.
Calls the Response for each question in this problem, to do the actual grading.
'''
"""
# old CorrectMap
oldcmap = self.correct_map
# start new with empty CorrectMap
newcmap = CorrectMap()
# log.debug('Responders: %s' % self.responders)
# Call each responsetype instance to do actual grading
for responder in self.responders.values():
# File objects are passed only if responsetype explicitly allows
......@@ -335,7 +330,6 @@ class LoncapaProblem(object):
newcmap.update(results)
self.correct_map = newcmap
# log.debug('%s: in grade_answers, student_answers=%s, cmap=%s' % (self,student_answers,newcmap))
return newcmap
def get_question_answers(self):
......
......@@ -662,7 +662,6 @@ class StringResponseTest(ResponseTest):
)
correct_map = problem.grade_answers({'1_2_1': '2'})
hint = correct_map.get_hint('1_2_1')
# rand = random.Random(problem.seed)
self.assertEqual(hint, self._get_random_number_result(problem.seed))
......
......@@ -828,9 +828,7 @@ class CapaModule(CapaFields, XModule):
Returns the error messages for exceptions occurring while performing
the rescoring, rather than throwing them.
"""
event_info = dict()
event_info['state'] = self.lcp.get_state()
event_info['problem_id'] = self.location.url()
event_info = {'state': self.lcp.get_state(), 'problem_id': self.location.url()}
if not self.lcp.supports_rescoring():
event_info['failure'] = 'unsupported'
......@@ -851,8 +849,8 @@ class CapaModule(CapaFields, XModule):
correct_map = self.lcp.rescore_existing_answers()
except (StudentInputError, ResponseError, LoncapaProblemError) as inst:
log.warning("StudentInputError in capa_module:problem_rescore", exc_info=True)
event_info['failure'] = 'student_input_error'
log.warning("Input error in capa_module:problem_rescore", exc_info=True)
event_info['failure'] = 'input_error'
self.system.track_function('problem_rescore_fail', event_info)
return {'success': "Error: {0}".format(inst.message)}
......
import hashlib
import json
import logging
from uuid import uuid4
from django.db import transaction
......@@ -11,16 +10,14 @@ from celery.states import READY_STATES, SUCCESS, FAILURE, REVOKED
from courseware.module_render import get_xqueue_callback_url_prefix
from xmodule.modulestore.django import modulestore
from instructor_task.models import InstructorTask
from instructor_task.tasks_helper import PROGRESS
from instructor_task.models import InstructorTask, PROGRESS
log = logging.getLogger(__name__)
# define a "state" used in InstructorTask
QUEUING = 'QUEUING'
log = logging.getLogger(__name__)
class AlreadyRunningError(Exception):
"""Exception indicating that a background task is already running"""
pass
......@@ -60,20 +57,8 @@ def _reserve_task(course_id, task_type, task_key, task_input, requester):
if _task_is_running(course_id, task_type, task_key):
raise AlreadyRunningError("requested task is already running")
# create the task_id here, and pass it into celery:
task_id = str(uuid4())
# Create log entry now, so that future requests won't
tasklog_args = {'course_id': course_id,
'task_type': task_type,
'task_id': task_id,
'task_key': task_key,
'task_input': json.dumps(task_input),
'task_state': 'QUEUING',
'requester': requester}
instructor_task = InstructorTask.objects.create(**tasklog_args)
return instructor_task
# Create log entry now, so that future requests will know it's running.
return InstructorTask.create(course_id, task_type, task_key, task_input, requester)
def _get_xmodule_instance_args(request):
......@@ -128,37 +113,33 @@ def _update_instructor_task(instructor_task, task_result):
# Assume we don't always update the InstructorTask entry if we don't have to:
entry_needs_saving = False
task_progress = None
task_output = None
if result_state in [PROGRESS, SUCCESS]:
# construct a status message directly from the task result's result:
# it needs to go back with the entry passed in.
log.info("background task (%s), state %s: result: %s", task_id, result_state, returned_result)
task_progress = returned_result
task_output = InstructorTask.create_output_for_success(returned_result)
elif result_state == FAILURE:
# on failure, the result's result contains the exception that caused the failure
exception = returned_result
traceback = result_traceback if result_traceback is not None else ''
task_progress = {'exception': type(exception).__name__, 'message': str(exception.message)}
log.warning("background task (%s) failed: %s %s", task_id, returned_result, traceback)
if result_traceback is not None:
# truncate any traceback that goes into the InstructorTask model:
task_progress['traceback'] = result_traceback[:700]
task_output = InstructorTask.create_output_for_failure(exception, result_traceback)
elif result_state == REVOKED:
# on revocation, the result's result doesn't contain anything
# but we cannot rely on the worker thread to set this status,
# so we set it here.
entry_needs_saving = True
log.warning("background task (%s) revoked.", task_id)
task_progress = {'message': 'Task revoked before running'}
task_output = InstructorTask.create_output_for_revoked()
# save progress and state into the entry, even if it's not being saved:
# when celery is run in "ALWAYS_EAGER" mode, progress needs to go back
# with the entry passed in.
instructor_task.task_state = result_state
if task_progress is not None:
instructor_task.task_output = json.dumps(task_progress)
if task_output is not None:
instructor_task.task_output = task_output
if entry_needs_saving:
instructor_task.save()
......
......@@ -5,15 +5,23 @@ If you make changes to this model, be sure to create an appropriate migration
file and check it in at the same time as your model changes. To do that,
1. Go to the edx-platform dir
2. ./manage.py schemamigration courseware --auto description_of_your_change
3. Add the migration file created in edx-platform/lms/djangoapps/instructor_task/migrations/
2. ./manage.py schemamigration instructor_task --auto description_of_your_change
3. Add the migration file created in edx-platform/lms/djangoapps/instructor_task/migrations/
ASSUMPTIONS: modules have unique IDs, even across different module_types
"""
from uuid import uuid4
import json
from django.contrib.auth.models import User
from django.db import models
from django.db import models, transaction
# define custom states used by InstructorTask
QUEUING = 'QUEUING'
PROGRESS = 'PROGRESS'
class InstructorTask(models.Model):
......@@ -24,10 +32,10 @@ class InstructorTask(models.Model):
`task_type` identifies the kind of task being performed, e.g. rescoring.
`course_id` uses the course run's unique id to identify the course.
`task_input` stores input arguments as JSON-serialized dict, for reporting purposes.
Examples include url of problem being rescored, id of student if only one student being rescored.
`task_key` stores relevant input arguments encoded into key value for testing to see
if the task is already running (together with task_type and course_id).
`task_input` stores input arguments as JSON-serialized dict, for reporting purposes.
Examples include url of problem being rescored, id of student if only one student being rescored.
`task_id` stores the id used by celery for the background task.
`task_state` stores the last known state of the celery task
......@@ -61,3 +69,79 @@ class InstructorTask(models.Model):
def __unicode__(self):
return unicode(repr(self))
@classmethod
def create(cls, course_id, task_type, task_key, task_input, requester):
    """
    Create and save a new InstructorTask entry, in QUEUING state.

    Mints the celery task_id here (rather than letting celery assign one)
    so the log entry exists before the task is dispatched.  Raises
    ValueError if the JSON-serialized `task_input` will not fit in the
    255-character model column.
    """
    serialized_input = json.dumps(task_input)
    # Fail fast if the serialized input cannot be stored in the model column:
    if len(serialized_input) > 255:
        template = 'Task input longer than 255: "{input}" for "{task}" of "{course}"'
        raise ValueError(template.format(input=serialized_input,
                                         task=task_type,
                                         course=course_id))
    # Build and persist the entry; state starts at QUEUING until a worker picks it up.
    new_entry = cls(course_id=course_id,
                    task_type=task_type,
                    task_id=str(uuid4()),
                    task_key=task_key,
                    task_input=serialized_input,
                    task_state=QUEUING,
                    requester=requester)
    new_entry.save()
    return new_entry
@transaction.autocommit
def save_now(self):
    """
    Write this InstructorTask to the database immediately.

    The autocommit decorator ensures the enclosing transaction is
    committed as part of the save, so the update is durable right away.
    """
    self.save()
@staticmethod
def create_output_for_success(returned_result):
    """Serialize a successful task's result into the task_output JSON string."""
    return json.dumps(returned_result)
@staticmethod
def create_output_for_failure(exception, traceback_string):
    """
    Converts failed result information to output format.

    Traceback information is truncated or not included if it would result in an output string
    that would not fit in the database. If the output is still too long, then the
    exception message is also truncated.

    Truncation is indicated by adding "..." to the end of the value.
    """
    # NOTE(review): `exception.message` is the Python-2-era spelling;
    # str(exception) is the portable equivalent -- confirm before changing.
    task_progress = {'exception': type(exception).__name__, 'message': str(exception.message)}
    if traceback_string is not None:
        # include the full traceback initially; it may be truncated or dropped below:
        task_progress['traceback'] = traceback_string
    json_output = json.dumps(task_progress)
    # if the resulting output is too long, then first shorten the
    # traceback, and then the message, until it fits.
    # 1023 is presumably the max_length of the task_output column --
    # confirm against the model's field definition.
    too_long = len(json_output) - 1023
    if too_long > 0:
        if traceback_string is not None:
            if too_long >= len(traceback_string) - len('...'):
                # remove the traceback entry entirely (so no key or value)
                del task_progress['traceback']
                # account for the removed value and key text; the surrounding
                # quotes/punctuation are left uncounted, which only over-trims
                # the message slightly (safe direction).
                too_long -= (len(traceback_string) + len('traceback'))
            else:
                # truncate the traceback: drop (too_long + 3) chars, then the
                # "..." marker restores 3, for a net reduction of too_long.
                task_progress['traceback'] = traceback_string[:-(too_long + 3)] + "..."
                # mark as fitting so the message is left untouched:
                too_long = -1
        if too_long > 0:
            # we need to shorten the message:
            task_progress['message'] = task_progress['message'][:-(too_long + 3)] + "..."
        # re-serialize now that values have been shortened:
        json_output = json.dumps(task_progress)
    return json_output
@staticmethod
def create_output_for_revoked():
    """Return the standard task_output JSON recorded for revoked tasks."""
    revoked_status = {'message': 'Task revoked before running'}
    return json.dumps(revoked_status)
......@@ -2,8 +2,6 @@
This file contains tasks that are designed to perform background operations on the
running state of a course.
"""
from celery import task
from instructor_task.tasks_helper import (update_problem_module_state,
......@@ -14,16 +12,19 @@ from instructor_task.tasks_helper import (update_problem_module_state,
@task
def rescore_problem(entry_id, xmodule_instance_args):
"""Rescores problem in `course_id`.
"""Rescores a problem in a course, for all students or one specific student.
`entry_id` is the id value of the InstructorTask entry that corresponds to this task.
`course_id` identifies the course.
`task_input` should be a dict with the following entries:
The entry contains the `course_id` that identifies the course, as well as the
`task_input`, which contains task-specific input.
The task_input should be a dict with the following entries:
'problem_url': the full URL to the problem to be rescored. (required)
'student': the identifier (username or email) of a particular user whose
problem submission should be rescored. If not specified, all problem
submissions will be rescored.
submissions for the problem will be rescored.
`xmodule_instance_args` provides information needed by _get_module_instance_for_task()
to instantiate an xmodule instance.
......@@ -38,11 +39,13 @@ def rescore_problem(entry_id, xmodule_instance_args):
@task
def reset_problem_attempts(entry_id, xmodule_instance_args):
"""Resets problem attempts to zero for `problem_url` in `course_id` for all students.
"""Resets problem attempts to zero for a particular problem for all students in a course.
`entry_id` is the id value of the InstructorTask entry that corresponds to this task.
`course_id` identifies the course.
`task_input` should be a dict with the following entries:
The entry contains the `course_id` that identifies the course, as well as the
`task_input`, which contains task-specific input.
The task_input should be a dict with the following entries:
'problem_url': the full URL to the problem to be rescored. (required)
......@@ -58,11 +61,13 @@ def reset_problem_attempts(entry_id, xmodule_instance_args):
@task
def delete_problem_state(entry_id, xmodule_instance_args):
"""Deletes problem state entirely for `problem_url` in `course_id` for all students.
"""Deletes problem state entirely for all students on a particular problem in a course.
`entry_id` is the id value of the InstructorTask entry that corresponds to this task.
`course_id` identifies the course.
`task_input` should be a dict with the following entries:
The entry contains the `course_id` that identifies the course, as well as the
`task_input`, which contains task-specific input.
The task_input should be a dict with the following entries:
'problem_url': the full URL to the problem to be rescored. (required)
......
......@@ -2,8 +2,6 @@
This file contains tasks that are designed to perform background operations on the
running state of a course.
"""
import json
......@@ -28,14 +26,11 @@ from track.views import task_track
from courseware.models import StudentModule
from courseware.model_data import ModelDataCache
from courseware.module_render import get_module_for_descriptor_internal
from instructor_task.models import InstructorTask
from instructor_task.models import InstructorTask, PROGRESS
# define different loggers for use within tasks and on client side
TASK_LOG = get_task_logger(__name__)
# define custom task state:
PROGRESS = 'PROGRESS'
# define value to use when no task_id is provided:
UNKNOWN_TASK_ID = 'unknown-task_id'
......@@ -94,7 +89,7 @@ def _perform_module_state_update(course_id, module_state_key, student_identifier
the update is successful; False indicates the update on the particular student module failed.
A raised exception indicates a fatal condition -- that no other student modules should be considered.
If no exceptions are raised, a dict containing the task's result is returned, with the following keys:
The return value is a dict containing the task's results, with the following keys:
'attempted': number of attempts made
'updated': number of attempts that "succeeded"
......@@ -170,12 +165,6 @@ def _perform_module_state_update(course_id, module_state_key, student_identifier
return task_progress
@transaction.autocommit
def _save_course_task(course_task):
"""Writes InstructorTask course_task immediately, ensuring the transaction is committed."""
course_task.save()
def update_problem_module_state(entry_id, update_fcn, action_name, filter_fcn,
xmodule_instance_args):
"""
......@@ -198,7 +187,7 @@ def update_problem_module_state(entry_id, update_fcn, action_name, filter_fcn,
Before returning, this is also JSON-serialized and stored in the task_output column of the InstructorTask entry.
If exceptions were raised internally, they are caught and recorded in the InstructorTask entry.
If an exception is raised internally, it is caught and recorded in the InstructorTask entry.
This is also a JSON-serialized dict, stored in the task_output column, containing the following keys:
'exception': type of exception object
......@@ -247,21 +236,18 @@ def update_problem_module_state(entry_id, update_fcn, action_name, filter_fcn,
action_name, filter_fcn, xmodule_instance_args)
except Exception:
# try to write out the failure to the entry before failing
exception_type, exception, traceback = exc_info()
_, exception, traceback = exc_info()
traceback_string = format_exc(traceback) if traceback is not None else ''
task_progress = {'exception': exception_type.__name__, 'message': str(exception.message)}
TASK_LOG.warning("background task (%s) failed: %s %s", task_id, exception, traceback_string)
if traceback is not None:
task_progress['traceback'] = traceback_string[:700]
entry.task_output = json.dumps(task_progress)
entry.task_output = InstructorTask.create_output_for_failure(exception, traceback_string)
entry.task_state = FAILURE
_save_course_task(entry)
entry.save_now()
raise
# if we get here, we assume we've succeeded, so update the InstructorTask entry in anticipation:
entry.task_output = json.dumps(task_progress)
entry.task_state = SUCCESS
_save_course_task(entry)
entry.save_now()
# log and exit, returning task_progress info as task result:
fmt = 'Finishing task "{task_id}": course "{course_id}" problem "{state_key}": final: {progress}'
......@@ -317,7 +303,7 @@ def rescore_problem_module_state(module_descriptor, student_module, xmodule_inst
Throws exceptions if the rescoring is fatal and should be aborted if in a loop.
In particular, raises UpdateProblemModuleStateError if module fails to instantiate,
and if the module doesn't support rescoring.
or if the module doesn't support rescoring.
Returns True if problem was successfully rescored for the given student, and False
if problem encountered some kind of error in rescoring.
......
......@@ -22,12 +22,9 @@ from instructor_task.api import (get_running_instructor_tasks,
submit_reset_problem_attempts_for_all_students,
submit_delete_problem_state_for_all_students)
from instructor_task.api_helper import (QUEUING,
AlreadyRunningError,
encode_problem_and_student_input,
)
from instructor_task.models import InstructorTask
from instructor_task.tasks_helper import PROGRESS
from instructor_task.api_helper import (AlreadyRunningError,
encode_problem_and_student_input)
from instructor_task.models import InstructorTask, PROGRESS, QUEUING
from instructor_task.tests.test_base import InstructorTaskTestCase
from instructor_task.tests.factories import InstructorTaskFactory
from instructor_task.views import instructor_task_status, get_task_completion_info
......@@ -376,9 +373,9 @@ class InstructorTaskSubmitTest(InstructorTaskTestCase):
submit_delete_problem_state_for_all_students(request, course_id, problem_url)
def test_submit_nonrescorable_modules(self):
# confirm that a rescore of a non-existent module returns an exception
# confirm that a rescore of an existent but unscorable module returns an exception
# (Note that it is easier to test a non-rescorable module in test_tasks,
# where we are creating real modules.
# where we are creating real modules.)
problem_url = self.problem_section.location.url()
course_id = self.course.id
request = None
......@@ -387,6 +384,28 @@ class InstructorTaskSubmitTest(InstructorTaskTestCase):
with self.assertRaises(NotImplementedError):
submit_rescore_problem_for_all_students(request, course_id, problem_url)
def _test_submit_with_long_url(self, task_class, student=None):
    """
    Submitting a task for a problem whose url name is 255 chars long
    should raise ValueError (serialized task_input exceeds the column size).
    """
    long_url_name = 'x' * 255
    self.define_option_problem(long_url_name)
    location = InstructorTaskTestCase.problem_location(long_url_name)
    with self.assertRaises(ValueError):
        request = self.create_task_request(self.instructor)
        if student is None:
            task_class(request, self.course.id, location)
        else:
            task_class(request, self.course.id, location, student)
def test_submit_rescore_all_with_long_url(self):
    """Rescore-all submission should reject an over-long problem url."""
    self._test_submit_with_long_url(submit_rescore_problem_for_all_students)

def test_submit_rescore_student_with_long_url(self):
    """Rescore-one-student submission should reject an over-long problem url."""
    self._test_submit_with_long_url(submit_rescore_problem_for_student, self.student)

def test_submit_reset_all_with_long_url(self):
    """Reset-attempts submission should reject an over-long problem url."""
    self._test_submit_with_long_url(submit_reset_problem_attempts_for_all_students)

def test_submit_delete_all_with_long_url(self):
    """Delete-state submission should reject an over-long problem url."""
    self._test_submit_with_long_url(submit_delete_problem_state_for_all_students)
def _test_submit_task(self, task_class, student=None):
problem_url_name = 'H1P1'
self.define_option_problem(problem_url_name)
......
......@@ -25,6 +25,7 @@ from instructor_task.tests.factories import InstructorTaskFactory
from instructor_task.tasks import rescore_problem, reset_problem_attempts, delete_problem_state
from instructor_task.tasks_helper import UpdateProblemModuleStateError
log = logging.getLogger(__name__)
PROBLEM_URL_NAME = "test_urlname"
......@@ -202,7 +203,16 @@ class TestInstructorTasks(InstructorTaskTestCase):
entry = InstructorTask.objects.get(id=task_entry.id)
self.assertEquals(json.loads(entry.task_output), status)
self.assertEquals(entry.task_state, SUCCESS)
# TODO: check that entries were reset
# check that the correct entry was reset
for index, student in enumerate(students):
module = StudentModule.objects.get(course_id=self.course.id,
student=student,
module_state_key=self.problem_url)
state = json.loads(module.state)
if index == 3:
self.assertEquals(state['attempts'], 0)
else:
self.assertEquals(state['attempts'], initial_attempts)
def test_reset_with_student_username(self):
self._test_reset_with_student(False)
......@@ -236,7 +246,8 @@ class TestInstructorTasks(InstructorTaskTestCase):
self._test_run_with_failure(delete_problem_state, 'We expected this to fail')
def _test_run_with_long_error_msg(self, task_class):
# run with no StudentModules for the problem
# run with an error message that is so long it will require
# truncation (as well as the jettisoning of the traceback).
task_entry = self._create_input_entry()
self.define_option_problem(PROBLEM_URL_NAME)
expected_message = "x" * 1500
......@@ -247,12 +258,46 @@ class TestInstructorTasks(InstructorTaskTestCase):
# compare with entry in table:
entry = InstructorTask.objects.get(id=task_entry.id)
self.assertEquals(entry.task_state, FAILURE)
# TODO: on MySQL this will actually fail, because it was truncated
when it was persisted. It does not fail on SQLite3 at the moment,
# because it doesn't actually enforce length limits!
self.assertGreater(1023, len(entry.task_output))
output = json.loads(entry.task_output)
self.assertEquals(output['exception'], 'TestTaskFailure')
self.assertEquals(output['message'], expected_message)
self.assertEquals(output['message'], expected_message[:len(output['message']) - 3] + "...")
self.assertTrue('traceback' not in output)
def test_rescore_with_long_error_msg(self):
self._test_run_with_long_error_msg(rescore_problem)
def test_reset_with_long_error_msg(self):
self._test_run_with_long_error_msg(reset_problem_attempts)
def test_delete_with_long_error_msg(self):
self._test_run_with_long_error_msg(delete_problem_state)
def _test_run_with_short_error_msg(self, task_class):
    """
    Run a task that fails with a message short enough to fit in the output,
    but whose traceback will not.  Confirm the traceback is truncated
    (ends in "...") while the message is stored intact.
    """
    task_entry = self._create_input_entry()
    self.define_option_problem(PROBLEM_URL_NAME)
    expected_message = "x" * 900
    try:
        self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id, expected_message)
    except TestTaskFailure:
        pass
    # fetch the persisted entry and inspect what was actually stored:
    stored_entry = InstructorTask.objects.get(id=task_entry.id)
    self.assertEquals(stored_entry.task_state, FAILURE)
    self.assertGreater(1023, len(stored_entry.task_output))
    status_dict = json.loads(stored_entry.task_output)
    self.assertEquals(status_dict['exception'], 'TestTaskFailure')
    self.assertEquals(status_dict['message'], expected_message)
    self.assertEquals(status_dict['traceback'][-3:], "...")
def test_rescore_with_short_error_msg(self):
    """Rescore failure with a short message should truncate only the traceback."""
    self._test_run_with_short_error_msg(rescore_problem)

def test_reset_with_short_error_msg(self):
    """Reset failure with a short message should truncate only the traceback."""
    self._test_run_with_short_error_msg(reset_problem_attempts)

def test_delete_with_short_error_msg(self):
    """Delete failure with a short message should truncate only the traceback."""
    self._test_run_with_short_error_msg(delete_problem_state)
......@@ -8,7 +8,7 @@ from celery.states import FAILURE, REVOKED, READY_STATES
from instructor_task.api_helper import (get_status_from_instructor_task,
get_updated_instructor_task)
from instructor_task.tasks_helper import PROGRESS
from instructor_task.models import PROGRESS
log = logging.getLogger(__name__)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment