Commit 42033ca8 by Brian Wilson

Update handling of bulk-email retries to update InstructorTask before each retry.

parent e2d98520
......@@ -5,8 +5,8 @@ to a course.
import math
import re
from uuid import uuid4
from time import time, sleep
import json
from time import sleep
from sys import exc_info
from traceback import format_exc
......@@ -15,7 +15,8 @@ from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError
from celery import task, current_task, group
from celery.utils.log import get_task_logger
from celery.states import SUCCESS, FAILURE
from celery.states import SUCCESS, FAILURE, RETRY
from celery.exceptions import RetryTaskError
from django.conf import settings
from django.contrib.auth.models import User, Group
......@@ -31,8 +32,9 @@ from courseware.access import _course_staff_group_name, _course_instructor_group
from import get_course_by_id, course_image_url
from instructor_task.models import InstructorTask
from instructor_task.subtasks import (
update_subtask_result, update_subtask_status, create_subtask_result,
log = get_task_logger(__name__)
......@@ -155,13 +157,11 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
subtask_id = str(uuid4())
subtask_progress = create_subtask_result()
), task_id=subtask_id
num_workers += num_tasks_this_query
......@@ -178,8 +178,9 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
# We want to return progress here, as this is what will be stored in the
# AsyncResult for the parent task as its return value.
# The Result will then be marked as SUCCEEDED, and have this return value as it's "result".
# That's okay, for the InstructorTask will have the "real" status.
# The AsyncResult will then be marked as SUCCEEDED, and have this return value as it's "result".
# That's okay, for the InstructorTask will have the "real" status, and monitoring code
# will use that instead.
return progress
......@@ -190,17 +191,28 @@ def _get_current_task():
@task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102
def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_progress):
def send_course_email(entry_id, email_id, to_list, global_email_context):
Takes a primary id for a CourseEmail object and a 'to_list' of recipient objects--keys are
'profile__name', 'email' (address), and 'pk' (in the user table).
course_title, course_url, and image_url are to memoize course properties and save lookups.
Sends to all addresses contained in to_list. Emails are sent multi-part, in both plain
text and html.
Sends an email to a list of recipients.
Inputs are:
* `entry_id`: id of the InstructorTask object to which progress should be recorded.
* `email_id`: id of the CourseEmail model that is to be emailed.
* `to_list`: list of recipients. Each is represented as a dict with the following keys:
- 'profile__name': full name of User.
- 'email': email address of User.
- 'pk': primary key of User model.
* `global_email_context`: dict containing values to be used to fill in slots in email
template. It does not include 'name' and 'email', which will be provided by the to_list.
* retry_index: counter indicating how many times this task has been retried. Set to zero
on initial call.
Sends to all addresses contained in to_list that are not also in the Optout table.
Emails are sent multi-part, in both plain text and html. Updates InstructorTask object
with status information (sends, failures, skips) and updates number of subtasks completed.
# Get entry here, as a sanity check that it actually exists. We won't actually do anything
# with it right away.
# with it right away, but we also don't expect it to fail.
# Get information from current task's request:
......@@ -210,42 +222,64 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask"Preparing to send email as subtask %s for instructor task %d, retry %d",
current_task_id, entry_id, retry_index)
send_exception = None
course_email_result_value = None
course_title = global_email_context['course_title']
course_email_result_value = None
send_exception = None
with dog_stats_api.timer('course_email.single_task.time.overall', tags=[_statsd_tag(course_title)]):
course_email_result_value, send_exception = _send_course_email(
if send_exception is None:
# Update the InstructorTask object that is storing its progress.
update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value)
log.error("background task (%s) failed: %s", current_task_id, send_exception)
update_subtask_status(entry_id, current_task_id, FAILURE, course_email_result_value)
raise send_exception
except Exception:
# try to write out the failure to the entry before failing
_, exception, traceback = exc_info()
# Unexpected exception. Try to write out the failure to the entry before failing
_, send_exception, traceback = exc_info()
traceback_string = format_exc(traceback) if traceback is not None else ''
log.error("background task (%s) failed: %s %s", current_task_id, exception, traceback_string)
update_subtask_status(entry_id, current_task_id, FAILURE, subtask_progress)
log.error("background task (%s) failed unexpectedly: %s %s", current_task_id, send_exception, traceback_string)
# consider all emails to not be sent, and update stats:
num_error = len(to_list)
course_email_result_value = create_subtask_result(0, num_error, 0)
if send_exception is None:
# Update the InstructorTask object that is storing its progress."background task (%s) succeeded", current_task_id)
update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value)
elif isinstance(send_exception, RetryTaskError):
# If retrying, record the progress made before the retry condition
# was encountered. Once the retry is running, it will be only processing
# what wasn't already accomplished.
log.warning("background task (%s) being retried", current_task_id)
update_subtask_status(entry_id, current_task_id, RETRY, course_email_result_value)
raise send_exception
log.error("background task (%s) failed: %s", current_task_id, send_exception)
update_subtask_status(entry_id, current_task_id, FAILURE, course_email_result_value)
raise send_exception
return course_email_result_value
def _send_course_email(task_id, email_id, to_list, global_email_context, subtask_progress, retry_index):
def _send_course_email(entry_id, email_id, to_list, global_email_context):
Performs the email sending task.
Sends an email to a list of recipients.
Inputs are:
* `entry_id`: id of the InstructorTask object to which progress should be recorded.
* `email_id`: id of the CourseEmail model that is to be emailed.
* `to_list`: list of recipients. Each is represented as a dict with the following keys:
- 'profile__name': full name of User.
- 'email': email address of User.
- 'pk': primary key of User model.
* `global_email_context`: dict containing values to be used to fill in slots in email
template. It does not include 'name' and 'email', which will be provided by the to_list.
Sends to all addresses contained in to_list that are not also in the Optout table.
Emails are sent multi-part, in both plain text and html.
Returns a tuple of two values:
* First value is a dict which represents current progress. Keys are:
......@@ -258,6 +292,9 @@ def _send_course_email(task_id, email_id, to_list, global_email_context, subtask
In this case, the number of recipients that were not sent have already been added to the
'failed' count above.
# Get information from current task's request:
task_id = _get_current_task()
retry_index = _get_current_task().request.retries
throttle = retry_index > 0
num_optout = 0
......@@ -268,10 +305,9 @@ def _send_course_email(task_id, email_id, to_list, global_email_context, subtask
course_email = CourseEmail.objects.get(id=email_id)
except CourseEmail.DoesNotExist as exc:
log.exception("Task %s: could not find email id:%s to send.", task_id, email_id)
num_error += len(to_list)
return update_subtask_result(subtask_progress, num_sent, num_error, num_optout), exc
# exclude optouts (if not a retry):
# Exclude optouts (if not a retry):
# Note that we don't have to do the optout logic at all if this is a retry,
# because we have presumably already performed the optout logic on the first
# attempt. Anyone on the to_list on a retry has already passed the filter
......@@ -304,7 +340,6 @@ def _send_course_email(task_id, email_id, to_list, global_email_context, subtask
course_email_template = CourseEmailTemplate.get_template()
connection = get_connection()
......@@ -317,7 +352,7 @@ def _send_course_email(task_id, email_id, to_list, global_email_context, subtask
while to_list:
# Update context with user-specific values:
# Update context with user-specific values from the user at the end of the list:
email = to_list[-1]['email']
email_context['email'] = email
email_context['name'] = to_list[-1]['profile__name']
......@@ -351,7 +386,7 @@ def _send_course_email(task_id, email_id, to_list, global_email_context, subtask'Email with id %s sent to %s', email_id, email)
num_sent += 1
except SMTPDataError as exc:
# According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure
# According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure.
if exc.smtp_code >= 400 and exc.smtp_code < 500:
# This will cause the outer handler to catch the exception and retry the entire task
raise exc
......@@ -361,43 +396,86 @@ def _send_course_email(task_id, email_id, to_list, global_email_context, subtask
dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
num_error += 1
# Pop the user that was emailed off the end of the list:
except (SMTPDataError, SMTPConnectError, SMTPServerDisconnected) as exc:
# Errors caught here cause the email to be retried. The entire task is actually retried
# without popping the current recipient off of the existing list.
# Errors caught are those that indicate a temporary condition that might succeed on retry.
log.warning('Task %s: email with id %d not delivered due to temporary error %s, retrying send to %d recipients',
task_id, email_id, exc, len(to_list))
raise send_course_email.retry(
update_subtask_result(subtask_progress, num_sent, num_error, num_optout),
countdown=(2 ** retry_index) * 15
subtask_progress = create_subtask_result(num_sent, num_error, num_optout)
return _submit_for_retry(
entry_id, email_id, to_list, global_email_context, exc, subtask_progress
# TODO: what happens if there are no more retries, because the maximum has been reached?
# Assume that this then just results in the "exc" being raised directly, which means that the
# subtask status is not going to get updated correctly.
except Exception as exc:
# If we have a general exception for this request, we need to figure out what to do with it.
# If we're going to just mark it as failed
# And the log message below should indicate which task_id is failing, so we have a chance to
# reconstruct the problems.
log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s',
task_id, email_id, [i['email'] for i in to_list])
num_error += len(to_list)
return update_subtask_result(subtask_progress, num_sent, num_error, num_optout), exc
return create_subtask_result(num_sent, num_error, num_optout), exc
# Add current progress to any progress stemming from previous retries:
# Successful completion is marked by an exception value of None:
return create_subtask_result(num_sent, num_error, num_optout), None
# clean up at the end
return update_subtask_result(subtask_progress, num_sent, num_error, num_optout), None
def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_progress):
Helper function to requeue a task for retry, using the new version of arguments provided.
Inputs are the same as for running a task, plus two extra indicating the state at the time of retry.
These include the `current_exception` that the task encountered that is causing the retry attempt,
and the `subtask_progress` that is to be returned.
Returns a tuple of two values:
* First value is a dict which represents current progress. Keys are:
'attempted': number of emails attempted
'succeeded': number of emails succeeded
'skipped': number of emails skipped (due to optout)
'failed': number of emails not sent because of some failure
* Second value is an exception returned by the innards of the method. If the retry was
successfully submitted, this value will be the RetryTaskError that retry() returns.
Otherwise, it (ought to be) the current_exception passed in.
task_id = _get_current_task()
retry_index = _get_current_task().request.retries
log.warning('Task %s: email with id %d not delivered due to temporary error %s, retrying send to %d recipients',
task_id, email_id, current_exception, len(to_list))
countdown=(2 ** retry_index) * 15,
except RetryTaskError as retry_error:
# If retry call is successful, update with the current progress:
log.exception('Task %s: email with id %d caused send_course_email task to retry.',
task_id, email_id)
return subtask_progress, retry_error
except Exception as retry_exc:
# If there are no more retries, because the maximum has been reached,
# we expect the original exception to be raised. We catch it here
# (and put it in retry_exc just in case it's different, but it shouldn't be),
# and update status as if it were any other failure.
log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s',
task_id, email_id, [i['email'] for i in to_list])
return subtask_progress, retry_exc
def _statsd_tag(course_title):
......@@ -15,7 +15,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from bulk_email.models import Optout
from instructor_task.subtasks import update_subtask_result
from instructor_task.subtasks import create_subtask_result
......@@ -29,13 +29,13 @@ class MockCourseEmailResult(object):
emails_sent = 0
def get_mock_update_subtask_result(self):
def get_mock_create_subtask_result(self):
"""Wrapper for mock email function."""
def mock_update_subtask_result(prev_results, sent, failed, output, **kwargs): # pylint: disable=W0613
def mock_create_subtask_result(sent, failed, output, **kwargs): # pylint: disable=W0613
"""Increments count of number of emails sent."""
self.emails_sent += sent
return update_subtask_result(prev_results, sent, failed, output)
return mock_update_subtask_result
return create_subtask_result(sent, failed, output)
return mock_create_subtask_result
......@@ -244,13 +244,13 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
@override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7)
def test_chunked_queries_send_numerous_emails(self, email_mock):
Test sending a large number of emails, to test the chunked querying
mock_factory = MockCourseEmailResult()
email_mock.side_effect = mock_factory.get_mock_update_subtask_result()
email_mock.side_effect = mock_factory.get_mock_create_subtask_result()
added_users = []
for _ in xrange(LARGE_NUM_EMAILS):
user = UserFactory()
......@@ -67,7 +67,7 @@ class TestEmailErrors(ModuleStoreTestCase):
self.assertIsInstance(exc, SMTPDataError)
@patch('bulk_email.tasks.get_connection', autospec=True)
def test_data_err_fail(self, retry, result, get_conn):
......@@ -91,10 +91,11 @@ class TestEmailErrors(ModuleStoreTestCase):
# We shouldn't retry when hitting a 5xx error
# Test that after the rejected email, the rest still successfully send
((_, sent, fail, optouts), _) = result.call_args
((sent, fail, optouts), _) = result.call_args
self.assertEquals(optouts, 0)
self.assertEquals(fail, settings.EMAILS_PER_TASK / 4)
self.assertEquals(sent, 3 * settings.EMAILS_PER_TASK / 4)
expectedNumFails = int((settings.EMAILS_PER_TASK + 3) / 4.0)
self.assertEquals(fail, expectedNumFails)
self.assertEquals(sent, settings.EMAILS_PER_TASK - expectedNumFails)
@patch('bulk_email.tasks.get_connection', autospec=True)
......@@ -137,11 +138,10 @@ class TestEmailErrors(ModuleStoreTestCase):
exc = kwargs['exc']
self.assertIsInstance(exc, SMTPConnectError)
@patch('bulk_email.tasks.get_connection', Mock(return_value=EmailTestException))
def test_general_exception(self, mock_log, retry, result):
Tests the if the error is not SMTP-related, we log and reraise
......@@ -152,29 +152,23 @@ class TestEmailErrors(ModuleStoreTestCase):
'subject': 'test subject for myself',
'message': 'test message for myself'
# TODO: This whole test is flawed. Figure out how to make it work correctly,
# possibly moving it elsewhere. It's hitting the wrong exception.
# For some reason (probably the weirdness of testing with celery tasks) assertRaises doesn't work here
# so we assert on the arguments of log.exception
# TODO: This is way too fragile, because if any additional log statement is added anywhere in the flow,
# this test will break., test_email)
# ((log_str, email_id, to_list), _) = mock_log.exception.call_args
# instead, use call_args_list[-1] to get the last call?
# self.assertIn('caused send_course_email task to fail with uncaught exception.', log_str)
# self.assertEqual(email_id, 1)
# self.assertEqual(to_list, [])
((log_str, _task_id, email_id, to_list), _) = mock_log.exception.call_args
self.assertIn('caused send_course_email task to fail with uncaught exception.', log_str)
self.assertEqual(email_id, 1)
self.assertEqual(to_list, [])
# TODO: cannot use the result method to determine if a result was generated,
# because we now call the particular method as part of all subtask calls.
# So use result.called_count to track this...
# self.assertFalse(result.called)
# call_args_list = result.call_args_list
num_calls = result.called_count
self.assertTrue(num_calls == 2)
# check the results being returned
((sent, fail, optouts), _) = result.call_args
self.assertEquals(optouts, 0)
self.assertEquals(fail, 1) # just myself
self.assertEquals(sent, 0)
def test_nonexist_email(self, mock_log, result):
......@@ -5,31 +5,22 @@ from time import time
import json
from celery.utils.log import get_task_logger
from celery.states import SUCCESS
from celery.states import SUCCESS, RETRY
from django.db import transaction
from instructor_task.models import InstructorTask, PROGRESS, QUEUING
log = get_task_logger(__name__)
TASK_LOG = get_task_logger(__name__)
def update_subtask_result(previous_result, new_num_sent, new_num_error, new_num_optout):
def create_subtask_result(new_num_sent, new_num_error, new_num_optout):
"""Return the result of course_email sending as a dict (not a string)."""
attempted = new_num_sent + new_num_error
current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error}
# add in any previous results:
if previous_result is not None:
for keyname in current_result:
if keyname in previous_result:
current_result[keyname] += previous_result[keyname]
return current_result
def create_subtask_result():
return update_subtask_result(None, 0, 0, 0)
def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_id_list):
Store initial subtask information to InstructorTask object.
......@@ -61,7 +52,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
# Write out the subtasks information.
num_subtasks = len(subtask_id_list)
subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'status': subtask_status}
subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'retried': 0, 'status': subtask_status}
entry.subtasks = json.dumps(subtask_dict)
# and save the entry immediately, before any subtasks actually start work:
......@@ -74,8 +65,8 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
Update the status of the subtask in the parent InstructorTask object tracking its progress.
""""Preparing to update status for email subtask %s for instructor task %d with status %s",
current_task_id, entry_id, subtask_result)"Preparing to update status for email subtask %s for instructor task %d with status %s",
current_task_id, entry_id, subtask_result)
entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
......@@ -85,9 +76,17 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
# unexpected error -- raise an exception
format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'"
msg = format_str.format(current_task_id, entry_id)
raise ValueError(msg)
subtask_status[current_task_id] = status
# Update status unless it has already been set. This can happen
# when a task is retried and running in eager mode -- the retries
# will be updating before the original call, and we don't want their
# ultimate status to be clobbered by the "earlier" updates. This
# should not be a problem in normal (non-eager) processing.
old_status = subtask_status[current_task_id]
if status != RETRY or old_status == QUEUING:
subtask_status[current_task_id] = status
# Update the parent task progress
task_progress = json.loads(entry.task_output)
......@@ -102,6 +101,8 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
# entire subtask_status dict.
if status == SUCCESS:
subtask_dict['succeeded'] += 1
elif status == RETRY:
subtask_dict['retried'] += 1
subtask_dict['failed'] += 1
num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
......@@ -111,15 +112,13 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
entry.subtasks = json.dumps(subtask_dict)
entry.task_output = InstructorTask.create_output_for_success(task_progress)"Task output updated to %s for email subtask %s of instructor task %d",
entry.task_output, current_task_id, entry_id)
# TODO: temporary -- switch to debug once working"about to save....")"Task output updated to %s for email subtask %s of instructor task %d",
entry.task_output, current_task_id, entry_id)
TASK_LOG.debug("about to save....")
log.exception("Unexpected error while updating InstructorTask.")
except Exception:
TASK_LOG.exception("Unexpected error while updating InstructorTask.")
# TODO: temporary -- switch to debug once working"about to commit....")
TASK_LOG.debug("about to commit....")
......@@ -131,12 +131,12 @@ class InstructorTaskCourseTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase)
def login_username(self, username):
"""Login the user, given the `username`."""
if self.current_user != username:
self.login(InstructorTaskModuleTestCase.get_user_email(username), "test")
self.login(InstructorTaskCourseTestCase.get_user_email(username), "test")
self.current_user = username
def _create_user(self, username, is_staff=False):
"""Creates a user and enrolls them in the test course."""
email = InstructorTaskModuleTestCase.get_user_email(username)
email = InstructorTaskCourseTestCase.get_user_email(username)
thisuser = UserFactory.create(username=username, email=email, is_staff=is_staff)
return thisuser
......@@ -8,7 +8,6 @@ paths actually work.
import json
from uuid import uuid4
from unittest import skip
from functools import partial
from mock import Mock, MagicMock, patch
......@@ -24,7 +23,7 @@ from instructor_task.models import InstructorTask
from instructor_task.tests.test_base import InstructorTaskModuleTestCase
from instructor_task.tests.factories import InstructorTaskFactory
from instructor_task.tasks import rescore_problem, reset_problem_attempts, delete_problem_state
from instructor_task.tasks_helper import UpdateProblemModuleStateError, run_main_task, perform_module_state_update, UPDATE_STATUS_SUCCEEDED
from instructor_task.tasks_helper import UpdateProblemModuleStateError
PROBLEM_URL_NAME = "test_urlname"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment