tasks_helper.py 25.9 KB
Newer Older
1
"""
This file contains tasks that are designed to perform background operations on the
running state of a course.

"""
6
import json
7 8
import urllib
from datetime import datetime
9
from time import time
10

11
from celery import Task, current_task
12
from celery.utils.log import get_task_logger
13 14
from celery.states import SUCCESS, FAILURE
from django.contrib.auth.models import User
15
from django.db import transaction, reset_queries
16
from dogapi import dog_stats_api
17
from pytz import UTC
18

19 20
from xmodule.modulestore.django import modulestore
from track.views import task_track
21

22
from courseware.grades import iterate_grades_for
23
from courseware.models import StudentModule
Calen Pennington committed
24
from courseware.model_data import FieldDataCache
25
from courseware.module_render import get_module_for_descriptor_internal
26
from instructor_task.models import GradesStore, InstructorTask, PROGRESS
27
from student.models import CourseEnrollment
28

29
# Celery task logger shared by every helper in this module.
TASK_LOG = get_task_logger(__name__)

# Task id reported when xmodule_instance_args does not supply one.
UNKNOWN_TASK_ID = 'unknown-task_id'

# Status values that update functions return to perform_module_state_update,
# which counts each one as succeeded, failed, or skipped respectively.
UPDATE_STATUS_SUCCEEDED = 'succeeded'
UPDATE_STATUS_FAILED = 'failed'
UPDATE_STATUS_SKIPPED = 'skipped'

40

41 42 43 44 45 46 47
class BaseInstructorTask(Task):
    """
    Abstract Celery task base that mirrors a task's outcome onto its InstructorTask row.

    Every task built on this class must receive the InstructorTask primary key
    (`entry_id`) as its first positional argument.  When the wrapped task ends,
    the row's `task_state` is set to what Celery itself will record once the
    task returns: FAILURE when an exception escaped, SUCCESS when it returned
    normally.  The remaining arguments are pass-throughs to
    perform_module_state_update, and are documented there.
    """
    abstract = True

    def on_success(self, task_progress, task_id, args, kwargs):
        """
        Record a successful result on the InstructorTask row for this task.

        `task_progress` is the dict the task returned; it is expected to hold:

          'attempted': number of attempts made
          'succeeded': number of attempts that "succeeded"
          'skipped': number of attempts that "skipped"
          'failed': number of attempts that "failed"
          'total': number of possible subtasks to attempt
          'action_name': user-visible verb to use in status messages.  Should be past-tense.
              Pass-through of input `action_name`.
          'duration_ms': how long the task has (or had) been running.

        When the row lists no subtasks, the dict is JSON-serialized into the
        row's task_output column and task_state becomes SUCCESS.  When subtasks
        exist, nothing is written here: the subtasks update task_state themselves.
        """
        TASK_LOG.debug('Task %s: success returned with progress: %s', task_id, task_progress)
        # The row's primary key is always the first positional argument.  The row
        # is assumed to exist; had it been missing, the task would already have failed.
        instructor_task = InstructorTask.objects.get(pk=args[0])
        if len(instructor_task.subtasks) != 0:
            # Subtasks own the final state update for this entry.
            return
        instructor_task.task_output = InstructorTask.create_output_for_success(task_progress)
        instructor_task.task_state = SUCCESS
        instructor_task.save_now()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        Record an exception raised by the task on its InstructorTask row.

        Celery catches exceptions raised inside the task and hands them to this
        hook.  The row's task_output receives a JSON-serialized dict with:

               'exception':  type of exception object
               'message': error message from exception object
               'traceback': traceback information (truncated if necessary)

        and task_state becomes FAILURE.  Progress made within the task
        (attempted, succeeded, etc.) cannot be recorded on this path.
        """
        TASK_LOG.debug('Task %s: failure returned', task_id)
        entry_id = args[0]
        try:
            instructor_task = InstructorTask.objects.get(pk=entry_id)
        except InstructorTask.DoesNotExist:
            # Without a row there is nothing to update; just report it.
            TASK_LOG.error("Task (%s) has no InstructorTask object for id %s", task_id, entry_id)
            return

        TASK_LOG.warning("Task (%s) failed: %s %s", task_id, einfo.exception, einfo.traceback)
        instructor_task.task_output = InstructorTask.create_output_for_failure(einfo.exception, einfo.traceback)
        instructor_task.task_state = FAILURE
        instructor_task.save_now()


126
class UpdateProblemModuleStateError(Exception):
    """
    Raised when updating a problem module hits a fatal condition.

    It signals that the current StudentModule cannot be processed and that
    no further modules should be attempted.
    """


Brian Wilson committed
136
def _get_current_task():
    """
    Return Celery's `current_task`, the task at the top of Celery's task stack.

    Mocking `celery.current_task` directly does not seem to work in tests, so
    this module goes through this thin indirection.  Tests can patch it, while
    production code gets the real `current_task`.
    """
    running_task = current_task
    return running_task


149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
def run_main_task(entry_id, task_fcn, action_name):
    """
    Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask.

    Arguments passed to `task_fcn` are:

     `entry_id` : the primary key for the InstructorTask entry representing the task.
     `course_id` : the id for the course.
     `task_input` : dict containing task-specific arguments, JSON-decoded from InstructorTask's task_input.
     `action_name` : past-tense verb to use for constructing status messages.

    If no exceptions are raised, the `task_fcn` should return a dict containing
    the task's result with the following keys:

          'attempted': number of attempts made
          'succeeded': number of attempts that "succeeded"
          'skipped': number of attempts that "skipped"
          'failed': number of attempts that "failed"
          'total': number of possible subtasks to attempt
          'action_name': user-visible verb to use in status messages.
              Should be past-tense.  Pass-through of input `action_name`.
          'duration_ms': how long the task has (or had) been running.

    That dict is returned unchanged as the task result.

    Raises:
        InstructorTask.DoesNotExist: if there is no entry for `entry_id`; this is
            deliberately left for Celery to handle.
        ValueError: if the entry's task_id does not match the id of the Celery
            task that is currently running.
        Any exception raised by `task_fcn` propagates unchanged.
    """

    # get the InstructorTask to be updated.  If this fails, then let the exception return to Celery.
    # There's no point in catching it here.
    entry = InstructorTask.objects.get(pk=entry_id)

    # get inputs to use in this task from the entry:
    task_id = entry.task_id
    course_id = entry.course_id
    task_input = json.loads(entry.task_input)

    # construct log message:
    fmt = u'task "{task_id}": course "{course_id}" input "{task_input}"'
    task_info_string = fmt.format(task_id=task_id, course_id=course_id, task_input=task_input)

    TASK_LOG.info('Starting update (nothing %s yet): %s', action_name, task_info_string)

    # Check that the task_id submitted in the InstructorTask matches the current task
    # that is running.
    request_task_id = _get_current_task().request.id
    if task_id != request_task_id:
        fmt = u'Requested task did not match actual task "{actual_id}": {task_info}'
        message = fmt.format(actual_id=request_task_id, task_info=task_info_string)
        TASK_LOG.error(message)
        raise ValueError(message)

    # Now do the work:
    try:
        with dog_stats_api.timer('instructor_tasks.time.overall', tags=['action:{name}'.format(name=action_name)]):
            task_progress = task_fcn(entry_id, course_id, task_input, action_name)
    finally:
        # Release any queries that the connection has been hanging onto.  This is
        # in `finally` so the release also happens when task_fcn raises, not only
        # on the success path.
        reset_queries()

    # log and exit, returning task_progress info as task result:
    TASK_LOG.info('Finishing %s: final: %s', task_info_string, task_progress)
    return task_progress


210
def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, task_input, action_name):
    """
    Performs generic update by visiting StudentModule instances with the update_fcn provided.

    StudentModule instances are those that match the specified `course_id` and the
    `module_state_key` taken from `task_input['problem_url']`.  If `task_input['student']`
    (the student identifier) is not None, it is used as an additional filter to limit the
    modules to those belonging to that student.  An identifier containing "@" is looked up
    as an email address; anything else is looked up as a username.  If it is None, the
    update is performed on modules for all students on the specified problem.

    If a `filter_fcn` is not None, it is applied to the query that has been constructed.  It takes one
    argument, which is the query being filtered, and returns the filtered version of the query.

    The `update_fcn` is called on each StudentModule that passes the resulting filtering,
    with two positional arguments:
      - the module_descriptor for the module pointed to by module_state_key, and
      - the particular StudentModule to update.
    The update functions in this module also take `xmodule_instance_args` as their first
    parameter.  Presumably the caller binds it in advance (e.g. functools.partial); confirm
    at the call site.  `update_fcn` must return one of UPDATE_STATUS_SUCCEEDED,
    UPDATE_STATUS_FAILED or UPDATE_STATUS_SKIPPED, which is tallied into the counts below.
    Any other value raises UpdateProblemModuleStateError.  A raised exception indicates a
    fatal condition -- that no other student modules should be considered.

    `_entry_id` is unused here.

    Progress (the dict described below) is published through the current task's
    update_state(state=PROGRESS, ...) before the loop and after each module.

    The return value is a dict containing the task's results, with the following keys:

          'attempted': number of attempts made
          'succeeded': number of attempts that "succeeded"
          'skipped': number of attempts that "skipped"
          'failed': number of attempts that "failed"
          'total': number of possible updates to attempt
          'action_name': user-visible verb to use in status messages.  Should be past-tense.
              Pass-through of input `action_name`.
          'duration_ms': how long the task has (or had) been running.

    Because this is run internal to a task, it does not catch exceptions.  These are allowed to pass up to the
    next level, so that it can set the failure modes and capture the error trace in the InstructorTask and the
    result object.
    """
    # get start time for task:
    start_time = time()

    module_state_key = task_input.get('problem_url')
    student_identifier = task_input.get('student')

    # find the problem descriptor:
    module_descriptor = modulestore().get_instance(course_id, module_state_key)

    # find the module in question
    modules_to_update = StudentModule.objects.filter(course_id=course_id,
                                                     module_state_key=module_state_key)

    # give the option of updating an individual student. If not specified,
    # then updates all students who have responded to a problem so far
    if student_identifier is not None:
        # Look for the student, and let User.DoesNotExist propagate if none is found.
        if "@" in student_identifier:
            student = User.objects.get(email=student_identifier)
        else:
            student = User.objects.get(username=student_identifier)
        modules_to_update = modules_to_update.filter(student_id=student.id)

    if filter_fcn is not None:
        modules_to_update = filter_fcn(modules_to_update)

    # perform the main loop
    num_attempted = 0
    num_succeeded = 0
    num_skipped = 0
    num_failed = 0
    num_total = modules_to_update.count()

    def get_task_progress():
        """Return a dict containing info about current task"""
        current_time = time()
        progress = {'action_name': action_name,
                    'attempted': num_attempted,
                    'succeeded': num_succeeded,
                    'skipped': num_skipped,
                    'failed': num_failed,
                    'total': num_total,
                    'duration_ms': int((current_time - start_time) * 1000),
                    }
        return progress

    task_progress = get_task_progress()
    _get_current_task().update_state(state=PROGRESS, meta=task_progress)
    for module_to_update in modules_to_update:
        num_attempted += 1
        # There is no try here:  if there's an error, we let it throw, and the task will
        # be marked as FAILED, with a stack trace.
        with dog_stats_api.timer('instructor_tasks.module.time.step', tags=['action:{name}'.format(name=action_name)]):
            update_status = update_fcn(module_descriptor, module_to_update)
            if update_status == UPDATE_STATUS_SUCCEEDED:
                # The update_fcn performed some kind of work.
                # Logging of failures is left to the update_fcn itself.
                num_succeeded += 1
            elif update_status == UPDATE_STATUS_FAILED:
                num_failed += 1
            elif update_status == UPDATE_STATUS_SKIPPED:
                num_skipped += 1
            else:
                raise UpdateProblemModuleStateError("Unexpected update_status returned: {}".format(update_status))

        # update task status:
        task_progress = get_task_progress()
        _get_current_task().update_state(state=PROGRESS, meta=task_progress)

    return task_progress


320 321 322
def _get_task_id_from_xmodule_args(xmodule_instance_args):
    """Return the 'task_id' held in `xmodule_instance_args`, or UNKNOWN_TASK_ID if there is none."""
    if xmodule_instance_args is None:
        return UNKNOWN_TASK_ID
    return xmodule_instance_args.get('task_id', UNKNOWN_TASK_ID)
323 324


325
def _get_xqueue_callback_url_prefix(xmodule_instance_args):
326
    """Gets prefix to use when constructing xqueue_callback_url."""
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
    return xmodule_instance_args.get('xqueue_callback_url_prefix', '') if xmodule_instance_args is not None else ''


def _get_track_function_for_task(student, xmodule_instance_args=None, source_page='x_module_task'):
    """
    Build a tracking callback that logs events for `student` under the current task.

    The callback takes (event_type, event), as CapaModule supplies them through
    the ModuleSystem.  It forwards them to task_track together with:
      - the request info passed through in `xmodule_instance_args`,
      - task info naming the student and task id, and
      - `source_page` as the page.
    """
    if xmodule_instance_args is None:
        request_info = {}
    else:
        request_info = xmodule_instance_args.get('request_info', {})
    task_info = {
        'student': student.username,
        'task_id': _get_task_id_from_xmodule_args(xmodule_instance_args),
    }

    def track_function(event_type, event):
        """Forward one tracking event to task_track with the captured context."""
        return task_track(request_info, task_info, event_type, event, page=source_page)

    return track_function


346
def _get_module_instance_for_task(course_id, student, module_descriptor, xmodule_instance_args=None,
                                  grade_bucket_type=None):
    """
    Returns the XModule instance for `student` built from `module_descriptor` in `course_id`.

    `xmodule_instance_args` is used to provide information for creating a track function and an XQueue callback.
    These are passed, along with `grade_bucket_type`, to get_module_for_descriptor_internal, which sidesteps
    the need for a Request object when instantiating an xmodule instance.  Whatever that call returns is
    returned unchanged.  Callers in this module treat a None result as "no module / access denied".
    """
    # reconstitute the problem's corresponding XModule:
    field_data_cache = FieldDataCache.cache_for_descriptor_descendents(course_id, student, module_descriptor)

    # Reuse the module's shared helpers so the tracking context (request_info,
    # student, task_id, page 'x_module_task') and the xqueue prefix are derived
    # exactly as for the other update functions, instead of duplicating that logic.
    track_function = _get_track_function_for_task(student, xmodule_instance_args)
    xqueue_callback_url_prefix = _get_xqueue_callback_url_prefix(xmodule_instance_args)

    return get_module_for_descriptor_internal(student, module_descriptor, field_data_cache, course_id,
                                              track_function, xqueue_callback_url_prefix,
                                              grade_bucket_type=grade_bucket_type)


@transaction.autocommit
def rescore_problem_module_state(xmodule_instance_args, module_descriptor, student_module):
    '''
    Takes an XModule descriptor and a corresponding StudentModule object, and
    performs rescoring on the student's problem submission.

    The student's XModule instance is built, its `rescore_problem()` is called,
    and the instance is saved.

    Throws exceptions if the rescoring is fatal and should be aborted if in a loop.
    In particular, raises UpdateProblemModuleStateError if module fails to instantiate,
    or if the module doesn't support rescoring.

    Returns one of the UPDATE_STATUS_* strings (not a boolean):
      - UPDATE_STATUS_SUCCEEDED if the rescore result's 'success' value is
        'correct' or 'incorrect';
      - UPDATE_STATUS_FAILED if the result has no 'success' key or reports any
        other value.  Such failures are logged as warnings and are not fatal.
    '''
    # unpack the StudentModule:
    course_id = student_module.course_id
    student = student_module.student
    module_state_key = student_module.module_state_key
    instance = _get_module_instance_for_task(course_id, student, module_descriptor, xmodule_instance_args, grade_bucket_type='rescore')

    if instance is None:
        # Either permissions just changed, or someone is trying to be clever
        # and load something they shouldn't have access to.
        msg = "No module {loc} for student {student}--access denied?".format(loc=module_state_key,
                                                                             student=student)
        TASK_LOG.debug(msg)
        raise UpdateProblemModuleStateError(msg)

    if not hasattr(instance, 'rescore_problem'):
        # This should also not happen, since it should be already checked in the caller,
        # but check here to be sure.
        msg = "Specified problem does not support rescoring."
        raise UpdateProblemModuleStateError(msg)

    result = instance.rescore_problem()
    instance.save()
    if 'success' not in result:
        # don't consider these fatal, but a failed status means that the individual call didn't complete:
        TASK_LOG.warning(u"error processing rescore call for course {course}, problem {loc} and student {student}: "
                         u"unexpected response {msg}".format(msg=result, course=course_id, loc=module_state_key, student=student))
        return UPDATE_STATUS_FAILED
    elif result['success'] not in ['correct', 'incorrect']:
        TASK_LOG.warning(u"error processing rescore call for course {course}, problem {loc} and student {student}: "
                         u"{msg}".format(msg=result['success'], course=course_id, loc=module_state_key, student=student))
        return UPDATE_STATUS_FAILED
    else:
        TASK_LOG.debug(u"successfully processed rescore call for course {course}, problem {loc} and student {student}: "
                       u"{msg}".format(msg=result['success'], course=course_id, loc=module_state_key, student=student))
        return UPDATE_STATUS_SUCCEEDED
429 430


431
@transaction.autocommit
def reset_attempts_module_state(xmodule_instance_args, _module_descriptor, student_module):
    """
    Set the problem attempt counter of `student_module` back to zero.

    The StudentModule's JSON `state` is decoded; an empty state counts as {}.
    If it records an 'attempts' value greater than zero:
      - that value is replaced with 0,
      - the state is re-encoded and saved, and
      - a 'problem_reset_attempts' tracking event with the old and new counts is emitted.

    Returns UPDATE_STATUS_SUCCEEDED when attempts were reset, and
    UPDATE_STATUS_SKIPPED otherwise.
    """
    raw_state = student_module.state
    problem_state = json.loads(raw_state) if raw_state else {}

    # Guard clauses: nothing to reset unless a positive attempt count is recorded.
    if 'attempts' not in problem_state:
        return UPDATE_STATUS_SKIPPED
    old_number_of_attempts = problem_state["attempts"]
    if not (old_number_of_attempts > 0):
        return UPDATE_STATUS_SKIPPED

    problem_state["attempts"] = 0
    student_module.state = json.dumps(problem_state)
    student_module.save()

    # Tracking context comes from the args passthrough plus task-specific info.
    tracker = _get_track_function_for_task(student_module.student, xmodule_instance_args)
    tracker('problem_reset_attempts', {"old_attempts": old_number_of_attempts, "new_attempts": 0})
    return UPDATE_STATUS_SUCCEEDED
456 457


458
@transaction.autocommit
def delete_problem_module_state(xmodule_instance_args, _module_descriptor, student_module):
    """
    Delete `student_module` and emit a 'problem_delete_state' tracking event.

    Returns UPDATE_STATUS_SUCCEEDED whenever it returns at all; a database
    error raised by the delete propagates as an exception instead.
    """
    student_module.delete()
    # Tracking context comes from the args passthrough plus task-specific info.
    tracker = _get_track_function_for_task(student_module.student, xmodule_instance_args)
    tracker('problem_delete_state', {})
    return UPDATE_STATUS_SUCCEEDED
471 472 473 474 475 476 477 478 479 480


def push_grades_to_s3(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
    """
    For a given `course_id`, generate a grades CSV file for all students that
    are enrolled, and store using a `GradesStore`. Once created, the files can
    be accessed by instantiating another `GradesStore` (via
    `GradesStore.from_config()`) and calling `link_for()` on it. Writes are
    buffered, so we'll never write part of a CSV file to S3 -- i.e. any files
    that are visible in GradesStore will be complete ones.

    The grade report has one row per successfully graded student:
    id, email, username, overall grade, then one column per labeled section
    of the first graded student's section_breakdown.  Students that could not
    be graded go to a separate "_err" CSV, written only if there are any.

    Progress is published via update_state(state=PROGRESS, ...) every
    `status_interval` students and before uploading.  The final progress dict
    is returned.

    As we start to add more CSV downloads, it will probably be worthwhile to
    make a more general CSVDoc class instead of building out the rows like we
    do here.
    """
    start_time = datetime.now(UTC)
    status_interval = 100

    enrolled_students = CourseEnrollment.users_enrolled_in(course_id)
    num_total = enrolled_students.count()
    num_attempted = 0
    num_succeeded = 0
    num_failed = 0
    curr_step = "Calculating Grades"

    def update_task_progress():
        """Publish and return a dict containing info about current task"""
        current_time = datetime.now(UTC)
        progress = {
            'action_name': action_name,
            'attempted': num_attempted,
            'succeeded': num_succeeded,
            'failed': num_failed,
            'total': num_total,
            'duration_ms': int((current_time - start_time).total_seconds() * 1000),
            'step': curr_step,
        }
        _get_current_task().update_state(state=PROGRESS, meta=progress)

        return progress

    # Loop over all our students and build our CSV lists in memory
    header_labels = None
    rows = []
    err_rows = [["id", "username", "error_msg"]]
    for student, gradeset, err_msg in iterate_grades_for(course_id, enrolled_students):
        # Periodically update task status (this is a cache write)
        if num_attempted % status_interval == 0:
            update_task_progress()
        num_attempted += 1

        if gradeset:
            # We were able to successfully grade this student for this course.
            num_succeeded += 1
            if not header_labels:
                # Keep the labels as-is for the dictionary lookups below.  Only the
                # CSV header cells are encoded in utf-8 (in case there are unicode
                # characters).  Looking up utf-8 encoded byte strings in a dict keyed
                # by unicode labels would miss every non-ASCII label.  Label-less
                # sections are skipped here, matching the `percents` filter below.
                header_labels = [
                    section['label']
                    for section in gradeset[u'section_breakdown']
                    if 'label' in section
                ]
                rows.append(["id", "email", "username", "grade"] +
                            [label.encode('utf-8') for label in header_labels])

            percents = {
                section['label']: section.get('percent', 0.0)
                for section in gradeset[u'section_breakdown']
                if 'label' in section
            }

            # Not everybody has the same gradable items. If the item is not
            # found in the user's gradeset, just assume it's a 0. The aggregated
            # grades for their sections and overall course will be calculated
            # without regard for the item they didn't have access to, so it's
            # possible for a student to have a 0.0 show up in their row but
            # still have 100% for the course.
            row_percents = [percents.get(label, 0.0) for label in header_labels]
            rows.append([student.id, student.email, student.username, gradeset['percent']] + row_percents)
        else:
            # An empty gradeset means we failed to grade a student.
            num_failed += 1
            err_rows.append([student.id, student.username, err_msg])

    # By this point, we've got the rows we're going to stuff into our CSV files.
    curr_step = "Uploading CSVs"
    update_task_progress()

    # Generate parts of the file name
    timestamp_str = start_time.strftime("%Y-%m-%d-%H%M")
    course_id_prefix = urllib.quote(course_id.replace("/", "_"))

    # Perform the actual upload
    grades_store = GradesStore.from_config()
    grades_store.store_rows(
        course_id,
        u"{}_grade_report_{}.csv".format(course_id_prefix, timestamp_str),
        rows
    )

    # If there are any error rows (don't count the header), write them out as well
    if len(err_rows) > 1:
        grades_store.store_rows(
            course_id,
            u"{}_grade_report_{}_err.csv".format(course_id_prefix, timestamp_str),
            err_rows
        )

    # One last update before we close out...
    return update_task_progress()