Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-platform
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-platform
Commits
c787a8f5
Commit
c787a8f5
authored
Oct 04, 2013
by
Brian Wilson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add more task-level tests for retries and other errors. Respond to initial comments.
parent
e052dde4
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
242 additions
and
89 deletions
+242
-89
lms/djangoapps/bulk_email/tasks.py
+87
-57
lms/djangoapps/bulk_email/tests/test_tasks.py
+155
-32
No files found.
lms/djangoapps/bulk_email/tasks.py
View file @
c787a8f5
...
@@ -56,13 +56,16 @@ SINGLE_EMAIL_FAILURE_ERRORS = (SESAddressBlacklistedError, SESIllegalAddressErro
...
@@ -56,13 +56,16 @@ SINGLE_EMAIL_FAILURE_ERRORS = (SESAddressBlacklistedError, SESIllegalAddressErro
# Exceptions that, if caught, should cause the task to be re-tried.
# Exceptions that, if caught, should cause the task to be re-tried.
# These errors will be caught a limited number of times before the task fails.
# These errors will be caught a limited number of times before the task fails.
LIMITED_RETRY_ERRORS
=
(
SMTP
DataError
,
SMTP
ConnectError
,
SMTPServerDisconnected
,
AWSConnectionError
)
LIMITED_RETRY_ERRORS
=
(
SMTPConnectError
,
SMTPServerDisconnected
,
AWSConnectionError
)
# Errors that indicate that a mailing task should be retried without limit.
# Errors that indicate that a mailing task should be retried without limit.
# An example is if email is being sent too quickly, but may succeed if sent
# An example is if email is being sent too quickly, but may succeed if sent
# more slowly. When caught by a task, it triggers an exponential backoff and retry.
# more slowly. When caught by a task, it triggers an exponential backoff and retry.
# Retries happen continuously until the email is sent.
# Retries happen continuously until the email is sent.
INFINITE_RETRY_ERRORS
=
(
SESMaxSendingRateExceededError
,
)
# Note that the SMTPDataErrors here are only those within the 4xx range.
# Those not in this range (i.e. in the 5xx range) are treated as hard failures
# and thus like SINGLE_EMAIL_FAILURE_ERRORS.
INFINITE_RETRY_ERRORS
=
(
SESMaxSendingRateExceededError
,
SMTPDataError
)
# Errors that are known to indicate an inability to send any more emails,
# Errors that are known to indicate an inability to send any more emails,
# and should therefore not be retried. For example, exceeding a quota for emails.
# and should therefore not be retried. For example, exceeding a quota for emails.
...
@@ -72,9 +75,12 @@ BULK_EMAIL_FAILURE_ERRORS = (SESDailyQuotaExceededError, SMTPException)
...
@@ -72,9 +75,12 @@ BULK_EMAIL_FAILURE_ERRORS = (SESDailyQuotaExceededError, SMTPException)
def
_get_recipient_queryset
(
user_id
,
to_option
,
course_id
,
course_location
):
def
_get_recipient_queryset
(
user_id
,
to_option
,
course_id
,
course_location
):
"""
"""
Generates a query set corresponding to the requested
category.
Returns a query set of email recipients corresponding to the requested to_option
category.
`to_option` is either SEND_TO_MYSELF, SEND_TO_STAFF, or SEND_TO_ALL.
`to_option` is either SEND_TO_MYSELF, SEND_TO_STAFF, or SEND_TO_ALL.
Recipients who are in more than one category (e.g. enrolled in the course and are staff or self)
will be properly deduped.
"""
"""
if
to_option
==
SEND_TO_MYSELF
:
if
to_option
==
SEND_TO_MYSELF
:
recipient_qset
=
User
.
objects
.
filter
(
id
=
user_id
)
recipient_qset
=
User
.
objects
.
filter
(
id
=
user_id
)
...
@@ -130,11 +136,11 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
...
@@ -130,11 +136,11 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
Returns the number of batches (workers) kicked off.
Returns the number of batches (workers) kicked off.
"""
"""
entry
=
InstructorTask
.
objects
.
get
(
pk
=
entry_id
)
entry
=
InstructorTask
.
objects
.
get
(
pk
=
entry_id
)
#
get inputs to use in this task from the entry:
#
Get inputs to use in this task from the entry.
user_id
=
entry
.
requester
.
id
user_id
=
entry
.
requester
.
id
task_id
=
entry
.
task_id
task_id
=
entry
.
task_id
#
p
erfunctory check, since expansion is made for convenience of other task
#
P
erfunctory check, since expansion is made for convenience of other task
# code that doesn't need the entry_id.
# code that doesn't need the entry_id.
if
course_id
!=
entry
.
course_id
:
if
course_id
!=
entry
.
course_id
:
format_msg
=
"Course id conflict: explicit value
%
s does not match task value
%
s"
format_msg
=
"Course id conflict: explicit value
%
s does not match task value
%
s"
...
@@ -145,15 +151,13 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
...
@@ -145,15 +151,13 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
email_obj
=
CourseEmail
.
objects
.
get
(
id
=
email_id
)
email_obj
=
CourseEmail
.
objects
.
get
(
id
=
email_id
)
except
CourseEmail
.
DoesNotExist
as
exc
:
except
CourseEmail
.
DoesNotExist
as
exc
:
# The CourseEmail object should be committed in the view function before the task
# The CourseEmail object should be committed in the view function before the task
# is submitted and reaches this point. It is possible to add retry behavior here,
# is submitted and reaches this point.
# to keep trying until the object is actually committed by the view function's return,
# but it's cleaner to just expect to be done.
log
.
warning
(
"Task
%
s: Failed to get CourseEmail with id
%
s"
,
task_id
,
email_id
)
log
.
warning
(
"Task
%
s: Failed to get CourseEmail with id
%
s"
,
task_id
,
email_id
)
raise
raise
to_option
=
email_obj
.
to_option
to_option
=
email_obj
.
to_option
#
sanity check that course for email_obj matches that of the task referencing it:
#
Sanity check that course for email_obj matches that of the task referencing it.
if
course_id
!=
email_obj
.
course_id
:
if
course_id
!=
email_obj
.
course_id
:
format_msg
=
"Course id conflict: explicit value
%
s does not match email value
%
s"
format_msg
=
"Course id conflict: explicit value
%
s does not match email value
%
s"
raise
ValueError
(
format_msg
.
format
(
course_id
,
email_obj
.
course_id
))
raise
ValueError
(
format_msg
.
format
(
course_id
,
email_obj
.
course_id
))
...
@@ -177,9 +181,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
...
@@ -177,9 +181,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
task_list
=
[]
task_list
=
[]
subtask_id_list
=
[]
subtask_id_list
=
[]
for
_
in
range
(
num_queries
):
for
_
in
range
(
num_queries
):
# Note that if we were doing this for regrading we probably only need 'pk', and not
# either profile__name or email. That's because we'll have to do
# a lot more work in the individual regrade for each user, but using user_id as a key.
recipient_sublist
=
list
(
recipient_qset
.
order_by
(
'pk'
)
.
filter
(
pk__gt
=
last_pk
)
recipient_sublist
=
list
(
recipient_qset
.
order_by
(
'pk'
)
.
filter
(
pk__gt
=
last_pk
)
.
values
(
'profile__name'
,
'email'
,
'pk'
)[:
settings
.
EMAILS_PER_QUERY
])
.
values
(
'profile__name'
,
'email'
,
'pk'
)[:
settings
.
EMAILS_PER_QUERY
])
last_pk
=
recipient_sublist
[
-
1
][
'pk'
]
last_pk
=
recipient_sublist
[
-
1
][
'pk'
]
...
@@ -196,7 +197,10 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
...
@@ -196,7 +197,10 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
subtask_id
=
str
(
uuid4
())
subtask_id
=
str
(
uuid4
())
subtask_id_list
.
append
(
subtask_id
)
subtask_id_list
.
append
(
subtask_id
)
subtask_status
=
create_subtask_status
(
subtask_id
)
subtask_status
=
create_subtask_status
(
subtask_id
)
# create subtask, passing args and kwargs:
# Create subtask, passing args and kwargs.
# This includes specifying the task_id to use, so we can track it.
# Specify the routing key as part of it, which is used by
# Celery to route the task request to the right worker.
new_subtask
=
send_course_email
.
subtask
(
new_subtask
=
send_course_email
.
subtask
(
(
(
entry_id
,
entry_id
,
...
@@ -225,7 +229,8 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
...
@@ -225,7 +229,8 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
log
.
info
(
"Preparing to queue
%
d email tasks (
%
d emails) for course
%
s, email
%
s, to
%
s"
,
log
.
info
(
"Preparing to queue
%
d email tasks (
%
d emails) for course
%
s, email
%
s, to
%
s"
,
num_subtasks
,
total_num_emails
,
course_id
,
email_id
,
to_option
)
num_subtasks
,
total_num_emails
,
course_id
,
email_id
,
to_option
)
# now group the subtasks, and start them running:
# Now group the subtasks, and start them running. This allows all the subtasks
# in the list to be submitted at the same time.
task_group
=
group
(
task_list
)
task_group
=
group
(
task_list
)
task_group
.
apply_async
(
routing_key
=
settings
.
BULK_EMAIL_ROUTING_KEY
)
task_group
.
apply_async
(
routing_key
=
settings
.
BULK_EMAIL_ROUTING_KEY
)
...
@@ -249,10 +254,24 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
...
@@ -249,10 +254,24 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
- 'profile__name': full name of User.
- 'profile__name': full name of User.
- 'email': email address of User.
- 'email': email address of User.
- 'pk': primary key of User model.
- 'pk': primary key of User model.
* `global_email_context`: dict containing values to be used to fill in slots in email
* `global_email_context`: dict containing values that are unique for this email but the same
for all recipients of this email. This dict is to be used to fill in slots in email
template. It does not include 'name' and 'email', which will be provided by the to_list.
template. It does not include 'name' and 'email', which will be provided by the to_list.
* retry_index: counter indicating how many times this task has been retried. Set to zero
* `subtask_status` : dict containing values representing current status. Keys are:
on initial call.
'task_id' : id of subtask. This is used to pass task information across retries.
'attempted' : number of attempts -- should equal succeeded plus failed
'succeeded' : number that succeeded in processing
'skipped' : number that were not processed.
'failed' : number that failed during processing
'retried_nomax' : number of times the subtask has been retried for conditions that
should not have a maximum count applied
'retried_withmax' : number of times the subtask has been retried for conditions that
should have a maximum count applied
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
Most values will be zero on initial call, but may be different when the task is
invoked as part of a retry.
Sends to all addresses contained in to_list that are not also in the Optout table.
Sends to all addresses contained in to_list that are not also in the Optout table.
Emails are sent multi-part, in both plain text and html. Updates InstructorTask object
Emails are sent multi-part, in both plain text and html. Updates InstructorTask object
...
@@ -280,10 +299,10 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
...
@@ -280,10 +299,10 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
subtask_status
,
subtask_status
,
)
)
except
Exception
:
except
Exception
:
# Unexpected exception. Try to write out the failure to the entry before failing
# Unexpected exception. Try to write out the failure to the entry before failing
.
_
,
send_exception
,
traceback
=
exc_info
()
_
,
send_exception
,
traceback
=
exc_info
()
traceback_string
=
format_exc
(
traceback
)
if
traceback
is
not
None
else
''
traceback_string
=
format_exc
(
traceback
)
if
traceback
is
not
None
else
''
log
.
error
(
"
background task (
%
s)
failed unexpectedly:
%
s
%
s"
,
current_task_id
,
send_exception
,
traceback_string
)
log
.
error
(
"
Send-email task
%
s:
failed unexpectedly:
%
s
%
s"
,
current_task_id
,
send_exception
,
traceback_string
)
# We got here for really unexpected reasons. Since we don't know how far
# We got here for really unexpected reasons. Since we don't know how far
# the task got in emailing, we count all recipients as having failed.
# the task got in emailing, we count all recipients as having failed.
# It at least keeps the counts consistent.
# It at least keeps the counts consistent.
...
@@ -293,21 +312,21 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
...
@@ -293,21 +312,21 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
if
send_exception
is
None
:
if
send_exception
is
None
:
# Update the InstructorTask object that is storing its progress.
# Update the InstructorTask object that is storing its progress.
log
.
info
(
"
background task (
%
s)
succeeded"
,
current_task_id
)
log
.
info
(
"
Send-email task
%
s:
succeeded"
,
current_task_id
)
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_status
)
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_status
)
elif
isinstance
(
send_exception
,
RetryTaskError
):
elif
isinstance
(
send_exception
,
RetryTaskError
):
# If retrying, record the progress made before the retry condition
# If retrying, record the progress made before the retry condition
# was encountered. Once the retry is running, it will be only processing
# was encountered. Once the retry is running, it will be only processing
# what wasn't already accomplished.
# what wasn't already accomplished.
log
.
warning
(
"
background task (
%
s)
being retried"
,
current_task_id
)
log
.
warning
(
"
Send-email task
%
s:
being retried"
,
current_task_id
)
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_status
)
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_status
)
raise
send_exception
raise
send_exception
else
:
else
:
log
.
error
(
"
background task (
%
s)
failed:
%
s"
,
current_task_id
,
send_exception
)
log
.
error
(
"
Send-email task
%
s:
failed:
%
s"
,
current_task_id
,
send_exception
)
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_status
)
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_status
)
raise
send_exception
raise
send_exception
log
.
info
(
"
background task (
%
s)
returning status
%
s"
,
current_task_id
,
new_subtask_status
)
log
.
info
(
"
Send-email task
%
s:
returning status
%
s"
,
current_task_id
,
new_subtask_status
)
return
new_subtask_status
return
new_subtask_status
...
@@ -324,32 +343,37 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
...
@@ -324,32 +343,37 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
- 'profile__name': full name of User.
- 'profile__name': full name of User.
- 'email': email address of User.
- 'email': email address of User.
- 'pk': primary key of User model.
- 'pk': primary key of User model.
* `global_email_context`: dict containing values to be used to fill in slots in email
* `global_email_context`: dict containing values that are unique for this email but the same
for all recipients of this email. This dict is to be used to fill in slots in email
template. It does not include 'name' and 'email', which will be provided by the to_list.
template. It does not include 'name' and 'email', which will be provided by the to_list.
* `subtask_status` : dict containing values representing current status. Keys are:
'task_id' : id of subtask. This is used to pass task information across retries.
'attempted' : number of attempts -- should equal succeeded plus failed
'succeeded' : number that succeeded in processing
'skipped' : number that were not processed.
'failed' : number that failed during processing
'retried_nomax' : number of times the subtask has been retried for conditions that
should not have a maximum count applied
'retried_withmax' : number of times the subtask has been retried for conditions that
should have a maximum count applied
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
Sends to all addresses contained in to_list that are not also in the Optout table.
Sends to all addresses contained in to_list that are not also in the Optout table.
Emails are sent multi-part, in both plain text and html.
Emails are sent multi-part, in both plain text and html.
Returns a tuple of two values:
Returns a tuple of two values:
* First value is a dict which represents current progress. Keys are:
* First value is a dict which represents current progress at the end of this call. Keys are
the same as for the input subtask_status.
'attempted': number of emails attempted
'succeeded': number of emails succeeded
'skipped': number of emails skipped (due to optout)
'failed': number of emails not sent because of some failure
The dict may also contain information about retries.
* Second value is an exception returned by the innards of the method, indicating a fatal error.
* Second value is an exception returned by the innards of the method, indicating a fatal error.
In this case, the number of recipients that were not sent have already been added to the
In this case, the number of recipients that were not sent have already been added to the
'failed' count above.
'failed' count above.
"""
"""
# Get information from current task's request:
# Get information from current task's request:
#task_id = _get_current_task().request.id
#retry_index = _get_current_task().request.retries
task_id
=
subtask_status
[
'task_id'
]
task_id
=
subtask_status
[
'task_id'
]
# If this is a second attempt, then throttle the speed at which mail is sent:
# If this is a second attempt
due to rate-limits
, then throttle the speed at which mail is sent:
throttle
=
subtask_status
[
'retried_nomax'
]
>
0
throttle
=
subtask_status
[
'retried_nomax'
]
>
0
# collect stats on progress:
# collect stats on progress:
...
@@ -432,7 +456,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
...
@@ -432,7 +456,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
sleep
(
0.2
)
sleep
(
0.2
)
try
:
try
:
log
.
info
(
'Email with id
%
s to be sent to
%
s'
,
email_id
,
email
)
log
.
debug
(
'Email with id
%
s to be sent to
%
s'
,
email_id
,
email
)
with
dog_stats_api
.
timer
(
'course_email.single_send.time.overall'
,
tags
=
[
_statsd_tag
(
course_title
)]):
with
dog_stats_api
.
timer
(
'course_email.single_send.time.overall'
,
tags
=
[
_statsd_tag
(
course_title
)]):
connection
.
send_messages
([
email_msg
])
connection
.
send_messages
([
email_msg
])
...
@@ -440,16 +464,16 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
...
@@ -440,16 +464,16 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
except
SMTPDataError
as
exc
:
except
SMTPDataError
as
exc
:
# According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure.
# According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure.
if
exc
.
smtp_code
>=
400
and
exc
.
smtp_code
<
500
:
if
exc
.
smtp_code
>=
400
and
exc
.
smtp_code
<
500
:
# This will cause the outer handler to catch the exception and retry the entire task
# This will cause the outer handler to catch the exception and retry the entire task
.
raise
exc
raise
exc
else
:
else
:
# This will fall through and not retry the message
, since it will be popped
# This will fall through and not retry the message
.
log
.
warning
(
'Task
%
s: email with id
%
s not delivered to
%
s due to error
%
s'
,
task_id
,
email_id
,
email
,
exc
.
smtp_error
)
log
.
warning
(
'Task
%
s: email with id
%
s not delivered to
%
s due to error
%
s'
,
task_id
,
email_id
,
email
,
exc
.
smtp_error
)
dog_stats_api
.
increment
(
'course_email.error'
,
tags
=
[
_statsd_tag
(
course_title
)])
dog_stats_api
.
increment
(
'course_email.error'
,
tags
=
[
_statsd_tag
(
course_title
)])
num_error
+=
1
num_error
+=
1
except
SINGLE_EMAIL_FAILURE_ERRORS
as
exc
:
except
SINGLE_EMAIL_FAILURE_ERRORS
as
exc
:
# This will fall through and not retry the message
, since it will be popped
# This will fall through and not retry the message
.
log
.
warning
(
'Task
%
s: email with id
%
s not delivered to
%
s due to error
%
s'
,
task_id
,
email_id
,
email
,
exc
)
log
.
warning
(
'Task
%
s: email with id
%
s not delivered to
%
s due to error
%
s'
,
task_id
,
email_id
,
email
,
exc
)
dog_stats_api
.
increment
(
'course_email.error'
,
tags
=
[
_statsd_tag
(
course_title
)])
dog_stats_api
.
increment
(
'course_email.error'
,
tags
=
[
_statsd_tag
(
course_title
)])
num_error
+=
1
num_error
+=
1
...
@@ -457,7 +481,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
...
@@ -457,7 +481,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
else
:
else
:
dog_stats_api
.
increment
(
'course_email.sent'
,
tags
=
[
_statsd_tag
(
course_title
)])
dog_stats_api
.
increment
(
'course_email.sent'
,
tags
=
[
_statsd_tag
(
course_title
)])
log
.
info
(
'Email with id
%
s sent to
%
s'
,
email_id
,
email
)
log
.
debug
(
'Email with id
%
s sent to
%
s'
,
email_id
,
email
)
num_sent
+=
1
num_sent
+=
1
# Pop the user that was emailed off the end of the list:
# Pop the user that was emailed off the end of the list:
...
@@ -474,7 +498,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
...
@@ -474,7 +498,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
state
=
RETRY
state
=
RETRY
)
)
return
_submit_for_retry
(
return
_submit_for_retry
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_progress
,
True
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_progress
,
skip_retry_max
=
True
)
)
except
LIMITED_RETRY_ERRORS
as
exc
:
except
LIMITED_RETRY_ERRORS
as
exc
:
...
@@ -491,18 +515,18 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
...
@@ -491,18 +515,18 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
state
=
RETRY
state
=
RETRY
)
)
return
_submit_for_retry
(
return
_submit_for_retry
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_progress
,
False
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_progress
,
skip_retry_max
=
False
)
)
except
BULK_EMAIL_FAILURE_ERRORS
as
exc
:
except
BULK_EMAIL_FAILURE_ERRORS
as
exc
:
dog_stats_api
.
increment
(
'course_email.error'
,
tags
=
[
_statsd_tag
(
course_title
)])
dog_stats_api
.
increment
(
'course_email.error'
,
tags
=
[
_statsd_tag
(
course_title
)])
log
.
exception
(
'Task
%
s: email with id
%
d caused send_course_email task to fail with "fatal" exception. To list:
%
s'
,
num_pending
=
len
(
to_list
)
task_id
,
email_id
,
[
i
[
'email'
]
for
i
in
to_list
])
log
.
exception
(
'Task
%
s: email with id
%
d caused send_course_email task to fail with "fatal" exception.
%
d emails unsent.'
,
num_error
+=
len
(
to_list
)
task_id
,
email_id
,
num_pending
)
subtask_progress
=
increment_subtask_status
(
subtask_progress
=
increment_subtask_status
(
subtask_status
,
subtask_status
,
succeeded
=
num_sent
,
succeeded
=
num_sent
,
failed
=
num_error
,
failed
=
(
num_error
+
num_pending
)
,
skipped
=
num_optout
,
skipped
=
num_optout
,
state
=
FAILURE
state
=
FAILURE
)
)
...
@@ -525,11 +549,11 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
...
@@ -525,11 +549,11 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
state
=
RETRY
state
=
RETRY
)
)
return
_submit_for_retry
(
return
_submit_for_retry
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_progress
,
False
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_progress
,
skip_retry_max
=
False
)
)
else
:
else
:
# Successful completion is marked by an exception value of None
:
# Successful completion is marked by an exception value of None
.
subtask_progress
=
increment_subtask_status
(
subtask_progress
=
increment_subtask_status
(
subtask_status
,
subtask_status
,
succeeded
=
num_sent
,
succeeded
=
num_sent
,
...
@@ -539,7 +563,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
...
@@ -539,7 +563,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
)
)
return
subtask_progress
,
None
return
subtask_progress
,
None
finally
:
finally
:
#
clean up at the end
#
Clean up at the end.
connection
.
close
()
connection
.
close
()
...
@@ -548,27 +572,33 @@ def _get_current_task():
...
@@ -548,27 +572,33 @@ def _get_current_task():
return
current_task
return
current_task
def
_submit_for_retry
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
current_exception
,
subtask_status
,
is_sending_rate_error
):
def
_submit_for_retry
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
current_exception
,
subtask_status
,
skip_retry_max
=
False
):
"""
"""
Helper function to requeue a task for retry, using the new version of arguments provided.
Helper function to requeue a task for retry, using the new version of arguments provided.
Inputs are the same as for running a task, plus two extra indicating the state at the time of retry.
Inputs are the same as for running a task, plus two extra indicating the state at the time of retry.
These include the `current_exception` that the task encountered that is causing the retry attempt,
These include the `current_exception` that the task encountered that is causing the retry attempt,
and the `subtask_status` that is to be returned.
and the `subtask_status` that is to be returned. A third extra argument `skip_retry_max`
indicates whether the current retry should be subject to a maximum test.
Returns a tuple of two values:
Returns a tuple of two values:
* First value is a dict which represents current progress. Keys are:
* First value is a dict which represents current progress. Keys are:
'attempted': number of emails attempted
'task_id' : id of subtask. This is used to pass task information across retries.
'succeeded': number of emails succeeded
'attempted' : number of attempts -- should equal succeeded plus failed
'skipped': number of emails skipped (due to optout)
'succeeded' : number that succeeded in processing
'failed': number of emails not sent because of some failure
'skipped' : number that were not processed.
'failed' : number that failed during processing
'retried_nomax' : number of times the subtask has been retried for conditions that
should not have a maximum count applied
'retried_withmax' : number of times the subtask has been retried for conditions that
should have a maximum count applied
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
* Second value is an exception returned by the innards of the method. If the retry was
* Second value is an exception returned by the innards of the method. If the retry was
successfully submitted, this value will be the RetryTaskError that retry() returns.
successfully submitted, this value will be the RetryTaskError that retry() returns.
Otherwise, it (ought to be) the current_exception passed in.
Otherwise, it (ought to be) the current_exception passed in.
"""
"""
# task_id = _get_current_task().request.id
task_id
=
subtask_status
[
'task_id'
]
task_id
=
subtask_status
[
'task_id'
]
log
.
info
(
"Task
%
s: Successfully sent to
%
s users; failed to send to
%
s users (and skipped
%
s users)"
,
log
.
info
(
"Task
%
s: Successfully sent to
%
s users; failed to send to
%
s users (and skipped
%
s users)"
,
task_id
,
subtask_status
[
'succeeded'
],
subtask_status
[
'failed'
],
subtask_status
[
'skipped'
])
task_id
,
subtask_status
[
'succeeded'
],
subtask_status
[
'failed'
],
subtask_status
[
'skipped'
])
...
@@ -576,7 +606,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
...
@@ -576,7 +606,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
# Calculate time until we retry this task (in seconds):
# Calculate time until we retry this task (in seconds):
max_retries
=
_get_current_task
()
.
max_retries
+
subtask_status
[
'retried_nomax'
]
max_retries
=
_get_current_task
()
.
max_retries
+
subtask_status
[
'retried_nomax'
]
base_delay
=
_get_current_task
()
.
default_retry_delay
base_delay
=
_get_current_task
()
.
default_retry_delay
if
is_sending_rate_error
:
if
skip_retry_max
:
retry_index
=
subtask_status
[
'retried_nomax'
]
retry_index
=
subtask_status
[
'retried_nomax'
]
exp
=
min
(
retry_index
,
5
)
exp
=
min
(
retry_index
,
5
)
countdown
=
((
2
**
exp
)
*
base_delay
)
*
random
.
uniform
(
.
5
,
1.25
)
countdown
=
((
2
**
exp
)
*
base_delay
)
*
random
.
uniform
(
.
5
,
1.25
)
...
...
lms/djangoapps/bulk_email/tests/test_tasks.py
View file @
c787a8f5
...
@@ -7,9 +7,17 @@ paths actually work.
...
@@ -7,9 +7,17 @@ paths actually work.
"""
"""
import
json
import
json
from
uuid
import
uuid4
from
uuid
import
uuid4
from
itertools
import
cycle
from
itertools
import
cycle
,
chain
,
repeat
from
mock
import
patch
,
Mock
from
mock
import
patch
,
Mock
from
smtplib
import
SMTPDataError
,
SMTPServerDisconnected
from
smtplib
import
SMTPServerDisconnected
,
SMTPDataError
,
SMTPConnectError
,
SMTPAuthenticationError
from
boto.ses.exceptions
import
(
SESDailyQuotaExceededError
,
SESMaxSendingRateExceededError
,
SESAddressBlacklistedError
,
SESIllegalAddressError
,
SESLocalAddressCharacterError
,
)
from
boto.exception
import
AWSConnectionError
from
celery.states
import
SUCCESS
,
FAILURE
from
celery.states
import
SUCCESS
,
FAILURE
...
@@ -17,7 +25,7 @@ from celery.states import SUCCESS, FAILURE
...
@@ -17,7 +25,7 @@ from celery.states import SUCCESS, FAILURE
from
django.conf
import
settings
from
django.conf
import
settings
from
django.core.management
import
call_command
from
django.core.management
import
call_command
from
bulk_email.models
import
CourseEmail
,
SEND_TO_ALL
from
bulk_email.models
import
CourseEmail
,
Optout
,
SEND_TO_ALL
# from instructor_task.tests.test_tasks import TestInstructorTasks
# from instructor_task.tests.test_tasks import TestInstructorTasks
from
instructor_task.tasks
import
send_bulk_course_email
from
instructor_task.tasks
import
send_bulk_course_email
...
@@ -62,7 +70,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
...
@@ -62,7 +70,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
)
)
return
instructor_task
return
instructor_task
def
_run_task_with_mock_celery
(
self
,
task_class
,
entry_id
,
task_id
,
expected_failure_message
=
None
):
def
_run_task_with_mock_celery
(
self
,
task_class
,
entry_id
,
task_id
):
"""Submit a task and mock how celery provides a current_task."""
"""Submit a task and mock how celery provides a current_task."""
self
.
current_task
=
Mock
()
self
.
current_task
=
Mock
()
self
.
current_task
.
max_retries
=
settings
.
BULK_EMAIL_MAX_RETRIES
self
.
current_task
.
max_retries
=
settings
.
BULK_EMAIL_MAX_RETRIES
...
@@ -138,39 +146,70 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
...
@@ -138,39 +146,70 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
self
.
_assert_single_subtask_status
(
entry
,
succeeded
,
failed
,
skipped
,
retried_nomax
,
retried_withmax
)
self
.
_assert_single_subtask_status
(
entry
,
succeeded
,
failed
,
skipped
,
retried_nomax
,
retried_withmax
)
def
test_successful
(
self
):
def
test_successful
(
self
):
num_students
=
settings
.
EMAILS_PER_TASK
-
1
# Select number of emails to fit into a single subtask.
self
.
_create_students
(
num_students
)
num_emails
=
settings
.
EMAILS_PER_TASK
#
w
e also send email to the instructor:
#
W
e also send email to the instructor:
num_emails
=
num_students
+
1
self
.
_create_students
(
num_emails
-
1
)
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
([
None
])
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
([
None
])
self
.
_test_run_with_task
(
send_bulk_course_email
,
'emailed'
,
num_emails
,
num_emails
)
self
.
_test_run_with_task
(
send_bulk_course_email
,
'emailed'
,
num_emails
,
num_emails
)
def
test_smtp_blacklisted_user
(
self
):
def
test_skipped
(
self
):
# Test that celery handles permanent SMTPDataErrors by failing and not retrying.
# Select number of emails to fit into a single subtask.
num_students
=
settings
.
EMAILS_PER_TASK
-
1
num_emails
=
settings
.
EMAILS_PER_TASK
self
.
_create_students
(
num_students
)
# We also send email to the instructor:
# we also send email to the instructor:
students
=
self
.
_create_students
(
num_emails
-
1
)
num_emails
=
num_students
+
1
# have every fourth student optout:
expected_skipped
=
int
((
num_emails
+
3
)
/
4.0
)
expected_succeeds
=
num_emails
-
expected_skipped
for
index
in
range
(
0
,
num_emails
,
4
):
Optout
.
objects
.
create
(
user
=
students
[
index
],
course_id
=
self
.
course
.
id
)
# mark some students as opting out
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
([
None
])
self
.
_test_run_with_task
(
send_bulk_course_email
,
'emailed'
,
num_emails
,
expected_succeeds
,
skipped
=
expected_skipped
)
def
_test_email_address_failures
(
self
,
exception
):
"""Test that celery handles bad address errors by failing and not retrying."""
# Select number of emails to fit into a single subtask.
num_emails
=
settings
.
EMAILS_PER_TASK
# We also send email to the instructor:
self
.
_create_students
(
num_emails
-
1
)
expected_fails
=
int
((
num_emails
+
3
)
/
4.0
)
expected_fails
=
int
((
num_emails
+
3
)
/
4.0
)
expected_succeeds
=
num_emails
-
expected_fails
expected_succeeds
=
num_emails
-
expected_fails
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
# have every fourth email fail due to blacklisting:
# have every fourth email fail due to some address failure:
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
([
SMTPDataError
(
554
,
"Email address is blacklisted"
),
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
([
exception
,
None
,
None
,
None
])
None
,
None
,
None
])
self
.
_test_run_with_task
(
send_bulk_course_email
,
'emailed'
,
num_emails
,
expected_succeeds
,
failed
=
expected_fails
)
self
.
_test_run_with_task
(
send_bulk_course_email
,
'emailed'
,
num_emails
,
expected_succeeds
,
failed
=
expected_fails
)
def
test_retry_after_limited_retry_error
(
self
):
def
test_smtp_blacklisted_user
(
self
):
# Test that celery handles connection failures by retrying.
# Test that celery handles permanent SMTPDataErrors by failing and not retrying.
num_students
=
1
self
.
_test_email_address_failures
(
SMTPDataError
(
554
,
"Email address is blacklisted"
))
self
.
_create_students
(
num_students
)
# we also send email to the instructor:
def
test_ses_blacklisted_user
(
self
):
num_emails
=
num_students
+
1
# Test that celery handles permanent SMTPDataErrors by failing and not retrying.
self
.
_test_email_address_failures
(
SESAddressBlacklistedError
(
554
,
"Email address is blacklisted"
))
def
test_ses_illegal_address
(
self
):
# Test that celery handles permanent SMTPDataErrors by failing and not retrying.
self
.
_test_email_address_failures
(
SESIllegalAddressError
(
554
,
"Email address is illegal"
))
def
test_ses_local_address_character_error
(
self
):
# Test that celery handles permanent SMTPDataErrors by failing and not retrying.
self
.
_test_email_address_failures
(
SESLocalAddressCharacterError
(
554
,
"Email address contains a bad character"
))
def
_test_retry_after_limited_retry_error
(
self
,
exception
):
"""Test that celery handles connection failures by retrying."""
# If we want the batch to succeed, we need to send fewer emails
# than the max retries, so that the max is not triggered.
num_emails
=
settings
.
BULK_EMAIL_MAX_RETRIES
# We also send email to the instructor:
self
.
_create_students
(
num_emails
-
1
)
expected_fails
=
0
expected_fails
=
0
expected_succeeds
=
num_emails
expected_succeeds
=
num_emails
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
#
have every other mail attempt fail due to disconnection:
#
Have every other mail attempt fail due to disconnection.
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
([
SMTPServerDisconnected
(
425
,
"Disconnecting"
)
,
None
])
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
([
exception
,
None
])
self
.
_test_run_with_task
(
self
.
_test_run_with_task
(
send_bulk_course_email
,
send_bulk_course_email
,
'emailed'
,
'emailed'
,
...
@@ -180,17 +219,18 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
...
@@ -180,17 +219,18 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
retried_withmax
=
num_emails
retried_withmax
=
num_emails
)
)
def
test_max_retry_limit_causes_failure
(
self
):
def
_test_max_retry_limit_causes_failure
(
self
,
exception
):
# Test that celery can hit a maximum number of retries.
"""Test that celery can hit a maximum number of retries."""
num_students
=
1
# Doesn't really matter how many recipients, since we expect
self
.
_create_students
(
num_students
)
# to fail on the first.
# we also send email to the instructor:
num_emails
=
10
num_emails
=
num_students
+
1
# We also send email to the instructor:
self
.
_create_students
(
num_emails
-
1
)
expected_fails
=
num_emails
expected_fails
=
num_emails
expected_succeeds
=
0
expected_succeeds
=
0
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
# always fail to connect, triggering repeated retries until limit is hit:
# always fail to connect, triggering repeated retries until limit is hit:
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
([
SMTPServerDisconnected
(
425
,
"Disconnecting"
)
])
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
([
exception
])
self
.
_test_run_with_task
(
self
.
_test_run_with_task
(
send_bulk_course_email
,
send_bulk_course_email
,
'emailed'
,
'emailed'
,
...
@@ -199,3 +239,86 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
...
@@ -199,3 +239,86 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
failed
=
expected_fails
,
failed
=
expected_fails
,
retried_withmax
=
(
settings
.
BULK_EMAIL_MAX_RETRIES
+
1
)
retried_withmax
=
(
settings
.
BULK_EMAIL_MAX_RETRIES
+
1
)
)
)
def
test_retry_after_smtp_disconnect
(
self
):
self
.
_test_retry_after_limited_retry_error
(
SMTPServerDisconnected
(
425
,
"Disconnecting"
))
def
test_max_retry_after_smtp_disconnect
(
self
):
self
.
_test_max_retry_limit_causes_failure
(
SMTPServerDisconnected
(
425
,
"Disconnecting"
))
def
test_retry_after_smtp_connect_error
(
self
):
self
.
_test_retry_after_limited_retry_error
(
SMTPConnectError
(
424
,
"Bad Connection"
))
def
test_max_retry_after_smtp_connect_error
(
self
):
self
.
_test_max_retry_limit_causes_failure
(
SMTPConnectError
(
424
,
"Bad Connection"
))
def
test_retry_after_aws_connect_error
(
self
):
self
.
_test_retry_after_limited_retry_error
(
AWSConnectionError
(
"Unable to provide secure connection through proxy"
))
def
test_max_retry_after_aws_connect_error
(
self
):
self
.
_test_max_retry_limit_causes_failure
(
AWSConnectionError
(
"Unable to provide secure connection through proxy"
))
def
test_retry_after_general_error
(
self
):
self
.
_test_retry_after_limited_retry_error
(
Exception
(
"This is some random exception."
))
def
test_max_retry_after_general_error
(
self
):
self
.
_test_max_retry_limit_causes_failure
(
Exception
(
"This is some random exception."
))
def
_test_retry_after_unlimited_retry_error
(
self
,
exception
):
"""Test that celery handles throttling failures by retrying."""
num_emails
=
8
# We also send email to the instructor:
self
.
_create_students
(
num_emails
-
1
)
expected_fails
=
0
expected_succeeds
=
num_emails
# Note that because celery in eager mode will call retries synchronously,
# each retry will increase the stack depth. It turns out that there is a
# maximum depth at which a RuntimeError is raised ("maximum recursion depth
# exceeded"). The maximum recursion depth is 90, so
# num_emails * expected_retries < 90.
expected_retries
=
10
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
# Cycle through N throttling errors followed by a success.
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
(
chain
(
repeat
(
exception
,
expected_retries
),
[
None
])
)
self
.
_test_run_with_task
(
send_bulk_course_email
,
'emailed'
,
num_emails
,
expected_succeeds
,
failed
=
expected_fails
,
retried_nomax
=
(
expected_retries
*
num_emails
)
)
def
test_retry_after_smtp_throttling_error
(
self
):
self
.
_test_retry_after_unlimited_retry_error
(
SMTPDataError
(
455
,
"Throttling: Sending rate exceeded"
))
def
test_retry_after_ses_throttling_error
(
self
):
self
.
_test_retry_after_unlimited_retry_error
(
SESMaxSendingRateExceededError
(
455
,
"Throttling: Sending rate exceeded"
))
def
_test_immediate_failure
(
self
,
exception
):
"""Test that celery can hit a maximum number of retries."""
# Doesn't really matter how many recipients, since we expect
# to fail on the first.
num_emails
=
10
# We also send email to the instructor:
self
.
_create_students
(
num_emails
-
1
)
expected_fails
=
num_emails
expected_succeeds
=
0
with
patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
as
get_conn
:
# always fail to connect, triggering repeated retries until limit is hit:
get_conn
.
return_value
.
send_messages
.
side_effect
=
cycle
([
exception
])
self
.
_test_run_with_task
(
send_bulk_course_email
,
'emailed'
,
num_emails
,
expected_succeeds
,
failed
=
expected_fails
,
)
def
test_failure_on_unhandled_smtp
(
self
):
self
.
_test_immediate_failure
(
SMTPAuthenticationError
(
403
,
"That password doesn't work!"
))
def
test_failure_on_ses_quota_exceeded
(
self
):
self
.
_test_immediate_failure
(
SESDailyQuotaExceededError
(
403
,
"You're done for the day!"
))
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment