Commit 0fd75181 by Brian Wilson

Update handling of bulk-email retries to update InstructorTask before each retry.

parent 7988b71b
......@@ -15,7 +15,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from bulk_email.models import Optout
from instructor_task.subtasks import update_subtask_result
from instructor_task.subtasks import create_subtask_result
STAFF_COUNT = 3
STUDENT_COUNT = 10
......@@ -29,13 +29,13 @@ class MockCourseEmailResult(object):
"""
emails_sent = 0
def get_mock_update_subtask_result(self):
def get_mock_create_subtask_result(self):
"""Wrapper for mock email function."""
def mock_update_subtask_result(prev_results, sent, failed, output, **kwargs): # pylint: disable=W0613
def mock_create_subtask_result(sent, failed, output, **kwargs): # pylint: disable=W0613
"""Increments count of number of emails sent."""
self.emails_sent += sent
return update_subtask_result(prev_results, sent, failed, output)
return mock_update_subtask_result
return create_subtask_result(sent, failed, output)
return mock_create_subtask_result
@override_settings(MODULESTORE=TEST_DATA_MONGO_MODULESTORE)
......@@ -244,13 +244,13 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
)
@override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7)
@patch('bulk_email.tasks.update_subtask_result')
@patch('bulk_email.tasks.create_subtask_result')
def test_chunked_queries_send_numerous_emails(self, email_mock):
"""
Test sending a large number of emails, to test the chunked querying
"""
mock_factory = MockCourseEmailResult()
email_mock.side_effect = mock_factory.get_mock_update_subtask_result()
email_mock.side_effect = mock_factory.get_mock_create_subtask_result()
added_users = []
for _ in xrange(LARGE_NUM_EMAILS):
user = UserFactory()
......
......@@ -67,7 +67,7 @@ class TestEmailErrors(ModuleStoreTestCase):
self.assertTrue(type(exc) == SMTPDataError)
@patch('bulk_email.tasks.get_connection', autospec=True)
@patch('bulk_email.tasks.update_subtask_result')
@patch('bulk_email.tasks.create_subtask_result')
@patch('bulk_email.tasks.send_course_email.retry')
def test_data_err_fail(self, retry, result, get_conn):
"""
......@@ -91,10 +91,11 @@ class TestEmailErrors(ModuleStoreTestCase):
# We shouldn't retry when hitting a 5xx error
self.assertFalse(retry.called)
# Test that after the rejected email, the rest still successfully send
((_, sent, fail, optouts), _) = result.call_args
((sent, fail, optouts), _) = result.call_args
self.assertEquals(optouts, 0)
self.assertEquals(fail, settings.EMAILS_PER_TASK / 4)
self.assertEquals(sent, 3 * settings.EMAILS_PER_TASK / 4)
expectedNumFails = int((settings.EMAILS_PER_TASK + 3) / 4.0)
self.assertEquals(fail, expectedNumFails)
self.assertEquals(sent, settings.EMAILS_PER_TASK - expectedNumFails)
@patch('bulk_email.tasks.get_connection', autospec=True)
@patch('bulk_email.tasks.send_course_email.retry')
......@@ -137,11 +138,10 @@ class TestEmailErrors(ModuleStoreTestCase):
exc = kwargs['exc']
self.assertTrue(type(exc) == SMTPConnectError)
@patch('bulk_email.tasks.update_subtask_result')
@patch('bulk_email.tasks.create_subtask_result')
@patch('bulk_email.tasks.send_course_email.retry')
@patch('bulk_email.tasks.log')
@patch('bulk_email.tasks.get_connection', Mock(return_value=EmailTestException))
@skip
def test_general_exception(self, mock_log, retry, result):
"""
Tests that if the error is not SMTP-related, we log and reraise
......@@ -152,29 +152,23 @@ class TestEmailErrors(ModuleStoreTestCase):
'subject': 'test subject for myself',
'message': 'test message for myself'
}
# TODO: This whole test is flawed. Figure out how to make it work correctly,
# possibly moving it elsewhere. It's hitting the wrong exception.
# For some reason (probably the weirdness of testing with celery tasks) assertRaises doesn't work here
# so we assert on the arguments of log.exception
# TODO: This is way too fragile, because if any additional log statement is added anywhere in the flow,
# this test will break.
self.client.post(self.url, test_email)
# ((log_str, email_id, to_list), _) = mock_log.exception.call_args
# instead, use call_args_list[-1] to get the last call?
self.assertTrue(mock_log.exception.called)
# self.assertIn('caused send_course_email task to fail with uncaught exception.', log_str)
# self.assertEqual(email_id, 1)
# self.assertEqual(to_list, [self.instructor.email])
((log_str, _task_id, email_id, to_list), _) = mock_log.exception.call_args
self.assertIn('caused send_course_email task to fail with uncaught exception.', log_str)
self.assertEqual(email_id, 1)
self.assertEqual(to_list, [self.instructor.email])
self.assertFalse(retry.called)
# TODO: cannot use the result method to determine if a result was generated,
# because we now call the particular method as part of all subtask calls.
# So use result.called_count to track this...
# self.assertFalse(result.called)
# call_args_list = result.call_args_list
num_calls = result.called_count
self.assertTrue(num_calls == 2)
@patch('bulk_email.tasks.update_subtask_result')
# check the results being returned
self.assertTrue(result.called)
((sent, fail, optouts), _) = result.call_args
self.assertEquals(optouts, 0)
self.assertEquals(fail, 1) # just myself
self.assertEquals(sent, 0)
@patch('bulk_email.tasks.create_subtask_result')
@patch('bulk_email.tasks.log')
def test_nonexist_email(self, mock_log, result):
"""
......
......@@ -5,31 +5,22 @@ from time import time
import json
from celery.utils.log import get_task_logger
from celery.states import SUCCESS
from celery.states import SUCCESS, RETRY
from django.db import transaction
from instructor_task.models import InstructorTask, PROGRESS, QUEUING
log = get_task_logger(__name__)
TASK_LOG = get_task_logger(__name__)
def update_subtask_result(previous_result, new_num_sent, new_num_error, new_num_optout):
def create_subtask_result(new_num_sent, new_num_error, new_num_optout):
"""Return the result of course_email sending as a dict (not a string)."""
attempted = new_num_sent + new_num_error
current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error}
# add in any previous results:
if previous_result is not None:
for keyname in current_result:
if keyname in previous_result:
current_result[keyname] += previous_result[keyname]
return current_result
def create_subtask_result():
return update_subtask_result(None, 0, 0, 0)
def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_id_list):
"""
Store initial subtask information to InstructorTask object.
......@@ -61,7 +52,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
# Write out the subtasks information.
num_subtasks = len(subtask_id_list)
subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'status': subtask_status}
subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'retried': 0, 'status': subtask_status}
entry.subtasks = json.dumps(subtask_dict)
# and save the entry immediately, before any subtasks actually start work:
......@@ -74,8 +65,8 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
"""
Update the status of the subtask in the parent InstructorTask object tracking its progress.
"""
log.info("Preparing to update status for email subtask %s for instructor task %d with status %s",
current_task_id, entry_id, subtask_result)
TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s",
current_task_id, entry_id, subtask_result)
try:
entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
......@@ -85,9 +76,17 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
# unexpected error -- raise an exception
format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'"
msg = format_str.format(current_task_id, entry_id)
log.warning(msg)
TASK_LOG.warning(msg)
raise ValueError(msg)
subtask_status[current_task_id] = status
# Update status unless it has already been set. This can happen
# when a task is retried and running in eager mode -- the retries
# will be updating before the original call, and we don't want their
# ultimate status to be clobbered by the "earlier" updates. This
# should not be a problem in normal (non-eager) processing.
old_status = subtask_status[current_task_id]
if status != RETRY or old_status == QUEUING:
subtask_status[current_task_id] = status
# Update the parent task progress
task_progress = json.loads(entry.task_output)
......@@ -102,6 +101,8 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
# entire subtask_status dict.
if status == SUCCESS:
subtask_dict['succeeded'] += 1
elif status == RETRY:
subtask_dict['retried'] += 1
else:
subtask_dict['failed'] += 1
num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
......@@ -111,15 +112,13 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
entry.subtasks = json.dumps(subtask_dict)
entry.task_output = InstructorTask.create_output_for_success(task_progress)
log.info("Task output updated to %s for email subtask %s of instructor task %d",
entry.task_output, current_task_id, entry_id)
# TODO: temporary -- switch to debug once working
log.info("about to save....")
TASK_LOG.info("Task output updated to %s for email subtask %s of instructor task %d",
entry.task_output, current_task_id, entry_id)
TASK_LOG.debug("about to save....")
entry.save()
except:
log.exception("Unexpected error while updating InstructorTask.")
except Exception:
TASK_LOG.exception("Unexpected error while updating InstructorTask.")
transaction.rollback()
else:
# TODO: temporary -- switch to debug once working
log.info("about to commit....")
TASK_LOG.debug("about to commit....")
transaction.commit()
......@@ -131,12 +131,12 @@ class InstructorTaskCourseTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase)
def login_username(self, username):
"""Login the user, given the `username`."""
if self.current_user != username:
self.login(InstructorTaskModuleTestCase.get_user_email(username), "test")
self.login(InstructorTaskCourseTestCase.get_user_email(username), "test")
self.current_user = username
def _create_user(self, username, is_staff=False):
"""Creates a user and enrolls them in the test course."""
email = InstructorTaskModuleTestCase.get_user_email(username)
email = InstructorTaskCourseTestCase.get_user_email(username)
thisuser = UserFactory.create(username=username, email=email, is_staff=is_staff)
CourseEnrollmentFactory.create(user=thisuser, course_id=self.course.id)
return thisuser
......
......@@ -8,7 +8,6 @@ paths actually work.
import json
from uuid import uuid4
from unittest import skip
from functools import partial
from mock import Mock, MagicMock, patch
......@@ -24,7 +23,7 @@ from instructor_task.models import InstructorTask
from instructor_task.tests.test_base import InstructorTaskModuleTestCase
from instructor_task.tests.factories import InstructorTaskFactory
from instructor_task.tasks import rescore_problem, reset_problem_attempts, delete_problem_state
from instructor_task.tasks_helper import UpdateProblemModuleStateError, run_main_task, perform_module_state_update, UPDATE_STATUS_SUCCEEDED
from instructor_task.tasks_helper import UpdateProblemModuleStateError
PROBLEM_URL_NAME = "test_urlname"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment