Commit 2f4774f4 by Brian Wilson

Pass status into course_email for tracking retry status.

parent ffbb228a
......@@ -52,8 +52,10 @@ def get_recipient_queryset(user_id, to_option, course_id, course_location):
instructor_qset = instructor_group.user_set.all()
recipient_qset = staff_qset | instructor_qset
if to_option == SEND_TO_ALL:
enrollment_qset = User.objects.filter(courseenrollment__course_id=course_id,
courseenrollment__is_active=True)
enrollment_qset = User.objects.filter(
courseenrollment__course_id=course_id,
courseenrollment__is_active=True
)
recipient_qset = recipient_qset | enrollment_qset
recipient_qset = recipient_qset.distinct()
else:
......@@ -164,12 +166,13 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
subtask_id = str(uuid4())
subtask_id_list.append(subtask_id)
subtask_progress = _course_email_result(None, 0, 0, 0)
task_list.append(send_course_email.subtask((
entry_id,
email_id,
to_list,
global_email_context,
False
subtask_progress,
), task_id=subtask_id
))
num_workers += num_tasks_this_query
......@@ -206,6 +209,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
return progress
# TODO: figure out if we really need this after all (for unit tests...)
def _get_current_task():
"""Stub to make it easier to test without actually running Celery"""
return current_task
......@@ -224,47 +228,51 @@ def _update_subtask_status(entry_id, current_task_id, status, subtask_result):
subtask_dict = json.loads(entry.subtasks)
subtask_status = subtask_dict['status']
if current_task_id not in subtask_status:
# unexpected error -- raise an exception?
log.warning("Unexpected task_id '%s': unable to update status for email subtask of instructor task %d",
current_task_id, entry_id)
pass
# unexpected error -- raise an exception
format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'"
msg = format_str.format(current_task_id, entry_id)
log.warning(msg)
raise ValueError(msg)
subtask_status[current_task_id] = status
# now update the parent task progress
# Update the parent task progress
task_progress = json.loads(entry.task_output)
start_time = task_progress['start_time']
task_progress['duration_ms'] = int((time() - start_time) * 1000)
if subtask_result is not None:
for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
task_progress[statname] += subtask_result[statname]
# now figure out if we're actually done (i.e. this is the last task to complete)
# (This might be easier by just maintaining a counter, rather than scanning the
# entire subtask_status dict.)
# Figure out if we're actually done (i.e. this is the last task to complete).
# This is easier if we just maintain a counter, rather than scanning the
# entire subtask_status dict.
if status == SUCCESS:
subtask_dict['succeeded'] += 1
else:
subtask_dict['failed'] += 1
num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
# If we're done with the last task, update the parent status to indicate that:
if num_remaining <= 0:
# we're done with the last task: update the parent status to indicate that:
entry.task_state = SUCCESS
entry.subtasks = json.dumps(subtask_dict)
entry.task_output = InstructorTask.create_output_for_success(task_progress)
log.info("Task output updated to %s for email subtask %s of instructor task %d",
entry.task_output, current_task_id, entry_id)
# TODO: temporary -- switch to debug
log.info("about to save....")
entry.save()
except:
log.exception("Unexpected error while updating InstructorTask.")
transaction.rollback()
else:
# TODO: temporary -- switch to debug
log.info("about to commit....")
transaction.commit()
@task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102
def send_course_email(entry_id, email_id, to_list, global_email_context, throttle=False):
def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_progress):
"""
Takes a primary id for a CourseEmail object and a 'to_list' of recipient objects--keys are
'profile__name', 'email' (address), and 'pk' (in the user table).
......@@ -276,49 +284,64 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, throttl
# Get entry here, as a sanity check that it actually exists. We won't actually do anything
# with it right away.
InstructorTask.objects.get(pk=entry_id)
# Get information from current task's request:
current_task_id = _get_current_task().request.id
retry_index = _get_current_task().request.retries
log.info("Preparing to send email as subtask %s for instructor task %d",
current_task_id, entry_id)
try:
course_title = global_email_context['course_title']
course_email_result_value = None
with dog_stats_api.timer('course_email.single_task.time.overall', tags=[_statsd_tag(course_title)]):
course_email_result = _send_course_email(email_id, to_list, global_email_context, throttle)
course_email_result_value = _send_course_email(email_id, to_list, global_email_context, subtask_progress, retry_index)
# Assume that if we get here without a raise, the task was successful.
# Update the InstructorTask object that is storing its progress.
_update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result)
_update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value)
except Exception:
# try to write out the failure to the entry before failing
_, exception, traceback = exc_info()
traceback_string = format_exc(traceback) if traceback is not None else ''
log.warning("background task (%s) failed: %s %s", current_task_id, exception, traceback_string)
_update_subtask_status(entry_id, current_task_id, FAILURE, None)
_update_subtask_status(entry_id, current_task_id, FAILURE, subtask_progress)
raise
return course_email_result
return course_email_result_value
def _send_course_email(email_id, to_list, global_email_context, throttle):
def _send_course_email(email_id, to_list, global_email_context, subtask_progress, retry_index):
"""
Performs the email sending task.
"""
throttle = retry_index > 0
try:
course_email = CourseEmail.objects.get(id=email_id)
except CourseEmail.DoesNotExist:
log.exception("Could not find email id:{} to send.".format(email_id))
raise
# exclude optouts
optouts = (Optout.objects.filter(course_id=course_email.course_id,
user__in=[i['pk'] for i in to_list])
.values_list('user__email', flat=True))
optouts = set(optouts)
num_optout = len(optouts)
to_list = [recipient for recipient in to_list if recipient['email'] not in optouts]
# exclude optouts (if not a retry):
# Note that we don't have to do the optout logic at all if this is a retry,
# because we have presumably already performed the optout logic on the first
# attempt. Anyone on the to_list on a retry has already passed the filter
# that existed at that time, and we don't need to keep checking for changes
# in the Optout list.
num_optout = 0
if retry_index == 0:
optouts = (Optout.objects.filter(course_id=course_email.course_id,
user__in=[i['pk'] for i in to_list])
.values_list('user__email', flat=True))
optouts = set(optouts)
# Only count the num_optout for the first time the optouts are calculated.
# We assume that the number will not change on retries, and so we don't need
# to calculate it each time.
num_optout = len(optouts)
to_list = [recipient for recipient in to_list if recipient['email'] not in optouts]
course_title = global_email_context['course_title']
subject = "[" + course_title + "] " + course_email.subject
......@@ -327,11 +350,11 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
course_email_template = CourseEmailTemplate.get_template()
num_sent = 0
num_error = 0
try:
connection = get_connection()
connection.open()
num_sent = 0
num_error = 0
# Define context values to use in all course emails:
email_context = {
......@@ -361,7 +384,7 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
email_msg.attach_alternative(html_msg, 'text/html')
# Throttle if we tried a few times and got the rate limiter
if throttle or current_task.request.retries > 0:
if throttle:
sleep(0.2)
try:
......@@ -389,14 +412,11 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
to_list.pop()
connection.close()
# TODO: figure out how to get (or persist) real statistics for this task, so that reflects progress
# made over multiple retries.
return course_email_result(num_sent, num_error, num_optout)
except (SMTPDataError, SMTPConnectError, SMTPServerDisconnected) as exc:
# Error caught here cause the email to be retried. The entire task is actually retried without popping the list
# Reasoning is that all of these errors may be temporary condition.
# TODO: figure out what this means. Presumably we have popped the list with those that have succeeded
# and failed, rather than those needing a later retry.
log.warning('Email with id %d not delivered due to temporary error %s, retrying send to %d recipients',
email_id, exc, len(to_list))
raise send_course_email.retry(
......@@ -404,10 +424,10 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
email_id,
to_list,
global_email_context,
current_task.request.retries > 0
_course_email_result(subtask_progress, num_sent, num_error, num_optout),
],
exc=exc,
countdown=(2 ** current_task.request.retries) * 15
countdown=(2 ** retry_index) * 15
)
except:
log.exception('Email with id %d caused send_course_email task to fail with uncaught exception. To list: %s',
......@@ -416,12 +436,22 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
# Close the connection before we exit
connection.close()
raise
else:
connection.close()
# Add current progress to any progress stemming from previous retries:
return _course_email_result(subtask_progress, num_sent, num_error, num_optout)
def course_email_result(num_sent, num_error, num_optout):
def _course_email_result(previous_result, new_num_sent, new_num_error, new_num_optout):
"""Return the result of course_email sending as a dict (not a string)."""
attempted = num_sent + num_error
return {'attempted': attempted, 'succeeded': num_sent, 'skipped': num_optout, 'failed': num_error}
attempted = new_num_sent + new_num_error
current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error}
# add in any previous results:
if previous_result is not None:
for keyname in current_result:
if keyname in previous_result:
current_result[keyname] += previous_result[keyname]
return current_result
def _statsd_tag(course_title):
......
......@@ -34,7 +34,7 @@ class MockCourseEmailResult(object):
def get_mock_course_email_result(self):
"""Wrapper for mock email function."""
def mock_course_email_result(sent, failed, output, **kwargs): # pylint: disable=W0613
def mock_course_email_result(prev_results, sent, failed, output, **kwargs): # pylint: disable=W0613
"""Increments count of number of emails sent."""
self.emails_sent += sent
return True
......@@ -247,7 +247,7 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
)
@override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7)
@patch('bulk_email.tasks.course_email_result')
@patch('bulk_email.tasks._course_email_result')
def test_chunked_queries_send_numerous_emails(self, email_mock):
"""
Test sending a large number of emails, to test the chunked querying
......@@ -304,4 +304,3 @@ class TestEmailSendExceptions(ModuleStoreTestCase):
entry = InstructorTaskFactory.create(task_key='', task_id='dummy')
with self.assertRaises(CourseEmail.DoesNotExist):
send_course_email(entry.id, 101, [], {'course_title': 'Test'}, False)
......@@ -271,4 +271,4 @@ def submit_task(request, task_type, task_class, course_id, task_input, task_key)
task_args = [instructor_task.id, _get_xmodule_instance_args(request, task_id)]
task_class.apply_async(task_args, task_id=task_id)
return instructor_task
\ No newline at end of file
return instructor_task
......@@ -23,7 +23,6 @@ from celery import task
from functools import partial
from instructor_task.tasks_helper import (run_main_task,
perform_module_state_update,
# perform_delegate_email_batches,
rescore_problem_module_state,
reset_attempts_module_state,
delete_problem_module_state,
......@@ -52,7 +51,10 @@ def rescore_problem(entry_id, xmodule_instance_args):
"""
action_name = 'rescored'
update_fcn = partial(rescore_problem_module_state, xmodule_instance_args)
filter_fcn = lambda(modules_to_update): modules_to_update.filter(state__contains='"done": true')
def filter_fcn(modules_to_update):
return modules_to_update.filter(state__contains='"done": true')
visit_fcn = partial(perform_module_state_update, update_fcn, filter_fcn)
return run_main_task(entry_id, visit_fcn, action_name)
......
......@@ -52,7 +52,7 @@ def _get_current_task():
return current_task
def perform_module_state_update(update_fcn, filter_fcn, entry_id, course_id, task_input, action_name):
def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, task_input, action_name):
"""
Performs generic update by visiting StudentModule instances with the update_fcn provided.
......@@ -76,7 +76,7 @@ def perform_module_state_update(update_fcn, filter_fcn, entry_id, course_id, tas
'succeeded': number of attempts that "succeeded"
'skipped': number of attempts that "skipped"
'failed': number of attempts that "failed"
'total': number of possible subtasks to attempt
'total': number of possible updates to attempt
'action_name': user-visible verb to use in status messages. Should be past-tense.
Pass-through of input `action_name`.
'duration_ms': how long the task has (or had) been running.
......
......@@ -23,7 +23,7 @@ from instructor_task.models import InstructorTask
from instructor_task.tests.test_base import InstructorTaskModuleTestCase
from instructor_task.tests.factories import InstructorTaskFactory
from instructor_task.tasks import rescore_problem, reset_problem_attempts, delete_problem_state
from instructor_task.tasks_helper import UpdateProblemModuleStateError #, update_problem_module_state
from instructor_task.tasks_helper import UpdateProblemModuleStateError
PROBLEM_URL_NAME = "test_urlname"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment