Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-platform
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-platform
Commits
5b48ed84
Commit
5b48ed84
authored
Oct 27, 2013
by
Brian Wilson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Refactor subtask creation logic to be less email-specific.
parent
0f8f82c8
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
111 additions
and
66 deletions
+111
-66
CHANGELOG.rst
+3
-0
lms/djangoapps/bulk_email/tasks.py
+32
-53
lms/djangoapps/instructor_task/subtasks.py
+76
-13
No files found.
CHANGELOG.rst
View file @
5b48ed84
...
...
@@ -5,6 +5,9 @@ These are notable changes in edx-platform. This is a rolling list of changes,
in roughly chronological order, most recent first. Add your entries at or near
the top. Include a label indicating the component affected.
LMS: Change bulk email implementation to use less memory, and to better handle
duplicate tasks in celery.
LMS: Improve forum error handling so that errors in the logs are
clearer and HTTP status codes from the comments service indicating
client error are correctly passed through to the client.
...
...
lms/djangoapps/bulk_email/tasks.py
View file @
5b48ed84
...
...
@@ -40,12 +40,10 @@ from courseware.access import _course_staff_group_name, _course_instructor_group
from
courseware.courses
import
get_course
,
course_image_url
from
instructor_task.models
import
InstructorTask
from
instructor_task.subtasks
import
(
create_subtask_ids
,
generate_items_for_subtask
,
SubtaskStatus
,
update_subtask_status
,
initialize_subtask_info
,
queue_subtasks_for_query
,
check_subtask_is_valid
,
update_subtask_status
,
)
log
=
get_task_logger
(
__name__
)
...
...
@@ -154,10 +152,8 @@ def _get_course_email_context(course):
def
perform_delegate_email_batches
(
entry_id
,
course_id
,
task_input
,
action_name
):
"""
Delegates emails by querying for the list of recipients who should
get the mail, chopping up into batches of settings.BULK_EMAIL_EMAILS_PER_TASK size,
and queueing up worker jobs.
Returns the number of batches (workers) kicked off.
get the mail, chopping up into batches of no more than settings.BULK_EMAIL_EMAILS_PER_TASK
in size, and queueing up worker jobs.
"""
entry
=
InstructorTask
.
objects
.
get
(
pk
=
entry_id
)
# Get inputs to use in this task from the entry.
...
...
@@ -208,55 +204,37 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
to_option
=
email_obj
.
to_option
global_email_context
=
_get_course_email_context
(
course
)
# Figure out the number of needed subtasks, getting id values to use for each.
recipient_qset
=
_get_recipient_queryset
(
user_id
,
to_option
,
course_id
,
course
.
location
)
total_num_emails
=
recipient_qset
.
count
()
subtask_id_list
=
create_subtask_ids
(
total_num_emails
,
settings
.
BULK_EMAIL_EMAILS_PER_QUERY
,
settings
.
BULK_EMAIL_EMAILS_PER_TASK
)
# Update the InstructorTask with information about the subtasks we've defined.
log
.
info
(
"Task
%
s: Preparing to update task for sending
%
d emails for course
%
s, email
%
s, to_option
%
s"
,
task_id
,
total_num_emails
,
course_id
,
email_id
,
to_option
)
progress
=
initialize_subtask_info
(
entry
,
action_name
,
total_num_emails
,
subtask_id_list
)
# Construct a generator that will return the recipients to use for each subtask.
# Pass in the desired fields to fetch for each recipient.
recipient_fields
=
[
'profile__name'
,
'email'
]
recipient_generator
=
generate_items_for_subtask
(
recipient_qset
,
recipient_fields
,
total_num_emails
,
settings
.
BULK_EMAIL_EMAILS_PER_QUERY
,
settings
.
BULK_EMAIL_EMAILS_PER_TASK
,
)
# Now create the subtasks, and start them running. This allows all the subtasks
# in the list to be submitted at the same time.
num_subtasks
=
len
(
subtask_id_list
)
log
.
info
(
"Task
%
s: Preparing to generate and queue
%
s subtasks for course
%
s, email
%
s, to_option
%
s"
,
task_id
,
num_subtasks
,
course_id
,
email_id
,
to_option
)
num_subtasks
=
0
for
recipient_list
in
recipient_generator
:
subtask_id
=
subtask_id_list
[
num_subtasks
]
num_subtasks
+=
1
subtask_status_dict
=
SubtaskStatus
.
create
(
subtask_id
)
.
to_dict
()
def
_create_send_email_subtask
(
to_list
,
initial_subtask_status
):
"""Creates a subtask to send email to a given recipient list."""
subtask_id
=
initial_subtask_status
.
task_id
new_subtask
=
send_course_email
.
subtask
(
(
entry_id
,
email_id
,
recipient
_list
,
to
_list
,
global_email_context
,
subtask_status_dict
,
initial_subtask_status
.
to_dict
()
,
),
task_id
=
subtask_id
,
routing_key
=
settings
.
BULK_EMAIL_ROUTING_KEY
,
)
new_subtask
.
apply_async
()
return
new_subtask
recipient_qset
=
_get_recipient_queryset
(
user_id
,
to_option
,
course_id
,
course
.
location
)
recipient_fields
=
[
'profile__name'
,
'email'
]
# Sanity check: we expect the subtask to be properly summing to the original count:
if
num_subtasks
!=
len
(
subtask_id_list
):
error_msg
=
"Task {}: number of tasks generated {} not equal to original total {}"
.
format
(
task_id
,
num_subtasks
,
len
(
subtask_id_list
))
log
.
error
(
error_msg
)
raise
ValueError
(
error_msg
)
log
.
info
(
"Task
%
s: Preparing to queue subtasks for sending emails for course
%
s, email
%
s, to_option
%
s"
,
task_id
,
course_id
,
email_id
,
to_option
)
progress
=
queue_subtasks_for_query
(
entry
,
action_name
,
_create_send_email_subtask
,
recipient_qset
,
recipient_fields
,
settings
.
BULK_EMAIL_EMAILS_PER_QUERY
,
settings
.
BULK_EMAIL_EMAILS_PER_TASK
)
# We want to return progress here, as this is what will be stored in the
# AsyncResult for the parent task as its return value.
...
...
@@ -332,7 +310,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
)
except
Exception
:
# Unexpected exception. Try to write out the failure to the entry before failing.
log
.
exception
(
"Send-email task
%
s
: failed unexpectedly!"
,
current_task
_id
)
log
.
exception
(
"Send-email task
%
s
for email
%
s: failed unexpectedly!"
,
current_task_id
,
email
_id
)
# We got here for really unexpected reasons. Since we don't know how far
# the task got in emailing, we count all recipients as having failed.
# It at least keeps the counts consistent.
...
...
@@ -342,22 +320,23 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
if
send_exception
is
None
:
# Update the InstructorTask object that is storing its progress.
log
.
info
(
"Send-email task
%
s
: succeeded"
,
current_task
_id
)
log
.
info
(
"Send-email task
%
s
for email
%
s: succeeded"
,
current_task_id
,
email
_id
)
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_status
)
elif
isinstance
(
send_exception
,
RetryTaskError
):
# If retrying, a RetryTaskError needs to be returned to Celery.
# We assume that the progress made before the retry condition
# was encountered has already been updated before the retry call was made,
# so we only log here.
log
.
warning
(
"Send-email task
%
s
: being retried"
,
current_task
_id
)
log
.
warning
(
"Send-email task
%
s
for email
%
s: being retried"
,
current_task_id
,
email
_id
)
raise
send_exception
# pylint: disable=E0702
else
:
log
.
error
(
"Send-email task
%
s
: failed:
%
s"
,
current_task
_id
,
send_exception
)
log
.
error
(
"Send-email task
%
s
for email
%
s: failed:
%
s"
,
current_task_id
,
email
_id
,
send_exception
)
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_status
)
raise
send_exception
# pylint: disable=E0702
log
.
info
(
"Send-email task
%
s: returning status
%
s"
,
current_task_id
,
new_subtask_status
)
return
new_subtask_status
# return status in a form that can be serialized by Celery into JSON:
log
.
info
(
"Send-email task
%
s for email
%
s: returning status
%
s"
,
current_task_id
,
email_id
,
new_subtask_status
)
return
new_subtask_status
.
to_dict
()
def
_filter_optouts_from_recipients
(
to_list
,
course_id
):
...
...
lms/djangoapps/instructor_task/subtasks.py
View file @
5b48ed84
...
...
@@ -16,7 +16,7 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING
TASK_LOG
=
get_task_logger
(
__name__
)
# Lock expiration should be long enough to allow a s
end_course_email
task to complete.
# Lock expiration should be long enough to allow a s
ub
task to complete.
SUBTASK_LOCK_EXPIRE
=
60
*
10
# Lock expires in 10 minutes
...
...
@@ -25,9 +25,9 @@ class DuplicateTaskException(Exception):
pass
def
create_subtask_id
s
(
total_num_items
,
items_per_query
,
items_per_task
):
def
_get_number_of_subtask
s
(
total_num_items
,
items_per_query
,
items_per_task
):
"""
Determines number of subtasks that
need to be generated, and provides a list of id values to use
.
Determines number of subtasks that
would be generated by _generate_items_for_subtask
.
This needs to be calculated before a query is executed so that the list of all subtasks can be
stored in the InstructorTask before any subtasks are started.
...
...
@@ -44,11 +44,10 @@ def create_subtask_ids(total_num_items, items_per_query, items_per_task):
num_tasks_this_query
=
int
(
math
.
ceil
(
float
(
num_items_this_query
)
/
float
(
items_per_task
)))
total_num_tasks
+=
num_tasks_this_query
# Now that the number of tasks is known, return a list of ids for each task.
return
[
str
(
uuid4
())
for
_
in
range
(
total_num_tasks
)]
return
total_num_tasks
def
generate_items_for_subtask
(
item_queryset
,
item_fields
,
total_num_items
,
items_per_query
,
items_per_task
):
def
_
generate_items_for_subtask
(
item_queryset
,
item_fields
,
total_num_items
,
items_per_query
,
items_per_task
):
"""
Generates a chunk of "items" that should be passed into a subtask.
...
...
@@ -62,6 +61,7 @@ def generate_items_for_subtask(item_queryset, item_fields, total_num_items, item
Returns: yields a list of dicts, where each dict contains the fields in `item_fields`, plus the 'pk' field.
Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed.
"""
num_queries
=
int
(
math
.
ceil
(
float
(
total_num_items
)
/
float
(
items_per_query
)))
last_pk
=
item_queryset
[
0
]
.
pk
-
1
...
...
@@ -82,7 +82,7 @@ def generate_items_for_subtask(item_queryset, item_fields, total_num_items, item
# Sanity check: we expect the chunking to be properly summing to the original count:
if
num_items_queued
!=
total_num_items
:
error_msg
=
"
Task {}: n
umber of items generated by chunking {} not equal to original total {}"
.
format
(
num_items_queued
,
total_num_items
)
error_msg
=
"
N
umber of items generated by chunking {} not equal to original total {}"
.
format
(
num_items_queued
,
total_num_items
)
TASK_LOG
.
error
(
error_msg
)
raise
ValueError
(
error_msg
)
...
...
@@ -105,7 +105,7 @@ class SubtaskStatus(object):
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
Object is not JSON-serializable, so to_dict and from_dict methods are provided so that
it can be passed as a serializable argument to tasks.
it can be passed as a serializable argument to tasks
(and be reconstituted within such tasks)
.
In future, we may want to include specific error information
indicating the reason for failure.
...
...
@@ -137,8 +137,7 @@ class SubtaskStatus(object):
@classmethod
def create(cls, task_id, **options):
    """
    Construct a SubtaskStatus object.

    Arguments:
        `task_id` : passed through as the first positional argument to the class constructor.
        `options` : any keyword arguments, passed through unchanged to the class constructor.

    Returns: the newly constructed instance of the class this is called on.
    """
    # A classmethod receives the class itself, so name it `cls`; build and return in one step.
    return cls(task_id, **options)
def
to_dict
(
self
):
"""
...
...
@@ -238,6 +237,70 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
return
task_progress
def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_queryset, item_fields, items_per_query, items_per_task):
    """
    Generates and queues subtasks to each execute a chunk of "items" generated by a queryset.

    Arguments:
        `entry` : the InstructorTask object for which subtasks are being queued.
        `action_name` : a past-tense verb that can be used for constructing readable status messages.
        `create_subtask_fcn` : a function of two arguments that constructs the desired kind of subtask object.
            Arguments are the list of items to be processed by this subtask, and a SubtaskStatus
            object reflecting initial status (and containing the subtask's id).
        `item_queryset` : a query set that defines the "items" that should be passed to subtasks.
        `item_fields` : the fields that should be included in the dict that is returned.
            These are in addition to the 'pk' field.
        `items_per_query` : size of chunks to break the query operation into.
        `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.

    Returns:  the task progress as stored in the InstructorTask object.

    Raises:
        ValueError: if the number of item chunks produced by the generator does not match the
            number of subtask ids that were computed (and stored on `entry`) up front.  If the
            generator produces too many chunks, queuing stops at the first surplus chunk and the
            count reported in the message is one more than the number of ids.
    """
    task_id = entry.task_id
    total_num_items = item_queryset.count()

    # Calculate the number of tasks that will be created, and create a list of ids for each task.
    total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_query, items_per_task)
    subtask_id_list = [str(uuid4()) for _ in range(total_num_subtasks)]

    # Update the InstructorTask with information about the subtasks we've defined.
    TASK_LOG.info(
        "Task %s: updating InstructorTask %s with subtask info for %s subtasks to process %s items.",
        task_id, entry.id, total_num_subtasks, total_num_items  # pylint: disable=E1101
    )
    progress = initialize_subtask_info(entry, action_name, total_num_items, subtask_id_list)

    # Construct a generator that will return the items to use for each subtask.
    # Pass in the desired fields to fetch for each item.
    item_generator = _generate_items_for_subtask(
        item_queryset, item_fields, total_num_items, items_per_query, items_per_task
    )

    # Now create the subtasks, and start them running.
    TASK_LOG.info(
        "Task %s: creating %s subtasks to process %s items.",
        task_id, total_num_subtasks, total_num_items
    )
    num_subtasks = 0
    for item_list in item_generator:
        num_subtasks += 1
        if num_subtasks > len(subtask_id_list):
            # The generator produced more chunks than there are precomputed ids.  Stop here so
            # the mismatch is reported by the sanity check below, rather than surfacing as an
            # opaque IndexError on the id lookup.
            break
        subtask_status = SubtaskStatus.create(subtask_id_list[num_subtasks - 1])
        new_subtask = create_subtask_fcn(item_list, subtask_status)
        new_subtask.apply_async()

    # Sanity check: the number of subtasks generated must match the number of ids created up front.
    if num_subtasks != len(subtask_id_list):
        error_fmt = "Task {}: number of tasks generated {} not equal to original total {}"
        error_msg = error_fmt.format(task_id, num_subtasks, len(subtask_id_list))
        TASK_LOG.error(error_msg)
        raise ValueError(error_msg)

    # Return the task progress as stored in the InstructorTask object.
    return progress
def
_acquire_subtask_lock
(
task_id
):
"""
Mark the specified task_id as being in progress.
...
...
@@ -370,7 +433,7 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
is the value of the SubtaskStatus.to_dict(), but could be expanded in future to store information
about failure messages, progress made, etc.
"""
TASK_LOG
.
info
(
"Preparing to update status for
email
subtask
%
s for instructor task
%
d with status
%
s"
,
TASK_LOG
.
info
(
"Preparing to update status for subtask
%
s for instructor task
%
d with status
%
s"
,
current_task_id
,
entry_id
,
new_subtask_status
)
try
:
...
...
@@ -379,7 +442,7 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
subtask_status_info
=
subtask_dict
[
'status'
]
if
current_task_id
not
in
subtask_status_info
:
# unexpected error -- raise an exception
format_str
=
"Unexpected task_id '{}': unable to update status for
email
subtask of instructor task '{}'"
format_str
=
"Unexpected task_id '{}': unable to update status for subtask of instructor task '{}'"
msg
=
format_str
.
format
(
current_task_id
,
entry_id
)
TASK_LOG
.
warning
(
msg
)
raise
ValueError
(
msg
)
...
...
@@ -424,7 +487,7 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
entry
.
subtasks
=
json
.
dumps
(
subtask_dict
)
entry
.
task_output
=
InstructorTask
.
create_output_for_success
(
task_progress
)
TASK_LOG
.
info
(
"Task output updated to
%
s for
email
subtask
%
s of instructor task
%
d"
,
TASK_LOG
.
info
(
"Task output updated to
%
s for subtask
%
s of instructor task
%
d"
,
entry
.
task_output
,
current_task_id
,
entry_id
)
TASK_LOG
.
debug
(
"about to save...."
)
entry
.
save
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment