Commit 37158bab by Daniel Friedman

Create standard way of updating a task's progress

TNL-566
parent e9292fb5
...@@ -148,6 +148,49 @@ def _get_current_task(): ...@@ -148,6 +148,49 @@ def _get_current_task():
return current_task return current_task
class TaskProgress(object):
"""
Encapsulates the current task's progress by keeping track of
'attempted', 'succeeded', 'skipped', 'failed', 'total',
'action_name', and 'duration_ms' values.
"""
def __init__(self, action_name, total, start_time):
self.action_name = action_name
self.total = total
self.start_time = start_time
self.attempted = 0
self.succeeded = 0
self.skipped = 0
self.failed = 0
def update_task_state(self, extra_meta=None):
"""
Update the current celery task's state to the progress state
specified by the current object. Returns the progress
dictionary for use by `run_main_task` and
`BaseInstructorTask.on_success`.
Arguments:
extra_meta (dict): Extra metadata to pass to `update_state`
Returns:
dict: The current task's progress dict
"""
progress_dict = {
'action_name': self.action_name,
'attempted': self.attempted,
'succeeded': self.succeeded,
'skipped': self.skipped,
'failed': self.failed,
'total': self.total,
'duration_ms': int((time() - self.start_time) * 1000),
}
if extra_meta is not None:
progress_dict.update(extra_meta)
_get_current_task().update_state(state=PROGRESS, meta=progress_dict)
return progress_dict
def run_main_task(entry_id, task_fcn, action_name): def run_main_task(entry_id, task_fcn, action_name):
""" """
Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask. Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask.
...@@ -243,9 +286,7 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta ...@@ -243,9 +286,7 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta
result object. result object.
""" """
# get start time for task:
start_time = time() start_time = time()
usage_key = course_id.make_usage_key_from_deprecated_string(task_input.get('problem_url')) usage_key = course_id.make_usage_key_from_deprecated_string(task_input.get('problem_url'))
student_identifier = task_input.get('student') student_identifier = task_input.get('student')
...@@ -272,30 +313,11 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta ...@@ -272,30 +313,11 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta
if filter_fcn is not None: if filter_fcn is not None:
modules_to_update = filter_fcn(modules_to_update) modules_to_update = filter_fcn(modules_to_update)
# perform the main loop task_progress = TaskProgress(action_name, modules_to_update.count(), start_time)
num_attempted = 0 task_progress.update_task_state()
num_succeeded = 0
num_skipped = 0
num_failed = 0
num_total = modules_to_update.count()
def get_task_progress():
"""Return a dict containing info about current task"""
current_time = time()
progress = {'action_name': action_name,
'attempted': num_attempted,
'succeeded': num_succeeded,
'skipped': num_skipped,
'failed': num_failed,
'total': num_total,
'duration_ms': int((current_time - start_time) * 1000),
}
return progress
task_progress = get_task_progress()
_get_current_task().update_state(state=PROGRESS, meta=task_progress)
for module_to_update in modules_to_update: for module_to_update in modules_to_update:
num_attempted += 1 task_progress.attempted += 1
# There is no try here: if there's an error, we let it throw, and the task will # There is no try here: if there's an error, we let it throw, and the task will
# be marked as FAILED, with a stack trace. # be marked as FAILED, with a stack trace.
with dog_stats_api.timer('instructor_tasks.module.time.step', tags=[u'action:{name}'.format(name=action_name)]): with dog_stats_api.timer('instructor_tasks.module.time.step', tags=[u'action:{name}'.format(name=action_name)]):
...@@ -303,19 +325,15 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta ...@@ -303,19 +325,15 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta
if update_status == UPDATE_STATUS_SUCCEEDED: if update_status == UPDATE_STATUS_SUCCEEDED:
# If the update_fcn returns true, then it performed some kind of work. # If the update_fcn returns true, then it performed some kind of work.
# Logging of failures is left to the update_fcn itself. # Logging of failures is left to the update_fcn itself.
num_succeeded += 1 task_progress.succeeded += 1
elif update_status == UPDATE_STATUS_FAILED: elif update_status == UPDATE_STATUS_FAILED:
num_failed += 1 task_progress.failed += 1
elif update_status == UPDATE_STATUS_SKIPPED: elif update_status == UPDATE_STATUS_SKIPPED:
num_skipped += 1 task_progress.skipped += 1
else: else:
raise UpdateProblemModuleStateError("Unexpected update_status returned: {}".format(update_status)) raise UpdateProblemModuleStateError("Unexpected update_status returned: {}".format(update_status))
# update task status: return task_progress.update_task_state()
task_progress = get_task_progress()
_get_current_task().update_state(state=PROGRESS, meta=task_progress)
return task_progress
def _get_task_id_from_xmodule_args(xmodule_instance_args): def _get_task_id_from_xmodule_args(xmodule_instance_args):
...@@ -518,45 +536,26 @@ def upload_grades_csv(_xmodule_instance_args, _entry_id, course_id, _task_input, ...@@ -518,45 +536,26 @@ def upload_grades_csv(_xmodule_instance_args, _entry_id, course_id, _task_input,
make a more general CSVDoc class instead of building out the rows like we make a more general CSVDoc class instead of building out the rows like we
do here. do here.
""" """
start_time = datetime.now(UTC) start_time = time()
start_date = datetime.now(UTC)
status_interval = 100 status_interval = 100
enrolled_students = CourseEnrollment.users_enrolled_in(course_id) enrolled_students = CourseEnrollment.users_enrolled_in(course_id)
num_total = enrolled_students.count() task_progress = TaskProgress(action_name, enrolled_students.count(), start_time)
num_attempted = 0
num_succeeded = 0
num_failed = 0
curr_step = "Calculating Grades"
def update_task_progress():
"""Return a dict containing info about current task"""
current_time = datetime.now(UTC)
progress = {
'action_name': action_name,
'attempted': num_attempted,
'succeeded': num_succeeded,
'failed': num_failed,
'total': num_total,
'duration_ms': int((current_time - start_time).total_seconds() * 1000),
'step': curr_step,
}
_get_current_task().update_state(state=PROGRESS, meta=progress)
return progress
# Loop over all our students and build our CSV lists in memory # Loop over all our students and build our CSV lists in memory
header = None header = None
rows = [] rows = []
err_rows = [["id", "username", "error_msg"]] err_rows = [["id", "username", "error_msg"]]
current_step = {'step': 'Calculating Grades'}
for student, gradeset, err_msg in iterate_grades_for(course_id, enrolled_students): for student, gradeset, err_msg in iterate_grades_for(course_id, enrolled_students):
# Periodically update task status (this is a cache write) # Periodically update task status (this is a cache write)
if num_attempted % status_interval == 0: if task_progress.attempted % status_interval == 0:
update_task_progress() task_progress.update_task_state(extra_meta=current_step)
num_attempted += 1 task_progress.attempted += 1
if gradeset: if gradeset:
# We were able to successfully grade this student for this course. # We were able to successfully grade this student for this course.
num_succeeded += 1 task_progress.succeeded += 1
if not header: if not header:
# Encode the header row in utf-8 encoding in case there are unicode characters # Encode the header row in utf-8 encoding in case there are unicode characters
header = [section['label'].encode('utf-8') for section in gradeset[u'section_breakdown']] header = [section['label'].encode('utf-8') for section in gradeset[u'section_breakdown']]
...@@ -578,37 +577,50 @@ def upload_grades_csv(_xmodule_instance_args, _entry_id, course_id, _task_input, ...@@ -578,37 +577,50 @@ def upload_grades_csv(_xmodule_instance_args, _entry_id, course_id, _task_input,
rows.append([student.id, student.email, student.username, gradeset['percent']] + row_percents) rows.append([student.id, student.email, student.username, gradeset['percent']] + row_percents)
else: else:
# An empty gradeset means we failed to grade a student. # An empty gradeset means we failed to grade a student.
num_failed += 1 task_progress.failed += 1
err_rows.append([student.id, student.username, err_msg]) err_rows.append([student.id, student.username, err_msg])
# By this point, we've got the rows we're going to stuff into our CSV files. # By this point, we've got the rows we're going to stuff into our CSV files.
curr_step = "Uploading CSVs" current_step = {'step': 'Uploading CSVs'}
update_task_progress() task_progress.update_task_state(extra_meta=current_step)
# Perform the actual upload # Perform the actual upload
upload_csv_to_report_store(rows, 'grade_report', course_id, start_time) upload_csv_to_report_store(rows, 'grade_report', course_id, start_date)
# If there are any error rows (don't count the header), write them out as well # If there are any error rows (don't count the header), write them out as well
if len(err_rows) > 1: if len(err_rows) > 1:
upload_csv_to_report_store(err_rows, 'grade_report_err', course_id, start_time) upload_csv_to_report_store(err_rows, 'grade_report_err', course_id, start_date)
# One last update before we close out... # One last update before we close out...
return update_task_progress() return task_progress.update_task_state(extra_meta=current_step)
def upload_students_csv(_xmodule_instance_args, _entry_id, course_id, task_input, _action_name): def upload_students_csv(_xmodule_instance_args, _entry_id, course_id, task_input, action_name):
""" """
For a given `course_id`, generate a CSV file containing profile For a given `course_id`, generate a CSV file containing profile
information for all students that are enrolled, and store using a information for all students that are enrolled, and store using a
`ReportStore`. `ReportStore`.
""" """
start_time = time()
start_date = datetime.now(UTC)
task_progress = TaskProgress(action_name, CourseEnrollment.num_enrolled_in(course_id), start_time)
current_step = {'step': 'Calculating Profile Info'}
task_progress.update_task_state(extra_meta=current_step)
# compute the student features table and format it # compute the student features table and format it
query_features = task_input.get('features') query_features = task_input.get('features')
student_data = enrolled_students_features(course_id, query_features) student_data = enrolled_students_features(course_id, query_features)
header, rows = format_dictlist(student_data, query_features) header, rows = format_dictlist(student_data, query_features)
task_progress.attempted = task_progress.succeeded = len(rows)
task_progress.skipped = task_progress.total - task_progress.attempted
rows.insert(0, header) rows.insert(0, header)
current_step = {'step': 'Uploading CSV'}
task_progress.update_task_state(extra_meta=current_step)
# Perform the upload # Perform the upload
upload_csv_to_report_store(rows, 'student_profile_info', course_id, datetime.now(UTC)) upload_csv_to_report_store(rows, 'student_profile_info', course_id, start_date)
return UPDATE_STATUS_SUCCEEDED return task_progress.update_task_state(extra_meta=current_step)
...@@ -19,7 +19,7 @@ from xmodule.modulestore.tests.factories import CourseFactory ...@@ -19,7 +19,7 @@ from xmodule.modulestore.tests.factories import CourseFactory
from student.tests.factories import CourseEnrollmentFactory, UserFactory from student.tests.factories import CourseEnrollmentFactory, UserFactory
from instructor_task.models import ReportStore from instructor_task.models import ReportStore
from instructor_task.tasks_helper import upload_grades_csv, upload_students_csv, UPDATE_STATUS_SUCCEEDED from instructor_task.tasks_helper import upload_grades_csv, upload_students_csv
class TestReport(ModuleStoreTestCase): class TestReport(ModuleStoreTestCase):
...@@ -36,6 +36,7 @@ class TestReport(ModuleStoreTestCase): ...@@ -36,6 +36,7 @@ class TestReport(ModuleStoreTestCase):
def create_student(self, username, email): def create_student(self, username, email):
student = UserFactory.create(username=username, email=email) student = UserFactory.create(username=username, email=email)
CourseEnrollmentFactory.create(user=student, course_id=self.course.id) CourseEnrollmentFactory.create(user=student, course_id=self.course.id)
return student
@ddt.ddt @ddt.ddt
...@@ -56,8 +57,25 @@ class TestInstructorGradeReport(TestReport): ...@@ -56,8 +57,25 @@ class TestInstructorGradeReport(TestReport):
with patch('instructor_task.tasks_helper._get_current_task') as mock_current_task: with patch('instructor_task.tasks_helper._get_current_task') as mock_current_task:
mock_current_task.return_value = self.current_task mock_current_task.return_value = self.current_task
result = upload_grades_csv(None, None, self.course.id, None, 'graded') result = upload_grades_csv(None, None, self.course.id, None, 'graded')
#This assertion simply confirms that the generation completed with no errors num_students = len(emails)
self.assertEquals(result['succeeded'], result['attempted']) self.assertDictContainsSubset({'attempted': num_students, 'succeeded': num_students, 'failed': 0}, result)
@patch('instructor_task.tasks_helper._get_current_task')
@patch('instructor_task.tasks_helper.iterate_grades_for')
def test_grading_failure(self, mock_iterate_grades_for, _mock_current_task):
"""
Test that any grading errors are properly reported in the
progress dict and uploaded to the report store.
"""
# mock an error response from `iterate_grades_for`
mock_iterate_grades_for.return_value = [
(self.create_student('username', 'student@example.com'), {}, 'Cannot grade student')
]
result = upload_grades_csv(None, None, self.course.id, None, 'graded')
self.assertDictContainsSubset({'attempted': 1, 'succeeded': 0, 'failed': 1}, result)
report_store = ReportStore.from_config()
self.assertTrue(any('grade_report_err' in item[0] for item in report_store.links_for(self.course.id)))
@ddt.ddt @ddt.ddt
...@@ -66,6 +84,7 @@ class TestStudentReport(TestReport): ...@@ -66,6 +84,7 @@ class TestStudentReport(TestReport):
Tests that CSV student profile report generation works. Tests that CSV student profile report generation works.
""" """
def test_success(self): def test_success(self):
self.create_student('student', 'student@example.com')
task_input = {'features': []} task_input = {'features': []}
with patch('instructor_task.tasks_helper._get_current_task'): with patch('instructor_task.tasks_helper._get_current_task'):
result = upload_students_csv(None, None, self.course.id, task_input, 'calculated') result = upload_students_csv(None, None, self.course.id, task_input, 'calculated')
...@@ -73,7 +92,7 @@ class TestStudentReport(TestReport): ...@@ -73,7 +92,7 @@ class TestStudentReport(TestReport):
links = report_store.links_for(self.course.id) links = report_store.links_for(self.course.id)
self.assertEquals(len(links), 1) self.assertEquals(len(links), 1)
self.assertEquals(result, UPDATE_STATUS_SUCCEEDED) self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result)
@ddt.data([u'student', u'student\xec']) @ddt.data([u'student', u'student\xec'])
def test_unicode_usernames(self, students): def test_unicode_usernames(self, students):
...@@ -97,4 +116,5 @@ class TestStudentReport(TestReport): ...@@ -97,4 +116,5 @@ class TestStudentReport(TestReport):
mock_current_task.return_value = self.current_task mock_current_task.return_value = self.current_task
result = upload_students_csv(None, None, self.course.id, task_input, 'calculated') result = upload_students_csv(None, None, self.course.id, task_input, 'calculated')
#This assertion simply confirms that the generation completed with no errors #This assertion simply confirms that the generation completed with no errors
self.assertEquals(result, UPDATE_STATUS_SUCCEEDED) num_students = len(students)
self.assertDictContainsSubset({'attempted': num_students, 'succeeded': num_students, 'failed': 0}, result)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment