Commit c2e5bd3d by Adam Palay

use different queue for smaller emails (TNL-1591)

parent 037a1898
...@@ -219,6 +219,20 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) ...@@ -219,6 +219,20 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
to_option = email_obj.to_option to_option = email_obj.to_option
global_email_context = _get_course_email_context(course) global_email_context = _get_course_email_context(course)
recipient_qsets = _get_recipient_querysets(user_id, to_option, course_id)
recipient_fields = ['profile__name', 'email']
log.info(u"Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s",
task_id, course_id, email_id, to_option)
total_recipients = sum([recipient_queryset.count() for recipient_queryset in recipient_qsets])
routing_key = settings.BULK_EMAIL_ROUTING_KEY
# if there are few enough emails, send them through a different queue
# to avoid large courses blocking emails to self and staff
if total_recipients <= settings.BULK_EMAIL_JOB_SIZE_THRESHOLD:
routing_key = settings.BULK_EMAIL_ROUTING_KEY_SMALL_JOBS
def _create_send_email_subtask(to_list, initial_subtask_status): def _create_send_email_subtask(to_list, initial_subtask_status):
"""Creates a subtask to send email to a given recipient list.""" """Creates a subtask to send email to a given recipient list."""
subtask_id = initial_subtask_status.task_id subtask_id = initial_subtask_status.task_id
...@@ -231,16 +245,10 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) ...@@ -231,16 +245,10 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
initial_subtask_status.to_dict(), initial_subtask_status.to_dict(),
), ),
task_id=subtask_id, task_id=subtask_id,
routing_key=settings.BULK_EMAIL_ROUTING_KEY, routing_key=routing_key,
) )
return new_subtask return new_subtask
recipient_qsets = _get_recipient_querysets(user_id, to_option, course_id)
recipient_fields = ['profile__name', 'email']
log.info(u"Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s",
task_id, course_id, email_id, to_option)
progress = queue_subtasks_for_query( progress = queue_subtasks_for_query(
entry, entry,
action_name, action_name,
...@@ -248,6 +256,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) ...@@ -248,6 +256,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
recipient_qsets, recipient_qsets,
recipient_fields, recipient_fields,
settings.BULK_EMAIL_EMAILS_PER_TASK, settings.BULK_EMAIL_EMAILS_PER_TASK,
total_recipients,
) )
# We want to return progress here, as this is what will be stored in the # We want to return progress here, as this is what will be stored in the
......
...@@ -179,6 +179,13 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase) ...@@ -179,6 +179,13 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students] [self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
) )
@override_settings(BULK_EMAIL_JOB_SIZE_THRESHOLD=1)
def test_send_to_all_high_queue(self):
"""
Test that email is still sent when the high priority queue is used
"""
self.test_send_to_all()
def test_no_duplicate_emails_staff_instructor(self): def test_no_duplicate_emails_staff_instructor(self):
""" """
Test that no duplicate emails are sent to a course instructor that is Test that no duplicate emails are sent to a course instructor that is
......
...@@ -275,7 +275,16 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): ...@@ -275,7 +275,16 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
return task_progress return task_progress
def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querysets, item_fields, items_per_task): # pylint: disable=bad-continuation
def queue_subtasks_for_query(
entry,
action_name,
create_subtask_fcn,
item_querysets,
item_fields,
items_per_task,
total_num_items,
):
""" """
Generates and queues subtasks to each execute a chunk of "items" generated by a queryset. Generates and queues subtasks to each execute a chunk of "items" generated by a queryset.
...@@ -289,12 +298,12 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys ...@@ -289,12 +298,12 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
`item_fields` : the fields that should be included in the dict that is returned. `item_fields` : the fields that should be included in the dict that is returned.
These are in addition to the 'pk' field. These are in addition to the 'pk' field.
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask. `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
`total_num_items` : total number of items that will be put into subtasks
Returns: the task progress as stored in the InstructorTask object. Returns: the task progress as stored in the InstructorTask object.
""" """
task_id = entry.task_id task_id = entry.task_id
total_num_items = sum([item_queryset.count() for item_queryset in item_querysets])
# Calculate the number of tasks that will be created, and create a list of ids for each task. # Calculate the number of tasks that will be created, and create a list of ids for each task.
total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_task) total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_task)
......
...@@ -54,6 +54,7 @@ class TestSubtasks(InstructorTaskCourseTestCase): ...@@ -54,6 +54,7 @@ class TestSubtasks(InstructorTaskCourseTestCase):
item_querysets=task_querysets, item_querysets=task_querysets,
item_fields=[], item_fields=[],
items_per_task=items_per_task, items_per_task=items_per_task,
total_num_items=initial_count,
) )
def test_queue_subtasks_for_query1(self): def test_queue_subtasks_for_query1(self):
......
...@@ -1499,6 +1499,15 @@ BULK_EMAIL_INFINITE_RETRY_CAP = 1000 ...@@ -1499,6 +1499,15 @@ BULK_EMAIL_INFINITE_RETRY_CAP = 1000
# routing key that points to it. At the moment, the name is the same. # routing key that points to it. At the moment, the name is the same.
BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE
# We also define a queue for smaller jobs so that large courses don't block
# smaller emails (see BULK_EMAIL_JOB_SIZE_THRESHOLD setting)
BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = DEFAULT_PRIORITY_QUEUE
# For emails with at most this number of recipients, send them through
# a different queue so that large courses do not block emails that are meant to be
# sent to self and staff
BULK_EMAIL_JOB_SIZE_THRESHOLD = 100
# Flag to indicate if individual email addresses should be logged as they are sent # Flag to indicate if individual email addresses should be logged as they are sent
# a bulk email message. # a bulk email message.
BULK_EMAIL_LOG_SENT_EMAILS = False BULK_EMAIL_LOG_SENT_EMAILS = False
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment