Commit 32c84624 by Brian Wilson

Incorporate changes in max_retry logic, adding subtask_status as bulk_email arg.

parent 853cd874
......@@ -15,7 +15,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from bulk_email.models import Optout
from instructor_task.subtasks import create_subtask_result
from instructor_task.subtasks import increment_subtask_status
......@@ -29,13 +29,13 @@ class MockCourseEmailResult(object):
emails_sent = 0
def get_mock_create_subtask_result(self):
def get_mock_increment_subtask_status(self):
"""Wrapper for mock email function."""
def mock_create_subtask_result(sent, failed, output, **kwargs): # pylint: disable=W0613
def mock_increment_subtask_status(original_status, **kwargs): # pylint: disable=W0613
"""Increments count of number of emails sent."""
self.emails_sent += sent
return create_subtask_result(sent, failed, output)
return mock_create_subtask_result
self.emails_sent += kwargs['succeeded']
return increment_subtask_status(original_status, **kwargs)
return mock_increment_subtask_status
......@@ -244,13 +244,13 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
@override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7)
def test_chunked_queries_send_numerous_emails(self, email_mock):
Test sending a large number of emails, to test the chunked querying
mock_factory = MockCourseEmailResult()
email_mock.side_effect = mock_factory.get_mock_create_subtask_result()
email_mock.side_effect = mock_factory.get_mock_increment_subtask_status()
added_users = []
for _ in xrange(LARGE_NUM_EMAILS):
user = UserFactory()
......@@ -67,7 +67,7 @@ class TestEmailErrors(ModuleStoreTestCase):
self.assertIsInstance(exc, SMTPDataError)
@patch('bulk_email.tasks.get_connection', autospec=True)
def test_data_err_fail(self, retry, result, get_conn):
......@@ -91,11 +91,11 @@ class TestEmailErrors(ModuleStoreTestCase):
# We shouldn't retry when hitting a 5xx error
# Test that after the rejected email, the rest still successfully send
((sent, fail, optouts), _) = result.call_args
self.assertEquals(optouts, 0)
((_initial_results), kwargs) = result.call_args
self.assertEquals(kwargs['skipped'], 0)
expectedNumFails = int((settings.EMAILS_PER_TASK + 3) / 4.0)
self.assertEquals(fail, expectedNumFails)
self.assertEquals(sent, settings.EMAILS_PER_TASK - expectedNumFails)
self.assertEquals(kwargs['failed'], expectedNumFails)
self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expectedNumFails)
@patch('bulk_email.tasks.get_connection', autospec=True)
......@@ -138,7 +138,7 @@ class TestEmailErrors(ModuleStoreTestCase):
exc = kwargs['exc']
self.assertIsInstance(exc, SMTPConnectError)
@patch('bulk_email.tasks.get_connection', Mock(return_value=EmailTestException))
......@@ -163,12 +163,13 @@ class TestEmailErrors(ModuleStoreTestCase):
# check the results being returned
((sent, fail, optouts), _) = result.call_args
self.assertEquals(optouts, 0)
self.assertEquals(fail, 1) # just myself
self.assertEquals(sent, 0)
((initial_results, ), kwargs) = result.call_args
self.assertEquals(initial_results['skipped'], 0)
self.assertEquals(initial_results['failed'], 0)
self.assertEquals(initial_results['succeeded'], 0)
self.assertEquals(kwargs['failed'], 1)
def test_nonexist_email(self, mock_log, result):
......@@ -180,7 +181,7 @@ class TestEmailErrors(ModuleStoreTestCase):
task_input = {"email_id": -1}
with self.assertRaises(CourseEmail.DoesNotExist):
perform_delegate_email_batches(, course_id, task_input, "action_name")
((log_str, email_id), _) = mock_log.warning.call_args
((log_str, _, email_id), _) = mock_log.warning.call_args
self.assertIn('Failed to get CourseEmail with id', log_str)
self.assertEqual(email_id, -1)
......@@ -198,7 +199,7 @@ class TestEmailErrors(ModuleStoreTestCase):
task_input = {"email_id":}
with self.assertRaises(Exception):
perform_delegate_email_batches(, course_id, task_input, "action_name")
((log_str, _), _) = mock_log.exception.call_args
((log_str, _, _), _) = mock_log.exception.call_args
self.assertIn('get_course_by_id failed:', log_str)
......@@ -5,7 +5,7 @@ from time import time
import json
from celery.utils.log import get_task_logger
from celery.states import SUCCESS, RETRY
from celery.states import SUCCESS, RETRY, READY_STATES
from django.db import transaction
......@@ -14,29 +14,51 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING
TASK_LOG = get_task_logger(__name__)
def create_subtask_result(num_sent, num_error, num_optout):
def create_subtask_status(succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None):
Create a result of a subtask.
Create a dict for tracking the status of a subtask.
Keys are: 'attempted', 'succeeded', 'skipped', 'failed'.
Keys are: 'attempted', 'succeeded', 'pending', 'skipped', 'failed', 'retriedA', 'retriedB', 'state'.
TODO: update
Object must be JSON-serializable, so that it can be passed as an argument
to tasks.
Object must be JSON-serializable.
TODO: decide if in future we want to include specific error information
indicating the reason for failure.
Also, we should count up "not attempted" separately from attempted/failed.
attempted = num_sent + num_error
current_result = {'attempted': attempted, 'succeeded': num_sent, 'skipped': num_optout, 'failed': num_error}
attempted = succeeded + failed
current_result = {
'attempted': attempted,
'succeeded': succeeded,
'pending': pending,
'skipped': skipped,
'failed': failed,
'retriedA': retriedA,
'retriedB': retriedB,
'state': state if state is not None else QUEUING,
return current_result
def increment_subtask_result(subtask_result, new_num_sent, new_num_error, new_num_optout):
def increment_subtask_status(subtask_result, succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None):
Update the result of a subtask with additional results.
Keys are: 'attempted', 'succeeded', 'skipped', 'failed'.
Keys are: 'attempted', 'succeeded', 'pending', 'skipped', 'failed', 'retriedA', 'retriedB', 'state'.
new_result = create_subtask_result(new_num_sent, new_num_error, new_num_optout)
# TODO: rewrite this if we have additional fields added to original subtask_result,
# that are not part of the increment. Tradeoff on duplicating the 'attempts' logic.
new_result = create_subtask_status(succeeded, failed, pending, skipped, retriedA, retriedB, state)
for keyname in new_result:
if keyname in subtask_result:
if keyname == 'state':
# does not get incremented. If no new value, copy old value:
if state is None:
new_result[keyname] = subtask_result[keyname]
elif keyname in subtask_result:
new_result[keyname] += subtask_result[keyname]
return new_result
......@@ -49,7 +71,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
as is the 'duration_ms' value. A 'start_time' is stored for later duration calculations,
and the total number of "things to do" is set, so the user can be told how much needs to be
done overall. The `action_name` is also stored, to also help with constructing more readable
progress messages.
task_progress messages.
The InstructorTask's "subtasks" field is also initialized. This is also a JSON-serialized dict.
Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of
......@@ -70,7 +92,8 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
rely on the status stored in the InstructorTask object, rather than status stored in the
corresponding AsyncResult.
progress = {
# TODO: also add 'pending' count here? (Even though it's just total - attempted - skipped.)
task_progress = {
'action_name': action_name,
'attempted': 0,
'failed': 0,
......@@ -80,22 +103,33 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
'duration_ms': int(0),
'start_time': time()
entry.task_output = InstructorTask.create_output_for_success(progress)
entry.task_output = InstructorTask.create_output_for_success(task_progress)
entry.task_state = PROGRESS
# Write out the subtasks information.
num_subtasks = len(subtask_id_list)
subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'retried': 0, 'status': subtask_status}
# Initializing with fromkeys would assign the same single value to every key. We need
# each value to be distinct, since it's now a dict:
# subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
# TODO: it may not be necessary to store an initial value with all those zeroes!
# Instead, use a placeholder....
subtask_status = {subtask_id: create_subtask_status() for subtask_id in subtask_id_list}
subtask_dict = {
'total': num_subtasks,
'succeeded': 0,
'failed': 0,
'retried': 0,
'status': subtask_status
entry.subtasks = json.dumps(subtask_dict)
# and save the entry immediately, before any subtasks actually start work:
return progress
return task_progress
def update_subtask_status(entry_id, current_task_id, status, subtask_result):
def update_subtask_status(entry_id, current_task_id, subtask_status):
Update the status of the subtask in the parent InstructorTask object tracking its progress.
......@@ -104,7 +138,7 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
committed on completion, or rolled back on error.
The InstructorTask's "task_output" field is updated. This is a JSON-serialized dict.
Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_result`
Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_status`
into the corresponding values in the InstructorTask's task_output. Also updates the 'duration_ms'
value with the current interval since the original InstructorTask started.
......@@ -121,7 +155,7 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
messages, progress made, etc.
""""Preparing to update status for email subtask %s for instructor task %d with status %s",
current_task_id, entry_id, subtask_result)
current_task_id, entry_id, subtask_status)
entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
......@@ -140,28 +174,38 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
# ultimate status to be clobbered by the "earlier" updates. This
# should not be a problem in normal (non-eager) processing.
old_status = subtask_status[current_task_id]
if status != RETRY or old_status == QUEUING:
subtask_status[current_task_id] = status
# TODO: check this logic...
state = subtask_status['state']
# if state != RETRY or old_status['status'] == QUEUING:
# instead replace the status only if it's 'newer'
# i.e. has fewer pending
if subtask_status['pending'] <= old_status['pending']:
subtask_status[current_task_id] = subtask_status
# Update the parent task progress
task_progress = json.loads(entry.task_output)
start_time = task_progress['start_time']
task_progress['duration_ms'] = int((time() - start_time) * 1000)
if subtask_result is not None:
# change behavior so we don't update on progress now:
# TODO: figure out if we can make this more responsive later,
# by figuring out how to handle retries better.
if subtask_status is not None and state in READY_STATES:
for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
task_progress[statname] += subtask_result[statname]
task_progress[statname] += subtask_status[statname]
# Figure out if we're actually done (i.e. this is the last task to complete).
# This is easier if we just maintain a counter, rather than scanning the
# entire subtask_status dict.
if status == SUCCESS:
if state == SUCCESS:
subtask_dict['succeeded'] += 1
elif status == RETRY:
elif state == RETRY:
subtask_dict['retried'] += 1
subtask_dict['failed'] += 1
num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
# If we're done with the last task, update the parent status to indicate that:
# TODO: see if there was a catastrophic failure that occurred, and figure out
# how to report that here.
if num_remaining <= 0:
entry.task_state = SUCCESS
entry.subtasks = json.dumps(subtask_dict)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment