api_helper.py 17.9 KB
Newer Older
1 2 3 4 5 6
"""
Helper lib for instructor_tasks API.

Includes methods to check args for rescoring task, encoding student input,
and task submission logic, including handling the Celery backend.
"""
7 8 9
import hashlib
import json
import logging
Brian Wilson committed
10

11
from celery.result import AsyncResult
12 13 14
from celery.states import FAILURE, READY_STATES, REVOKED, SUCCESS
from django.utils.translation import ugettext as _
from opaque_keys.edx.keys import UsageKey
15

16
from courseware.courses import get_problems_in_section
17 18 19
from courseware.module_render import get_xqueue_callback_url_prefix
from lms.djangoapps.instructor_task.models import PROGRESS, InstructorTask
from util.db import outer_atomic
20 21
from xmodule.modulestore.django import modulestore

22
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
23 24 25


class AlreadyRunningError(Exception):
    """
    Raised when a requested background task is already in progress.

    The class-level `message` holds the default (translated) text; it is used
    whenever the caller passes no message, or a falsy one.
    """

    message = _('Requested task is already running')

    def __init__(self, message=None):
        # `or` falls back to the class default for None and for empty strings alike.
        super(AlreadyRunningError, self).__init__(message or self.message)


class QueueConnectionError(Exception):
    """
    Raised when the celery task could not be created successfully.

    The class-level `message` holds the default (translated) text; it is used
    whenever the caller passes no message, or a falsy one.
    """
    message = _('Error occured. Please try again later.')

    def __init__(self, message=None):
        # `or` falls back to the class default for None and for empty strings alike.
        super(QueueConnectionError, self).__init__(message or self.message)
47 48 49 50


def _task_is_running(course_id, task_type, task_key):
    """
    Check whether a matching task is recorded as still running.

    A task counts as running when an InstructorTask row exists for the given
    `course_id`, `task_type` and `task_key` whose `task_state` is not one of
    celery's READY_STATES.

    Returns:
        bool: True if at least one such row exists, False otherwise.
    """
    running_tasks = InstructorTask.objects.filter(
        course_id=course_id, task_type=task_type, task_key=task_key
    )
    # exclude states that are "ready" (i.e. not "running", e.g. failure, success, revoked):
    for state in READY_STATES:
        running_tasks = running_tasks.exclude(task_state=state)
    # Only existence matters, so ask the database for that instead of
    # fetching every matching row just to count them.
    return running_tasks.exists()
58 59 60 61 62 63 64


def _reserve_task(course_id, task_type, task_key, task_input, requester):
    """
    Creates a database entry to indicate that a task is in progress.

    Raises AlreadyRunningError if the task is already in progress; the error
    carries the message built by `generate_already_running_error_message`.

    The entry is created through `InstructorTask.create`, whose result is
    returned.  Per the original documentation this includes the creation of
    an arbitrary value for task_id, to be submitted with the task call to
    celery.

    Note that there is a chance of a race condition here, when two users
    try to run the same task at almost exactly the same time.  One user
    could be after the check and before the create when the second user
    gets to the check.  At that point, both users are able to run their
    tasks simultaneously.  This is deemed a small enough risk to not
    put in further safeguards.
    """

    if _task_is_running(course_id, task_type, task_key):
        log.warning("Duplicate task found for task_type %s and task_key %s", task_type, task_key)
        error_message = generate_already_running_error_message(task_type)
        raise AlreadyRunningError(error_message)

    try:
        most_recent_id = InstructorTask.objects.latest('id').id
    except InstructorTask.DoesNotExist:
        most_recent_id = "None found"

    # Deliberately not in a `finally:` clause: if the lookup above fails with
    # anything other than DoesNotExist, `most_recent_id` is unbound, and logging
    # it from `finally` would raise an UnboundLocalError that hides the real error.
    log.warning(
        "No duplicate tasks found: task_type %s, task_key %s, and most recent task_id = %s",
        task_type,
        task_key,
        most_recent_id
    )

    # Create log entry now, so that future requests will know it's running.
    return InstructorTask.create(course_id, task_type, task_key, task_input, requester)
95 96


97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
def generate_already_running_error_message(task_type):
    """
    Build the "already running" message shown for a report-generating task.

    Returns the translated, formatted message when `task_type` is one of the
    known report task types, and an empty string for any other task type.
    """
    # Maps a report task type to the translated report name inserted in the message.
    report_types = {
        'grade_problems': _('problem grade'),
        'problem_responses_csv': _('problem responses'),
        'profile_info_csv': _('enrolled learner profile'),
        'may_enroll_info_csv': _('enrollment'),
        'detailed_enrollment_report': _('detailed enrollment'),
        'exec_summary_report': _('executive summary'),
        'course_survey_report': _('survey'),
        'proctored_exam_results_report': _('proctored exam results'),
        'export_ora2_data': _('ORA data'),
        'grade_course': _('grade'),
    }

    report_name = report_types.get(task_type)
    if not report_name:
        # Unknown task type (or an empty translation): no specific message.
        return ''

    template = _(
        "The {report_type} report is being created. "
        "To view the status of the report, see Pending Tasks below. "
        "You will be able to download the report when it is complete."
    )
    return template.format(report_type=report_name)


128
def _get_xmodule_instance_args(request, task_id):
    """
    Calculate parameters needed for instantiating xmodule instances.

    The `request_info` will be passed to a tracking log function, to provide information
    about the source of the task request.   The `xqueue_callback_url_prefix` is used to
    permit old-style xqueue callbacks directly to the appropriate module in the LMS.
    The `task_id` is also passed to the tracking log function.

    Returns:
        dict with the keys 'xqueue_callback_url_prefix', 'request_info' and 'task_id'.

    Raises:
        KeyError: if `request.META` lacks 'REMOTE_ADDR' or 'SERVER_NAME'.
    """
    # The user agent may arrive as a byte string or as text.  Only byte strings
    # are decoded: calling .decode() on text either fails outright or triggers
    # an implicit ASCII encode that raises for non-ASCII agents.
    agent = request.META.get('HTTP_USER_AGENT', '')
    if isinstance(agent, bytes):
        agent = agent.decode('latin1')

    request_info = {'username': request.user.username,
                    'user_id': request.user.id,
                    'ip': request.META['REMOTE_ADDR'],
                    'agent': agent,
                    'host': request.META['SERVER_NAME'],
                    }

    xmodule_instance_args = {'xqueue_callback_url_prefix': get_xqueue_callback_url_prefix(request),
                             'request_info': request_info,
                             'task_id': task_id,
                             }
    return xmodule_instance_args


151 152 153 154 155 156 157 158 159 160 161
def _supports_rescore(descriptor):
    """
    Tell whether the given item can be rescored.

    To cover both XModules and XBlocks, the descriptor itself is checked for
    a `rescore` attribute first; failing that, its `module_class` (when it
    has one) is checked.

    Returns:
        bool: True if either the descriptor or its module class has `rescore`.
    """
    if hasattr(descriptor, 'rescore'):
        return True
    if not hasattr(descriptor, 'module_class'):
        return False
    return hasattr(descriptor.module_class, 'rescore')


162 163 164 165
def _update_instructor_task(instructor_task, task_result):
    """
    Refresh an InstructorTask entry in place from a celery task result.

    Used when updated status is requested.  The `instructor_task` passed in is
    modified in place; it is saved only when the result state is REVOKED
    (the original rationale: a revoked task may never have had the chance to
    update its own entry).

    Outcome by case:

    * The entry is in PROGRESS and has a non-empty `subtasks` value: nothing
      is changed.  The entry is maintained by the subtasks, and the parent
      result (which may already read SUCCESS) is ignored.
    * Result state PROGRESS or SUCCESS: `task_output` is set from
      `InstructorTask.create_output_for_success(result)`.
    * Result state FAILURE: `task_output` is set from
      `InstructorTask.create_output_for_failure(exception, traceback)`.
    * Result state REVOKED: `task_output` is set from
      `InstructorTask.create_output_for_revoked()` and the entry is saved.
    * Any other result state: only `task_state` is copied over.

    In every case but the first, `task_state` is set to the result's state.
    """
    # Read everything from the result object back-to-back: the longer we wait,
    # the more likely state and result no longer match.  State is read first,
    # and the code below does not assume the result exactly matches it.
    celery_task_id = task_result.task_id
    celery_state = task_result.state
    celery_result = task_result.result
    celery_traceback = task_result.traceback

    if instructor_task.task_state == PROGRESS and len(instructor_task.subtasks) > 0:
        # Subtasks are running: the parent result only says the subtasks were
        # successfully defined.  Trust the contents of the InstructorTask,
        # which stays in PROGRESS until the last subtask marks it SUCCESS.
        return

    save_entry = False
    new_output = None

    if celery_state in [PROGRESS, SUCCESS]:
        # The status message comes straight from the task result's result.
        log.info("background task (%s), state %s:  result: %s", celery_task_id, celery_state, celery_result)
        new_output = InstructorTask.create_output_for_success(celery_result)
    elif celery_state == FAILURE:
        # On failure, the result's result is the exception that caused it.
        traceback_for_log = '' if celery_traceback is None else celery_traceback
        log.warning("background task (%s) failed: %s %s", celery_task_id, celery_result, traceback_for_log)
        new_output = InstructorTask.create_output_for_failure(celery_result, celery_traceback)
    elif celery_state == REVOKED:
        # A revoked result carries no payload, and the worker cannot be relied
        # on to record this status, so it is persisted here.
        save_entry = True
        log.warning("background task (%s) revoked.", celery_task_id)
        new_output = InstructorTask.create_output_for_revoked()

    # Update the in-memory entry even when it is not saved, so the caller sees
    # the current progress on the object it passed in.
    instructor_task.task_state = celery_state
    if new_output is not None:
        instructor_task.task_output = new_output

    if save_entry:
        instructor_task.save()
241 242


243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
def _update_instructor_task_state(instructor_task, task_state, message=None):
    """
    Set a new state on an InstructorTask object and save it.

    `task_output` is overwritten with `message` only when `message` is
    truthy; otherwise the existing output is left as it is.  The object is
    saved in either case.
    """
    instructor_task.task_state = task_state

    # None and empty messages leave any previously stored output intact.
    if message:
        instructor_task.task_output = message

    instructor_task.save()


def _handle_instructor_task_failure(instructor_task, error):
    """
    Record that task creation did not complete, then signal it to the caller.

    Logs the failure, marks `instructor_task` as FAILURE (storing the error
    text as its output, and saving it), and always raises
    QueueConnectionError with its default message.

    Args:
        instructor_task: the InstructorTask entry reserved for the task.
        error: the exception caught while handing the task to celery.

    Raises:
        QueueConnectionError: always.
    """
    # `error.message` is not dependable: it is empty for exceptions built with
    # zero or several arguments and does not exist at all on Python 3, so fall
    # back to the exception's string form.
    failure_message = getattr(error, 'message', None) or str(error)

    # NOTE(review): the output is stored as plain text here, whereas
    # get_status_from_instructor_task() json-decodes task_output -- confirm
    # that readers of a FAILURE entry created this way tolerate that.
    log.info("instructor task (%s) failed, result: %s", instructor_task.task_id, failure_message)
    _update_instructor_task_state(instructor_task, FAILURE, failure_message)

    raise QueueConnectionError()


Brian Wilson committed
264 265 266
def get_updated_instructor_task(task_id):
    """
    Look up the InstructorTask for `task_id`, refreshed from celery if needed.

    If the stored state says the task may still be running (i.e. it is not
    one of celery's READY_STATES), the task's AsyncResult is consulted and
    the entry is updated in place via `_update_instructor_task`.

    Returns:
        The InstructorTask object, or None when no entry has that `task_id`.
    """
    try:
        entry = InstructorTask.objects.get(task_id=task_id)
    except InstructorTask.DoesNotExist:
        log.warning("query for InstructorTask status failed: task_id=(%s) not found", task_id)
        return None

    # Entries already known to be finished are returned as stored; only the
    # others need a look at the underlying task's result object.
    already_finished = entry.task_state in READY_STATES
    if not already_finished:
        _update_instructor_task(entry, AsyncResult(task_id))

    return entry


Brian Wilson committed
287
def get_status_from_instructor_task(instructor_task):
    """
    Build a status dict for a given InstructorTask entry.

    Returns an empty dict when `instructor_task` is None.  Otherwise the dict
    has the following keys:
      'task_id': the entry's task_id.
      'task_state': the entry's task_state.
      'in_progress': boolean, True when task_state is not in celery's READY_STATES.
      'task_progress': the entry's task_output decoded from JSON; present only
          when task_output is not None.

    Per the original documentation, 'task_progress' is expected to be a dict
    that may include:
          'attempted': number of attempts made
          'succeeded': number of attempts that "succeeded"
          'total': number of possible subtasks to attempt
          'action_name': user-visible verb to use in status messages.  Should be past-tense.
          'duration_ms': how long the task has (or had) been running.
          'exception': name of exception class raised in failed tasks.
          'message': returned for failed and revoked tasks.
          'traceback': optional, returned if task failed and produced a traceback.
    Its actual contents are whatever was stored in task_output.
    """
    if instructor_task is None:
        return {}

    # Basic information, mirroring what is stored on the InstructorTask.
    current_state = instructor_task.task_state
    status = {
        'task_id': instructor_task.task_id,
        'task_state': current_state,
        'in_progress': current_state not in READY_STATES,
    }

    stored_output = instructor_task.task_output
    if stored_output is not None:
        status['task_progress'] = json.loads(stored_output)

    return status


319
def check_arguments_for_rescoring(usage_key):
    """
    Do simple checks on the descriptor to confirm that it supports rescoring.

    The descriptor for `usage_key` is fetched from the modulestore; any
    exception raised by that lookup (per the original documentation, an
    ItemNotFoundException when the descriptor doesn't exist) propagates
    unchanged.

    Returns nothing on success.

    Raises:
        NotImplementedError: if the descriptor doesn't support rescoring
            calls.  The exception carries a translated, user-readable message.
    """
    descriptor = modulestore().get_item(usage_key)
    if _supports_rescore(descriptor):
        return

    raise NotImplementedError(_("This component cannot be rescored."))


338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
def check_arguments_for_overriding(usage_key, score):
    """
    Do simple checks on the descriptor to confirm that it supports overriding
    the problem score and the score passed in is not greater than the value of
    the problem or less than 0.

    Args:
        usage_key: key of the problem, looked up in the modulestore.
        score: the override score; anything `float()` accepts.

    Raises:
        NotImplementedError: if the descriptor's class has no `set_score`.
        ValueError: if `score` cannot be converted to a float, is not a
            number (NaN), or lies outside [0, descriptor.max_score()].
    """
    descriptor = modulestore().get_item(usage_key)
    score = float(score)

    # some weirdness around initializing the descriptor requires this
    if not hasattr(descriptor.__class__, 'set_score'):
        msg = _("This component does not support score override.")
        raise NotImplementedError(msg)

    # Expressed as "not within range" rather than "below or above": NaN
    # compares False against everything, so the latter form would let it pass.
    if not 0 <= score <= descriptor.max_score():
        msg = _("Scores must be between 0 and the value of the problem.")
        raise ValueError(msg)


357 358 359 360 361 362 363 364 365 366
def check_entrance_exam_problems_for_rescoring(exam_key):  # pylint: disable=invalid-name
    """
    Confirm that every problem in the entrance exam supports re-scoring.

    The problem descriptors are collected with `get_problems_in_section`;
    any exception raised by that lookup (per the original documentation, an
    ItemNotFoundException when no descriptor exists for exam_key)
    propagates unchanged.

    Raises:
        NotImplementedError: if at least one problem in the entrance exam
            doesn't support re-scoring calls.
    """
    exam_problems = get_problems_in_section(exam_key).values()
    if all(_supports_rescore(exam_problem) for exam_problem in exam_problems):
        return

    raise NotImplementedError(_("Not all problems in entrance exam support re-scoring."))


372
def encode_problem_and_student_input(usage_key, student=None):  # pylint: disable=invalid-name
    """
    Encode usage_key and optional student into task_key and task_input values.

    Args:
        usage_key (Location): The usage_key identifying the problem.
        student (User): the student affected

    Returns:
        tuple: (task_input, task_key).  `task_input` is a dict holding
        'problem_url' and, when a student is given, 'student' (the username).
        `task_key` is the hex MD5 digest of "<student id>_<problem url>"
        (with an empty student id when no student is given).
    """

    assert isinstance(usage_key, UsageKey)
    problem_url = usage_key.to_deprecated_string()

    task_input = {'problem_url': problem_url}
    if student is not None:
        task_input['student'] = student.username
        task_key_stub = "{student}_{problem}".format(student=student.id, problem=problem_url)
    else:
        task_key_stub = "_{problem}".format(problem=problem_url)

    # hashlib operates on bytes: encode explicitly when the stub is text, so
    # hashing neither fails nor depends on an implicit ASCII conversion.
    if not isinstance(task_key_stub, bytes):
        task_key_stub = task_key_stub.encode('utf-8')

    # create the key value by using MD5 hash:
    task_key = hashlib.md5(task_key_stub).hexdigest()

    return task_input, task_key


395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416
def encode_entrance_exam_and_student_input(usage_key, student=None):  # pylint: disable=invalid-name
    """
    Encode usage_key and optional student into task_key and task_input values.

    Args:
        usage_key (Location): The usage_key identifying the entrance exam.
        student (User): the student affected

    Returns:
        tuple: (task_input, task_key).  `task_input` is a dict holding
        'entrance_exam_url' and, when a student is given, 'student' (the
        username).  `task_key` is the hex MD5 digest of the UTF-8 encoding of
        "<student id>_<entrance exam url>" (with an empty student id when no
        student is given).
    """
    assert isinstance(usage_key, UsageKey)
    entrance_exam_url = unicode(usage_key)

    task_input = {'entrance_exam_url': entrance_exam_url}
    # The templates are unicode so that a non-ASCII exam url is not pushed
    # through an implicit ASCII encode while formatting.
    if student is not None:
        task_input['student'] = student.username
        task_key_stub = u"{student}_{entranceexam}".format(student=student.id, entranceexam=entrance_exam_url)
    else:
        task_key_stub = u"_{entranceexam}".format(entranceexam=entrance_exam_url)

    # create the key value by using MD5 hash (of the UTF-8 bytes; identical to
    # the previous value for ASCII-only keys):
    task_key = hashlib.md5(task_key_stub.encode('utf-8')).hexdigest()

    return task_input, task_key


417
def submit_task(request, task_type, task_class, course_key, task_input, task_key):
    """
    Reserve a task and hand it to celery to run asynchronously.

    The task is reserved (an InstructorTask entry is created) based on the
    `course_key`, `task_type`, and `task_key`, after checking that the same
    task is not already running.  The `task_input` is passed along so that it
    is stored in the resulting InstructorTask entry, and `request.user` is
    recorded as the requester.  Further arguments for the task are extracted
    from the `request`.  The task is then submitted through
    `task_class.apply_async`, using the task_id of the reserved entry.

    Cannot be inside an atomic block.

    Returns:
        The reserved InstructorTask entry.

    Raises:
        AlreadyRunningError: if the task is already running.
        QueueConnectionError: if submitting the task to celery fails; the
            entry is marked as failed before this is raised.
    """
    with outer_atomic():
        # check to see if task is already running, and reserve it otherwise:
        reserved_task = _reserve_task(course_key, task_type, task_key, task_input, request.user)

    # Leaving the block above makes sure all data has been committed before
    # the task is handed off to celery.
    celery_task_id = reserved_task.task_id
    celery_args = [reserved_task.id, _get_xmodule_instance_args(request, celery_task_id)]

    try:
        task_class.apply_async(celery_args, task_id=celery_task_id)
    except Exception as error:
        # Records the failure on the entry and raises QueueConnectionError.
        _handle_instructor_task_failure(reserved_task, error)

    return reserved_task