Commit 5b769df4 by Adam Palay

No longer chunk queries when sending out bulk email (LMS-2718)

refactor _generate_items_for_subtask not to chunk queries

use django's iterator method for querysets

remove bulk email query settings

use read_replica if available, since this is read-only

update changelog
parent 0f580a7e
...@@ -5,6 +5,9 @@ These are notable changes in edx-platform. This is a rolling list of changes, ...@@ -5,6 +5,9 @@ These are notable changes in edx-platform. This is a rolling list of changes,
in roughly chronological order, most recent first. Add your entries at or near in roughly chronological order, most recent first. Add your entries at or near
the top. Include a label indicating the component affected. the top. Include a label indicating the component affected.
LMS: Update bulk email implementation to lessen load on the database
by consolidating chunked queries for recipients into a single query.
Blades: Fix link to javascript file in ChoiceTextResponse. BLD-1103. Blades: Fix link to javascript file in ChoiceTextResponse. BLD-1103.
All: refactored code to handle course_ids, module_ids, etc in a cleaner way. All: refactored code to handle course_ids, module_ids, etc in a cleaner way.
......
...@@ -46,6 +46,7 @@ from instructor_task.subtasks import ( ...@@ -46,6 +46,7 @@ from instructor_task.subtasks import (
update_subtask_status, update_subtask_status,
) )
from xmodule.modulestore import Location from xmodule.modulestore import Location
from util.query import use_read_replica_if_available
log = get_task_logger(__name__) log = get_task_logger(__name__)
...@@ -109,7 +110,7 @@ def _get_recipient_queryset(user_id, to_option, course_id, course_location): ...@@ -109,7 +110,7 @@ def _get_recipient_queryset(user_id, to_option, course_id, course_location):
else: else:
staff_qset = CourseStaffRole(course_id).users_with_role() staff_qset = CourseStaffRole(course_id).users_with_role()
instructor_qset = CourseInstructorRole(course_id).users_with_role() instructor_qset = CourseInstructorRole(course_id).users_with_role()
recipient_qset = staff_qset | instructor_qset recipient_qset = (staff_qset | instructor_qset).distinct()
if to_option == SEND_TO_ALL: if to_option == SEND_TO_ALL:
# We also require students to have activated their accounts to # We also require students to have activated their accounts to
# provide verification that the provided email address is valid. # provide verification that the provided email address is valid.
...@@ -118,11 +119,20 @@ def _get_recipient_queryset(user_id, to_option, course_id, course_location): ...@@ -118,11 +119,20 @@ def _get_recipient_queryset(user_id, to_option, course_id, course_location):
courseenrollment__course_id=course_id, courseenrollment__course_id=course_id,
courseenrollment__is_active=True courseenrollment__is_active=True
) )
recipient_qset = recipient_qset | enrollment_qset # Now we do some queryset sidestepping to avoid doing a DISTINCT
recipient_qset = recipient_qset.distinct() # query across the course staff and the enrolled students, which
# forces the creation of a temporary table in the db.
unenrolled_staff_qset = recipient_qset.exclude(
courseenrollment__course_id=course_id, courseenrollment__is_active=True
)
# use read_replica if available:
unenrolled_staff_qset = use_read_replica_if_available(unenrolled_staff_qset)
unenrolled_staff_ids = [user.id for user in unenrolled_staff_qset]
recipient_qset = enrollment_qset | User.objects.filter(id__in=unenrolled_staff_ids)
recipient_qset = recipient_qset.order_by('pk') # again, use read_replica if available to lighten the load for large queries
return recipient_qset return use_read_replica_if_available(recipient_qset)
def _get_course_email_context(course): def _get_course_email_context(course):
...@@ -232,8 +242,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) ...@@ -232,8 +242,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
_create_send_email_subtask, _create_send_email_subtask,
recipient_qset, recipient_qset,
recipient_fields, recipient_fields,
settings.BULK_EMAIL_EMAILS_PER_QUERY, settings.BULK_EMAIL_EMAILS_PER_TASK,
settings.BULK_EMAIL_EMAILS_PER_TASK
) )
# We want to return progress here, as this is what will be stored in the # We want to return progress here, as this is what will be stored in the
......
...@@ -20,6 +20,8 @@ from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase ...@@ -20,6 +20,8 @@ from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory from xmodule.modulestore.tests.factories import CourseFactory
from bulk_email.models import Optout from bulk_email.models import Optout
from instructor_task.subtasks import update_subtask_status from instructor_task.subtasks import update_subtask_status
from student.roles import CourseStaffRole
from student.models import CourseEnrollment
STAFF_COUNT = 3 STAFF_COUNT = 3
STUDENT_COUNT = 10 STUDENT_COUNT = 10
...@@ -176,6 +178,22 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase): ...@@ -176,6 +178,22 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students] [self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
) )
def test_no_duplicate_emails_staff_instructor(self):
"""
Test that no duplicate emails are sent to a course instructor that is
also course staff
"""
CourseStaffRole(self.course.id).add_users(self.instructor)
self.test_send_to_all()
def test_no_duplicate_emails_enrolled_staff(self):
"""
        Test that no duplicate emails are sent to a course instructor that is
also enrolled in the course
"""
CourseEnrollment.enroll(self.instructor, self.course.id)
self.test_send_to_all()
def test_unicode_subject_send_to_all(self): def test_unicode_subject_send_to_all(self):
""" """
Make sure email (with Unicode characters) send to all goes there. Make sure email (with Unicode characters) send to all goes there.
...@@ -260,7 +278,7 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase): ...@@ -260,7 +278,7 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students] [self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
) )
@override_settings(BULK_EMAIL_EMAILS_PER_TASK=3, BULK_EMAIL_EMAILS_PER_QUERY=7) @override_settings(BULK_EMAIL_EMAILS_PER_TASK=3)
@patch('bulk_email.tasks.update_subtask_status') @patch('bulk_email.tasks.update_subtask_status')
def test_chunked_queries_send_numerous_emails(self, email_mock): def test_chunked_queries_send_numerous_emails(self, email_mock):
""" """
......
...@@ -29,29 +29,20 @@ class DuplicateTaskException(Exception): ...@@ -29,29 +29,20 @@ class DuplicateTaskException(Exception):
pass pass
def _get_number_of_subtasks(total_num_items, items_per_query, items_per_task): def _get_number_of_subtasks(total_num_items, items_per_task):
""" """
Determines number of subtasks that would be generated by _generate_items_for_subtask. Determines number of subtasks that would be generated by _generate_items_for_subtask.
This needs to be calculated before a query is executed so that the list of all subtasks can be This needs to be calculated before the query is executed so that the list of all subtasks can be
stored in the InstructorTask before any subtasks are started. stored in the InstructorTask before any subtasks are started.
The number of subtask_id values returned by this should match the number of chunks returned The number of subtask_id values returned by this should match the number of chunks returned
by the generate_items_for_subtask generator. by the generate_items_for_subtask generator.
""" """
total_num_tasks = 0 return int(math.ceil(float(total_num_items) / float(items_per_task)))
num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
num_items_remaining = total_num_items
for _ in range(num_queries):
num_items_this_query = min(num_items_remaining, items_per_query)
num_items_remaining -= num_items_this_query
num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task)))
total_num_tasks += num_tasks_this_query
return total_num_tasks
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_task, total_num_subtasks):
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, total_num_subtasks, items_per_query, items_per_task):
""" """
Generates a chunk of "items" that should be passed into a subtask. Generates a chunk of "items" that should be passed into a subtask.
...@@ -67,41 +58,31 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, tot ...@@ -67,41 +58,31 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, tot
Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed. Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed.
""" """
num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
last_pk = item_queryset.order_by('pk')[0].pk - 1
num_items_queued = 0 num_items_queued = 0
available_num_subtasks = total_num_subtasks
all_item_fields = list(item_fields) all_item_fields = list(item_fields)
all_item_fields.append('pk') all_item_fields.append('pk')
num_subtasks = 0
for query_number in range(num_queries): items_for_task = []
# In case total_num_items has increased since it was initially calculated for item in item_queryset.values(*all_item_fields).iterator():
# include all remaining items in last query. if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
item_sublist = item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)
if query_number < num_queries - 1:
item_sublist = list(item_sublist[:items_per_query])
else:
item_sublist = list(item_sublist)
last_pk = item_sublist[-1]['pk']
num_items_this_query = len(item_sublist)
# In case total_num_items has increased since it was initially calculated just distribute the extra
# items among the available subtasks.
num_tasks_this_query = min(available_num_subtasks, int(math.ceil(float(num_items_this_query) / float(items_per_task))))
available_num_subtasks -= num_tasks_this_query
chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query)))
for i in range(num_tasks_this_query):
items_for_task = item_sublist[i * chunk:i * chunk + chunk]
yield items_for_task yield items_for_task
num_items_queued += items_per_task
num_items_queued += num_items_this_query items_for_task = []
num_subtasks += 1
# Because queueing does not happen in one transaction the number of items in the queryset may change items_for_task.append(item)
# from the initial count. For example if the queryset is of the CourseEnrollment model students may
# enroll or unenroll while queueing is in progress. The purpose of the original count is to estimate the # yield remainder items for task, if any
# number of subtasks needed to perform the requested task. if items_for_task:
yield items_for_task
num_items_queued += len(items_for_task)
# Note, depending on what kind of DB is used, it's possible for the queryset
# we iterate over to change in the course of the query. Therefore it's
# possible that there are more (or fewer) items queued than were initially
# calculated. It also means it's possible that the last task contains
# more items than items_per_task allows. We expect this to be a small enough
# number as to be negligible.
if num_items_queued != total_num_items: if num_items_queued != total_num_items:
TASK_LOG.info("Number of items generated by chunking %s not equal to original total %s", num_items_queued, total_num_items) TASK_LOG.info("Number of items generated by chunking %s not equal to original total %s", num_items_queued, total_num_items)
...@@ -256,7 +237,7 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): ...@@ -256,7 +237,7 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
return task_progress return task_progress
def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_queryset, item_fields, items_per_query, items_per_task): def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_queryset, item_fields, items_per_task):
""" """
Generates and queues subtasks to each execute a chunk of "items" generated by a queryset. Generates and queues subtasks to each execute a chunk of "items" generated by a queryset.
...@@ -269,7 +250,6 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys ...@@ -269,7 +250,6 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
`item_queryset` : a query set that defines the "items" that should be passed to subtasks. `item_queryset` : a query set that defines the "items" that should be passed to subtasks.
`item_fields` : the fields that should be included in the dict that is returned. `item_fields` : the fields that should be included in the dict that is returned.
These are in addition to the 'pk' field. These are in addition to the 'pk' field.
`items_per_query` : size of chunks to break the query operation into.
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask. `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
Returns: the task progress as stored in the InstructorTask object. Returns: the task progress as stored in the InstructorTask object.
...@@ -279,7 +259,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys ...@@ -279,7 +259,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
total_num_items = item_queryset.count() total_num_items = item_queryset.count()
# Calculate the number of tasks that will be created, and create a list of ids for each task. # Calculate the number of tasks that will be created, and create a list of ids for each task.
total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_query, items_per_task) total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_task)
subtask_id_list = [str(uuid4()) for _ in range(total_num_subtasks)] subtask_id_list = [str(uuid4()) for _ in range(total_num_subtasks)]
# Update the InstructorTask with information about the subtasks we've defined. # Update the InstructorTask with information about the subtasks we've defined.
...@@ -293,9 +273,8 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys ...@@ -293,9 +273,8 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
item_queryset, item_queryset,
item_fields, item_fields,
total_num_items, total_num_items,
items_per_task,
total_num_subtasks, total_num_subtasks,
items_per_query,
items_per_task
) )
# Now create the subtasks, and start them running. # Now create the subtasks, and start them running.
......
...@@ -26,7 +26,7 @@ class TestSubtasks(InstructorTaskCourseTestCase): ...@@ -26,7 +26,7 @@ class TestSubtasks(InstructorTaskCourseTestCase):
random_id = uuid4().hex[:8] random_id = uuid4().hex[:8]
self.create_student(username='student{0}'.format(random_id)) self.create_student(username='student{0}'.format(random_id))
def _queue_subtasks(self, create_subtask_fcn, items_per_query, items_per_task, initial_count, extra_count): def _queue_subtasks(self, create_subtask_fcn, items_per_task, initial_count, extra_count):
"""Queue subtasks while enrolling more students into course in the middle of the process.""" """Queue subtasks while enrolling more students into course in the middle of the process."""
task_id = str(uuid4()) task_id = str(uuid4())
...@@ -53,43 +53,29 @@ class TestSubtasks(InstructorTaskCourseTestCase): ...@@ -53,43 +53,29 @@ class TestSubtasks(InstructorTaskCourseTestCase):
create_subtask_fcn=create_subtask_fcn, create_subtask_fcn=create_subtask_fcn,
item_queryset=task_queryset, item_queryset=task_queryset,
item_fields=[], item_fields=[],
items_per_query=items_per_query,
items_per_task=items_per_task, items_per_task=items_per_task,
) )
def test_queue_subtasks_for_query1(self): def test_queue_subtasks_for_query1(self):
"""Test queue_subtasks_for_query() if in last query the subtasks only need to accommodate < items_per_tasks items.""" """Test queue_subtasks_for_query() if the last subtask only needs to accommodate < items_per_tasks items."""
mock_create_subtask_fcn = Mock() mock_create_subtask_fcn = Mock()
self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 1) self._queue_subtasks(mock_create_subtask_fcn, 3, 7, 1)
# Check number of items for each subtask # Check number of items for each subtask
mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list
self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3) self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3) self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 3) self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 2)
def test_queue_subtasks_for_query2(self): def test_queue_subtasks_for_query2(self):
"""Test queue_subtasks_for_query() if in last query the subtasks need to accommodate > items_per_task items.""" """Test queue_subtasks_for_query() if the last subtask needs to accommodate > items_per_task items."""
mock_create_subtask_fcn = Mock() mock_create_subtask_fcn = Mock()
self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 3) self._queue_subtasks(mock_create_subtask_fcn, 3, 8, 3)
# Check number of items for each subtask # Check number of items for each subtask
mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list
self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3) self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3) self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 5) self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 5)
def test_queue_subtasks_for_query3(self):
"""Test queue_subtasks_for_query() if in last query the number of items available > items_per_query."""
mock_create_subtask_fcn = Mock()
self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 11, 3)
# Check number of items for each subtask
mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list
self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3)
self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 4)
self.assertEqual(len(mock_create_subtask_fcn_args[3][0][0]), 4)
...@@ -179,7 +179,6 @@ PAYMENT_REPORT_GENERATOR_GROUP = ENV_TOKENS.get('PAYMENT_REPORT_GENERATOR_GROUP' ...@@ -179,7 +179,6 @@ PAYMENT_REPORT_GENERATOR_GROUP = ENV_TOKENS.get('PAYMENT_REPORT_GENERATOR_GROUP'
# Bulk Email overrides # Bulk Email overrides
BULK_EMAIL_DEFAULT_FROM_EMAIL = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_FROM_EMAIL', BULK_EMAIL_DEFAULT_FROM_EMAIL) BULK_EMAIL_DEFAULT_FROM_EMAIL = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_FROM_EMAIL', BULK_EMAIL_DEFAULT_FROM_EMAIL)
BULK_EMAIL_EMAILS_PER_TASK = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_TASK', BULK_EMAIL_EMAILS_PER_TASK) BULK_EMAIL_EMAILS_PER_TASK = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_TASK', BULK_EMAIL_EMAILS_PER_TASK)
BULK_EMAIL_EMAILS_PER_QUERY = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_QUERY', BULK_EMAIL_EMAILS_PER_QUERY)
BULK_EMAIL_DEFAULT_RETRY_DELAY = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_RETRY_DELAY', BULK_EMAIL_DEFAULT_RETRY_DELAY) BULK_EMAIL_DEFAULT_RETRY_DELAY = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_RETRY_DELAY', BULK_EMAIL_DEFAULT_RETRY_DELAY)
BULK_EMAIL_MAX_RETRIES = ENV_TOKENS.get('BULK_EMAIL_MAX_RETRIES', BULK_EMAIL_MAX_RETRIES) BULK_EMAIL_MAX_RETRIES = ENV_TOKENS.get('BULK_EMAIL_MAX_RETRIES', BULK_EMAIL_MAX_RETRIES)
BULK_EMAIL_INFINITE_RETRY_CAP = ENV_TOKENS.get('BULK_EMAIL_INFINITE_RETRY_CAP', BULK_EMAIL_INFINITE_RETRY_CAP) BULK_EMAIL_INFINITE_RETRY_CAP = ENV_TOKENS.get('BULK_EMAIL_INFINITE_RETRY_CAP', BULK_EMAIL_INFINITE_RETRY_CAP)
......
...@@ -1096,7 +1096,6 @@ BULK_EMAIL_DEFAULT_FROM_EMAIL = 'no-reply@example.com' ...@@ -1096,7 +1096,6 @@ BULK_EMAIL_DEFAULT_FROM_EMAIL = 'no-reply@example.com'
# Parameters for breaking down course enrollment into subtasks. # Parameters for breaking down course enrollment into subtasks.
BULK_EMAIL_EMAILS_PER_TASK = 100 BULK_EMAIL_EMAILS_PER_TASK = 100
BULK_EMAIL_EMAILS_PER_QUERY = 1000
# Initial delay used for retrying tasks. Additional retries use # Initial delay used for retrying tasks. Additional retries use
# longer delays. Value is in seconds. # longer delays. Value is in seconds.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment