Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-platform
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-platform
Commits
d48e90ee
Commit
d48e90ee
authored
Sep 11, 2013
by
Brian Wilson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Initial refactoring for bulk_email monitoring.
parent
5d47779b
Hide whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
543 additions
and
164 deletions
+543
-164
lms/djangoapps/bulk_email/tasks.py
+155
-61
lms/djangoapps/bulk_email/tests/test_email.py
+6
-3
lms/djangoapps/bulk_email/tests/test_err_handling.py
+27
-15
lms/djangoapps/instructor/views/legacy.py
+12
-6
lms/djangoapps/instructor_task/api.py
+52
-5
lms/djangoapps/instructor_task/api_helper.py
+7
-4
lms/djangoapps/instructor_task/migrations/0002_add_subtask_field.py
+77
-0
lms/djangoapps/instructor_task/models.py
+1
-0
lms/djangoapps/instructor_task/tasks.py
+34
-14
lms/djangoapps/instructor_task/tasks_helper.py
+133
-38
lms/djangoapps/instructor_task/tests/test_tasks.py
+6
-6
lms/djangoapps/instructor_task/tests/test_views.py
+1
-1
lms/djangoapps/instructor_task/views.py
+32
-11
No files found.
lms/djangoapps/bulk_email/tasks.py
View file @
d48e90ee
...
...
@@ -4,17 +4,19 @@ to a course.
"""
import
math
import
re
import
time
from
uuid
import
uuid4
from
time
import
time
,
sleep
import
json
from
dogapi
import
dog_stats_api
from
smtplib
import
SMTPServerDisconnected
,
SMTPDataError
,
SMTPConnectError
from
celery
import
task
,
current_task
,
group
from
celery.utils.log
import
get_task_logger
from
django.conf
import
settings
from
django.contrib.auth.models
import
User
,
Group
from
django.core.mail
import
EmailMultiAlternatives
,
get_connection
from
django.http
import
Http404
from
celery
import
task
,
current_task
from
celery.utils.log
import
get_task_logger
from
django.core.urlresolvers
import
reverse
from
bulk_email.models
import
(
...
...
@@ -23,12 +25,61 @@ from bulk_email.models import (
)
from
courseware.access
import
_course_staff_group_name
,
_course_instructor_group_name
from
courseware.courses
import
get_course_by_id
,
course_image_url
from
instructor_task.models
import
InstructorTask
,
PROGRESS
,
QUEUING
log
=
get_task_logger
(
__name__
)
@task
(
default_retry_delay
=
10
,
max_retries
=
5
)
# pylint: disable=E1102
def
delegate_email_batches
(
email_id
,
user_id
):
def
get_recipient_queryset
(
user_id
,
to_option
,
course_id
,
course_location
):
"""
Generates a query set corresponding to the requested category.
`to_option` is either SEND_TO_MYSELF, SEND_TO_STAFF, or SEND_TO_ALL.
"""
if
to_option
==
SEND_TO_MYSELF
:
recipient_qset
=
User
.
objects
.
filter
(
id
=
user_id
)
elif
to_option
==
SEND_TO_ALL
or
to_option
==
SEND_TO_STAFF
:
staff_grpname
=
_course_staff_group_name
(
course_location
)
staff_group
,
_
=
Group
.
objects
.
get_or_create
(
name
=
staff_grpname
)
staff_qset
=
staff_group
.
user_set
.
all
()
instructor_grpname
=
_course_instructor_group_name
(
course_location
)
instructor_group
,
_
=
Group
.
objects
.
get_or_create
(
name
=
instructor_grpname
)
instructor_qset
=
instructor_group
.
user_set
.
all
()
recipient_qset
=
staff_qset
|
instructor_qset
if
to_option
==
SEND_TO_ALL
:
enrollment_qset
=
User
.
objects
.
filter
(
courseenrollment__course_id
=
course_id
,
courseenrollment__is_active
=
True
)
recipient_qset
=
recipient_qset
|
enrollment_qset
recipient_qset
=
recipient_qset
.
distinct
()
else
:
log
.
error
(
"Unexpected bulk email TO_OPTION found:
%
s"
,
to_option
)
raise
Exception
(
"Unexpected bulk email TO_OPTION found: {0}"
.
format
(
to_option
))
recipient_qset
=
recipient_qset
.
order_by
(
'pk'
)
return
recipient_qset
def
get_course_email_context
(
course
):
"""
Returns context arguments to apply to all emails, independent of recipient.
"""
course_id
=
course
.
id
course_title
=
course
.
display_name
course_url
=
'https://{}{}'
.
format
(
settings
.
SITE_NAME
,
reverse
(
'course_root'
,
kwargs
=
{
'course_id'
:
course_id
})
)
image_url
=
'https://{}{}'
.
format
(
settings
.
SITE_NAME
,
course_image_url
(
course
))
email_context
=
{
'course_title'
:
course_title
,
'course_url'
:
course_url
,
'course_image_url'
:
image_url
,
'account_settings_url'
:
'https://{}{}'
.
format
(
settings
.
SITE_NAME
,
reverse
(
'dashboard'
)),
'platform_name'
:
settings
.
PLATFORM_NAME
,
}
return
email_context
def
perform_delegate_email_batches
(
entry_id
,
course_id
,
task_input
,
action_name
):
"""
Delegates emails by querying for the list of recipients who should
get the mail, chopping up into batches of settings.EMAILS_PER_TASK size,
...
...
@@ -36,17 +87,31 @@ def delegate_email_batches(email_id, user_id):
Returns the number of batches (workers) kicked off.
"""
entry
=
InstructorTask
.
objects
.
get
(
pk
=
entry_id
)
# get inputs to use in this task from the entry:
#task_id = entry.task_id
user_id
=
entry
.
requester
.
id
# TODO: check this against argument passed in?
# course_id = entry.course_id
email_id
=
task_input
[
'email_id'
]
try
:
email_obj
=
CourseEmail
.
objects
.
get
(
id
=
email_id
)
except
CourseEmail
.
DoesNotExist
as
exc
:
# The retry behavior here is necessary because of a race condition between the commit of the transaction
# that creates this CourseEmail row and the celery pipeline that starts this task.
# We might possibly want to move the blocking into the view function rather than have it in this task.
log
.
warning
(
"Failed to get CourseEmail with id
%
s, retry
%
d"
,
email_id
,
current_task
.
request
.
retries
)
raise
delegate_email_batches
.
retry
(
arg
=
[
email_id
,
user_id
],
exc
=
exc
)
# log.warning("Failed to get CourseEmail with id %s, retry %d", email_id, current_task.request.retries)
# raise delegate_email_batches.retry(arg=[email_id, user_id], exc=exc)
log
.
warning
(
"Failed to get CourseEmail with id
%
s"
,
email_id
)
raise
to_option
=
email_obj
.
to_option
course_id
=
email_obj
.
course_id
# TODO: instead of fetching from email object, compare instead to
# confirm that they match, and raise an exception if they don't.
# course_id = email_obj.course_id
try
:
course
=
get_course_by_id
(
course_id
,
depth
=
1
)
...
...
@@ -54,38 +119,32 @@ def delegate_email_batches(email_id, user_id):
log
.
exception
(
"get_course_by_id failed:
%
s"
,
exc
.
args
[
0
])
raise
Exception
(
"get_course_by_id failed: "
+
exc
.
args
[
0
])
course_url
=
'https://{}{}'
.
format
(
settings
.
SITE_NAME
,
reverse
(
'course_root'
,
kwargs
=
{
'course_id'
:
course_id
})
)
image_url
=
'https://{}{}'
.
format
(
settings
.
SITE_NAME
,
course_image_url
(
course
))
if
to_option
==
SEND_TO_MYSELF
:
recipient_qset
=
User
.
objects
.
filter
(
id
=
user_id
)
elif
to_option
==
SEND_TO_ALL
or
to_option
==
SEND_TO_STAFF
:
staff_grpname
=
_course_staff_group_name
(
course
.
location
)
staff_group
,
_
=
Group
.
objects
.
get_or_create
(
name
=
staff_grpname
)
staff_qset
=
staff_group
.
user_set
.
all
()
instructor_grpname
=
_course_instructor_group_name
(
course
.
location
)
instructor_group
,
_
=
Group
.
objects
.
get_or_create
(
name
=
instructor_grpname
)
instructor_qset
=
instructor_group
.
user_set
.
all
()
recipient_qset
=
staff_qset
|
instructor_qset
global_email_context
=
get_course_email_context
(
course
)
recipient_qset
=
get_recipient_queryset
(
user_id
,
to_option
,
course_id
,
course
.
location
)
total_num_emails
=
recipient_qset
.
count
()
if
to_option
==
SEND_TO_ALL
:
enrollment_qset
=
User
.
objects
.
filter
(
courseenrollment__course_id
=
course_id
,
courseenrollment__is_active
=
True
)
recipient_qset
=
recipient_qset
|
enrollment_qset
recipient_qset
=
recipient_qset
.
distinct
()
else
:
log
.
error
(
"Unexpected bulk email TO_OPTION found:
%
s"
,
to_option
)
raise
Exception
(
"Unexpected bulk email TO_OPTION found: {0}"
.
format
(
to_option
))
# At this point, we have some status that we can report, as to the magnitude of the overall
# task. That is, we know the total. Set that, and our subtasks should work towards that goal.
# Note that we add start_time in here, so that it can be used
# by subtasks to calculate duration_ms values:
progress
=
{
'action_name'
:
action_name
,
'attempted'
:
0
,
'updated'
:
0
,
'total'
:
total_num_emails
,
'duration_ms'
:
int
(
0
),
'start_time'
:
time
(),
}
recipient_qset
=
recipient_qset
.
order_by
(
'pk'
)
total_num_emails
=
recipient_qset
.
count
()
num_queries
=
int
(
math
.
ceil
(
float
(
total_num_emails
)
/
float
(
settings
.
EMAILS_PER_QUERY
)))
last_pk
=
recipient_qset
[
0
]
.
pk
-
1
num_workers
=
0
task_list
=
[]
subtask_id_list
=
[]
for
_
in
range
(
num_queries
):
# Note that if we were doing this for regrading we probably only need 'pk', and not
# either profile__name or email. That's because we'll have to do
# a lot more work in the individual regrade for each user, but using user_id as a key.
# TODO: figure out how to pass these values as an argument, when refactoring this code.
recipient_sublist
=
list
(
recipient_qset
.
order_by
(
'pk'
)
.
filter
(
pk__gt
=
last_pk
)
.
values
(
'profile__name'
,
'email'
,
'pk'
)[:
settings
.
EMAILS_PER_QUERY
])
last_pk
=
recipient_sublist
[
-
1
][
'pk'
]
...
...
@@ -94,20 +153,59 @@ def delegate_email_batches(email_id, user_id):
chunk
=
int
(
math
.
ceil
(
float
(
num_emails_this_query
)
/
float
(
num_tasks_this_query
)))
for
i
in
range
(
num_tasks_this_query
):
to_list
=
recipient_sublist
[
i
*
chunk
:
i
*
chunk
+
chunk
]
course_email
.
delay
(
subtask_id
=
str
(
uuid4
())
subtask_id_list
.
append
(
subtask_id
)
task_list
.
append
(
send_course_email
.
subtask
((
email_id
,
to_list
,
course
.
display_name
,
course_url
,
image_url
,
global_email_context
,
False
)
),
task_id
=
subtask_id
))
num_workers
+=
num_tasks_this_query
return
num_workers
# Before we actually start running the tasks we've defined,
# the InstructorTask needs to be updated with their information.
# So at this point, we need to update the InstructorTask object here,
# not in the return.
entry
.
task_output
=
InstructorTask
.
create_output_for_success
(
progress
)
# TODO: the monitoring may need to track a different value here to know
# that it shouldn't go to the InstructorTask's task's Result for its
# progress. It might be that this is getting saved.
# It might be enough, on the other hand, for the monitoring code to see
# that there are subtasks, and that it can scan these for the overall
# status. (And that it shouldn't clobber the progress that is being
# accumulated.) If there are no subtasks, then work as is current.
entry
.
task_state
=
PROGRESS
# now write out the subtasks information.
subtask_status
=
dict
.
fromkeys
(
subtask_id_list
,
QUEUING
)
entry
.
subtasks
=
json
.
dumps
(
subtask_status
)
# and save the entry immediately, before any subtasks actually start work:
entry
.
save_now
()
# now group the subtasks, and start them running:
task_group
=
group
(
task_list
)
task_group_result
=
task_group
.
apply_async
()
# ISSUE: we can return this result now, but it's not really the result for this task.
# So if we use the task_id to fetch a task result, we won't get this one. But it
# might still work. The caller just has to hold onto this, and access it in some way.
# Ugh. That seems unlikely...
# return task_group_result
# Still want to return progress here, as this is what will be stored in the
# AsyncResult for the parent task as its return value.
# TODO: Humph. But it will be marked as SUCCEEDED. And have
# this return value as it's "result". So be it. The InstructorTask
# will not match, because it will have different info.
return
progress
@task
(
default_retry_delay
=
15
,
max_retries
=
5
)
# pylint: disable=E1102
def
course_email
(
email_id
,
to_list
,
course_title
,
course_url
,
image_url
,
throttle
=
False
):
def
send_course_email
(
email_id
,
to_list
,
global_email_context
,
throttle
=
False
):
"""
Takes a primary id for a CourseEmail object and a 'to_list' of recipient objects--keys are
'profile__name', 'email' (address), and 'pk' (in the user table).
...
...
@@ -116,21 +214,23 @@ def course_email(email_id, to_list, course_title, course_url, image_url, throttl
Sends to all addresses contained in to_list. Emails are sent multi-part, in both plain
text and html.
"""
course_title
=
global_email_context
[
'course_title'
]
with
dog_stats_api
.
timer
(
'course_email.single_task.time.overall'
,
tags
=
[
_statsd_tag
(
course_title
)]):
_send_course_email
(
email_id
,
to_list
,
course_title
,
course_url
,
image_url
,
throttle
)
_send_course_email
(
email_id
,
to_list
,
global_email_context
,
throttle
)
def
_send_course_email
(
email_id
,
to_list
,
course_title
,
course_url
,
image_url
,
throttle
):
def
_send_course_email
(
email_id
,
to_list
,
global_email_context
,
throttle
):
"""
Performs the email sending task.
"""
try
:
msg
=
CourseEmail
.
objects
.
get
(
id
=
email_id
)
course_email
=
CourseEmail
.
objects
.
get
(
id
=
email_id
)
except
CourseEmail
.
DoesNotExist
:
log
.
exception
(
"Could not find email id:{} to send."
.
format
(
email_id
))
raise
# exclude optouts
optouts
=
(
Optout
.
objects
.
filter
(
course_id
=
msg
.
course_id
,
optouts
=
(
Optout
.
objects
.
filter
(
course_id
=
course_email
.
course_id
,
user__in
=
[
i
[
'pk'
]
for
i
in
to_list
])
.
values_list
(
'user__email'
,
flat
=
True
))
...
...
@@ -139,8 +239,8 @@ def _send_course_email(email_id, to_list, course_title, course_url, image_url, t
to_list
=
[
recipient
for
recipient
in
to_list
if
recipient
[
'email'
]
not
in
optouts
]
subject
=
"["
+
course_title
+
"] "
+
msg
.
subject
course_title
=
global_email_context
[
'course_title'
]
subject
=
"["
+
course_title
+
"] "
+
course_email
.
subject
course_title_no_quotes
=
re
.
sub
(
r'"'
,
''
,
course_title
)
course_num
=
msg
.
course_id
.
split
(
'/'
)[
1
]
# course_id = 'org/course_num/run'
# Substitute a '_' anywhere a non-(ascii, period, or dash) character appears.
...
...
@@ -164,13 +264,9 @@ def _send_course_email(email_id, to_list, course_title, course_url, image_url, t
# Define context values to use in all course emails:
email_context
=
{
'name'
:
''
,
'email'
:
''
,
'course_title'
:
course_title
,
'course_url'
:
course_url
,
'course_image_url'
:
image_url
,
'account_settings_url'
:
'https://{}{}'
.
format
(
settings
.
SITE_NAME
,
reverse
(
'dashboard'
)),
'platform_name'
:
settings
.
PLATFORM_NAME
,
'email'
:
''
}
email_context
.
update
(
global_email_context
)
while
to_list
:
# Update context with user-specific values:
...
...
@@ -179,8 +275,8 @@ def _send_course_email(email_id, to_list, course_title, course_url, image_url, t
email_context
[
'name'
]
=
to_list
[
-
1
][
'profile__name'
]
# Construct message content using templates and context:
plaintext_msg
=
course_email_template
.
render_plaintext
(
msg
.
text_message
,
email_context
)
html_msg
=
course_email_template
.
render_htmltext
(
msg
.
html_message
,
email_context
)
plaintext_msg
=
course_email_template
.
render_plaintext
(
course_email
.
text_message
,
email_context
)
html_msg
=
course_email_template
.
render_htmltext
(
course_email
.
html_message
,
email_context
)
# Create email:
email_msg
=
EmailMultiAlternatives
(
...
...
@@ -194,7 +290,7 @@ def _send_course_email(email_id, to_list, course_title, course_url, image_url, t
# Throttle if we tried a few times and got the rate limiter
if
throttle
or
current_task
.
request
.
retries
>
0
:
time
.
sleep
(
0.2
)
sleep
(
0.2
)
try
:
with
dog_stats_api
.
timer
(
'course_email.single_send.time.overall'
,
tags
=
[
_statsd_tag
(
course_title
)]):
...
...
@@ -227,20 +323,18 @@ def _send_course_email(email_id, to_list, course_title, course_url, image_url, t
# Reasoning is that all of these errors may be temporary condition.
log
.
warning
(
'Email with id
%
d not delivered due to temporary error
%
s, retrying send to
%
d recipients'
,
email_id
,
exc
,
len
(
to_list
))
raise
course_email
.
retry
(
raise
send_
course_email
.
retry
(
arg
=
[
email_id
,
to_list
,
course_title
,
course_url
,
image_url
,
global_email_context
,
current_task
.
request
.
retries
>
0
],
exc
=
exc
,
countdown
=
(
2
**
current_task
.
request
.
retries
)
*
15
)
except
:
log
.
exception
(
'Email with id
%
d caused course_email task to fail with uncaught exception. To list:
%
s'
,
log
.
exception
(
'Email with id
%
d caused
send_
course_email task to fail with uncaught exception. To list:
%
s'
,
email_id
,
[
i
[
'email'
]
for
i
in
to_list
])
# Close the connection before we exit
...
...
lms/djangoapps/bulk_email/tests/test_email.py
View file @
d48e90ee
...
...
@@ -13,7 +13,7 @@ from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentF
from
xmodule.modulestore.tests.django_utils
import
ModuleStoreTestCase
from
xmodule.modulestore.tests.factories
import
CourseFactory
from
bulk_email.tasks
import
delegate_email_batches
,
course_email
from
bulk_email.tasks
import
send_
course_email
from
bulk_email.models
import
CourseEmail
,
Optout
from
mock
import
patch
...
...
@@ -289,6 +289,9 @@ class TestEmailSendExceptions(ModuleStoreTestCase):
Test that exceptions are handled correctly.
"""
def
test_no_course_email_obj
(
self
):
# Make sure course_email handles CourseEmail.DoesNotExist exception.
# Make sure send_course_email handles CourseEmail.DoesNotExist exception.
with
self
.
assertRaises
(
KeyError
):
send_course_email
(
101
,
[],
{},
False
)
with
self
.
assertRaises
(
CourseEmail
.
DoesNotExist
):
course_email
(
101
,
[],
"_"
,
"_"
,
"_"
,
False
)
send_course_email
(
101
,
[],
{
'course_title'
:
'Test'
}
,
False
)
lms/djangoapps/bulk_email/tests/test_err_handling.py
View file @
d48e90ee
...
...
@@ -13,7 +13,8 @@ from xmodule.modulestore.tests.factories import CourseFactory
from
student.tests.factories
import
UserFactory
,
AdminFactory
,
CourseEnrollmentFactory
from
bulk_email.models
import
CourseEmail
from
bulk_email.tasks
import
delegate_email_batches
from
bulk_email.tasks
import
perform_delegate_email_batches
from
instructor_task.models
import
InstructorTask
from
mock
import
patch
,
Mock
from
smtplib
import
SMTPDataError
,
SMTPServerDisconnected
,
SMTPConnectError
...
...
@@ -43,7 +44,7 @@ class TestEmailErrors(ModuleStoreTestCase):
patch
.
stopall
()
@patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
@patch
(
'bulk_email.tasks.course_email.retry'
)
@patch
(
'bulk_email.tasks.
send_
course_email.retry'
)
def
test_data_err_retry
(
self
,
retry
,
get_conn
):
"""
Test that celery handles transient SMTPDataErrors by retrying.
...
...
@@ -65,7 +66,7 @@ class TestEmailErrors(ModuleStoreTestCase):
@patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
@patch
(
'bulk_email.tasks.course_email_result'
)
@patch
(
'bulk_email.tasks.course_email.retry'
)
@patch
(
'bulk_email.tasks.
send_
course_email.retry'
)
def
test_data_err_fail
(
self
,
retry
,
result
,
get_conn
):
"""
Test that celery handles permanent SMTPDataErrors by failing and not retrying.
...
...
@@ -93,7 +94,7 @@ class TestEmailErrors(ModuleStoreTestCase):
self
.
assertEquals
(
sent
,
settings
.
EMAILS_PER_TASK
/
2
)
@patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
@patch
(
'bulk_email.tasks.course_email.retry'
)
@patch
(
'bulk_email.tasks.
send_
course_email.retry'
)
def
test_disconn_err_retry
(
self
,
retry
,
get_conn
):
"""
Test that celery handles SMTPServerDisconnected by retrying.
...
...
@@ -113,7 +114,7 @@ class TestEmailErrors(ModuleStoreTestCase):
self
.
assertIsInstance
(
exc
,
SMTPServerDisconnected
)
@patch
(
'bulk_email.tasks.get_connection'
,
autospec
=
True
)
@patch
(
'bulk_email.tasks.course_email.retry'
)
@patch
(
'bulk_email.tasks.
send_
course_email.retry'
)
def
test_conn_err_retry
(
self
,
retry
,
get_conn
):
"""
Test that celery handles SMTPConnectError by retrying.
...
...
@@ -134,7 +135,7 @@ class TestEmailErrors(ModuleStoreTestCase):
self
.
assertIsInstance
(
exc
,
SMTPConnectError
)
@patch
(
'bulk_email.tasks.course_email_result'
)
@patch
(
'bulk_email.tasks.course_email.retry'
)
@patch
(
'bulk_email.tasks.
send_
course_email.retry'
)
@patch
(
'bulk_email.tasks.log'
)
@patch
(
'bulk_email.tasks.get_connection'
,
Mock
(
return_value
=
EmailTestException
))
def
test_general_exception
(
self
,
mock_log
,
retry
,
result
):
...
...
@@ -152,25 +153,29 @@ class TestEmailErrors(ModuleStoreTestCase):
self
.
client
.
post
(
self
.
url
,
test_email
)
((
log_str
,
email_id
,
to_list
),
_
)
=
mock_log
.
exception
.
call_args
self
.
assertTrue
(
mock_log
.
exception
.
called
)
self
.
assertIn
(
'caused course_email task to fail with uncaught exception.'
,
log_str
)
self
.
assertIn
(
'caused
send_
course_email task to fail with uncaught exception.'
,
log_str
)
self
.
assertEqual
(
email_id
,
1
)
self
.
assertEqual
(
to_list
,
[
self
.
instructor
.
email
])
self
.
assertFalse
(
retry
.
called
)
self
.
assertFalse
(
result
.
called
)
@patch
(
'bulk_email.tasks.course_email_result'
)
@patch
(
'bulk_email.tasks.delegate_email_batches.retry'
)
#
@patch('bulk_email.tasks.delegate_email_batches.retry')
@patch
(
'bulk_email.tasks.log'
)
def
test_nonexist_email
(
self
,
mock_log
,
re
try
,
re
sult
):
def
test_nonexist_email
(
self
,
mock_log
,
result
):
"""
Tests retries when the email doesn't exist
"""
delegate_email_batches
.
delay
(
-
1
,
self
.
instructor
.
id
)
((
log_str
,
email_id
,
_num_retries
),
_
)
=
mock_log
.
warning
.
call_args
# create an InstructorTask object to pass through
course_id
=
self
.
course
.
id
entry
=
InstructorTask
.
create
(
course_id
,
"task_type"
,
"task_key"
,
"task_input"
,
self
.
instructor
)
task_input
=
{
"email_id"
:
-
1
}
with
self
.
assertRaises
(
CourseEmail
.
DoesNotExist
):
perform_delegate_email_batches
(
entry
.
id
,
course_id
,
task_input
,
"action_name"
)
((
log_str
,
email_id
),
_
)
=
mock_log
.
warning
.
call_args
self
.
assertTrue
(
mock_log
.
warning
.
called
)
self
.
assertIn
(
'Failed to get CourseEmail with id'
,
log_str
)
self
.
assertEqual
(
email_id
,
-
1
)
self
.
assertTrue
(
retry
.
called
)
self
.
assertFalse
(
result
.
called
)
@patch
(
'bulk_email.tasks.log'
)
...
...
@@ -178,9 +183,13 @@ class TestEmailErrors(ModuleStoreTestCase):
"""
Tests exception when the course in the email doesn't exist
"""
email
=
CourseEmail
(
course_id
=
"I/DONT/EXIST"
)
course_id
=
"I/DONT/EXIST"
email
=
CourseEmail
(
course_id
=
course_id
)
email
.
save
()
delegate_email_batches
.
delay
(
email
.
id
,
self
.
instructor
.
id
)
entry
=
InstructorTask
.
create
(
course_id
,
"task_type"
,
"task_key"
,
"task_input"
,
self
.
instructor
)
task_input
=
{
"email_id"
:
email
.
id
}
with
self
.
assertRaises
(
Exception
):
perform_delegate_email_batches
(
entry
.
id
,
course_id
,
task_input
,
"action_name"
)
((
log_str
,
_
),
_
)
=
mock_log
.
exception
.
call_args
self
.
assertTrue
(
mock_log
.
exception
.
called
)
self
.
assertIn
(
'get_course_by_id failed:'
,
log_str
)
...
...
@@ -192,7 +201,10 @@ class TestEmailErrors(ModuleStoreTestCase):
"""
email
=
CourseEmail
(
course_id
=
self
.
course
.
id
,
to_option
=
"IDONTEXIST"
)
email
.
save
()
delegate_email_batches
.
delay
(
email
.
id
,
self
.
instructor
.
id
)
entry
=
InstructorTask
.
create
(
self
.
course
.
id
,
"task_type"
,
"task_key"
,
"task_input"
,
self
.
instructor
)
task_input
=
{
"email_id"
:
email
.
id
}
with
self
.
assertRaises
(
Exception
):
perform_delegate_email_batches
(
entry
.
id
,
self
.
course
.
id
,
task_input
,
"action_name"
)
((
log_str
,
opt_str
),
_
)
=
mock_log
.
error
.
call_args
self
.
assertTrue
(
mock_log
.
error
.
called
)
self
.
assertIn
(
'Unexpected bulk email TO_OPTION found'
,
log_str
)
...
...
lms/djangoapps/instructor/views/legacy.py
View file @
d48e90ee
...
...
@@ -46,7 +46,8 @@ from instructor_task.api import (get_running_instructor_tasks,
get_instructor_task_history
,
submit_rescore_problem_for_all_students
,
submit_rescore_problem_for_student
,
submit_reset_problem_attempts_for_all_students
)
submit_reset_problem_attempts_for_all_students
,
submit_bulk_course_email
)
from
instructor_task.views
import
get_task_completion_info
from
mitxmako.shortcuts
import
render_to_response
from
psychometrics
import
psychoanalyze
...
...
@@ -722,6 +723,13 @@ def instructor_dashboard(request, course_id):
html_message
=
request
.
POST
.
get
(
"message"
)
text_message
=
html_to_text
(
html_message
)
# TODO: make sure this is committed before submitting it to the task.
# However, it should probably be enough to do the submit below, which
# will commit the transaction for the InstructorTask object. Both should
# therefore be committed. (Still, it might be clearer to do so here as well.)
# Actually, this should probably be moved out, so that all the validation logic
# we might want to add to it can be added. There might also be something
# that would permit validation of the email beforehand.
email
=
CourseEmail
(
course_id
=
course_id
,
sender
=
request
.
user
,
...
...
@@ -730,13 +738,11 @@ def instructor_dashboard(request, course_id):
html_message
=
html_message
,
text_message
=
text_message
)
email
.
save
()
tasks
.
delegate_email_batches
.
delay
(
email
.
id
,
request
.
user
.
id
)
# TODO: make this into a task submission, so that the correct
# InstructorTask object gets created (for monitoring purposes)
submit_bulk_course_email
(
request
,
course_id
,
email
.
id
)
if
email_to_option
==
"all"
:
email_msg
=
'<div class="msg msg-confirm"><p class="copy">Your email was successfully queued for sending. Please note that for large public classes (~10k), it may take 1-2 hours to send all emails.</p></div>'
...
...
lms/djangoapps/instructor_task/api.py
View file @
d48e90ee
...
...
@@ -6,6 +6,7 @@ already been submitted, filtered either by running state or input
arguments.
"""
import
hashlib
from
celery.states
import
READY_STATES
...
...
@@ -14,11 +15,13 @@ from xmodule.modulestore.django import modulestore
from
instructor_task.models
import
InstructorTask
from
instructor_task.tasks
import
(
rescore_problem
,
reset_problem_attempts
,
delete_problem_state
)
delete_problem_state
,
send_bulk_course_email
)
from
instructor_task.api_helper
import
(
check_arguments_for_rescoring
,
encode_problem_and_student_input
,
submit_task
)
from
bulk_email.models
import
CourseEmail
def
get_running_instructor_tasks
(
course_id
):
...
...
@@ -34,14 +37,18 @@ def get_running_instructor_tasks(course_id):
return
instructor_tasks
.
order_by
(
'-id'
)
def
get_instructor_task_history
(
course_id
,
problem_url
,
student
=
None
):
def
get_instructor_task_history
(
course_id
,
problem_url
=
None
,
student
=
None
,
task_type
=
None
):
"""
Returns a query of InstructorTask objects of historical tasks for a given course,
that
match a particular problem and optionally a student
.
that
optionally match a particular problem, a student, and/or a task type
.
"""
_
,
task_key
=
encode_problem_and_student_input
(
problem_url
,
student
)
instructor_tasks
=
InstructorTask
.
objects
.
filter
(
course_id
=
course_id
)
if
problem_url
is
not
None
or
student
is
not
None
:
_
,
task_key
=
encode_problem_and_student_input
(
problem_url
,
student
)
instructor_tasks
=
instructor_tasks
.
filter
(
task_key
=
task_key
)
if
task_type
is
not
None
:
instructor_tasks
=
instructor_tasks
.
filter
(
task_type
=
task_type
)
instructor_tasks
=
InstructorTask
.
objects
.
filter
(
course_id
=
course_id
,
task_key
=
task_key
)
return
instructor_tasks
.
order_by
(
'-id'
)
...
...
@@ -162,3 +169,43 @@ def submit_delete_problem_state_for_all_students(request, course_id, problem_url
task_class
=
delete_problem_state
task_input
,
task_key
=
encode_problem_and_student_input
(
problem_url
)
return
submit_task
(
request
,
task_type
,
task_class
,
course_id
,
task_input
,
task_key
)
def
submit_bulk_course_email
(
request
,
course_id
,
email_id
):
"""
Request to have bulk email sent as a background task.
The specified CourseEmail object will be sent be updated for all students who have enrolled
in a course. Parameters are the `course_id` and the `email_id`, the id of the CourseEmail object.
AlreadyRunningError is raised if the course's students are already being emailed.
TODO: is this the right behavior? Or should multiple emails be allowed in the pipeline at the same time?
This method makes sure the InstructorTask entry is committed.
When called from any view that is wrapped by TransactionMiddleware,
and thus in a "commit-on-success" transaction, an autocommit buried within here
will cause any pending transaction to be committed by a successful
save here. Any future database operations will take place in a
separate transaction.
"""
# check arguments: make sure that the course is defined?
# TODO: what is the right test here?
# modulestore().get_instance(course_id, problem_url)
# This should also make sure that the email exists.
# We can also pull out the To argument here, so that is displayed in
# the InstructorTask status.
email_obj
=
CourseEmail
.
objects
.
get
(
id
=
email_id
)
to_option
=
email_obj
.
to_option
task_type
=
'bulk_course_email'
task_class
=
send_bulk_course_email
# TODO: figure out if we need to encode in a standard way, or if we can get away
# with doing this manually. Shouldn't be hard to make the encode call explicitly,
# and allow no problem_url or student to be defined. Like this:
# task_input, task_key = encode_problem_and_student_input()
task_input
=
{
'email_id'
:
email_id
,
'to_option'
:
to_option
}
task_key_stub
=
"{email_id}_{to_option}"
.
format
(
email_id
=
email_id
,
to_option
=
to_option
)
# create the key value by using MD5 hash:
task_key
=
hashlib
.
md5
(
task_key_stub
)
.
hexdigest
()
return
submit_task
(
request
,
task_type
,
task_class
,
course_id
,
task_input
,
task_key
)
lms/djangoapps/instructor_task/api_helper.py
View file @
d48e90ee
...
...
@@ -58,13 +58,14 @@ def _reserve_task(course_id, task_type, task_key, task_input, requester):
return
InstructorTask
.
create
(
course_id
,
task_type
,
task_key
,
task_input
,
requester
)
def
_get_xmodule_instance_args
(
request
):
def
_get_xmodule_instance_args
(
request
,
task_id
):
"""
Calculate parameters needed for instantiating xmodule instances.
The `request_info` will be passed to a tracking log function, to provide information
about the source of the task request. The `xqueue_callback_url_prefix` is used to
permit old-style xqueue callbacks directly to the appropriate module in the LMS.
The `task_id` is also passed to the tracking log function.
"""
request_info
=
{
'username'
:
request
.
user
.
username
,
'ip'
:
request
.
META
[
'REMOTE_ADDR'
],
...
...
@@ -74,6 +75,7 @@ def _get_xmodule_instance_args(request):
xmodule_instance_args
=
{
'xqueue_callback_url_prefix'
:
get_xqueue_callback_url_prefix
(
request
),
'request_info'
:
request_info
,
'task_id'
:
task_id
,
}
return
xmodule_instance_args
...
...
@@ -214,7 +216,7 @@ def check_arguments_for_rescoring(course_id, problem_url):
def
encode_problem_and_student_input
(
problem_url
,
student
=
None
):
"""
Encode problem_url and optional student into task_key and task_input values.
Encode
optional
problem_url and optional student into task_key and task_input values.
`problem_url` is full URL of the problem.
`student` is the user object of the student
...
...
@@ -257,7 +259,7 @@ def submit_task(request, task_type, task_class, course_id, task_input, task_key)
# submit task:
task_id
=
instructor_task
.
task_id
task_args
=
[
instructor_task
.
id
,
_get_xmodule_instance_args
(
request
)]
task_args
=
[
instructor_task
.
id
,
_get_xmodule_instance_args
(
request
,
task_id
)]
task_class
.
apply_async
(
task_args
,
task_id
=
task_id
)
return
instructor_task
return
instructor_task
\ No newline at end of file
lms/djangoapps/instructor_task/migrations/0002_add_subtask_field.py
0 → 100644
View file @
d48e90ee
# -*- coding: utf-8 -*-
import
datetime
from
south.db
import
db
from
south.v2
import
SchemaMigration
from
django.db
import
models
class
Migration
(
SchemaMigration
):
def
forwards
(
self
,
orm
):
# Adding field 'InstructorTask.subtasks'
db
.
add_column
(
'instructor_task_instructortask'
,
'subtasks'
,
self
.
gf
(
'django.db.models.fields.TextField'
)(
default
=
''
,
blank
=
True
),
keep_default
=
False
)
def
backwards
(
self
,
orm
):
# Deleting field 'InstructorTask.subtasks'
db
.
delete_column
(
'instructor_task_instructortask'
,
'subtasks'
)
models
=
{
'auth.group'
:
{
'Meta'
:
{
'object_name'
:
'Group'
},
'id'
:
(
'django.db.models.fields.AutoField'
,
[],
{
'primary_key'
:
'True'
}),
'name'
:
(
'django.db.models.fields.CharField'
,
[],
{
'unique'
:
'True'
,
'max_length'
:
'80'
}),
'permissions'
:
(
'django.db.models.fields.related.ManyToManyField'
,
[],
{
'to'
:
"orm['auth.Permission']"
,
'symmetrical'
:
'False'
,
'blank'
:
'True'
})
},
'auth.permission'
:
{
'Meta'
:
{
'ordering'
:
"('content_type__app_label', 'content_type__model', 'codename')"
,
'unique_together'
:
"(('content_type', 'codename'),)"
,
'object_name'
:
'Permission'
},
'codename'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'100'
}),
'content_type'
:
(
'django.db.models.fields.related.ForeignKey'
,
[],
{
'to'
:
"orm['contenttypes.ContentType']"
}),
'id'
:
(
'django.db.models.fields.AutoField'
,
[],
{
'primary_key'
:
'True'
}),
'name'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'50'
})
},
'auth.user'
:
{
'Meta'
:
{
'object_name'
:
'User'
},
'date_joined'
:
(
'django.db.models.fields.DateTimeField'
,
[],
{
'default'
:
'datetime.datetime.now'
}),
'email'
:
(
'django.db.models.fields.EmailField'
,
[],
{
'max_length'
:
'75'
,
'blank'
:
'True'
}),
'first_name'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'30'
,
'blank'
:
'True'
}),
'groups'
:
(
'django.db.models.fields.related.ManyToManyField'
,
[],
{
'to'
:
"orm['auth.Group']"
,
'symmetrical'
:
'False'
,
'blank'
:
'True'
}),
'id'
:
(
'django.db.models.fields.AutoField'
,
[],
{
'primary_key'
:
'True'
}),
'is_active'
:
(
'django.db.models.fields.BooleanField'
,
[],
{
'default'
:
'True'
}),
'is_staff'
:
(
'django.db.models.fields.BooleanField'
,
[],
{
'default'
:
'False'
}),
'is_superuser'
:
(
'django.db.models.fields.BooleanField'
,
[],
{
'default'
:
'False'
}),
'last_login'
:
(
'django.db.models.fields.DateTimeField'
,
[],
{
'default'
:
'datetime.datetime.now'
}),
'last_name'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'30'
,
'blank'
:
'True'
}),
'password'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'128'
}),
'user_permissions'
:
(
'django.db.models.fields.related.ManyToManyField'
,
[],
{
'to'
:
"orm['auth.Permission']"
,
'symmetrical'
:
'False'
,
'blank'
:
'True'
}),
'username'
:
(
'django.db.models.fields.CharField'
,
[],
{
'unique'
:
'True'
,
'max_length'
:
'30'
})
},
'contenttypes.contenttype'
:
{
'Meta'
:
{
'ordering'
:
"('name',)"
,
'unique_together'
:
"(('app_label', 'model'),)"
,
'object_name'
:
'ContentType'
,
'db_table'
:
"'django_content_type'"
},
'app_label'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'100'
}),
'id'
:
(
'django.db.models.fields.AutoField'
,
[],
{
'primary_key'
:
'True'
}),
'model'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'100'
}),
'name'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'100'
})
},
'instructor_task.instructortask'
:
{
'Meta'
:
{
'object_name'
:
'InstructorTask'
},
'course_id'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'255'
,
'db_index'
:
'True'
}),
'created'
:
(
'django.db.models.fields.DateTimeField'
,
[],
{
'auto_now_add'
:
'True'
,
'null'
:
'True'
,
'blank'
:
'True'
}),
'id'
:
(
'django.db.models.fields.AutoField'
,
[],
{
'primary_key'
:
'True'
}),
'requester'
:
(
'django.db.models.fields.related.ForeignKey'
,
[],
{
'to'
:
"orm['auth.User']"
}),
'subtasks'
:
(
'django.db.models.fields.TextField'
,
[],
{
'blank'
:
'True'
}),
'task_id'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'255'
,
'db_index'
:
'True'
}),
'task_input'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'255'
}),
'task_key'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'255'
,
'db_index'
:
'True'
}),
'task_output'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'1024'
,
'null'
:
'True'
}),
'task_state'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'50'
,
'null'
:
'True'
,
'db_index'
:
'True'
}),
'task_type'
:
(
'django.db.models.fields.CharField'
,
[],
{
'max_length'
:
'50'
,
'db_index'
:
'True'
}),
'updated'
:
(
'django.db.models.fields.DateTimeField'
,
[],
{
'auto_now'
:
'True'
,
'blank'
:
'True'
})
}
}
complete_apps
=
[
'instructor_task'
]
\ No newline at end of file
lms/djangoapps/instructor_task/models.py
View file @
d48e90ee
...
...
@@ -56,6 +56,7 @@ class InstructorTask(models.Model):
requester
=
models
.
ForeignKey
(
User
,
db_index
=
True
)
created
=
models
.
DateTimeField
(
auto_now_add
=
True
,
null
=
True
)
updated
=
models
.
DateTimeField
(
auto_now
=
True
)
subtasks
=
models
.
TextField
(
blank
=
True
)
# JSON dictionary
def
__repr__
(
self
):
return
'InstructorTask<
%
r>'
%
({
...
...
lms/djangoapps/instructor_task/tasks.py
View file @
d48e90ee
...
...
@@ -20,10 +20,15 @@ of the query for traversing StudentModule objects.
"""
from
celery
import
task
from
instructor_task.tasks_helper
import
(
update_problem_module_state
,
from
functools
import
partial
from
instructor_task.tasks_helper
import
(
run_main_task
,
perform_module_state_update
,
# perform_delegate_email_batches,
rescore_problem_module_state
,
reset_attempts_module_state
,
delete_problem_module_state
)
delete_problem_module_state
,
)
from
bulk_email.tasks
import
perform_delegate_email_batches
@task
...
...
@@ -46,11 +51,10 @@ def rescore_problem(entry_id, xmodule_instance_args):
to instantiate an xmodule instance.
"""
action_name
=
'rescored'
update_fcn
=
rescore_problem_module_state
update_fcn
=
partial
(
rescore_problem_module_state
,
xmodule_instance_args
)
filter_fcn
=
lambda
(
modules_to_update
):
modules_to_update
.
filter
(
state__contains
=
'"done": true'
)
return
update_problem_module_state
(
entry_id
,
update_fcn
,
action_name
,
filter_fcn
=
filter_fcn
,
xmodule_instance_args
=
xmodule_instance_args
)
visit_fcn
=
partial
(
perform_module_state_update
,
update_fcn
,
filter_fcn
)
return
run_main_task
(
entry_id
,
visit_fcn
,
action_name
)
@task
...
...
@@ -69,10 +73,9 @@ def reset_problem_attempts(entry_id, xmodule_instance_args):
to instantiate an xmodule instance.
"""
action_name
=
'reset'
update_fcn
=
reset_attempts_module_state
return
update_problem_module_state
(
entry_id
,
update_fcn
,
action_name
,
filter_fcn
=
None
,
xmodule_instance_args
=
xmodule_instance_args
)
update_fcn
=
partial
(
reset_attempts_module_state
,
xmodule_instance_args
)
visit_fcn
=
partial
(
perform_module_state_update
,
update_fcn
,
None
)
return
run_main_task
(
entry_id
,
visit_fcn
,
action_name
)
@task
...
...
@@ -91,7 +94,24 @@ def delete_problem_state(entry_id, xmodule_instance_args):
to instantiate an xmodule instance.
"""
action_name
=
'deleted'
update_fcn
=
delete_problem_module_state
return
update_problem_module_state
(
entry_id
,
update_fcn
,
action_name
,
filter_fcn
=
None
,
xmodule_instance_args
=
xmodule_instance_args
)
update_fcn
=
partial
(
delete_problem_module_state
,
xmodule_instance_args
)
visit_fcn
=
partial
(
perform_module_state_update
,
update_fcn
,
None
)
return
run_main_task
(
entry_id
,
visit_fcn
,
action_name
)
@task
def
send_bulk_course_email
(
entry_id
,
xmodule_instance_args
):
"""Sends emails to in a course.
`entry_id` is the id value of the InstructorTask entry that corresponds to this task.
The entry contains the `course_id` that identifies the course, as well as the
`task_input`, which contains task-specific input.
The task_input should be a dict with no entries.
`xmodule_instance_args` provides information needed by _get_module_instance_for_task()
to instantiate an xmodule instance.
"""
action_name
=
'emailed'
visit_fcn
=
perform_delegate_email_batches
return
run_main_task
(
entry_id
,
visit_fcn
,
action_name
,
spawns_subtasks
=
True
)
lms/djangoapps/instructor_task/tasks_helper.py
View file @
d48e90ee
...
...
@@ -3,7 +3,6 @@ This file contains tasks that are designed to perform background operations on t
running state of a course.
"""
import
json
from
time
import
time
from
sys
import
exc_info
...
...
@@ -11,11 +10,10 @@ from traceback import format_exc
from
celery
import
current_task
from
celery.utils.log
import
get_task_logger
from
celery.signals
import
worker_process_init
from
celery.states
import
SUCCESS
,
FAILURE
from
django.contrib.auth.models
import
User
from
django.db
import
transaction
from
django.db
import
transaction
,
reset_queries
from
dogapi
import
dog_stats_api
from
xmodule.modulestore.django
import
modulestore
...
...
@@ -49,8 +47,8 @@ def _get_current_task():
return
current_task
def
_perform_module_state_update
(
course_id
,
module_state_key
,
student_identifier
,
update_fcn
,
action_name
,
filter_fcn
,
xmodule_instance_args
):
# def perform_module_state_update(course_id, module_state_key, student_identifier, update_fcn, action_name, filter_fcn):
def
perform_module_state_update
(
update_fcn
,
filter_fcn
,
entry_id
,
course_id
,
task_input
,
action_name
):
"""
Performs generic update by visiting StudentModule instances with the update_fcn provided.
...
...
@@ -85,6 +83,9 @@ def _perform_module_state_update(course_id, module_state_key, student_identifier
# get start time for task:
start_time
=
time
()
module_state_key
=
task_input
.
get
(
'problem_url'
)
student_identifier
=
task_input
.
get
(
'student'
)
# find the problem descriptor:
module_descriptor
=
modulestore
()
.
get_instance
(
course_id
,
module_state_key
)
...
...
@@ -92,8 +93,8 @@ def _perform_module_state_update(course_id, module_state_key, student_identifier
modules_to_update
=
StudentModule
.
objects
.
filter
(
course_id
=
course_id
,
module_state_key
=
module_state_key
)
# give the option of
rescor
ing an individual student. If not specified,
# then
rescor
es all students who have responded to a problem so far
# give the option of
updat
ing an individual student. If not specified,
# then
updat
es all students who have responded to a problem so far
student
=
None
if
student_identifier
is
not
None
:
# if an identifier is supplied, then look for the student,
...
...
@@ -132,7 +133,7 @@ def _perform_module_state_update(course_id, module_state_key, student_identifier
# There is no try here: if there's an error, we let it throw, and the task will
# be marked as FAILED, with a stack trace.
with
dog_stats_api
.
timer
(
'instructor_tasks.module.time.step'
,
tags
=
[
'action:{name}'
.
format
(
name
=
action_name
)]):
if
update_fcn
(
module_descriptor
,
module_to_update
,
xmodule_instance_args
):
if
update_fcn
(
module_descriptor
,
module_to_update
):
# If the update_fcn returns true, then it performed some kind of work.
# Logging of failures is left to the update_fcn itself.
num_updated
+=
1
...
...
@@ -144,16 +145,20 @@ def _perform_module_state_update(course_id, module_state_key, student_identifier
return
task_progress
def
update_problem_module_state
(
entry_id
,
update_fcn
,
action_name
,
filter_fcn
,
xmodule_instance_args
):
def
run_main_task
(
entry_id
,
task_fcn
,
action_name
,
spawns_subtasks
=
False
):
"""
Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask.
TODO: UPDATE THIS DOCSTRING
(IT's not just visiting StudentModule instances....)
Performs generic update by visiting StudentModule instances with the update_fcn provided.
The `entry_id` is the primary key for the InstructorTask entry representing the task. This function
updates the entry on success and failure of the
_
perform_module_state_update function it
updates the entry on success and failure of the perform_module_state_update function it
wraps. It is setting the entry's value for task_state based on what Celery would set it to once
the task returns to Celery: FAILURE if an exception is encountered, and SUCCESS if it returns normally.
Other arguments are pass-throughs to
_
perform_module_state_update, and documented there.
Other arguments are pass-throughs to perform_module_state_update, and documented there.
If no exceptions are raised, a dict containing the task's result is returned, with the following keys:
...
...
@@ -187,15 +192,15 @@ def update_problem_module_state(entry_id, update_fcn, action_name, filter_fcn,
task_id
=
entry
.
task_id
course_id
=
entry
.
course_id
task_input
=
json
.
loads
(
entry
.
task_input
)
module_state_key
=
task_input
.
get
(
'problem_url'
)
student_ident
=
task_input
[
'student'
]
if
'student'
in
task_input
else
None
fmt
=
'Starting to update problem modules as task "{task_id}": course "{course_id}" problem "{state_key}": nothing {action} yet'
TASK_LOG
.
info
(
fmt
.
format
(
task_id
=
task_id
,
course_id
=
course_id
,
state_key
=
module_state_key
,
action
=
action_name
))
# construct log message:
# TODO: generalize this beyond just problem and student, so it includes email_id and to_option.
# Can we just loop over all keys and output them all? Just print the task_input dict itself?
module_state_key
=
task_input
.
get
(
'problem_url'
)
fmt
=
'task "{task_id}": course "{course_id}" problem "{state_key}"'
task_info_string
=
fmt
.
format
(
task_id
=
task_id
,
course_id
=
course_id
,
state_key
=
module_state_key
)
# add task_id to xmodule_instance_args, so that it can be output with tracking info:
if
xmodule_instance_args
is
not
None
:
xmodule_instance_args
[
'task_id'
]
=
task_id
TASK_LOG
.
info
(
'Starting update (nothing
%
s yet):
%
s'
,
action_name
,
task_info_string
)
# Now that we have an entry we can try to catch failures:
task_progress
=
None
...
...
@@ -204,21 +209,47 @@ def update_problem_module_state(entry_id, update_fcn, action_name, filter_fcn,
# that is running.
request_task_id
=
_get_current_task
()
.
request
.
id
if
task_id
!=
request_task_id
:
fmt
=
'Requested task
"{task_id}" did not match actual task "{actual_id}"
'
message
=
fmt
.
format
(
task_id
=
task_id
,
course_id
=
course_id
,
state_key
=
module_state_key
,
actual_id
=
request_task_id
)
fmt
=
'Requested task
did not match actual task "{actual_id}": {task_info}
'
message
=
fmt
.
format
(
actual_id
=
request_task_id
,
task_info
=
task_info_string
)
TASK_LOG
.
error
(
message
)
raise
UpdateProblemModuleStateError
(
message
)
# Now do the work:
with
dog_stats_api
.
timer
(
'instructor_tasks.module.time.overall'
,
tags
=
[
'action:{name}'
.
format
(
name
=
action_name
)]):
task_progress
=
_perform_module_state_update
(
course_id
,
module_state_key
,
student_ident
,
update_fcn
,
action_name
,
filter_fcn
,
xmodule_instance_args
)
with
dog_stats_api
.
timer
(
'instructor_tasks.time.overall'
,
tags
=
[
'action:{name}'
.
format
(
name
=
action_name
)]):
# REMOVE: task_progress = visit_fcn(course_id, module_state_key, student_ident, update_fcn, action_name, filter_fcn)
task_progress
=
task_fcn
(
entry_id
,
course_id
,
task_input
,
action_name
)
# If we get here, we assume we've succeeded, so update the InstructorTask entry in anticipation.
# But we do this within the try, in case creating the task_output causes an exception to be
# raised.
entry
.
task_output
=
InstructorTask
.
create_output_for_success
(
task_progress
)
entry
.
task_state
=
SUCCESS
entry
.
save_now
()
# TODO: This is not the case if there are outstanding subtasks that were spawned asynchronously
# as part of the main task. There is probably some way to represent this more elegantly, but for
# now, we will just use an explicit flag.
if
spawns_subtasks
:
# we change the rules here. If it's a task with subtasks running, then we
# explicitly set its state, with the idea that progress will be updated
# directly into the InstructorTask object, rather than into the parent task's
# AsyncResult object. This is because we have to write to the InstructorTask
# object anyway, so we may as well put status in there. And because multiple
# clients are writing to it, we need the locking that a DB can provide, rather
# than the speed that the AsyncResult provides.
# So we need to change the logic of the monitor to pull status from the
# InstructorTask directly when the state is PROGRESS, and to pull from the
# AsyncResult when it's running but not marked as in PROGRESS state. (I.e.
# if it's started.) Admittedly, it's misnamed, but it should work.
# But we've already started the subtasks by the time we get here,
# so these values should already have been written. Too late.
# entry.task_output = InstructorTask.create_output_for_success(task_progress)
# entry.task_state = PROGRESS
# Weird. Note that by exiting this function successfully, will
# result in the AsyncResult for this task as being marked as SUCCESS.
# Below, we were just marking the entry to match. But it shouldn't
# match, if it's not really done.
pass
else
:
entry
.
task_output
=
InstructorTask
.
create_output_for_success
(
task_progress
)
entry
.
task_state
=
SUCCESS
entry
.
save_now
()
except
Exception
:
# try to write out the failure to the entry before failing
...
...
@@ -230,9 +261,11 @@ def update_problem_module_state(entry_id, update_fcn, action_name, filter_fcn,
entry
.
save_now
()
raise
# Release any queries that the connection has been hanging onto:
reset_queries
()
# log and exit, returning task_progress info as task result:
fmt
=
'Finishing task "{task_id}": course "{course_id}" problem "{state_key}": final: {progress}'
TASK_LOG
.
info
(
fmt
.
format
(
task_id
=
task_id
,
course_id
=
course_id
,
state_key
=
module_state_key
,
progress
=
task_progress
))
TASK_LOG
.
info
(
'Finishing
%
s: final:
%
s'
,
task_info_string
,
task_progress
)
return
task_progress
...
...
@@ -241,6 +274,29 @@ def _get_task_id_from_xmodule_args(xmodule_instance_args):
return
xmodule_instance_args
.
get
(
'task_id'
,
UNKNOWN_TASK_ID
)
if
xmodule_instance_args
is
not
None
else
UNKNOWN_TASK_ID
def
_get_xqueue_callback_url_prefix
(
xmodule_instance_args
):
"""
"""
return
xmodule_instance_args
.
get
(
'xqueue_callback_url_prefix'
,
''
)
if
xmodule_instance_args
is
not
None
else
''
def
_get_track_function_for_task
(
student
,
xmodule_instance_args
=
None
,
source_page
=
'x_module_task'
):
"""
Make a tracking function that logs what happened.
For insertion into ModuleSystem, and used by CapaModule, which will
provide the event_type (as string) and event (as dict) as arguments.
The request_info and task_info (and page) are provided here.
"""
# get request-related tracking information from args passthrough, and supplement with task-specific
# information:
request_info
=
xmodule_instance_args
.
get
(
'request_info'
,
{})
if
xmodule_instance_args
is
not
None
else
{}
task_info
=
{
'student'
:
student
.
username
,
'task_id'
:
_get_task_id_from_xmodule_args
(
xmodule_instance_args
)}
return
lambda
event_type
,
event
:
task_track
(
request_info
,
task_info
,
event_type
,
event
,
page
=
source_page
)
def
_get_module_instance_for_task
(
course_id
,
student
,
module_descriptor
,
xmodule_instance_args
=
None
,
grade_bucket_type
=
None
):
"""
...
...
@@ -277,7 +333,7 @@ def _get_module_instance_for_task(course_id, student, module_descriptor, xmodule
@transaction.autocommit
def
rescore_problem_module_state
(
module_descriptor
,
student_module
,
xmodule_instance_args
=
Non
e
):
def
rescore_problem_module_state
(
xmodule_instance_args
,
module_descriptor
,
student_modul
e
):
'''
Takes an XModule descriptor and a corresponding StudentModule object, and
performs rescoring on the student's problem submission.
...
...
@@ -327,7 +383,7 @@ def rescore_problem_module_state(module_descriptor, student_module, xmodule_inst
@transaction.autocommit
def
reset_attempts_module_state
(
_module_descriptor
,
student_module
,
xmodule_instance_args
=
Non
e
):
def
reset_attempts_module_state
(
xmodule_instance_args
,
_module_descriptor
,
student_modul
e
):
"""
Resets problem attempts to zero for specified `student_module`.
...
...
@@ -343,17 +399,16 @@ def reset_attempts_module_state(_module_descriptor, student_module, xmodule_inst
student_module
.
save
()
# get request-related tracking information from args passthrough,
# and supplement with task-specific information:
request_info
=
xmodule_instance_args
.
get
(
'request_info'
,
{})
if
xmodule_instance_args
is
not
None
else
{}
task_info
=
{
"student"
:
student_module
.
student
.
username
,
"task_id"
:
_get_task_id_from_xmodule_args
(
xmodule_instance_args
)}
track_function
=
_get_track_function_for_task
(
student_module
.
student
,
xmodule_instance_args
)
event_info
=
{
"old_attempts"
:
old_number_of_attempts
,
"new_attempts"
:
0
}
t
ask_track
(
request_info
,
task_info
,
'problem_reset_attempts'
,
event_info
,
page
=
'x_module_task'
)
t
rack_function
(
'problem_reset_attempts'
,
event_info
)
# consider the reset to be successful, even if no update was performed. (It's just "optimized".)
return
True
@transaction.autocommit
def
delete_problem_module_state
(
_module_descriptor
,
student_module
,
xmodule_instance_args
=
Non
e
):
def
delete_problem_module_state
(
xmodule_instance_args
,
_module_descriptor
,
student_modul
e
):
"""
Delete the StudentModule entry.
...
...
@@ -362,7 +417,47 @@ def delete_problem_module_state(_module_descriptor, student_module, xmodule_inst
student_module
.
delete
()
# get request-related tracking information from args passthrough,
# and supplement with task-specific information:
request_info
=
xmodule_instance_args
.
get
(
'request_info'
,
{})
if
xmodule_instance_args
is
not
None
else
{}
task_info
=
{
"student"
:
student_module
.
student
.
username
,
"task_id"
:
_get_task_id_from_xmodule_args
(
xmodule_instance_args
)}
task_track
(
request_info
,
task_info
,
'problem_delete_state'
,
{},
page
=
'x_module_task'
)
track_function
=
_get_track_function_for_task
(
student_module
.
student
,
xmodule_instance_args
)
track_function
(
'problem_delete_state'
,
{})
return
True
#def perform_delegate_email_batches(entry_id, course_id, task_input, action_name):
# """
# """
# # Get start time for task:
# start_time = time()
#
# # perform the main loop
# num_updated = 0
# num_attempted = 0
# num_total = enrolled_students.count()
#
# def get_task_progress():
# """Return a dict containing info about current task"""
# current_time = time()
# progress = {'action_name': action_name,
# 'attempted': num_attempted,
# 'updated': num_updated,
# 'total': num_total,
# 'duration_ms': int((current_time - start_time) * 1000),
# }
# return progress
#
# task_progress = get_task_progress()
# _get_current_task().update_state(state=PROGRESS, meta=task_progress)
# for enrolled_student in enrolled_students:
# num_attempted += 1
# # There is no try here: if there's an error, we let it throw, and the task will
# # be marked as FAILED, with a stack trace.
# with dog_stats_api.timer('instructor_tasks.student.time.step', tags=['action:{name}'.format(name=action_name)]):
# if update_fcn(course_descriptor, enrolled_student):
# # If the update_fcn returns true, then it performed some kind of work.
# # Logging of failures is left to the update_fcn itself.
# num_updated += 1
#
# # update task status:
# task_progress = get_task_progress()
# _get_current_task().update_state(state=PROGRESS, meta=task_progress)
#
# return task_progress
lms/djangoapps/instructor_task/tests/test_tasks.py
View file @
d48e90ee
...
...
@@ -23,7 +23,7 @@ from instructor_task.models import InstructorTask
from
instructor_task.tests.test_base
import
InstructorTaskModuleTestCase
from
instructor_task.tests.factories
import
InstructorTaskFactory
from
instructor_task.tasks
import
rescore_problem
,
reset_problem_attempts
,
delete_problem_state
from
instructor_task.tasks_helper
import
UpdateProblemModuleStateError
,
update_problem_module_state
from
instructor_task.tasks_helper
import
UpdateProblemModuleStateError
#
, update_problem_module_state
PROBLEM_URL_NAME
=
"test_urlname"
...
...
@@ -313,17 +313,17 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
def
test_delete_with_short_error_msg
(
self
):
self
.
_test_run_with_short_error_msg
(
delete_problem_state
)
def
test_successful_result_too_long
(
self
):
def
te
DONT
st_successful_result_too_long
(
self
):
# while we don't expect the existing tasks to generate output that is too
# long, we can test the framework will handle such an occurrence.
task_entry
=
self
.
_create_input_entry
()
self
.
define_option_problem
(
PROBLEM_URL_NAME
)
action_name
=
'x'
*
1000
update_fcn
=
lambda
(
_module_descriptor
,
_student_module
,
_xmodule_instance_args
):
True
task_function
=
(
lambda
entry_id
,
xmodule_instance_args
:
update_problem_module_state
(
entry_id
,
update_fcn
,
action_name
,
filter_fcn
=
None
,
xmodule_instance_args
=
None
))
#
task_function = (lambda entry_id, xmodule_instance_args:
#
update_problem_module_state(entry_id,
#
update_fcn, action_name, filter_fcn=None,
#
xmodule_instance_args=None))
with
self
.
assertRaises
(
ValueError
):
self
.
_run_task_with_mock_celery
(
task_function
,
task_entry
.
id
,
task_entry
.
task_id
)
...
...
lms/djangoapps/instructor_task/tests/test_views.py
View file @
d48e90ee
...
...
@@ -262,4 +262,4 @@ class InstructorTaskReportTest(InstructorTaskTestCase):
instructor_task
.
task_input
=
"{ bad"
succeeded
,
message
=
get_task_completion_info
(
instructor_task
)
self
.
assertFalse
(
succeeded
)
self
.
assertEquals
(
message
,
"
Problem rescored for 2 of 3 students
(out of 5)"
)
self
.
assertEquals
(
message
,
"
Status: rescored 2 of 3
(out of 5)"
)
lms/djangoapps/instructor_task/views.py
View file @
d48e90ee
...
...
@@ -40,7 +40,7 @@ def instructor_task_status(request):
Status is returned as a JSON-serialized dict, wrapped as the content of a HTTPResponse.
The task_id can be specified to this view in one of t
hree
ways:
The task_id can be specified to this view in one of t
wo
ways:
* by making a request containing 'task_id' as a parameter with a single value
Returns a dict containing status information for the specified task_id
...
...
@@ -133,6 +133,8 @@ def get_task_completion_info(instructor_task):
num_total
=
task_output
[
'total'
]
student
=
None
problem_url
=
None
email_id
=
None
try
:
task_input
=
json
.
loads
(
instructor_task
.
task_input
)
except
ValueError
:
...
...
@@ -140,11 +142,14 @@ def get_task_completion_info(instructor_task):
log
.
warning
(
fmt
.
format
(
instructor_task
.
task_id
,
instructor_task
.
task_input
))
else
:
student
=
task_input
.
get
(
'student'
)
problem_url
=
task_input
.
get
(
'problem_url'
)
email_id
=
task_input
.
get
(
'email_id'
)
if
instructor_task
.
task_state
==
PROGRESS
:
# special message for providing progress updates:
msg_format
=
"Progress: {action} {updated} of {attempted} so far"
elif
student
is
not
None
:
elif
student
is
not
None
and
problem_url
is
not
None
:
# this reports on actions on problems for a particular student:
if
num_attempted
==
0
:
msg_format
=
"Unable to find submission to be {action} for student '{student}'"
elif
num_updated
==
0
:
...
...
@@ -152,15 +157,31 @@ def get_task_completion_info(instructor_task):
else
:
succeeded
=
True
msg_format
=
"Problem successfully {action} for student '{student}'"
elif
num_attempted
==
0
:
msg_format
=
"Unable to find any students with submissions to be {action}"
elif
num_updated
==
0
:
msg_format
=
"Problem failed to be {action} for any of {attempted} students"
elif
num_updated
==
num_attempted
:
succeeded
=
True
msg_format
=
"Problem successfully {action} for {attempted} students"
else
:
# num_updated < num_attempted
msg_format
=
"Problem {action} for {updated} of {attempted} students"
elif
student
is
None
and
problem_url
is
not
None
:
# this reports on actions on problems for all students:
if
num_attempted
==
0
:
msg_format
=
"Unable to find any students with submissions to be {action}"
elif
num_updated
==
0
:
msg_format
=
"Problem failed to be {action} for any of {attempted} students"
elif
num_updated
==
num_attempted
:
succeeded
=
True
msg_format
=
"Problem successfully {action} for {attempted} students"
else
:
# num_updated < num_attempted
msg_format
=
"Problem {action} for {updated} of {attempted} students"
elif
email_id
is
not
None
:
# this reports on actions on bulk emails
if
num_attempted
==
0
:
msg_format
=
"Unable to find any recipients to be {action}"
elif
num_updated
==
0
:
msg_format
=
"Message failed to be {action} for any of {attempted} recipients "
elif
num_updated
==
num_attempted
:
succeeded
=
True
msg_format
=
"Message successfully {action} for {attempted} recipients"
else
:
# num_updated < num_attempted
msg_format
=
"Message {action} for {updated} of {attempted} recipients"
else
:
# provide a default:
msg_format
=
"Status: {action} {updated} of {attempted}"
if
student
is
None
and
num_attempted
!=
num_total
:
msg_format
+=
" (out of {total})"
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment