Commit a7b5663c by Brian Wilson

Add cache-based locking to subtasks to ensure that the same task is not running…

Add cache-based locking to subtasks to ensure that the same task is not running in two workers at the same time.
parent 2bb8bed2
......@@ -24,6 +24,7 @@ from instructor_task.models import InstructorTask
from instructor_task.subtasks import (
create_subtask_status,
initialize_subtask_info,
check_subtask_is_valid,
update_subtask_status,
DuplicateTaskException,
)
......@@ -217,7 +218,7 @@ class TestEmailErrors(ModuleStoreTestCase):
subtask_id = "subtask-id-value"
subtask_status = create_subtask_status(subtask_id)
email_id = 1001
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find email subtasks of instructor task'):
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find subtasks of instructor task'):
send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status)
def test_send_email_missing_subtask(self):
......@@ -231,7 +232,7 @@ class TestEmailErrors(ModuleStoreTestCase):
different_subtask_id = "bogus-subtask-id-value"
subtask_status = create_subtask_status(different_subtask_id)
bogus_email_id = 1001
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find status for email subtask of instructor task'):
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find status for subtask of instructor task'):
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status)
def test_send_email_completed_subtask(self):
......@@ -249,6 +250,21 @@ class TestEmailErrors(ModuleStoreTestCase):
with self.assertRaisesRegexp(DuplicateTaskException, 'already completed'):
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status)
def test_send_email_running_subtask(self):
    """
    A subtask whose lock is already held must be rejected as a duplicate.

    The subtask is registered on a fresh InstructorTask and given a status,
    then `check_subtask_is_valid` is called once so that the subtask lock is
    taken.  A later `send_course_email` for the same subtask is expected to
    raise DuplicateTaskException mentioning 'already being executed'.
    """
    # test at a lower level, to ensure that the course gets checked down below too.
    recipients = ['test@test.com']
    email_context = {'course_title': 'dummy course'}
    bogus_email_id = 1001
    subtask_id = "subtask-id-value"

    # Register the subtask on a new InstructorTask and store a status for it.
    entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
    entry_id = entry.id  # pylint: disable=E1101
    initialize_subtask_info(entry, "emailed", 100, [subtask_id])
    subtask_status = create_subtask_status(subtask_id)
    update_subtask_status(entry_id, subtask_id, subtask_status)

    # The first validity check acquires the subtask lock and leaves it held.
    check_subtask_is_valid(entry_id, subtask_id)

    # With the lock still held, sending must fail as a duplicate.
    with self.assertRaisesRegexp(DuplicateTaskException, 'already being executed'):
        send_course_email(entry_id, bogus_email_id, recipients, email_context, subtask_status)
def dont_test_send_email_undefined_email(self):
# test at a lower level, to ensure that the course gets checked down below too.
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
......
......@@ -8,11 +8,15 @@ from celery.utils.log import get_task_logger
from celery.states import SUCCESS, READY_STATES
from django.db import transaction
from django.core.cache import cache
from instructor_task.models import InstructorTask, PROGRESS, QUEUING
# Celery task logger for this module; used for all subtask warnings and debug output.
TASK_LOG = get_task_logger(__name__)
# Timeout handed to cache.add() when a subtask lock is taken (see _acquire_subtask_lock).
# Lock expiration should be long enough to allow a send_course_email task to complete.
SUBTASK_LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
class DuplicateTaskException(Exception):
"""Exception indicating that a task already exists or has already completed."""
......@@ -152,6 +156,37 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
return task_progress
def _acquire_subtask_lock(task_id):
    """
    Try to flag `task_id` as currently being worked on.

    The flag is a cache entry keyed on the task id.  It guards against the same
    subtask being processed by two workers at once, which can happen when Celery
    requeues tasks after losing its connection to the broker.  Such duplicates
    usually run one after the other, but they may also overlap.

    Returns the result of `cache.add`: truthy if the lock was newly taken,
    falsy if the task_id was already locked.
    """
    lock_key = "subtask-{}".format(task_id)
    # cache.add only stores the value when the key is not already present,
    # so its result tells us whether we won the lock.
    acquired = cache.add(lock_key, 'true', SUBTASK_LOCK_EXPIRE)
    if acquired:
        return acquired

    # Somebody else holds the lock: record what is stored there for debugging.
    TASK_LOG.warning("task_id '%s': already locked. Contains value '%s'", task_id, cache.get(lock_key))
    return acquired
def _release_subtask_lock(task_id):
    """
    Clear the in-progress flag for `task_id`.

    Removing the cache entry is what allows the same task to be retried later.
    """
    # The Celery task cookbook notes that memcache delete is very slow, but it is
    # required in order to use add() for atomic locking.
    cache.delete("subtask-{}".format(task_id))
def check_subtask_is_valid(entry_id, current_task_id):
"""
Confirms that the current subtask is known to the InstructorTask and hasn't already been completed.
......@@ -166,14 +201,16 @@ def check_subtask_is_valid(entry_id, current_task_id):
If a subtask gets requeued, then the first time the subtask runs it should run fine to completion.
However, we want to prevent it from running again, so we check here to see what the existing
subtask's status is. If it is complete, we return an exception.
subtask's status is. If it is complete, we raise an exception. We also take a lock on the task,
so that we can detect if another worker has started work but has not yet completed that work.
The other worker is allowed to finish, and this raises an exception.
Raises a DuplicateTaskException exception if it's not a task that should be run.
"""
# Confirm that the InstructorTask actually defines subtasks.
entry = InstructorTask.objects.get(pk=entry_id)
if len(entry.subtasks) == 0:
format_str = "Unexpected task_id '{}': unable to find email subtasks of instructor task '{}'"
format_str = "Unexpected task_id '{}': unable to find subtasks of instructor task '{}'"
msg = format_str.format(current_task_id, entry)
TASK_LOG.warning(msg)
raise DuplicateTaskException(msg)
......@@ -182,7 +219,7 @@ def check_subtask_is_valid(entry_id, current_task_id):
subtask_dict = json.loads(entry.subtasks)
subtask_status_info = subtask_dict['status']
if current_task_id not in subtask_status_info:
format_str = "Unexpected task_id '{}': unable to find status for email subtask of instructor task '{}'"
format_str = "Unexpected task_id '{}': unable to find status for subtask of instructor task '{}'"
msg = format_str.format(current_task_id, entry)
TASK_LOG.warning(msg)
raise DuplicateTaskException(msg)
......@@ -192,11 +229,20 @@ def check_subtask_is_valid(entry_id, current_task_id):
subtask_status = subtask_status_info[current_task_id]
subtask_state = subtask_status.get('state')
if subtask_state in READY_STATES:
format_str = "Unexpected task_id '{}': already completed - status {} for email subtask of instructor task '{}'"
format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}'"
msg = format_str.format(current_task_id, subtask_status, entry)
TASK_LOG.warning(msg)
raise DuplicateTaskException(msg)
# Now we are ready to start working on this. Try to lock it.
# If it fails, then it means that another worker is already in the
# middle of working on this.
if not _acquire_subtask_lock(current_task_id):
format_str = "Unexpected task_id '{}': already being executed - for subtask of instructor task '{}'"
msg = format_str.format(current_task_id, entry)
TASK_LOG.warning(msg)
raise DuplicateTaskException(msg)
@transaction.commit_manually
def update_subtask_status(entry_id, current_task_id, new_subtask_status):
......@@ -291,3 +337,5 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
else:
TASK_LOG.debug("about to commit....")
transaction.commit()
finally:
_release_subtask_lock(current_task_id)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment