tasks_helper.py 69.4 KB
Newer Older
1
"""
2
This file contains tasks that are designed to perform background operations on the
3 4 5
running state of a course.

"""
6
import json
7
import re
8
from collections import OrderedDict
9
from datetime import datetime
Afzal Wali committed
10
from django.conf import settings
11 12
from eventtracking import tracker
from itertools import chain
13
from time import time
14
import unicodecsv
15
import logging
16

17
from celery import Task, current_task
18 19
from celery.states import SUCCESS, FAILURE
from django.contrib.auth.models import User
20
from django.core.files.storage import DefaultStorage
21
from django.db import transaction, reset_queries
22
from django.db.models import Q
23
import dogstats_wrapper as dog_stats_api
24
from pytz import UTC
Afzal Wali committed
25 26
from StringIO import StringIO
from edxmako.shortcuts import render_to_string
27
from instructor.paidcourse_enrollment_report import PaidCourseEnrollmentReportProvider
Afzal Wali committed
28 29 30 31
from shoppingcart.models import (
    PaidCourseRegistration, CourseRegCodeItem, InvoiceTransaction,
    Invoice, CouponRedemption, RegistrationCodeRedemption, CourseRegistrationCode
)
32
from survey.models import SurveyAnswer
33

34
from track.views import task_track
35
from util.db import outer_atomic
36
from util.file import course_filename_prefix_generator, UniversalNewlineIterator
37
from xblock.runtime import KvsFieldData
38
from xmodule.modulestore.django import modulestore
39
from xmodule.split_test_module import get_split_user_partitions
40
from django.utils.translation import ugettext as _
41 42 43
from certificates.models import (
    CertificateWhitelist,
    certificate_info_for_user,
44 45
    CertificateStatuses,
    GeneratedCertificate
46 47
)
from certificates.api import generate_user_certificates
48
from courseware.courses import get_course_by_id, get_problems_in_section
49
from courseware.grades import iterate_grades_for
50
from courseware.models import StudentModule
51
from courseware.model_data import DjangoKeyValueStore, FieldDataCache
52
from courseware.module_render import get_module_for_descriptor_internal
53 54 55 56 57 58
from instructor_analytics.basic import (
    enrolled_students_features,
    get_proctored_exam_results,
    list_may_enroll,
    list_problem_responses
)
59
from instructor_analytics.csvs import format_dictlist
60
from instructor_task.models import ReportStore, InstructorTask, PROGRESS
61 62 63
from lms.djangoapps.lms_xblock.runtime import LmsPartitionService
from openedx.core.djangoapps.course_groups.cohorts import get_cohort
from openedx.core.djangoapps.course_groups.models import CourseUserGroup
64
from openedx.core.djangoapps.content.course_structures.models import CourseStructure
65
from opaque_keys.edx.keys import UsageKey
66
from openedx.core.djangoapps.course_groups.cohorts import add_user_to_cohort, is_course_cohorted
Afzal Wali committed
67
from student.models import CourseEnrollment, CourseAccessRole
68 69
from lms.djangoapps.teams.models import CourseTeamMembership
from lms.djangoapps.verify_student.models import SoftwareSecurePhotoVerification
70

71
# define different loggers for use within tasks and on client side
TASK_LOG = logging.getLogger('edx.celery.task')

# define value to use when no task_id is provided:
UNKNOWN_TASK_ID = 'unknown-task_id'

# Role names gathered for filtering purposes; the code that consumes this
# list is not part of this section of the file.
FILTERED_OUT_ROLES = ['staff', 'instructor', 'finance_admin', 'sales_admin']

# define values for update functions to use to return status to perform_module_state_update
UPDATE_STATUS_SUCCEEDED = 'succeeded'
UPDATE_STATUS_FAILED = 'failed'
UPDATE_STATUS_SKIPPED = 'skipped'

# The event name emitted through the tracker whenever a report is uploaded to the report store.
REPORT_REQUESTED_EVENT_NAME = u'edx.instructor.report.requested'

85

86 87 88 89 90 91 92
class BaseInstructorTask(Task):
    """
    Base task class for use with InstructorTask models.

    Permits updating information about task in corresponding InstructorTask for monitoring purposes.

    Assumes that the entry_id of the InstructorTask model is the first argument to the task.

    The `entry_id` is the primary key for the InstructorTask entry representing the task.  This class
    updates the entry on success and failure of the task it wraps.  It is setting the entry's value
    for task_state based on what Celery would set it to once the task returns to Celery:
    FAILURE if an exception is encountered, and SUCCESS if it returns normally.
    Other arguments are pass-throughs to perform_module_state_update, and documented there.
    """
    abstract = True

    def on_success(self, task_progress, task_id, args, kwargs):
        """
        Update InstructorTask object corresponding to this task with info about success.

        Updates task_output and task_state.  But it shouldn't actually do anything
        if the task is only creating subtasks to actually do the work.

        Assumes `task_progress` is a dict containing the task's result, with the following keys:

          'attempted': number of attempts made
          'succeeded': number of attempts that "succeeded"
          'skipped': number of attempts that "skipped"
          'failed': number of attempts that "failed"
          'total': number of possible subtasks to attempt
          'action_name': user-visible verb to use in status messages.  Should be past-tense.
              Pass-through of input `action_name`.
          'duration_ms': how long the task has (or had) been running.

        This is JSON-serialized and stored in the task_output column of the InstructorTask entry.

        Arguments:
            task_progress: the value returned by the task (see above).
            task_id: id of the task, used here only for logging.
            args: positional arguments the task was called with; args[0] is the entry_id.
            kwargs: keyword arguments the task was called with (unused here).
        """
        TASK_LOG.debug('Task %s: success returned with progress: %s', task_id, task_progress)
        # We should be able to find the InstructorTask object to update
        # based on the task_id here, without having to dig into the
        # original args to the task.  On the other hand, the entry_id
        # is the first value passed to all such args, so we'll use that.
        # And we assume that it exists, else we would already have had a failure.
        entry_id = args[0]
        entry = InstructorTask.objects.get(pk=entry_id)
        # Check to see if any subtasks had been defined as part of this task.
        # If not, then we know that we're done.  (If so, let the subtasks
        # handle updating task_state themselves.)
        if len(entry.subtasks) == 0:
            entry.task_output = InstructorTask.create_output_for_success(task_progress)
            entry.task_state = SUCCESS
            entry.save_now()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        Update InstructorTask object corresponding to this task with info about failure.

        Fetches and updates exception and traceback information on failure.

        If an exception is raised internal to the task, it is caught by celery and provided here.
        The information is recorded in the InstructorTask object as a JSON-serialized dict
        stored in the task_output column.  It contains the following keys:

               'exception':  type of exception object
               'message': error message from exception object
               'traceback': traceback information (truncated if necessary)

        Note that there is no way to record progress made within the task (e.g. attempted,
        succeeded, etc.) when such failures occur.

        Arguments:
            exc: the exception raised by the task (unused; `einfo` is used instead).
            task_id: id of the task, used here only for logging.
            args: positional arguments the task was called with; args[0] is the entry_id.
            kwargs: keyword arguments the task was called with (unused here).
            einfo: exception info object; its `exception` and `traceback` attributes are recorded.
        """
        TASK_LOG.debug(u'Task %s: failure returned', task_id)
        entry_id = args[0]
        try:
            entry = InstructorTask.objects.get(pk=entry_id)
        except InstructorTask.DoesNotExist:
            # if the InstructorTask object does not exist, then there's no point
            # trying to update it.
            TASK_LOG.error(u"Task (%s) has no InstructorTask object for id %s", task_id, entry_id)
        else:
            TASK_LOG.warning(u"Task (%s) failed", task_id, exc_info=True)
            entry.task_output = InstructorTask.create_output_for_failure(einfo.exception, einfo.traceback)
            entry.task_state = FAILURE
            entry.save_now()


171
class UpdateProblemModuleStateError(Exception):
    """
    Error signaling a fatal condition while updating problem modules.

    Used when the current module cannot be processed and no more
    modules should be attempted.
    """


Brian Wilson committed
181
def _get_current_task():
    """
    Stub to make it easier to test without actually running Celery.

    This is a wrapper around celery.current_task, which provides access
    to the top of the stack of Celery's tasks.  When running tests, however,
    it doesn't seem to work to mock current_task directly, so this wrapper
    is used to provide a hook to mock in tests, while providing the real
    `current_task` in production.
    """
    return current_task


194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
class TaskProgress(object):
    """
    Running tally for the current task.

    Holds the 'attempted', 'succeeded', 'skipped' and 'failed' counters
    together with the fixed 'total', 'action_name' and the start time from
    which 'duration_ms' is derived.
    """
    def __init__(self, action_name, total, start_time):
        # All counters start from zero; callers increment them directly.
        self.attempted = self.succeeded = self.skipped = self.failed = 0
        self.start_time = start_time
        self.total = total
        self.action_name = action_name

    def update_task_state(self, extra_meta=None):
        """
        Publish the current tallies as the running celery task's PROGRESS state.

        The same dictionary that is published is handed back so that
        `run_main_task` and `BaseInstructorTask.on_success` can use it.

        Arguments:
            extra_meta (dict): Extra metadata to pass to `update_state`;
                its entries are merged over the standard progress keys.

        Returns:
            dict: The current task's progress dict
        """
        snapshot = dict(
            action_name=self.action_name,
            attempted=self.attempted,
            succeeded=self.succeeded,
            skipped=self.skipped,
            failed=self.failed,
            total=self.total,
            duration_ms=int((time() - self.start_time) * 1000),
        )
        if extra_meta is not None:
            snapshot.update(extra_meta)
        running_task = _get_current_task()
        running_task.update_state(state=PROGRESS, meta=snapshot)
        return snapshot


237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
def run_main_task(entry_id, task_fcn, action_name):
    """
    Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask.

    Arguments passed to `task_fcn` are:

     `entry_id` : the primary key for the InstructorTask entry representing the task.
     `course_id` : the id for the course.
     `task_input` : dict containing task-specific arguments, JSON-decoded from InstructorTask's task_input.
     `action_name` : past-tense verb to use for constructing status messages.

    If no exceptions are raised, the `task_fcn` should return a dict containing
    the task's result with the following keys:

          'attempted': number of attempts made
          'succeeded': number of attempts that "succeeded"
          'skipped': number of attempts that "skipped"
          'failed': number of attempts that "failed"
          'total': number of possible subtasks to attempt
          'action_name': user-visible verb to use in status messages.
              Should be past-tense.  Pass-through of input `action_name`.
          'duration_ms': how long the task has (or had) been running.

    Returns whatever `task_fcn` returned.

    Raises ValueError if the task_id stored on the InstructorTask entry does not
    match the id of the currently running task.
    """

    # Get the InstructorTask to be updated. If this fails then let the exception return to Celery.
    # There's no point in catching it here.
    with outer_atomic():
        entry = InstructorTask.objects.get(pk=entry_id)
        entry.task_state = PROGRESS
        entry.save_now()

    # Get inputs to use in this task from the entry
    task_id = entry.task_id
    course_id = entry.course_id
    task_input = json.loads(entry.task_input)

    # Construct log message
    fmt = u'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}'
    task_info_string = fmt.format(task_id=task_id, entry_id=entry_id, course_id=course_id, task_input=task_input)
    TASK_LOG.info(u'%s, Starting update (nothing %s yet)', task_info_string, action_name)

    # Check that the task_id submitted in the InstructorTask matches the current task
    # that is running.
    request_task_id = _get_current_task().request.id
    if task_id != request_task_id:
        fmt = u'{task_info}, Requested task did not match actual task "{actual_id}"'
        message = fmt.format(task_info=task_info_string, actual_id=request_task_id)
        TASK_LOG.error(message)
        raise ValueError(message)

    # Now do the work
    with dog_stats_api.timer('instructor_tasks.time.overall', tags=[u'action:{name}'.format(name=action_name)]):
        task_progress = task_fcn(entry_id, course_id, task_input, action_name)

    # Release any queries that the connection has been hanging onto
    reset_queries()

    # Log and exit, returning task_progress info as task result
    TASK_LOG.info(u'%s, Task type: %s, Finishing task: %s', task_info_string, action_name, task_progress)
    return task_progress


300
def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, task_input, action_name):
    """
    Performs generic update by visiting StudentModule instances with the update_fcn provided.

    StudentModule instances are those that match the specified `course_id` and the usage keys
    derived from `task_input`, which is read for the following keys:

          'problem_url': location of a single problem to update.
          'entrance_exam_url': location of an entrance exam; all problems in it are updated.
          'student': username or email address of a single student.  If present, it is used as an
              additional filter to limit the modules to those belonging to that student.  If absent,
              the update is performed on modules for all students on the selected problem(s).

    If a `filter_fcn` is not None, it is applied to the query that has been constructed.  It takes one
    argument, which is the query being filtered, and returns the filtered version of the query.

    The `update_fcn` is called on each StudentModule that passes the resulting filtering.
    It is passed two arguments:  the module_descriptor for the module pointed to by the
    module_state_key, and the particular StudentModule to update.  It must return one of
    UPDATE_STATUS_SUCCEEDED, UPDATE_STATUS_FAILED or UPDATE_STATUS_SKIPPED; any other value raises
    UpdateProblemModuleStateError.  A raised exception indicates a fatal condition -- that no other
    student modules should be considered.

    The return value is a dict containing the task's results, with the following keys:

          'attempted': number of attempts made
          'succeeded': number of attempts that "succeeded"
          'skipped': number of attempts that "skipped"
          'failed': number of attempts that "failed"
          'total': number of possible updates to attempt
          'action_name': user-visible verb to use in status messages.  Should be past-tense.
              Pass-through of input `action_name`.
          'duration_ms': how long the task has (or had) been running.

    Because this is run internal to a task, it does not catch exceptions.  These are allowed to pass up to the
    next level, so that it can set the failure modes and capture the error trace in the InstructorTask and the
    result object.

    """
    start_time = time()
    usage_keys = []
    problem_url = task_input.get('problem_url')
    entrance_exam_url = task_input.get('entrance_exam_url')
    student_identifier = task_input.get('student')
    problems = {}

    # if problem_url is present make a usage key from it
    if problem_url:
        usage_key = course_id.make_usage_key_from_deprecated_string(problem_url)
        usage_keys.append(usage_key)

        # find the problem descriptor:
        problem_descriptor = modulestore().get_item(usage_key)
        problems[unicode(usage_key)] = problem_descriptor

    # if entrance_exam is present grab all problems in it
    # NOTE: this replaces (rather than extends) anything collected from problem_url above.
    if entrance_exam_url:
        problems = get_problems_in_section(entrance_exam_url)
        usage_keys = [UsageKey.from_string(location) for location in problems.keys()]

    # find the modules in question
    modules_to_update = StudentModule.objects.filter(course_id=course_id, module_state_key__in=usage_keys)

    # give the option of updating an individual student. If not specified,
    # then updates all students who have responded to a problem so far
    student = None
    if student_identifier is not None:
        # if an identifier is supplied, then look for the student,
        # and let it throw an exception if none is found.
        if "@" in student_identifier:
            student = User.objects.get(email=student_identifier)
        else:
            student = User.objects.get(username=student_identifier)

    if student is not None:
        modules_to_update = modules_to_update.filter(student_id=student.id)

    if filter_fcn is not None:
        modules_to_update = filter_fcn(modules_to_update)

    task_progress = TaskProgress(action_name, modules_to_update.count(), start_time)
    task_progress.update_task_state()

    for module_to_update in modules_to_update:
        task_progress.attempted += 1
        module_descriptor = problems[unicode(module_to_update.module_state_key)]
        # There is no try here:  if there's an error, we let it throw, and the task will
        # be marked as FAILED, with a stack trace.
        with dog_stats_api.timer('instructor_tasks.module.time.step', tags=[u'action:{name}'.format(name=action_name)]):
            update_status = update_fcn(module_descriptor, module_to_update)
            if update_status == UPDATE_STATUS_SUCCEEDED:
                # The update_fcn reported that it performed some kind of work.
                # Logging of failures is left to the update_fcn itself.
                task_progress.succeeded += 1
            elif update_status == UPDATE_STATUS_FAILED:
                task_progress.failed += 1
            elif update_status == UPDATE_STATUS_SKIPPED:
                task_progress.skipped += 1
            else:
                raise UpdateProblemModuleStateError("Unexpected update_status returned: {}".format(update_status))

    return task_progress.update_task_state()
397 398


399 400 401
def _get_task_id_from_xmodule_args(xmodule_instance_args):
    """Look up 'task_id' in `xmodule_instance_args`; fall back to UNKNOWN_TASK_ID when the dict or the key is absent."""
    if xmodule_instance_args is None:
        return UNKNOWN_TASK_ID
    return xmodule_instance_args.get('task_id', UNKNOWN_TASK_ID)
402 403


404
def _get_xqueue_callback_url_prefix(xmodule_instance_args):
405
    """Gets prefix to use when constructing xqueue_callback_url."""
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
    return xmodule_instance_args.get('xqueue_callback_url_prefix', '') if xmodule_instance_args is not None else ''


def _get_track_function_for_task(student, xmodule_instance_args=None, source_page='x_module_task'):
    """
    Build a callable that records tracking events on behalf of a task.

    The returned callable is meant for insertion into ModuleSystem and is used
    by CapaModule, which supplies the event_type (a string) and event (a dict).
    The request_info, task_info and page are bound here.
    """
    # Request-related tracking information comes from the args passthrough ...
    if xmodule_instance_args is not None:
        request_info = xmodule_instance_args.get('request_info', {})
    else:
        request_info = {}
    # ... and is supplemented with task-specific information.
    task_info = {
        'student': student.username,
        'task_id': _get_task_id_from_xmodule_args(xmodule_instance_args),
    }

    def track_function(event_type, event):
        """Forward one event to task_track with the bound request/task info."""
        return task_track(request_info, task_info, event_type, event, page=source_page)

    return track_function


425
def _get_module_instance_for_task(course_id, student, module_descriptor, xmodule_instance_args=None,
                                  grade_bucket_type=None, course=None):
    """
    Fetches a StudentModule instance for a given `course_id`, `student` object, and `module_descriptor`.

    `xmodule_instance_args` is used to provide information for creating a track function and an XQueue callback.
    These are passed, along with `grade_bucket_type`, to get_module_for_descriptor_internal, which sidesteps
    the need for a Request object when instantiating an xmodule instance.

    `course`, if given, is an already-loaded course that is passed through for override enabling.

    Returns whatever get_module_for_descriptor_internal returns.
    """
    # reconstitute the problem's corresponding XModule:
    field_data_cache = FieldDataCache.cache_for_descriptor_descendents(course_id, student, module_descriptor)
    student_data = KvsFieldData(DjangoKeyValueStore(field_data_cache))

    # The tracking function and the xqueue callback prefix are built by the shared
    # module-level helpers rather than being re-derived from xmodule_instance_args
    # here; the helper's default source page ('x_module_task') is the page name
    # this function has always reported.
    return get_module_for_descriptor_internal(
        user=student,
        descriptor=module_descriptor,
        student_data=student_data,
        course_id=course_id,
        track_function=_get_track_function_for_task(student, xmodule_instance_args),
        xqueue_callback_url_prefix=_get_xqueue_callback_url_prefix(xmodule_instance_args),
        grade_bucket_type=grade_bucket_type,
        # This module isn't being used for front-end rendering
        request_token=None,
        # pass in a loaded course for override enabling
        course=course
    )
469 470


471
@outer_atomic
def rescore_problem_module_state(xmodule_instance_args, module_descriptor, student_module):
    '''
    Takes an XModule descriptor and a corresponding StudentModule object, and
    performs rescoring on the student's problem submission.

    Throws exceptions if the rescoring is fatal and should be aborted if in a loop.
    In particular, raises UpdateProblemModuleStateError if module fails to instantiate,
    or if the module doesn't support rescoring.

    Returns UPDATE_STATUS_SUCCEEDED if the rescore result reports a 'success' value of
    'correct' or 'incorrect', and UPDATE_STATUS_FAILED if the result has no 'success'
    key or carries any other value.
    '''
    # unpack the StudentModule:
    course_id = student_module.course_id
    student = student_module.student
    usage_key = student_module.module_state_key

    with modulestore().bulk_operations(course_id):
        # Load the course once and hand it to the module instance so that it
        # is available for override enabling while grading.
        course = get_course_by_id(course_id)
        instance = _get_module_instance_for_task(
            course_id,
            student,
            module_descriptor,
            xmodule_instance_args,
            grade_bucket_type='rescore',
            course=course
        )

        if instance is None:
            # Either permissions just changed, or someone is trying to be clever
            # and load something they shouldn't have access to.
            # (unicode literal: the student may not be representable as ASCII)
            msg = u"No module {loc} for student {student}--access denied?".format(
                loc=usage_key,
                student=student
            )
            TASK_LOG.debug(msg)
            raise UpdateProblemModuleStateError(msg)

        if not hasattr(instance, 'rescore_problem'):
            # This should also not happen, since it should be already checked in the caller,
            # but check here to be sure.
            msg = "Specified problem does not support rescoring."
            raise UpdateProblemModuleStateError(msg)

        result = instance.rescore_problem()
        instance.save()
        if 'success' not in result:
            # don't consider these fatal, but a failed status means that the individual call didn't complete:
            TASK_LOG.warning(
                u"error processing rescore call for course %(course)s, problem %(loc)s "
                u"and student %(student)s: unexpected response %(msg)s",
                dict(
                    msg=result,
                    course=course_id,
                    loc=usage_key,
                    student=student
                )
            )
            return UPDATE_STATUS_FAILED
        elif result['success'] not in ['correct', 'incorrect']:
            TASK_LOG.warning(
                u"error processing rescore call for course %(course)s, problem %(loc)s "
                u"and student %(student)s: %(msg)s",
                dict(
                    msg=result['success'],
                    course=course_id,
                    loc=usage_key,
                    student=student
                )
            )
            return UPDATE_STATUS_FAILED
        else:
            TASK_LOG.debug(
                u"successfully processed rescore call for course %(course)s, problem %(loc)s "
                u"and student %(student)s: %(msg)s",
                dict(
                    msg=result['success'],
                    course=course_id,
                    loc=usage_key,
                    student=student
                )
            )
            return UPDATE_STATUS_SUCCEEDED
558 559


560
@outer_atomic
def reset_attempts_module_state(xmodule_instance_args, _module_descriptor, student_module):
    """
    Resets problem attempts to zero for specified `student_module`.

    The module's `state` is treated as a JSON document; an empty state is treated
    as an empty dict.  When the attempts are reset, a 'problem_reset_attempts'
    tracking event carrying the old and new attempt counts is emitted.

    Returns a status of UPDATE_STATUS_SUCCEEDED if a problem has non-zero attempts
    that are being reset, and UPDATE_STATUS_SKIPPED otherwise.
    """
    update_status = UPDATE_STATUS_SKIPPED
    problem_state = json.loads(student_module.state) if student_module.state else {}
    if 'attempts' in problem_state:
        old_number_of_attempts = problem_state["attempts"]
        if old_number_of_attempts > 0:
            problem_state["attempts"] = 0
            # convert back to json and save
            student_module.state = json.dumps(problem_state)
            student_module.save()
            # get request-related tracking information from args passthrough,
            # and supplement with task-specific information:
            track_function = _get_track_function_for_task(student_module.student, xmodule_instance_args)
            event_info = {"old_attempts": old_number_of_attempts, "new_attempts": 0}
            track_function('problem_reset_attempts', event_info)
            update_status = UPDATE_STATUS_SUCCEEDED

    return update_status
585 586


587
@outer_atomic
def delete_problem_module_state(xmodule_instance_args, _module_descriptor, student_module):
    """
    Delete the StudentModule entry.

    After the deletion, a 'problem_delete_state' tracking event (with an empty
    payload) is emitted for the module's student.

    Always returns UPDATE_STATUS_SUCCEEDED, indicating success, if it doesn't raise an exception due to database error.
    """
    student_module.delete()
    # get request-related tracking information from args passthrough,
    # and supplement with task-specific information:
    track_function = _get_track_function_for_task(student_module.student, xmodule_instance_args)
    track_function('problem_delete_state', {})
    return UPDATE_STATUS_SUCCEEDED
600 601


602
def upload_csv_to_report_store(rows, csv_name, course_id, timestamp, config_name='GRADES_DOWNLOAD'):
    """
    Upload data as a CSV using ReportStore.

    The file is stored as "<course prefix>_<csv_name>_<timestamp>.csv", and a
    report-requested tracking event with `csv_name` as its report type is
    emitted afterwards.

    Arguments:
        rows: CSV data in the following format (first row may be a
            header):
            [
                [row1_colum1, row1_colum2, ...],
                ...
            ]
        csv_name: Name of the resulting CSV
        course_id: ID of the course
        timestamp: object with a `strftime` method (e.g. a datetime); it is
            rendered as "%Y-%m-%d-%H%M" in the file name.
        config_name: name of the configuration handed to `ReportStore.from_config`.
    """
    report_store = ReportStore.from_config(config_name)
    report_store.store_rows(
        course_id,
        u"{course_prefix}_{csv_name}_{timestamp_str}.csv".format(
            course_prefix=course_filename_prefix_generator(course_id),
            csv_name=csv_name,
            timestamp_str=timestamp.strftime("%Y-%m-%d-%H%M")
        ),
        rows
    )
    tracker.emit(REPORT_REQUESTED_EVENT_NAME, {"report_type": csv_name, })
627 628


Afzal Wali committed
629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658
def upload_exec_summary_to_store(data_dict, report_name, course_id, generated_at, config_name='FINANCIAL_REPORTS'):
    """
    Render the Executive Summary as HTML and hand it to a ReportStore.

    Arguments:
        data_dict: context containing the executive report data.
        report_name: Name of the resulting Html File.
        course_id: ID of the course
        generated_at: object with a `strftime` method; rendered as
            "%Y-%m-%d-%H%M" in the stored file name.
        config_name: name of the configuration handed to `ReportStore.from_config`.
    """
    store = ReportStore.from_config(config_name)

    # Render the template with the supplied data and wrap it in a file-like buffer.
    rendered_html = render_to_string("instructor/instructor_dashboard_2/executive_summary.html", data_dict)
    html_buffer = StringIO(rendered_html)

    filename_template = u"{course_prefix}_{report_name}_{timestamp_str}.html"
    html_filename = filename_template.format(
        course_prefix=course_filename_prefix_generator(course_id),
        report_name=report_name,
        timestamp_str=generated_at.strftime("%Y-%m-%d-%H%M")
    )
    storage_options = {
        'content_type': 'text/html',
        'content_encoding': None,
    }

    store.store(course_id, html_filename, html_buffer, config=storage_options)
    tracker.emit(REPORT_REQUESTED_EVENT_NAME, {"report_type": report_name})


659
def upload_grades_csv(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name):  # pylint: disable=too-many-statements
    """
    For a given `course_id`, generate a grades CSV file for all students that
    are enrolled, and store using a `ReportStore`. Once created, the files can
    be accessed by instantiating another `ReportStore` (via
    `ReportStore.from_config()`) and calling `link_for()` on it. Writes are
    buffered, so we'll never write part of a CSV file to S3 -- i.e. any files
    that are visible in ReportStore will be complete ones.

    Students that could not be graded are listed in a second CSV
    ('grade_report_err'), which is only uploaded when there is at least one
    such student.

    As we start to add more CSV downloads, it will probably be worthwhile to
    make a more general CSVDoc class instead of building out the rows like we
    do here.

    Arguments:
        _xmodule_instance_args: optional dict; only its `task_id` entry is
            read, for log messages.
        _entry_id: InstructorTask ID, used only in log messages.
        course_id: ID of the course to grade.
        _task_input: task input, used only in log messages.
        action_name: name of the action, passed to `TaskProgress` and logged.

    Returns:
        The result of the final `task_progress.update_task_state()` call.
    """
    start_time = time()
    start_date = datetime.now(UTC)
    status_interval = 100
    enrolled_students = CourseEnrollment.objects.users_enrolled_in(course_id)
    # Count once and reuse the value for both progress tracking and logging.
    total_enrolled_students = enrolled_students.count()
    task_progress = TaskProgress(action_name, total_enrolled_students, start_time)

    fmt = u'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}'
    task_info_string = fmt.format(
        task_id=_xmodule_instance_args.get('task_id') if _xmodule_instance_args is not None else None,
        entry_id=_entry_id,
        course_id=course_id,
        task_input=_task_input
    )
    TASK_LOG.info(u'%s, Task type: %s, Starting task execution', task_info_string, action_name)

    course = get_course_by_id(course_id)
    course_is_cohorted = is_course_cohorted(course.id)
    teams_enabled = course.teams_enabled
    cohorts_header = ['Cohort Name'] if course_is_cohorted else []
    teams_header = ['Team Name'] if teams_enabled else []

    experiment_partitions = get_split_user_partitions(course.user_partitions)
    group_configs_header = [u'Experiment Group ({})'.format(partition.name) for partition in experiment_partitions]

    certificate_info_header = ['Certificate Eligible', 'Certificate Delivered', 'Certificate Type']
    certificate_whitelist = CertificateWhitelist.objects.filter(course_id=course_id, whitelist=True)
    # A set, because membership is tested once per student in the loop below.
    whitelisted_user_ids = {entry.user_id for entry in certificate_whitelist}

    # Loop over all our students and build our CSV lists in memory
    header = None
    rows = []
    err_rows = [["id", "username", "error_msg"]]
    current_step = {'step': 'Calculating Grades'}

    student_counter = 0
    TASK_LOG.info(
        u'%s, Task type: %s, Current step: %s, Starting grade calculation for total students: %s',
        task_info_string,
        action_name,
        current_step,
        total_enrolled_students
    )
    for student, gradeset, err_msg in iterate_grades_for(course_id, enrolled_students):
        # Periodically update task status (this is a cache write)
        if task_progress.attempted % status_interval == 0:
            task_progress.update_task_state(extra_meta=current_step)
        task_progress.attempted += 1

        # Now add a log entry after each student is graded to get a sense
        # of the task's progress
        student_counter += 1
        TASK_LOG.info(
            u'%s, Task type: %s, Current step: %s, Grade calculation in-progress for students: %s/%s',
            task_info_string,
            action_name,
            current_step,
            student_counter,
            total_enrolled_students
        )

        if gradeset:
            # We were able to successfully grade this student for this course.
            task_progress.succeeded += 1
            # Compare against None rather than testing truthiness: a course
            # without graded sections yields an empty (falsy) header list, and
            # the header row must still be written exactly once.
            if header is None:
                header = [section['label'] for section in gradeset[u'section_breakdown']]
                rows.append(
                    ["id", "email", "username", "grade"] + header + cohorts_header +
                    group_configs_header + teams_header +
                    ['Enrollment Track', 'Verification Status'] + certificate_info_header
                )

            percents = {
                section['label']: section.get('percent', 0.0)
                for section in gradeset[u'section_breakdown']
                if 'label' in section
            }

            cohorts_group_name = []
            if course_is_cohorted:
                group = get_cohort(student, course_id, assign=False)
                cohorts_group_name.append(group.name if group else '')

            group_configs_group_names = []
            for partition in experiment_partitions:
                group = LmsPartitionService(student, course_id).get_group(partition, assign=False)
                group_configs_group_names.append(group.name if group else '')

            team_name = []
            if teams_enabled:
                try:
                    membership = CourseTeamMembership.objects.get(user=student, team__course_id=course_id)
                    team_name.append(membership.team.name)
                except CourseTeamMembership.DoesNotExist:
                    team_name.append('')

            enrollment_mode = CourseEnrollment.enrollment_mode_for_user(student, course_id)[0]
            verification_status = SoftwareSecurePhotoVerification.verification_status_for_user(
                student,
                course_id,
                enrollment_mode
            )
            certificate_info = certificate_info_for_user(
                student,
                course_id,
                gradeset['grade'],
                student.id in whitelisted_user_ids
            )

            # Not everybody has the same gradable items. If the item is not
            # found in the user's gradeset, just assume it's a 0. The aggregated
            # grades for their sections and overall course will be calculated
            # without regard for the item they didn't have access to, so it's
            # possible for a student to have a 0.0 show up in their row but
            # still have 100% for the course.
            row_percents = [percents.get(label, 0.0) for label in header]
            rows.append(
                [student.id, student.email, student.username, gradeset['percent']] +
                row_percents + cohorts_group_name + group_configs_group_names + team_name +
                [enrollment_mode] + [verification_status] + certificate_info
            )
        else:
            # An empty gradeset means we failed to grade a student.
            task_progress.failed += 1
            err_rows.append([student.id, student.username, err_msg])

    TASK_LOG.info(
        u'%s, Task type: %s, Current step: %s, Grade calculation completed for students: %s/%s',
        task_info_string,
        action_name,
        current_step,
        student_counter,
        total_enrolled_students
    )

    # By this point, we've got the rows we're going to stuff into our CSV files.
    current_step = {'step': 'Uploading CSVs'}
    task_progress.update_task_state(extra_meta=current_step)
    TASK_LOG.info(u'%s, Task type: %s, Current step: %s', task_info_string, action_name, current_step)

    # Perform the actual upload
    upload_csv_to_report_store(rows, 'grade_report', course_id, start_date)

    # If there are any error rows (don't count the header), write them out as well
    if len(err_rows) > 1:
        upload_csv_to_report_store(err_rows, 'grade_report_err', course_id, start_date)

    # One last update before we close out...
    TASK_LOG.info(u'%s, Task type: %s, Finalizing grade task', task_info_string, action_name)
    return task_progress.update_task_state(extra_meta=current_step)
823 824


825 826 827
def _order_problems(blocks):
    """
    Sort the problems by the assignment type and assignment that it belongs to.
828 829 830 831 832 833 834 835

    Args:
        blocks (OrderedDict) - A course structure containing blocks that have been ordered
                              (i.e. when we iterate over them, we will see them in the order
                              that they appear in the course).

    Returns:
        an OrderedDict that maps a problem id to its headers in the final report.
836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863
    """
    problems = OrderedDict()
    assignments = dict()
    # First, sort out all the blocks into their correct assignments and all the
    # assignments into their correct types.
    for block in blocks:
        # Put the assignments in order into the assignments list.
        if blocks[block]['block_type'] == 'sequential':
            block_format = blocks[block]['format']
            if block_format not in assignments:
                assignments[block_format] = OrderedDict()
            assignments[block_format][block] = list()

        # Put the problems into the correct order within their assignment.
        if blocks[block]['block_type'] == 'problem' and blocks[block]['graded'] is True:
            current = blocks[block]['parent']
            # crawl up the tree for the sequential block
            while blocks[current]['block_type'] != 'sequential':
                current = blocks[current]['parent']

            current_format = blocks[current]['format']
            assignments[current_format][current].append(block)

    # Now that we have a sorting and an order for the assignments and problems,
    # iterate through them in order to generate the header row.
    for assignment_type in assignments:
        for assignment_index, assignment in enumerate(assignments[assignment_type].keys(), start=1):
            for problem in assignments[assignment_type][assignment]:
864
                header_name = u"{assignment_type} {assignment_index}: {assignment_name} - {block}".format(
865 866 867 868 869 870 871 872 873 874
                    block=blocks[problem]['display_name'],
                    assignment_type=assignment_type,
                    assignment_index=assignment_index,
                    assignment_name=blocks[assignment]['display_name']
                )
                problems[problem] = [header_name + " (Earned)", header_name + " (Possible)"]

    return problems


875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908
def upload_problem_responses_csv(_xmodule_instance_args, _entry_id, course_id, task_input, action_name):
    """
    Build a CSV file with the answers (state) of all students to one problem
    of the course `course_id`, and save it through a `ReportStore`.

    The problem is taken from the `problem_location` entry of `task_input`.
    """
    started_at = time()
    report_date = datetime.now(UTC)
    # Exactly one report is produced, so progress is tracked against a total of 1.
    progress = TaskProgress(action_name, 1, started_at)
    step = {'step': 'Calculating students answers to problem'}
    progress.update_task_state(extra_meta=step)

    # Fetch the responses and turn them into a header plus data rows.
    location = task_input.get('problem_location')
    responses = list_problem_responses(course_id, location)
    header, rows = format_dictlist(responses, ['username', 'state'])

    row_count = len(rows)
    progress.attempted = row_count
    progress.succeeded = row_count
    progress.skipped = progress.total - progress.attempted

    rows.insert(0, header)

    step = {'step': 'Uploading CSV'}
    progress.update_task_state(extra_meta=step)

    # ':' and '/' in the location are replaced by '_' before it becomes part
    # of the CSV name.
    csv_name = 'student_state_from_{}'.format(re.sub(r'[:/]', '_', location))
    upload_csv_to_report_store(rows, csv_name, course_id, report_date)

    return progress.update_task_state(extra_meta=step)


909 910 911 912 913 914 915 916
def upload_problem_grade_report(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
    """
    Build a CSV with the per-problem grades of every student enrolled in
    `course_id` and save it through a `ReportStore`.

    Students that could not be graded are written to a separate
    'problem_grade_report_err' CSV. If the course has no `CourseStructure`
    yet, nothing is uploaded and the task state asks the user to retry.
    """
    started_at = time()
    report_date = datetime.now(UTC)
    update_every = 100
    students = CourseEnrollment.objects.users_enrolled_in(course_id)
    progress = TaskProgress(action_name, students.count(), started_at)

    # Maps django User field names (keys) to the display names used in the
    # header row (values); kept together so the two stay in sync.
    static_columns = OrderedDict([('id', 'Student ID'), ('email', 'Email'), ('username', 'Username')])

    try:
        course_structure = CourseStructure.objects.get(course_id=course_id)
        blocks = course_structure.ordered_blocks
        problems = _order_problems(blocks)
    except CourseStructure.DoesNotExist:
        return progress.update_task_state(
            extra_meta={'step': 'Generating course structure. Please refresh and try again.'}
        )

    # Header rows of both CSVs start with the static student columns.
    static_headers = list(static_columns.values())
    problem_headers = list(chain.from_iterable(problems.values()))
    rows = [static_headers + ['Final Grade'] + problem_headers]
    error_rows = [static_headers + ['error_msg']]
    step = {'step': 'Calculating Grades'}

    for student, gradeset, err_msg in iterate_grades_for(course_id, students, keep_raw_scores=True):
        student_fields = [getattr(student, field_name) for field_name in static_columns]
        progress.attempted += 1

        graded_ok = 'percent' in gradeset and 'raw_scores' in gradeset
        if not graded_ok:
            # Grading failed for this student; the error message is usually
            # set, but fall back to a generic one when it is not.
            error_rows.append(student_fields + [err_msg or u"Unknown error"])
            progress.failed += 1
            continue

        student_row = student_fields + [gradeset['percent']]

        # Index the graded scores by problem id; ungraded ones are ignored.
        graded_scores = {}
        for score in gradeset['raw_scores']:
            if score.graded:
                graded_scores[unicode(score.module_id)] = score

        for problem_id in problems:
            if problem_id in graded_scores:
                problem_score = graded_scores[problem_id]
                student_row.extend([problem_score.earned, problem_score.possible])
            else:
                # No score for this problem: e.g. the student never saw it,
                # or has no access to it (A/B test, cohorted courseware).
                student_row.extend(['N/A', 'N/A'])
        rows.append(student_row)

        progress.succeeded += 1
        if progress.attempted % update_every == 0:
            progress.update_task_state(extra_meta=step)

    # Each CSV is only uploaded when it holds more than its header row.
    if len(rows) > 1:
        upload_csv_to_report_store(rows, 'problem_grade_report', course_id, report_date)
    if len(error_rows) > 1:
        upload_csv_to_report_store(error_rows, 'problem_grade_report_err', course_id, report_date)

    return progress.update_task_state(extra_meta={'step': 'Uploading CSV'})


983
def upload_students_csv(_xmodule_instance_args, _entry_id, course_id, task_input, action_name):
    """
    Build a CSV file with profile information for the students enrolled in
    `course_id` and save it through a `ReportStore`.

    The columns are the ones listed under the `features` key of `task_input`.
    """
    started_at = time()
    report_date = datetime.now(UTC)
    students = CourseEnrollment.objects.users_enrolled_in(course_id)
    progress = TaskProgress(action_name, students.count(), started_at)

    step = {'step': 'Calculating Profile Info'}
    progress.update_task_state(extra_meta=step)

    # Collect the requested features per student and turn them into a header
    # plus data rows.
    features = task_input.get('features')
    profiles = enrolled_students_features(course_id, features)
    header, rows = format_dictlist(profiles, features)

    row_count = len(rows)
    progress.attempted = row_count
    progress.succeeded = row_count
    progress.skipped = progress.total - progress.attempted

    rows.insert(0, header)

    step = {'step': 'Uploading CSV'}
    progress.update_task_state(extra_meta=step)

    upload_csv_to_report_store(rows, 'student_profile_info', course_id, report_date)

    return progress.update_task_state(extra_meta=step)
1014 1015


1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093
def upload_enrollment_report(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
    """
    For a given `course_id`, generate a detailed enrollment CSV file with one
    row per user returned by `enrolled_and_dropped_out_users()`, combining the
    profile, enrollment and payment information supplied by
    `PaidCourseEnrollmentReportProvider`, and store it using the
    'FINANCIAL_REPORTS' `ReportStore`.

    Arguments:
        _xmodule_instance_args: optional dict; only its `task_id` entry is
            read, for log messages.
        _entry_id: InstructorTask ID, used only in log messages.
        course_id: ID of the course to report on.
        _task_input: task input, used only in log messages.
        action_name: name of the action, passed to `TaskProgress` and logged.

    Returns:
        The result of the final `task_progress.update_task_state()` call.
    """
    start_time = time()
    start_date = datetime.now(UTC)
    status_interval = 100
    students_in_course = CourseEnrollment.objects.enrolled_and_dropped_out_users(course_id)
    # Count once and reuse the value for both progress tracking and logging.
    total_students = students_in_course.count()
    task_progress = TaskProgress(action_name, total_students, start_time)

    fmt = u'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}'
    task_info_string = fmt.format(
        task_id=_xmodule_instance_args.get('task_id') if _xmodule_instance_args is not None else None,
        entry_id=_entry_id,
        course_id=course_id,
        task_input=_task_input
    )
    TASK_LOG.info(u'%s, Task type: %s, Starting task execution', task_info_string, action_name)

    # Loop over all our students and build our CSV lists in memory
    rows = []
    header = None
    current_step = {'step': 'Gathering Profile Information'}
    enrollment_report_provider = PaidCourseEnrollmentReportProvider()
    student_counter = 0
    TASK_LOG.info(
        u'%s, Task type: %s, Current step: %s, generating detailed enrollment report for total students: %s',
        task_info_string,
        action_name,
        current_step,
        total_students
    )

    for student in students_in_course:
        # Periodically update task status (this is a cache write)
        if task_progress.attempted % status_interval == 0:
            task_progress.update_task_state(extra_meta=current_step)
        task_progress.attempted += 1

        # Now add a log entry after certain intervals to get a hint that task is in progress
        student_counter += 1
        if student_counter % status_interval == 0:
            TASK_LOG.info(
                u'%s, Task type: %s, Current step: %s, gathering enrollment profile for students in progress: %s/%s',
                task_info_string,
                action_name,
                current_step,
                student_counter,
                total_students
            )

        user_data = enrollment_report_provider.get_user_profile(student.id)
        course_enrollment_data = enrollment_report_provider.get_enrollment_info(student, course_id)
        payment_data = enrollment_report_provider.get_payment_info(student, course_id)

        # The header row is derived from the first student's data. Compare
        # against None so that it is written exactly once even if the key
        # list turns out to be empty.
        if header is None:
            header = (
                list(user_data.keys()) +
                list(course_enrollment_data.keys()) +
                list(payment_data.keys())
            )

            # display name map for the column headers; only needed here, so
            # it is built once instead of once per student.
            enrollment_report_headers = {
                'User ID': _('User ID'),
                'Username': _('Username'),
                'Full Name': _('Full Name'),
                'First Name': _('First Name'),
                'Last Name': _('Last Name'),
                'Company Name': _('Company Name'),
                'Title': _('Title'),
                'Language': _('Language'),
                'Year of Birth': _('Year of Birth'),
                'Gender': _('Gender'),
                'Level of Education': _('Level of Education'),
                'Mailing Address': _('Mailing Address'),
                'Goals': _('Goals'),
                'City': _('City'),
                'Country': _('Country'),
                'Enrollment Date': _('Enrollment Date'),
                'Currently Enrolled': _('Currently Enrolled'),
                'Enrollment Source': _('Enrollment Source'),
                'Manual (Un)Enrollment Reason': _('Manual (Un)Enrollment Reason'),
                'Enrollment Role': _('Enrollment Role'),
                'List Price': _('List Price'),
                'Payment Amount': _('Payment Amount'),
                'Coupon Codes Used': _('Coupon Codes Used'),
                'Registration Code Used': _('Registration Code Used'),
                'Payment Status': _('Payment Status'),
                'Transaction Reference Number': _('Transaction Reference Number')
            }

            # translate header into localizable display strings; unknown
            # keys are used as they are.
            rows.append([
                enrollment_report_headers.get(header_element, header_element)
                for header_element in header
            ])

        rows.append(
            list(user_data.values()) +
            list(course_enrollment_data.values()) +
            list(payment_data.values())
        )
        task_progress.succeeded += 1

    TASK_LOG.info(
        u'%s, Task type: %s, Current step: %s, Detailed enrollment report generated for students: %s/%s',
        task_info_string,
        action_name,
        current_step,
        student_counter,
        total_students
    )

    # By this point, we've got the rows we're going to stuff into our CSV files.
    current_step = {'step': 'Uploading CSVs'}
    task_progress.update_task_state(extra_meta=current_step)
    TASK_LOG.info(u'%s, Task type: %s, Current step: %s', task_info_string, action_name, current_step)

    # Perform the actual upload
    upload_csv_to_report_store(rows, 'enrollment_report', course_id, start_date, config_name='FINANCIAL_REPORTS')

    # One last update before we close out...
    TASK_LOG.info(u'%s, Task type: %s, Finalizing detailed enrollment task', task_info_string, action_name)
    return task_progress.update_task_state(extra_meta=current_step)


1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168
def upload_may_enroll_csv(_xmodule_instance_args, _entry_id, course_id, task_input, action_name):
    """
    Build a CSV file with information about the students who may enroll in
    `course_id` but have not done so yet, and save it through a
    `ReportStore`.

    The columns are the ones listed under the `features` key of `task_input`.
    """
    started_at = time()
    report_date = datetime.now(UTC)
    # Exactly one report is produced, so progress is tracked against a total of 1.
    progress = TaskProgress(action_name, 1, started_at)
    step = {'step': 'Calculating info about students who may enroll'}
    progress.update_task_state(extra_meta=step)

    # Collect the requested features and turn them into a header plus data rows.
    features = task_input.get('features')
    candidates = list_may_enroll(course_id, features)
    header, rows = format_dictlist(candidates, features)

    row_count = len(rows)
    progress.attempted = row_count
    progress.succeeded = row_count
    progress.skipped = progress.total - progress.attempted

    rows.insert(0, header)

    step = {'step': 'Uploading CSV'}
    progress.update_task_state(extra_meta=step)

    upload_csv_to_report_store(rows, 'may_enroll_info', course_id, report_date)

    return progress.update_task_state(extra_meta=step)


Afzal Wali committed
1169 1170 1171 1172 1173 1174 1175
def get_executive_report(course_id):
    """
    Collect the figures of the executive summary of a course.

    Returns a dict with revenue, refund, seat count, discount code and
    registration code figures, plus the course display name and dates.
    """
    # Revenue that has actually been paid, per purchase channel.
    self_paid_amount = PaidCourseRegistration.get_total_amount_of_purchased_item(course_id)
    bulk_paid_amount = CourseRegCodeItem.get_total_amount_of_purchased_item(course_id)
    invoices_paid_amount = InvoiceTransaction.get_total_amount_of_paid_course_invoices(course_id)
    gross_paid_revenue = self_paid_amount + bulk_paid_amount + invoices_paid_amount

    # Pending revenue is the invoiced total minus what was paid on invoices.
    invoiced_amount = Invoice.get_invoice_total_amount_for_course(course_id)
    gross_pending_revenue = invoiced_amount - float(invoices_paid_amount)

    gross_revenue = float(gross_paid_revenue) + float(gross_pending_revenue)

    # Refunded seats and refunded amounts (self purchases plus bulk purchases).
    total_seats_refunded = (
        PaidCourseRegistration.get_self_purchased_seat_count(course_id, status='refunded') +
        CourseRegCodeItem.get_bulk_purchased_seat_count(course_id, status='refunded')
    )
    total_amount_refunded = (
        PaidCourseRegistration.get_total_amount_of_purchased_item(course_id, status='refunded') +
        CourseRegCodeItem.get_total_amount_of_purchased_item(course_id, status='refunded')
    )

    top_discounted_codes = CouponRedemption.get_top_discount_codes_used(course_id)
    total_coupon_codes_purchases = CouponRedemption.get_total_coupon_code_purchases(course_id)

    # Count the bulk purchased registration codes that were never redeemed.
    unused_registration_codes = sum(
        1
        for registration_code in CourseRegistrationCode.order_generated_registration_codes(course_id)
        if not RegistrationCodeRedemption.is_registration_code_redeemed(registration_code.code)
    )

    self_purchased_seat_count = PaidCourseRegistration.get_self_purchased_seat_count(course_id)
    bulk_purchased_seat_count = CourseRegCodeItem.get_bulk_purchased_seat_count(course_id)
    total_invoiced_seats = CourseRegistrationCode.invoice_generated_registration_codes(course_id).count()
    total_seats = self_purchased_seat_count + bulk_purchased_seat_count + total_invoiced_seats

    if total_seats != 0:
        seat_total = float(total_seats)
        self_purchases_percentage = (float(self_purchased_seat_count) / seat_total) * 100
        bulk_purchases_percentage = (float(bulk_purchased_seat_count) / seat_total) * 100
        invoice_purchases_percentage = (float(total_invoiced_seats) / seat_total) * 100
        avg_price_paid = gross_revenue / total_seats
    else:
        # Without any seat there is nothing to divide by.
        self_purchases_percentage = 0.0
        bulk_purchases_percentage = 0.0
        invoice_purchases_percentage = 0.0
        avg_price_paid = 0.0

    course = get_course_by_id(course_id, depth=0)
    date_format = "%Y-%m-%d"
    start_date = course.start.strftime(date_format) if course.start is not None else 'N/A'
    end_date = course.end.strftime(date_format) if course.end is not None else 'N/A'

    return {
        'display_name': course.display_name,
        'start_date': start_date,
        'end_date': end_date,
        'total_seats': total_seats,
        'currency': settings.PAID_COURSE_REGISTRATION_CURRENCY[1],
        'gross_revenue': float(gross_revenue),
        'gross_paid_revenue': float(gross_paid_revenue),
        'gross_pending_revenue': gross_pending_revenue,
        'total_seats_refunded': total_seats_refunded,
        'total_amount_refunded': float(total_amount_refunded),
        'average_paid_price': float(avg_price_paid),
        'discount_codes_data': top_discounted_codes,
        'total_seats_using_discount_codes': total_coupon_codes_purchases,
        'total_self_purchase_seats': self_purchased_seat_count,
        'total_bulk_purchase_seats': bulk_purchased_seat_count,
        'total_invoiced_seats': total_invoiced_seats,
        'unused_bulk_purchase_code_count': unused_registration_codes,
        'self_purchases_percentage': self_purchases_percentage,
        'bulk_purchases_percentage': bulk_purchases_percentage,
        'invoice_purchases_percentage': invoice_purchases_percentage,
    }


1252
def upload_exec_summary_report(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
    """
    Generate the executive summary html report of the course `course_id`,
    a snapshot of how the course is doing, and upload it to the report store.

    The reported enrollment total leaves out staff users and users holding
    one of the `FILTERED_OUT_ROLES` in the course.
    """
    started_at = time()
    report_generation_date = datetime.now(UTC)
    status_interval = 100

    # Count the enrolled users that are neither staff nor hold a filtered out
    # course role; the role lookup is skipped for staff users.
    true_enrollment_count = sum(
        1
        for user in CourseEnrollment.objects.users_enrolled_in(course_id)
        if not (
            user.is_staff or
            CourseAccessRole.objects.filter(
                user=user, course_id=course_id, role__in=FILTERED_OUT_ROLES
            ).exists()
        )
    )

    progress = TaskProgress(action_name, true_enrollment_count, started_at)

    task_id = None
    if _xmodule_instance_args is not None:
        task_id = _xmodule_instance_args.get('task_id')
    fmt = u'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}'
    task_info_string = fmt.format(
        task_id=task_id,
        entry_id=_entry_id,
        course_id=course_id,
        task_input=_task_input
    )

    TASK_LOG.info(u'%s, Task type: %s, Starting task execution', task_info_string, action_name)
    step = {'step': 'Gathering executive summary report information'}

    TASK_LOG.info(
        u'%s, Task type: %s, Current step: %s, generating executive summary report',
        task_info_string,
        action_name,
        step
    )

    if progress.attempted % status_interval == 0:
        progress.update_task_state(extra_meta=step)
    progress.attempted += 1

    # Gather the report figures and add the values computed in this task.
    data_dict = get_executive_report(course_id)
    data_dict['total_enrollments'] = true_enrollment_count
    data_dict['report_generation_date'] = report_generation_date.strftime("%Y-%m-%d")

    # Everything needed for the html report is available now.
    step = {'step': 'Uploading executive summary report HTML file'}
    progress.update_task_state(extra_meta=step)
    TASK_LOG.info(u'%s, Task type: %s, Current step: %s', task_info_string, action_name, step)

    upload_exec_summary_to_store(data_dict, 'executive_report', course_id, report_generation_date)
    progress.succeeded += 1

    # Final state update before the task ends.
    TASK_LOG.info(u'%s, Task type: %s, Finalizing executive summary report task', task_info_string, action_name)
    return progress.update_task_state(extra_meta=step)
1313 1314


1315
def upload_course_survey_report(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
    """
    For a given `course_id`, generate a CSV report containing the survey results for a course
    and store it using `upload_csv_to_report_store`.

    The report holds one row per user that has at least one `SurveyAnswer` in the course:
    the user id, username and email, followed by one column per distinct survey field
    name (sorted). Fields a user did not answer are written as empty strings.

    Returns the result of the final `task_progress.update_task_state` call.
    """
    start_time = time()
    start_date = datetime.now(UTC)
    num_reports = 1
    task_progress = TaskProgress(action_name, num_reports, start_time)

    current_step = {'step': 'Gathering course survey report information'}
    task_progress.update_task_state(extra_meta=current_step)

    survey_answers_for_course = SurveyAnswer.objects.filter(course_key=course_id)

    # One report column per distinct survey field, in sorted order.
    survey_fields = sorted(
        unique_field_row['field_name']
        for unique_field_row in survey_answers_for_course.values('field_name').distinct()
    )

    # Maps user id -> identity columns plus that user's answers, in first-seen order.
    # Answers live in their own nested dict so that a survey field which happens to be
    # named 'username' or 'email' cannot overwrite the identity columns.
    user_survey_answers = OrderedDict()
    for survey_field_record in survey_answers_for_course.select_related('user'):
        user = survey_field_record.user
        # Test membership on the dict itself; `.keys()` builds a list on every record.
        if user.id not in user_survey_answers:
            user_survey_answers[user.id] = {
                'username': user.username,
                'email': user.email,
                'answers': {},
            }
        user_survey_answers[user.id]['answers'][survey_field_record.field_name] = survey_field_record.field_value

    header = ["User ID", "User Name", "Email"]
    header.extend(survey_fields)

    csv_rows = []
    for user_id, user_record in user_survey_answers.items():
        answers = user_record['answers']
        row = [user_id, user_record['username'], user_record['email']]
        row.extend(answers.get(survey_field, '') for survey_field in survey_fields)
        csv_rows.append(row)

    task_progress.attempted = task_progress.succeeded = len(csv_rows)
    task_progress.skipped = task_progress.total - task_progress.attempted

    csv_rows.insert(0, header)

    current_step = {'step': 'Uploading CSV'}
    task_progress.update_task_state(extra_meta=current_step)

    # Perform the upload
    upload_csv_to_report_store(csv_rows, 'course_survey_results', course_id, start_date)

    return task_progress.update_task_state(extra_meta=current_step)


def upload_proctored_exam_results_report(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name):  # pylint: disable=invalid-name
    """
    Build the proctored exam results CSV for `course_id` and hand it to
    `upload_csv_to_report_store`.

    The columns are taken from the 'features' entry of `_task_input`. Returns the
    result of the last `update_task_state` call on the task progress tracker.
    """
    started_at = time()
    report_date = datetime.now(UTC)
    progress = TaskProgress(action_name, 1, started_at)

    step = {'step': 'Calculating info about proctored exam results in a course'}
    progress.update_task_state(extra_meta=step)

    # Fetch the raw results and turn them into a header plus data rows.
    features = _task_input.get('features')
    header, rows = format_dictlist(
        get_proctored_exam_results(course_id, features),
        features
    )

    # Every data row counts as both attempted and succeeded.
    row_count = len(rows)
    progress.attempted = row_count
    progress.succeeded = row_count
    progress.skipped = progress.total - progress.attempted

    # The header goes in only after the rows have been counted.
    rows.insert(0, header)

    step = {'step': 'Uploading CSV'}
    progress.update_task_state(extra_meta=step)

    upload_csv_to_report_store(rows, 'proctored_exam_results_report', course_id, report_date)

    return progress.update_task_state(extra_meta=step)


def generate_students_certificates(
        _xmodule_instance_args, _entry_id, course_id, task_input, action_name):
    """
    For a given `course_id`, generate certificates for only students present in 'students' key in task_input
    json column, otherwise generate certificates for all enrolled students.

    Keys read from `task_input`:
      * 'students' (optional): ids used to narrow the enrolled students.
      * 'statuses_to_regenerate' (optional): certificate statuses; when given, existing
        certificates in those statuses are invalidated and then regenerated.

    Returns the result of the final `task_progress.update_task_state` call.
    """
    start_time = time()
    enrolled_students = CourseEnrollment.objects.users_enrolled_in(course_id)

    students = task_input.get('students', None)

    if students is not None:
        enrolled_students = enrolled_students.filter(id__in=students)

    task_progress = TaskProgress(action_name, enrolled_students.count(), start_time)

    current_step = {'step': 'Calculating students already have certificates'}
    task_progress.update_task_state(extra_meta=current_step)

    statuses_to_regenerate = task_input.get('statuses_to_regenerate', [])
    if students is not None and not statuses_to_regenerate:
        # We want to skip 'filtering students' only when students are given and statuses to regenerate are not
        students_require_certs = enrolled_students
    else:
        students_require_certs = students_require_certificate(course_id, enrolled_students, statuses_to_regenerate)

    if statuses_to_regenerate:
        # Mark existing generated certificates as 'unavailable' before regenerating
        # We need to call this method after "students_require_certificate" otherwise "students_require_certificate"
        # would return no results.
        invalidate_generated_certificates(course_id, enrolled_students, statuses_to_regenerate)

    task_progress.skipped = task_progress.total - len(students_require_certs)

    current_step = {'step': 'Generating Certificates'}
    task_progress.update_task_state(extra_meta=current_step)

    course = modulestore().get_course(course_id, depth=0)
    # Statuses returned by `generate_user_certificates` that count as a success.
    successful_statuses = (CertificateStatuses.generating, CertificateStatuses.downloadable)

    # Generate certificate for each student
    for student in students_require_certs:
        task_progress.attempted += 1
        status = generate_user_certificates(
            student,
            course_id,
            course=course
        )

        if status in successful_statuses:
            task_progress.succeeded += 1
        else:
            task_progress.failed += 1

    return task_progress.update_task_state(extra_meta=current_step)


def cohort_students_and_upload(_xmodule_instance_args, _entry_id, course_id, task_input, action_name):
    """
    Within a given course, cohort students in bulk, then upload the results
    using a `ReportStore`.

    `task_input['file_name']` names a CSV file in the default storage. Each row is
    read for an 'email' or 'username' column (email wins when both are present) and a
    'cohort' column naming the target cohort.

    The uploaded report has one row per cohort name seen in the input, with columns
    'Cohort Name', 'Exists', 'Students Added' and 'Students Not Found'.

    Returns the result of the final `task_progress.update_task_state` call.
    """
    start_time = time()
    start_date = datetime.now(UTC)

    # Iterate through rows to get total assignments for task progress
    with DefaultStorage().open(task_input['file_name']) as f:
        total_assignments = sum(
            1 for _line in unicodecsv.DictReader(UniversalNewlineIterator(f), encoding='utf-8')
        )

    task_progress = TaskProgress(action_name, total_assignments, start_time)
    current_step = {'step': 'Cohorting Students'}
    task_progress.update_task_state(extra_meta=current_step)

    # cohorts_status is a mapping from cohort_name to metadata about
    # that cohort.  The metadata will include information about users
    # successfully added to the cohort, users not found, and a cached
    # reference to the corresponding cohort object to prevent
    # redundant cohort queries.
    cohorts_status = {}

    with DefaultStorage().open(task_input['file_name']) as f:
        for row in unicodecsv.DictReader(UniversalNewlineIterator(f), encoding='utf-8'):
            # Try to use the 'email' field to identify the user.  If it's not present, use 'username'.
            username_or_email = row.get('email') or row.get('username')
            cohort_name = row.get('cohort') or ''
            task_progress.attempted += 1

            if not cohorts_status.get(cohort_name):
                # First time this cohort name is seen: look it up once and cache the outcome.
                cohorts_status[cohort_name] = {
                    'Cohort Name': cohort_name,
                    'Students Added': 0,
                    'Students Not Found': set()
                }
                try:
                    cohorts_status[cohort_name]['cohort'] = CourseUserGroup.objects.get(
                        course_id=course_id,
                        group_type=CourseUserGroup.COHORT,
                        name=cohort_name
                    )
                    cohorts_status[cohort_name]["Exists"] = True
                except CourseUserGroup.DoesNotExist:
                    cohorts_status[cohort_name]["Exists"] = False

            if not cohorts_status[cohort_name]['Exists']:
                task_progress.failed += 1
                continue

            try:
                add_user_to_cohort(cohorts_status[cohort_name]['cohort'], username_or_email)
                cohorts_status[cohort_name]['Students Added'] += 1
                task_progress.succeeded += 1
            except User.DoesNotExist:
                cohorts_status[cohort_name]['Students Not Found'].add(username_or_email)
                task_progress.failed += 1
            except ValueError:
                # Raised when the user is already in the given cohort
                task_progress.skipped += 1

            task_progress.update_task_state(extra_meta=current_step)

    current_step['step'] = 'Uploading CSV'
    task_progress.update_task_state(extra_meta=current_step)

    # Flatten the per-cohort status collected above into CSV rows for upload.
    output_header = ['Cohort Name', 'Exists', 'Students Added', 'Students Not Found']
    output_rows = [output_header]
    for status_dict in cohorts_status.values():
        output_rows.append([
            # The not-found identifiers are kept in a set; sort them so the cell
            # content is the same on every run.
            ','.join(sorted(status_dict[column_name])) if column_name == 'Students Not Found'
            else status_dict[column_name]
            for column_name in output_header
        ])
    upload_csv_to_report_store(output_rows, 'cohort_results', course_id, start_date)

    return task_progress.update_task_state(extra_meta=current_step)


def students_require_certificate(course_id, enrolled_students, statuses_to_regenerate=None):
    """
    Returns list of students where certificates needs to be generated.
    if 'statuses_to_regenerate' is given then return students that have Generated Certificates
    and the generated certificate status lies in 'statuses_to_regenerate'

    if 'statuses_to_regenerate' is not given then return all the enrolled student skipping the ones
    whose certificates have already been generated.

    :param course_id: key of the course the certificates belong to
    :param enrolled_students: students to consider; a queryset is required when
        'statuses_to_regenerate' is given, because it is filtered further
    :param statuses_to_regenerate: (optional) certificate statuses to regenerate
    :return: list of students, in the order of 'enrolled_students' when
        'statuses_to_regenerate' is not given
    """
    if statuses_to_regenerate:
        # Return Students that have Generated Certificates and the generated certificate status
        # lies in 'statuses_to_regenerate'
        students_require_certificates = enrolled_students.filter(
            generatedcertificate__course_id=course_id,
            generatedcertificate__status__in=statuses_to_regenerate
        )
        # Fetch results otherwise subsequent operations on table cause wrong data fetch
        return list(students_require_certificates)

    # Compute the ids of students whose certificate for THIS course is already generated.
    # The certificate table is queried directly so that the course and the status
    # conditions apply to the same certificate row. A negated lookup across the
    # User -> GeneratedCertificate relation would instead discard every user holding an
    # 'unavailable' certificate in any course.
    user_ids_already_have_certs = set(
        GeneratedCertificate.objects.filter(  # pylint: disable=no-member
            course_id=course_id,
        ).exclude(
            status=CertificateStatuses.unavailable,
        ).values_list('user_id', flat=True)
    )

    # Return all the enrolled student skipping the ones whose certificates have already been generated
    return [
        student for student in enrolled_students
        if student.id not in user_ids_already_have_certs
    ]


def invalidate_generated_certificates(course_id, enrolled_students, certificate_statuses):  # pylint: disable=invalid-name
    """
    Invalidate generated certificates for all enrolled students in the given course having status in
    'certificate_statuses'.

    Generated Certificates are invalidated by marking its status 'unavailable' and updating verify_uuid, download_uuid,
    download_url and grade with empty string.

    :param course_id: Course Key for the course whose generated certificates need to be removed
    :param enrolled_students: (queryset or list) students enrolled in the course
    :param certificate_statuses: certificates statuses for whom to remove generated certificate
    """
    certificates = GeneratedCertificate.objects.filter(  # pylint: disable=no-member
        user__in=enrolled_students,
        course_id=course_id,
        status__in=certificate_statuses,
    )

    # Mark generated certificates as 'unavailable' and update download_url, download_uuid, verify_uuid and
    # grade with empty string for each row
    certificates.update(
        status=CertificateStatuses.unavailable,
        verify_uuid='',
        download_uuid='',
        download_url='',
        grade='',
    )