Commit ed4b954a by Brian Wilson

Remove the use of celery.group from bulk email subtasks.

parent c4434cb1
...@@ -2,11 +2,9 @@ ...@@ -2,11 +2,9 @@
This module contains celery task functions for handling the sending of bulk email This module contains celery task functions for handling the sending of bulk email
to a course. to a course.
""" """
import math
import re import re
import random import random
import json import json
from uuid import uuid4
from time import sleep from time import sleep
from dogapi import dog_stats_api from dogapi import dog_stats_api
...@@ -24,7 +22,7 @@ from boto.ses.exceptions import ( ...@@ -24,7 +22,7 @@ from boto.ses.exceptions import (
) )
from boto.exception import AWSConnectionError from boto.exception import AWSConnectionError
from celery import task, current_task, group from celery import task, current_task
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from celery.states import SUCCESS, FAILURE, RETRY from celery.states import SUCCESS, FAILURE, RETRY
from celery.exceptions import RetryTaskError from celery.exceptions import RetryTaskError
...@@ -42,9 +40,11 @@ from courseware.access import _course_staff_group_name, _course_instructor_group ...@@ -42,9 +40,11 @@ from courseware.access import _course_staff_group_name, _course_instructor_group
from courseware.courses import get_course, course_image_url from courseware.courses import get_course, course_image_url
from instructor_task.models import InstructorTask from instructor_task.models import InstructorTask
from instructor_task.subtasks import ( from instructor_task.subtasks import (
update_subtask_status, create_subtask_ids,
generate_items_for_subtask,
create_subtask_status, create_subtask_status,
increment_subtask_status, increment_subtask_status,
update_subtask_status,
initialize_subtask_info, initialize_subtask_info,
check_subtask_is_valid, check_subtask_is_valid,
) )
...@@ -152,53 +152,6 @@ def _get_course_email_context(course): ...@@ -152,53 +152,6 @@ def _get_course_email_context(course):
return email_context return email_context
def _generate_subtasks(create_subtask_fcn, recipient_qset):
    """
    Generates a list of subtasks to send email to a given set of recipients.

    Arguments:
        `create_subtask_fcn` : a function whose inputs are a list of recipients and a
            subtask_id to assign to the new subtask.  Returns the subtask that will
            send email to that list of recipients.
        `recipient_qset` : a query set that defines the recipients who should receive emails.

    Returns: a tuple, containing:
        * A list of subtasks that will send emails to all recipients.
        * A list of subtask_ids corresponding to those subtasks.
        * A count of the total number of emails being sent.

    Raises:
        ValueError: if the chunked recipient sublists do not sum to the original total.
    """
    total_num_emails = recipient_qset.count()
    # Number of paged queries needed to walk the full recipient set.
    num_queries = int(math.ceil(float(total_num_emails) / float(settings.BULK_EMAIL_EMAILS_PER_QUERY)))
    # Track the last primary key fetched so each query resumes where the previous ended.
    # NOTE(review): assumes recipient_qset is non-empty and recipient_qset[0] has the
    # smallest pk — verify against callers.
    last_pk = recipient_qset[0].pk - 1
    num_emails_queued = 0
    task_list = []
    subtask_id_list = []
    for _ in range(num_queries):
        recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk).values('profile__name', 'email', 'pk')[:settings.BULK_EMAIL_EMAILS_PER_QUERY])
        last_pk = recipient_sublist[-1]['pk']
        num_emails_this_query = len(recipient_sublist)
        num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.BULK_EMAIL_EMAILS_PER_TASK)))
        # Spread this query's recipients as evenly as possible across its subtasks.
        chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query)))
        for i in range(num_tasks_this_query):
            to_list = recipient_sublist[i * chunk:i * chunk + chunk]
            subtask_id = str(uuid4())
            subtask_id_list.append(subtask_id)
            new_subtask = create_subtask_fcn(to_list, subtask_id)
            task_list.append(new_subtask)
        num_emails_queued += num_emails_this_query

    # Sanity check: we expect the chunking to be properly summing to the original count.
    if num_emails_queued != total_num_emails:
        # Bug fix: the original format string had three placeholders ("Task {}: ...")
        # but only two arguments, so .format() raised IndexError instead of the
        # intended ValueError being raised with a useful message.
        error_msg = "Number of emails generated by chunking ({}) not equal to original total ({})".format(
            num_emails_queued, total_num_emails)
        log.error(error_msg)
        raise ValueError(error_msg)
    return task_list, subtask_id_list, total_num_emails
def perform_delegate_email_batches(entry_id, course_id, task_input, action_name): def perform_delegate_email_batches(entry_id, course_id, task_input, action_name):
""" """
Delegates emails by querying for the list of recipients who should Delegates emails by querying for the list of recipients who should
...@@ -252,42 +205,59 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) ...@@ -252,42 +205,59 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
log.exception("Task %s: course not found: %s", task_id, course_id) log.exception("Task %s: course not found: %s", task_id, course_id)
raise raise
# Get arguments that will be passed to every subtask.
to_option = email_obj.to_option to_option = email_obj.to_option
recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
global_email_context = _get_course_email_context(course) global_email_context = _get_course_email_context(course)
def _create_send_email_subtask(to_list, subtask_id): # Figure out the number of needed subtasks, getting id values to use for each.
"""Creates a subtask to send email to a given recipient list.""" recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
total_num_emails = recipient_qset.count()
subtask_id_list = create_subtask_ids(total_num_emails, settings.BULK_EMAIL_EMAILS_PER_QUERY, settings.BULK_EMAIL_EMAILS_PER_TASK)
# Update the InstructorTask with information about the subtasks we've defined.
log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s",
task_id, total_num_emails, course_id, email_id, to_option)
progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list)
# Construct a generator that will return the recipients to use for each subtask.
# Pass in the desired fields to fetch for each recipient.
recipient_fields = ['profile__name', 'email']
recipient_generator = generate_items_for_subtask(
recipient_qset,
recipient_fields,
total_num_emails,
settings.BULK_EMAIL_EMAILS_PER_QUERY,
settings.BULK_EMAIL_EMAILS_PER_TASK,
)
# Now create the subtasks, and start them running. This allows all the subtasks
# in the list to be submitted at the same time.
num_subtasks = len(subtask_id_list)
log.info("Task %s: Preparing to generate and queue %s subtasks for course %s, email %s, to_option %s",
task_id, num_subtasks, course_id, email_id, to_option)
num_subtasks = 0
for recipient_list in recipient_generator:
subtask_id = subtask_id_list[num_subtasks]
num_subtasks += 1
subtask_status = create_subtask_status(subtask_id) subtask_status = create_subtask_status(subtask_id)
new_subtask = send_course_email.subtask( new_subtask = send_course_email.subtask(
( (
entry_id, entry_id,
email_id, email_id,
to_list, recipient_list,
global_email_context, global_email_context,
subtask_status, subtask_status,
), ),
task_id=subtask_id, task_id=subtask_id,
routing_key=settings.BULK_EMAIL_ROUTING_KEY, routing_key=settings.BULK_EMAIL_ROUTING_KEY,
) )
return new_subtask new_subtask.apply_async()
log.info("Task %s: Preparing to generate subtasks for course %s, email %s, to_option %s",
task_id, course_id, email_id, to_option)
task_list, subtask_id_list, total_num_emails = _generate_subtasks(_create_send_email_subtask, recipient_qset)
# Update the InstructorTask with information about the subtasks we've defined. # Sanity check: we expect the subtask to be properly summing to the original count:
log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s", if num_subtasks != len(subtask_id_list):
task_id, total_num_emails, course_id, email_id, to_option) error_msg = "Task {}: number of tasks generated {} not equal to original total {}".format(task_id, num_subtasks, len(subtask_id_list))
progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list) log.error(error_msg)
num_subtasks = len(subtask_id_list) raise ValueError(error_msg)
# Now group the subtasks, and start them running. This allows all the subtasks
# in the list to be submitted at the same time.
log.info("Task %s: Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s",
task_id, num_subtasks, total_num_emails, course_id, email_id, to_option)
task_group = group(task_list)
task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY)
# We want to return progress here, as this is what will be stored in the # We want to return progress here, as this is what will be stored in the
# AsyncResult for the parent task as its return value. # AsyncResult for the parent task as its return value.
......
...@@ -3,6 +3,8 @@ This module contains celery task functions for handling the management of subtas ...@@ -3,6 +3,8 @@ This module contains celery task functions for handling the management of subtas
""" """
from time import time from time import time
import json import json
from uuid import uuid4
import math
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from celery.states import SUCCESS, READY_STATES, RETRY from celery.states import SUCCESS, READY_STATES, RETRY
...@@ -23,6 +25,68 @@ class DuplicateTaskException(Exception): ...@@ -23,6 +25,68 @@ class DuplicateTaskException(Exception):
pass pass
def create_subtask_ids(total_num_items, items_per_query, items_per_task):
    """
    Determines number of subtasks that need to be generated, and provides a list of id values to use.

    This needs to be calculated before a query is executed so that the list of all subtasks can be
    stored in the InstructorTask before any subtasks are started.

    The number of subtask_id values returned by this should match the number of chunks returned
    by the generate_items_for_subtask generator.
    """
    # Walk the items query-chunk by query-chunk, counting how many subtasks
    # each chunk will be split into.  This must mirror the chunking performed
    # by generate_items_for_subtask, which is why a simple ceil(total/per_task)
    # is not sufficient.
    total_num_tasks = 0
    items_left = total_num_items
    while items_left > 0:
        query_size = min(items_left, items_per_query)
        total_num_tasks += int(math.ceil(float(query_size) / float(items_per_task)))
        items_left -= query_size

    # Now that the number of tasks is known, return a list of ids for each task.
    return [str(uuid4()) for _ in range(total_num_tasks)]
def generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_query, items_per_task):
    """
    Generates a chunk of "items" that should be passed into a subtask.

    Arguments:
        `item_queryset` : a query set that defines the "items" that should be passed to subtasks.
        `item_fields` : the fields that should be included in the dict that is returned.
            These are in addition to the 'pk' field.
        `total_num_items` : the result of item_queryset.count().
        `items_per_query` : size of chunks to break the query operation into.
        `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.

    Returns: yields a list of dicts, where each dict contains the fields in `item_fields`,
        plus the 'pk' field.

    Raises:
        ValueError: if the yielded chunks do not sum to `total_num_items`.
    """
    num_queries = int(math.ceil(float(total_num_items) / float(items_per_query)))
    # Track the last primary key fetched so each query resumes where the previous ended.
    # NOTE(review): assumes item_queryset is non-empty and item_queryset[0] has the
    # smallest pk — verify against callers.
    last_pk = item_queryset[0].pk - 1
    num_items_queued = 0
    all_item_fields = list(item_fields)
    all_item_fields.append('pk')

    for _ in range(num_queries):
        item_sublist = list(item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)[:items_per_query])
        last_pk = item_sublist[-1]['pk']
        num_items_this_query = len(item_sublist)
        num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task)))
        # Spread this query's items as evenly as possible across its subtasks.
        chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query)))
        for i in range(num_tasks_this_query):
            items_for_task = item_sublist[i * chunk:i * chunk + chunk]
            yield items_for_task
        num_items_queued += num_items_this_query

    # Sanity check: we expect the chunking to be properly summing to the original count.
    if num_items_queued != total_num_items:
        # Bug fix: the original format string had three placeholders ("Task {}: ...")
        # but only two arguments, so .format() raised IndexError instead of the
        # intended ValueError being raised with a useful message.
        error_msg = "Number of items generated by chunking ({}) not equal to original total ({})".format(
            num_items_queued, total_num_items)
        TASK_LOG.error(error_msg)
        raise ValueError(error_msg)
def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None): def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
""" """
Create and return a dict for tracking the status of a subtask. Create and return a dict for tracking the status of a subtask.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment