Commit 7b7afd47 by Brian Wilson

Incorporate changes to max_retry logic, adding subtask_status as a bulk_email arg.

parent 08a08448
@@ -36,8 +36,8 @@ from courseware.courses import get_course_by_id, course_image_url
 from instructor_task.models import InstructorTask
 from instructor_task.subtasks import (
     update_subtask_status,
-    create_subtask_result,
-    increment_subtask_result,
+    create_subtask_status,
+    increment_subtask_status,
     update_instructor_task_for_subtasks,
 )
@@ -54,7 +54,7 @@ QUOTA_EXCEEDED_ERRORS = (SESDailyQuotaExceededError, )
 # Errors that mail is being sent too quickly. When caught by a task, it
 # triggers an exponential backoff and retry. Retries happen continuously until
 # the email is sent.
-SENDING_RATE_ERRORS = (SESMaxSendingRateExceededError, )
+INFINITE_RETRY_ERRORS = (SESMaxSendingRateExceededError, )


 def _get_recipient_queryset(user_id, to_option, course_id, course_location):
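The rename from SENDING_RATE_ERRORS to INFINITE_RETRY_ERRORS matches how these errors are now handled: each such retry bumps a separate counter (`retriedA`, below), and max_retries is raised by the same amount, so sending-rate retries never exhaust the regular retry budget. A minimal sketch of that accounting (illustration only, not part of the commit):

```python
# Sketch: only "transient" retries (retriedB) can exhaust the budget of 5;
# each sending-rate retry raises the ceiling as it consumes an attempt.
def effective_max_retries(base_max_retries, subtask_status):
    return base_max_retries + subtask_status.get('retriedA', 0)
```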
@@ -120,6 +120,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
     # get inputs to use in this task from the entry:
     #task_id = entry.task_id
     user_id = entry.requester.id
+    task_id = entry.task_id
     # TODO: check this against argument passed in?
     # course_id = entry.course_id
@@ -132,7 +133,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
         # is submitted and reaches this point. It is possible to add retry behavior here,
         # to keep trying until the object is actually committed by the view function's return,
         # but it's cleaner to just expect to be done.
-        log.warning("Failed to get CourseEmail with id %s", email_id)
+        log.warning("Task %s: Failed to get CourseEmail with id %s", task_id, email_id)
         raise
     to_option = email_obj.to_option
@@ -144,26 +145,25 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
     try:
         course = get_course_by_id(course_id, depth=1)
     except Http404 as exc:
-        log.exception("get_course_by_id failed: %s", exc.args[0])
+        log.exception("Task %s: get_course_by_id failed: %s", task_id, exc.args[0])
         raise Exception("get_course_by_id failed: " + exc.args[0])
     global_email_context = _get_course_email_context(course)
     recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
     total_num_emails = recipient_qset.count()
-    log.info("Preparing to queue emails to %d recipient(s) for course %s, email %s, to_option %s",
-             total_num_emails, course_id, email_id, to_option)
+    log.info("Task %s: Preparing to queue emails to %d recipient(s) for course %s, email %s, to_option %s",
+             task_id, total_num_emails, course_id, email_id, to_option)
     num_queries = int(math.ceil(float(total_num_emails) / float(settings.EMAILS_PER_QUERY)))
     last_pk = recipient_qset[0].pk - 1
-    num_workers = 0
+    num_emails_queued = 0
     task_list = []
     subtask_id_list = []
     for _ in range(num_queries):
         # Note that if we were doing this for regrading we probably only need 'pk', and not
         # either profile__name or email. That's because we'll have to do
         # a lot more work in the individual regrade for each user, but using user_id as a key.
-        # TODO: figure out how to pass these values as an argument, when refactoring this code.
         recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk)
                                  .values('profile__name', 'email', 'pk')[:settings.EMAILS_PER_QUERY])
         last_pk = recipient_sublist[-1]['pk']
@@ -179,22 +179,32 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
             to_list = recipient_sublist[i * chunk:i * chunk + chunk]
             subtask_id = str(uuid4())
             subtask_id_list.append(subtask_id)
+            retry_progress = create_subtask_status()
             task_list.append(send_course_email.subtask((
                 entry_id,
                 email_id,
                 to_list,
                 global_email_context,
+                retry_progress,
             ),
                 task_id=subtask_id,
                 routing_key=settings.HIGH_PRIORITY_QUEUE,
             ))
-        num_workers += num_tasks_this_query
+        num_emails_queued += num_emails_this_query
+
+    # Sanity check: we expect the chunking to be properly summing to the original count:
+    if num_emails_queued != total_num_emails:
+        error_msg = "Task {}: number of emails generated by chunking {} not equal to original total {}".format(
+            task_id, num_emails_queued, total_num_emails
+        )
+        log.error(error_msg)
+        raise Exception(error_msg)

     # Update the InstructorTask with information about the subtasks we've defined.
     progress = update_instructor_task_for_subtasks(entry, action_name, total_num_emails, subtask_id_list)
     num_subtasks = len(subtask_id_list)
-    log.info("Preparing to queue %d email tasks for course %s, email %s, to %s",
-             num_subtasks, course_id, email_id, to_option)
+    log.info("Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s",
+             num_subtasks, total_num_emails, course_id, email_id, to_option)

     # now group the subtasks, and start them running:
     task_group = group(task_list)
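The new sanity check guards the chunking arithmetic: every recipient returned by a query must land in exactly one subtask. A standalone sketch of the invariant (the actual chunk computation lives in the elided lines above):

```python
import math

# Sketch: queue recipients query-by-query; the queued total must come out
# equal to the original recipient count, or the task raises.
def queued_email_count(total_num_emails, emails_per_query):
    num_queries = int(math.ceil(float(total_num_emails) / float(emails_per_query)))
    queued = 0
    for _ in range(num_queries):
        queued += min(emails_per_query, total_num_emails - queued)
    return queued  # expected to equal total_num_emails
```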
@@ -202,9 +212,9 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
     # We want to return progress here, as this is what will be stored in the
     # AsyncResult for the parent task as its return value.
-    # The AsyncResult will then be marked as SUCCEEDED, and have this return value as it's "result".
+    # The AsyncResult will then be marked as SUCCEEDED, and have this return value as its "result".
     # That's okay, for the InstructorTask will have the "real" status, and monitoring code
-    # will use that instead.
+    # should be using that instead.
     return progress
@@ -215,7 +225,7 @@ def _get_current_task():
 @task(default_retry_delay=15, max_retries=5)  # pylint: disable=E1102
-def send_course_email(entry_id, email_id, to_list, global_email_context):
+def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status):
     """
     Sends an email to a list of recipients.
@@ -242,19 +252,20 @@ def send_course_email(entry_id, email_id, to_list, global_email_context):
     # Get information from current task's request:
     current_task_id = _get_current_task().request.id
     num_to_send = len(to_list)
-    log.info("Preparing to send %s emails as subtask %s for instructor task %d: request = %s",
-             num_to_send, current_task_id, entry_id, _get_current_task().request)
+    log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s",
+             email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status)
     send_exception = None
-    course_email_result_value = None
+    new_subtask_status = None
     try:
         course_title = global_email_context['course_title']
         with dog_stats_api.timer('course_email.single_task.time.overall', tags=[_statsd_tag(course_title)]):
-            course_email_result_value, send_exception = _send_course_email(
+            new_subtask_status, send_exception = _send_course_email(
                 entry_id,
                 email_id,
                 to_list,
                 global_email_context,
+                subtask_status,
             )
     except Exception:
         # Unexpected exception. Try to write out the failure to the entry before failing
@@ -264,28 +275,30 @@ def send_course_email(entry_id, email_id, to_list, global_email_context):
         # We got here for really unexpected reasons. Since we don't know how far
         # the task got in emailing, we count all recipients as having failed.
         # It at least keeps the counts consistent.
-        course_email_result_value = create_subtask_result(0, num_to_send, 0)
+        new_subtask_status = increment_subtask_status(subtask_status, failed=num_to_send, state=FAILURE)
+        update_subtask_status(entry_id, current_task_id, new_subtask_status)
+        raise send_exception

     if send_exception is None:
         # Update the InstructorTask object that is storing its progress.
         log.info("background task (%s) succeeded", current_task_id)
-        update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value)
+        update_subtask_status(entry_id, current_task_id, new_subtask_status)
     elif isinstance(send_exception, RetryTaskError):
         # If retrying, record the progress made before the retry condition
         # was encountered. Once the retry is running, it will be only processing
         # what wasn't already accomplished.
         log.warning("background task (%s) being retried", current_task_id)
-        update_subtask_status(entry_id, current_task_id, RETRY, course_email_result_value)
+        update_subtask_status(entry_id, current_task_id, new_subtask_status)
         raise send_exception
     else:
         log.error("background task (%s) failed: %s", current_task_id, send_exception)
-        update_subtask_status(entry_id, current_task_id, FAILURE, course_email_result_value)
+        update_subtask_status(entry_id, current_task_id, new_subtask_status)
         raise send_exception

-    return course_email_result_value
+    return new_subtask_status


-def _send_course_email(entry_id, email_id, to_list, global_email_context):
+def _send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status):
     """
     Performs the email sending task.
@@ -312,6 +325,8 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context):
             'skipped': number of emails skipped (due to optout)
             'failed': number of emails not sent because of some failure
+        The dict may also contain information about retries.
+
       * Second value is an exception returned by the innards of the method, indicating a fatal error.
         In this case, the number of recipients that were not sent have already been added to the
         'failed' count above.
@@ -319,6 +334,8 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context):
     # Get information from current task's request:
     task_id = _get_current_task().request.id
     retry_index = _get_current_task().request.retries
+
+    # If this is a second attempt, then throttle the speed at which mail is sent:
     throttle = retry_index > 0

     num_optout = 0
@@ -400,6 +417,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context):
                 log.info('Email with id %s sent to %s', email_id, email)
                 num_sent += 1
             except SMTPDataError as exc:
                 # According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure.
                 if exc.smtp_code >= 400 and exc.smtp_code < 500:
@@ -414,8 +432,15 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context):
             # Pop the user that was emailed off the end of the list:
             to_list.pop()
-    except SENDING_RATE_ERRORS as exc:
-        subtask_progress = create_subtask_result(num_sent, num_error, num_optout)
+    except INFINITE_RETRY_ERRORS as exc:
+        subtask_progress = increment_subtask_status(
+            subtask_status,
+            succeeded=num_sent,
+            failed=num_error,
+            skipped=num_optout,
+            retriedA=1,
+            state=RETRY
+        )
         return _submit_for_retry(
             entry_id, email_id, to_list, global_email_context, exc, subtask_progress, True
         )
@@ -424,7 +449,14 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context):
         # Errors caught here cause the email to be retried. The entire task is actually retried
         # without popping the current recipient off of the existing list.
         # Errors caught are those that indicate a temporary condition that might succeed on retry.
-        subtask_progress = create_subtask_result(num_sent, num_error, num_optout)
+        subtask_progress = increment_subtask_status(
+            subtask_status,
+            succeeded=num_sent,
+            failed=num_error,
+            skipped=num_optout,
+            retriedB=1,
+            state=RETRY
+        )
         return _submit_for_retry(
             entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False
         )
@@ -444,10 +476,24 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context):
         log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s',
                       task_id, email_id, [i['email'] for i in to_list])
         num_error += len(to_list)
-        return create_subtask_result(num_sent, num_error, num_optout), exc
+        subtask_progress = increment_subtask_status(
+            subtask_status,
+            succeeded=num_sent,
+            failed=num_error,
+            skipped=num_optout,
+            state=FAILURE
+        )
+        return subtask_progress, exc
     else:
         # Successful completion is marked by an exception value of None:
-        return create_subtask_result(num_sent, num_error, num_optout), None
+        subtask_progress = increment_subtask_status(
+            subtask_status,
+            succeeded=num_sent,
+            failed=num_error,
+            skipped=num_optout,
+            state=SUCCESS
+        )
+        return subtask_progress, None
     finally:
         # clean up at the end
         connection.close()
@@ -476,16 +522,24 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
     task_id = _get_current_task().request.id
     retry_index = _get_current_task().request.retries

-    log.warning('Task %s: email with id %d not delivered due to temporary error %s, retrying send to %d recipients',
-                task_id, email_id, current_exception, len(to_list))
+    log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)",
+             task_id, subtask_progress['succeeded'], subtask_progress['failed'], subtask_progress['skipped'])

+    # Don't resend emails that have already succeeded.
+    # Retry the email at increasing exponential backoff.
+    # Calculate time until we retry this task (in seconds):
     if is_sending_rate_error:
-        countdown = ((2 ** retry_index) * 15) * random.uniform(.5, 1.5)
+        exp = min(retry_index, 5)
+        countdown = ((2 ** exp) * 15) * random.uniform(.5, 1.25)
+        exception_type = 'sending-rate'
     else:
         countdown = ((2 ** retry_index) * 15) * random.uniform(.75, 1.5)
+        exception_type = 'transient'

+    # max_retries is increased by the number of times an "infinite-retry" exception
+    # has been retried. We want the regular retries to trigger a retry, but not these
+    # special retries. So we count them separately.
+    max_retries = _get_current_task().max_retries + subtask_progress['retriedA']
+    log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients (with max_retry=%s)',
+                task_id, email_id, exception_type, current_exception, len(to_list), max_retries)
     try:
         send_course_email.retry(
@@ -494,9 +548,11 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
             email_id,
             to_list,
             global_email_context,
+            subtask_progress,
         ],
             exc=current_exception,
             countdown=countdown,
+            max_retries=max_retries,
             throw=True,
         )
     except RetryTaskError as retry_error:
@@ -513,7 +569,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
         log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s',
                       task_id, email_id, [i['email'] for i in to_list])
         num_failed = len(to_list)
-        new_subtask_progress = increment_subtask_result(subtask_progress, 0, num_failed, 0)
+        new_subtask_progress = increment_subtask_status(subtask_progress, failed=num_failed, state=FAILURE)
         return new_subtask_progress, retry_exc
...
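For reference, the two backoff branches in _submit_for_retry now compute waits along these lines (a sketch assembled from the diff above; sending-rate errors get a capped exponent and tighter jitter):

```python
import random

def backoff_countdown(retry_index, is_sending_rate_error):
    """Seconds to wait before the next retry, per the branches above."""
    if is_sending_rate_error:
        exp = min(retry_index, 5)  # cap so the base wait tops out at 8 minutes
        return ((2 ** exp) * 15) * random.uniform(.5, 1.25)
    return ((2 ** retry_index) * 15) * random.uniform(.75, 1.5)
```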
@@ -15,7 +15,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF
 from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
 from xmodule.modulestore.tests.factories import CourseFactory
 from bulk_email.models import Optout
-from instructor_task.subtasks import create_subtask_result
+from instructor_task.subtasks import increment_subtask_status

 STAFF_COUNT = 3
 STUDENT_COUNT = 10
@@ -29,13 +29,13 @@ class MockCourseEmailResult(object):
     """
     emails_sent = 0

-    def get_mock_create_subtask_result(self):
+    def get_mock_increment_subtask_status(self):
         """Wrapper for mock email function."""
-        def mock_create_subtask_result(sent, failed, output, **kwargs):  # pylint: disable=W0613
+        def mock_increment_subtask_status(original_status, **kwargs):  # pylint: disable=W0613
             """Increments count of number of emails sent."""
-            self.emails_sent += sent
-            return create_subtask_result(sent, failed, output)
-        return mock_create_subtask_result
+            self.emails_sent += kwargs['succeeded']
+            return increment_subtask_status(original_status, **kwargs)
+        return mock_increment_subtask_status


 @override_settings(MODULESTORE=TEST_DATA_MONGO_MODULESTORE)
@@ -244,13 +244,13 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
     )
     @override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7)
-    @patch('bulk_email.tasks.create_subtask_result')
+    @patch('bulk_email.tasks.increment_subtask_status')
     def test_chunked_queries_send_numerous_emails(self, email_mock):
         """
         Test sending a large number of emails, to test the chunked querying
         """
         mock_factory = MockCourseEmailResult()
-        email_mock.side_effect = mock_factory.get_mock_create_subtask_result()
+        email_mock.side_effect = mock_factory.get_mock_increment_subtask_status()
         added_users = []
         for _ in xrange(LARGE_NUM_EMAILS):
             user = UserFactory()
...
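The reworked mock relies on the new calling convention: increment_subtask_status takes the prior status dict positionally and the counter deltas as keyword arguments, which is why emails_sent accumulates kwargs['succeeded']. A sketch of the call shape (illustrative values):

```python
from celery.states import SUCCESS
from instructor_task.subtasks import create_subtask_status, increment_subtask_status

old_status = create_subtask_status()  # the status dict carried into the subtask
new_status = increment_subtask_status(
    old_status,     # positional: prior status
    succeeded=3,    # deltas arrive as keyword arguments
    failed=0,
    skipped=1,
    state=SUCCESS,
)
```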
@@ -67,7 +67,7 @@ class TestEmailErrors(ModuleStoreTestCase):
         self.assertTrue(type(exc) == SMTPDataError)

     @patch('bulk_email.tasks.get_connection', autospec=True)
-    @patch('bulk_email.tasks.create_subtask_result')
+    @patch('bulk_email.tasks.increment_subtask_status')
     @patch('bulk_email.tasks.send_course_email.retry')
     def test_data_err_fail(self, retry, result, get_conn):
         """
@@ -91,11 +91,11 @@ class TestEmailErrors(ModuleStoreTestCase):
         # We shouldn't retry when hitting a 5xx error
         self.assertFalse(retry.called)
         # Test that after the rejected email, the rest still successfully send
-        ((sent, fail, optouts), _) = result.call_args
-        self.assertEquals(optouts, 0)
+        ((_initial_results), kwargs) = result.call_args
+        self.assertEquals(kwargs['skipped'], 0)
         expectedNumFails = int((settings.EMAILS_PER_TASK + 3) / 4.0)
-        self.assertEquals(fail, expectedNumFails)
-        self.assertEquals(sent, settings.EMAILS_PER_TASK - expectedNumFails)
+        self.assertEquals(kwargs['failed'], expectedNumFails)
+        self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expectedNumFails)

     @patch('bulk_email.tasks.get_connection', autospec=True)
     @patch('bulk_email.tasks.send_course_email.retry')
@@ -138,7 +138,7 @@ class TestEmailErrors(ModuleStoreTestCase):
         exc = kwargs['exc']
         self.assertTrue(type(exc) == SMTPConnectError)

-    @patch('bulk_email.tasks.create_subtask_result')
+    @patch('bulk_email.tasks.increment_subtask_status')
     @patch('bulk_email.tasks.send_course_email.retry')
     @patch('bulk_email.tasks.log')
     @patch('bulk_email.tasks.get_connection', Mock(return_value=EmailTestException))
@@ -163,12 +163,13 @@ class TestEmailErrors(ModuleStoreTestCase):
         self.assertFalse(retry.called)
         # check the results being returned
         self.assertTrue(result.called)
-        ((sent, fail, optouts), _) = result.call_args
-        self.assertEquals(optouts, 0)
-        self.assertEquals(fail, 1)  # just myself
-        self.assertEquals(sent, 0)
+        ((initial_results, ), kwargs) = result.call_args
+        self.assertEquals(initial_results['skipped'], 0)
+        self.assertEquals(initial_results['failed'], 0)
+        self.assertEquals(initial_results['succeeded'], 0)
+        self.assertEquals(kwargs['failed'], 1)  # just myself

-    @patch('bulk_email.tasks.create_subtask_result')
+    @patch('bulk_email.tasks.increment_subtask_status')
     @patch('bulk_email.tasks.log')
     def test_nonexist_email(self, mock_log, result):
         """
@@ -180,7 +181,7 @@ class TestEmailErrors(ModuleStoreTestCase):
         task_input = {"email_id": -1}
         with self.assertRaises(CourseEmail.DoesNotExist):
             perform_delegate_email_batches(entry.id, course_id, task_input, "action_name")
-        ((log_str, email_id), _) = mock_log.warning.call_args
+        ((log_str, _, email_id), _) = mock_log.warning.call_args
         self.assertTrue(mock_log.warning.called)
         self.assertIn('Failed to get CourseEmail with id', log_str)
         self.assertEqual(email_id, -1)
@@ -198,7 +199,7 @@ class TestEmailErrors(ModuleStoreTestCase):
         task_input = {"email_id": email.id}
         with self.assertRaises(Exception):
             perform_delegate_email_batches(entry.id, course_id, task_input, "action_name")
-        ((log_str, _), _) = mock_log.exception.call_args
+        ((log_str, _, _), _) = mock_log.exception.call_args
         self.assertTrue(mock_log.exception.called)
         self.assertIn('get_course_by_id failed:', log_str)
...
@@ -5,7 +5,7 @@ from time import time
 import json

 from celery.utils.log import get_task_logger
-from celery.states import SUCCESS, RETRY
+from celery.states import SUCCESS, RETRY, READY_STATES

 from django.db import transaction
@@ -14,29 +14,51 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING
 TASK_LOG = get_task_logger(__name__)


-def create_subtask_result(num_sent, num_error, num_optout):
+def create_subtask_status(succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None):
     """
-    Create a result of a subtask.
+    Create a dict for tracking the status of a subtask.

-    Keys are: 'attempted', 'succeeded', 'skipped', 'failed'.
+    Keys are: 'attempted', 'succeeded', 'skipped', 'failed', 'retried'.
+    TODO: update

-    Object must be JSON-serializable, so that it can be passed as an argument
-    to tasks.
+    Object must be JSON-serializable.
+
+    TODO: decide if in future we want to include specific error information
+    indicating the reason for failure.
+    Also, we should count up "not attempted" separately from
+    attempted/failed.
     """
-    attempted = num_sent + num_error
-    current_result = {'attempted': attempted, 'succeeded': num_sent, 'skipped': num_optout, 'failed': num_error}
+    attempted = succeeded + failed
+    current_result = {
+        'attempted': attempted,
+        'succeeded': succeeded,
+        'pending': pending,
+        'skipped': skipped,
+        'failed': failed,
+        'retriedA': retriedA,
+        'retriedB': retriedB,
+        'state': state if state is not None else QUEUING,
+    }
     return current_result


-def increment_subtask_result(subtask_result, new_num_sent, new_num_error, new_num_optout):
+def increment_subtask_status(subtask_result, succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None):
     """
     Update the result of a subtask with additional results.

-    Keys are: 'attempted', 'succeeded', 'skipped', 'failed'.
+    Keys are: 'attempted', 'succeeded', 'skipped', 'failed', 'retried'.
     """
-    new_result = create_subtask_result(new_num_sent, new_num_error, new_num_optout)
+    # TODO: rewrite this if we have additional fields added to original subtask_result,
+    # that are not part of the increment. Tradeoff on duplicating the 'attempts' logic.
+    new_result = create_subtask_status(succeeded, failed, pending, skipped, retriedA, retriedB, state)
     for keyname in new_result:
-        if keyname in subtask_result:
+        if keyname == 'state':
+            # does not get incremented. If no new value, copy old value:
+            if state is None:
+                new_result[keyname] = subtask_result[keyname]
+        elif keyname in subtask_result:
             new_result[keyname] += subtask_result[keyname]
     return new_result
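Taken together, the two helpers above give each subtask a running, JSON-serializable tally. A quick sketch of the intended lifecycle:

```python
from celery.states import RETRY

status = create_subtask_status()           # all counters zero, state == QUEUING
status = increment_subtask_status(status, succeeded=8, failed=2, state=RETRY)
assert status['attempted'] == 10           # attempted = succeeded + failed
assert status['state'] == RETRY            # 'state' is replaced, never summed
```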
@@ -49,7 +71,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
     as is the 'duration_ms' value. A 'start_time' is stored for later duration calculations,
     and the total number of "things to do" is set, so the user can be told how much needs to be
     done overall. The `action_name` is also stored, to also help with constructing more readable
-    progress messages.
+    task_progress messages.

     The InstructorTask's "subtasks" field is also initialized. This is also a JSON-serialized dict.
     Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of
@@ -70,7 +92,8 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
     rely on the status stored in the InstructorTask object, rather than status stored in the
     corresponding AsyncResult.
     """
-    progress = {
+    # TODO: also add 'pending' count here? (Even though it's total-attempted-skipped.)
+    task_progress = {
         'action_name': action_name,
         'attempted': 0,
         'failed': 0,
@@ -80,22 +103,33 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
         'duration_ms': int(0),
         'start_time': time()
     }
-    entry.task_output = InstructorTask.create_output_for_success(progress)
+    entry.task_output = InstructorTask.create_output_for_success(task_progress)
     entry.task_state = PROGRESS

     # Write out the subtasks information.
     num_subtasks = len(subtask_id_list)
-    subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
-    subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'retried': 0, 'status': subtask_status}
+    # using fromkeys to initialize uses a single value. we need the value
+    # to be distinct, since it's now a dict:
+    # subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
+    # TODO: may not be necessary to store initial value with all those zeroes!
+    # Instead, use a placemarker....
+    subtask_status = {subtask_id: create_subtask_status() for subtask_id in subtask_id_list}
+    subtask_dict = {
+        'total': num_subtasks,
+        'succeeded': 0,
+        'failed': 0,
+        'retried': 0,
+        'status': subtask_status
+    }
     entry.subtasks = json.dumps(subtask_dict)

     # and save the entry immediately, before any subtasks actually start work:
     entry.save_now()
-    return progress
+    return task_progress
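With this initialization, entry.subtasks serializes to roughly the following shape (illustrative ids, and assuming the QUEUING constant serializes as the string 'QUEUING'):

```python
# Illustrative only -- json.loads(entry.subtasks) right after setup:
expected_shape = {
    'total': 2,
    'succeeded': 0,
    'failed': 0,
    'retried': 0,
    'status': {
        'subtask-id-1': {'attempted': 0, 'succeeded': 0, 'pending': 0, 'skipped': 0,
                         'failed': 0, 'retriedA': 0, 'retriedB': 0, 'state': 'QUEUING'},
        # ...one zeroed status dict per subtask id
    },
}
```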
 @transaction.commit_manually
-def update_subtask_status(entry_id, current_task_id, status, subtask_result):
+def update_subtask_status(entry_id, current_task_id, subtask_status):
     """
     Update the status of the subtask in the parent InstructorTask object tracking its progress.
@@ -104,7 +138,7 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
     committed on completion, or rolled back on error.

     The InstructorTask's "task_output" field is updated. This is a JSON-serialized dict.
-    Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_result`
+    Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_status`
     into the corresponding values in the InstructorTask's task_output. Also updates the 'duration_ms'
     value with the current interval since the original InstructorTask started.
@@ -121,7 +155,7 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
     messages, progress made, etc.
     """
     TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s",
-                  current_task_id, entry_id, subtask_result)
+                  current_task_id, entry_id, subtask_status)

     try:
         entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
@@ -140,28 +174,38 @@ def update_subtask_status(entry_id, current_task_id, status, subtask_result):
         # ultimate status to be clobbered by the "earlier" updates. This
         # should not be a problem in normal (non-eager) processing.
-        old_status = subtask_status[current_task_id]
-        if status != RETRY or old_status == QUEUING:
-            subtask_status[current_task_id] = status
+        old_status = subtask_status_info[current_task_id]
+        # TODO: check this logic...
+        state = subtask_status['state']
+        # if state != RETRY or old_status['status'] == QUEUING:
+        # instead replace the status only if it's 'newer',
+        # i.e. has fewer pending:
+        if subtask_status['pending'] <= old_status['pending']:
+            subtask_status_info[current_task_id] = subtask_status

         # Update the parent task progress
         task_progress = json.loads(entry.task_output)
         start_time = task_progress['start_time']
         task_progress['duration_ms'] = int((time() - start_time) * 1000)
-        if subtask_result is not None:
+        # change behavior so we don't update on progress now:
+        # TODO: figure out if we can make this more responsive later,
+        # by figuring out how to handle retries better.
+        if subtask_status is not None and state in READY_STATES:
             for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
-                task_progress[statname] += subtask_result[statname]
+                task_progress[statname] += subtask_status[statname]

         # Figure out if we're actually done (i.e. this is the last task to complete).
         # This is easier if we just maintain a counter, rather than scanning the
         # entire subtask_status dict.
-        if status == SUCCESS:
+        if state == SUCCESS:
             subtask_dict['succeeded'] += 1
-        elif status == RETRY:
+        elif state == RETRY:
             subtask_dict['retried'] += 1
         else:
             subtask_dict['failed'] += 1
         num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']

         # If we're done with the last task, update the parent status to indicate that:
+        # TODO: see if there was a catastrophic failure that occurred, and figure out
+        # how to report that here.
         if num_remaining <= 0:
             entry.task_state = SUCCESS
         entry.subtasks = json.dumps(subtask_dict)
...
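The switch to gating on READY_STATES is what defers the roll-up: parent counters accumulate only when a subtask reaches a terminal state, so a RETRY update records the new status without double-counting progress. Celery's state sets make this concrete:

```python
from celery.states import READY_STATES, SUCCESS, FAILURE, RETRY

assert SUCCESS in READY_STATES and FAILURE in READY_STATES
assert RETRY not in READY_STATES  # retrying subtasks report status, not totals
```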