Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-platform
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-platform
Commits
0f8f82c8
Commit
0f8f82c8
authored
Oct 25, 2013
by
Brian Wilson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Define and use SubtaskStatus class.
parent
ed4b954a
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
145 additions
and
190 deletions
+145
-190
lms/djangoapps/bulk_email/tasks.py
+37
-86
lms/djangoapps/bulk_email/tests/test_email.py
+8
-8
lms/djangoapps/bulk_email/tests/test_err_handling.py
+23
-23
lms/djangoapps/bulk_email/tests/test_tasks.py
+4
-11
lms/djangoapps/instructor_task/subtasks.py
+73
-62
No files found.
lms/djangoapps/bulk_email/tasks.py
View file @
0f8f82c8
...
...
@@ -42,8 +42,7 @@ from instructor_task.models import InstructorTask
from
instructor_task.subtasks
import
(
create_subtask_ids
,
generate_items_for_subtask
,
create_subtask_status
,
increment_subtask_status
,
SubtaskStatus
,
update_subtask_status
,
initialize_subtask_info
,
check_subtask_is_valid
,
...
...
@@ -239,14 +238,14 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
for
recipient_list
in
recipient_generator
:
subtask_id
=
subtask_id_list
[
num_subtasks
]
num_subtasks
+=
1
subtask_status
=
create_subtask_status
(
subtask_id
)
subtask_status
_dict
=
SubtaskStatus
.
create
(
subtask_id
)
.
to_dict
(
)
new_subtask
=
send_course_email
.
subtask
(
(
entry_id
,
email_id
,
recipient_list
,
global_email_context
,
subtask_status
,
subtask_status
_dict
,
),
task_id
=
subtask_id
,
routing_key
=
settings
.
BULK_EMAIL_ROUTING_KEY
,
...
...
@@ -268,7 +267,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
@task
(
default_retry_delay
=
settings
.
BULK_EMAIL_DEFAULT_RETRY_DELAY
,
max_retries
=
settings
.
BULK_EMAIL_MAX_RETRIES
)
# pylint: disable=E1102
def
send_course_email
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
subtask_status
):
def
send_course_email
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
subtask_status
_dict
):
"""
Sends an email to a list of recipients.
...
...
@@ -282,7 +281,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
* `global_email_context`: dict containing values that are unique for this email but the same
for all recipients of this email. This dict is to be used to fill in slots in email
template. It does not include 'name' and 'email', which will be provided by the to_list.
* `subtask_status` : dict containing values representing current status. Keys are:
* `subtask_status
_dict
` : dict containing values representing current status. Keys are:
'task_id' : id of subtask. This is used to pass task information across retries.
'attempted' : number of attempts -- should equal succeeded plus failed
...
...
@@ -302,7 +301,8 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
Emails are sent multi-part, in both plain text and html. Updates InstructorTask object
with status information (sends, failures, skips) and updates number of subtasks completed.
"""
current_task_id
=
subtask_status
[
'task_id'
]
subtask_status
=
SubtaskStatus
.
from_dict
(
subtask_status_dict
)
current_task_id
=
subtask_status
.
task_id
num_to_send
=
len
(
to_list
)
log
.
info
(
"Preparing to send email
%
s to
%
d recipients as subtask
%
s for instructor task
%
d: context =
%
s, status=
%
s"
,
email_id
,
num_to_send
,
current_task_id
,
entry_id
,
global_email_context
,
subtask_status
)
...
...
@@ -336,8 +336,8 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
# We got here for really unexpected reasons. Since we don't know how far
# the task got in emailing, we count all recipients as having failed.
# It at least keeps the counts consistent.
new_subtask_status
=
increment_subtask_status
(
subtask_status
,
failed
=
num_to_send
,
state
=
FAILURE
)
update_subtask_status
(
entry_id
,
current_task_id
,
new_
subtask_status
)
subtask_status
.
increment
(
failed
=
num_to_send
,
state
=
FAILURE
)
update_subtask_status
(
entry_id
,
current_task_id
,
subtask_status
)
raise
if
send_exception
is
None
:
...
...
@@ -419,37 +419,20 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
* `global_email_context`: dict containing values that are unique for this email but the same
for all recipients of this email. This dict is to be used to fill in slots in email
template. It does not include 'name' and 'email', which will be provided by the to_list.
* `subtask_status` : dict containing values representing current status. Keys are:
'task_id' : id of subtask. This is used to pass task information across retries.
'attempted' : number of attempts -- should equal succeeded plus failed
'succeeded' : number that succeeded in processing
'skipped' : number that were not processed.
'failed' : number that failed during processing
'retried_nomax' : number of times the subtask has been retried for conditions that
should not have a maximum count applied
'retried_withmax' : number of times the subtask has been retried for conditions that
should have a maximum count applied
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
* `subtask_status` : object of class SubtaskStatus representing current status.
Sends to all addresses contained in to_list that are not also in the Optout table.
Emails are sent multi-part, in both plain text and html.
Returns a tuple of two values:
* First value is a dict which represents current progress at the end of this call. Keys are
the same as for the input subtask_status.
* First value is a SubtaskStatus object which represents current progress at the end of this call.
* Second value is an exception returned by the innards of the method, indicating a fatal error.
In this case, the number of recipients that were not sent have already been added to the
'failed' count above.
"""
# Get information from current task's request:
task_id
=
subtask_status
[
'task_id'
]
# collect stats on progress:
num_optout
=
0
num_sent
=
0
num_error
=
0
task_id
=
subtask_status
.
task_id
try
:
course_email
=
CourseEmail
.
objects
.
get
(
id
=
email_id
)
...
...
@@ -463,8 +446,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# attempt. Anyone on the to_list on a retry has already passed the filter
# that existed at that time, and we don't need to keep checking for changes
# in the Optout list.
if
(
subtask_status
[
'retried_nomax'
]
+
subtask_status
[
'retried_withmax'
]
)
==
0
:
if
subtask_status
.
get_retry_count
(
)
==
0
:
to_list
,
num_optout
=
_filter_optouts_from_recipients
(
to_list
,
course_email
.
course_id
)
subtask_status
.
increment
(
skipped
=
num_optout
)
course_title
=
global_email_context
[
'course_title'
]
subject
=
"["
+
course_title
+
"] "
+
course_email
.
subject
...
...
@@ -509,7 +493,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# for a period of time between all emails within this task. Choice of
# the value depends on the number of workers that might be sending email in
# parallel, and what the SES throttle rate is.
if
subtask_status
[
'retried_nomax'
]
>
0
:
if
subtask_status
.
retried_nomax
>
0
:
sleep
(
settings
.
BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS
)
try
:
...
...
@@ -527,13 +511,13 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# This will fall through and not retry the message.
log
.
warning
(
'Task
%
s: email with id
%
s not delivered to
%
s due to error
%
s'
,
task_id
,
email_id
,
email
,
exc
.
smtp_error
)
dog_stats_api
.
increment
(
'course_email.error'
,
tags
=
[
_statsd_tag
(
course_title
)])
num_error
+=
1
subtask_status
.
increment
(
failed
=
1
)
except
SINGLE_EMAIL_FAILURE_ERRORS
as
exc
:
# This will fall through and not retry the message.
log
.
warning
(
'Task
%
s: email with id
%
s not delivered to
%
s due to error
%
s'
,
task_id
,
email_id
,
email
,
exc
)
dog_stats_api
.
increment
(
'course_email.error'
,
tags
=
[
_statsd_tag
(
course_title
)])
num_error
+=
1
subtask_status
.
increment
(
failed
=
1
)
else
:
dog_stats_api
.
increment
(
'course_email.sent'
,
tags
=
[
_statsd_tag
(
course_title
)])
...
...
@@ -541,7 +525,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
log
.
info
(
'Email with id
%
s sent to
%
s'
,
email_id
,
email
)
else
:
log
.
debug
(
'Email with id
%
s sent to
%
s'
,
email_id
,
email
)
num_sent
+=
1
subtask_status
.
increment
(
succeeded
=
1
)
# Pop the user that was emailed off the end of the list only once they have
# successfully been processed. (That way, if there were a failure that
...
...
@@ -552,16 +536,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
dog_stats_api
.
increment
(
'course_email.infinite_retry'
,
tags
=
[
_statsd_tag
(
course_title
)])
# Increment the "retried_nomax" counter, update other counters with progress to date,
# and set the state to RETRY:
subtask_progress
=
increment_subtask_status
(
subtask_status
,
succeeded
=
num_sent
,
failed
=
num_error
,
skipped
=
num_optout
,
retried_nomax
=
1
,
state
=
RETRY
)
subtask_status
.
increment
(
retried_nomax
=
1
,
state
=
RETRY
)
return
_submit_for_retry
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_
progres
s
,
skip_retry_max
=
True
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_
statu
s
,
skip_retry_max
=
True
)
except
LIMITED_RETRY_ERRORS
as
exc
:
...
...
@@ -571,16 +548,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
dog_stats_api
.
increment
(
'course_email.limited_retry'
,
tags
=
[
_statsd_tag
(
course_title
)])
# Increment the "retried_withmax" counter, update other counters with progress to date,
# and set the state to RETRY:
subtask_progress
=
increment_subtask_status
(
subtask_status
,
succeeded
=
num_sent
,
failed
=
num_error
,
skipped
=
num_optout
,
retried_withmax
=
1
,
state
=
RETRY
)
subtask_status
.
increment
(
retried_withmax
=
1
,
state
=
RETRY
)
return
_submit_for_retry
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_
progres
s
,
skip_retry_max
=
False
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_
statu
s
,
skip_retry_max
=
False
)
except
BULK_EMAIL_FAILURE_ERRORS
as
exc
:
...
...
@@ -590,14 +560,8 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
task_id
,
email_id
,
num_pending
)
# Update counters with progress to date, counting unsent emails as failures,
# and set the state to FAILURE:
subtask_progress
=
increment_subtask_status
(
subtask_status
,
succeeded
=
num_sent
,
failed
=
(
num_error
+
num_pending
),
skipped
=
num_optout
,
state
=
FAILURE
)
return
subtask_progress
,
exc
subtask_status
.
increment
(
failed
=
num_pending
,
state
=
FAILURE
)
return
subtask_status
,
exc
except
Exception
as
exc
:
# Errors caught here cause the email to be retried. The entire task is actually retried
...
...
@@ -609,30 +573,17 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
task_id
,
email_id
)
# Increment the "retried_withmax" counter, update other counters with progress to date,
# and set the state to RETRY:
subtask_progress
=
increment_subtask_status
(
subtask_status
,
succeeded
=
num_sent
,
failed
=
num_error
,
skipped
=
num_optout
,
retried_withmax
=
1
,
state
=
RETRY
)
subtask_status
.
increment
(
retried_withmax
=
1
,
state
=
RETRY
)
return
_submit_for_retry
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_
progres
s
,
skip_retry_max
=
False
entry_id
,
email_id
,
to_list
,
global_email_context
,
exc
,
subtask_
statu
s
,
skip_retry_max
=
False
)
else
:
# All went well. Update counters with progress to date,
# and set the state to SUCCESS:
subtask_progress
=
increment_subtask_status
(
subtask_status
,
succeeded
=
num_sent
,
failed
=
num_error
,
skipped
=
num_optout
,
state
=
SUCCESS
)
subtask_status
.
increment
(
state
=
SUCCESS
)
# Successful completion is marked by an exception value of None.
return
subtask_
progres
s
,
None
return
subtask_
statu
s
,
None
finally
:
# Clean up at the end.
connection
.
close
()
...
...
@@ -678,26 +629,26 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
successfully submitted, this value will be the RetryTaskError that retry() returns.
Otherwise, it (ought to be) the current_exception passed in.
"""
task_id
=
subtask_status
[
'task_id'
]
task_id
=
subtask_status
.
task_id
log
.
info
(
"Task
%
s: Successfully sent to
%
s users; failed to send to
%
s users (and skipped
%
s users)"
,
task_id
,
subtask_status
[
'succeeded'
],
subtask_status
[
'failed'
],
subtask_status
[
'skipped'
]
)
task_id
,
subtask_status
.
succeeded
,
subtask_status
.
failed
,
subtask_status
.
skipped
)
# Calculate time until we retry this task (in seconds):
# The value for max_retries is increased by the number of times an "infinite-retry" exception
# has been retried. We want the regular retries to trigger max-retry checking, but not these
# special retries. So we count them separately.
max_retries
=
_get_current_task
()
.
max_retries
+
subtask_status
[
'retried_nomax'
]
max_retries
=
_get_current_task
()
.
max_retries
+
subtask_status
.
retried_nomax
base_delay
=
_get_current_task
()
.
default_retry_delay
if
skip_retry_max
:
# once we reach five retries, don't increase the countdown further.
retry_index
=
min
(
subtask_status
[
'retried_nomax'
]
,
5
)
retry_index
=
min
(
subtask_status
.
retried_nomax
,
5
)
exception_type
=
'sending-rate'
# if we have a cap, after all, apply it now:
if
hasattr
(
settings
,
'BULK_EMAIL_INFINITE_RETRY_CAP'
):
retry_cap
=
settings
.
BULK_EMAIL_INFINITE_RETRY_CAP
+
subtask_status
[
'retried_withmax'
]
retry_cap
=
settings
.
BULK_EMAIL_INFINITE_RETRY_CAP
+
subtask_status
.
retried_withmax
max_retries
=
min
(
max_retries
,
retry_cap
)
else
:
retry_index
=
subtask_status
[
'retried_withmax'
]
retry_index
=
subtask_status
.
retried_withmax
exception_type
=
'transient'
# Skew the new countdown value by a random factor, so that not all
...
...
@@ -722,7 +673,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
email_id
,
to_list
,
global_email_context
,
subtask_status
,
subtask_status
.
to_dict
()
,
],
exc
=
current_exception
,
countdown
=
countdown
,
...
...
@@ -743,8 +694,8 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
log
.
exception
(
'Task
%
s: email with id
%
d caused send_course_email task to fail to retry. To list:
%
s'
,
task_id
,
email_id
,
[
i
[
'email'
]
for
i
in
to_list
])
num_failed
=
len
(
to_list
)
new_subtask_progress
=
increment_subtask_status
(
subtask_status
,
failed
=
num_failed
,
state
=
FAILURE
)
return
new_subtask_progres
s
,
retry_exc
subtask_status
.
increment
(
subtask_status
,
failed
=
num_failed
,
state
=
FAILURE
)
return
subtask_statu
s
,
retry_exc
def
_statsd_tag
(
course_title
):
...
...
lms/djangoapps/bulk_email/tests/test_email.py
View file @
0f8f82c8
...
...
@@ -15,7 +15,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF
from
xmodule.modulestore.tests.django_utils
import
ModuleStoreTestCase
from
xmodule.modulestore.tests.factories
import
CourseFactory
from
bulk_email.models
import
Optout
from
instructor_task.subtasks
import
increment
_subtask_status
from
instructor_task.subtasks
import
update
_subtask_status
STAFF_COUNT
=
3
STUDENT_COUNT
=
10
...
...
@@ -29,13 +29,13 @@ class MockCourseEmailResult(object):
"""
emails_sent
=
0
def
get_mock_
increment
_subtask_status
(
self
):
def
get_mock_
update
_subtask_status
(
self
):
"""Wrapper for mock email function."""
def
mock_
increment_subtask_status
(
original_status
,
**
kwarg
s
):
# pylint: disable=W0613
def
mock_
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_statu
s
):
# pylint: disable=W0613
"""Increments count of number of emails sent."""
self
.
emails_sent
+=
kwargs
.
get
(
'succeeded'
,
0
)
return
increment_subtask_status
(
original_status
,
**
kwarg
s
)
return
mock_
increment
_subtask_status
self
.
emails_sent
+=
new_subtask_status
.
succeeded
return
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_statu
s
)
return
mock_
update
_subtask_status
@override_settings
(
MODULESTORE
=
TEST_DATA_MONGO_MODULESTORE
)
...
...
@@ -244,13 +244,13 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
)
@override_settings
(
BULK_EMAIL_EMAILS_PER_TASK
=
3
,
BULK_EMAIL_EMAILS_PER_QUERY
=
7
)
@patch
(
'bulk_email.tasks.
increment
_subtask_status'
)
@patch
(
'bulk_email.tasks.
update
_subtask_status'
)
def
test_chunked_queries_send_numerous_emails
(
self
,
email_mock
):
"""
Test sending a large number of emails, to test the chunked querying
"""
mock_factory
=
MockCourseEmailResult
()
email_mock
.
side_effect
=
mock_factory
.
get_mock_
increment
_subtask_status
()
email_mock
.
side_effect
=
mock_factory
.
get_mock_
update
_subtask_status
()
added_users
=
[]
for
_
in
xrange
(
LARGE_NUM_EMAILS
):
user
=
UserFactory
()
...
...
lms/djangoapps/bulk_email/tests/test_err_handling.py
View file @
0f8f82c8
...
...
@@ -22,8 +22,8 @@ from bulk_email.models import CourseEmail, SEND_TO_ALL
from
bulk_email.tasks
import
perform_delegate_email_batches
,
send_course_email
from
instructor_task.models
import
InstructorTask
from
instructor_task.subtasks
import
(
create_subtask_status
,
initialize_subtask_info
,
SubtaskStatus
,
check_subtask_is_valid
,
update_subtask_status
,
DuplicateTaskException
,
...
...
@@ -75,7 +75,7 @@ class TestEmailErrors(ModuleStoreTestCase):
self
.
assertIsInstance
(
exc
,
SMTPDataError
)
@patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
@patch
(
'bulk_email.tasks.
increment
_subtask_status'
)
@patch
(
'bulk_email.tasks.
update
_subtask_status'
)
@patch
(
'bulk_email.tasks.send_course_email.retry'
)
def
test_data_err_fail
(
self
,
retry
,
result
,
get_conn
):
"""
...
...
@@ -99,11 +99,11 @@ class TestEmailErrors(ModuleStoreTestCase):
# We shouldn't retry when hitting a 5xx error
self
.
assertFalse
(
retry
.
called
)
# Test that after the rejected email, the rest still successfully send
((
_
initial_results
),
kwargs
)
=
result
.
call_args
self
.
assertEquals
(
kwargs
[
'skipped'
]
,
0
)
((
_
entry_id
,
_current_task_id
,
subtask_status
),
_
kwargs
)
=
result
.
call_args
self
.
assertEquals
(
subtask_status
.
skipped
,
0
)
expected_fails
=
int
((
settings
.
BULK_EMAIL_EMAILS_PER_TASK
+
3
)
/
4.0
)
self
.
assertEquals
(
kwargs
[
'failed'
]
,
expected_fails
)
self
.
assertEquals
(
kwargs
[
'succeeded'
]
,
settings
.
BULK_EMAIL_EMAILS_PER_TASK
-
expected_fails
)
self
.
assertEquals
(
subtask_status
.
failed
,
expected_fails
)
self
.
assertEquals
(
subtask_status
.
succeeded
,
settings
.
BULK_EMAIL_EMAILS_PER_TASK
-
expected_fails
)
@patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
@patch
(
'bulk_email.tasks.send_course_email.retry'
)
...
...
@@ -146,7 +146,7 @@ class TestEmailErrors(ModuleStoreTestCase):
exc
=
kwargs
[
'exc'
]
self
.
assertIsInstance
(
exc
,
SMTPConnectError
)
@patch
(
'bulk_email.tasks.
increment_subtask_status
'
)
@patch
(
'bulk_email.tasks.
SubtaskStatus.increment
'
)
@patch
(
'bulk_email.tasks.log'
)
def
test_nonexistent_email
(
self
,
mock_log
,
result
):
"""
...
...
@@ -216,10 +216,10 @@ class TestEmailErrors(ModuleStoreTestCase):
to_list
=
[
'test@test.com'
]
global_email_context
=
{
'course_title'
:
'dummy course'
}
subtask_id
=
"subtask-id-value"
subtask_status
=
create_subtask_status
(
subtask_id
)
subtask_status
=
SubtaskStatus
.
create
(
subtask_id
)
email_id
=
1001
with
self
.
assertRaisesRegexp
(
DuplicateTaskException
,
'unable to find subtasks of instructor task'
):
send_course_email
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
subtask_status
)
send_course_email
(
entry_id
,
email_id
,
to_list
,
global_email_context
,
subtask_status
.
to_dict
()
)
def
test_send_email_missing_subtask
(
self
):
# test at a lower level, to ensure that the course gets checked down below too.
...
...
@@ -230,10 +230,10 @@ class TestEmailErrors(ModuleStoreTestCase):
subtask_id
=
"subtask-id-value"
initialize_subtask_info
(
entry
,
"emailed"
,
100
,
[
subtask_id
])
different_subtask_id
=
"bogus-subtask-id-value"
subtask_status
=
create_subtask_status
(
different_subtask_id
)
subtask_status
=
SubtaskStatus
.
create
(
different_subtask_id
)
bogus_email_id
=
1001
with
self
.
assertRaisesRegexp
(
DuplicateTaskException
,
'unable to find status for subtask of instructor task'
):
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
subtask_status
)
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
subtask_status
.
to_dict
()
)
def
test_send_email_completed_subtask
(
self
):
# test at a lower level, to ensure that the course gets checked down below too.
...
...
@@ -241,14 +241,14 @@ class TestEmailErrors(ModuleStoreTestCase):
entry_id
=
entry
.
id
# pylint: disable=E1101
subtask_id
=
"subtask-id-value"
initialize_subtask_info
(
entry
,
"emailed"
,
100
,
[
subtask_id
])
subtask_status
=
create_subtask_status
(
subtask_id
,
state
=
SUCCESS
)
subtask_status
=
SubtaskStatus
.
create
(
subtask_id
,
state
=
SUCCESS
)
update_subtask_status
(
entry_id
,
subtask_id
,
subtask_status
)
bogus_email_id
=
1001
to_list
=
[
'test@test.com'
]
global_email_context
=
{
'course_title'
:
'dummy course'
}
new_subtask_status
=
create_subtask_status
(
subtask_id
)
new_subtask_status
=
SubtaskStatus
.
create
(
subtask_id
)
with
self
.
assertRaisesRegexp
(
DuplicateTaskException
,
'already completed'
):
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
new_subtask_status
)
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
new_subtask_status
.
to_dict
()
)
def
test_send_email_running_subtask
(
self
):
# test at a lower level, to ensure that the course gets checked down below too.
...
...
@@ -256,14 +256,14 @@ class TestEmailErrors(ModuleStoreTestCase):
entry_id
=
entry
.
id
# pylint: disable=E1101
subtask_id
=
"subtask-id-value"
initialize_subtask_info
(
entry
,
"emailed"
,
100
,
[
subtask_id
])
subtask_status
=
create_subtask_status
(
subtask_id
)
subtask_status
=
SubtaskStatus
.
create
(
subtask_id
)
update_subtask_status
(
entry_id
,
subtask_id
,
subtask_status
)
check_subtask_is_valid
(
entry_id
,
subtask_id
,
subtask_status
)
bogus_email_id
=
1001
to_list
=
[
'test@test.com'
]
global_email_context
=
{
'course_title'
:
'dummy course'
}
with
self
.
assertRaisesRegexp
(
DuplicateTaskException
,
'already being executed'
):
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
subtask_status
)
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
subtask_status
.
to_dict
()
)
def
test_send_email_retried_subtask
(
self
):
# test at a lower level, to ensure that the course gets checked down below too.
...
...
@@ -271,19 +271,19 @@ class TestEmailErrors(ModuleStoreTestCase):
entry_id
=
entry
.
id
# pylint: disable=E1101
subtask_id
=
"subtask-id-value"
initialize_subtask_info
(
entry
,
"emailed"
,
100
,
[
subtask_id
])
subtask_status
=
create_subtask_status
(
subtask_id
,
state
=
RETRY
,
retried_nomax
=
2
)
subtask_status
=
SubtaskStatus
.
create
(
subtask_id
,
state
=
RETRY
,
retried_nomax
=
2
)
update_subtask_status
(
entry_id
,
subtask_id
,
subtask_status
)
bogus_email_id
=
1001
to_list
=
[
'test@test.com'
]
global_email_context
=
{
'course_title'
:
'dummy course'
}
# try running with a clean subtask:
new_subtask_status
=
create_subtask_status
(
subtask_id
)
new_subtask_status
=
SubtaskStatus
.
create
(
subtask_id
)
with
self
.
assertRaisesRegexp
(
DuplicateTaskException
,
'already retried'
):
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
new_subtask_status
)
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
new_subtask_status
.
to_dict
()
)
# try again, with a retried subtask with lower count:
new_subtask_status
=
create_subtask_status
(
subtask_id
,
state
=
RETRY
,
retried_nomax
=
1
)
new_subtask_status
=
SubtaskStatus
.
create
(
subtask_id
,
state
=
RETRY
,
retried_nomax
=
1
)
with
self
.
assertRaisesRegexp
(
DuplicateTaskException
,
'already retried'
):
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
new_subtask_status
)
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
new_subtask_status
.
to_dict
()
)
def
dont_test_send_email_undefined_email
(
self
):
# test at a lower level, to ensure that the course gets checked down below too.
...
...
@@ -293,10 +293,10 @@ class TestEmailErrors(ModuleStoreTestCase):
global_email_context
=
{
'course_title'
:
'dummy course'
}
subtask_id
=
"subtask-id-value"
initialize_subtask_info
(
entry
,
"emailed"
,
100
,
[
subtask_id
])
subtask_status
=
create_subtask_status
(
subtask_id
)
subtask_status
=
SubtaskStatus
.
create
(
subtask_id
)
bogus_email_id
=
1001
with
self
.
assertRaises
(
CourseEmail
.
DoesNotExist
):
# we skip the call that updates subtask status, since we've not set up the InstructorTask
# for the subtask, and it's not important to the test.
with
patch
(
'bulk_email.tasks.update_subtask_status'
):
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
subtask_status
)
send_course_email
(
entry_id
,
bogus_email_id
,
to_list
,
global_email_context
,
subtask_status
.
to_dict
()
)
lms/djangoapps/bulk_email/tests/test_tasks.py
View file @
0f8f82c8
...
...
@@ -31,7 +31,7 @@ from django.core.management import call_command
from
bulk_email.models
import
CourseEmail
,
Optout
,
SEND_TO_ALL
from
instructor_task.tasks
import
send_bulk_course_email
from
instructor_task.subtasks
import
update_subtask_status
from
instructor_task.subtasks
import
update_subtask_status
,
SubtaskStatus
from
instructor_task.models
import
InstructorTask
from
instructor_task.tests.test_base
import
InstructorTaskCourseTestCase
from
instructor_task.tests.factories
import
InstructorTaskFactory
...
...
@@ -63,16 +63,9 @@ def my_update_subtask_status(entry_id, current_task_id, new_subtask_status):
entry
=
InstructorTask
.
objects
.
get
(
pk
=
entry_id
)
subtask_dict
=
json
.
loads
(
entry
.
subtasks
)
subtask_status_info
=
subtask_dict
[
'status'
]
current_subtask_status
=
subtask_status_info
[
current_task_id
]
def
_get_retry_count
(
subtask_result
):
"""Return the number of retries counted for the given subtask."""
retry_count
=
subtask_result
.
get
(
'retried_nomax'
,
0
)
retry_count
+=
subtask_result
.
get
(
'retried_withmax'
,
0
)
return
retry_count
current_retry_count
=
_get_retry_count
(
current_subtask_status
)
new_retry_count
=
_get_retry_count
(
new_subtask_status
)
current_subtask_status
=
SubtaskStatus
.
from_dict
(
subtask_status_info
[
current_task_id
])
current_retry_count
=
current_subtask_status
.
get_retry_count
()
new_retry_count
=
new_subtask_status
.
get_retry_count
()
if
current_retry_count
<=
new_retry_count
:
update_subtask_status
(
entry_id
,
current_task_id
,
new_subtask_status
)
...
...
lms/djangoapps/instructor_task/subtasks.py
View file @
0f8f82c8
...
...
@@ -87,11 +87,11 @@ def generate_items_for_subtask(item_queryset, item_fields, total_num_items, item
raise
ValueError
(
error_msg
)
def
create_subtask_status
(
task_id
,
succeeded
=
0
,
failed
=
0
,
skipped
=
0
,
retried_nomax
=
0
,
retried_withmax
=
0
,
state
=
None
):
class
SubtaskStatus
(
object
):
"""
Create and return a dict for tracking the status of a subtask.
Subtask
status key
s are:
Subtask
Status value
s are:
'task_id' : id of subtask. This is used to pass task information across retries.
'attempted' : number of attempts -- should equal succeeded plus failed
...
...
@@ -104,70 +104,77 @@ def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nom
should have a maximum count applied
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
Object
must be JSON-serializable, so that it can be passed as an argumen
t
to tasks.
Object
is not JSON-serializable, so to_dict and from_dict methods are provided so tha
t
it can be passed as a serializable argument
to tasks.
In future, we may want to include specific error information
indicating the reason for failure.
Also, we should count up "not attempted" separately from attempted/failed.
"""
attempted
=
succeeded
+
failed
current_result
=
{
'task_id'
:
task_id
,
'attempted'
:
attempted
,
'succeeded'
:
succeeded
,
'skipped'
:
skipped
,
'failed'
:
failed
,
'retried_nomax'
:
retried_nomax
,
'retried_withmax'
:
retried_withmax
,
'state'
:
state
if
state
is
not
None
else
QUEUING
,
}
return
current_result
def
increment_subtask_status
(
subtask_result
,
succeeded
=
0
,
failed
=
0
,
skipped
=
0
,
retried_nomax
=
0
,
retried_withmax
=
0
,
state
=
None
):
def
__init__
(
self
,
task_id
,
attempted
=
None
,
succeeded
=
0
,
failed
=
0
,
skipped
=
0
,
retried_nomax
=
0
,
retried_withmax
=
0
,
state
=
None
):
"""Construct a SubtaskStatus object."""
self
.
task_id
=
task_id
if
attempted
is
not
None
:
self
.
attempted
=
attempted
else
:
self
.
attempted
=
succeeded
+
failed
self
.
succeeded
=
succeeded
self
.
failed
=
failed
self
.
skipped
=
skipped
self
.
retried_nomax
=
retried_nomax
self
.
retried_withmax
=
retried_withmax
self
.
state
=
state
if
state
is
not
None
else
QUEUING
@classmethod
def
from_dict
(
self
,
d
):
"""Construct a SubtaskStatus object from a dict representation."""
options
=
dict
(
d
)
task_id
=
options
[
'task_id'
]
del
options
[
'task_id'
]
return
SubtaskStatus
.
create
(
task_id
,
**
options
)
@classmethod
def
create
(
self
,
task_id
,
**
options
):
"""Construct a SubtaskStatus object."""
newobj
=
self
(
task_id
,
**
options
)
return
newobj
def
to_dict
(
self
):
"""
Update the result of a subtask with additional results.
Create and return a dict for tracking the status of a subtask.
Output a dict representation of a SubtaskStatus object.
Keys for input `subtask_result` and returned subtask_status are:
Use for creating a JSON-serializable representation for use by tasks.
"""
return
self
.
__dict__
'task_id' : id of subtask. This is used to pass task information across retries.
'attempted' : number of attempts -- should equal succeeded plus failed
'succeeded' : number that succeeded in processing
'skipped' : number that were not processed.
'failed' : number that failed during processing
'retried_nomax' : number of times the subtask has been retried for conditions that
should not have a maximum count applied
'retried_withmax' : number of times the subtask has been retried for conditions that
should have a maximum count applied
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
def
increment
(
self
,
succeeded
=
0
,
failed
=
0
,
skipped
=
0
,
retried_nomax
=
0
,
retried_withmax
=
0
,
state
=
None
):
"""
Update the result of a subtask with additional results.
Kwarg arguments are incremented to the corresponding key in `subtask_result`
.
Kwarg arguments are incremented to the existing values
.
The exception is for `state`, which if specified is used to override the existing value.
"""
new_result
=
dict
(
subtask_result
)
new_result
[
'attempted'
]
+=
(
succeeded
+
failed
)
new_result
[
'succeeded'
]
+=
succeeded
new_result
[
'failed'
]
+=
failed
new_result
[
'skipped'
]
+=
skipped
new_result
[
'retried_nomax'
]
+=
retried_nomax
new_result
[
'retried_withmax'
]
+=
retried_withmax
self
.
attempted
+=
(
succeeded
+
failed
)
self
.
succeeded
+=
succeeded
self
.
failed
+=
failed
self
.
skipped
+=
skipped
self
.
retried_nomax
+=
retried_nomax
self
.
retried_withmax
+=
retried_withmax
if
state
is
not
None
:
new_result
[
'state'
]
=
state
self
.
state
=
state
return
new_result
def
get_retry_count
(
self
):
"""Returns the number of retries of any kind."""
return
self
.
retried_nomax
+
self
.
retried_withmax
def
__repr__
(
self
):
"""Return print representation of a SubtaskStatus object."""
return
'SubtaskStatus<
%
r>'
%
(
self
.
to_dict
(),)
def
_get_retry_count
(
subtask_status
):
"""
Calculate the total number of retries.
"""
count
=
0
for
keyname
in
[
'retried_nomax'
,
'retried_withmax'
]:
count
+=
subtask_status
.
get
(
keyname
,
0
)
return
count
def
__unicode__
(
self
):
"""Return unicode version of a SubtaskStatus object representation."""
return
unicode
(
repr
(
self
))
def
initialize_subtask_info
(
entry
,
action_name
,
total_num
,
subtask_id_list
):
...
...
@@ -189,7 +196,7 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
The "subtasks" field also contains a 'status' key, that contains a dict that stores status
information for each subtask. The value for each subtask (keyed by its task_id)
is its subtask status, as defined by
create_subtask_status
().
is its subtask status, as defined by
SubtaskStatus.to_dict
().
This information needs to be set up in the InstructorTask before any of the subtasks start
running. If not, there is a chance that the subtasks could complete before the parent task
...
...
@@ -216,7 +223,8 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
# Write out the subtasks information.
num_subtasks
=
len
(
subtask_id_list
)
# Note that may not be necessary to store initial value with all those zeroes!
subtask_status
=
{
subtask_id
:
create_subtask_status
(
subtask_id
)
for
subtask_id
in
subtask_id_list
}
# Write out as a dict, so it will go more smoothly into json.
subtask_status
=
{
subtask_id
:
(
SubtaskStatus
.
create
(
subtask_id
))
.
to_dict
()
for
subtask_id
in
subtask_id_list
}
subtask_dict
=
{
'total'
:
num_subtasks
,
'succeeded'
:
0
,
...
...
@@ -266,7 +274,7 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
Confirms that the current subtask is known to the InstructorTask and hasn't already been completed.
Problems can occur when the parent task has been run twice, and results in duplicate
subtasks being created for the same InstructorTask entry. This
can happen
when Celery
subtasks being created for the same InstructorTask entry. This
maybe happens
when Celery
loses its connection to its broker, and any current tasks get requeued.
If a parent task gets requeued, then the same InstructorTask may have a different set of
...
...
@@ -280,6 +288,9 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
The other worker is allowed to finish, and this raises an exception.
Raises a DuplicateTaskException exception if it's not a task that should be run.
If this succeeds, it requires that update_subtask_status() is called to release the lock on the
task.
"""
# Confirm that the InstructorTask actually defines subtasks.
entry
=
InstructorTask
.
objects
.
get
(
pk
=
entry_id
)
...
...
@@ -300,8 +311,8 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
# Confirm that the InstructorTask doesn't think that this subtask has already been
# performed successfully.
subtask_status
=
subtask_status_info
[
current_task_id
]
subtask_state
=
subtask_status
.
get
(
'state'
)
subtask_status
=
SubtaskStatus
.
from_dict
(
subtask_status_info
[
current_task_id
])
subtask_state
=
subtask_status
.
state
if
subtask_state
in
READY_STATES
:
format_str
=
"Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}': rejecting task {}"
msg
=
format_str
.
format
(
current_task_id
,
subtask_status
,
entry
,
new_subtask_status
)
...
...
@@ -313,8 +324,8 @@ def check_subtask_is_valid(entry_id, current_task_id, new_subtask_status):
if
subtask_state
==
RETRY
:
# Check to see if the input number of retries is less than the recorded number.
# If so, then this is an earlier version of the task, and a duplicate.
new_retry_count
=
_get_retry_count
(
new_subtask_status
)
current_retry_count
=
_get_retry_count
(
subtask_status
)
new_retry_count
=
new_subtask_status
.
get_retry_count
(
)
current_retry_count
=
subtask_status
.
get_retry_count
(
)
if
new_retry_count
<
current_retry_count
:
format_str
=
"Unexpected task_id '{}': already retried - status {} for subtask of instructor task '{}': rejecting task {}"
msg
=
format_str
.
format
(
current_task_id
,
subtask_status
,
entry
,
new_subtask_status
)
...
...
@@ -356,8 +367,8 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
The "subtasks" field also contains a 'status' key, that contains a dict that stores status
information for each subtask. At the moment, the value for each subtask (keyed by its task_id)
is the value of
`status`, but could be expanded in future to store information about failure
messages, progress made, etc.
is the value of
the SubtaskStatus.to_dict(), but could be expanded in future to store information
about failure
messages, progress made, etc.
"""
TASK_LOG
.
info
(
"Preparing to update status for email subtask
%
s for instructor task
%
d with status
%
s"
,
current_task_id
,
entry_id
,
new_subtask_status
)
...
...
@@ -374,7 +385,7 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
raise
ValueError
(
msg
)
# Update status:
subtask_status_info
[
current_task_id
]
=
new_subtask_status
subtask_status_info
[
current_task_id
]
=
new_subtask_status
.
to_dict
()
# Update the parent task progress.
# Set the estimate of duration, but only if it
...
...
@@ -390,10 +401,10 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
# In future, we can make this more responsive by updating status
# between retries, by comparing counts that change from previous
# retry.
new_state
=
new_subtask_status
[
'state'
]
new_state
=
new_subtask_status
.
state
if
new_subtask_status
is
not
None
and
new_state
in
READY_STATES
:
for
statname
in
[
'attempted'
,
'succeeded'
,
'failed'
,
'skipped'
]:
task_progress
[
statname
]
+=
new_subtask_status
[
statname
]
task_progress
[
statname
]
+=
getattr
(
new_subtask_status
,
statname
)
# Figure out if we're actually done (i.e. this is the last task to complete).
# This is easier if we just maintain a counter, rather than scanning the
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment