tasks.py 12 KB
Newer Older
1 2 3 4
"""
This module contains tasks for asynchronous execution of grade updates.
"""

5 6 7
from logging import getLogger

import six
8
from celery import task
9 10
from celery_utils.logged_task import LoggedTask
from celery_utils.persist_on_failure import PersistOnFailureTask
11
from courseware.model_data import get_score
12
from django.conf import settings
13
from django.contrib.auth.models import User
14
from django.core.exceptions import ValidationError
15
from django.db.utils import DatabaseError
16
from lms.djangoapps.course_blocks.api import get_course_blocks
17
from lms.djangoapps.grades.config.models import ComputeGradesSetting
18 19
from opaque_keys.edx.keys import CourseKey, UsageKey
from opaque_keys.edx.locator import CourseLocator
20
from openedx.core.djangoapps.monitoring_utils import set_custom_metric, set_custom_metrics_for_course_key
21
from student.models import CourseEnrollment
22
from submissions import api as sub_api
23
from track.event_transaction_utils import set_event_transaction_id, set_event_transaction_type
24
from util.date_utils import from_timestamp
25
from xmodule.modulestore.django import modulestore
26

Nimisha Asthagiri committed
27
from .config.waffle import DISABLE_REGRADE_ON_POLICY_CHANGE, waffle
28
from .constants import ScoreDatabaseTableEnum
29
from .course_grade_factory import CourseGradeFactory
30
from .exceptions import DatabaseNotReadyError
31
from .services import GradesService
32
from .signals.signals import SUBSECTION_SCORE_CHANGED
33
from .subsection_grade_factory import SubsectionGradeFactory
34
from .transformer import GradesTransformer
35

36
log = getLogger(__name__)

# Time limit, in seconds, applied to the batched course-grade task.
COURSE_GRADE_TIMEOUT_SECONDS = 1200

# Errors we expect occasionally, should be resolved on retry
KNOWN_RETRY_ERRORS = (
    DatabaseError,
    ValidationError,
    DatabaseNotReadyError,
)

# to prevent excessive _has_db_updated failures. See TNL-6424.
RECALCULATE_GRADE_DELAY_SECONDS = 2

# Default delay, in seconds, between retries of the grade tasks.
RETRY_DELAY_SECONDS = 30

# Time limit, in seconds, applied to the subsection-grade recalculation task.
SUBSECTION_GRADE_TIMEOUT_SECONDS = 300
47

48

49 50 51 52 53 54 55
class _BaseTask(PersistOnFailureTask, LoggedTask):  # pylint: disable=abstract-method
    """
    Common base class for the grade tasks declared in this module.

    Combines PersistOnFailureTask and LoggedTask so that tasks using it as
    their ``base`` include persistence features, as well as logging of task
    invocation.
    """
    # NOTE(review): presumably flags this class as an abstract Celery base
    # rather than a runnable task of its own -- confirm against the Celery
    # version in use.
    abstract = True


56
@task(base=_BaseTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY)
def compute_all_grades_for_course(**kwargs):
    """
    Fan out grade computation for every student enrolled in a course.

    One compute_grades_for_course_v2 task is queued per batch of
    enrollments produced by _course_task_args.  Nothing is queued when the
    DISABLE_REGRADE_ON_POLICY_CHANGE waffle switch is enabled.

    Keyword Arguments:
        course_key (string): serialized key of the course to regrade.
        Remaining keyword arguments are handed to _course_task_args and are
        also forwarded to each queued task, together with that batch's
        ``course_key``, ``offset`` and ``batch_size``.
    """
    if waffle().is_enabled(DISABLE_REGRADE_ON_POLICY_CHANGE):
        log.debug('Grades: ignoring policy change regrade due to waffle switch')
        return

    course_key = CourseKey.from_string(kwargs.pop('course_key'))
    for course_key_string, offset, batch_size in _course_task_args(course_key=course_key, **kwargs):
        # The same kwargs dict is reused for every batch; only these three
        # entries change from one queued task to the next.
        kwargs.update(course_key=course_key_string, offset=offset, batch_size=batch_size)
        compute_grades_for_course_v2.apply_async(
            kwargs=kwargs,
            routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY,
        )
76 77


78 79 80 81 82 83 84
@task(
    bind=True,
    base=_BaseTask,
    default_retry_delay=RETRY_DELAY_SECONDS,
    max_retries=1,
    time_limit=COURSE_GRADE_TIMEOUT_SECONDS
)
def compute_grades_for_course_v2(self, **kwargs):
    """
    Compute grades for one batch of students in the specified course.

    Students are taken in order of enrollment date; at most <batch_size> of
    them are processed, starting from the specified offset.

    Keyword Arguments:
        course_key (string): serialized key of the course.
        offset (int): index of the first enrollment in the batch.
        batch_size (int): maximum number of enrollments in the batch.
        event_transaction_id (OPTIONAL): passed to set_event_transaction_id
            when present.
        event_transaction_type (OPTIONAL): passed to
            set_event_transaction_type when present.

    Any exception raised while computing the batch, including a missing
    required keyword argument, results in a retry of this task.

    TODO: Roll this back into compute_grades_for_course once all workers have
    the version with **kwargs.
    """
    optional_event_setters = (
        ('event_transaction_id', set_event_transaction_id),
        ('event_transaction_type', set_event_transaction_type),
    )
    for kwarg_name, apply_value in optional_event_setters:
        if kwarg_name in kwargs:
            apply_value(kwargs[kwarg_name])

    try:
        # The lookups stay inside the try so that a missing key is retried
        # exactly like any other failure.
        return compute_grades_for_course(
            kwargs['course_key'],
            kwargs['offset'],
            kwargs['batch_size'],
        )
    except Exception as exc:   # pylint: disable=broad-except
        raise self.retry(kwargs=kwargs, exc=exc)
106 107 108 109


@task(base=_BaseTask)
def compute_grades_for_course(course_key, offset, batch_size, **kwargs):  # pylint: disable=unused-argument
    """
    Compute and save grades for one batch of students in the specified course.

    Enrollments for the course are ordered by their ``created`` value and the
    slice ``[offset:offset + batch_size]`` selects the students to grade.

    Arguments:
        course_key (string): serialized key of the course.
        offset (int): index of the first enrollment in the batch.
        batch_size (int): maximum number of enrollments in the batch.

    Raises:
        The first error reported by CourseGradeFactory for any student in
        the batch.
    """
    course_key = CourseKey.from_string(course_key)
    ordered_enrollments = CourseEnrollment.objects.filter(course_id=course_key).order_by('created')
    batch = ordered_enrollments[offset:offset + batch_size]
    student_iter = (enrollment.user for enrollment in batch)
    grade_results = CourseGradeFactory().iter(
        users=student_iter,
        course_key=course_key,
        force_update=True,
    )
    for result in grade_results:
        if result.error is not None:
            raise result.error
123 124


125 126 127 128 129 130 131 132
@task(
    bind=True,
    base=_BaseTask,
    routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY,
    default_retry_delay=RETRY_DELAY_SECONDS,
    max_retries=2,
    time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS
)
def recalculate_subsection_grade_v3(self, **kwargs):
    """
    Current task entry point for recalculating a subsection grade.

    Delegates directly to _recalculate_subsection_grade, passing along the
    bound task instance and every keyword argument unchanged; the accepted
    keyword arguments are described in that function's docstring.
    """
    _recalculate_subsection_grade(self, **kwargs)
139 140


141
def _recalculate_subsection_grade(self, **kwargs):
    """
    Updates a saved subsection grade.

    Every exception raised along the way -- including the
    DatabaseNotReadyError raised when the expected score has not been found
    in the database yet -- is turned into a retry of the task via
    ``self.retry``.  Exceptions that are not in KNOWN_RETRY_ERRORS are logged
    before the retry is requested.

    Arguments:
        self: the bound task instance; its ``request.id`` and ``retry`` are
            used here.

    Keyword Arguments:
        user_id (int): id of applicable User object
        anonymous_user_id (int, OPTIONAL): Anonymous ID of the User
        course_id (string): identifying the course
        usage_id (string): identifying the course block
        only_if_higher (boolean): indicating whether grades should
            be updated only if the new raw_earned is higher than the
            previous value.
        expected_modified_time (serialized timestamp): indicates when the task
            was queued so that we can verify the underlying data update.
        score_deleted (boolean): indicating whether the grade change is
            a result of the problem's score being deleted.
        event_transaction_id (string): uuid identifying the current
            event transaction.
        event_transaction_type (string): human-readable type of the
            event at the root of the current event transaction.
        score_db_table (ScoreDatabaseTableEnum): database table that houses
            the changed score. Used in conjunction with expected_modified_time.
    """
    try:
        course_key = CourseLocator.from_string(kwargs['course_id'])
        scored_block_usage_key = UsageKey.from_string(kwargs['usage_id']).replace(course_key=course_key)

        set_custom_metrics_for_course_key(course_key)
        # six.text_type is ``unicode`` on Python 2 and ``str`` on Python 3;
        # the bare ``unicode`` builtin does not exist on Python 3.
        set_custom_metric('usage_id', six.text_type(scored_block_usage_key))

        # The request cache is not maintained on celery workers,
        # where this code runs. So we take the values from the
        # main request cache and store them in the local request
        # cache. This correlates model-level grading events with
        # higher-level ones.
        set_event_transaction_id(kwargs.get('event_transaction_id'))
        set_event_transaction_type(kwargs.get('event_transaction_type'))

        # Verify the database has been updated with the scores when the task was
        # created. This race condition occurs if the transaction in the task
        # creator's process hasn't committed before the task initiates in the worker
        # process.
        has_database_updated = _has_db_updated_with_new_score(self, scored_block_usage_key, **kwargs)

        if not has_database_updated:
            raise DatabaseNotReadyError

        _update_subsection_grades(
            course_key,
            scored_block_usage_key,
            kwargs['only_if_higher'],
            kwargs['user_id'],
            kwargs['score_deleted'],
        )
    except Exception as exc:   # pylint: disable=broad-except
        # Known transient errors are retried quietly; anything else is
        # logged first so the unexpected failure can be investigated.
        if not isinstance(exc, KNOWN_RETRY_ERRORS):
            log.info("tnl-6244 grades unexpected failure: {}. task id: {}. kwargs={}".format(
                repr(exc),
                self.request.id,
                kwargs,
            ))
        raise self.retry(kwargs=kwargs, exc=exc)
203 204


205
def _has_db_updated_with_new_score(self, scored_block_usage_key, **kwargs):
    """
    Returns whether the database has been updated with the
    expected new score values for the given problem and user.

    The score is looked up in the table named by ``kwargs['score_db_table']``:
    courseware_student_module (via get_score), submissions (via the
    submissions API) or, for any other value, overrides (via GradesService).

    Arguments:
        self: the bound task instance; only ``self.request.id`` is read, for
            logging.
        scored_block_usage_key: usage key of the block whose score changed.

    Keyword Arguments (those read here):
        score_db_table, user_id, anonymous_user_id (submissions only),
        course_id and usage_id (overrides only), score_deleted,
        expected_modified_time.

    Returns:
        ``kwargs['score_deleted']`` when no score was found; otherwise
        whether the found score's timestamp is at least
        ``expected_modified_time``.
    """
    if kwargs['score_db_table'] == ScoreDatabaseTableEnum.courseware_student_module:
        score = get_score(kwargs['user_id'], scored_block_usage_key)
        found_modified_time = score.modified if score is not None else None

    elif kwargs['score_db_table'] == ScoreDatabaseTableEnum.submissions:
        # six.text_type is ``unicode`` on Python 2 and ``str`` on Python 3;
        # the bare ``unicode`` builtin does not exist on Python 3.
        score = sub_api.get_score(
            {
                "student_id": kwargs['anonymous_user_id'],
                "course_id": six.text_type(scored_block_usage_key.course_key),
                "item_id": six.text_type(scored_block_usage_key),
                "item_type": scored_block_usage_key.block_type,
            }
        )
        found_modified_time = score['created_at'] if score is not None else None
    else:
        assert kwargs['score_db_table'] == ScoreDatabaseTableEnum.overrides
        score = GradesService().get_subsection_grade_override(
            user_id=kwargs['user_id'],
            course_key_or_id=kwargs['course_id'],
            usage_key_or_id=kwargs['usage_id']
        )
        found_modified_time = score.modified if score is not None else None

    if score is None:
        # score should be None only if it was deleted.
        # Otherwise, it hasn't yet been saved.
        db_is_updated = kwargs['score_deleted']
    else:
        db_is_updated = found_modified_time >= from_timestamp(kwargs['expected_modified_time'])

    if not db_is_updated:
        log.info(
            u"Grades: tasks._has_database_updated_with_new_score is False. Task ID: {}. Kwargs: {}. Found "
            u"modified time: {}".format(
                self.request.id,
                kwargs,
                found_modified_time,
            )
        )

    return db_is_updated
251 252


253
def _update_subsection_grades(course_key, scored_block_usage_key, only_if_higher, user_id, score_deleted):
    """
    Update the stored grade of every subsection that contains the given
    block, and send SUBSECTION_SCORE_CHANGED for each subsection updated.

    Arguments:
        course_key: key of the course containing the block.
        scored_block_usage_key: usage key of the block whose score changed.
        only_if_higher: forwarded to the subsection grade factory's update.
        user_id: id of the User whose grades are being updated.
        score_deleted: forwarded to the subsection grade factory's update.
    """
    student = User.objects.get(id=user_id)
    store = modulestore()
    with store.bulk_operations(course_key):
        course_usage_key = store.make_course_usage_key(course_key)
        course_structure = get_course_blocks(student, course_usage_key)
        # Falls back to an empty set when the transformer field is not
        # available for the block.
        subsections_to_update = course_structure.get_transformer_block_field(
            scored_block_usage_key, GradesTransformer, 'subsections', set(),
        )

        course = store.get_course(course_key, depth=0)
        subsection_grade_factory = SubsectionGradeFactory(student, course, course_structure)

        for subsection_usage_key in subsections_to_update:
            # Skip subsections that are not part of this student's structure.
            if subsection_usage_key not in course_structure:
                continue

            subsection_grade = subsection_grade_factory.update(
                course_structure[subsection_usage_key], only_if_higher, score_deleted
            )
            SUBSECTION_SCORE_CHANGED.send(
                sender=None,
                course=course,
                course_structure=course_structure,
                user=student,
                subsection_grade=subsection_grade,
            )
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304


def _course_task_args(course_key, **kwargs):
    """
    Generate the (course key string, offset, batch size) tuples used to
    schedule course-grade tasks, one tuple per batch of enrollments.

    Keyword Arguments:
        from_settings (boolean, OPTIONAL): defaults to True.  Unless it is
            exactly False, the batch size is read from the current
            ComputeGradesSetting.
        batch_size (int, OPTIONAL): batch size used when ``from_settings``
            is False; defaults to 100.

    A warning is logged when the course has no enrollments, in which case
    nothing is yielded.
    """
    from_settings = kwargs.pop('from_settings', True)
    enrollment_count = CourseEnrollment.objects.filter(course_id=course_key).count()
    if not enrollment_count:
        log.warning("No enrollments found for {}".format(course_key))

    batch_size = (
        kwargs.pop('batch_size', 100)
        if from_settings is False
        else ComputeGradesSetting.current().batch_size
    )

    for offset in six.moves.range(0, enrollment_count, batch_size):
        yield (six.text_type(course_key), offset, batch_size)