Commit a6996740 by Brian Wilson

Check that a subtask has not already completed before running.

parent 12d98dae
......@@ -318,9 +318,12 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
# Check that the requested subtask is actually known to the current InstructorTask entry.
# If this fails, it throws an exception, which should fail this subtask immediately.
# This can happen when the parent task has been run twice, and results in duplicate
# subtasks being created for the same InstructorTask entry. We hope to catch this condition
# in perform_delegate_email_batches(), but just in case we fail to do so there,
# we check here as well.
# subtasks being created for the same InstructorTask entry. This can happen when Celery
# loses its connection to its broker, and any current tasks get requeued.
# We hope to catch this condition in perform_delegate_email_batches() when it's the parent
# task that is resubmitted, but just in case we fail to do so there, we check here as well.
# There is also a possibility that this task will be run twice by Celery, for the same reason.
# To deal with that, we need to confirm that the task has not already been completed.
check_subtask_is_valid(entry_id, current_task_id)
send_exception = None
......
......@@ -5,6 +5,8 @@ from itertools import cycle
from mock import patch
from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError
from celery.states import SUCCESS
from django.test.utils import override_settings
from django.conf import settings
from django.core.management import call_command
......@@ -19,7 +21,12 @@ from student.tests.factories import UserFactory, AdminFactory, CourseEnrollmentF
from bulk_email.models import CourseEmail, SEND_TO_ALL
from bulk_email.tasks import perform_delegate_email_batches, send_course_email
from instructor_task.models import InstructorTask
from instructor_task.subtasks import create_subtask_status, initialize_subtask_info
from instructor_task.subtasks import (
create_subtask_status,
initialize_subtask_info,
update_subtask_status,
DuplicateTaskException,
)
class EmailTestException(Exception):
......@@ -210,7 +217,7 @@ class TestEmailErrors(ModuleStoreTestCase):
subtask_id = "subtask-id-value"
subtask_status = create_subtask_status(subtask_id)
email_id = 1001
with self.assertRaisesRegexp(ValueError, 'unable to find email subtasks of instructor task'):
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find email subtasks of instructor task'):
send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status)
def test_send_email_missing_subtask(self):
......@@ -224,9 +231,24 @@ class TestEmailErrors(ModuleStoreTestCase):
different_subtask_id = "bogus-subtask-id-value"
subtask_status = create_subtask_status(different_subtask_id)
bogus_email_id = 1001
with self.assertRaisesRegexp(ValueError, 'unable to find status for email subtask of instructor task'):
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find status for email subtask of instructor task'):
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status)
def test_send_email_completed_subtask(self):
    """
    Verify that send_course_email() refuses to run a subtask that is already complete.

    The subtask is registered on a fresh InstructorTask entry and then recorded
    with a SUCCESS state.  Submitting the same subtask id again must be rejected
    with a DuplicateTaskException whose message mentions 'already completed'.
    """
    instructor_task = InstructorTask.create(
        self.course.id, "task_type", "task_key", "task_input", self.instructor
    )
    entry_id = instructor_task.id  # pylint: disable=E1101
    subtask_id = "subtask-id-value"

    # Make the subtask known to the entry, then mark it as having finished.
    initialize_subtask_info(instructor_task, "emailed", 100, [subtask_id])
    completed_status = create_subtask_status(subtask_id, state=SUCCESS)
    update_subtask_status(entry_id, subtask_id, completed_status)

    # Arguments for the second (duplicate) attempt to run the same subtask.
    # The email id is never expected to be looked up: the call should be
    # rejected before any email is loaded.
    bogus_email_id = 1001
    recipients = ['test@test.com']
    email_context = {'course_title': 'dummy course'}
    resubmitted_status = create_subtask_status(subtask_id)

    with self.assertRaisesRegexp(DuplicateTaskException, 'already completed'):
        send_course_email(entry_id, bogus_email_id, recipients, email_context, resubmitted_status)
def dont_test_send_email_undefined_email(self):
# test at a lower level, to ensure that the course gets checked down below too.
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
......
......@@ -14,6 +14,11 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING
TASK_LOG = get_task_logger(__name__)
class DuplicateTaskException(Exception):
    """
    Signals that a task must not be run because it is a duplicate.

    Raised when a task already exists or has already completed, so that the
    caller can abandon the redundant run instead of repeating the work.
    """
def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
"""
Create and return a dict for tracking the status of a subtask.
......@@ -149,30 +154,48 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
def check_subtask_is_valid(entry_id, current_task_id):
    """
    Confirms that the current subtask is known to the InstructorTask and hasn't already been completed.

    Problems can occur when the parent task has been run twice, and results in duplicate
    subtasks being created for the same InstructorTask entry.  This can happen when Celery
    loses its connection to its broker, and any current tasks get requeued.

    If a parent task gets requeued, then the same InstructorTask may have a different set of
    subtasks defined (to do the same thing): the set of subtasks recorded by the first queuing
    gets clobbered by the second set, so the subtasks from the first queuing would not be known
    to the InstructorTask.  We raise an exception in this case.

    If a subtask gets requeued, then the first time the subtask runs it should run fine to completion.
    However, we want to prevent it from running again, so we check here to see what the existing
    subtask's status is.  If it is complete, we raise an exception.

    Arguments:
        entry_id: primary key of the InstructorTask entry that owns the subtask.
        current_task_id: id of the subtask that is about to run.

    Returns None if the subtask may run.

    Raises a DuplicateTaskException if it's not a task that should be run.
    """
    # Confirm that the InstructorTask actually defines subtasks.
    entry = InstructorTask.objects.get(pk=entry_id)
    if not entry.subtasks:
        format_str = "Unexpected task_id '{}': unable to find email subtasks of instructor task '{}'"
        msg = format_str.format(current_task_id, entry)
        TASK_LOG.warning(msg)
        raise DuplicateTaskException(msg)

    # Confirm that the InstructorTask knows about this particular subtask.
    subtask_dict = json.loads(entry.subtasks)
    subtask_status_info = subtask_dict['status']
    if current_task_id not in subtask_status_info:
        format_str = "Unexpected task_id '{}': unable to find status for email subtask of instructor task '{}'"
        msg = format_str.format(current_task_id, entry)
        TASK_LOG.warning(msg)
        raise DuplicateTaskException(msg)

    # Confirm that the InstructorTask doesn't think that this subtask has already
    # finished.  A subtask with no recorded state yet is allowed to run.
    # NOTE(review): READY_STATES is taken from module scope (presumably
    # celery.states.READY_STATES, which would cover more than success) -- confirm
    # the import exists.
    subtask_status = subtask_status_info[current_task_id]
    subtask_state = subtask_status.get('state')
    if subtask_state in READY_STATES:
        format_str = "Unexpected task_id '{}': already completed - status {} for email subtask of instructor task '{}'"
        msg = format_str.format(current_task_id, subtask_status, entry)
        TASK_LOG.warning(msg)
        raise DuplicateTaskException(msg)
@transaction.commit_manually
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment