Commit 87a72b7e by Brian Wilson

Rename some constants, and refactor bulk email task flow.

parent bc599a06
......@@ -132,10 +132,57 @@ def _get_course_email_context(course):
return email_context
def _generate_subtasks(create_subtask_fcn, recipient_qset):
"""
Generates a list of subtasks to send email to a given set of recipients.
Arguments:
`create_subtask_fcn` : a function whose inputs are a list of recipients and a subtask_id
to assign to the new subtask. Returns the subtask that will send email to that
list of recipients.
`recipient_qset` : a query set that defines the recipients who should receive emails.
Returns: a tuple, containing:
* A list of subtasks that will send emails to all recipients.
* A list of subtask_ids corresponding to those subtasks.
* A count of the total number of emails being sent.
"""
total_num_emails = recipient_qset.count()
num_queries = int(math.ceil(float(total_num_emails) / float(settings.BULK_EMAIL_EMAILS_PER_QUERY)))
last_pk = recipient_qset[0].pk - 1
num_emails_queued = 0
task_list = []
subtask_id_list = []
for _ in range(num_queries):
recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk).values('profile__name', 'email', 'pk')[:settings.BULK_EMAIL_EMAILS_PER_QUERY])
last_pk = recipient_sublist[-1]['pk']
num_emails_this_query = len(recipient_sublist)
num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.BULK_EMAIL_EMAILS_PER_TASK)))
chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query)))
for i in range(num_tasks_this_query):
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
subtask_id = str(uuid4())
subtask_id_list.append(subtask_id)
new_subtask = create_subtask_fcn(to_list, subtask_id)
task_list.append(new_subtask)
num_emails_queued += num_emails_this_query
# Sanity check: we expect the chunking to sum to the original count:
if num_emails_queued != total_num_emails:
error_msg = "Task {}: number of emails generated by chunking {} not equal to original total {}".format(num_emails_queued, total_num_emails)
log.error(error_msg)
raise ValueError(error_msg)
return task_list, subtask_id_list, total_num_emails
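To illustrate the chunking arithmetic with hypothetical settings values: 2,500 recipients with BULK_EMAIL_EMAILS_PER_QUERY = 1000 and BULK_EMAIL_EMAILS_PER_TASK = 100 are fetched in three queries (1000 + 1000 + 500 rows) and yield 10 + 10 + 5 = 25 subtasks; the final sanity check then confirms that all 2,500 emails were queued. A standalone sketch of that arithmetic:

# Minimal sketch of the chunking arithmetic in _generate_subtasks, using
# hypothetical stand-ins for the settings values (not taken from this commit).
import math

EMAILS_PER_QUERY = 1000   # stand-in for settings.BULK_EMAIL_EMAILS_PER_QUERY
EMAILS_PER_TASK = 100     # stand-in for settings.BULK_EMAIL_EMAILS_PER_TASK

def count_subtasks(total_num_emails):
    """Return how many subtasks the chunking produces for a recipient count."""
    num_queries = int(math.ceil(float(total_num_emails) / EMAILS_PER_QUERY))
    num_subtasks = 0
    remaining = total_num_emails
    for _ in range(num_queries):
        emails_this_query = min(remaining, EMAILS_PER_QUERY)
        num_subtasks += int(math.ceil(float(emails_this_query) / EMAILS_PER_TASK))
        remaining -= emails_this_query
    return num_subtasks

assert count_subtasks(2500) == 25   # 10 + 10 + 5
assert count_subtasks(50) == 1      # a single small subtask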
def perform_delegate_email_batches(entry_id, course_id, task_input, action_name):
"""
Delegates emails by querying for the list of recipients who should
get the mail, chopping up into batches of settings.EMAILS_PER_TASK size,
get the mail, chopping up into batches of settings.BULK_EMAIL_EMAILS_PER_TASK size,
and queueing up worker jobs.
Returns the number of batches (workers) kicked off.
......@@ -151,56 +198,35 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
format_msg = "Course id conflict: explicit value {} does not match task value {}"
raise ValueError(format_msg.format(course_id, entry.course_id))
# Fetch the CourseEmail.
email_id = task_input['email_id']
try:
email_obj = CourseEmail.objects.get(id=email_id)
except CourseEmail.DoesNotExist as exc:
except CourseEmail.DoesNotExist:
# The CourseEmail object should be committed in the view function before the task
# is submitted and reaches this point.
log.warning("Task %s: Failed to get CourseEmail with id %s", task_id, email_id)
raise
to_option = email_obj.to_option
# Sanity check that course for email_obj matches that of the task referencing it.
if course_id != email_obj.course_id:
format_msg = "Course id conflict: explicit value {} does not match email value {}"
raise ValueError(format_msg.format(course_id, email_obj.course_id))
# Fetch the course object.
try:
course = get_course(course_id)
except ValueError:
log.exception("Task %s: course not found: %s", task_id, course_id)
raise
global_email_context = _get_course_email_context(course)
to_option = email_obj.to_option
recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
total_num_emails = recipient_qset.count()
log.info("Task %s: Preparing to queue emails to %d recipient(s) for course %s, email %s, to_option %s",
task_id, total_num_emails, course_id, email_id, to_option)
global_email_context = _get_course_email_context(course)
num_queries = int(math.ceil(float(total_num_emails) / float(settings.EMAILS_PER_QUERY)))
last_pk = recipient_qset[0].pk - 1
num_emails_queued = 0
task_list = []
subtask_id_list = []
for _ in range(num_queries):
recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk)
.values('profile__name', 'email', 'pk')[:settings.EMAILS_PER_QUERY])
last_pk = recipient_sublist[-1]['pk']
num_emails_this_query = len(recipient_sublist)
num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.EMAILS_PER_TASK)))
chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query)))
for i in range(num_tasks_this_query):
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
subtask_id = str(uuid4())
subtask_id_list.append(subtask_id)
def _create_send_email_subtask(to_list, subtask_id):
"""Creates a subtask to send email to a given recipient list."""
subtask_status = create_subtask_status(subtask_id)
# Create subtask, passing args and kwargs.
# This includes specifying the task_id to use, so we can track it.
# Specify the routing key as part of it, which is used by
# Celery to route the task request to the right worker.
new_subtask = send_course_email.subtask(
(
entry_id,
......@@ -212,25 +238,22 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
task_id=subtask_id,
routing_key=settings.BULK_EMAIL_ROUTING_KEY,
)
task_list.append(new_subtask)
num_emails_queued += num_emails_this_query
return new_subtask
# Sanity check: we expect the chunking to be properly summing to the original count:
if num_emails_queued != total_num_emails:
error_msg = "Task {}: number of emails generated by chunking {} not equal to original total {}".format(
task_id, num_emails_queued, total_num_emails
)
log.error(error_msg)
raise Exception(error_msg)
log.info("Task %s: Preparing to generate subtasks for course %s, email %s, to_option %s",
task_id, course_id, email_id, to_option)
task_list, subtask_id_list, total_num_emails = _generate_subtasks(_create_send_email_subtask, recipient_qset)
# Update the InstructorTask with information about the subtasks we've defined.
log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s",
task_id, total_num_emails, course_id, email_id, to_option)
progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list)
num_subtasks = len(subtask_id_list)
log.info("Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s",
num_subtasks, total_num_emails, course_id, email_id, to_option)
# Now group the subtasks, and start them running. This allows all the subtasks
# in the list to be submitted at the same time.
log.info("Task %s: Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s",
task_id, num_subtasks, total_num_emails, course_id, email_id, to_option)
task_group = group(task_list)
task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY)
......@@ -328,6 +351,49 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
return new_subtask_status
def _filter_optouts_from_recipients(to_list, course_id):
"""
Filters a recipient list based on student opt-outs for a given course.
Returns the filtered recipient list, as well as the number of optouts
removed from the list.
"""
optouts = Optout.objects.filter(
course_id=course_id,
user__in=[i['pk'] for i in to_list]
).values_list('user__email', flat=True)
optouts = set(optouts)
# Only count the num_optout for the first time the optouts are calculated.
# We assume that the number will not change on retries, and so we don't need
# to calculate it each time.
num_optout = len(optouts)
to_list = [recipient for recipient in to_list if recipient['email'] not in optouts]
return to_list, num_optout
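For illustration, the filtering step reduces to a set-membership test on email addresses. The sketch below hard-codes the opt-out set instead of querying the Optout model; all names and addresses are hypothetical.

# Hypothetical illustration of _filter_optouts_from_recipients; the real
# function builds `optouts` from Optout rows for the given course.
to_list = [
    {'pk': 1, 'email': 'alice@example.com', 'profile__name': 'Alice'},
    {'pk': 2, 'email': 'bob@example.com', 'profile__name': 'Bob'},
]
optouts = set(['bob@example.com'])
num_optout = len(optouts)
to_list = [recipient for recipient in to_list if recipient['email'] not in optouts]
assert [r['email'] for r in to_list] == ['alice@example.com']
assert num_optout == 1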
def _get_source_address(course_id, course_title):
"""
Calculates an email address to be used as the 'from-address' for sent emails.
Makes a unique from name and address for each course, e.g.
"COURSE_TITLE" Course Staff <coursenum-no-reply@courseupdates.edx.org>
"""
course_title_no_quotes = re.sub(r'"', '', course_title)
# The course_id is assumed to be in the form 'org/course_num/run',
# so pull out the course_num. Then make sure that it can be used
# in an email address, by substituting a '_' anywhere a non-(ascii, period, or dash)
# character appears.
course_num = course_id.split('/')[1]
INVALID_CHARS = re.compile(r"[^\w.-]")
course_num = INVALID_CHARS.sub('_', course_num)
from_addr = '"{0}" Course Staff <{1}-{2}>'.format(course_title_no_quotes, course_num, settings.BULK_EMAIL_DEFAULT_FROM_EMAIL)
return from_addr
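A worked example with a hypothetical course (identifiers invented for illustration), assuming BULK_EMAIL_DEFAULT_FROM_EMAIL keeps the default of 'no-reply@courseupdates.edx.org' set later in this commit:

# Standalone sketch of the from-address construction above.
import re

course_id = 'MITx/6.002x/2013_Spring'              # hypothetical course id
course_title = 'Circuits & "Electronics"'          # hypothetical course title
default_from = 'no-reply@courseupdates.edx.org'    # BULK_EMAIL_DEFAULT_FROM_EMAIL

course_num = re.sub(r"[^\w.-]", '_', course_id.split('/')[1])
title_no_quotes = course_title.replace('"', '')
from_addr = '"{0}" Course Staff <{1}-{2}>'.format(title_no_quotes, course_num, default_from)
assert from_addr == '"Circuits & Electronics" Course Staff <6.002x-no-reply@courseupdates.edx.org>'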
def _send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status):
"""
Performs the email sending task.
......@@ -371,9 +437,6 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# Get information from current task's request:
task_id = subtask_status['task_id']
# If this is a second attempt due to rate-limits, then throttle the speed at which mail is sent:
throttle = subtask_status['retried_nomax'] > 0
# collect stats on progress:
num_optout = 0
num_sent = 0
......@@ -392,30 +455,11 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# that existed at that time, and we don't need to keep checking for changes
# in the Optout list.
if (subtask_status['retried_nomax'] + subtask_status['retried_withmax']) == 0:
optouts = (Optout.objects.filter(course_id=course_email.course_id,
user__in=[i['pk'] for i in to_list])
.values_list('user__email', flat=True))
optouts = set(optouts)
# Only count the num_optout for the first time the optouts are calculated.
# We assume that the number will not change on retries, and so we don't need
# to calculate it each time.
num_optout = len(optouts)
to_list = [recipient for recipient in to_list if recipient['email'] not in optouts]
to_list, num_optout = _filter_optouts_from_recipients(to_list, course_email.course_id)
course_title = global_email_context['course_title']
subject = "[" + course_title + "] " + course_email.subject
course_title_no_quotes = re.sub(r'"', '', course_title)
course_num = course_email.course_id.split('/')[1] # course_id = 'org/course_num/run'
# Substitute a '_' anywhere a non-(ascii, period, or dash) character appears.
INVALID_CHARS = re.compile(r"[^\w.-]")
course_num = INVALID_CHARS.sub('_', course_num)
# Make a unique from name and address for each course, eg
# "COURSE_TITLE" Course Staff <coursenum-no-reply@courseupdates.edx.org>
from_addr = '"{0}" Course Staff <{1}-{2}>'.format(
course_title_no_quotes, course_num, settings.DEFAULT_BULK_FROM_EMAIL
)
from_addr = _get_source_address(course_email.course_id, course_title)
course_email_template = CourseEmailTemplate.get_template()
try:
......@@ -423,17 +467,19 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
connection.open()
# Define context values to use in all course emails:
email_context = {
'name': '',
'email': ''
}
email_context = {'name': '', 'email': ''}
email_context.update(global_email_context)
while to_list:
# Update context with user-specific values from the user at the end of the list:
email = to_list[-1]['email']
# Update context with user-specific values from the user at the end of the list.
# At the end of processing this user, they will be popped off of the to_list.
# That way, the to_list will always contain the recipients remaining to be emailed.
# This is convenient for retries, which need to send to those who haven't
# yet been emailed, without resending to those who already have been.
current_recipient = to_list[-1]
email = current_recipient['email']
email_context['email'] = email
email_context['name'] = to_list[-1]['profile__name']
email_context['name'] = current_recipient['profile__name']
# Construct message content using templates and context:
plaintext_msg = course_email_template.render_plaintext(course_email.text_message, email_context)
......@@ -454,7 +500,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# for a period of time between all emails within this task. Choice of
# the value depends on the number of workers that might be sending email in
# parallel, and what the SES throttle rate is.
if throttle:
if subtask_status['retried_nomax'] > 0:
sleep(settings.BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS)
try:
......@@ -488,7 +534,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
log.debug('Email with id %s sent to %s', email_id, email)
num_sent += 1
# Pop the user that was emailed off the end of the list:
# Pop the user that was emailed off the end of the list only once they have
# been successfully processed. (That way, if a failure needs to be retried,
# the user is still on the list.)
to_list.pop()
except INFINITE_RETRY_ERRORS as exc:
......
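The pop-only-on-success loop above is what makes retries inexpensive: the remaining to_list is always exactly the set of recipients who have not yet been emailed. A minimal standalone sketch of that pattern, with a fake send function and hypothetical addresses:

# Minimal sketch of the "peek, send, then pop" pattern; flaky_send and the
# addresses are invented for illustration.
def flaky_send(addr):
    if addr == 'b@example.com':
        raise RuntimeError('simulated SMTP failure')

remaining = ['a@example.com', 'b@example.com', 'c@example.com']
try:
    while remaining:
        recipient = remaining[-1]   # peek at the recipient at the end of the list
        flaky_send(recipient)       # may raise; the recipient stays on the list
        remaining.pop()             # remove only after a successful send
except RuntimeError:
    pass
# A retry would receive exactly the recipients that were not successfully emailed:
assert remaining == ['a@example.com', 'b@example.com']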
......@@ -243,7 +243,7 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
)
@override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7)
@override_settings(BULK_EMAIL_EMAILS_PER_TASK=3, BULK_EMAIL_EMAILS_PER_QUERY=7)
@patch('bulk_email.tasks.increment_subtask_status')
def test_chunked_queries_send_numerous_emails(self, email_mock):
"""
......
......@@ -76,7 +76,7 @@ class TestEmailErrors(ModuleStoreTestCase):
# have every fourth email fail due to blacklisting:
get_conn.return_value.send_messages.side_effect = cycle([SMTPDataError(554, "Email address is blacklisted"),
None, None, None])
students = [UserFactory() for _ in xrange(settings.EMAILS_PER_TASK)]
students = [UserFactory() for _ in xrange(settings.BULK_EMAIL_EMAILS_PER_TASK)]
for student in students:
CourseEnrollmentFactory.create(user=student, course_id=self.course.id)
......@@ -93,9 +93,9 @@ class TestEmailErrors(ModuleStoreTestCase):
# Test that after the rejected email, the rest still successfully send
((_initial_results), kwargs) = result.call_args
self.assertEquals(kwargs['skipped'], 0)
expected_fails = int((settings.EMAILS_PER_TASK + 3) / 4.0)
expected_fails = int((settings.BULK_EMAIL_EMAILS_PER_TASK + 3) / 4.0)
self.assertEquals(kwargs['failed'], expected_fails)
self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expected_fails)
self.assertEquals(kwargs['succeeded'], settings.BULK_EMAIL_EMAILS_PER_TASK - expected_fails)
@patch('bulk_email.tasks.get_connection', autospec=True)
@patch('bulk_email.tasks.send_course_email.retry')
......
......@@ -189,7 +189,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
def test_successful(self):
# Select number of emails to fit into a single subtask.
num_emails = settings.EMAILS_PER_TASK
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
# We also send email to the instructor:
self._create_students(num_emails - 1)
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
......@@ -198,7 +198,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
def test_unactivated_user(self):
# Select number of emails to fit into a single subtask.
num_emails = settings.EMAILS_PER_TASK
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
# We also send email to the instructor:
students = self._create_students(num_emails - 1)
# mark a student as not yet having activated their email:
......@@ -211,7 +211,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
def test_skipped(self):
# Select number of emails to fit into a single subtask.
num_emails = settings.EMAILS_PER_TASK
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
# We also send email to the instructor:
students = self._create_students(num_emails - 1)
# have every fourth student optout:
......@@ -227,7 +227,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
def _test_email_address_failures(self, exception):
"""Test that celery handles bad address errors by failing and not retrying."""
# Select number of emails to fit into a single subtask.
num_emails = settings.EMAILS_PER_TASK
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
# We also send email to the instructor:
self._create_students(num_emails - 1)
expected_fails = int((num_emails + 3) / 4.0)
......
......@@ -139,9 +139,9 @@ PAID_COURSE_REGISTRATION_CURRENCY = ENV_TOKENS.get('PAID_COURSE_REGISTRATION_CUR
PAID_COURSE_REGISTRATION_CURRENCY)
# Bulk Email overrides
DEFAULT_BULK_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_BULK_FROM_EMAIL', DEFAULT_BULK_FROM_EMAIL)
EMAILS_PER_TASK = ENV_TOKENS.get('EMAILS_PER_TASK', EMAILS_PER_TASK)
EMAILS_PER_QUERY = ENV_TOKENS.get('EMAILS_PER_QUERY', EMAILS_PER_QUERY)
BULK_EMAIL_DEFAULT_FROM_EMAIL = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_FROM_EMAIL', BULK_EMAIL_DEFAULT_FROM_EMAIL)
BULK_EMAIL_EMAILS_PER_TASK = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_TASK', BULK_EMAIL_EMAILS_PER_TASK)
BULK_EMAIL_EMAILS_PER_QUERY = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_QUERY', BULK_EMAIL_EMAILS_PER_QUERY)
BULK_EMAIL_DEFAULT_RETRY_DELAY = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_RETRY_DELAY', BULK_EMAIL_DEFAULT_RETRY_DELAY)
BULK_EMAIL_MAX_RETRIES = ENV_TOKENS.get('BULK_EMAIL_MAX_RETRIES', BULK_EMAIL_MAX_RETRIES)
BULK_EMAIL_INFINITE_RETRY_CAP = ENV_TOKENS.get('BULK_EMAIL_INFINITE_RETRY_CAP', BULK_EMAIL_INFINITE_RETRY_CAP)
......@@ -149,6 +149,7 @@ BULK_EMAIL_LOG_SENT_EMAILS = ENV_TOKENS.get('BULK_EMAIL_LOG_SENT_EMAILS', BULK_E
BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS = ENV_TOKENS.get('BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS', BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS)
# We want Bulk Email running on the high-priority queue, so we define the
# routing key that points to it. At the moment, the name is the same.
# We have to reset the value here, since the queue name has changed.
BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE
# Theme overrides
......
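As a sketch of how these ENV_TOKENS overrides behave (values hypothetical): each renamed setting is taken from the deployment's environment configuration when present, and otherwise falls back to the default defined in the common settings.

# Hypothetical illustration of the ENV_TOKENS override pattern used above.
ENV_TOKENS = {'BULK_EMAIL_EMAILS_PER_TASK': 50}   # e.g. loaded from a JSON env file
BULK_EMAIL_EMAILS_PER_TASK = 100     # default from the common settings
BULK_EMAIL_EMAILS_PER_QUERY = 1000   # default from the common settings
BULK_EMAIL_EMAILS_PER_TASK = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_TASK', BULK_EMAIL_EMAILS_PER_TASK)
BULK_EMAIL_EMAILS_PER_QUERY = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_QUERY', BULK_EMAIL_EMAILS_PER_QUERY)
assert BULK_EMAIL_EMAILS_PER_TASK == 50      # overridden by the environment
assert BULK_EMAIL_EMAILS_PER_QUERY == 1000   # falls back to the default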
......@@ -798,9 +798,13 @@ CELERYD_HIJACK_ROOT_LOGGER = False
################################ Bulk Email ###################################
DEFAULT_BULK_FROM_EMAIL = 'no-reply@courseupdates.edx.org'
EMAILS_PER_TASK = 100
EMAILS_PER_QUERY = 1000
# Suffix used to construct 'from' email address for bulk emails.
# A course-specific identifier is prepended.
BULK_EMAIL_DEFAULT_FROM_EMAIL = 'no-reply@courseupdates.edx.org'
# Parameters for breaking down course enrollment into subtasks.
BULK_EMAIL_EMAILS_PER_TASK = 100
BULK_EMAIL_EMAILS_PER_QUERY = 1000
# Initial delay used for retrying tasks. Additional retries use
# longer delays. Value is in seconds.
......