Commit c12b5421 by Adam Palay

split up bulk email query for students and unenrolled course staff (TNL-1332) (TNL-1143)

parent 6845790d
......@@ -36,6 +36,7 @@ from django.core.urlresolvers import reverse
from bulk_email.models import (
CourseEmail, Optout, CourseEmailTemplate,
SEND_TO_MYSELF, SEND_TO_ALL, TO_OPTIONS,
SEND_TO_STAFF,
)
from courseware.courses import get_course, course_image_url
from student.roles import CourseStaffRole, CourseInstructorRole
......@@ -92,25 +93,30 @@ BULK_EMAIL_FAILURE_ERRORS = (
)
def _get_recipient_queryset(user_id, to_option, course_id, course_location):
def _get_recipient_querysets(user_id, to_option, course_id):
"""
Returns a query set of email recipients corresponding to the requested to_option category.
Returns a list of query sets of email recipients corresponding to the
requested `to_option` category.
`to_option` is either SEND_TO_MYSELF, SEND_TO_STAFF, or SEND_TO_ALL.
Recipients who are in more than one category (e.g. enrolled in the course and are staff or self)
will be properly deduped.
Recipients who are in more than one category (e.g. enrolled in the course
and are staff or self) will be properly deduped.
"""
if to_option not in TO_OPTIONS:
log.error("Unexpected bulk email TO_OPTION found: %s", to_option)
raise Exception("Unexpected bulk email TO_OPTION found: {0}".format(to_option))
if to_option == SEND_TO_MYSELF:
recipient_qset = User.objects.filter(id=user_id)
user = User.objects.filter(id=user_id)
return [use_read_replica_if_available(user)]
else:
staff_qset = CourseStaffRole(course_id).users_with_role()
instructor_qset = CourseInstructorRole(course_id).users_with_role()
recipient_qset = (staff_qset | instructor_qset).distinct()
staff_instructor_qset = (staff_qset | instructor_qset).distinct()
if to_option == SEND_TO_STAFF:
return [use_read_replica_if_available(staff_instructor_qset)]
if to_option == SEND_TO_ALL:
# We also require students to have activated their accounts to
# provide verification that the provided email address is valid.
......@@ -119,20 +125,19 @@ def _get_recipient_queryset(user_id, to_option, course_id, course_location):
courseenrollment__course_id=course_id,
courseenrollment__is_active=True
)
# Now we do some queryset sidestepping to avoid doing a DISTINCT
# query across the course staff and the enrolled students, which
# forces the creation of a temporary table in the db.
unenrolled_staff_qset = recipient_qset.exclude(
# to avoid duplicates, we only want to email unenrolled course staff
# members here
unenrolled_staff_qset = staff_instructor_qset.exclude(
courseenrollment__course_id=course_id, courseenrollment__is_active=True
)
# use read_replica if available:
unenrolled_staff_qset = use_read_replica_if_available(unenrolled_staff_qset)
unenrolled_staff_ids = [user.id for user in unenrolled_staff_qset]
recipient_qset = enrollment_qset | User.objects.filter(id__in=unenrolled_staff_ids)
# again, use read_replica if available to lighten the load for large queries
return use_read_replica_if_available(recipient_qset)
# use read_replica if available
recipient_qsets = [
use_read_replica_if_available(unenrolled_staff_qset),
use_read_replica_if_available(enrollment_qset),
]
return recipient_qsets
def _get_course_email_context(course):
......@@ -230,7 +235,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
)
return new_subtask
recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
recipient_qsets = _get_recipient_querysets(user_id, to_option, course_id)
recipient_fields = ['profile__name', 'email']
log.info(u"Task %s: Preparing to queue subtasks for sending emails for course %s, email %s, to_option %s",
......@@ -240,7 +245,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
entry,
action_name,
_create_send_email_subtask,
recipient_qset,
recipient_qsets,
recipient_fields,
settings.BULK_EMAIL_EMAILS_PER_TASK,
)
......
......@@ -179,6 +179,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
response = self.client.post(self.send_mail_url, test_email)
self.assertEquals(json.loads(response.content), self.success_content)
# the 1 is for the instructor
self.assertEquals(len(mail.outbox), 1 + len(self.staff) + len(self.students))
self.assertItemsEqual(
[e.to[0] for e in mail.outbox],
......@@ -195,12 +196,25 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
def test_no_duplicate_emails_enrolled_staff(self):
"""
Test that no duplicate emials are sent to a course instructor that is
Test that no duplicate emails are sent to a course instructor that is
also enrolled in the course
"""
CourseEnrollment.enroll(self.instructor, self.course.id)
self.test_send_to_all()
def test_no_duplicate_emails_unenrolled_staff(self):
"""
Test that no duplicate emails are sent to a course staff that is
not enrolled in the course, but is enrolled in other courses
"""
course_1 = CourseFactory.create()
course_2 = CourseFactory.create()
# make sure self.instructor isn't enrolled in the course
self.assertFalse(CourseEnrollment.is_enrolled(self.instructor, self.course.id))
CourseEnrollment.enroll(self.instructor, course_1.id)
CourseEnrollment.enroll(self.instructor, course_2.id)
self.test_send_to_all()
def test_unicode_subject_send_to_all(self):
"""
Make sure email (with Unicode characters) send to all goes there.
......
......@@ -71,7 +71,7 @@ def track_memory_usage(metric, course_id):
def _generate_items_for_subtask(
item_queryset,
item_querysets,
item_fields,
total_num_items,
items_per_task,
......@@ -82,10 +82,10 @@ def _generate_items_for_subtask(
Generates a chunk of "items" that should be passed into a subtask.
Arguments:
`item_queryset` : a query set that defines the "items" that should be passed to subtasks.
`item_querysets` : a list of query sets, each of which defines the "items" that should be passed to subtasks.
`item_fields` : the fields that should be included in the dict that is returned.
These are in addition to the 'pk' field.
`total_num_items` : the result of item_queryset.count().
`total_num_items` : the result of summing the count of each queryset in `item_querysets`.
`items_per_query` : size of chunks to break the query operation into.
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
`course_id` : course_id of the course. Only needed for the track_memory_usage context manager.
......@@ -102,13 +102,14 @@ def _generate_items_for_subtask(
items_for_task = []
with track_memory_usage('course_email.subtask_generation.memory', course_id):
for item in item_queryset.values(*all_item_fields).iterator():
if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
yield items_for_task
num_items_queued += items_per_task
items_for_task = []
num_subtasks += 1
items_for_task.append(item)
for queryset in item_querysets:
for item in queryset.values(*all_item_fields).iterator():
if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
yield items_for_task
num_items_queued += items_per_task
items_for_task = []
num_subtasks += 1
items_for_task.append(item)
# yield remainder items for task, if any
if items_for_task:
......@@ -275,7 +276,7 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
return task_progress
def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_queryset, item_fields, items_per_task):
def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querysets, item_fields, items_per_task):
"""
Generates and queues subtasks to each execute a chunk of "items" generated by a queryset.
......@@ -285,7 +286,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
`create_subtask_fcn` : a function of two arguments that constructs the desired kind of subtask object.
Arguments are the list of items to be processed by this subtask, and a SubtaskStatus
object reflecting initial status (and containing the subtask's id).
`item_queryset` : a query set that defines the "items" that should be passed to subtasks.
`item_querysets` : a list of query sets that define the "items" that should be passed to subtasks.
`item_fields` : the fields that should be included in the dict that is returned.
These are in addition to the 'pk' field.
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
......@@ -294,7 +295,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
"""
task_id = entry.task_id
total_num_items = item_queryset.count()
total_num_items = sum([item_queryset.count() for item_queryset in item_querysets])
# Calculate the number of tasks that will be created, and create a list of ids for each task.
total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_task)
......@@ -313,7 +314,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
# Construct a generator that will return the recipients to use for each subtask.
# Pass in the desired fields to fetch for each recipient.
item_list_generator = _generate_items_for_subtask(
item_queryset,
item_querysets,
item_fields,
total_num_items,
items_per_task,
......
......@@ -38,7 +38,7 @@ class TestSubtasks(InstructorTaskCourseTestCase):
)
self._enroll_students_in_course(self.course.id, initial_count)
task_queryset = CourseEnrollment.objects.filter(course_id=self.course.id)
task_querysets = [CourseEnrollment.objects.filter(course_id=self.course.id)]
def initialize_subtask_info(*args): # pylint: disable=unused-argument
"""Instead of initializing subtask info enroll some more students into course."""
......@@ -51,7 +51,7 @@ class TestSubtasks(InstructorTaskCourseTestCase):
entry=instructor_task,
action_name='action_name',
create_subtask_fcn=create_subtask_fcn,
item_queryset=task_queryset,
item_querysets=task_querysets,
item_fields=[],
items_per_task=items_per_task,
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment