Commit ea9fe1de by Cliff Dyer Committed by GitHub

Merge pull request #15004 from edx/neem/shuffled-backfill-tasks

Shuffle tasks.
parents ecce0f15 1daa4ead
......@@ -5,8 +5,9 @@ Command to compute all grades for specified courses.
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
from random import shuffle
from django.core.management.base import BaseCommand, CommandError
from django.core.management.base import BaseCommand
import six
from openedx.core.lib.command_utils import (
......@@ -17,7 +18,6 @@ from lms.djangoapps.grades.config.models import ComputeGradesSetting
from student.models import CourseEnrollment
from xmodule.modulestore.django import modulestore
from ...config.waffle import waffle, ESTIMATE_FIRST_ATTEMPTED
from ... import tasks
......@@ -80,32 +80,15 @@ class Command(BaseCommand):
)
def handle(self, *args, **options):
self._set_log_level(options)
self.enqueue_all_shuffled_tasks(options)
for course_key in self._get_course_keys(options):
self.enqueue_compute_grades_for_course_tasks(course_key, options)
def enqueue_compute_grades_for_course_tasks(self, course_key, options):
def enqueue_all_shuffled_tasks(self, options):
"""
Enqueue celery tasks to compute and persist all grades for the
specified course, in batches.
Enqueue all tasks, in shuffled order.
"""
enrollment_count = CourseEnrollment.objects.filter(course_id=course_key).count()
if enrollment_count == 0:
log.warning("No enrollments found for {}".format(course_key))
batch_size = self._latest_settings().batch_size if options.get('from_settings') else options['batch_size']
for offset in six.moves.range(options['start_index'], enrollment_count, batch_size):
# If the number of enrollments increases after the tasks are
# created, the most recent enrollments may not get processed.
# This is an acceptable limitation for our known use cases.
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
kwargs = {
'course_key': six.text_type(course_key),
'offset': offset,
'batch_size': batch_size,
'estimate_first_attempted': options['estimate_first_attempted']
}
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
for kwargs in self._shuffled_task_kwargs(options):
result = tasks.compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options)
log.info("Grades: Created {task_name}[{task_id}] with arguments {kwargs}".format(
task_name=tasks.compute_grades_for_course.name,
......@@ -113,6 +96,35 @@ class Command(BaseCommand):
kwargs=kwargs,
))
def _shuffled_task_kwargs(self, options):
"""
Iterate over all task keyword arguments in random order.
Randomizing them will help even out the load on the task workers,
though it will not entirely prevent the possibility of spikes. It will
also make the overall time to completion more predictable.
"""
all_args = []
estimate_first_attempted = options['estimate_first_attempted']
for course_key in self._get_course_keys(options):
enrollment_count = CourseEnrollment.objects.filter(course_id=course_key).count()
if enrollment_count == 0:
log.warning("No enrollments found for {}".format(course_key))
batch_size = self._latest_settings().batch_size if options.get('from_settings') else options['batch_size']
for offset in six.moves.range(options['start_index'], enrollment_count, batch_size):
# This is a tuple to reduce memory consumption.
# The dictionaries with their extra overhead will be created
# and consumed one at a time.
all_args.append((six.text_type(course_key), offset, batch_size))
shuffle(all_args)
for args in all_args:
yield {
'course_key': args[0],
'offset': args[1],
'batch_size': args[2],
'estimate_first_attempted': estimate_first_attempted,
}
def _get_course_keys(self, options):
"""
Return a list of courses that need scores computed.
......@@ -139,4 +151,7 @@ class Command(BaseCommand):
log.setLevel(log_level)
def _latest_settings(self):
"""
Return the latest version of the ComputeGradesSetting
"""
return ComputeGradesSetting.current()
......@@ -17,11 +17,16 @@ from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from lms.djangoapps.grades.config.models import ComputeGradesSetting
from openedx.core.djangolib.waffle_utils import WaffleSwitchPlus
from lms.djangoapps.grades.config.waffle import ESTIMATE_FIRST_ATTEMPTED
from lms.djangoapps.grades.management.commands import compute_grades
def _sorted_by_batch(calls):
"""
Return the list of calls sorted by course_key and batch.
"""
return sorted(calls, key=lambda x: (x[1]['kwargs']['course_key'], x[1]['kwargs']['offset']))
@ddt.ddt
class TestComputeGrades(SharedModuleStoreTestCase):
"""
......@@ -98,7 +103,7 @@ class TestComputeGrades(SharedModuleStoreTestCase):
'estimate_first_attempted': estimate_first_attempted
}
self.assertEqual(
mock_task.apply_async.call_args_list,
_sorted_by_batch(mock_task.apply_async.call_args_list),
[
({
'routing_key': 'key',
......@@ -124,7 +129,7 @@ class TestComputeGrades(SharedModuleStoreTestCase):
ComputeGradesSetting.objects.create(course_ids=self.course_keys[1], batch_size=2)
call_command('compute_grades', '--from_settings')
self.assertEqual(
mock_task.apply_async.call_args_list,
_sorted_by_batch(mock_task.apply_async.call_args_list),
[
({
'kwargs': {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment