Commit 506f91a9 by Brian Wilson

Use separate retry count for calculating retry delay.

parent 32c84624
......@@ -12,8 +12,14 @@ from sys import exc_info
from traceback import format_exc
from dogapi import dog_stats_api
from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError
from boto.ses.exceptions import SESDailyQuotaExceededError, SESMaxSendingRateExceededError
from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError, SMTPException
from boto.ses.exceptions import (
SESDailyQuotaExceededError,
SESMaxSendingRateExceededError,
SESAddressBlacklistedError,
SESIllegalAddressError,
SESLocalAddressCharacterError,
)
from boto.exception import AWSConnectionError
from celery import task, current_task, group
......@@ -44,18 +50,25 @@ from instructor_task.subtasks import (
log = get_task_logger(__name__)
# Exceptions that, if caught, should cause the task to be re-tried.
# These errors will be caught a maximum of 5 times before the task fails.
RETRY_ERRORS = (SMTPDataError, SMTPConnectError, SMTPServerDisconnected, AWSConnectionError)
# Errors that an individual email is failing to be sent, and should just
# be treated as a fail.
SINGLE_EMAIL_FAILURE_ERRORS = (SESAddressBlacklistedError, SESIllegalAddressError, SESLocalAddressCharacterError)
# Errors that involve exceeding a quota of sent email
QUOTA_EXCEEDED_ERRORS = (SESDailyQuotaExceededError, )
# Exceptions that, if caught, should cause the task to be re-tried.
# These errors will be caught a limited number of times before the task fails.
LIMITED_RETRY_ERRORS = (SMTPDataError, SMTPConnectError, SMTPServerDisconnected, AWSConnectionError)
# Errors that mail is being sent too quickly. When caught by a task, it
# triggers an exponential backoff and retry. Retries happen continuously until
# the email is sent.
# Errors that indicate that a mailing task should be retried without limit.
# An example is if email is being sent too quickly, but may succeed if sent
# more slowly. When caught by a task, it triggers an exponential backoff and retry.
# Retries happen continuously until the email is sent.
INFINITE_RETRY_ERRORS = (SESMaxSendingRateExceededError, )
# Errors that are known to indicate an inability to send any more emails,
# and should therefore not be retried. For example, exceeding a quota for emails.
# Also, any SMTP errors that are not explicitly enumerated above.
BULK_EMAIL_FAILURE_ERRORS = (SESDailyQuotaExceededError, SMTPException)
def _get_recipient_queryset(user_id, to_option, course_id, course_location):
"""
......@@ -118,12 +131,14 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
"""
entry = InstructorTask.objects.get(pk=entry_id)
# get inputs to use in this task from the entry:
#task_id = entry.task_id
user_id = entry.requester.id
task_id = entry.task_id
# TODO: check this against argument passed in?
# course_id = entry.course_id
# perfunctory check, since expansion is made for convenience of other task
# code that doesn't need the entry_id.
if course_id != entry.course_id:
format_msg = "Course id conflict: explicit value %s does not match task value %s"
raise ValueError(format_msg.format(course_id, entry.course_id))
email_id = task_input['email_id']
try:
......@@ -138,15 +153,16 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
to_option = email_obj.to_option
# TODO: instead of fetching from email object, compare instead to
# confirm that they match, and raise an exception if they don't.
# course_id = email_obj.course_id
# sanity check that course for email_obj matches that of the task referencing it:
if course_id != email_obj.course_id:
format_msg = "Course id conflict: explicit value %s does not match email value %s"
raise ValueError(format_msg.format(course_id, email_obj.course_id))
try:
course = get_course_by_id(course_id, depth=1)
except Http404 as exc:
log.exception("Task %s: get_course_by_id failed: %s", task_id, exc.args[0])
raise Exception("get_course_by_id failed: " + exc.args[0])
raise ValueError("Course not found: " + exc.args[0])
global_email_context = _get_course_email_context(course)
recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
......@@ -173,23 +189,26 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
for i in range(num_tasks_this_query):
if i == num_tasks_this_query - 1:
# Avoid cutting off the very last email when chunking a task that divides perfectly
# (eg num_emails_this_query = 297 and EMAILS_PER_TASK is 100)
# (e.g. num_emails_this_query = 297 and EMAILS_PER_TASK is 100)
to_list = recipient_sublist[i * chunk:]
else:
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
subtask_id = str(uuid4())
subtask_id_list.append(subtask_id)
retry_progress = create_subtask_status()
task_list.append(send_course_email.subtask((
entry_id,
email_id,
to_list,
global_email_context,
retry_progress,
),
task_id=subtask_id,
routing_key=settings.HIGH_PRIORITY_QUEUE,
))
subtask_status = create_subtask_status(subtask_id)
# create subtask, passing args and kwargs:
new_subtask = send_course_email.subtask(
(
entry_id,
email_id,
to_list,
global_email_context,
subtask_status,
),
task_id=subtask_id,
routing_key=settings.BULK_EMAIL_ROUTING_KEY,
)
task_list.append(new_subtask)
num_emails_queued += num_emails_this_query
# Sanity check: we expect the chunking to be properly summing to the original count:
......@@ -208,7 +227,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
# now group the subtasks, and start them running:
task_group = group(task_list)
task_group.apply_async(routing_key=settings.HIGH_PRIORITY_QUEUE)
task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY)
# We want to return progress here, as this is what will be stored in the
# AsyncResult for the parent task as its return value.
......@@ -218,13 +237,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
return progress
# TODO: figure out if we really need this after all (for unit tests...)
def _get_current_task():
"""Stub to make it easier to test without actually running Celery"""
return current_task
@task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102
@task(default_retry_delay=settings.BULK_EMAIL_DEFAULT_RETRY_DELAY, max_retries=settings.BULK_EMAIL_MAX_RETRIES) # pylint: disable=E1102
def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status):
"""
Sends an email to a list of recipients.
......@@ -249,8 +262,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
# with it right away, but we also don't expect it to fail.
InstructorTask.objects.get(pk=entry_id)
# Get information from current task's request:
current_task_id = _get_current_task().request.id
current_task_id = subtask_status['task_id']
num_to_send = len(to_list)
log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s",
email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status)
......@@ -295,6 +307,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
update_subtask_status(entry_id, current_task_id, new_subtask_status)
raise send_exception
log.info("background task (%s) returning status %s", current_task_id, new_subtask_status)
return new_subtask_status
......@@ -332,12 +345,14 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
'failed' count above.
"""
# Get information from current task's request:
task_id = _get_current_task().request.id
retry_index = _get_current_task().request.retries
#task_id = _get_current_task().request.id
#retry_index = _get_current_task().request.retries
task_id = subtask_status['task_id']
# If this is a second attempt, then throttle the speed at which mail is sent:
throttle = retry_index > 0
throttle = subtask_status['retried_nomax'] > 0
# collect stats on progress:
num_optout = 0
num_sent = 0
num_error = 0
......@@ -354,7 +369,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# attempt. Anyone on the to_list on a retry has already passed the filter
# that existed at that time, and we don't need to keep checking for changes
# in the Optout list.
if retry_index == 0:
if (subtask_status['retried_nomax'] + subtask_status['retried_withmax']) == 0:
optouts = (Optout.objects.filter(course_id=course_email.course_id,
user__in=[i['pk'] for i in to_list])
.values_list('user__email', flat=True))
......@@ -412,7 +427,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
)
email_msg.attach_alternative(html_msg, 'text/html')
# Throttle if we tried a few times and got the rate limiter
# Throttle if we have gotten the rate limiter
if throttle:
sleep(0.2)
......@@ -422,11 +437,6 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
with dog_stats_api.timer('course_email.single_send.time.overall', tags=[_statsd_tag(course_title)]):
connection.send_messages([email_msg])
dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)])
log.info('Email with id %s sent to %s', email_id, email)
num_sent += 1
except SMTPDataError as exc:
# According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure.
if exc.smtp_code >= 400 and exc.smtp_code < 500:
......@@ -438,52 +448,56 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
num_error += 1
except SINGLE_EMAIL_FAILURE_ERRORS as exc:
# This will fall through and not retry the message, since it will be popped
log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc)
dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
num_error += 1
else:
dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)])
log.info('Email with id %s sent to %s', email_id, email)
num_sent += 1
# Pop the user that was emailed off the end of the list:
to_list.pop()
except INFINITE_RETRY_ERRORS as exc:
dog_stats_api.increment('course_email.infinite_retry', tags=[_statsd_tag(course_title)])
subtask_progress = increment_subtask_status(
subtask_status,
succeeded=num_sent,
failed=num_error,
skipped=num_optout,
retriedA=1,
retried_nomax=1,
state=RETRY
)
return _submit_for_retry(
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, True
)
except RETRY_ERRORS as exc:
except LIMITED_RETRY_ERRORS as exc:
# Errors caught here cause the email to be retried. The entire task is actually retried
# without popping the current recipient off of the existing list.
# Errors caught are those that indicate a temporary condition that might succeed on retry.
dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)])
subtask_progress = increment_subtask_status(
subtask_status,
succeeded=num_sent,
failed=num_error,
skipped=num_optout,
retriedB=1,
retried_withmax=1,
state=RETRY
)
return _submit_for_retry(
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False
)
except Exception as exc:
# If we have a general exception for this request, we need to figure out what to do with it.
# If we're going to just mark it as failed
# And the log message below should indicate which task_id is failing, so we have a chance to
# reconstruct the problems.
if isinstance(exc, QUOTA_EXCEEDED_ERRORS):
log.exception('WARNING: Course "%s" exceeded quota!', course_title)
log.exception('Email with id %d not sent due to exceeding quota. To list: %s',
email_id,
[i['email'] for i in to_list])
else:
log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s',
task_id, email_id, [i['email'] for i in to_list])
except BULK_EMAIL_FAILURE_ERRORS as exc:
dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
log.exception('Task %s: email with id %d caused send_course_email task to fail with "fatal" exception. To list: %s',
task_id, email_id, [i['email'] for i in to_list])
num_error += len(to_list)
subtask_progress = increment_subtask_status(
subtask_status,
......@@ -493,6 +507,27 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
state=FAILURE
)
return subtask_progress, exc
except Exception as exc:
# Errors caught here cause the email to be retried. The entire task is actually retried
# without popping the current recipient off of the existing list.
# These are unexpected errors. Since they might be due to a temporary condition that might
# succeed on retry, we give them a retry.
dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)])
log.exception('Task %s: email with id %d caused send_course_email task to fail with unexpected exception. Generating retry.',
task_id, email_id)
subtask_progress = increment_subtask_status(
subtask_status,
succeeded=num_sent,
failed=num_error,
skipped=num_optout,
retried_withmax=1,
state=RETRY
)
return _submit_for_retry(
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False
)
else:
# Successful completion is marked by an exception value of None:
subtask_progress = increment_subtask_status(
......@@ -508,13 +543,18 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
connection.close()
def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_progress, is_sending_rate_error):
def _get_current_task():
"""Stub to make it easier to test without actually running Celery"""
return current_task
def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_status, is_sending_rate_error):
"""
Helper function to requeue a task for retry, using the new version of arguments provided.
Inputs are the same as for running a task, plus two extra indicating the state at the time of retry.
These include the `current_exception` that the task encountered that is causing the retry attempt,
and the `subtask_progress` that is to be returned.
and the `subtask_status` that is to be returned.
Returns a tuple of two values:
* First value is a dict which represents current progress. Keys are:
......@@ -528,27 +568,29 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
successfully submitted, this value will be the RetryTaskError that retry() returns.
Otherwise, it (ought to be) the current_exception passed in.
"""
task_id = _get_current_task().request.id
retry_index = _get_current_task().request.retries
# task_id = _get_current_task().request.id
task_id = subtask_status['task_id']
log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)",
current_task.request.id, subtask_progress['succeeded'], subtask_progress['failed'], subtask_progress['skipped'])
task_id, subtask_status['succeeded'], subtask_status['failed'], subtask_status['skipped'])
# Calculate time until we retry this task (in seconds):
max_retries = _get_current_task().max_retries + subtask_status['retried_nomax']
base_delay = _get_current_task().default_retry_delay
if is_sending_rate_error:
retry_index = subtask_status['retried_nomax']
exp = min(retry_index, 5)
countdown = ((2 ** exp) * 15) * random.uniform(.5, 1.25)
countdown = ((2 ** exp) * base_delay) * random.uniform(.5, 1.25)
exception_type = 'sending-rate'
else:
countdown = ((2 ** retry_index) * 15) * random.uniform(.75, 1.5)
retry_index = subtask_status['retried_withmax']
countdown = ((2 ** retry_index) * base_delay) * random.uniform(.75, 1.5)
exception_type = 'transient'
# max_retries is increased by the number of times an "infinite-retry" exception
# has been retried. We want the regular retries to trigger a retry, but not these
# has been retried. We want the regular retries to trigger max-retry checking, but not these
# special retries. So we count them separately.
max_retries = _get_current_task().max_retries + subtask_progress['retriedA']
log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients (with max_retry=%s)',
task_id, email_id, exception_type, current_exception, len(to_list), max_retries)
log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients in %s seconds (with max_retry=%s)',
task_id, email_id, exception_type, current_exception, len(to_list), countdown, max_retries)
try:
send_course_email.retry(
......@@ -557,7 +599,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
email_id,
to_list,
global_email_context,
subtask_progress,
subtask_status,
],
exc=current_exception,
countdown=countdown,
......@@ -568,7 +610,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
# If retry call is successful, update with the current progress:
log.exception('Task %s: email with id %d caused send_course_email task to retry.',
task_id, email_id)
return subtask_progress, retry_error
return subtask_status, retry_error
except Exception as retry_exc:
# If there are no more retries, because the maximum has been reached,
# we expect the original exception to be raised. We catch it here
......@@ -578,7 +620,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s',
task_id, email_id, [i['email'] for i in to_list])
num_failed = len(to_list)
new_subtask_progress = increment_subtask_status(subtask_progress, failed=num_failed, state=FAILURE)
new_subtask_progress = increment_subtask_status(subtask_status, failed=num_failed, state=FAILURE)
return new_subtask_progress, retry_exc
......
......@@ -4,7 +4,6 @@ Unit tests for handling email sending errors
from itertools import cycle
from mock import patch, Mock
from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError
from unittest import skip
from django.test.utils import override_settings
from django.conf import settings
......@@ -93,9 +92,9 @@ class TestEmailErrors(ModuleStoreTestCase):
# Test that after the rejected email, the rest still successfully send
((_initial_results), kwargs) = result.call_args
self.assertEquals(kwargs['skipped'], 0)
expectedNumFails = int((settings.EMAILS_PER_TASK + 3) / 4.0)
self.assertEquals(kwargs['failed'], expectedNumFails)
self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expectedNumFails)
expected_fails = int((settings.EMAILS_PER_TASK + 3) / 4.0)
self.assertEquals(kwargs['failed'], expected_fails)
self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expected_fails)
@patch('bulk_email.tasks.get_connection', autospec=True)
@patch('bulk_email.tasks.send_course_email.retry')
......@@ -144,7 +143,7 @@ class TestEmailErrors(ModuleStoreTestCase):
@patch('bulk_email.tasks.get_connection', Mock(return_value=EmailTestException))
def test_general_exception(self, mock_log, retry, result):
"""
Tests the if the error is not SMTP-related, we log and reraise
Tests the if the error is unexpected, we log and retry
"""
test_email = {
'action': 'Send email',
......@@ -156,11 +155,10 @@ class TestEmailErrors(ModuleStoreTestCase):
# so we assert on the arguments of log.exception
self.client.post(self.url, test_email)
self.assertTrue(mock_log.exception.called)
((log_str, _task_id, email_id, to_list), _) = mock_log.exception.call_args
self.assertIn('caused send_course_email task to fail with uncaught exception.', log_str)
((log_str, _task_id, email_id), _) = mock_log.exception.call_args
self.assertIn('caused send_course_email task to fail with unexpected exception.', log_str)
self.assertEqual(email_id, 1)
self.assertEqual(to_list, [self.instructor.email])
self.assertFalse(retry.called)
self.assertTrue(retry.called)
# check the results being returned
self.assertTrue(result.called)
((initial_results, ), kwargs) = result.call_args
......@@ -180,7 +178,7 @@ class TestEmailErrors(ModuleStoreTestCase):
entry = InstructorTask.create(course_id, "task_type", "task_key", "task_input", self.instructor)
task_input = {"email_id": -1}
with self.assertRaises(CourseEmail.DoesNotExist):
perform_delegate_email_batches(entry.id, course_id, task_input, "action_name")
perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") # pylint: disable=E1101
((log_str, _, email_id), _) = mock_log.warning.call_args
self.assertTrue(mock_log.warning.called)
self.assertIn('Failed to get CourseEmail with id', log_str)
......@@ -196,9 +194,9 @@ class TestEmailErrors(ModuleStoreTestCase):
email = CourseEmail(course_id=course_id)
email.save()
entry = InstructorTask.create(course_id, "task_type", "task_key", "task_input", self.instructor)
task_input = {"email_id": email.id}
task_input = {"email_id": email.id} # pylint: disable=E1101
with self.assertRaises(Exception):
perform_delegate_email_batches(entry.id, course_id, task_input, "action_name")
perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") # pylint: disable=E1101
((log_str, _, _), _) = mock_log.exception.call_args
self.assertTrue(mock_log.exception.called)
self.assertIn('get_course_by_id failed:', log_str)
......@@ -211,9 +209,9 @@ class TestEmailErrors(ModuleStoreTestCase):
email = CourseEmail(course_id=self.course.id, to_option="IDONTEXIST")
email.save()
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
task_input = {"email_id": email.id}
task_input = {"email_id": email.id} # pylint: disable=E1101
with self.assertRaises(Exception):
perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name")
perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name") # pylint: disable=E1101
((log_str, opt_str), _) = mock_log.error.call_args
self.assertTrue(mock_log.error.called)
self.assertIn('Unexpected bulk email TO_OPTION found', log_str)
......
"""
Unit tests for LMS instructor-initiated background tasks.
Runs tasks on answers to course problems to validate that code
paths actually work.
"""
import json
from uuid import uuid4
from itertools import cycle
from mock import patch, Mock
from smtplib import SMTPDataError, SMTPServerDisconnected
from celery.states import SUCCESS
# from django.test.utils import override_settings
from django.conf import settings
from django.core.management import call_command
from bulk_email.models import CourseEmail, SEND_TO_ALL
# from instructor_task.tests.test_tasks import TestInstructorTasks
from instructor_task.tasks import send_bulk_course_email
from instructor_task.models import InstructorTask
from instructor_task.tests.test_base import InstructorTaskCourseTestCase
from instructor_task.tests.factories import InstructorTaskFactory
class TestTaskFailure(Exception):
"""Dummy exception used for unit tests."""
pass
class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
"""Tests instructor task that send bulk email."""
def setUp(self):
super(TestBulkEmailInstructorTask, self).setUp()
self.initialize_course()
self.instructor = self.create_instructor('instructor')
# load initial content (since we don't run migrations as part of tests):
call_command("loaddata", "course_email_template.json")
def _create_input_entry(self, course_id=None):
"""
Creates a InstructorTask entry for testing.
Overrides the base class version in that this creates CourseEmail.
"""
to_option = SEND_TO_ALL
course_id = course_id or self.course.id
course_email = CourseEmail.create(course_id, self.instructor, to_option, "Test Subject", "<p>This is a test message</p>")
task_input = {'email_id': course_email.id}
task_id = str(uuid4())
instructor_task = InstructorTaskFactory.create(
course_id=course_id,
requester=self.instructor,
task_input=json.dumps(task_input),
task_key='dummy value',
task_id=task_id,
)
return instructor_task
def _run_task_with_mock_celery(self, task_class, entry_id, task_id, expected_failure_message=None):
"""Submit a task and mock how celery provides a current_task."""
self.current_task = Mock()
self.current_task.max_retries = settings.BULK_EMAIL_MAX_RETRIES
self.current_task.default_retry_delay = settings.BULK_EMAIL_DEFAULT_RETRY_DELAY
task_args = [entry_id, {}]
with patch('bulk_email.tasks._get_current_task') as mock_get_task:
mock_get_task.return_value = self.current_task
return task_class.apply(task_args, task_id=task_id).get()
def test_email_missing_current_task(self):
task_entry = self._create_input_entry()
with self.assertRaises(ValueError):
send_bulk_course_email(task_entry.id, {})
def test_email_undefined_course(self):
# Check that we fail when passing in a course that doesn't exist.
task_entry = self._create_input_entry(course_id="bogus/course/id")
with self.assertRaises(ValueError):
self._run_task_with_mock_celery(send_bulk_course_email, task_entry.id, task_entry.task_id)
def _create_students(self, num_students):
"""Create students, a problem, and StudentModule objects for testing"""
students = [
self.create_student('robot%d' % i) for i in xrange(num_students)
]
return students
def _test_run_with_task(self, task_class, action_name, total, succeeded, failed=0, skipped=0):
"""Run a task and check the number of emails processed."""
task_entry = self._create_input_entry()
parent_status = self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id)
# check return value
self.assertEquals(parent_status.get('total'), total)
self.assertEquals(parent_status.get('action_name'), action_name)
# compare with entry in table:
entry = InstructorTask.objects.get(id=task_entry.id)
status = json.loads(entry.task_output)
self.assertEquals(status.get('attempted'), succeeded + failed)
self.assertEquals(status.get('succeeded'), succeeded)
self.assertEquals(status['skipped'], skipped)
self.assertEquals(status['failed'], failed)
self.assertEquals(status.get('total'), total)
self.assertEquals(status.get('action_name'), action_name)
self.assertGreater(status.get('duration_ms'), 0)
self.assertEquals(entry.task_state, SUCCESS)
def test_successful(self):
num_students = settings.EMAILS_PER_TASK
self._create_students(num_students)
# we also send email to the instructor:
num_emails = num_students + 1
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
get_conn.return_value.send_messages.side_effect = cycle([None])
self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)
def test_data_err_fail(self):
# Test that celery handles permanent SMTPDataErrors by failing and not retrying.
num_students = settings.EMAILS_PER_TASK
self._create_students(num_students)
# we also send email to the instructor:
num_emails = num_students + 1
expected_fails = int((num_emails + 3) / 4.0)
expected_succeeds = num_emails - expected_fails
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
# have every fourth email fail due to blacklisting:
get_conn.return_value.send_messages.side_effect = cycle([SMTPDataError(554, "Email address is blacklisted"),
None, None, None])
self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails)
def test_retry_after_limited_retry_error(self):
# Test that celery handles connection failures by retrying.
num_students = 1
self._create_students(num_students)
# we also send email to the instructor:
num_emails = num_students + 1
expected_fails = 0
expected_succeeds = num_emails
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
# have every other mail attempt fail due to disconnection:
get_conn.return_value.send_messages.side_effect = cycle([SMTPServerDisconnected(425, "Disconnecting"), None])
self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails)
def test_max_retry(self):
# Test that celery can hit a maximum number of retries.
num_students = 1
self._create_students(num_students)
# we also send email to the instructor:
num_emails = num_students + 1
# This is an ugly hack: the failures that are reported by the EAGER version of retry
# are multiplied by the attempted number of retries (equals max plus one).
expected_fails = num_emails * (settings.BULK_EMAIL_MAX_RETRIES + 1)
expected_succeeds = 0
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
# always fail to connect, triggering repeated retries until limit is hit:
get_conn.return_value.send_messages.side_effect = cycle([SMTPServerDisconnected(425, "Disconnecting")])
self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails)
......@@ -723,18 +723,13 @@ def instructor_dashboard(request, course_id):
email_subject = request.POST.get("subject")
html_message = request.POST.get("message")
# TODO: make sure this is committed before submitting it to the task.
# However, it should probably be enough to do the submit below, which
# will commit the transaction for the InstructorTask object. Both should
# therefore be committed. (Still, it might be clearer to do so here as well.)
# Actually, this should probably be moved out, so that all the validation logic
# we might want to add to it can be added. There might also be something
# that would permit validation of the email beforehand.
# Create the CourseEmail object. This is saved immediately, so that
# any transaction that has been pending up to this point will also be
# committed.
email = CourseEmail.create(course_id, request.user, email_to_option, email_subject, html_message)
# TODO: make this into a task submission, so that the correct
# InstructorTask object gets created (for monitoring purposes)
submit_bulk_course_email(request, course_id, email.id)
# Submit the task, so that the correct InstructorTask object gets created (for monitoring purposes)
submit_bulk_course_email(request, course_id, email.id) # pylint: disable=E1101
if email_to_option == "all":
email_msg = '<div class="msg msg-confirm"><p class="copy">Your email was successfully queued for sending. Please note that for large public classes (~10k), it may take 1-2 hours to send all emails.</p></div>'
......@@ -1548,7 +1543,6 @@ def get_background_task_table(course_id, problem_url=None, student=None, task_ty
# (note that we don't have to check that the arguments are valid; it
# just won't find any entries.)
if (history_entries.count()) == 0:
# TODO: figure out how to deal with task_type better here...
if problem_url is None:
msg += '<font color="red">Failed to find any background tasks for course "{course}".</font>'.format(course=course_id)
elif student is not None:
......
......@@ -178,8 +178,8 @@ def submit_bulk_course_email(request, course_id, email_id):
The specified CourseEmail object will be sent be updated for all students who have enrolled
in a course. Parameters are the `course_id` and the `email_id`, the id of the CourseEmail object.
AlreadyRunningError is raised if the course's students are already being emailed.
TODO: is this the right behavior? Or should multiple emails be allowed in the pipeline at the same time?
AlreadyRunningError is raised if the same recipients are already being emailed with the same
CourseEmail object.
This method makes sure the InstructorTask entry is committed.
When called from any view that is wrapped by TransactionMiddleware,
......@@ -188,11 +188,9 @@ def submit_bulk_course_email(request, course_id, email_id):
save here. Any future database operations will take place in a
separate transaction.
"""
# check arguments: make sure that the course is defined?
# TODO: what is the right test here?
# This should also make sure that the email exists.
# We can also pull out the To argument here, so that is displayed in
# Assume that the course is defined, and that the user has already been verified to have
# appropriate access to the course. But make sure that the email exists.
# We also pull out the To argument here, so that is displayed in
# the InstructorTask status.
email_obj = CourseEmail.objects.get(id=email_id)
to_option = email_obj.to_option
......
......@@ -268,7 +268,7 @@ def submit_task(request, task_type, task_class, course_id, task_input, task_key)
# submit task:
task_id = instructor_task.task_id
task_args = [instructor_task.id, _get_xmodule_instance_args(request, task_id)]
task_args = [instructor_task.id, _get_xmodule_instance_args(request, task_id)] # pylint: disable=E1101
task_class.apply_async(task_args, task_id=task_id)
return instructor_task
......@@ -14,50 +14,75 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING
TASK_LOG = get_task_logger(__name__)
def create_subtask_status(succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None):
def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
"""
Create a dict for tracking the status of a subtask.
Create and return a dict for tracking the status of a subtask.
Subtask status keys are:
'task_id' : id of subtask. This is used to pass task information across retries.
'attempted' : number of attempts -- should equal succeeded plus failed
'succeeded' : number that succeeded in processing
'skipped' : number that were not processed.
'failed' : number that failed during processing
'retried_nomax' : number of times the subtask has been retried for conditions that
should not have a maximum count applied
'retried_withmax' : number of times the subtask has been retried for conditions that
should have a maximum count applied
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
Keys are: 'attempted', 'succeeded', 'skipped', 'failed', 'retried'.
TODO: update
Object must be JSON-serializable, so that it can be passed as an argument
to tasks.
TODO: decide if in future we want to include specific error information
In future, we may want to include specific error information
indicating the reason for failure.
Also, we should count up "not attempted" separately from
attempted/failed.
Also, we should count up "not attempted" separately from attempted/failed.
"""
attempted = succeeded + failed
current_result = {
'task_id': task_id,
'attempted': attempted,
'succeeded': succeeded,
'pending': pending,
'skipped': skipped,
'failed': failed,
'retriedA': retriedA,
'retriedB': retriedB,
'retried_nomax': retried_nomax,
'retried_withmax': retried_withmax,
'state': state if state is not None else QUEUING,
}
return current_result
def increment_subtask_status(subtask_result, succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None):
def increment_subtask_status(subtask_result, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
"""
Update the result of a subtask with additional results.
Keys are: 'attempted', 'succeeded', 'skipped', 'failed', 'retried'.
Create and return a dict for tracking the status of a subtask.
Keys for input `subtask_result` and returned subtask_status are:
'task_id' : id of subtask. This is used to pass task information across retries.
'attempted' : number of attempts -- should equal succeeded plus failed
'succeeded' : number that succeeded in processing
'skipped' : number that were not processed.
'failed' : number that failed during processing
'retried_nomax' : number of times the subtask has been retried for conditions that
should not have a maximum count applied
'retried_withmax' : number of times the subtask has been retried for conditions that
should have a maximum count applied
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
Kwarg arguments are incremented to the corresponding key in `subtask_result`.
The exception is for `state`, which if specified is used to override the existing value.
"""
# TODO: rewrite this if we have additional fields added to original subtask_result,
# that are not part of the increment. Tradeoff on duplicating the 'attempts' logic.
new_result = create_subtask_status(succeeded, failed, pending, skipped, retriedA, retriedB, state)
for keyname in new_result:
if keyname == 'state':
# does not get incremented. If no new value, copy old value:
if state is None:
new_result[keyname] = subtask_result[keyname]
elif keyname in subtask_result:
new_result[keyname] += subtask_result[keyname]
new_result = dict(subtask_result)
new_result['attempted'] += (succeeded + failed)
new_result['succeeded'] += succeeded
new_result['failed'] += failed
new_result['skipped'] += skipped
new_result['retried_nomax'] += retried_nomax
new_result['retried_withmax'] += retried_withmax
if state is not None:
new_result['state'] = state
return new_result
......@@ -70,7 +95,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
Counters for 'attempted', 'succeeded', 'failed', 'skipped' keys are initialized to zero,
as is the 'duration_ms' value. A 'start_time' is stored for later duration calculations,
and the total number of "things to do" is set, so the user can be told how much needs to be
done overall. The `action_name` is also stored, to also help with constructing more readable
done overall. The `action_name` is also stored, to help with constructing more readable
task_progress messages.
The InstructorTask's "subtasks" field is also initialized. This is also a JSON-serialized dict.
......@@ -80,8 +105,8 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
the InstructorTask's "status" will be changed to SUCCESS.
The "subtasks" field also contains a 'status' key, that contains a dict that stores status
information for each subtask. At the moment, the value for each subtask (keyed by its task_id)
is the value of `status`, which is initialized here to QUEUING.
information for each subtask. The value for each subtask (keyed by its task_id)
is its subtask status, as defined by create_subtask_status().
This information needs to be set up in the InstructorTask before any of the subtasks start
running. If not, there is a chance that the subtasks could complete before the parent task
......@@ -92,7 +117,6 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
rely on the status stored in the InstructorTask object, rather than status stored in the
corresponding AsyncResult.
"""
# TODO: also add 'pending' count here? (Even though it's total-attempted-skipped
task_progress = {
'action_name': action_name,
'attempted': 0,
......@@ -108,12 +132,8 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
# Write out the subtasks information.
num_subtasks = len(subtask_id_list)
# using fromkeys to initialize uses a single value. we need the value
# to be distinct, since it's now a dict:
# subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
# TODO: may not be necessary to store initial value with all those zeroes!
# Instead, use a placemarker....
subtask_status = {subtask_id: create_subtask_status() for subtask_id in subtask_id_list}
# Note that may not be necessary to store initial value with all those zeroes!
subtask_status = {subtask_id: create_subtask_status(subtask_id) for subtask_id in subtask_id_list}
subtask_dict = {
'total': num_subtasks,
'succeeded': 0,
......@@ -129,7 +149,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
@transaction.commit_manually
def update_subtask_status(entry_id, current_task_id, subtask_status):
def update_subtask_status(entry_id, current_task_id, new_subtask_status):
"""
Update the status of the subtask in the parent InstructorTask object tracking its progress.
......@@ -138,9 +158,11 @@ def update_subtask_status(entry_id, current_task_id, subtask_status):
committed on completion, or rolled back on error.
The InstructorTask's "task_output" field is updated. This is a JSON-serialized dict.
Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_progress`
Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `new_subtask_status`
into the corresponding values in the InstructorTask's task_output. Also updates the 'duration_ms'
value with the current interval since the original InstructorTask started.
value with the current interval since the original InstructorTask started. Note that this
value is only approximate, since the subtask may be running on a different server than the
original task, so is subject to clock skew.
The InstructorTask's "subtasks" field is also updated. This is also a JSON-serialized dict.
Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of
......@@ -155,13 +177,13 @@ def update_subtask_status(entry_id, current_task_id, subtask_status):
messages, progress made, etc.
"""
TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s",
current_task_id, entry_id, subtask_status)
current_task_id, entry_id, new_subtask_status)
try:
entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
subtask_dict = json.loads(entry.subtasks)
subtask_status = subtask_dict['status']
if current_task_id not in subtask_status:
subtask_status_info = subtask_dict['status']
if current_task_id not in subtask_status_info:
# unexpected error -- raise an exception
format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'"
msg = format_str.format(current_task_id, entry_id)
......@@ -173,39 +195,45 @@ def update_subtask_status(entry_id, current_task_id, subtask_status):
# will be updating before the original call, and we don't want their
# ultimate status to be clobbered by the "earlier" updates. This
# should not be a problem in normal (non-eager) processing.
old_status = subtask_status[current_task_id]
# TODO: check this logic...
state = subtask_status['state']
# if state != RETRY or old_status['status'] == QUEUING:
# instead replace the status only if it's 'newer'
# i.e. has fewer pending
if subtask_status['pending'] <= old_status['pending']:
subtask_status[current_task_id] = subtask_status
current_subtask_status = subtask_status_info[current_task_id]
current_state = current_subtask_status['state']
new_state = new_subtask_status['state']
if new_state != RETRY or current_state == QUEUING or current_state in READY_STATES:
subtask_status_info[current_task_id] = new_subtask_status
# Update the parent task progress
# Set the estimate of duration, but only if it
# increases. Clock skew between time() returned by different machines
# may result in non-monotonic values for duration.
task_progress = json.loads(entry.task_output)
start_time = task_progress['start_time']
task_progress['duration_ms'] = int((time() - start_time) * 1000)
# change behavior so we don't update on progress now:
# TODO: figure out if we can make this more responsive later,
# by figuring out how to handle retries better.
if subtask_status is not None and state in READY_STATES:
prev_duration = task_progress['duration_ms']
new_duration = int((time() - start_time) * 1000)
task_progress['duration_ms'] = max(prev_duration, new_duration)
# Update counts only when subtask is done.
# In future, we can make this more responsive by updating status
# between retries, by comparing counts that change from previous
# retry.
if new_subtask_status is not None and new_state in READY_STATES:
for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
task_progress[statname] += subtask_status[statname]
task_progress[statname] += new_subtask_status[statname]
# Figure out if we're actually done (i.e. this is the last task to complete).
# This is easier if we just maintain a counter, rather than scanning the
# entire subtask_status dict.
if state == SUCCESS:
# entire new_subtask_status dict.
if new_state == SUCCESS:
subtask_dict['succeeded'] += 1
elif state == RETRY:
elif new_state == RETRY:
subtask_dict['retried'] += 1
else:
subtask_dict['failed'] += 1
num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
# If we're done with the last task, update the parent status to indicate that:
# TODO: see if there was a catastrophic failure that occurred, and figure out
# how to report that here.
# If we're done with the last task, update the parent status to indicate that.
# At present, we mark the task as having succeeded. In future, we should see
# if there was a catastrophic failure that occurred, and figure out how to
# report that here.
if num_remaining <= 0:
entry.task_state = SUCCESS
entry.subtasks = json.dumps(subtask_dict)
......
......@@ -32,7 +32,7 @@ from instructor_task.tasks_helper import (
from bulk_email.tasks import perform_delegate_email_batches
@task(base=BaseInstructorTask)
@task(base=BaseInstructorTask) # pylint: disable=E1102
def rescore_problem(entry_id, xmodule_instance_args):
"""Rescores a problem in a course, for all students or one specific student.
......@@ -55,13 +55,14 @@ def rescore_problem(entry_id, xmodule_instance_args):
update_fcn = partial(rescore_problem_module_state, xmodule_instance_args)
def filter_fcn(modules_to_update):
"""Filter that matches problems which are marked as being done"""
return modules_to_update.filter(state__contains='"done": true')
visit_fcn = partial(perform_module_state_update, update_fcn, filter_fcn)
return run_main_task(entry_id, visit_fcn, action_name)
@task(base=BaseInstructorTask)
@task(base=BaseInstructorTask) # pylint: disable=E1102
def reset_problem_attempts(entry_id, xmodule_instance_args):
"""Resets problem attempts to zero for a particular problem for all students in a course.
......@@ -82,7 +83,7 @@ def reset_problem_attempts(entry_id, xmodule_instance_args):
return run_main_task(entry_id, visit_fcn, action_name)
@task(base=BaseInstructorTask)
@task(base=BaseInstructorTask) # pylint: disable=E1102
def delete_problem_state(entry_id, xmodule_instance_args):
"""Deletes problem state entirely for all students on a particular problem in a course.
......@@ -103,18 +104,20 @@ def delete_problem_state(entry_id, xmodule_instance_args):
return run_main_task(entry_id, visit_fcn, action_name)
@task(base=BaseInstructorTask)
def send_bulk_course_email(entry_id, xmodule_instance_args):
"""Sends emails to in a course.
@task(base=BaseInstructorTask) # pylint: disable=E1102
def send_bulk_course_email(entry_id, _xmodule_instance_args):
"""Sends emails to recipients enrolled in a course.
`entry_id` is the id value of the InstructorTask entry that corresponds to this task.
The entry contains the `course_id` that identifies the course, as well as the
`task_input`, which contains task-specific input.
The task_input should be a dict with no entries.
The task_input should be a dict with the following entries:
`xmodule_instance_args` provides information needed by _get_module_instance_for_task()
to instantiate an xmodule instance.
'email_id': the full URL to the problem to be rescored. (required)
`_xmodule_instance_args` provides information needed by _get_module_instance_for_task()
to instantiate an xmodule instance. This is unused here.
"""
action_name = 'emailed'
visit_fcn = perform_delegate_email_batches
......
......@@ -42,6 +42,12 @@ class BaseInstructorTask(Task):
Permits updating information about task in corresponding InstructorTask for monitoring purposes.
Assumes that the entry_id of the InstructorTask model is the first argument to the task.
The `entry_id` is the primary key for the InstructorTask entry representing the task. This class
updates the entry on success and failure of the task it wraps. It is setting the entry's value
for task_state based on what Celery would set it to once the task returns to Celery:
FAILURE if an exception is encountered, and SUCCESS if it returns normally.
Other arguments are pass-throughs to perform_module_state_update, and documented there.
"""
abstract = True
......@@ -51,8 +57,22 @@ class BaseInstructorTask(Task):
Updates task_output and task_state. But it shouldn't actually do anything
if the task is only creating subtasks to actually do the work.
Assumes `task_progress` is a dict containing the task's result, with the following keys:
'attempted': number of attempts made
'succeeded': number of attempts that "succeeded"
'skipped': number of attempts that "skipped"
'failed': number of attempts that "failed"
'total': number of possible subtasks to attempt
'action_name': user-visible verb to use in status messages. Should be past-tense.
Pass-through of input `action_name`.
'duration_ms': how long the task has (or had) been running.
This is JSON-serialized and stored in the task_output column of the InstructorTask entry.
"""
TASK_LOG.info('Task success returned: %r' % (self.request, ))
TASK_LOG.debug('Task %s: success returned with progress: %s', task_id, task_progress)
# We should be able to find the InstructorTask object to update
# based on the task_id here, without having to dig into the
# original args to the task. On the other hand, the entry_id
......@@ -72,9 +92,20 @@ class BaseInstructorTask(Task):
"""
Update InstructorTask object corresponding to this task with info about failure.
Fetches and updates exception and traceback information on failure.
Fetches and updates exception and traceback information on failure.
If an exception is raised internal to the task, it is caught by celery and provided here.
The information is recorded in the InstructorTask object as a JSON-serialized dict
stored in the task_output column. It contains the following keys:
'exception': type of exception object
'message': error message from exception object
'traceback': traceback information (truncated if necessary)
Note that there is no way to record progress made within the task (e.g. attempted,
succeeded, etc.) when such failures occur.
"""
TASK_LOG.info('Task failure returned: %r' % (self.request, ))
TASK_LOG.debug('Task %s: failure returned', task_id)
entry_id = args[0]
try:
entry = InstructorTask.objects.get(pk=entry_id)
......@@ -88,12 +119,6 @@ class BaseInstructorTask(Task):
entry.task_state = FAILURE
entry.save_now()
def on_retry(self, exc, task_id, args, kwargs, einfo):
# We don't expect this to be called for top-level tasks, at the moment....
# If it were, not sure what kind of status to report for it.
# But it would be good to know that it's being called, so at least log it.
TASK_LOG.info('Task retry returned: %r' % (self.request, ))
class UpdateProblemModuleStateError(Exception):
"""
......@@ -110,6 +135,67 @@ def _get_current_task():
return current_task
def run_main_task(entry_id, task_fcn, action_name):
"""
Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask.
Arguments passed to `task_fcn` are:
`entry_id` : the primary key for the InstructorTask entry representing the task.
`course_id` : the id for the course.
`task_input` : dict containing task-specific arguments, JSON-decoded from InstructorTask's task_input.
`action_name` : past-tense verb to use for constructing status messages.
If no exceptions are raised, the `task_fcn` should return a dict containing
the task's result with the following keys:
'attempted': number of attempts made
'succeeded': number of attempts that "succeeded"
'skipped': number of attempts that "skipped"
'failed': number of attempts that "failed"
'total': number of possible subtasks to attempt
'action_name': user-visible verb to use in status messages.
Should be past-tense. Pass-through of input `action_name`.
'duration_ms': how long the task has (or had) been running.
"""
# get the InstructorTask to be updated. If this fails, then let the exception return to Celery.
# There's no point in catching it here.
entry = InstructorTask.objects.get(pk=entry_id)
# get inputs to use in this task from the entry:
task_id = entry.task_id
course_id = entry.course_id
task_input = json.loads(entry.task_input)
# construct log message:
fmt = 'task "{task_id}": course "{course_id}" input "{task_input}"'
task_info_string = fmt.format(task_id=task_id, course_id=course_id, task_input=task_input)
TASK_LOG.info('Starting update (nothing %s yet): %s', action_name, task_info_string)
# Check that the task_id submitted in the InstructorTask matches the current task
# that is running.
request_task_id = _get_current_task().request.id
if task_id != request_task_id:
fmt = 'Requested task did not match actual task "{actual_id}": {task_info}'
message = fmt.format(actual_id=request_task_id, task_info=task_info_string)
TASK_LOG.error(message)
raise ValueError(message)
# Now do the work:
with dog_stats_api.timer('instructor_tasks.time.overall', tags=['action:{name}'.format(name=action_name)]):
task_progress = task_fcn(entry_id, course_id, task_input, action_name)
# Release any queries that the connection has been hanging onto:
reset_queries()
# log and exit, returning task_progress info as task result:
TASK_LOG.info('Finishing %s: final: %s', task_info_string, task_progress)
return task_progress
def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, task_input, action_name):
"""
Performs generic update by visiting StudentModule instances with the update_fcn provided.
......@@ -220,92 +306,13 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta
return task_progress
def run_main_task(entry_id, task_fcn, action_name):
"""
Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask.
TODO: UPDATE THIS DOCSTRING
(IT's not just visiting StudentModule instances....)
Performs generic update by visiting StudentModule instances with the update_fcn provided.
The `entry_id` is the primary key for the InstructorTask entry representing the task. This function
updates the entry on success and failure of the perform_module_state_update function it
wraps. It is setting the entry's value for task_state based on what Celery would set it to once
the task returns to Celery: FAILURE if an exception is encountered, and SUCCESS if it returns normally.
Other arguments are pass-throughs to perform_module_state_update, and documented there.
If no exceptions are raised, a dict containing the task's result is returned, with the following keys:
'attempted': number of attempts made
'succeeded': number of attempts that "succeeded"
'skipped': number of attempts that "skipped"
'failed': number of attempts that "failed"
'total': number of possible subtasks to attempt
'action_name': user-visible verb to use in status messages. Should be past-tense.
Pass-through of input `action_name`.
'duration_ms': how long the task has (or had) been running.
Before returning, this is also JSON-serialized and stored in the task_output column of the InstructorTask entry.
If an exception is raised internally, it is caught and recorded in the InstructorTask entry.
This is also a JSON-serialized dict, stored in the task_output column, containing the following keys:
'exception': type of exception object
'message': error message from exception object
'traceback': traceback information (truncated if necessary)
Once the exception is caught, it is raised again and allowed to pass up to the
task-running level, so that it can also set the failure modes and capture the error trace in the
result object that Celery creates.
"""
# get the InstructorTask to be updated. If this fails, then let the exception return to Celery.
# There's no point in catching it here.
entry = InstructorTask.objects.get(pk=entry_id)
# get inputs to use in this task from the entry:
task_id = entry.task_id
course_id = entry.course_id
task_input = json.loads(entry.task_input)
# construct log message:
fmt = 'task "{task_id}": course "{course_id}" input "{task_input}"'
task_info_string = fmt.format(task_id=task_id, course_id=course_id, task_input=task_input)
TASK_LOG.info('Starting update (nothing %s yet): %s', action_name, task_info_string)
# Check that the task_id submitted in the InstructorTask matches the current task
# that is running.
request_task_id = _get_current_task().request.id
if task_id != request_task_id:
fmt = 'Requested task did not match actual task "{actual_id}": {task_info}'
message = fmt.format(actual_id=request_task_id, task_info=task_info_string)
TASK_LOG.error(message)
raise UpdateProblemModuleStateError(message)
# Now do the work:
with dog_stats_api.timer('instructor_tasks.time.overall', tags=['action:{name}'.format(name=action_name)]):
task_progress = task_fcn(entry_id, course_id, task_input, action_name)
# Release any queries that the connection has been hanging onto:
reset_queries()
# log and exit, returning task_progress info as task result:
TASK_LOG.info('Finishing %s: final: %s', task_info_string, task_progress)
return task_progress
def _get_task_id_from_xmodule_args(xmodule_instance_args):
"""Gets task_id from `xmodule_instance_args` dict, or returns default value if missing."""
return xmodule_instance_args.get('task_id', UNKNOWN_TASK_ID) if xmodule_instance_args is not None else UNKNOWN_TASK_ID
def _get_xqueue_callback_url_prefix(xmodule_instance_args):
"""
"""
"""Gets prefix to use when constructing xqueue_callback_url."""
return xmodule_instance_args.get('xqueue_callback_url_prefix', '') if xmodule_instance_args is not None else ''
......
......@@ -152,15 +152,16 @@ class InstructorTaskCourseSubmitTest(InstructorTaskCourseTestCase):
self.instructor = UserFactory.create(username="instructor", email="instructor@edx.org")
def _define_course_email(self):
"""Create CourseEmail object for testing."""
course_email = CourseEmail.create(self.course.id, self.instructor, SEND_TO_ALL, "Test Subject", "<p>This is a test message</p>")
return course_email.id
return course_email.id # pylint: disable=E1101
def test_submit_bulk_email_all(self):
email_id = self._define_course_email()
instructor_task = submit_bulk_course_email(self.create_task_request(self.instructor), self.course.id, email_id)
# test resubmitting, by updating the existing record:
instructor_task = InstructorTask.objects.get(id=instructor_task.id)
instructor_task = InstructorTask.objects.get(id=instructor_task.id) # pylint: disable=E1101
instructor_task.task_state = PROGRESS
instructor_task.save()
......
......@@ -85,11 +85,11 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
def _test_missing_current_task(self, task_class):
"""Check that a task_class fails when celery doesn't provide a current_task."""
task_entry = self._create_input_entry()
with self.assertRaises(UpdateProblemModuleStateError):
with self.assertRaises(ValueError):
task_class(task_entry.id, self._get_xmodule_instance_args())
def _test_undefined_course(self, task_class):
# run with celery, but no course defined
"""Run with celery, but with no course defined."""
task_entry = self._create_input_entry(course_id="bogus/course/id")
with self.assertRaises(ItemNotFoundError):
self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id)
......
......@@ -95,6 +95,10 @@ CELERY_QUEUES = {
DEFAULT_PRIORITY_QUEUE: {}
}
# We want Bulk Email running on the high-priority queue, so we define the
# routing key that points to it. At the moment, the name is the same.
BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE
########################## NON-SECURE ENV CONFIG ##############################
# Things like server locations, ports, etc.
......@@ -130,7 +134,7 @@ LOG_DIR = ENV_TOKENS['LOG_DIR']
CACHES = ENV_TOKENS['CACHES']
#Email overrides
# Email overrides
DEFAULT_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_FROM_EMAIL', DEFAULT_FROM_EMAIL)
DEFAULT_FEEDBACK_EMAIL = ENV_TOKENS.get('DEFAULT_FEEDBACK_EMAIL', DEFAULT_FEEDBACK_EMAIL)
DEFAULT_BULK_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_BULK_FROM_EMAIL', DEFAULT_BULK_FROM_EMAIL)
......@@ -142,8 +146,10 @@ BUGS_EMAIL = ENV_TOKENS.get('BUGS_EMAIL', BUGS_EMAIL)
PAYMENT_SUPPORT_EMAIL = ENV_TOKENS.get('PAYMENT_SUPPORT_EMAIL', PAYMENT_SUPPORT_EMAIL)
PAID_COURSE_REGISTRATION_CURRENCY = ENV_TOKENS.get('PAID_COURSE_REGISTRATION_CURRENCY',
PAID_COURSE_REGISTRATION_CURRENCY)
BULK_EMAIL_DEFAULT_RETRY_DELAY = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_RETRY_DELAY', BULK_EMAIL_DEFAULT_RETRY_DELAY)
BULK_EMAIL_MAX_RETRIES = ENV_TOKENS.get('BULK_EMAIL_MAX_RETRIES', BULK_EMAIL_MAX_RETRIES)
#Theme overrides
# Theme overrides
THEME_NAME = ENV_TOKENS.get('THEME_NAME', None)
if not THEME_NAME is None:
enable_theme(THEME_NAME)
......@@ -152,10 +158,10 @@ if not THEME_NAME is None:
# Marketing link overrides
MKTG_URL_LINK_MAP.update(ENV_TOKENS.get('MKTG_URL_LINK_MAP', {}))
#Timezone overrides
# Timezone overrides
TIME_ZONE = ENV_TOKENS.get('TIME_ZONE', TIME_ZONE)
#Additional installed apps
# Additional installed apps
for app in ENV_TOKENS.get('ADDL_INSTALLED_APPS', []):
INSTALLED_APPS += (app,)
......
......@@ -812,6 +812,17 @@ CELERY_QUEUES = {
DEFAULT_PRIORITY_QUEUE: {}
}
# let logging work as configured:
CELERYD_HIJACK_ROOT_LOGGER = False
################################ Bulk Email ###################################
# We want Bulk Email running on the high-priority queue, so we define the
# routing key that points to it. At the moment, the name is the same.
BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE
BULK_EMAIL_DEFAULT_RETRY_DELAY = 15
BULK_EMAIL_MAX_RETRIES = 5
################################### APPS ######################################
INSTALLED_APPS = (
# Standard ones that are always installed...
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment