Commit 01611c33 by Brian Wilson

Refactor instructor_task tests, and add handling for general errors in bulk_email subtasks.

parent 2f4774f4
......@@ -166,7 +166,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
subtask_id = str(uuid4())
subtask_progress = _course_email_result(None, 0, 0, 0)
subtask_progress = update_subtask_result(None, 0, 0, 0)
......@@ -259,14 +259,14 @@ def _update_subtask_status(entry_id, current_task_id, status, subtask_result):"Task output updated to %s for email subtask %s of instructor task %d",
entry.task_output, current_task_id, entry_id)
# TODO: temporary -- switch to debug
# TODO: temporary -- switch to debug once working"about to save....")
log.exception("Unexpected error while updating InstructorTask.")
# TODO: temporary -- switch to debug
# TODO: temporary -- switch to debug once working"about to commit....")
......@@ -289,40 +289,69 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
current_task_id = _get_current_task()
retry_index = _get_current_task().request.retries"Preparing to send email as subtask %s for instructor task %d",
current_task_id, entry_id)"Preparing to send email as subtask %s for instructor task %d, retry %d",
current_task_id, entry_id, retry_index)
course_title = global_email_context['course_title']
course_email_result_value = None
send_exception = None
with dog_stats_api.timer('course_email.single_task.time.overall', tags=[_statsd_tag(course_title)]):
course_email_result_value = _send_course_email(email_id, to_list, global_email_context, subtask_progress, retry_index)
# Assume that if we get here without a raise, the task was successful.
# Update the InstructorTask object that is storing its progress.
_update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value)
course_email_result_value, send_exception = _send_course_email(
if send_exception is None:
# Update the InstructorTask object that is storing its progress.
_update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value)
log.error("background task (%s) failed: %s", current_task_id, send_exception)
_update_subtask_status(entry_id, current_task_id, FAILURE, course_email_result_value)
raise send_exception
except Exception:
# try to write out the failure to the entry before failing
_, exception, traceback = exc_info()
traceback_string = format_exc(traceback) if traceback is not None else ''
log.warning("background task (%s) failed: %s %s", current_task_id, exception, traceback_string)
log.error("background task (%s) failed: %s %s", current_task_id, exception, traceback_string)
_update_subtask_status(entry_id, current_task_id, FAILURE, subtask_progress)
return course_email_result_value
def _send_course_email(email_id, to_list, global_email_context, subtask_progress, retry_index):
def _send_course_email(task_id, email_id, to_list, global_email_context, subtask_progress, retry_index):
Performs the email sending task.
Returns a tuple of two values:
* First value is a dict which represents current progress. Keys are:
'attempted': number of emails attempted
'succeeded': number of emails succeeded
'skipped': number of emails skipped (due to optout)
'failed': number of emails not sent because of some failure
* Second value is an exception returned by the innards of the method, indicating a fatal error.
In this case, the number of recipients that were not sent have already been added to the
'failed' count above.
throttle = retry_index > 0
num_optout = 0
num_sent = 0
num_error = 0
course_email = CourseEmail.objects.get(id=email_id)
except CourseEmail.DoesNotExist:
log.exception("Could not find email id:{} to send.".format(email_id))
except CourseEmail.DoesNotExist as exc:
log.exception("Task %s: could not find email id:%s to send.", task_id, email_id)
num_error += len(to_list)
return update_subtask_result(subtask_progress, num_sent, num_error, num_optout), exc
# exclude optouts (if not a retry):
# Note that we don't have to do the optout logic at all if this is a retry,
......@@ -330,7 +359,6 @@ def _send_course_email(email_id, to_list, global_email_context, subtask_progress
# attempt. Anyone on the to_list on a retry has already passed the filter
# that existed at that time, and we don't need to keep checking for changes
# in the Optout list.
num_optout = 0
if retry_index == 0:
optouts = (Optout.objects.filter(course_id=course_email.course_id,
user__in=[i['pk'] for i in to_list])
......@@ -350,8 +378,6 @@ def _send_course_email(email_id, to_list, global_email_context, subtask_progress
course_email_template = CourseEmailTemplate.get_template()
num_sent = 0
num_error = 0
connection = get_connection()
......@@ -404,45 +430,47 @@ def _send_course_email(email_id, to_list, global_email_context, subtask_progress
raise exc
# This will fall through and not retry the message, since it will be popped
log.warning('Email with id %s not delivered to %s due to error %s', email_id, email, exc.smtp_error)
log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc.smtp_error)
dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
num_error += 1
except (SMTPDataError, SMTPConnectError, SMTPServerDisconnected) as exc:
# Error caught here cause the email to be retried. The entire task is actually retried without popping the list
# Reasoning is that all of these errors may be temporary condition.
# TODO: figure out what this means. Presumably we have popped the list with those that have succeeded
# and failed, rather than those needing a later retry.
log.warning('Email with id %d not delivered due to temporary error %s, retrying send to %d recipients',
email_id, exc, len(to_list))
# Errors caught here cause the email to be retried. The entire task is actually retried
# without popping the current recipient off of the existing list.
# Errors caught are those that indicate a temporary condition that might succeed on retry.
log.warning('Task %s: email with id %d not delivered due to temporary error %s, retrying send to %d recipients',
task_id, email_id, exc, len(to_list))
raise send_course_email.retry(
_course_email_result(subtask_progress, num_sent, num_error, num_optout),
update_subtask_result(subtask_progress, num_sent, num_error, num_optout),
countdown=(2 ** retry_index) * 15
log.exception('Email with id %d caused send_course_email task to fail with uncaught exception. To list: %s',
[i['email'] for i in to_list])
# Close the connection before we exit
except Exception as exc:
# If we have a general exception for this request, we need to figure out what to do with it.
# If we're going to just mark it as failed
# And the log message below should indicate which task_id is failing, so we have a chance to
# reconstruct the problems.
log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s',
task_id, email_id, [i['email'] for i in to_list])
num_error += len(to_list)
return update_subtask_result(subtask_progress, num_sent, num_error, num_optout), exc
# Add current progress to any progress stemming from previous retries:
return _course_email_result(subtask_progress, num_sent, num_error, num_optout)
return update_subtask_result(subtask_progress, num_sent, num_error, num_optout), None
def _course_email_result(previous_result, new_num_sent, new_num_error, new_num_optout):
def update_subtask_result(previous_result, new_num_sent, new_num_error, new_num_optout):
"""Return the result of course_email sending as a dict (not a string)."""
attempted = new_num_sent + new_num_error
current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error}
......@@ -2,6 +2,8 @@
Unit tests for sending course email
from mock import patch
from django.conf import settings
from django.core import mail
from django.core.urlresolvers import reverse
......@@ -12,13 +14,7 @@ from courseware.tests.tests import TEST_DATA_MONGO_MODULESTORE
from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from instructor_task.models import InstructorTask
from instructor_task.tests.factories import InstructorTaskFactory
from bulk_email.tasks import send_course_email
from bulk_email.models import CourseEmail, Optout
from mock import patch
from bulk_email.models import Optout
......@@ -32,13 +28,13 @@ class MockCourseEmailResult(object):
emails_sent = 0
def get_mock_course_email_result(self):
def get_mock_update_subtask_result(self):
"""Wrapper for mock email function."""
def mock_course_email_result(prev_results, sent, failed, output, **kwargs): # pylint: disable=W0613
def mock_update_subtask_result(prev_results, sent, failed, output, **kwargs): # pylint: disable=W0613
"""Increments count of number of emails sent."""
self.emails_sent += sent
return True
return mock_course_email_result
return mock_update_subtask_result
......@@ -247,13 +243,13 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
@override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7)
def test_chunked_queries_send_numerous_emails(self, email_mock):
Test sending a large number of emails, to test the chunked querying
mock_factory = MockCourseEmailResult()
email_mock.side_effect = mock_factory.get_mock_course_email_result()
email_mock.side_effect = mock_factory.get_mock_update_subtask_result()
added_users = []
for _ in xrange(LARGE_NUM_EMAILS):
user = UserFactory()
......@@ -283,24 +279,3 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
[ for s in self.students] +
[ for s in added_users if s not in optouts])
self.assertItemsEqual(outbox_contents, should_send_contents)
class TestEmailSendExceptions(ModuleStoreTestCase):
Test that exceptions are handled correctly.
def test_no_instructor_task(self):
with self.assertRaises(InstructorTask.DoesNotExist):
send_course_email(100, 101, [], {}, False)
def test_no_course_title(self):
entry = InstructorTaskFactory.create(task_key='', task_id='dummy')
with self.assertRaises(KeyError):
send_course_email(, 101, [], {}, False)
def test_no_course_email_obj(self):
# Make sure send_course_email handles CourseEmail.DoesNotExist exception.
entry = InstructorTaskFactory.create(task_key='', task_id='dummy')
with self.assertRaises(CourseEmail.DoesNotExist):
send_course_email(, 101, [], {'course_title': 'Test'}, False)
......@@ -2,11 +2,16 @@
Unit tests for handling email sending errors
from itertools import cycle
from mock import patch, Mock
from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError
from unittest import skip
from django.test.utils import override_settings
from django.conf import settings
from import call_command
from django.core.urlresolvers import reverse
from courseware.tests.tests import TEST_DATA_MONGO_MODULESTORE
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
......@@ -16,9 +21,6 @@ from bulk_email.models import CourseEmail
from bulk_email.tasks import perform_delegate_email_batches
from instructor_task.models import InstructorTask
from mock import patch, Mock
from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError
class EmailTestException(Exception):
"""Mock exception for email testing."""
......@@ -65,14 +67,15 @@ class TestEmailErrors(ModuleStoreTestCase):
self.assertTrue(type(exc) == SMTPDataError)
@patch('bulk_email.tasks.get_connection', autospec=True)
def test_data_err_fail(self, retry, result, get_conn):
Test that celery handles permanent SMTPDataErrors by failing and not retrying.
# have every fourth email fail due to blacklisting:
get_conn.return_value.send_messages.side_effect = cycle([SMTPDataError(554, "Email address is blacklisted"),
None, None, None])
students = [UserFactory() for _ in xrange(settings.EMAILS_PER_TASK)]
for student in students:
......@@ -88,10 +91,10 @@ class TestEmailErrors(ModuleStoreTestCase):
# We shouldn't retry when hitting a 5xx error
# Test that after the rejected email, the rest still successfully send
((sent, fail, optouts), _) = result.call_args
((_, sent, fail, optouts), _) = result.call_args
self.assertEquals(optouts, 0)
self.assertEquals(fail, settings.EMAILS_PER_TASK / 2)
self.assertEquals(sent, settings.EMAILS_PER_TASK / 2)
self.assertEquals(fail, settings.EMAILS_PER_TASK / 4)
self.assertEquals(sent, 3 * settings.EMAILS_PER_TASK / 4)
@patch('bulk_email.tasks.get_connection', autospec=True)
......@@ -134,10 +137,11 @@ class TestEmailErrors(ModuleStoreTestCase):
exc = kwargs['exc']
self.assertTrue(type(exc) == SMTPConnectError)
@patch('bulk_email.tasks.get_connection', Mock(return_value=EmailTestException))
def test_general_exception(self, mock_log, retry, result):
Tests the if the error is not SMTP-related, we log and reraise
......@@ -148,19 +152,29 @@ class TestEmailErrors(ModuleStoreTestCase):
'subject': 'test subject for myself',
'message': 'test message for myself'
# TODO: This whole test is flawed. Figure out how to make it work correctly,
# possibly moving it elsewhere. It's hitting the wrong exception.
# For some reason (probably the weirdness of testing with celery tasks) assertRaises doesn't work here
# so we assert on the arguments of log.exception
# TODO: This is way too fragile, because if any additional log statement is added anywhere in the flow,
# this test will break., test_email)
((log_str, email_id, to_list), _) = mock_log.exception.call_args
# ((log_str, email_id, to_list), _) = mock_log.exception.call_args
# instead, use call_args_list[-1] to get the last call?
self.assertIn('caused send_course_email task to fail with uncaught exception.', log_str)
self.assertEqual(email_id, 1)
self.assertEqual(to_list, [])
# self.assertIn('caused send_course_email task to fail with uncaught exception.', log_str)
# self.assertEqual(email_id, 1)
# self.assertEqual(to_list, [])
# @patch('bulk_email.tasks.delegate_email_batches.retry')
# TODO: cannot use the result method to determine if a result was generated,
# because we now call the particular method as part of all subtask calls.
# So use result.called_count to track this...
# self.assertFalse(result.called)
# call_args_list = result.call_args_list
num_calls = result.called_count
self.assertTrue(num_calls == 2)
def test_nonexist_email(self, mock_log, result):
......@@ -190,7 +190,6 @@ def submit_bulk_course_email(request, course_id, email_id):
# check arguments: make sure that the course is defined?
# TODO: what is the right test here?
# modulestore().get_instance(course_id, problem_url)
# This should also make sure that the email exists.
# We can also pull out the To argument here, so that is displayed in
......@@ -200,10 +199,10 @@ def submit_bulk_course_email(request, course_id, email_id):
task_type = 'bulk_course_email'
task_class = send_bulk_course_email
# TODO: figure out if we need to encode in a standard way, or if we can get away
# with doing this manually. Shouldn't be hard to make the encode call explicitly,
# and allow no problem_url or student to be defined. Like this:
# task_input, task_key = encode_problem_and_student_input()
# Pass in the to_option as a separate argument, even though it's (currently)
# in the CourseEmail. That way it's visible in the progress status.
# (At some point in the future, we might take the recipient out of the CourseEmail,
# so that the same saved email can be sent to different recipients, as it is tested.)
task_input = {'email_id': email_id, 'to_option': to_option}
task_key_stub = "{email_id}_{to_option}".format(email_id=email_id, to_option=to_option)
# create the key value by using MD5 hash:
......@@ -6,16 +6,21 @@ from xmodule.modulestore.exceptions import ItemNotFoundError
from courseware.tests.factories import UserFactory
from instructor_task.api import (get_running_instructor_tasks,
from bulk_email.models import CourseEmail, SEND_TO_ALL
from instructor_task.api import (
from instructor_task.api_helper import AlreadyRunningError
from instructor_task.models import InstructorTask, PROGRESS
from instructor_task.tests.test_base import (InstructorTaskTestCase,
......@@ -46,8 +51,8 @@ class InstructorTaskReportTest(InstructorTaskTestCase):
self.assertEquals(set(task_ids), set(expected_ids))
class InstructorTaskSubmitTest(InstructorTaskModuleTestCase):
"""Tests API methods that involve the submission of background tasks."""
class InstructorTaskModuleSubmitTest(InstructorTaskModuleTestCase):
"""Tests API methods that involve the submission of module-based background tasks."""
def setUp(self):
......@@ -136,3 +141,28 @@ class InstructorTaskSubmitTest(InstructorTaskModuleTestCase):
def test_submit_delete_all(self):
class InstructorTaskCourseSubmitTest(InstructorTaskCourseTestCase):
"""Tests API methods that involve the submission of course-based background tasks."""
def setUp(self):
self.student = UserFactory.create(username="student", email="")
self.instructor = UserFactory.create(username="instructor", email="")
def _define_course_email(self):
course_email = CourseEmail.create(, self.instructor, SEND_TO_ALL, "Test Subject", "<p>This is a test message</p>")
def test_submit_bulk_email_all(self):
email_id = self._define_course_email()
instructor_task = submit_bulk_course_email(self.create_task_request(self.instructor),, email_id)
# test resubmitting, by updating the existing record:
instructor_task = InstructorTask.objects.get(
instructor_task.task_state = PROGRESS
with self.assertRaises(AlreadyRunningError):
instructor_task = submit_bulk_course_email(self.create_task_request(self.instructor),, email_id)
......@@ -96,10 +96,10 @@ class InstructorTaskTestCase(TestCase):
class InstructorTaskModuleTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase):
class InstructorTaskCourseTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase):
Base test class for InstructorTask-related tests that require
the setup of a course and problem in order to access StudentModule state.
the setup of a course.
course = None
current_user = None
......@@ -150,6 +150,31 @@ class InstructorTaskModuleTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase)
return self._create_user(username, is_staff=False)
def get_task_status(task_id):
"""Use api method to fetch task status, using mock request."""
mock_request = Mock()
mock_request.REQUEST = {'task_id': task_id}
response = instructor_task_status(mock_request)
status = json.loads(response.content)
return status
def create_task_request(self, requester_username):
"""Generate request that can be used for submitting tasks"""
request = Mock()
request.user = User.objects.get(username=requester_username)
request.get_host = Mock(return_value="testhost")
request.META = {'REMOTE_ADDR': '0:0:0:0', 'SERVER_NAME': 'testhost'}
request.is_secure = Mock(return_value=False)
return request
class InstructorTaskModuleTestCase(InstructorTaskCourseTestCase):
Base test class for InstructorTask-related tests that require
the setup of a course and problem in order to access StudentModule state.
def problem_location(problem_url_name):
Create an internal location for a test problem.
......@@ -192,21 +217,3 @@ class InstructorTaskModuleTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase)
def get_task_status(task_id):
"""Use api method to fetch task status, using mock request."""
mock_request = Mock()
mock_request.REQUEST = {'task_id': task_id}
response = instructor_task_status(mock_request)
status = json.loads(response.content)
return status
def create_task_request(self, requester_username):
"""Generate request that can be used for submitting tasks"""
request = Mock()
request.user = User.objects.get(username=requester_username)
request.get_host = Mock(return_value="testhost")
request.META = {'REMOTE_ADDR': '0:0:0:0', 'SERVER_NAME': 'testhost'}
request.is_secure = Mock(return_value=False)
return request
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment