tasks_helper.py 17.8 KB
Newer Older
1
"""
2
This file contains tasks that are designed to perform background operations on the
3 4 5
running state of a course.

"""
6 7

import json
8
from time import time
9 10
from sys import exc_info
from traceback import format_exc
11

12
from celery import current_task
13
from celery.utils.log import get_task_logger
Brian Wilson committed
14
from celery.signals import worker_process_init
15 16 17 18 19
from celery.states import SUCCESS, FAILURE

from django.contrib.auth.models import User
from django.db import transaction
from dogapi import dog_stats_api
20

21 22 23
from xmodule.modulestore.django import modulestore

from track.views import task_track
24

25
from courseware.models import StudentModule
Calen Pennington committed
26
from courseware.model_data import FieldDataCache
27
from courseware.module_render import get_module_for_descriptor_internal
28
from instructor_task.models import InstructorTask, PROGRESS
29

30
# define the logger for use within tasks
31 32 33 34
TASK_LOG = get_task_logger(__name__)

# Placeholder task id used when `xmodule_instance_args` does not supply a 'task_id':
UNKNOWN_TASK_ID = 'unknown-task_id'
35 36 37


class UpdateProblemModuleStateError(Exception):
    """
    Signals a fatal error encountered while updating problem modules.

    Raised when the current module cannot be processed and no further
    modules should be attempted.
    """


Brian Wilson committed
47 48 49 50 51
def _get_current_task():
    """
    Return Celery's `current_task`.

    Stub to make it easier to test without actually running Celery: the code
    in this module obtains the running task through this function rather than
    referencing `current_task` directly.
    """
    return current_task


52
def _perform_module_state_update(course_id, module_state_key, student_identifier, update_fcn, action_name, filter_fcn,
                                 xmodule_instance_args):
    """
    Visits StudentModule instances and applies `update_fcn` to each of them.

    The StudentModule instances visited are those matching `course_id` and `module_state_key`.
    When `student_identifier` is not None, only that student's modules are visited; an identifier
    containing "@" is looked up as an email address, anything else as a username, and a failed
    lookup is allowed to raise.  When `student_identifier` is None, the modules of all students
    for the specified problem are visited.

    When `filter_fcn` is not None, it is called with the query built so far and must return the
    (further filtered) query to use.

    `update_fcn` is called once per remaining StudentModule with three arguments:  the module
    descriptor found for `module_state_key`, the StudentModule itself, and `xmodule_instance_args`.
    A true return value counts as a successful update; a false one counts as a failed update of
    that particular student module.  An exception raised by `update_fcn` is fatal:  no further
    student modules are visited.

    The current task's state is set to PROGRESS, with the progress dict as its meta, once before
    the loop and again after every visited module.

    Returns a dict describing the task's results, with the following keys:

          'attempted': number of attempts made
          'updated': number of attempts that "succeeded"
          'total': number of possible subtasks to attempt
          'action_name': user-visible verb to use in status messages.  Should be past-tense.
              Pass-through of input `action_name`.
          'duration_ms': how long the task has (or had) been running.

    Exceptions are deliberately not caught here, since this runs inside a task.  They propagate to
    the caller, which records the failure and the error trace in the InstructorTask and the
    result object.

    """
    # remember when the task started, for the 'duration_ms' figure:
    started_at = time()

    # look up the descriptor of the problem being updated:
    module_descriptor = modulestore().get_instance(course_id, module_state_key)

    # start from every StudentModule recorded for that problem in the course
    modules_to_update = StudentModule.objects.filter(course_id=course_id,
                                                     module_state_key=module_state_key)

    # optionally narrow the update down to a single student.  If no student is
    # specified, everyone who has a StudentModule for the problem is included.
    if student_identifier is not None:
        # the lookup is allowed to raise if no matching user exists.
        if "@" in student_identifier:
            student = User.objects.get(email=student_identifier)
        else:
            student = User.objects.get(username=student_identifier)
        modules_to_update = modules_to_update.filter(student_id=student.id)

    if filter_fcn is not None:
        modules_to_update = filter_fcn(modules_to_update)

    num_total = modules_to_update.count()
    # mutable counters, so that the nested helper always sees the latest values
    tally = {'attempted': 0, 'updated': 0}

    def report_progress():
        """Build the current progress dict, publish it as the task's state, and return it"""
        current_time = time()
        snapshot = {
            'action_name': action_name,
            'attempted': tally['attempted'],
            'updated': tally['updated'],
            'total': num_total,
            'duration_ms': int((current_time - started_at) * 1000),
        }
        _get_current_task().update_state(state=PROGRESS, meta=snapshot)
        return snapshot

    # publish an initial "nothing done yet" status before starting the main loop
    task_progress = report_progress()
    action_tag = 'action:{name}'.format(name=action_name)
    for student_module in modules_to_update:
        tally['attempted'] += 1
        # No try/except on purpose:  an error propagates, and the task will
        # be marked as FAILED, with a stack trace.
        with dog_stats_api.timer('instructor_tasks.module.time.step', tags=[action_tag]):
            succeeded = update_fcn(module_descriptor, student_module, xmodule_instance_args)
            if succeeded:
                # A true result means the update_fcn performed some kind of work.
                # Logging of failures is left to the update_fcn itself.
                tally['updated'] += 1

        # refresh the task status after every module:
        task_progress = report_progress()

    return task_progress


147
def update_problem_module_state(entry_id, update_fcn, action_name, filter_fcn,
                                xmodule_instance_args):
    """
    Performs generic update by visiting StudentModule instances with the update_fcn provided.

    The `entry_id` is the primary key for the InstructorTask entry representing the task.  This function
    updates the entry on success and failure of the _perform_module_state_update function it
    wraps.  It is setting the entry's value for task_state based on what Celery would set it to once
    the task returns to Celery:  FAILURE if an exception is encountered, and SUCCESS if it returns normally.
    Other arguments are pass-throughs to _perform_module_state_update, and documented there.

    The course, the problem ('problem_url') and the optional student ('student') are read from the
    InstructorTask entry and its JSON-encoded task_input.  If `xmodule_instance_args` is not None,
    the entry's task_id is added to it (the dict is modified in place) so that it can be output
    with tracking info.

    If no exceptions are raised, a dict containing the task's result is returned, with the following keys:

          'attempted': number of attempts made
          'updated': number of attempts that "succeeded"
          'total': number of possible subtasks to attempt
          'action_name': user-visible verb to use in status messages.  Should be past-tense.
              Pass-through of input `action_name`.
          'duration_ms': how long the task has (or had) been running.

    Before returning, this is also JSON-serialized and stored in the task_output column of the InstructorTask entry.

    If an exception is raised internally, it is caught and recorded in the InstructorTask entry.
    This is also a JSON-serialized dict, stored in the task_output column, containing the following keys:

           'exception':  type of exception object
           'message': error message from exception object
           'traceback': traceback information (truncated if necessary)

    Once the exception is caught, it is raised again and allowed to pass up to the
    task-running level, so that it can also set the failure modes and capture the error trace in the
    result object that Celery creates.

    Raises UpdateProblemModuleStateError if the task_id stored in the entry does not match the id
    of the task that is actually running.

    """

    # get the InstructorTask to be updated.  If this fails, then let the exception return to Celery.
    # There's no point in catching it here.
    entry = InstructorTask.objects.get(pk=entry_id)

    # get inputs to use in this task from the entry:
    task_id = entry.task_id
    course_id = entry.course_id
    task_input = json.loads(entry.task_input)
    module_state_key = task_input.get('problem_url')
    student_ident = task_input.get('student')

    fmt = 'Starting to update problem modules as task "{task_id}": course "{course_id}" problem "{state_key}": nothing {action} yet'
    TASK_LOG.info(fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key, action=action_name))

    # add task_id to xmodule_instance_args, so that it can be output with tracking info:
    if xmodule_instance_args is not None:
        xmodule_instance_args['task_id'] = task_id

    # Now that we have an entry we can try to catch failures:
    task_progress = None
    try:
        # Check that the task_id submitted in the InstructorTask matches the current task
        # that is running.
        request_task_id = _get_current_task().request.id
        if task_id != request_task_id:
            fmt = 'Requested task "{task_id}" did not match actual task "{actual_id}"'
            message = fmt.format(task_id=task_id, actual_id=request_task_id)
            TASK_LOG.error(message)
            raise UpdateProblemModuleStateError(message)

        # Now do the work:
        with dog_stats_api.timer('instructor_tasks.module.time.overall', tags=['action:{name}'.format(name=action_name)]):
            task_progress = _perform_module_state_update(course_id, module_state_key, student_ident, update_fcn,
                                                         action_name, filter_fcn, xmodule_instance_args)
        # If we get here, we assume we've succeeded, so update the InstructorTask entry in anticipation.
        # But we do this within the try, in case creating the task_output causes an exception to be
        # raised.
        entry.task_output = InstructorTask.create_output_for_success(task_progress)
        entry.task_state = SUCCESS
        entry.save_now()

    except Exception as exception:
        # try to write out the failure to the entry before failing.
        # format_exc() formats the exception currently being handled; its only
        # parameter is an optional `limit` on the number of stack entries, so the
        # traceback object must not be passed to it.
        traceback_string = format_exc()
        TASK_LOG.warning("background task (%s) failed: %s %s", task_id, exception, traceback_string)
        entry.task_output = InstructorTask.create_output_for_failure(exception, traceback_string)
        entry.task_state = FAILURE
        entry.save_now()
        raise

    # log and exit, returning task_progress info as task result:
    fmt = 'Finishing task "{task_id}": course "{course_id}" problem "{state_key}": final: {progress}'
    TASK_LOG.info(fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key, progress=task_progress))
    return task_progress
237 238


239 240 241
def _get_task_id_from_xmodule_args(xmodule_instance_args):
    """Looks up 'task_id' in the `xmodule_instance_args` dict; falls back to the default value if the dict or the key is absent."""
    if xmodule_instance_args is None:
        return UNKNOWN_TASK_ID
    return xmodule_instance_args.get('task_id', UNKNOWN_TASK_ID)
242 243


244
def _get_module_instance_for_task(course_id, student, module_descriptor, xmodule_instance_args=None,
                                  grade_bucket_type=None):
    """
    Builds the module instance for a given `course_id`, `student` object, and `module_descriptor`.

    The instance is whatever get_module_for_descriptor_internal returns; using that function
    sidesteps the need for a Request object when instantiating an xmodule instance.

    `xmodule_instance_args` (which may be None) supplies the 'request_info' used by the tracking
    function and the 'xqueue_callback_url_prefix' for the XQueue callback.  These are handed to
    get_module_for_descriptor_internal together with `grade_bucket_type`.
    """
    # reconstitute the field data needed by the problem's corresponding XModule:
    field_data_cache = FieldDataCache.cache_for_descriptor_descendents(course_id, student, module_descriptor)

    # treat a missing args dict like an empty one when reading optional values from it
    passthrough_args = xmodule_instance_args if xmodule_instance_args is not None else {}

    # request-related tracking information comes from the args passthrough, and is
    # supplemented with task-specific information:
    request_info = passthrough_args.get('request_info', {})
    task_info = {
        "student": student.username,
        "task_id": _get_task_id_from_xmodule_args(xmodule_instance_args),
    }

    def track_function(event_type, event):
        '''
        Tracking function that logs what happened.

        For insertion into ModuleSystem, and used by CapaModule, which will
        provide the event_type (as string) and event (as dict) as arguments.
        The request_info and task_info (and page) are provided here.
        '''
        return task_track(request_info, task_info, event_type, event, page='x_module_task')

    xqueue_callback_url_prefix = passthrough_args.get('xqueue_callback_url_prefix', '')

    return get_module_for_descriptor_internal(student, module_descriptor, field_data_cache, course_id,
                                              track_function, xqueue_callback_url_prefix,
                                              grade_bucket_type=grade_bucket_type)


@transaction.autocommit
def rescore_problem_module_state(module_descriptor, student_module, xmodule_instance_args=None):
    '''
    Rescores one student's submission to a problem.

    `module_descriptor` is the XModule descriptor of the problem and `student_module`
    is the corresponding StudentModule object of the student being rescored.

    Raises UpdateProblemModuleStateError when the failure is fatal and a surrounding
    loop should be aborted:  when no module instance could be obtained, or when the
    instance doesn't support rescoring.

    Returns True if the problem was successfully rescored for the given student, and
    False if the rescoring call reported some kind of error.
    '''
    # unpack the StudentModule:
    course_id = student_module.course_id
    student = student_module.student
    module_state_key = student_module.module_state_key
    instance = _get_module_instance_for_task(course_id, student, module_descriptor, xmodule_instance_args, grade_bucket_type='rescore')

    if instance is None:
        # Either permissions just changed, or someone is trying to be clever
        # and load something they shouldn't have access to.
        msg = "No module {loc} for student {student}--access denied?".format(loc=module_state_key, student=student)
        TASK_LOG.debug(msg)
        raise UpdateProblemModuleStateError(msg)

    if not hasattr(instance, 'rescore_problem'):
        # The caller should already have checked this, so it should not happen,
        # but check here to be sure.
        raise UpdateProblemModuleStateError("Specified problem does not support rescoring.")

    result = instance.rescore_problem()
    # values shared by every log message below
    log_context = dict(course=course_id, loc=module_state_key, student=student)

    if 'success' not in result:
        # not fatal, but False tells the caller that this individual call didn't complete:
        TASK_LOG.warning(u"error processing rescore call for course {course}, problem {loc} and student {student}: "
                         "unexpected response {msg}".format(msg=result, **log_context))
        return False

    outcome = result['success']
    if outcome not in ['correct', 'incorrect']:
        TASK_LOG.warning(u"error processing rescore call for course {course}, problem {loc} and student {student}: "
                         "{msg}".format(msg=outcome, **log_context))
        return False

    TASK_LOG.debug(u"successfully processed rescore call for course {course}, problem {loc} and student {student}: "
                   "{msg}".format(msg=outcome, **log_context))
    return True


328
@transaction.autocommit
def reset_attempts_module_state(_module_descriptor, student_module, xmodule_instance_args=None):
    """
    Sets the number of problem attempts back to zero for the specified `student_module`.

    The module is only saved, and a 'problem_reset_attempts' tracking event only emitted,
    when the stored state has an 'attempts' value greater than zero.

    Always returns true, indicating success, if it doesn't raise an exception due to database error.
    """
    raw_state = student_module.state
    problem_state = json.loads(raw_state) if raw_state else {}

    if 'attempts' in problem_state and problem_state["attempts"] > 0:
        previous_attempts = problem_state["attempts"]
        problem_state["attempts"] = 0
        # serialize the modified state again and store it
        student_module.state = json.dumps(problem_state)
        student_module.save()

        # request-related tracking information comes from the args passthrough,
        # and is supplemented with task-specific information:
        if xmodule_instance_args is None:
            request_info = {}
        else:
            request_info = xmodule_instance_args.get('request_info', {})
        task_info = {
            "student": student_module.student.username,
            "task_id": _get_task_id_from_xmodule_args(xmodule_instance_args),
        }
        event_info = {"old_attempts": previous_attempts, "new_attempts": 0}
        task_track(request_info, task_info, 'problem_reset_attempts', event_info, page='x_module_task')

    # the reset counts as successful even when nothing had to be changed.  (It's just "optimized".)
    return True


354
@transaction.autocommit
def delete_problem_module_state(_module_descriptor, student_module, xmodule_instance_args=None):
    """
    Deletes the StudentModule entry and emits a 'problem_delete_state' tracking event.

    Always returns true, indicating success, if it doesn't raise an exception due to database error.
    """
    student_module.delete()

    # request-related tracking information comes from the args passthrough,
    # and is supplemented with task-specific information:
    if xmodule_instance_args is None:
        request_info = {}
    else:
        request_info = xmodule_instance_args.get('request_info', {})
    task_info = {
        "student": student_module.student.username,
        "task_id": _get_task_id_from_xmodule_args(xmodule_instance_args),
    }
    task_track(request_info, task_info, 'problem_delete_state', {}, page='x_module_task')
    return True