Commit c01fa459 by Brian Wilson

Factor out subtask-specific code into subtasks.py.

parent d171dc3e
@@ -22,7 +22,6 @@ from django.contrib.auth.models import User, Group
from django.core.mail import EmailMultiAlternatives, get_connection
from django.http import Http404
from django.core.urlresolvers import reverse
from django.db import transaction
from bulk_email.models import (
CourseEmail, Optout, CourseEmailTemplate,
@@ -30,12 +29,16 @@ from bulk_email.models import (
)
from courseware.access import _course_staff_group_name, _course_instructor_group_name
from courseware.courses import get_course_by_id, course_image_url
from instructor_task.models import InstructorTask, PROGRESS, QUEUING
from instructor_task.models import InstructorTask
from instructor_task.subtasks import (
update_subtask_result, update_subtask_status, create_subtask_result,
update_instructor_task_for_subtasks
)
log = get_task_logger(__name__)
def get_recipient_queryset(user_id, to_option, course_id, course_location):
def _get_recipient_queryset(user_id, to_option, course_id, course_location):
"""
Generates a query set corresponding to the requested category.
@@ -65,7 +68,7 @@ def get_recipient_queryset(user_id, to_option, course_id, course_location):
return recipient_qset
def get_course_email_context(course):
def _get_course_email_context(course):
"""
Returns context arguments to apply to all emails, independent of recipient.
"""
@@ -125,27 +128,13 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
log.exception("get_course_by_id failed: %s", exc.args[0])
raise Exception("get_course_by_id failed: " + exc.args[0])
global_email_context = get_course_email_context(course)
recipient_qset = get_recipient_queryset(user_id, to_option, course_id, course.location)
global_email_context = _get_course_email_context(course)
recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
total_num_emails = recipient_qset.count()
log.info("Preparing to queue emails to %d recipient(s) for course %s, email %s, to_option %s",
total_num_emails, course_id, email_id, to_option)
# At this point, we have some status that we can report, as to the magnitude of the overall
# task. That is, we know the total. Set that, and our subtasks should work towards that goal.
# Note that we add start_time in here, so that it can be used
# by subtasks to calculate duration_ms values:
progress = {'action_name': action_name,
'attempted': 0,
'failed': 0,
'skipped': 0,
'succeeded': 0,
'total': total_num_emails,
'duration_ms': int(0),
'start_time': time(),
}
num_queries = int(math.ceil(float(total_num_emails) / float(settings.EMAILS_PER_QUERY)))
last_pk = recipient_qset[0].pk - 1
num_workers = 0
@@ -166,7 +155,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
subtask_id = str(uuid4())
subtask_id_list.append(subtask_id)
subtask_progress = update_subtask_result(None, 0, 0, 0)
subtask_progress = create_subtask_result()
task_list.append(send_course_email.subtask((
entry_id,
email_id,
@@ -177,24 +166,9 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
))
num_workers += num_tasks_this_query
# Before we actually start running the tasks we've defined,
# the InstructorTask needs to be updated with their information.
# So we update the InstructorTask object here, not in the return.
# The monitoring code knows that it shouldn't go to the InstructorTask's task's
# Result for its progress when there are subtasks. So we accumulate
# the results of each subtask as it completes into the InstructorTask.
entry.task_output = InstructorTask.create_output_for_success(progress)
entry.task_state = PROGRESS
# now write out the subtasks information.
# Update the InstructorTask with information about the subtasks we've defined.
progress = update_instructor_task_for_subtasks(entry, action_name, total_num_emails, subtask_id_list)
num_subtasks = len(subtask_id_list)
subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'status': subtask_status}
entry.subtasks = json.dumps(subtask_dict)
# and save the entry immediately, before any subtasks actually start work:
entry.save_now()
log.info("Preparing to queue %d email tasks for course %s, email %s, to %s",
num_subtasks, course_id, email_id, to_option)
@@ -215,62 +189,6 @@ def _get_current_task():
return current_task
@transaction.commit_manually
def _update_subtask_status(entry_id, current_task_id, status, subtask_result):
"""
Update the status of the subtask in the parent InstructorTask object tracking its progress.
"""
log.info("Preparing to update status for email subtask %s for instructor task %d with status %s",
current_task_id, entry_id, subtask_result)
try:
entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
subtask_dict = json.loads(entry.subtasks)
subtask_status = subtask_dict['status']
if current_task_id not in subtask_status:
# unexpected error -- raise an exception
format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'"
msg = format_str.format(current_task_id, entry_id)
log.warning(msg)
raise ValueError(msg)
subtask_status[current_task_id] = status
# Update the parent task progress
task_progress = json.loads(entry.task_output)
start_time = task_progress['start_time']
task_progress['duration_ms'] = int((time() - start_time) * 1000)
if subtask_result is not None:
for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
task_progress[statname] += subtask_result[statname]
# Figure out if we're actually done (i.e. this is the last task to complete).
# This is easier if we just maintain a counter, rather than scanning the
# entire subtask_status dict.
if status == SUCCESS:
subtask_dict['succeeded'] += 1
else:
subtask_dict['failed'] += 1
num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
# If we're done with the last task, update the parent status to indicate that:
if num_remaining <= 0:
entry.task_state = SUCCESS
entry.subtasks = json.dumps(subtask_dict)
entry.task_output = InstructorTask.create_output_for_success(task_progress)
log.info("Task output updated to %s for email subtask %s of instructor task %d",
entry.task_output, current_task_id, entry_id)
# TODO: temporary -- switch to debug once working
log.info("about to save....")
entry.save()
except:
log.exception("Unexpected error while updating InstructorTask.")
transaction.rollback()
else:
# TODO: temporary -- switch to debug once working
log.info("about to commit....")
transaction.commit()
@task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102
def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_progress):
"""
@@ -307,10 +225,10 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
)
if send_exception is None:
# Update the InstructorTask object that is storing its progress.
_update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value)
update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result_value)
else:
log.error("background task (%s) failed: %s", current_task_id, send_exception)
_update_subtask_status(entry_id, current_task_id, FAILURE, course_email_result_value)
update_subtask_status(entry_id, current_task_id, FAILURE, course_email_result_value)
raise send_exception
except Exception:
@@ -318,7 +236,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
_, exception, traceback = exc_info()
traceback_string = format_exc(traceback) if traceback is not None else ''
log.error("background task (%s) failed: %s %s", current_task_id, exception, traceback_string)
_update_subtask_status(entry_id, current_task_id, FAILURE, subtask_progress)
update_subtask_status(entry_id, current_task_id, FAILURE, subtask_progress)
raise
return course_email_result_value
@@ -462,6 +380,9 @@ def _send_course_email(task_id, email_id, to_list, global_email_context, subtask
exc=exc,
countdown=(2 ** retry_index) * 15
)
# TODO: figure out what happens when there are no more retries, because the maximum has been reached.
# Presumably the "exc" is then raised directly, which means that the subtask status
# would not get updated correctly.
except Exception as exc:
# If we have a general exception for this request, we need to figure out what to do with it.
@@ -479,18 +400,6 @@ def _send_course_email(task_id, email_id, to_list, global_email_context, subtask
return update_subtask_result(subtask_progress, num_sent, num_error, num_optout), None
def update_subtask_result(previous_result, new_num_sent, new_num_error, new_num_optout):
"""Return the result of course_email sending as a dict (not a string)."""
attempted = new_num_sent + new_num_error
current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error}
# add in any previous results:
if previous_result is not None:
for keyname in current_result:
if keyname in previous_result:
current_result[keyname] += previous_result[keyname]
return current_result
def _statsd_tag(course_title):
"""
Calculate the tag we will use for DataDog.
......
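For orientation, the refactored batching flow can be summarized by the condensed sketch below (not the literal production code): each chunk of recipients gets a zeroed progress dict from create_subtask_result(), the subtask signatures are collected, and update_instructor_task_for_subtasks() primes the parent InstructorTask before any subtask starts running. The helper name _sketch_queue_email_subtasks and the assumption that send_course_email lives in bulk_email.tasks are illustrative only and not taken from this diff.

from uuid import uuid4

from bulk_email.tasks import send_course_email          # assumed module path
from instructor_task.subtasks import (
    create_subtask_result, update_instructor_task_for_subtasks
)


def _sketch_queue_email_subtasks(entry, action_name, email_id, global_email_context, recipient_chunks):
    """Illustrative only: queue one send_course_email subtask per recipient chunk."""
    subtask_id_list = []
    task_list = []
    total_num_emails = sum(len(chunk) for chunk in recipient_chunks)
    for to_list in recipient_chunks:
        subtask_id = str(uuid4())
        subtask_id_list.append(subtask_id)
        # Each subtask starts from an empty, zeroed progress dict:
        task_list.append(send_course_email.subtask(
            (entry.id, email_id, to_list, global_email_context, create_subtask_result()),
            task_id=subtask_id,
        ))
    # Record totals, start_time, and per-subtask QUEUING status on the InstructorTask
    # *before* any subtask begins running:
    update_instructor_task_for_subtasks(entry, action_name, total_num_emails, subtask_id_list)
    return task_list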
@@ -15,6 +15,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from bulk_email.models import Optout
from instructor_task.subtasks import update_subtask_result
STAFF_COUNT = 3
STUDENT_COUNT = 10
@@ -33,7 +34,7 @@ class MockCourseEmailResult(object):
def mock_update_subtask_result(prev_results, sent, failed, output, **kwargs): # pylint: disable=W0613
"""Increments count of number of emails sent."""
self.emails_sent += sent
return True
return update_subtask_result(prev_results, sent, failed, output)
return mock_update_subtask_result
......
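In the tests, the factory above now returns a closure that both counts the emails it is told about and delegates to the real update_subtask_result, so the accumulated dict stays well-formed. A minimal sketch of how such a factory might be wired up inside the same test module follows; both the outer method name get_mock_update_subtask_result and the patch target bulk_email.tasks.update_subtask_result are assumptions not shown in this diff.

from mock import patch


def check_emails_sent(test_case, trigger_bulk_email, expected_sent):
    """Illustrative helper: substitute the mock factory and verify the sent counter."""
    cer = MockCourseEmailResult()
    with patch('bulk_email.tasks.update_subtask_result') as mock_result:
        mock_result.side_effect = cer.get_mock_update_subtask_result()
        trigger_bulk_email()   # e.g. exercise the bulk-email task under test
    test_case.assertEqual(cer.emails_sent, expected_sent)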
"""
This module contains functions for managing the subtasks spawned by celery tasks, and for tracking their status in the parent InstructorTask.
"""
from time import time
import json
from celery.utils.log import get_task_logger
from celery.states import SUCCESS
from django.db import transaction
from instructor_task.models import InstructorTask, PROGRESS, QUEUING
log = get_task_logger(__name__)
def update_subtask_result(previous_result, new_num_sent, new_num_error, new_num_optout):
"""Return the result of course_email sending as a dict (not a string)."""
attempted = new_num_sent + new_num_error
current_result = {'attempted': attempted, 'succeeded': new_num_sent, 'skipped': new_num_optout, 'failed': new_num_error}
# add in any previous results:
if previous_result is not None:
for keyname in current_result:
if keyname in previous_result:
current_result[keyname] += previous_result[keyname]
return current_result
def create_subtask_result():
    """Return an initial result dict for a subtask, with all counts set to zero."""
    return update_subtask_result(None, 0, 0, 0)
def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_id_list):
"""
Store initial subtask information to InstructorTask object.
# Before we actually start running the tasks we've defined,
# the InstructorTask needs to be updated with their information.
# So we update the InstructorTask object here, not in the return.
# The monitoring code knows that it shouldn't go to the InstructorTask's task's
# Result for its progress when there are subtasks. So we accumulate
# the results of each subtask as it completes into the InstructorTask.
# At this point, we have some status that we can report, as to the magnitude of the overall
# task. That is, we know the total. Set that, and our subtasks should work towards that goal.
# Note that we add start_time in here, so that it can be used
# by subtasks to calculate duration_ms values:
"""
progress = {
'action_name': action_name,
'attempted': 0,
'failed': 0,
'skipped': 0,
'succeeded': 0,
'total': total_num,
'duration_ms': int(0),
'start_time': time()
}
entry.task_output = InstructorTask.create_output_for_success(progress)
entry.task_state = PROGRESS
# Write out the subtasks information.
num_subtasks = len(subtask_id_list)
subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'status': subtask_status}
entry.subtasks = json.dumps(subtask_dict)
# Save the entry immediately, before any subtasks actually start work:
entry.save_now()
return progress
@transaction.commit_manually
def update_subtask_status(entry_id, current_task_id, status, subtask_result):
"""
Update the status of the subtask in the parent InstructorTask object tracking its progress.
"""
log.info("Preparing to update status for email subtask %s for instructor task %d with status %s",
current_task_id, entry_id, subtask_result)
try:
entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
subtask_dict = json.loads(entry.subtasks)
subtask_status = subtask_dict['status']
if current_task_id not in subtask_status:
# unexpected error -- raise an exception
format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'"
msg = format_str.format(current_task_id, entry_id)
log.warning(msg)
raise ValueError(msg)
subtask_status[current_task_id] = status
# Update the parent task progress
task_progress = json.loads(entry.task_output)
start_time = task_progress['start_time']
task_progress['duration_ms'] = int((time() - start_time) * 1000)
if subtask_result is not None:
for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
task_progress[statname] += subtask_result[statname]
# Figure out if we're actually done (i.e. this is the last task to complete).
# This is easier if we just maintain a counter, rather than scanning the
# entire subtask_status dict.
if status == SUCCESS:
subtask_dict['succeeded'] += 1
else:
subtask_dict['failed'] += 1
num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
# If we're done with the last task, update the parent status to indicate that:
if num_remaining <= 0:
entry.task_state = SUCCESS
entry.subtasks = json.dumps(subtask_dict)
entry.task_output = InstructorTask.create_output_for_success(task_progress)
log.info("Task output updated to %s for email subtask %s of instructor task %d",
entry.task_output, current_task_id, entry_id)
# TODO: temporary -- switch to debug once working
log.info("about to save....")
entry.save()
except:
log.exception("Unexpected error while updating InstructorTask.")
transaction.rollback()
else:
# TODO: temporary -- switch to debug once working
log.info("about to commit....")
transaction.commit()
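Taken together, the helpers are intended to be used in a fixed order: create_subtask_result() seeds each subtask's progress dict, update_instructor_task_for_subtasks() primes the parent record before anything runs, each subtask folds its counts in with update_subtask_result(), and update_subtask_status() commits the final per-subtask outcome into the parent InstructorTask. A small worked example of the accumulation step (a sketch, not part of the commit; the counts are made up):

from instructor_task.subtasks import create_subtask_result, update_subtask_result

result = create_subtask_result()
assert result == {'attempted': 0, 'succeeded': 0, 'skipped': 0, 'failed': 0}

# One batch reports 8 emails sent, 2 errors, and 1 opt-out skipped:
result = update_subtask_result(result, new_num_sent=8, new_num_error=2, new_num_optout=1)
assert result == {'attempted': 10, 'succeeded': 8, 'skipped': 1, 'failed': 2}

# A later attempt for the same subtask reports 2 more sent and no further errors:
result = update_subtask_result(result, new_num_sent=2, new_num_error=0, new_num_optout=0)
assert result == {'attempted': 12, 'succeeded': 10, 'skipped': 1, 'failed': 2}

The resulting dict uses the same keys ('attempted', 'succeeded', 'failed', 'skipped') that update_subtask_status() adds into the parent task_progress.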