Commit 266bfb1a by Nimisha Asthagiri

Optimize async grades computation

TNL-5909
parent 8a784eef
......@@ -340,11 +340,10 @@ class CourseGradeFactory(object):
self._compute_and_update_grade(course, course_structure, read_only)
)
def update(self, course):
def update(self, course, course_structure):
"""
Updates the CourseGrade for this Factory's student.
"""
course_structure = get_course_blocks(self.student, course.location)
self._compute_and_update_grade(course, course_structure)
def get_persisted(self, course):
......
......@@ -12,7 +12,7 @@ from lms.djangoapps.grades.scores import get_score, possibly_scored
from lms.djangoapps.grades.models import BlockRecord, PersistentSubsectionGrade
from lms.djangoapps.grades.config.models import PersistentGradesEnabledFlag
from openedx.core.lib.grade_utils import is_score_higher
from student.models import anonymous_id_for_user, User
from student.models import anonymous_id_for_user
from submissions import api as submissions_api
from traceback import format_exc
from xmodule import block_metadata_utils, graders
......@@ -49,7 +49,7 @@ class SubsectionGrade(object):
self.due = getattr(subsection, 'due', None)
self.graded = getattr(subsection, 'graded', False)
self.course_version = getattr(course, 'course_version', None)
self.course_version = getattr(subsection, 'course_version', None)
self.subtree_edited_timestamp = subsection.subtree_edited_on
self.graded_total = None # aggregated grade for all graded problems
......@@ -211,24 +211,20 @@ class SubsectionGradeFactory(object):
self._cached_subsection_grades = None
self._unsaved_subsection_grades = []
def create(self, subsection, block_structure=None, read_only=False):
def create(self, subsection, read_only=False):
"""
Returns the SubsectionGrade object for the student and subsection.
If block_structure is provided, uses it for finding and computing
the grade instead of the course_structure passed in earlier.
If read_only is True, doesn't save any updates to the grades.
"""
self._log_event(
log.debug, u"create, read_only: {0}, subsection: {1}".format(read_only, subsection.location)
log.debug, u"create, read_only: {0}, subsection: {1}".format(read_only, subsection.location), subsection,
)
block_structure = self._get_block_structure(block_structure)
subsection_grade = self._get_bulk_cached_grade(subsection, block_structure)
subsection_grade = self._get_bulk_cached_grade(subsection)
if not subsection_grade:
subsection_grade = SubsectionGrade(subsection, self.course).init_from_structure(
self.student, block_structure, self._submissions_scores, self._csm_scores,
self.student, self.course_structure, self._submissions_scores, self._csm_scores,
)
if PersistentGradesEnabledFlag.feature_enabled(self.course.id):
if read_only:
......@@ -243,13 +239,11 @@ class SubsectionGradeFactory(object):
"""
Bulk creates all the unsaved subsection_grades to this point.
"""
self._log_event(log.debug, u"bulk_create_unsaved")
with persistence_safe_fallback():
SubsectionGrade.bulk_create_models(self.student, self._unsaved_subsection_grades, self.course.id)
self._unsaved_subsection_grades = []
def update(self, subsection, block_structure=None, only_if_higher=None):
def update(self, subsection, only_if_higher=None):
"""
Updates the SubsectionGrade object for the student and subsection.
"""
......@@ -258,11 +252,10 @@ class SubsectionGradeFactory(object):
if not PersistentGradesEnabledFlag.feature_enabled(self.course.id):
return
self._log_event(log.warning, u"update, subsection: {}".format(subsection.location))
self._log_event(log.warning, u"update, subsection: {}".format(subsection.location), subsection)
block_structure = self._get_block_structure(block_structure)
calculated_grade = SubsectionGrade(subsection, self.course).init_from_structure(
self.student, block_structure, self._submissions_scores, self._csm_scores,
self.student, self.course_structure, self._submissions_scores, self._csm_scores,
)
if only_if_higher:
......@@ -272,7 +265,7 @@ class SubsectionGradeFactory(object):
pass
else:
orig_subsection_grade = SubsectionGrade(subsection, self.course).init_from_model(
self.student, grade_model, block_structure, self._submissions_scores, self._csm_scores,
self.student, grade_model, self.course_structure, self._submissions_scores, self._csm_scores,
)
if not is_score_higher(
orig_subsection_grade.graded_total.earned,
......@@ -304,7 +297,7 @@ class SubsectionGradeFactory(object):
anonymous_user_id = anonymous_id_for_user(self.student, self.course.id)
return submissions_api.get_scores(unicode(self.course.id), anonymous_user_id)
def _get_bulk_cached_grade(self, subsection, block_structure): # pylint: disable=unused-argument
def _get_bulk_cached_grade(self, subsection):
"""
Returns the student's SubsectionGrade for the subsection,
while caching the results of a bulk retrieval for the
......@@ -318,7 +311,7 @@ class SubsectionGradeFactory(object):
subsection_grade = saved_subsection_grades.get(subsection.location)
if subsection_grade:
return SubsectionGrade(subsection, self.course).init_from_model(
self.student, subsection_grade, block_structure, self._submissions_scores, self._csm_scores,
self.student, subsection_grade, self.course_structure, self._submissions_scores, self._csm_scores,
)
def _get_bulk_cached_subsection_grades(self):
......@@ -342,27 +335,14 @@ class SubsectionGradeFactory(object):
if self._cached_subsection_grades is not None:
self._cached_subsection_grades[subsection_usage_key] = subsection_model
def _get_block_structure(self, block_structure):
"""
If block_structure is None, returns self.course_structure.
Otherwise, returns block_structure after verifying that the
given block_structure is a sub-structure of self.course_structure.
"""
if block_structure:
if block_structure.root_block_usage_key not in self.course_structure:
raise ValueError
return block_structure
else:
return self.course_structure
def _log_event(self, log_func, log_statement):
def _log_event(self, log_func, log_statement, subsection):
"""
Logs the given statement, for this instance.
"""
log_func(u"Persistent Grades: SGF.{}, course: {}, version: {}, edit: {}, user: {}".format(
log_statement,
self.course.id,
getattr(self.course, 'course_version', None),
self.course.subtree_edited_on,
getattr(subsection, 'course_version', None),
subsection.subtree_edited_on,
self.student.id,
))
......@@ -2,7 +2,6 @@
Grades related signals.
"""
from celery import Task
from django.dispatch import receiver
from logging import getLogger
......@@ -12,7 +11,8 @@ from student.models import user_by_anonymous_id
from submissions.models import score_set, score_reset
from .signals import PROBLEM_SCORE_CHANGED, SUBSECTION_SCORE_CHANGED, SCORE_PUBLISHED
from ..tasks import recalculate_subsection_grade, recalculate_course_grade
from ..new.course_grade import CourseGradeFactory
from ..tasks import recalculate_subsection_grade
log = getLogger(__name__)
......@@ -92,7 +92,7 @@ def score_published_handler(sender, block, user, raw_earned, raw_possible, only_
if only_if_higher:
previous_score = get_score(user.id, block.location)
if previous_score:
if previous_score is not None:
prev_raw_earned, prev_raw_possible = previous_score # pylint: disable=unpacking-non-sequence
if not is_score_higher(prev_raw_earned, prev_raw_possible, raw_earned, raw_possible):
......@@ -136,11 +136,8 @@ def enqueue_subsection_update(sender, **kwargs): # pylint: disable=unused-argum
@receiver(SUBSECTION_SCORE_CHANGED)
def enqueue_course_update(sender, **kwargs): # pylint: disable=unused-argument
def recalculate_course_grade(sender, course, course_structure, user, **kwargs): # pylint: disable=unused-argument
"""
Handles the SUBSECTION_SCORE_CHANGED signal by enqueueing a course update operation to occur asynchronously.
Updates a saved course grade.
"""
if isinstance(sender, Task): # We're already in a async worker, just do the task
recalculate_course_grade.apply(args=(kwargs['user'].id, unicode(kwargs['course'].id)))
else: # Otherwise, queue the work to be done asynchronously
recalculate_course_grade.apply_async(args=(kwargs['user'].id, unicode(kwargs['course'].id)))
CourseGradeFactory(user).update(course, course_structure)
......@@ -45,6 +45,7 @@ SCORE_PUBLISHED = Signal(
SUBSECTION_SCORE_CHANGED = Signal(
providing_args=[
'course', # Course object
'course_structure', # BlockStructure object
'user', # User object
'subsection_grade', # SubsectionGrade object
]
......
......@@ -12,11 +12,9 @@ from courseware.model_data import get_score
from lms.djangoapps.course_blocks.api import get_course_blocks
from opaque_keys.edx.keys import UsageKey
from opaque_keys.edx.locator import CourseLocator
from openedx.core.djangoapps.content.block_structure.api import get_course_in_cache
from xmodule.modulestore.django import modulestore
from .config.models import PersistentGradesEnabledFlag
from .new.course_grade import CourseGradeFactory
from .new.subsection_grade import SubsectionGradeFactory
from .signals.signals import SUBSECTION_SCORE_CHANGED
from .transformer import GradesTransformer
......@@ -40,17 +38,19 @@ def recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, r
- raw_possible: the max raw points the leaner could have earned
on the problem
"""
if not PersistentGradesEnabledFlag.feature_enabled(course_id):
course_key = CourseLocator.from_string(course_id)
if not PersistentGradesEnabledFlag.feature_enabled(course_key):
return
course_key = CourseLocator.from_string(course_id)
scored_block_usage_key = UsageKey.from_string(usage_id).replace(course_key=course_key)
score = get_score(user_id, scored_block_usage_key)
# If the score is None, it has not been saved at all yet
# and we need to retry until it has been saved.
if score is None:
_retry_recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible)
raise _retry_recalculate_subsection_grade(
user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible,
)
else:
module_raw_earned, module_raw_possible = score # pylint: disable=unpacking-non-sequence
......@@ -65,7 +65,9 @@ def recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, r
state_deleted = module_raw_earned is None and module_raw_possible is None and raw_earned == 0
if not (state_deleted or grades_match):
_retry_recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible)
raise _retry_recalculate_subsection_grade(
user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible,
)
_update_subsection_grades(
course_key,
......@@ -79,44 +81,6 @@ def recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, r
)
@task(default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY)
def recalculate_course_grade(user_id, course_id):
"""
Updates a saved course grade.
This method expects the following parameters:
- user_id: serialized id of applicable User object
- course_id: Unicode string representing the course
"""
if not PersistentGradesEnabledFlag.feature_enabled(course_id):
return
student = User.objects.get(id=user_id)
course_key = CourseLocator.from_string(course_id)
course = modulestore().get_course(course_key, depth=0)
try:
CourseGradeFactory(student).update(course)
except IntegrityError as exc:
raise recalculate_course_grade.retry(args=[user_id, course_id], exc=exc)
def _retry_recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, grade, max_grade, exc=None):
"""
Calls retry for the recalculate_subsection_grade task with the
given inputs.
"""
raise recalculate_subsection_grade.retry(
args=[
user_id,
course_id,
usage_id,
only_if_higher,
grade,
max_grade,
],
exc=exc
)
def _update_subsection_grades(
course_key,
scored_block_usage_key,
......@@ -132,35 +96,51 @@ def _update_subsection_grades(
for each subsection containing the given block, and to signal
that those subsection grades were updated.
"""
collected_block_structure = get_course_in_cache(course_key)
course = modulestore().get_course(course_key, depth=0)
student = User.objects.get(id=user_id)
subsection_grade_factory = SubsectionGradeFactory(student, course, collected_block_structure)
subsections_to_update = collected_block_structure.get_transformer_block_field(
course_structure = get_course_blocks(student, modulestore().make_course_usage_key(course_key))
subsections_to_update = course_structure.get_transformer_block_field(
scored_block_usage_key,
GradesTransformer,
'subsections',
set()
set(),
)
course = modulestore().get_course(course_key, depth=0)
subsection_grade_factory = SubsectionGradeFactory(student, course, course_structure)
try:
for subsection_usage_key in subsections_to_update:
transformed_subsection_structure = get_course_blocks(
student,
subsection_usage_key,
collected_block_structure=collected_block_structure,
)
subsection_grade = subsection_grade_factory.update(
transformed_subsection_structure[subsection_usage_key],
transformed_subsection_structure,
course_structure[subsection_usage_key],
only_if_higher,
)
SUBSECTION_SCORE_CHANGED.send(
sender=recalculate_subsection_grade,
course=course,
course_structure=course_structure,
user=student,
subsection_grade=subsection_grade,
)
except IntegrityError as exc:
_retry_recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible, exc)
raise _retry_recalculate_subsection_grade(
user_id, course_id, usage_id, only_if_higher, raw_earned, raw_possible, exc,
)
def _retry_recalculate_subsection_grade(user_id, course_id, usage_id, only_if_higher, grade, max_grade, exc=None):
"""
Calls retry for the recalculate_subsection_grade task with the
given inputs.
"""
recalculate_subsection_grade.retry(
args=[
user_id,
course_id,
usage_id,
only_if_higher,
grade,
max_grade,
],
exc=exc
)
......@@ -21,7 +21,7 @@ from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory, chec
from lms.djangoapps.grades.config.models import PersistentGradesEnabledFlag
from lms.djangoapps.grades.signals.signals import PROBLEM_SCORE_CHANGED, SUBSECTION_SCORE_CHANGED
from lms.djangoapps.grades.tasks import recalculate_course_grade, recalculate_subsection_grade
from lms.djangoapps.grades.tasks import recalculate_subsection_grade
@patch.dict(settings.FEATURES, {'PERSISTENT_GRADES_ENABLED_FOR_ALL_TESTS': False})
......@@ -85,7 +85,6 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase):
@ddt.data(
('lms.djangoapps.grades.tasks.recalculate_subsection_grade.apply_async', PROBLEM_SCORE_CHANGED),
('lms.djangoapps.grades.tasks.recalculate_course_grade.apply_async', SUBSECTION_SCORE_CHANGED)
)
@ddt.unpack
def test_signal_queues_task(self, enqueue_op, test_signal):
......@@ -115,52 +114,8 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase):
Ensures that the subsection update operation also updates the course grade.
"""
self.set_up_course()
mock_update_return = uuid4()
course_key = CourseLocator.from_string(unicode(self.course.id))
course = modulestore().get_course(course_key, depth=0)
with patch(
'lms.djangoapps.grades.new.subsection_grade.SubsectionGradeFactory.update',
return_value=mock_update_return
):
self._apply_recalculate_subsection_grade()
mock_course_signal.assert_called_once_with(
sender=recalculate_subsection_grade,
course=course,
user=self.user,
subsection_grade=mock_update_return,
)
@ddt.data(True, False)
def test_course_update_enqueuing(self, should_be_async):
"""
Ensures that the course update operation is enqueued on an async queue (or not) as expected.
"""
base = 'lms.djangoapps.grades.tasks.recalculate_course_grade'
if should_be_async:
executed = base + '.apply_async'
other = base + '.apply'
sender = None
else:
executed = base + '.apply'
other = base + '.apply_async'
sender = recalculate_subsection_grade
self.set_up_course()
with patch(executed) as executed_task:
with patch(other) as other_task:
SUBSECTION_SCORE_CHANGED.send(
sender=sender,
course=self.course,
user=self.user,
)
other_task.assert_not_called()
executed_task.assert_called_once_with(
args=(
self.problem_score_changed_kwargs['user_id'],
self.problem_score_changed_kwargs['course_id'],
)
)
self._apply_recalculate_subsection_grade()
self.assertTrue(mock_course_signal.called)
@ddt.data(
(ModuleStoreEnum.Type.mongo, 1),
......@@ -171,7 +126,7 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase):
with self.store.default_store(default_store):
self.set_up_course()
self.assertTrue(PersistentGradesEnabledFlag.feature_enabled(self.course.id))
with check_mongo_calls(2) and self.assertNumQueries(25 + added_queries):
with check_mongo_calls(2) and self.assertNumQueries(22 + added_queries):
self._apply_recalculate_subsection_grade()
def test_single_call_to_create_block_structure(self):
......@@ -182,7 +137,7 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase):
return_value=None,
) as mock_block_structure_create:
self._apply_recalculate_subsection_grade()
self.assertEquals(mock_block_structure_create.call_count, 2)
self.assertEquals(mock_block_structure_create.call_count, 1)
@ddt.data(
(ModuleStoreEnum.Type.mongo, 1),
......@@ -195,7 +150,7 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase):
self.assertTrue(PersistentGradesEnabledFlag.feature_enabled(self.course.id))
ItemFactory.create(parent=self.sequential, category='problem', display_name='problem2')
ItemFactory.create(parent=self.sequential, category='problem', display_name='problem3')
with check_mongo_calls(2) and self.assertNumQueries(25 + added_queries):
with check_mongo_calls(2) and self.assertNumQueries(22 + added_queries):
self._apply_recalculate_subsection_grade()
@ddt.data(ModuleStoreEnum.Type.mongo, ModuleStoreEnum.Type.split)
......@@ -232,22 +187,6 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase):
self._apply_recalculate_subsection_grade()
self.assertTrue(mock_retry.called)
@patch('lms.djangoapps.grades.tasks.recalculate_course_grade.retry')
@patch('lms.djangoapps.grades.new.course_grade.CourseGradeFactory.update')
def test_retry_course_update_on_integrity_error(self, mock_update, mock_retry):
"""
Ensures that tasks will be retried if IntegrityErrors are encountered.
"""
self.set_up_course()
mock_update.side_effect = IntegrityError("WHAMMY")
recalculate_course_grade.apply(
args=(
self.problem_score_changed_kwargs['user_id'],
self.problem_score_changed_kwargs['course_id'],
)
)
self.assertTrue(mock_retry.called)
@patch('lms.djangoapps.grades.tasks.recalculate_subsection_grade.retry')
def test_retry_subsection_grade_on_update_not_complete(self, mock_retry):
self.set_up_course()
......
"""
Common utilities for performance testing.
"""
from contextlib import contextmanager
def collect_profile_func(file_prefix, enabled=False):
"""
Method decorator for collecting profile.
"""
import functools
def _outer(func):
"""
Outer function decorator.
"""
@functools.wraps(func)
def _inner(self, *args, **kwargs):
"""
Inner wrapper function.
"""
if enabled:
with collect_profile(file_prefix):
return func(self, *args, **kwargs)
else:
return func(self, *args, **kwargs)
return _inner
return _outer
@contextmanager
def collect_profile(file_prefix):
"""
Context manager to collect profile information.
"""
import cProfile
import uuid
profiler = cProfile.Profile()
profiler.enable()
try:
yield
finally:
profiler.disable()
profiler.dump_stats("{0}_{1}_master.profile".format(file_prefix, uuid.uuid4()))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment