Commit b8239068 by Brian Wilson

Check that email subtasks are known to the InstructorTask before executing.

parent 9861c935
......@@ -5,6 +5,7 @@ to a course.
import math
import re
import random
import json
from uuid import uuid4
from time import sleep
......@@ -45,6 +46,7 @@ from instructor_task.subtasks import (
create_subtask_status,
increment_subtask_status,
initialize_subtask_info,
check_subtask_is_valid,
)
log = get_task_logger(__name__)
......@@ -208,6 +210,18 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
log.warning("Task %s: Failed to get CourseEmail with id %s", task_id, email_id)
raise
# Check to see if email batches have already been defined. This seems to
# happen sometimes when there is a loss of connection while a task is being
# queued. When this happens, the same task gets called again, and a whole
# new raft of subtasks gets queued up. We will assume that if subtasks
# have already been defined, there is no need to redefine them below.
# So we just return right away. We don't raise an exception, because we want
# the current task to be marked with whatever it had been marked with before.
if len(entry.subtasks) > 0 and len(entry.task_output) > 0:
log.warning("Task %s has already been processed for email %s! InstructorTask = %s", task_id, email_id, entry)
progress = json.loads(entry.task_output)
return progress
# Sanity check that course for email_obj matches that of the task referencing it.
if course_id != email_obj.course_id:
format_msg = "Course id conflict: explicit value {} does not match email value {}"
......@@ -300,15 +314,19 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
Emails are sent multi-part, in both plain text and html. Updates InstructorTask object
with status information (sends, failures, skips) and updates number of subtasks completed.
"""
# Get entry here, as a sanity check that it actually exists. We won't actually do anything
# with it right away, but we also don't expect it to fail.
InstructorTask.objects.get(pk=entry_id)
current_task_id = subtask_status['task_id']
num_to_send = len(to_list)
log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s",
email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status)
# Check that the requested subtask is actually known to the current InstructorTask entry.
# If this fails, it throws an exception, which should fail this subtask immediately.
# This can happen when the parent task has been run twice, and results in duplicate
# subtasks being created for the same InstructorTask entry. We hope to catch this condition
# in perform_delegate_email_batches(), but just in case we fail to do so there,
# we check here as well.
check_subtask_is_valid(entry_id, current_task_id)
send_exception = None
new_subtask_status = None
try:
......
......@@ -19,7 +19,7 @@ from student.tests.factories import UserFactory, AdminFactory, CourseEnrollmentF
from bulk_email.models import CourseEmail, SEND_TO_ALL
from bulk_email.tasks import perform_delegate_email_batches, send_course_email
from instructor_task.models import InstructorTask
from instructor_task.subtasks import create_subtask_status
from instructor_task.subtasks import create_subtask_status, initialize_subtask_info
class EmailTestException(Exception):
......@@ -201,7 +201,7 @@ class TestEmailErrors(ModuleStoreTestCase):
with self.assertRaisesRegexp(ValueError, 'does not match email value'):
perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name") # pylint: disable=E1101
def test_send_email_undefined_email(self):
def test_send_email_undefined_subtask(self):
# test at a lower level, to ensure that the course gets checked down below too.
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
entry_id = entry.id # pylint: disable=E1101
......@@ -209,6 +209,33 @@ class TestEmailErrors(ModuleStoreTestCase):
global_email_context = {'course_title': 'dummy course'}
subtask_id = "subtask-id-value"
subtask_status = create_subtask_status(subtask_id)
email_id = 1001
with self.assertRaisesRegexp(ValueError, 'unable to find email subtasks of instructor task'):
send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status)
def test_send_email_missing_subtask(self):
    # Exercise send_course_email directly: the InstructorTask has subtask info recorded,
    # but the subtask id handed to send_course_email is not one of the recorded ids.
    instructor_task = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
    instructor_task_id = instructor_task.id  # pylint: disable=E1101
    recipients = ['test@test.com']
    email_context = {'course_title': 'dummy course'}

    # Record a single, known subtask on the InstructorTask...
    registered_subtask_id = "subtask-id-value"
    initialize_subtask_info(instructor_task, "emailed", 100, [registered_subtask_id])

    # ...then build a status for a subtask id that was never recorded there.
    unregistered_subtask_id = "bogus-subtask-id-value"
    unregistered_status = create_subtask_status(unregistered_subtask_id)

    placeholder_email_id = 1001
    expected_message = 'unable to find status for email subtask of instructor task'
    with self.assertRaisesRegexp(ValueError, expected_message):
        send_course_email(
            instructor_task_id,
            placeholder_email_id,
            recipients,
            email_context,
            unregistered_status,
        )
def dont_test_send_email_undefined_email(self):
# test at a lower level, to ensure that the course gets checked down below too.
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
entry_id = entry.id # pylint: disable=E1101
to_list = ['test@test.com']
global_email_context = {'course_title': 'dummy course'}
subtask_id = "subtask-id-value"
initialize_subtask_info(entry, "emailed", 100, [subtask_id])
subtask_status = create_subtask_status(subtask_id)
bogus_email_id = 1001
with self.assertRaises(CourseEmail.DoesNotExist):
# we skip the call that updates subtask status, since we've not set up the InstructorTask
......
......@@ -186,6 +186,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
self.assertGreater(status.get('duration_ms'), 0)
self.assertEquals(entry.task_state, SUCCESS)
self._assert_single_subtask_status(entry, succeeded, failed, skipped, retried_nomax, retried_withmax)
return entry
def test_successful(self):
# Select number of emails to fit into a single subtask.
......@@ -196,6 +197,23 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
get_conn.return_value.send_messages.side_effect = cycle([None])
self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)
def test_successful_twice(self):
    # Submitting the same bulk-email task a second time must not send anything again;
    # the second call should just report the progress recorded by the first.

    # Use exactly one subtask's worth of recipients.  The instructor also
    # receives the email, so one fewer student is created.
    batch_size = settings.BULK_EMAIL_EMAILS_PER_TASK
    self._create_students(batch_size - 1)

    # First run: every send succeeds.
    with patch('bulk_email.tasks.get_connection', autospec=True) as mock_get_conn:
        mock_get_conn.return_value.send_messages.side_effect = cycle([None])
        first_entry = self._test_run_with_task(send_bulk_course_email, 'emailed', batch_size, batch_size)

    # Second run with the same entry and task id: any attempt to actually
    # send would raise, so reaching the assertions shows nothing was re-sent.
    with patch('bulk_email.tasks.get_connection', autospec=True) as mock_get_conn:
        mock_get_conn.return_value.send_messages.side_effect = cycle([Exception("This should not happen!")])
        rerun_status = self._run_task_with_mock_celery(
            send_bulk_course_email, first_entry.id, first_entry.task_id
        )

    self.assertEquals(rerun_status.get('total'), batch_size)
    self.assertEquals(rerun_status.get('succeeded'), batch_size)
    self.assertEquals(rerun_status.get('failed'), 0)
def test_unactivated_user(self):
# Select number of emails to fit into a single subtask.
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
......
......@@ -147,6 +147,34 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
return task_progress
def check_subtask_is_valid(entry_id, current_task_id):
    """
    Verify that `current_task_id` is a subtask recorded on the InstructorTask `entry_id`.

    A mismatch can occur when a task that spawns subtasks is called twice
    with the same task_id and InstructorTask entry_id: the subtasks recorded
    by the first call get clobbered by those of the second call, so when the
    first set of subtasks actually runs, they are no longer found in the
    InstructorTask.

    Raises a ValueError (after logging a warning with the same message) if
    the InstructorTask has no subtask information at all, or if
    `current_task_id` is not a key of the 'status' dict in that information.
    Returns None otherwise.
    """
    def _reject(message_template):
        # Log and raise with the identical message so the log line matches the exception.
        message = message_template.format(current_task_id, instructor_task)
        TASK_LOG.warning(message)
        raise ValueError(message)

    instructor_task = InstructorTask.objects.get(pk=entry_id)

    # Nothing recorded at all: no subtasks were ever registered for this entry.
    if len(instructor_task.subtasks) == 0:
        _reject("Unexpected task_id '{}': unable to find email subtasks of instructor task '{}'")

    # Subtask info is stored as JSON; the per-subtask statuses are keyed by subtask id.
    known_statuses = json.loads(instructor_task.subtasks)['status']
    if current_task_id not in known_statuses:
        _reject("Unexpected task_id '{}': unable to find status for email subtask of instructor task '{}'")
@transaction.commit_manually
def update_subtask_status(entry_id, current_task_id, new_subtask_status):
"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment