Commit 6dfa47b2 by Alex Dusenbery Committed by Alex Dusenbery

EDUCATOR-565 | Add POLICY_CHANGE_GRADES_ROUTING_KEY, fix errors in…

EDUCATOR-565 | Add POLICY_CHANGE_GRADES_ROUTING_KEY, fix errors in compute_all_grades_for_course circuitry.
parent b14b9511
...@@ -34,8 +34,18 @@ class Router(AlternateEnvironmentRouter): ...@@ -34,8 +34,18 @@ class Router(AlternateEnvironmentRouter):
""" """
Defines alternate environment tasks, as a dict of form { task_name: alternate_queue } Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
""" """
# The tasks below will be routed to the default lms queue.
return { return {
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms', 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms', 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
'lms.djangoapps.grades.tasks.compute_all_grades_for_course': 'lms', }
@property
def explicit_queues(self):
"""
Defines specific queues for tasks to run in (typically outside of the cms environment),
as a dict of form { task_name: queue_name }.
"""
return {
'lms.djangoapps.grades.tasks.compute_all_grades_for_course': settings.POLICY_CHANGE_GRADES_ROUTING_KEY,
} }
...@@ -95,12 +95,12 @@ def handle_grading_policy_changed(sender, **kwargs): ...@@ -95,12 +95,12 @@ def handle_grading_policy_changed(sender, **kwargs):
""" """
Receives signal and kicks off celery task to recalculate grades Receives signal and kicks off celery task to recalculate grades
""" """
course_key = kwargs.get('course_key') kwargs = {
result = compute_all_grades_for_course.apply_async( 'course_key': unicode(kwargs.get('course_key')),
course_key=course_key, 'event_transaction_id': unicode(get_event_transaction_id()),
event_transaction_id=get_event_transaction_id(), 'event_transaction_type': unicode(get_event_transaction_type()),
event_transaction_type=get_event_transaction_type(), }
) result = compute_all_grades_for_course.apply_async(kwargs=kwargs)
log.info("Grades: Created {task_name}[{task_id}] with arguments {kwargs}".format( log.info("Grades: Created {task_name}[{task_id}] with arguments {kwargs}".format(
task_name=compute_all_grades_for_course.name, task_name=compute_all_grades_for_course.name,
task_id=result.task_id, task_id=result.task_id,
......
...@@ -9,6 +9,6 @@ from django.dispatch import Signal ...@@ -9,6 +9,6 @@ from django.dispatch import Signal
GRADING_POLICY_CHANGED = Signal( GRADING_POLICY_CHANGED = Signal(
providing_args=[ providing_args=[
'user_id', # Integer User ID 'user_id', # Integer User ID
'course_id', # Unicode string representing the course 'course_key', # Unicode string representing the course
] ]
) )
...@@ -455,11 +455,11 @@ class CourseGradingTest(CourseTestCase): ...@@ -455,11 +455,11 @@ class CourseGradingTest(CourseTestCase):
# one for each of the calls to update_from_json() # one for each of the calls to update_from_json()
send_signal.assert_has_calls([ send_signal.assert_has_calls([
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
]) ])
# one for each of the calls to update_from_json(); the last update doesn't actually change the parts of the # one for each of the calls to update_from_json(); the last update doesn't actually change the parts of the
...@@ -505,9 +505,9 @@ class CourseGradingTest(CourseTestCase): ...@@ -505,9 +505,9 @@ class CourseGradingTest(CourseTestCase):
# one for each of the calls to update_grader_from_json() # one for each of the calls to update_grader_from_json()
send_signal.assert_has_calls([ send_signal.assert_has_calls([
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
]) ])
# one for each of the calls to update_grader_from_json() # one for each of the calls to update_grader_from_json()
...@@ -620,8 +620,8 @@ class CourseGradingTest(CourseTestCase): ...@@ -620,8 +620,8 @@ class CourseGradingTest(CourseTestCase):
# one for each call to update_section_grader_type() # one for each call to update_section_grader_type()
send_signal.assert_has_calls([ send_signal.assert_has_calls([
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
]) ])
tracker.emit.assert_has_calls([ tracker.emit.assert_has_calls([
...@@ -698,9 +698,9 @@ class CourseGradingTest(CourseTestCase): ...@@ -698,9 +698,9 @@ class CourseGradingTest(CourseTestCase):
self.assertNotIn(original_model['graders'][1], updated_model['graders']) self.assertNotIn(original_model['graders'][1], updated_model['graders'])
send_signal.assert_has_calls([ send_signal.assert_has_calls([
# once for the POST # once for the POST
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
# once for the DELETE # once for the DELETE
mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id),
]) ])
def setup_test_set_get_section_grader_ajax(self): def setup_test_set_get_section_grader_ajax(self):
......
...@@ -265,7 +265,7 @@ def _grading_event_and_signal(course_key, user_id): ...@@ -265,7 +265,7 @@ def _grading_event_and_signal(course_key, user_id):
"event_transaction_type": GRADING_POLICY_CHANGED_EVENT_TYPE, "event_transaction_type": GRADING_POLICY_CHANGED_EVENT_TYPE,
} }
tracker.emit(name, data) tracker.emit(name, data)
GRADING_POLICY_CHANGED.send(sender=CourseGradingModel, user_id=user_id, course_id=course_key) GRADING_POLICY_CHANGED.send(sender=CourseGradingModel, user_id=user_id, course_key=course_key)
def hash_grading_policy(grading_policy): def hash_grading_policy(grading_policy):
......
...@@ -400,6 +400,7 @@ ALTERNATE_QUEUES = [ ...@@ -400,6 +400,7 @@ ALTERNATE_QUEUES = [
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.') DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
for alternate in ALTERNATE_QUEUE_ENVS for alternate in ALTERNATE_QUEUE_ENVS
] ]
CELERY_QUEUES.update( CELERY_QUEUES.update(
{ {
alternate: {} alternate: {}
...@@ -408,6 +409,9 @@ CELERY_QUEUES.update( ...@@ -408,6 +409,9 @@ CELERY_QUEUES.update(
} }
) )
# Queue to use for updating grades due to grading policy change
POLICY_CHANGE_GRADES_ROUTING_KEY = ENV_TOKENS.get('POLICY_CHANGE_GRADES_ROUTING_KEY', LOW_PRIORITY_QUEUE)
# Event tracking # Event tracking
TRACKING_BACKENDS.update(AUTH_TOKENS.get("TRACKING_BACKENDS", {})) TRACKING_BACKENDS.update(AUTH_TOKENS.get("TRACKING_BACKENDS", {}))
EVENT_TRACKING_BACKENDS['tracking_logs']['OPTIONS']['backends'].update(AUTH_TOKENS.get("EVENT_TRACKING_BACKENDS", {})) EVENT_TRACKING_BACKENDS['tracking_logs']['OPTIONS']['backends'].update(AUTH_TOKENS.get("EVENT_TRACKING_BACKENDS", {}))
......
...@@ -1339,6 +1339,9 @@ COURSE_CATALOG_API_URL = None ...@@ -1339,6 +1339,9 @@ COURSE_CATALOG_API_URL = None
# Queue to use for updating persistent grades # Queue to use for updating persistent grades
RECALCULATE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE RECALCULATE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE
# Queue to use for updating grades due to grading policy change
POLICY_CHANGE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE
############## Settings for CourseGraph ############################ ############## Settings for CourseGraph ############################
COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE
......
...@@ -51,19 +51,22 @@ class _BaseTask(PersistOnFailureTask, LoggedTask): # pylint: disable=abstract-m ...@@ -51,19 +51,22 @@ class _BaseTask(PersistOnFailureTask, LoggedTask): # pylint: disable=abstract-m
abstract = True abstract = True
@task(base=_BaseTask) @task(base=_BaseTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY)
def compute_all_grades_for_course(**kwargs): def compute_all_grades_for_course(**kwargs):
""" """
Compute grades for all students in the specified course. Compute grades for all students in the specified course.
Kicks off a series of compute_grades_for_course_v2 tasks Kicks off a series of compute_grades_for_course_v2 tasks
to cover all of the students in the course. to cover all of the students in the course.
""" """
for course_key, offset, batch_size in _course_task_args( course_key = CourseKey.from_string(kwargs.pop('course_key'))
course_key=kwargs.pop('course_key'), for course_key_string, offset, batch_size in _course_task_args(course_key=course_key, **kwargs):
kwargs=kwargs kwargs.update({
): 'course_key': course_key_string,
task_options = {'course_key': course_key, 'offset': offset, 'batch_size': batch_size} 'offset': offset,
compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options) 'batch_size': batch_size,
'routing_key': settings.POLICY_CHANGE_GRADES_ROUTING_KEY,
})
compute_grades_for_course_v2.apply_async(kwargs=kwargs)
@task(base=_BaseTask, bind=True, default_retry_delay=30, max_retries=1) @task(base=_BaseTask, bind=True, default_retry_delay=30, max_retries=1)
...@@ -92,14 +95,11 @@ def compute_grades_for_course_v2(self, **kwargs): ...@@ -92,14 +95,11 @@ def compute_grades_for_course_v2(self, **kwargs):
if 'event_transaction_type' in kwargs: if 'event_transaction_type' in kwargs:
set_event_transaction_type(kwargs['event_transaction_type']) set_event_transaction_type(kwargs['event_transaction_type'])
course_key = kwargs.pop('course_key') if kwargs.get('estimate_first_attempted'):
offset = kwargs.pop('offset')
batch_size = kwargs.pop('batch_size')
estimate_first_attempted = kwargs.pop('estimate_first_attempted', False)
if estimate_first_attempted:
waffle().override_for_request(ESTIMATE_FIRST_ATTEMPTED, True) waffle().override_for_request(ESTIMATE_FIRST_ATTEMPTED, True)
try: try:
return compute_grades_for_course(course_key, offset, batch_size) return compute_grades_for_course(kwargs['course_key'], kwargs['offset'], kwargs['batch_size'])
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
raise self.retry(kwargs=kwargs, exc=exc) raise self.retry(kwargs=kwargs, exc=exc)
...@@ -113,7 +113,6 @@ def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pyli ...@@ -113,7 +113,6 @@ def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pyli
limited to at most <batch_size> students, starting from the specified limited to at most <batch_size> students, starting from the specified
offset. offset.
""" """
course = courses.get_course_by_id(CourseKey.from_string(course_key)) course = courses.get_course_by_id(CourseKey.from_string(course_key))
enrollments = CourseEnrollment.objects.filter(course_id=course.id).order_by('created') enrollments = CourseEnrollment.objects.filter(course_id=course.id).order_by('created')
student_iter = (enrollment.user for enrollment in enrollments[offset:offset + batch_size]) student_iter = (enrollment.user for enrollment in enrollments[offset:offset + batch_size])
......
...@@ -272,6 +272,9 @@ BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = ENV_TOKENS.get('BULK_EMAIL_ROUTING_KEY_SMALL ...@@ -272,6 +272,9 @@ BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = ENV_TOKENS.get('BULK_EMAIL_ROUTING_KEY_SMALL
# Queue to use for updating persistent grades # Queue to use for updating persistent grades
RECALCULATE_GRADES_ROUTING_KEY = ENV_TOKENS.get('RECALCULATE_GRADES_ROUTING_KEY', LOW_PRIORITY_QUEUE) RECALCULATE_GRADES_ROUTING_KEY = ENV_TOKENS.get('RECALCULATE_GRADES_ROUTING_KEY', LOW_PRIORITY_QUEUE)
# Queue to use for updating grades due to grading policy change
POLICY_CHANGE_GRADES_ROUTING_KEY = ENV_TOKENS.get('POLICY_CHANGE_GRADES_ROUTING_KEY', LOW_PRIORITY_QUEUE)
# Message expiry time in seconds # Message expiry time in seconds
CELERY_EVENT_QUEUE_TTL = ENV_TOKENS.get('CELERY_EVENT_QUEUE_TTL', None) CELERY_EVENT_QUEUE_TTL = ENV_TOKENS.get('CELERY_EVENT_QUEUE_TTL', None)
......
...@@ -1927,6 +1927,9 @@ BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS = 0.02 ...@@ -1927,6 +1927,9 @@ BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS = 0.02
# Queue to use for updating persistent grades # Queue to use for updating persistent grades
RECALCULATE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE RECALCULATE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE
# Queue to use for updating grades due to grading policy change
POLICY_CHANGE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE
############################# Email Opt In #################################### ############################# Email Opt In ####################################
# Minimum age for organization-wide email opt in # Minimum age for organization-wide email opt in
......
...@@ -21,21 +21,32 @@ class AlternateEnvironmentRouter(object): ...@@ -21,21 +21,32 @@ class AlternateEnvironmentRouter(object):
@abstractproperty @abstractproperty
def alternate_env_tasks(self): def alternate_env_tasks(self):
""" """
Defines the task -> alternate worker environment queue to be used when routing. Defines the task -> alternate worker environment to be used when routing.
Subclasses must override this property with their own specific mappings. Subclasses must override this property with their own specific mappings.
""" """
return {} return {}
@property
def explicit_queues(self):
"""
Defines the task -> alternate worker queue to be used when routing.
"""
return {}
def route_for_task(self, task, args=None, kwargs=None): # pylint: disable=unused-argument def route_for_task(self, task, args=None, kwargs=None): # pylint: disable=unused-argument
""" """
Celery-defined method allowing for custom routing logic. Celery-defined method allowing for custom routing logic.
If None is returned from this method, default routing logic is used. If None is returned from this method, default routing logic is used.
""" """
if task in self.explicit_queues:
return self.explicit_queues[task]
alternate_env = self.alternate_env_tasks.get(task, None) alternate_env = self.alternate_env_tasks.get(task, None)
if alternate_env: if alternate_env:
return self.ensure_queue_env(alternate_env) return self.ensure_queue_env(alternate_env)
return None return None
def ensure_queue_env(self, desired_env): def ensure_queue_env(self, desired_env):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment