Commit 16b85c0c by Brian Wilson

Check for requeued subtasks when in RETRY state.

parent 2d192875
...@@ -346,7 +346,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask ...@@ -346,7 +346,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
# task that is resubmitted, but just in case we fail to do so there, we check here as well. # task that is resubmitted, but just in case we fail to do so there, we check here as well.
# There is also a possibility that this task will be run twice by Celery, for the same reason. # There is also a possibility that this task will be run twice by Celery, for the same reason.
# To deal with that, we need to confirm that the task has not already been completed. # To deal with that, we need to confirm that the task has not already been completed.
check_subtask_is_valid(entry_id, current_task_id) check_subtask_is_valid(entry_id, current_task_id, subtask_status)
send_exception = None send_exception = None
new_subtask_status = None new_subtask_status = None
......
...@@ -5,7 +5,7 @@ from itertools import cycle ...@@ -5,7 +5,7 @@ from itertools import cycle
from mock import patch from mock import patch
from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError
from celery.states import SUCCESS from celery.states import SUCCESS, RETRY
from django.test.utils import override_settings from django.test.utils import override_settings
from django.conf import settings from django.conf import settings
...@@ -258,13 +258,33 @@ class TestEmailErrors(ModuleStoreTestCase): ...@@ -258,13 +258,33 @@ class TestEmailErrors(ModuleStoreTestCase):
initialize_subtask_info(entry, "emailed", 100, [subtask_id]) initialize_subtask_info(entry, "emailed", 100, [subtask_id])
subtask_status = create_subtask_status(subtask_id) subtask_status = create_subtask_status(subtask_id)
update_subtask_status(entry_id, subtask_id, subtask_status) update_subtask_status(entry_id, subtask_id, subtask_status)
check_subtask_is_valid(entry_id, subtask_id) check_subtask_is_valid(entry_id, subtask_id, subtask_status)
bogus_email_id = 1001 bogus_email_id = 1001
to_list = ['test@test.com'] to_list = ['test@test.com']
global_email_context = {'course_title': 'dummy course'} global_email_context = {'course_title': 'dummy course'}
with self.assertRaisesRegexp(DuplicateTaskException, 'already being executed'): with self.assertRaisesRegexp(DuplicateTaskException, 'already being executed'):
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status) send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status)
def test_send_email_retried_subtask(self):
# test at a lower level, to ensure that the course gets checked down below too.
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
entry_id = entry.id # pylint: disable=E1101
subtask_id = "subtask-id-value"
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
subtask_status = create_subtask_status(subtask_id, state=RETRY, retried_nomax=2)
update_subtask_status(entry_id, subtask_id, subtask_status)
bogus_email_id = 1001
to_list = ['test@test.com']
global_email_context = {'course_title': 'dummy course'}
# try running with a clean subtask:
new_subtask_status = create_subtask_status(subtask_id)
with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'):
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status)
# try again, with a retried subtask with lower count:
new_subtask_status = create_subtask_status(subtask_id, state=RETRY, retried_nomax=1)
with self.assertRaisesRegexp(DuplicateTaskException, 'already retried'):
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status)
def dont_test_send_email_undefined_email(self): def dont_test_send_email_undefined_email(self):
# test at a lower level, to ensure that the course gets checked down below too. # test at a lower level, to ensure that the course gets checked down below too.
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor) entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
......
...@@ -5,7 +5,7 @@ from time import time ...@@ -5,7 +5,7 @@ from time import time
import json import json
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from celery.states import SUCCESS, READY_STATES from celery.states import SUCCESS, READY_STATES, RETRY
from django.db import transaction from django.db import transaction
from django.core.cache import cache from django.core.cache import cache
...@@ -96,6 +96,16 @@ def increment_subtask_status(subtask_result, succeeded=0, failed=0, skipped=0, r ...@@ -96,6 +96,16 @@ def increment_subtask_status(subtask_result, succeeded=0, failed=0, skipped=0, r
return new_result return new_result
def _get_retry_count(subtask_status):
"""
Calculate the total number of retries.
"""
count = 0
for keyname in ['retried_nomax', 'retried_withmax']:
count += subtask_status.get(keyname, 0)
return count
def initialize_subtask_info(entry, action_name, total_num, subtask_id_list): def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
""" """
Store initial subtask information to InstructorTask object. Store initial subtask information to InstructorTask object.
...@@ -187,7 +197,7 @@ def _release_subtask_lock(task_id): ...@@ -187,7 +197,7 @@ def _release_subtask_lock(task_id):
cache.delete(key) cache.delete(key)
def check_subtask_is_valid(entry_id, current_task_id): def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
""" """
Confirms that the current subtask is known to the InstructorTask and hasn't already been completed. Confirms that the current subtask is known to the InstructorTask and hasn't already been completed.
...@@ -210,8 +220,8 @@ def check_subtask_is_valid(entry_id, current_task_id): ...@@ -210,8 +220,8 @@ def check_subtask_is_valid(entry_id, current_task_id):
# Confirm that the InstructorTask actually defines subtasks. # Confirm that the InstructorTask actually defines subtasks.
entry = InstructorTask.objects.get(pk=entry_id) entry = InstructorTask.objects.get(pk=entry_id)
if len(entry.subtasks) == 0: if len(entry.subtasks) == 0:
format_str = "Unexpected task_id '{}': unable to find subtasks of instructor task '{}'" format_str = "Unexpected task_id '{}': unable to find subtasks of instructor task '{}': rejecting task {}"
msg = format_str.format(current_task_id, entry) msg = format_str.format(current_task_id, entry, new_subtask_status)
TASK_LOG.warning(msg) TASK_LOG.warning(msg)
raise DuplicateTaskException(msg) raise DuplicateTaskException(msg)
...@@ -219,8 +229,8 @@ def check_subtask_is_valid(entry_id, current_task_id): ...@@ -219,8 +229,8 @@ def check_subtask_is_valid(entry_id, current_task_id):
subtask_dict = json.loads(entry.subtasks) subtask_dict = json.loads(entry.subtasks)
subtask_status_info = subtask_dict['status'] subtask_status_info = subtask_dict['status']
if current_task_id not in subtask_status_info: if current_task_id not in subtask_status_info:
format_str = "Unexpected task_id '{}': unable to find status for subtask of instructor task '{}'" format_str = "Unexpected task_id '{}': unable to find status for subtask of instructor task '{}': rejecting task {}"
msg = format_str.format(current_task_id, entry) msg = format_str.format(current_task_id, entry, new_subtask_status)
TASK_LOG.warning(msg) TASK_LOG.warning(msg)
raise DuplicateTaskException(msg) raise DuplicateTaskException(msg)
...@@ -229,11 +239,24 @@ def check_subtask_is_valid(entry_id, current_task_id): ...@@ -229,11 +239,24 @@ def check_subtask_is_valid(entry_id, current_task_id):
subtask_status = subtask_status_info[current_task_id] subtask_status = subtask_status_info[current_task_id]
subtask_state = subtask_status.get('state') subtask_state = subtask_status.get('state')
if subtask_state in READY_STATES: if subtask_state in READY_STATES:
format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}'" format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}': rejecting task {}"
msg = format_str.format(current_task_id, subtask_status, entry) msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status)
TASK_LOG.warning(msg) TASK_LOG.warning(msg)
raise DuplicateTaskException(msg) raise DuplicateTaskException(msg)
# Confirm that the InstructorTask doesn't think that this subtask is already being
# retried by another task.
if subtask_state == RETRY:
# Check to see if the input number of retries is less than the recorded number.
# If so, then this is an earlier version of the task, and a duplicate.
new_retry_count = _get_retry_count(new_subtask_status)
current_retry_count = _get_retry_count(subtask_status)
if new_retry_count < current_retry_count:
format_str = "Unexpected task_id '{}': already retried - status {} for subtask of instructor task '{}': rejecting task {}"
msg = format_str.format(current_task_id, subtask_status, entry, new_subtask_status)
TASK_LOG.warning(msg)
raise DuplicateTaskException(msg)
# Now we are ready to start working on this. Try to lock it. # Now we are ready to start working on this. Try to lock it.
# If it fails, then it means that another worker is already in the # If it fails, then it means that another worker is already in the
# middle of working on this. # middle of working on this.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment