Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-ora2
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-ora2
Commits
3cbd6b7b
Commit
3cbd6b7b
authored
Jun 04, 2014
by
gradyward
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #397 from edx/gradyward/exception_handling
Unify Exception Handling and Logging policies
parents
5bc79e19
c33019e3
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
222 additions
and
161 deletions
+222
-161
apps/openassessment/assessment/api/ai.py
+48
-41
apps/openassessment/assessment/api/ai_worker.py
+5
-4
apps/openassessment/assessment/errors/ai.py
+18
-0
apps/openassessment/assessment/models/ai.py
+26
-33
apps/openassessment/assessment/test/test_ai.py
+75
-64
apps/openassessment/assessment/worker/grading.py
+26
-8
apps/openassessment/assessment/worker/training.py
+23
-10
apps/openassessment/xblock/staff_info_mixin.py
+1
-1
No files found.
apps/openassessment/assessment/api/ai.py
View file @
3cbd6b7b
...
@@ -4,22 +4,24 @@ Public interface for AI training and grading, used by students/course authors.
...
@@ -4,22 +4,24 @@ Public interface for AI training and grading, used by students/course authors.
import
logging
import
logging
from
django.db
import
DatabaseError
from
django.db
import
DatabaseError
from
submissions
import
api
as
sub_api
from
submissions
import
api
as
sub_api
from
celery.exceptions
import
(
ChordError
,
InvalidTaskError
,
NotConfigured
,
NotRegistered
,
QueueNotFound
,
TaskRevokedError
)
from
openassessment.assessment.serializers
import
(
from
openassessment.assessment.serializers
import
(
deserialize_training_examples
,
InvalidTrainingExample
,
InvalidRubric
,
full_assessment_dict
deserialize_training_examples
,
InvalidTrainingExample
,
InvalidRubric
,
full_assessment_dict
)
)
from
openassessment.assessment.errors
import
(
from
openassessment.assessment.errors
import
(
AITrainingRequestError
,
AITrainingInternalError
,
AITrainingRequestError
,
AITrainingInternalError
,
AIGradingRequestError
,
AIGrading
RequestError
,
AIGradingInternalError
,
AIError
AIGrading
InternalError
,
AIReschedulingRequestError
,
ANTICIPATED_CELERY_ERRORS
)
)
from
openassessment.assessment.models
import
(
from
openassessment.assessment.models
import
(
Assessment
,
AITrainingWorkflow
,
AIGradingWorkflow
,
Assessment
,
AITrainingWorkflow
,
AIGradingWorkflow
,
InvalidOptionSelection
,
NoTrainingExamples
,
InvalidOptionSelection
,
NoTrainingExamples
,
AI_ASSESSMENT_TYPE
AI_ASSESSMENT_TYPE
,
AIClassifierSet
)
)
from
openassessment.assessment.worker
import
training
as
training_tasks
from
openassessment.assessment.worker
import
training
as
training_tasks
from
openassessment.assessment.worker
import
grading
as
grading_tasks
from
openassessment.assessment.worker
import
grading
as
grading_tasks
logger
=
logging
.
getLogger
(
__name__
)
logger
=
logging
.
getLogger
(
__name__
)
...
@@ -129,27 +131,46 @@ def submit(submission_uuid, rubric, algorithm_id):
...
@@ -129,27 +131,46 @@ def submit(submission_uuid, rubric, algorithm_id):
raise
AIGradingInternalError
(
msg
)
raise
AIGradingInternalError
(
msg
)
try
:
try
:
# Schedule the grading task only if the workflow has a classifier set
classifier_set_candidates
=
AIClassifierSet
.
objects
.
filter
(
if
workflow
.
classifier_set
is
not
None
:
rubric
=
workflow
.
rubric
,
algorithm_id
=
algorithm_id
grading_tasks
.
grade_essay
.
apply_async
(
args
=
[
workflow
.
uuid
])
)[:
1
]
logger
.
info
((
except
DatabaseError
as
ex
:
u"Scheduled grading task for AI grading workflow with UUID {workflow_uuid} "
u"(submission UUID = {sub_uuid}, algorithm ID = {algorithm_id})"
)
.
format
(
workflow_uuid
=
workflow
.
uuid
,
sub_uuid
=
submission_uuid
,
algorithm_id
=
algorithm_id
))
else
:
logger
.
info
((
u"Cannot schedule a grading task for AI grading workflow with UUID {workflow_uuid} "
u"because no classifiers are available for the rubric associated with submission {sub_uuid} "
u"for the algorithm {algorithm_id}"
)
.
format
(
workflow_uuid
=
workflow
.
uuid
,
sub_uuid
=
submission_uuid
,
algorithm_id
=
algorithm_id
))
return
workflow
.
uuid
except
Exception
as
ex
:
msg
=
(
msg
=
(
u"An unexpected error occurred while scheduling the "
u"An unexpected error occurred while scheduling the "
u"AI grading task for the submission with UUID {uuid}: {ex}"
u"AI grading task for the submission with UUID {uuid}: {ex}"
)
.
format
(
uuid
=
submission_uuid
,
ex
=
ex
)
)
.
format
(
uuid
=
submission_uuid
,
ex
=
ex
)
raise
AIGradingInternalError
(
msg
)
raise
AIGradingInternalError
(
msg
)
# If we find classifiers for this rubric/algorithm
# then associate the classifiers with the workflow
# and schedule a grading task.
# Otherwise, the task will need to be scheduled later,
# once the classifiers have been trained.
if
len
(
classifier_set_candidates
)
>
0
:
workflow
.
classifier_set
=
classifier_set_candidates
[
0
]
try
:
workflow
.
save
()
grading_tasks
.
grade_essay
.
apply_async
(
args
=
[
workflow
.
uuid
])
logger
.
info
((
u"Scheduled grading task for AI grading workflow with UUID {workflow_uuid} "
u"(submission UUID = {sub_uuid}, algorithm ID = {algorithm_id})"
)
.
format
(
workflow_uuid
=
workflow
.
uuid
,
sub_uuid
=
submission_uuid
,
algorithm_id
=
algorithm_id
))
return
workflow
.
uuid
except
(
DatabaseError
,)
+
ANTICIPATED_CELERY_ERRORS
as
ex
:
msg
=
(
u"An unexpected error occurred while scheduling the "
u"AI grading task for the submission with UUID {uuid}: {ex}"
)
.
format
(
uuid
=
submission_uuid
,
ex
=
ex
)
logger
.
exception
(
msg
)
raise
AIGradingInternalError
(
msg
)
else
:
logger
.
info
((
u"Cannot schedule a grading task for AI grading workflow with UUID {workflow_uuid} "
u"because no classifiers are available for the rubric associated with submission {sub_uuid} "
u"for the algorithm {algorithm_id}"
)
.
format
(
workflow_uuid
=
workflow
.
uuid
,
sub_uuid
=
submission_uuid
,
algorithm_id
=
algorithm_id
))
def
get_latest_assessment
(
submission_uuid
):
def
get_latest_assessment
(
submission_uuid
):
"""
"""
...
@@ -245,7 +266,6 @@ def train_classifiers(rubric_dict, examples, course_id, item_id, algorithm_id):
...
@@ -245,7 +266,6 @@ def train_classifiers(rubric_dict, examples, course_id, item_id, algorithm_id):
Raises:
Raises:
AITrainingRequestError
AITrainingRequestError
AITrainingInternalError
AITrainingInternalError
AIGradingInternalError
Example usage:
Example usage:
>>> train_classifiers(rubric, examples, 'ease')
>>> train_classifiers(rubric, examples, 'ease')
...
@@ -275,26 +295,13 @@ def train_classifiers(rubric_dict, examples, course_id, item_id, algorithm_id):
...
@@ -275,26 +295,13 @@ def train_classifiers(rubric_dict, examples, course_id, item_id, algorithm_id):
# Schedule the task, parametrized by the workflow UUID
# Schedule the task, parametrized by the workflow UUID
try
:
try
:
training_tasks
.
train_classifiers
.
apply_async
(
args
=
[
workflow
.
uuid
])
training_tasks
.
train_classifiers
.
apply_async
(
args
=
[
workflow
.
uuid
])
logger
.
info
((
except
ANTICIPATED_CELERY_ERRORS
as
ex
:
u"Scheduled training task for the AI training workflow with UUID {workflow_uuid} "
u"(algorithm ID = {algorithm_id})"
)
.
format
(
workflow_uuid
=
workflow
.
uuid
,
algorithm_id
=
algorithm_id
))
except
(
AITrainingInternalError
,
AITrainingRequestError
):
msg
=
(
msg
=
(
u"An unexpected error occurred while scheduling "
u"An unexpected error occurred while scheduling
incomplete training workflows with
"
u"
the task for training workflow with UUID {
}"
u"
course_id={cid} and item_id={iid}: {ex
}"
)
.
format
(
workflow
.
uuid
)
)
.
format
(
cid
=
course_id
,
iid
=
item_id
,
ex
=
ex
)
logger
.
exception
(
msg
)
logger
.
exception
(
msg
)
raise
AITrainingInternalError
(
msg
)
raise
AITrainingInternalError
(
msg
)
except
AIGradingInternalError
:
# If we have an error that is coming from the rescheduled grading after successful completion:
msg
=
(
u"An unexpected error occurred while scheduling incomplete grading workflows after "
u"the training task was completed successfully. The course_id and item_id for the failed "
u"grading workflows are course_id={cid}, item_id={iid}."
)
.
format
(
cid
=
course_id
,
iid
=
item_id
)
logger
.
exception
(
msg
)
raise
AIGradingInternalError
(
msg
)
# Return the workflow UUID
# Return the workflow UUID
return
workflow
.
uuid
return
workflow
.
uuid
...
@@ -317,19 +324,19 @@ def reschedule_unfinished_tasks(course_id=None, item_id=None, task_type=u"grade"
...
@@ -317,19 +324,19 @@ def reschedule_unfinished_tasks(course_id=None, item_id=None, task_type=u"grade"
Raises:
Raises:
AIGradingInternalError
AIGradingInternalError
AITrainingInternalError
AITrainingInternalError
AIError
AI
ReschedulingRequest
Error
"""
"""
if
course_id
is
None
or
item_id
is
None
:
if
course_id
is
None
or
item_id
is
None
:
msg
=
u"Rescheduling tasks was not possible because the course_id / item_id was not assigned."
msg
=
u"Rescheduling tasks was not possible because the course_id / item_id was not assigned."
logger
.
exception
(
msg
)
logger
.
exception
(
msg
)
raise
AIError
raise
AI
ReschedulingRequest
Error
# Reschedules all of the training tasks
# Reschedules all of the training tasks
if
task_type
==
u"train"
or
task_type
is
None
:
if
task_type
==
u"train"
or
task_type
is
None
:
try
:
try
:
training_tasks
.
reschedule_training_tasks
.
apply_async
(
args
=
[
course_id
,
item_id
])
training_tasks
.
reschedule_training_tasks
.
apply_async
(
args
=
[
course_id
,
item_id
])
except
Exception
as
ex
:
except
ANTICIPATED_CELERY_ERRORS
as
ex
:
msg
=
(
msg
=
(
u"Rescheduling training tasks for course {cid} and item {iid} failed with exception: {ex}"
u"Rescheduling training tasks for course {cid} and item {iid} failed with exception: {ex}"
)
.
format
(
cid
=
course_id
,
iid
=
item_id
,
ex
=
ex
)
)
.
format
(
cid
=
course_id
,
iid
=
item_id
,
ex
=
ex
)
...
@@ -340,7 +347,7 @@ def reschedule_unfinished_tasks(course_id=None, item_id=None, task_type=u"grade"
...
@@ -340,7 +347,7 @@ def reschedule_unfinished_tasks(course_id=None, item_id=None, task_type=u"grade"
if
task_type
==
u"grade"
or
task_type
is
None
:
if
task_type
==
u"grade"
or
task_type
is
None
:
try
:
try
:
grading_tasks
.
reschedule_grading_tasks
.
apply_async
(
args
=
[
course_id
,
item_id
])
grading_tasks
.
reschedule_grading_tasks
.
apply_async
(
args
=
[
course_id
,
item_id
])
except
Exception
as
ex
:
except
ANTICIPATED_CELERY_ERRORS
as
ex
:
msg
=
(
msg
=
(
u"Rescheduling grading tasks for course {cid} and item {iid} failed with exception: {ex}"
u"Rescheduling grading tasks for course {cid} and item {iid} failed with exception: {ex}"
)
.
format
(
cid
=
course_id
,
iid
=
item_id
,
ex
=
ex
)
)
.
format
(
cid
=
course_id
,
iid
=
item_id
,
ex
=
ex
)
...
...
apps/openassessment/assessment/api/ai_worker.py
View file @
3cbd6b7b
...
@@ -55,11 +55,12 @@ def get_grading_task_params(grading_workflow_uuid):
...
@@ -55,11 +55,12 @@ def get_grading_task_params(grading_workflow_uuid):
raise
AIGradingInternalError
(
msg
)
raise
AIGradingInternalError
(
msg
)
classifier_set
=
workflow
.
classifier_set
classifier_set
=
workflow
.
classifier_set
# Tasks shouldn't be scheduled until a classifier set is
# Though tasks shouldn't be scheduled until classifer set(s) exist, off of the happy path this is a likely
# available, so this is a serious internal error.
# occurrence. Our response is to log this lack of compliance to dependency as an exception, and then thrown
# an error with the purpose of killing the celery task running this code.
if
classifier_set
is
None
:
if
classifier_set
is
None
:
msg
=
(
msg
=
(
u"AI grading workflow with UUID {} has no classifier set"
u"AI grading workflow with UUID {} has no classifier set
, but was scheduled for grading
"
)
.
format
(
grading_workflow_uuid
)
)
.
format
(
grading_workflow_uuid
)
logger
.
exception
(
msg
)
logger
.
exception
(
msg
)
raise
AIGradingInternalError
(
msg
)
raise
AIGradingInternalError
(
msg
)
...
@@ -72,7 +73,7 @@ def get_grading_task_params(grading_workflow_uuid):
...
@@ -72,7 +73,7 @@ def get_grading_task_params(grading_workflow_uuid):
'item_id'
:
workflow
.
item_id
,
'item_id'
:
workflow
.
item_id
,
'algorithm_id'
:
workflow
.
algorithm_id
,
'algorithm_id'
:
workflow
.
algorithm_id
,
}
}
except
Exception
as
ex
:
except
(
DatabaseError
,
ClassifierSerializeError
,
IncompleteClassifierSet
,
ValueError
)
as
ex
:
msg
=
(
msg
=
(
u"An unexpected error occurred while retrieving "
u"An unexpected error occurred while retrieving "
u"classifiers for the grading workflow with UUID {uuid}: {ex}"
u"classifiers for the grading workflow with UUID {uuid}: {ex}"
...
...
apps/openassessment/assessment/errors/ai.py
View file @
3cbd6b7b
...
@@ -2,6 +2,9 @@
...
@@ -2,6 +2,9 @@
Errors related to AI assessment.
Errors related to AI assessment.
"""
"""
from
celery.exceptions
import
InvalidTaskError
,
NotConfigured
,
NotRegistered
,
QueueNotFound
ANTICIPATED_CELERY_ERRORS
=
(
InvalidTaskError
,
NotConfigured
,
NotRegistered
,
QueueNotFound
)
class
AIError
(
Exception
):
class
AIError
(
Exception
):
"""
"""
...
@@ -36,3 +39,17 @@ class AIGradingInternalError(AIError):
...
@@ -36,3 +39,17 @@ class AIGradingInternalError(AIError):
An unexpected error occurred while using the AI assessment API.
An unexpected error occurred while using the AI assessment API.
"""
"""
pass
pass
class
AIReschedulingRequestError
(
AIError
):
"""
There was a problem with the request sent to the AI assessment API.
"""
pass
class
AIReschedulingInternalError
(
AIError
):
"""
An unexpected error occurred while using the AI assessment API.
"""
pass
\ No newline at end of file
apps/openassessment/assessment/models/ai.py
View file @
3cbd6b7b
...
@@ -14,7 +14,7 @@ from django_extensions.db.fields import UUIDField
...
@@ -14,7 +14,7 @@ from django_extensions.db.fields import UUIDField
from
dogapi
import
dog_stats_api
from
dogapi
import
dog_stats_api
from
submissions
import
api
as
sub_api
from
submissions
import
api
as
sub_api
from
openassessment.assessment.serializers
import
rubric_from_dict
from
openassessment.assessment.serializers
import
rubric_from_dict
from
openassessment.assessment.errors.ai
import
AIError
from
openassessment.assessment.errors.ai
import
AI
ReschedulingInternal
Error
from
.base
import
Rubric
,
Criterion
,
Assessment
,
AssessmentPart
from
.base
import
Rubric
,
Criterion
,
Assessment
,
AssessmentPart
from
.training
import
TrainingExample
from
.training
import
TrainingExample
...
@@ -118,10 +118,19 @@ class AIClassifierSet(models.Model):
...
@@ -118,10 +118,19 @@ class AIClassifierSet(models.Model):
# Retrieve the criteria for this rubric,
# Retrieve the criteria for this rubric,
# then organize them by criterion name
# then organize them by criterion name
criteria
=
{
criterion
.
name
:
criterion
try
:
for
criterion
in
Criterion
.
objects
.
filter
(
rubric
=
rubric
)
criteria
=
{
}
criterion
.
name
:
criterion
for
criterion
in
Criterion
.
objects
.
filter
(
rubric
=
rubric
)
}
except
DatabaseError
as
ex
:
msg
=
(
u"An unexpected error occurred while retrieving rubric criteria with the"
u"rubric hash {rh} and algorithm_id {aid}: {ex}"
)
.
format
(
rh
=
rubric
.
content_hash
,
aid
=
algorithm_id
,
ex
=
ex
)
logger
.
exception
(
msg
)
raise
# Check that we have classifiers for all criteria in the rubric
# Check that we have classifiers for all criteria in the rubric
if
set
(
criteria
.
keys
())
!=
set
(
classifiers_dict
.
keys
()):
if
set
(
criteria
.
keys
())
!=
set
(
classifiers_dict
.
keys
()):
...
@@ -339,13 +348,8 @@ class AIWorkflow(models.Model):
...
@@ -339,13 +348,8 @@ class AIWorkflow(models.Model):
for
workflow_uuid
in
grade_workflow_uuids
:
for
workflow_uuid
in
grade_workflow_uuids
:
# Returns the grading workflow associated with the uuid stored in the initial query
# Returns the grading workflow associated with the uuid stored in the initial query
try
:
workflow
=
cls
.
objects
.
get
(
uuid
=
workflow_uuid
)
grading_workflow
=
cls
.
objects
.
get
(
uuid
=
workflow_uuid
)
yield
workflow
yield
grading_workflow
except
(
cls
.
DoesNotExist
,
ObjectDoesNotExist
,
DatabaseError
)
as
ex
:
msg
=
u"No workflow with uuid '{}' could be found within the system."
.
format
(
workflow_uuid
)
logger
.
exception
(
msg
)
raise
AIError
(
ex
)
def
_log_start_workflow
(
self
):
def
_log_start_workflow
(
self
):
"""
"""
...
@@ -353,15 +357,9 @@ class AIWorkflow(models.Model):
...
@@ -353,15 +357,9 @@ class AIWorkflow(models.Model):
Increments the number of tasks of that kind.
Increments the number of tasks of that kind.
"""
"""
# Identifies whether the task is a training or grading workflow
# Identifies whether the type of task for reporting
data_path
=
None
class_name
=
self
.
__class__
.
__name__
name
=
None
data_path
=
'openassessment.assessment.ai_task.'
+
class_name
if
isinstance
(
self
,
AITrainingWorkflow
):
data_path
=
'openassessment.assessment.ai_task.train'
name
=
u"Training"
elif
isinstance
(
self
,
AIGradingWorkflow
):
data_path
=
'openassessment.assessment.ai_task.grade'
name
=
u"Grading"
# Sets identity tags which allow sorting by course and item
# Sets identity tags which allow sorting by course and item
tags
=
[
tags
=
[
...
@@ -369,7 +367,7 @@ class AIWorkflow(models.Model):
...
@@ -369,7 +367,7 @@ class AIWorkflow(models.Model):
u"item_id:{item_id}"
.
format
(
item_id
=
self
.
item_id
),
u"item_id:{item_id}"
.
format
(
item_id
=
self
.
item_id
),
]
]
logger
.
info
(
u"
AI{name} workflow with uuid {uuid} was started."
.
format
(
name
=
name
,
uuid
=
self
.
uuid
))
logger
.
info
(
u"
{class_name} with uuid {uuid} was started."
.
format
(
class_name
=
class_
name
,
uuid
=
self
.
uuid
))
dog_stats_api
.
increment
(
data_path
+
'.scheduled_count'
,
tags
=
tags
)
dog_stats_api
.
increment
(
data_path
+
'.scheduled_count'
,
tags
=
tags
)
...
@@ -379,15 +377,9 @@ class AIWorkflow(models.Model):
...
@@ -379,15 +377,9 @@ class AIWorkflow(models.Model):
Reports the total time the task took.
Reports the total time the task took.
"""
"""
# Identifies whether the task is a training or grading workflow
# Identifies whether the type of task for reporting
data_path
=
None
class_name
=
self
.
__class__
.
__name__
name
=
None
data_path
=
'openassessment.assessment.ai_task.'
+
class_name
if
isinstance
(
self
,
AITrainingWorkflow
):
data_path
=
'openassessment.assessment.ai_task.train'
name
=
u"Training"
elif
isinstance
(
self
,
AIGradingWorkflow
):
data_path
=
'openassessment.assessment.ai_task.grade'
name
=
u"Grading"
tags
=
[
tags
=
[
u"course_id:{course_id}"
.
format
(
course_id
=
self
.
course_id
),
u"course_id:{course_id}"
.
format
(
course_id
=
self
.
course_id
),
...
@@ -406,9 +398,9 @@ class AIWorkflow(models.Model):
...
@@ -406,9 +398,9 @@ class AIWorkflow(models.Model):
logger
.
info
(
logger
.
info
(
(
(
u"
AI{name} workflow
with uuid {uuid} completed its workflow successfully "
u"
{class_name}
with uuid {uuid} completed its workflow successfully "
u"in {seconds} seconds."
u"in {seconds} seconds."
)
.
format
(
name
=
name
,
uuid
=
self
.
uuid
,
seconds
=
time_delta
.
total_seconds
())
)
.
format
(
class_name
=
class_
name
,
uuid
=
self
.
uuid
,
seconds
=
time_delta
.
total_seconds
())
)
)
...
@@ -605,6 +597,7 @@ class AIGradingWorkflow(AIWorkflow):
...
@@ -605,6 +597,7 @@ class AIGradingWorkflow(AIWorkflow):
workflow
.
save
()
workflow
.
save
()
workflow
.
_log_start_workflow
()
workflow
.
_log_start_workflow
()
return
workflow
return
workflow
@transaction.commit_on_success
@transaction.commit_on_success
...
...
apps/openassessment/assessment/test/test_ai.py
View file @
3cbd6b7b
...
@@ -5,6 +5,7 @@ Tests for AI assessment.
...
@@ -5,6 +5,7 @@ Tests for AI assessment.
import
copy
import
copy
import
mock
import
mock
from
nose.tools
import
raises
from
nose.tools
import
raises
from
celery.exceptions
import
NotConfigured
,
InvalidTaskError
from
django.db
import
DatabaseError
from
django.db
import
DatabaseError
from
django.test.utils
import
override_settings
from
django.test.utils
import
override_settings
from
openassessment.test_utils
import
CacheResetTest
from
openassessment.test_utils
import
CacheResetTest
...
@@ -19,7 +20,8 @@ from openassessment.assessment.models import AITrainingWorkflow, AIGradingWorkfl
...
@@ -19,7 +20,8 @@ from openassessment.assessment.models import AITrainingWorkflow, AIGradingWorkfl
from
openassessment.assessment.worker.algorithm
import
AIAlgorithm
,
AIAlgorithmError
from
openassessment.assessment.worker.algorithm
import
AIAlgorithm
,
AIAlgorithmError
from
openassessment.assessment.serializers
import
rubric_from_dict
from
openassessment.assessment.serializers
import
rubric_from_dict
from
openassessment.assessment.errors
import
(
from
openassessment.assessment.errors
import
(
AITrainingRequestError
,
AITrainingInternalError
,
AIGradingRequestError
,
AIGradingInternalError
,
AIError
AITrainingRequestError
,
AITrainingInternalError
,
AIGradingRequestError
,
AIReschedulingInternalError
,
AIGradingInternalError
,
AIError
)
)
from
openassessment.assessment.test.constants
import
RUBRIC
,
EXAMPLES
,
STUDENT_ITEM
,
ANSWER
from
openassessment.assessment.test.constants
import
RUBRIC
,
EXAMPLES
,
STUDENT_ITEM
,
ANSWER
...
@@ -156,12 +158,11 @@ class AITrainingTest(CacheResetTest):
...
@@ -156,12 +158,11 @@ class AITrainingTest(CacheResetTest):
ai_api
.
train_classifiers
(
RUBRIC
,
EXAMPLES
,
COURSE_ID
,
ITEM_ID
,
ALGORITHM_ID
)
ai_api
.
train_classifiers
(
RUBRIC
,
EXAMPLES
,
COURSE_ID
,
ITEM_ID
,
ALGORITHM_ID
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
@mock.patch
(
'openassessment.assessment.api.ai.training_tasks'
)
def
test_train_classifiers_celery_error
(
self
):
def
test_schedule_training_error
(
self
,
mock_training_tasks
):
with
mock
.
patch
(
'openassessment.assessment.api.ai.training_tasks.train_classifiers.apply_async'
)
as
mock_train
:
# Simulate an exception raised when scheduling a training task
mock_train
.
side_effect
=
NotConfigured
mock_training_tasks
.
train_classifiers
.
apply_async
.
side_effect
=
AITrainingRequestError
(
"KABOOM!"
)
with
self
.
assertRaises
(
AITrainingInternalError
):
with
self
.
assertRaises
(
AITrainingInternalError
):
ai_api
.
train_classifiers
(
RUBRIC
,
EXAMPLES
,
COURSE_ID
,
ITEM_ID
,
ALGORITHM_ID
)
ai_api
.
train_classifiers
(
RUBRIC
,
EXAMPLES
,
COURSE_ID
,
ITEM_ID
,
ALGORITHM_ID
)
class
AIGradingTest
(
CacheResetTest
):
class
AIGradingTest
(
CacheResetTest
):
...
@@ -221,28 +222,6 @@ class AIGradingTest(CacheResetTest):
...
@@ -221,28 +222,6 @@ class AIGradingTest(CacheResetTest):
mock_filter
.
side_effect
=
DatabaseError
(
"Oh no!"
)
mock_filter
.
side_effect
=
DatabaseError
(
"Oh no!"
)
ai_api
.
get_assessment_scores_by_criteria
(
self
.
submission_uuid
)
ai_api
.
get_assessment_scores_by_criteria
(
self
.
submission_uuid
)
@mock.patch
(
'openassessment.assessment.api.ai.grading_tasks.grade_essay'
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_submit_no_classifiers_available
(
self
,
mock_task
):
# Use a rubric that does not have classifiers available
new_rubric
=
copy
.
deepcopy
(
RUBRIC
)
new_rubric
[
'criteria'
]
=
new_rubric
[
'criteria'
][
1
:]
# Submit the essay -- since there are no classifiers available,
# the workflow should be created, but no task should be scheduled.
workflow_uuid
=
ai_api
.
submit
(
self
.
submission_uuid
,
new_rubric
,
ALGORITHM_ID
)
# Verify that the workflow was created with a null classifier set
workflow
=
AIGradingWorkflow
.
objects
.
get
(
uuid
=
workflow_uuid
)
self
.
assertIs
(
workflow
.
classifier_set
,
None
)
# Verify that there are no assessments
latest_assessment
=
ai_api
.
get_latest_assessment
(
self
.
submission_uuid
)
self
.
assertIs
(
latest_assessment
,
None
)
# Verify that the task was never scheduled
self
.
assertFalse
(
mock_task
.
apply_async
.
called
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_submit_submission_not_found
(
self
):
def
test_submit_submission_not_found
(
self
):
with
self
.
assertRaises
(
AIGradingRequestError
):
with
self
.
assertRaises
(
AIGradingRequestError
):
...
@@ -261,19 +240,42 @@ class AIGradingTest(CacheResetTest):
...
@@ -261,19 +240,42 @@ class AIGradingTest(CacheResetTest):
with
self
.
assertRaises
(
AIGradingInternalError
):
with
self
.
assertRaises
(
AIGradingInternalError
):
ai_api
.
submit
(
self
.
submission_uuid
,
RUBRIC
,
ALGORITHM_ID
)
ai_api
.
submit
(
self
.
submission_uuid
,
RUBRIC
,
ALGORITHM_ID
)
@mock.patch
(
'openassessment.assessment.api.ai.grading_tasks.grade_essay'
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_grade_task_schedule_error
(
self
,
mock_task
):
mock_task
.
apply_async
.
side_effect
=
IOError
(
"Test error!"
)
with
self
.
assertRaises
(
AIGradingInternalError
):
ai_api
.
submit
(
self
.
submission_uuid
,
RUBRIC
,
ALGORITHM_ID
)
@mock.patch.object
(
Assessment
.
objects
,
'filter'
)
@mock.patch.object
(
Assessment
.
objects
,
'filter'
)
def
test_get_latest_assessment_database_error
(
self
,
mock_call
):
def
test_get_latest_assessment_database_error
(
self
,
mock_call
):
mock_call
.
side_effect
=
DatabaseError
(
"KABOOM!"
)
mock_call
.
side_effect
=
DatabaseError
(
"KABOOM!"
)
with
self
.
assertRaises
(
AIGradingInternalError
):
with
self
.
assertRaises
(
AIGradingInternalError
):
ai_api
.
get_latest_assessment
(
self
.
submission_uuid
)
ai_api
.
get_latest_assessment
(
self
.
submission_uuid
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_submit_celery_error
(
self
):
with
mock
.
patch
(
'openassessment.assessment.api.ai.grading_tasks.grade_essay.apply_async'
)
as
mock_grade
:
mock_grade
.
side_effect
=
NotConfigured
with
self
.
assertRaises
(
AIGradingInternalError
):
ai_api
.
submit
(
self
.
submission_uuid
,
RUBRIC
,
ALGORITHM_ID
)
@mock.patch.object
(
AIClassifierSet
.
objects
,
'filter'
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_submit_database_error
(
self
,
mock_filter
):
mock_filter
.
side_effect
=
DatabaseError
(
"rumble... ruMBLE, RUMBLE! BOOM!"
)
with
self
.
assertRaises
(
AIGradingInternalError
):
ai_api
.
submit
(
self
.
submission_uuid
,
RUBRIC
,
ALGORITHM_ID
)
@mock.patch.object
(
AIClassifierSet
.
objects
,
'filter'
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_submit_no_classifiers
(
self
,
mock_call
):
mock_call
.
return_value
=
[]
with
mock
.
patch
(
'openassessment.assessment.api.ai.logger.info'
)
as
mock_log
:
ai_api
.
submit
(
self
.
submission_uuid
,
RUBRIC
,
ALGORITHM_ID
)
argument
=
mock_log
.
call_args
[
0
][
0
]
self
.
assertTrue
(
u"no classifiers are available"
in
argument
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_submit_submission_db_error
(
self
):
with
mock
.
patch
(
'openassessment.assessment.api.ai.AIGradingWorkflow.start_workflow'
)
as
mock_start
:
mock_start
.
side_effect
=
sub_api
.
SubmissionInternalError
with
self
.
assertRaises
(
AIGradingInternalError
):
ai_api
.
submit
(
self
.
submission_uuid
,
RUBRIC
,
ALGORITHM_ID
)
class
AIUntrainedGradingTest
:
class
AIUntrainedGradingTest
:
"""
"""
...
@@ -296,7 +298,6 @@ class AIReschedulingTest(CacheResetTest):
...
@@ -296,7 +298,6 @@ class AIReschedulingTest(CacheResetTest):
Tests in both orders, and tests all error conditions that can arise as a result of calling rescheduling
Tests in both orders, and tests all error conditions that can arise as a result of calling rescheduling
"""
"""
def
setUp
(
self
):
def
setUp
(
self
):
"""
"""
Sets up each test so that it will have unfinished tasks of both types
Sets up each test so that it will have unfinished tasks of both types
...
@@ -394,33 +395,6 @@ class AIReschedulingTest(CacheResetTest):
...
@@ -394,33 +395,6 @@ class AIReschedulingTest(CacheResetTest):
self
.
_assert_complete
(
grading_done
=
True
,
training_done
=
True
)
self
.
_assert_complete
(
grading_done
=
True
,
training_done
=
True
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_reschedule_database_failure
(
self
):
# 3) Mock in a DB error.
patched_method
=
'openassessment.assessment.worker.grading.AIGradingWorkflow.objects.filter'
with
mock
.
patch
(
patched_method
)
as
mock_filter
:
mock_filter
.
side_effect
=
DatabaseError
(
'DB ERROR: KABOOM'
)
with
self
.
assertRaises
(
AIGradingInternalError
):
ai_api
.
reschedule_unfinished_tasks
(
course_id
=
COURSE_ID
,
item_id
=
ITEM_ID
,
task_type
=
None
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_reschedule_grade_failure
(
self
):
# Mock in consistent failure to grade an essay
with
mock
.
patch
(
'openassessment.assessment.worker.grading.grade_essay.apply_async'
)
as
mock_grade_essay
:
mock_grade_essay
.
side_effect
=
AIGradingInternalError
(
"Sorry, no grade for you."
)
with
self
.
assertRaises
(
AIGradingInternalError
):
# Try to reschedule all unfinished tasks, expect an internal grading error
ai_api
.
reschedule_unfinished_tasks
(
course_id
=
COURSE_ID
,
item_id
=
ITEM_ID
,
task_type
=
None
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_reschedule_train_failure
(
self
):
# Mock in consistent failure to grade an essay
with
mock
.
patch
(
'openassessment.assessment.worker.training.train_classifiers.apply_async'
)
as
mock_train
:
mock_train
.
side_effect
=
AITrainingInternalError
(
"OH MY! THE TRAINing DERAILED!"
)
with
self
.
assertRaises
(
AITrainingInternalError
):
# Try to reschedule all unfinished tasks, expect an internal tra error
ai_api
.
reschedule_unfinished_tasks
(
course_id
=
COURSE_ID
,
item_id
=
ITEM_ID
,
task_type
=
u"train"
)
@override_settings
(
ORA2_AI_ALGORITHMS
=
AI_ALGORITHMS
)
def
test_reschedule_non_valid_args
(
self
):
def
test_reschedule_non_valid_args
(
self
):
with
self
.
assertRaises
(
AIError
):
with
self
.
assertRaises
(
AIError
):
ai_api
.
reschedule_unfinished_tasks
(
course_id
=
COURSE_ID
,
task_type
=
u"train"
)
ai_api
.
reschedule_unfinished_tasks
(
course_id
=
COURSE_ID
,
task_type
=
u"train"
)
...
@@ -446,6 +420,43 @@ class AIReschedulingTest(CacheResetTest):
...
@@ -446,6 +420,43 @@ class AIReschedulingTest(CacheResetTest):
# Check that both training and grading are now complete
# Check that both training and grading are now complete
self
.
_assert_complete
(
grading_done
=
True
,
training_done
=
True
)
self
.
_assert_complete
(
grading_done
=
True
,
training_done
=
True
)
def
test_reschedule_grade_celery_error
(
self
):
patched_method
=
'openassessment.assessment.api.ai.grading_tasks.reschedule_grading_tasks.apply_async'
with
mock
.
patch
(
patched_method
)
as
mock_grade
:
mock_grade
.
side_effect
=
NotConfigured
with
self
.
assertRaises
(
AIGradingInternalError
):
ai_api
.
reschedule_unfinished_tasks
(
course_id
=
COURSE_ID
,
item_id
=
ITEM_ID
)
def
test_reschedule_train_celery_error
(
self
):
patched_method
=
'openassessment.assessment.api.ai.training_tasks.reschedule_training_tasks.apply_async'
with
mock
.
patch
(
patched_method
)
as
mock_train
:
mock_train
.
side_effect
=
NotConfigured
with
self
.
assertRaises
(
AITrainingInternalError
):
ai_api
.
reschedule_unfinished_tasks
(
course_id
=
COURSE_ID
,
item_id
=
ITEM_ID
,
task_type
=
None
)
@mock.patch.object
(
AIGradingWorkflow
,
'get_incomplete_workflows'
)
def
test_get_incomplete_workflows_error_grading
(
self
,
mock_incomplete
):
mock_incomplete
.
side_effect
=
DatabaseError
with
self
.
assertRaises
(
AIReschedulingInternalError
):
ai_api
.
reschedule_unfinished_tasks
(
course_id
=
COURSE_ID
,
item_id
=
ITEM_ID
)
def
test_get_incomplete_workflows_error_training
(
self
):
patched_method
=
'openassessment.assessment.models.ai.AIWorkflow.get_incomplete_workflows'
with
mock
.
patch
(
patched_method
)
as
mock_incomplete
:
mock_incomplete
.
side_effect
=
DatabaseError
with
self
.
assertRaises
(
Exception
):
ai_api
.
reschedule_unfinished_tasks
(
course_id
=
COURSE_ID
,
item_id
=
ITEM_ID
,
task_type
=
u"train"
)
def
test_reschedule_train_internal_celery_error
(
self
):
patched_method
=
'openassessment.assessment.worker.training.train_classifiers.apply_async'
with
mock
.
patch
(
patched_method
)
as
mock_train
:
mock_train
.
side_effect
=
NotConfigured
(
"NotConfigured"
)
with
mock
.
patch
(
'openassessment.assessment.worker.training.logger.exception'
)
as
mock_logger
:
with
self
.
assertRaises
(
Exception
):
ai_api
.
reschedule_unfinished_tasks
(
course_id
=
COURSE_ID
,
item_id
=
ITEM_ID
,
task_type
=
u"train"
)
last_call
=
mock_logger
.
call_args
[
0
][
0
]
self
.
assertTrue
(
u"NotConfigured"
in
last_call
)
class
AIAutomaticGradingTest
(
CacheResetTest
):
class
AIAutomaticGradingTest
(
CacheResetTest
):
...
...
apps/openassessment/assessment/worker/grading.py
View file @
3cbd6b7b
...
@@ -9,11 +9,12 @@ from django.conf import settings
...
@@ -9,11 +9,12 @@ from django.conf import settings
from
celery.utils.log
import
get_task_logger
from
celery.utils.log
import
get_task_logger
from
dogapi
import
dog_stats_api
from
dogapi
import
dog_stats_api
from
openassessment.assessment.api
import
ai_worker
as
ai_worker_api
from
openassessment.assessment.api
import
ai_worker
as
ai_worker_api
from
openassessment.assessment.errors
import
AIError
,
AIGradingInternalError
,
AIGradingRequestError
from
openassessment.assessment.errors
import
(
AIError
,
AIGradingInternalError
,
AIGradingRequestError
,
AIReschedulingInternalError
,
ANTICIPATED_CELERY_ERRORS
)
from
.algorithm
import
AIAlgorithm
,
AIAlgorithmError
from
.algorithm
import
AIAlgorithm
,
AIAlgorithmError
from
openassessment.assessment.models.ai
import
AIClassifierSet
,
AIGradingWorkflow
from
openassessment.assessment.models.ai
import
AIClassifierSet
,
AIGradingWorkflow
MAX_RETRIES
=
2
MAX_RETRIES
=
2
logger
=
get_task_logger
(
__name__
)
logger
=
get_task_logger
(
__name__
)
...
@@ -106,6 +107,10 @@ def reschedule_grading_tasks(course_id, item_id):
...
@@ -106,6 +107,10 @@ def reschedule_grading_tasks(course_id, item_id):
Args:
Args:
course_id (unicode): The course item that we will be rerunning the rescheduling on.
course_id (unicode): The course item that we will be rerunning the rescheduling on.
item_id (unicode): The item that the rescheduling will be running on
item_id (unicode): The item that the rescheduling will be running on
Raises:
AIReschedulingInternalError
AIGradingInternalError
"""
"""
# Logs the start of the rescheduling process and records the start time so that total time can be calculated later.
# Logs the start of the rescheduling process and records the start time so that total time can be calculated later.
...
@@ -113,7 +118,15 @@ def reschedule_grading_tasks(course_id, item_id):
...
@@ -113,7 +118,15 @@ def reschedule_grading_tasks(course_id, item_id):
start_time
=
datetime
.
datetime
.
now
()
start_time
=
datetime
.
datetime
.
now
()
# Finds all incomplete grading workflows
# Finds all incomplete grading workflows
grading_workflows
=
AIGradingWorkflow
.
get_incomplete_workflows
(
course_id
,
item_id
)
try
:
grading_workflows
=
AIGradingWorkflow
.
get_incomplete_workflows
(
course_id
,
item_id
)
except
(
DatabaseError
,
AIGradingWorkflow
.
DoesNotExist
)
as
ex
:
msg
=
(
u"An unexpected error occurred while retrieving all incomplete "
u"grading tasks for course_id: {cid} and item_id: {iid}: {ex}"
)
.
format
(
cid
=
course_id
,
iid
=
item_id
,
ex
=
ex
)
logger
.
exception
(
msg
)
raise
AIReschedulingInternalError
(
msg
)
# Notes whether or not one or more operations failed. If they did, the process of rescheduling will be retried.
# Notes whether or not one or more operations failed. If they did, the process of rescheduling will be retried.
failures
=
0
failures
=
0
...
@@ -179,18 +192,23 @@ def reschedule_grading_tasks(course_id, item_id):
...
@@ -179,18 +192,23 @@ def reschedule_grading_tasks(course_id, item_id):
logger
.
info
(
logger
.
info
(
u"Rescheduling of grading was successful for grading workflow with uuid='{}'"
.
format
(
workflow
.
uuid
)
u"Rescheduling of grading was successful for grading workflow with uuid='{}'"
.
format
(
workflow
.
uuid
)
)
)
except
(
AIGradingInternalError
,
AIGradingRequestError
,
AIError
)
as
ex
:
except
ANTICIPATED_CELERY_ERRORS
as
ex
:
msg
=
(
msg
=
(
u"An error occurred while try to grade essay with uuid='{id}': {ex}"
u"An error occurred while try to grade essay with uuid='{id}': {ex}"
)
.
format
(
id
=
workflow
.
uuid
,
ex
=
ex
)
)
.
format
(
id
=
workflow
.
uuid
,
ex
=
ex
)
logger
.
exception
(
msg
)
logger
.
exception
(
msg
)
failures
+=
1
failures
+=
1
# If we couldn't assign classifiers, we failed.
else
:
failures
+=
1
# Logs the data from our rescheduling attempt
# Logs the data from our rescheduling attempt
time_delta
=
datetime
.
datetime
.
now
()
-
start_time
time_delta
=
datetime
.
datetime
.
now
()
-
start_time
_log_complete_reschedule_grading
(
_log_complete_reschedule_grading
(
course_id
=
course_id
,
item_id
=
item_id
,
seconds
=
time_delta
.
total_seconds
(),
success
=
(
failures
==
0
)
course_id
=
course_id
,
item_id
=
item_id
,
seconds
=
time_delta
.
total_seconds
(),
success
=
(
failures
==
0
)
)
)
# If one or more of these failed, we want to retry rescheduling. Note that this retry is executed in such a way
# If one or more of these failed, we want to retry rescheduling. Note that this retry is executed in such a way
# that if it fails, an AIGradingInternalError will be raised with the number of failures on the last attempt (i.e.
# that if it fails, an AIGradingInternalError will be raised with the number of failures on the last attempt (i.e.
# the total number of workflows matching these critera that still have left to be graded).
# the total number of workflows matching these critera that still have left to be graded).
...
@@ -215,7 +233,7 @@ def _log_start_reschedule_grading(course_id=None, item_id=None):
...
@@ -215,7 +233,7 @@ def _log_start_reschedule_grading(course_id=None, item_id=None):
u"course_id:{}"
.
format
(
course_id
),
u"course_id:{}"
.
format
(
course_id
),
u"item_id:{}"
.
format
(
item_id
),
u"item_id:{}"
.
format
(
item_id
),
]
]
dog_stats_api
.
increment
(
'openassessment.assessment.ai_task.
reschedule_grade
.scheduled_count'
,
tags
)
dog_stats_api
.
increment
(
'openassessment.assessment.ai_task.
AIRescheduleGrading
.scheduled_count'
,
tags
)
msg
=
u"Rescheduling of incomplete grading tasks began for course_id={cid} and item_id={iid}"
msg
=
u"Rescheduling of incomplete grading tasks began for course_id={cid} and item_id={iid}"
logger
.
info
(
msg
.
format
(
cid
=
course_id
,
iid
=
item_id
))
logger
.
info
(
msg
.
format
(
cid
=
course_id
,
iid
=
item_id
))
...
@@ -241,11 +259,11 @@ def _log_complete_reschedule_grading(course_id=None, item_id=None, seconds=-1, s
...
@@ -241,11 +259,11 @@ def _log_complete_reschedule_grading(course_id=None, item_id=None, seconds=-1, s
u"success:{}"
.
format
(
success
)
u"success:{}"
.
format
(
success
)
]
]
dog_stats_api
.
histogram
(
'openassessment.assessment.ai_task.
reschedule_grade
.turnaround_time'
,
seconds
,
tags
)
dog_stats_api
.
histogram
(
'openassessment.assessment.ai_task.
AIRescheduleGrading
.turnaround_time'
,
seconds
,
tags
)
dog_stats_api
.
increment
(
'openassessment.assessment.ai_task.
reschedule_grade
.completed_count'
,
tags
)
dog_stats_api
.
increment
(
'openassessment.assessment.ai_task.
AIRescheduleGrading
.completed_count'
,
tags
)
msg
=
u"Rescheduling of incomplete grading tasks for course_id={cid} and item_id={iid} completed in {s} seconds."
msg
=
u"Rescheduling of incomplete grading tasks for course_id={cid} and item_id={iid} completed in {s} seconds."
if
not
success
:
if
not
success
:
msg
+=
u" At least one grading task failed due to internal error."
msg
+=
u" At least one grading task failed due to internal error."
msg
.
format
(
cid
=
course_id
,
iid
=
item_id
,
s
=
seconds
)
msg
.
format
(
cid
=
course_id
,
iid
=
item_id
,
s
=
seconds
)
logger
.
info
(
msg
)
logger
.
info
(
msg
)
apps/openassessment/assessment/worker/training.py
View file @
3cbd6b7b
...
@@ -5,16 +5,17 @@ import datetime
...
@@ -5,16 +5,17 @@ import datetime
from
collections
import
defaultdict
from
collections
import
defaultdict
from
celery
import
task
from
celery
import
task
from
celery.utils.log
import
get_task_logger
from
celery.utils.log
import
get_task_logger
from
celery.exceptions
import
InvalidTaskError
,
NotConfigured
,
NotRegistered
,
QueueNotFound
from
dogapi
import
dog_stats_api
from
dogapi
import
dog_stats_api
from
django.conf
import
settings
from
django.conf
import
settings
from
django.db
import
DatabaseError
from
openassessment.assessment.api
import
ai_worker
as
ai_worker_api
from
openassessment.assessment.api
import
ai_worker
as
ai_worker_api
from
openassessment.assessment.errors
import
AIError
from
openassessment.assessment.errors
import
AIError
,
ANTICIPATED_CELERY_ERRORS
from
.algorithm
import
AIAlgorithm
,
AIAlgorithmError
from
.algorithm
import
AIAlgorithm
,
AIAlgorithmError
from
.grading
import
reschedule_grading_tasks
from
.grading
import
reschedule_grading_tasks
from
openassessment.assessment.errors.ai
import
AIGradingInternalError
from
openassessment.assessment.errors.ai
import
AIGradingInternalError
from
openassessment.assessment.models.ai
import
AITrainingWorkflow
from
openassessment.assessment.models.ai
import
AITrainingWorkflow
MAX_RETRIES
=
2
MAX_RETRIES
=
2
logger
=
get_task_logger
(
__name__
)
logger
=
get_task_logger
(
__name__
)
...
@@ -156,13 +157,25 @@ def reschedule_training_tasks(course_id, item_id):
...
@@ -156,13 +157,25 @@ def reschedule_training_tasks(course_id, item_id):
Args:
Args:
course_id (unicode): The course that we are going to search for unfinished training workflows
course_id (unicode): The course that we are going to search for unfinished training workflows
item_id (unicode): The specific item within that course that we will reschedule unfinished workflows for
item_id (unicode): The specific item within that course that we will reschedule unfinished workflows for
Raises:
AIReschedulingInternalError
DatabaseError
"""
"""
# Starts logging the details of the rescheduling
# Starts logging the details of the rescheduling
_log_start_reschedule_training
(
course_id
=
course_id
,
item_id
=
item_id
)
_log_start_reschedule_training
(
course_id
=
course_id
,
item_id
=
item_id
)
start_time
=
datetime
.
datetime
.
now
()
start_time
=
datetime
.
datetime
.
now
()
# Run a query to find the incomplete training workflows
# Run a query to find the incomplete training workflows
training_workflows
=
AITrainingWorkflow
.
get_incomplete_workflows
(
course_id
,
item_id
)
try
:
training_workflows
=
AITrainingWorkflow
.
get_incomplete_workflows
(
course_id
,
item_id
)
except
(
DatabaseError
,
AITrainingWorkflow
.
DoesNotExist
)
as
ex
:
msg
=
(
u"An unexpected error occurred while retrieving all incomplete "
u"training tasks for course_id: {cid} and item_id: {iid}: {ex}"
)
.
format
(
cid
=
course_id
,
iid
=
item_id
,
ex
=
ex
)
logger
.
exception
(
msg
)
raise
reschedule_training_tasks
.
retry
()
# Tries to train every workflow that has not completed.
# Tries to train every workflow that has not completed.
for
target_workflow
in
training_workflows
:
for
target_workflow
in
training_workflows
:
...
@@ -171,10 +184,10 @@ def reschedule_training_tasks(course_id, item_id):
...
@@ -171,10 +184,10 @@ def reschedule_training_tasks(course_id, item_id):
logger
.
info
(
logger
.
info
(
u"Rescheduling of training was successful for workflow with uuid{}"
.
format
(
target_workflow
.
uuid
)
u"Rescheduling of training was successful for workflow with uuid{}"
.
format
(
target_workflow
.
uuid
)
)
)
except
Exception
as
ex
:
except
ANTICIPATED_CELERY_ERRORS
as
ex
:
msg
=
(
msg
=
(
u"An unexpected error occurred while scheduling the task for training workflow with UUID {}"
u"An unexpected error occurred while scheduling the task for training workflow with UUID {
id}: {ex
}"
)
.
format
(
target_workflow
.
uuid
)
)
.
format
(
id
=
target_workflow
.
uuid
,
ex
=
ex
)
logger
.
exception
(
msg
)
logger
.
exception
(
msg
)
time_delta
=
datetime
.
datetime
.
now
()
-
start_time
time_delta
=
datetime
.
datetime
.
now
()
-
start_time
...
@@ -251,7 +264,7 @@ def _log_start_reschedule_training(course_id=None, item_id=None):
...
@@ -251,7 +264,7 @@ def _log_start_reschedule_training(course_id=None, item_id=None):
u"course_id:{}"
.
format
(
course_id
),
u"course_id:{}"
.
format
(
course_id
),
u"item_id:{}"
.
format
(
item_id
),
u"item_id:{}"
.
format
(
item_id
),
]
]
dog_stats_api
.
increment
(
'openassessment.assessment.ai_task.
reschedule_train
.scheduled_count'
,
tags
)
dog_stats_api
.
increment
(
'openassessment.assessment.ai_task.
AIRescheduleTraining
.scheduled_count'
,
tags
)
msg
=
u"Rescheduling of incomplete training tasks began for course_id={cid} and item_id={iid}"
msg
=
u"Rescheduling of incomplete training tasks began for course_id={cid} and item_id={iid}"
logger
.
info
(
msg
.
format
(
cid
=
course_id
,
iid
=
item_id
))
logger
.
info
(
msg
.
format
(
cid
=
course_id
,
iid
=
item_id
))
...
@@ -276,11 +289,11 @@ def _log_complete_reschedule_training(course_id=None, item_id=None, seconds=-1,
...
@@ -276,11 +289,11 @@ def _log_complete_reschedule_training(course_id=None, item_id=None, seconds=-1,
u"success:{}"
.
format
(
success
)
u"success:{}"
.
format
(
success
)
]
]
dog_stats_api
.
histogram
(
'openassessment.assessment.ai_task.
reschedule_train
.turnaround_time'
,
seconds
,
tags
)
dog_stats_api
.
histogram
(
'openassessment.assessment.ai_task.
AIRescheduleTraining
.turnaround_time'
,
seconds
,
tags
)
dog_stats_api
.
increment
(
'openassessment.assessment.ai_task.
reschedule_train
.completed_count'
,
tags
)
dog_stats_api
.
increment
(
'openassessment.assessment.ai_task.
AIRescheduleTraining
.completed_count'
,
tags
)
msg
=
u"Rescheduling of incomplete training tasks for course_id={cid} and item_id={iid} completed in {s} seconds."
msg
=
u"Rescheduling of incomplete training tasks for course_id={cid} and item_id={iid} completed in {s} seconds."
if
not
success
:
if
not
success
:
msg
+=
u" At least one rescheduling task failed due to internal error."
msg
+=
u" At least one rescheduling task failed due to internal error."
msg
.
format
(
cid
=
course_id
,
iid
=
item_id
,
s
=
seconds
)
msg
.
format
(
cid
=
course_id
,
iid
=
item_id
,
s
=
seconds
)
logger
.
info
(
msg
)
logger
.
info
(
msg
)
apps/openassessment/xblock/staff_info_mixin.py
View file @
3cbd6b7b
...
@@ -233,7 +233,7 @@ class StaffInfoMixin(object):
...
@@ -233,7 +233,7 @@ class StaffInfoMixin(object):
'success'
:
True
,
'success'
:
True
,
'msg'
:
_
(
u"All AI tasks associated with this item have been rescheduled successfully."
)
'msg'
:
_
(
u"All AI tasks associated with this item have been rescheduled successfully."
)
}
}
except
(
AIGradingInternalError
,
AITrainingInternalError
,
AIError
)
as
ex
:
except
AIError
as
ex
:
return
{
return
{
'success'
:
False
,
'success'
:
False
,
'msg'
:
_
(
u"An error occurred while rescheduling tasks: {}"
.
format
(
ex
))
'msg'
:
_
(
u"An error occurred while rescheduling tasks: {}"
.
format
(
ex
))
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment