Commit c33019e3 by gradyward

Unified exception handling and logging policies

parent 5bc79e19
...@@ -4,22 +4,24 @@ Public interface for AI training and grading, used by students/course authors. ...@@ -4,22 +4,24 @@ Public interface for AI training and grading, used by students/course authors.
import logging import logging
from django.db import DatabaseError from django.db import DatabaseError
from submissions import api as sub_api from submissions import api as sub_api
from celery.exceptions import (
ChordError, InvalidTaskError, NotConfigured, NotRegistered, QueueNotFound, TaskRevokedError
)
from openassessment.assessment.serializers import ( from openassessment.assessment.serializers import (
deserialize_training_examples, InvalidTrainingExample, InvalidRubric, full_assessment_dict deserialize_training_examples, InvalidTrainingExample, InvalidRubric, full_assessment_dict
) )
from openassessment.assessment.errors import ( from openassessment.assessment.errors import (
AITrainingRequestError, AITrainingInternalError, AITrainingRequestError, AITrainingInternalError, AIGradingRequestError,
AIGradingRequestError, AIGradingInternalError, AIError AIGradingInternalError, AIReschedulingRequestError, ANTICIPATED_CELERY_ERRORS
) )
from openassessment.assessment.models import ( from openassessment.assessment.models import (
Assessment, AITrainingWorkflow, AIGradingWorkflow, Assessment, AITrainingWorkflow, AIGradingWorkflow,
InvalidOptionSelection, NoTrainingExamples, InvalidOptionSelection, NoTrainingExamples,
AI_ASSESSMENT_TYPE AI_ASSESSMENT_TYPE, AIClassifierSet
) )
from openassessment.assessment.worker import training as training_tasks from openassessment.assessment.worker import training as training_tasks
from openassessment.assessment.worker import grading as grading_tasks from openassessment.assessment.worker import grading as grading_tasks
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -129,27 +131,46 @@ def submit(submission_uuid, rubric, algorithm_id): ...@@ -129,27 +131,46 @@ def submit(submission_uuid, rubric, algorithm_id):
raise AIGradingInternalError(msg) raise AIGradingInternalError(msg)
try: try:
# Schedule the grading task only if the workflow has a classifier set classifier_set_candidates = AIClassifierSet.objects.filter(
if workflow.classifier_set is not None: rubric=workflow.rubric, algorithm_id=algorithm_id
grading_tasks.grade_essay.apply_async(args=[workflow.uuid]) )[:1]
logger.info(( except DatabaseError as ex:
u"Scheduled grading task for AI grading workflow with UUID {workflow_uuid} "
u"(submission UUID = {sub_uuid}, algorithm ID = {algorithm_id})"
).format(workflow_uuid=workflow.uuid, sub_uuid=submission_uuid, algorithm_id=algorithm_id))
else:
logger.info((
u"Cannot schedule a grading task for AI grading workflow with UUID {workflow_uuid} "
u"because no classifiers are available for the rubric associated with submission {sub_uuid} "
u"for the algorithm {algorithm_id}"
).format(workflow_uuid=workflow.uuid, sub_uuid=submission_uuid, algorithm_id=algorithm_id))
return workflow.uuid
except Exception as ex:
msg = ( msg = (
u"An unexpected error occurred while scheduling the " u"An unexpected error occurred while scheduling the "
u"AI grading task for the submission with UUID {uuid}: {ex}" u"AI grading task for the submission with UUID {uuid}: {ex}"
).format(uuid=submission_uuid, ex=ex) ).format(uuid=submission_uuid, ex=ex)
raise AIGradingInternalError(msg) raise AIGradingInternalError(msg)
# If we find classifiers for this rubric/algorithm
# then associate the classifiers with the workflow
# and schedule a grading task.
# Otherwise, the task will need to be scheduled later,
# once the classifiers have been trained.
if len(classifier_set_candidates) > 0:
workflow.classifier_set = classifier_set_candidates[0]
try:
workflow.save()
grading_tasks.grade_essay.apply_async(args=[workflow.uuid])
logger.info((
u"Scheduled grading task for AI grading workflow with UUID {workflow_uuid} "
u"(submission UUID = {sub_uuid}, algorithm ID = {algorithm_id})"
).format(workflow_uuid=workflow.uuid, sub_uuid=submission_uuid, algorithm_id=algorithm_id))
return workflow.uuid
except (DatabaseError,) + ANTICIPATED_CELERY_ERRORS as ex:
msg = (
u"An unexpected error occurred while scheduling the "
u"AI grading task for the submission with UUID {uuid}: {ex}"
).format(uuid=submission_uuid, ex=ex)
logger.exception(msg)
raise AIGradingInternalError(msg)
else:
logger.info((
u"Cannot schedule a grading task for AI grading workflow with UUID {workflow_uuid} "
u"because no classifiers are available for the rubric associated with submission {sub_uuid} "
u"for the algorithm {algorithm_id}"
).format(workflow_uuid=workflow.uuid, sub_uuid=submission_uuid, algorithm_id=algorithm_id))
def get_latest_assessment(submission_uuid): def get_latest_assessment(submission_uuid):
""" """
...@@ -245,7 +266,6 @@ def train_classifiers(rubric_dict, examples, course_id, item_id, algorithm_id): ...@@ -245,7 +266,6 @@ def train_classifiers(rubric_dict, examples, course_id, item_id, algorithm_id):
Raises: Raises:
AITrainingRequestError AITrainingRequestError
AITrainingInternalError AITrainingInternalError
AIGradingInternalError
Example usage: Example usage:
>>> train_classifiers(rubric, examples, 'ease') >>> train_classifiers(rubric, examples, 'ease')
...@@ -275,26 +295,13 @@ def train_classifiers(rubric_dict, examples, course_id, item_id, algorithm_id): ...@@ -275,26 +295,13 @@ def train_classifiers(rubric_dict, examples, course_id, item_id, algorithm_id):
# Schedule the task, parametrized by the workflow UUID # Schedule the task, parametrized by the workflow UUID
try: try:
training_tasks.train_classifiers.apply_async(args=[workflow.uuid]) training_tasks.train_classifiers.apply_async(args=[workflow.uuid])
logger.info(( except ANTICIPATED_CELERY_ERRORS as ex:
u"Scheduled training task for the AI training workflow with UUID {workflow_uuid} "
u"(algorithm ID = {algorithm_id})"
).format(workflow_uuid=workflow.uuid, algorithm_id=algorithm_id))
except (AITrainingInternalError, AITrainingRequestError):
msg = ( msg = (
u"An unexpected error occurred while scheduling " u"An unexpected error occurred while scheduling incomplete training workflows with"
u"the task for training workflow with UUID {}" u" course_id={cid} and item_id={iid}: {ex}"
).format(workflow.uuid) ).format(cid=course_id, iid=item_id, ex=ex)
logger.exception(msg) logger.exception(msg)
raise AITrainingInternalError(msg) raise AITrainingInternalError(msg)
except AIGradingInternalError:
# If we have an error that is coming from the rescheduled grading after successful completion:
msg = (
u"An unexpected error occurred while scheduling incomplete grading workflows after "
u"the training task was completed successfully. The course_id and item_id for the failed "
u"grading workflows are course_id={cid}, item_id={iid}."
).format(cid=course_id, iid=item_id)
logger.exception(msg)
raise AIGradingInternalError(msg)
# Return the workflow UUID # Return the workflow UUID
return workflow.uuid return workflow.uuid
...@@ -317,19 +324,19 @@ def reschedule_unfinished_tasks(course_id=None, item_id=None, task_type=u"grade" ...@@ -317,19 +324,19 @@ def reschedule_unfinished_tasks(course_id=None, item_id=None, task_type=u"grade"
Raises: Raises:
AIGradingInternalError AIGradingInternalError
AITrainingInternalError AITrainingInternalError
AIError AIReschedulingRequestError
""" """
if course_id is None or item_id is None: if course_id is None or item_id is None:
msg = u"Rescheduling tasks was not possible because the course_id / item_id was not assigned." msg = u"Rescheduling tasks was not possible because the course_id / item_id was not assigned."
logger.exception(msg) logger.exception(msg)
raise AIError raise AIReschedulingRequestError
# Reschedules all of the training tasks # Reschedules all of the training tasks
if task_type == u"train" or task_type is None: if task_type == u"train" or task_type is None:
try: try:
training_tasks.reschedule_training_tasks.apply_async(args=[course_id, item_id]) training_tasks.reschedule_training_tasks.apply_async(args=[course_id, item_id])
except Exception as ex: except ANTICIPATED_CELERY_ERRORS as ex:
msg = ( msg = (
u"Rescheduling training tasks for course {cid} and item {iid} failed with exception: {ex}" u"Rescheduling training tasks for course {cid} and item {iid} failed with exception: {ex}"
).format(cid=course_id, iid=item_id, ex=ex) ).format(cid=course_id, iid=item_id, ex=ex)
...@@ -340,7 +347,7 @@ def reschedule_unfinished_tasks(course_id=None, item_id=None, task_type=u"grade" ...@@ -340,7 +347,7 @@ def reschedule_unfinished_tasks(course_id=None, item_id=None, task_type=u"grade"
if task_type == u"grade" or task_type is None: if task_type == u"grade" or task_type is None:
try: try:
grading_tasks.reschedule_grading_tasks.apply_async(args=[course_id, item_id]) grading_tasks.reschedule_grading_tasks.apply_async(args=[course_id, item_id])
except Exception as ex: except ANTICIPATED_CELERY_ERRORS as ex:
msg = ( msg = (
u"Rescheduling grading tasks for course {cid} and item {iid} failed with exception: {ex}" u"Rescheduling grading tasks for course {cid} and item {iid} failed with exception: {ex}"
).format(cid=course_id, iid=item_id, ex=ex) ).format(cid=course_id, iid=item_id, ex=ex)
......
...@@ -55,11 +55,12 @@ def get_grading_task_params(grading_workflow_uuid): ...@@ -55,11 +55,12 @@ def get_grading_task_params(grading_workflow_uuid):
raise AIGradingInternalError(msg) raise AIGradingInternalError(msg)
classifier_set = workflow.classifier_set classifier_set = workflow.classifier_set
# Tasks shouldn't be scheduled until a classifier set is # Though tasks shouldn't be scheduled until classifer set(s) exist, off of the happy path this is a likely
# available, so this is a serious internal error. # occurrence. Our response is to log this lack of compliance to dependency as an exception, and then thrown
# an error with the purpose of killing the celery task running this code.
if classifier_set is None: if classifier_set is None:
msg = ( msg = (
u"AI grading workflow with UUID {} has no classifier set" u"AI grading workflow with UUID {} has no classifier set, but was scheduled for grading"
).format(grading_workflow_uuid) ).format(grading_workflow_uuid)
logger.exception(msg) logger.exception(msg)
raise AIGradingInternalError(msg) raise AIGradingInternalError(msg)
...@@ -72,7 +73,7 @@ def get_grading_task_params(grading_workflow_uuid): ...@@ -72,7 +73,7 @@ def get_grading_task_params(grading_workflow_uuid):
'item_id': workflow.item_id, 'item_id': workflow.item_id,
'algorithm_id': workflow.algorithm_id, 'algorithm_id': workflow.algorithm_id,
} }
except Exception as ex: except (DatabaseError, ClassifierSerializeError, IncompleteClassifierSet, ValueError) as ex:
msg = ( msg = (
u"An unexpected error occurred while retrieving " u"An unexpected error occurred while retrieving "
u"classifiers for the grading workflow with UUID {uuid}: {ex}" u"classifiers for the grading workflow with UUID {uuid}: {ex}"
......
...@@ -2,6 +2,9 @@ ...@@ -2,6 +2,9 @@
Errors related to AI assessment. Errors related to AI assessment.
""" """
from celery.exceptions import InvalidTaskError, NotConfigured, NotRegistered, QueueNotFound
ANTICIPATED_CELERY_ERRORS = (InvalidTaskError, NotConfigured, NotRegistered, QueueNotFound)
class AIError(Exception): class AIError(Exception):
""" """
...@@ -36,3 +39,17 @@ class AIGradingInternalError(AIError): ...@@ -36,3 +39,17 @@ class AIGradingInternalError(AIError):
An unexpected error occurred while using the AI assessment API. An unexpected error occurred while using the AI assessment API.
""" """
pass pass
class AIReschedulingRequestError(AIError):
"""
There was a problem with the request sent to the AI assessment API.
"""
pass
class AIReschedulingInternalError(AIError):
"""
An unexpected error occurred while using the AI assessment API.
"""
pass
\ No newline at end of file
...@@ -14,7 +14,7 @@ from django_extensions.db.fields import UUIDField ...@@ -14,7 +14,7 @@ from django_extensions.db.fields import UUIDField
from dogapi import dog_stats_api from dogapi import dog_stats_api
from submissions import api as sub_api from submissions import api as sub_api
from openassessment.assessment.serializers import rubric_from_dict from openassessment.assessment.serializers import rubric_from_dict
from openassessment.assessment.errors.ai import AIError from openassessment.assessment.errors.ai import AIReschedulingInternalError
from .base import Rubric, Criterion, Assessment, AssessmentPart from .base import Rubric, Criterion, Assessment, AssessmentPart
from .training import TrainingExample from .training import TrainingExample
...@@ -118,10 +118,19 @@ class AIClassifierSet(models.Model): ...@@ -118,10 +118,19 @@ class AIClassifierSet(models.Model):
# Retrieve the criteria for this rubric, # Retrieve the criteria for this rubric,
# then organize them by criterion name # then organize them by criterion name
criteria = {
criterion.name: criterion try:
for criterion in Criterion.objects.filter(rubric=rubric) criteria = {
} criterion.name: criterion
for criterion in Criterion.objects.filter(rubric=rubric)
}
except DatabaseError as ex:
msg = (
u"An unexpected error occurred while retrieving rubric criteria with the"
u"rubric hash {rh} and algorithm_id {aid}: {ex}"
).format(rh=rubric.content_hash, aid=algorithm_id, ex=ex)
logger.exception(msg)
raise
# Check that we have classifiers for all criteria in the rubric # Check that we have classifiers for all criteria in the rubric
if set(criteria.keys()) != set(classifiers_dict.keys()): if set(criteria.keys()) != set(classifiers_dict.keys()):
...@@ -339,13 +348,8 @@ class AIWorkflow(models.Model): ...@@ -339,13 +348,8 @@ class AIWorkflow(models.Model):
for workflow_uuid in grade_workflow_uuids: for workflow_uuid in grade_workflow_uuids:
# Returns the grading workflow associated with the uuid stored in the initial query # Returns the grading workflow associated with the uuid stored in the initial query
try: workflow = cls.objects.get(uuid=workflow_uuid)
grading_workflow = cls.objects.get(uuid=workflow_uuid) yield workflow
yield grading_workflow
except (cls.DoesNotExist, ObjectDoesNotExist, DatabaseError) as ex:
msg = u"No workflow with uuid '{}' could be found within the system.".format(workflow_uuid)
logger.exception(msg)
raise AIError(ex)
def _log_start_workflow(self): def _log_start_workflow(self):
""" """
...@@ -353,15 +357,9 @@ class AIWorkflow(models.Model): ...@@ -353,15 +357,9 @@ class AIWorkflow(models.Model):
Increments the number of tasks of that kind. Increments the number of tasks of that kind.
""" """
# Identifies whether the task is a training or grading workflow # Identifies whether the type of task for reporting
data_path = None class_name = self.__class__.__name__
name = None data_path = 'openassessment.assessment.ai_task.' + class_name
if isinstance(self, AITrainingWorkflow):
data_path = 'openassessment.assessment.ai_task.train'
name = u"Training"
elif isinstance(self, AIGradingWorkflow):
data_path = 'openassessment.assessment.ai_task.grade'
name = u"Grading"
# Sets identity tags which allow sorting by course and item # Sets identity tags which allow sorting by course and item
tags = [ tags = [
...@@ -369,7 +367,7 @@ class AIWorkflow(models.Model): ...@@ -369,7 +367,7 @@ class AIWorkflow(models.Model):
u"item_id:{item_id}".format(item_id=self.item_id), u"item_id:{item_id}".format(item_id=self.item_id),
] ]
logger.info(u"AI{name} workflow with uuid {uuid} was started.".format(name=name, uuid=self.uuid)) logger.info(u"{class_name} with uuid {uuid} was started.".format(class_name=class_name, uuid=self.uuid))
dog_stats_api.increment(data_path + '.scheduled_count', tags=tags) dog_stats_api.increment(data_path + '.scheduled_count', tags=tags)
...@@ -379,15 +377,9 @@ class AIWorkflow(models.Model): ...@@ -379,15 +377,9 @@ class AIWorkflow(models.Model):
Reports the total time the task took. Reports the total time the task took.
""" """
# Identifies whether the task is a training or grading workflow # Identifies whether the type of task for reporting
data_path = None class_name = self.__class__.__name__
name = None data_path = 'openassessment.assessment.ai_task.' + class_name
if isinstance(self, AITrainingWorkflow):
data_path = 'openassessment.assessment.ai_task.train'
name = u"Training"
elif isinstance(self, AIGradingWorkflow):
data_path = 'openassessment.assessment.ai_task.grade'
name = u"Grading"
tags = [ tags = [
u"course_id:{course_id}".format(course_id=self.course_id), u"course_id:{course_id}".format(course_id=self.course_id),
...@@ -406,9 +398,9 @@ class AIWorkflow(models.Model): ...@@ -406,9 +398,9 @@ class AIWorkflow(models.Model):
logger.info( logger.info(
( (
u"AI{name} workflow with uuid {uuid} completed its workflow successfully " u"{class_name} with uuid {uuid} completed its workflow successfully "
u"in {seconds} seconds." u"in {seconds} seconds."
).format(name=name, uuid=self.uuid, seconds=time_delta.total_seconds()) ).format(class_name=class_name, uuid=self.uuid, seconds=time_delta.total_seconds())
) )
...@@ -605,6 +597,7 @@ class AIGradingWorkflow(AIWorkflow): ...@@ -605,6 +597,7 @@ class AIGradingWorkflow(AIWorkflow):
workflow.save() workflow.save()
workflow._log_start_workflow() workflow._log_start_workflow()
return workflow return workflow
@transaction.commit_on_success @transaction.commit_on_success
......
...@@ -9,11 +9,12 @@ from django.conf import settings ...@@ -9,11 +9,12 @@ from django.conf import settings
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from dogapi import dog_stats_api from dogapi import dog_stats_api
from openassessment.assessment.api import ai_worker as ai_worker_api from openassessment.assessment.api import ai_worker as ai_worker_api
from openassessment.assessment.errors import AIError, AIGradingInternalError, AIGradingRequestError from openassessment.assessment.errors import (
AIError, AIGradingInternalError, AIGradingRequestError, AIReschedulingInternalError, ANTICIPATED_CELERY_ERRORS
)
from .algorithm import AIAlgorithm, AIAlgorithmError from .algorithm import AIAlgorithm, AIAlgorithmError
from openassessment.assessment.models.ai import AIClassifierSet, AIGradingWorkflow from openassessment.assessment.models.ai import AIClassifierSet, AIGradingWorkflow
MAX_RETRIES = 2 MAX_RETRIES = 2
logger = get_task_logger(__name__) logger = get_task_logger(__name__)
...@@ -106,6 +107,10 @@ def reschedule_grading_tasks(course_id, item_id): ...@@ -106,6 +107,10 @@ def reschedule_grading_tasks(course_id, item_id):
Args: Args:
course_id (unicode): The course item that we will be rerunning the rescheduling on. course_id (unicode): The course item that we will be rerunning the rescheduling on.
item_id (unicode): The item that the rescheduling will be running on item_id (unicode): The item that the rescheduling will be running on
Raises:
AIReschedulingInternalError
AIGradingInternalError
""" """
# Logs the start of the rescheduling process and records the start time so that total time can be calculated later. # Logs the start of the rescheduling process and records the start time so that total time can be calculated later.
...@@ -113,7 +118,15 @@ def reschedule_grading_tasks(course_id, item_id): ...@@ -113,7 +118,15 @@ def reschedule_grading_tasks(course_id, item_id):
start_time = datetime.datetime.now() start_time = datetime.datetime.now()
# Finds all incomplete grading workflows # Finds all incomplete grading workflows
grading_workflows = AIGradingWorkflow.get_incomplete_workflows(course_id, item_id) try:
grading_workflows = AIGradingWorkflow.get_incomplete_workflows(course_id, item_id)
except (DatabaseError, AIGradingWorkflow.DoesNotExist) as ex:
msg = (
u"An unexpected error occurred while retrieving all incomplete "
u"grading tasks for course_id: {cid} and item_id: {iid}: {ex}"
).format(cid=course_id, iid=item_id, ex=ex)
logger.exception(msg)
raise AIReschedulingInternalError(msg)
# Notes whether or not one or more operations failed. If they did, the process of rescheduling will be retried. # Notes whether or not one or more operations failed. If they did, the process of rescheduling will be retried.
failures = 0 failures = 0
...@@ -179,18 +192,23 @@ def reschedule_grading_tasks(course_id, item_id): ...@@ -179,18 +192,23 @@ def reschedule_grading_tasks(course_id, item_id):
logger.info( logger.info(
u"Rescheduling of grading was successful for grading workflow with uuid='{}'".format(workflow.uuid) u"Rescheduling of grading was successful for grading workflow with uuid='{}'".format(workflow.uuid)
) )
except (AIGradingInternalError, AIGradingRequestError, AIError) as ex: except ANTICIPATED_CELERY_ERRORS as ex:
msg = ( msg = (
u"An error occurred while try to grade essay with uuid='{id}': {ex}" u"An error occurred while try to grade essay with uuid='{id}': {ex}"
).format(id=workflow.uuid, ex=ex) ).format(id=workflow.uuid, ex=ex)
logger.exception(msg) logger.exception(msg)
failures += 1 failures += 1
# If we couldn't assign classifiers, we failed.
else:
failures += 1
# Logs the data from our rescheduling attempt # Logs the data from our rescheduling attempt
time_delta = datetime.datetime.now() - start_time time_delta = datetime.datetime.now() - start_time
_log_complete_reschedule_grading( _log_complete_reschedule_grading(
course_id=course_id, item_id=item_id, seconds=time_delta.total_seconds(), success=(failures == 0) course_id=course_id, item_id=item_id, seconds=time_delta.total_seconds(), success=(failures == 0)
) )
# If one or more of these failed, we want to retry rescheduling. Note that this retry is executed in such a way # If one or more of these failed, we want to retry rescheduling. Note that this retry is executed in such a way
# that if it fails, an AIGradingInternalError will be raised with the number of failures on the last attempt (i.e. # that if it fails, an AIGradingInternalError will be raised with the number of failures on the last attempt (i.e.
# the total number of workflows matching these critera that still have left to be graded). # the total number of workflows matching these critera that still have left to be graded).
...@@ -215,7 +233,7 @@ def _log_start_reschedule_grading(course_id=None, item_id=None): ...@@ -215,7 +233,7 @@ def _log_start_reschedule_grading(course_id=None, item_id=None):
u"course_id:{}".format(course_id), u"course_id:{}".format(course_id),
u"item_id:{}".format(item_id), u"item_id:{}".format(item_id),
] ]
dog_stats_api.increment('openassessment.assessment.ai_task.reschedule_grade.scheduled_count', tags) dog_stats_api.increment('openassessment.assessment.ai_task.AIRescheduleGrading.scheduled_count', tags)
msg = u"Rescheduling of incomplete grading tasks began for course_id={cid} and item_id={iid}" msg = u"Rescheduling of incomplete grading tasks began for course_id={cid} and item_id={iid}"
logger.info(msg.format(cid=course_id, iid=item_id)) logger.info(msg.format(cid=course_id, iid=item_id))
...@@ -241,11 +259,11 @@ def _log_complete_reschedule_grading(course_id=None, item_id=None, seconds=-1, s ...@@ -241,11 +259,11 @@ def _log_complete_reschedule_grading(course_id=None, item_id=None, seconds=-1, s
u"success:{}".format(success) u"success:{}".format(success)
] ]
dog_stats_api.histogram('openassessment.assessment.ai_task.reschedule_grade.turnaround_time', seconds,tags) dog_stats_api.histogram('openassessment.assessment.ai_task.AIRescheduleGrading.turnaround_time', seconds,tags)
dog_stats_api.increment('openassessment.assessment.ai_task.reschedule_grade.completed_count', tags) dog_stats_api.increment('openassessment.assessment.ai_task.AIRescheduleGrading.completed_count', tags)
msg = u"Rescheduling of incomplete grading tasks for course_id={cid} and item_id={iid} completed in {s} seconds." msg = u"Rescheduling of incomplete grading tasks for course_id={cid} and item_id={iid} completed in {s} seconds."
if not success: if not success:
msg += u" At least one grading task failed due to internal error." msg += u" At least one grading task failed due to internal error."
msg.format(cid=course_id,iid=item_id,s=seconds) msg.format(cid=course_id, iid=item_id, s=seconds)
logger.info(msg) logger.info(msg)
...@@ -5,16 +5,17 @@ import datetime ...@@ -5,16 +5,17 @@ import datetime
from collections import defaultdict from collections import defaultdict
from celery import task from celery import task
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from celery.exceptions import InvalidTaskError, NotConfigured, NotRegistered, QueueNotFound
from dogapi import dog_stats_api from dogapi import dog_stats_api
from django.conf import settings from django.conf import settings
from django.db import DatabaseError
from openassessment.assessment.api import ai_worker as ai_worker_api from openassessment.assessment.api import ai_worker as ai_worker_api
from openassessment.assessment.errors import AIError from openassessment.assessment.errors import AIError, ANTICIPATED_CELERY_ERRORS
from .algorithm import AIAlgorithm, AIAlgorithmError from .algorithm import AIAlgorithm, AIAlgorithmError
from .grading import reschedule_grading_tasks from .grading import reschedule_grading_tasks
from openassessment.assessment.errors.ai import AIGradingInternalError from openassessment.assessment.errors.ai import AIGradingInternalError
from openassessment.assessment.models.ai import AITrainingWorkflow from openassessment.assessment.models.ai import AITrainingWorkflow
MAX_RETRIES = 2 MAX_RETRIES = 2
logger = get_task_logger(__name__) logger = get_task_logger(__name__)
...@@ -156,13 +157,25 @@ def reschedule_training_tasks(course_id, item_id): ...@@ -156,13 +157,25 @@ def reschedule_training_tasks(course_id, item_id):
Args: Args:
course_id (unicode): The course that we are going to search for unfinished training workflows course_id (unicode): The course that we are going to search for unfinished training workflows
item_id (unicode): The specific item within that course that we will reschedule unfinished workflows for item_id (unicode): The specific item within that course that we will reschedule unfinished workflows for
Raises:
AIReschedulingInternalError
DatabaseError
""" """
# Starts logging the details of the rescheduling # Starts logging the details of the rescheduling
_log_start_reschedule_training(course_id=course_id, item_id=item_id) _log_start_reschedule_training(course_id=course_id, item_id=item_id)
start_time = datetime.datetime.now() start_time = datetime.datetime.now()
# Run a query to find the incomplete training workflows # Run a query to find the incomplete training workflows
training_workflows = AITrainingWorkflow.get_incomplete_workflows(course_id, item_id) try:
training_workflows = AITrainingWorkflow.get_incomplete_workflows(course_id, item_id)
except (DatabaseError, AITrainingWorkflow.DoesNotExist) as ex:
msg = (
u"An unexpected error occurred while retrieving all incomplete "
u"training tasks for course_id: {cid} and item_id: {iid}: {ex}"
).format(cid=course_id, iid=item_id, ex=ex)
logger.exception(msg)
raise reschedule_training_tasks.retry()
# Tries to train every workflow that has not completed. # Tries to train every workflow that has not completed.
for target_workflow in training_workflows: for target_workflow in training_workflows:
...@@ -171,10 +184,10 @@ def reschedule_training_tasks(course_id, item_id): ...@@ -171,10 +184,10 @@ def reschedule_training_tasks(course_id, item_id):
logger.info( logger.info(
u"Rescheduling of training was successful for workflow with uuid{}".format(target_workflow.uuid) u"Rescheduling of training was successful for workflow with uuid{}".format(target_workflow.uuid)
) )
except Exception as ex: except ANTICIPATED_CELERY_ERRORS as ex:
msg = ( msg = (
u"An unexpected error occurred while scheduling the task for training workflow with UUID {}" u"An unexpected error occurred while scheduling the task for training workflow with UUID {id}: {ex}"
).format(target_workflow.uuid) ).format(id=target_workflow.uuid, ex=ex)
logger.exception(msg) logger.exception(msg)
time_delta = datetime.datetime.now() - start_time time_delta = datetime.datetime.now() - start_time
...@@ -251,7 +264,7 @@ def _log_start_reschedule_training(course_id=None, item_id=None): ...@@ -251,7 +264,7 @@ def _log_start_reschedule_training(course_id=None, item_id=None):
u"course_id:{}".format(course_id), u"course_id:{}".format(course_id),
u"item_id:{}".format(item_id), u"item_id:{}".format(item_id),
] ]
dog_stats_api.increment('openassessment.assessment.ai_task.reschedule_train.scheduled_count', tags) dog_stats_api.increment('openassessment.assessment.ai_task.AIRescheduleTraining.scheduled_count', tags)
msg = u"Rescheduling of incomplete training tasks began for course_id={cid} and item_id={iid}" msg = u"Rescheduling of incomplete training tasks began for course_id={cid} and item_id={iid}"
logger.info(msg.format(cid=course_id, iid=item_id)) logger.info(msg.format(cid=course_id, iid=item_id))
...@@ -276,11 +289,11 @@ def _log_complete_reschedule_training(course_id=None, item_id=None, seconds=-1, ...@@ -276,11 +289,11 @@ def _log_complete_reschedule_training(course_id=None, item_id=None, seconds=-1,
u"success:{}".format(success) u"success:{}".format(success)
] ]
dog_stats_api.histogram('openassessment.assessment.ai_task.reschedule_train.turnaround_time', seconds,tags) dog_stats_api.histogram('openassessment.assessment.ai_task.AIRescheduleTraining.turnaround_time', seconds,tags)
dog_stats_api.increment('openassessment.assessment.ai_task.reschedule_train.completed_count', tags) dog_stats_api.increment('openassessment.assessment.ai_task.AIRescheduleTraining.completed_count', tags)
msg = u"Rescheduling of incomplete training tasks for course_id={cid} and item_id={iid} completed in {s} seconds." msg = u"Rescheduling of incomplete training tasks for course_id={cid} and item_id={iid} completed in {s} seconds."
if not success: if not success:
msg += u" At least one rescheduling task failed due to internal error." msg += u" At least one rescheduling task failed due to internal error."
msg.format(cid=course_id,iid=item_id,s=seconds) msg.format(cid=course_id, iid=item_id, s=seconds)
logger.info(msg) logger.info(msg)
...@@ -233,7 +233,7 @@ class StaffInfoMixin(object): ...@@ -233,7 +233,7 @@ class StaffInfoMixin(object):
'success': True, 'success': True,
'msg': _(u"All AI tasks associated with this item have been rescheduled successfully.") 'msg': _(u"All AI tasks associated with this item have been rescheduled successfully.")
} }
except (AIGradingInternalError, AITrainingInternalError, AIError) as ex: except AIError as ex:
return { return {
'success': False, 'success': False,
'msg': _(u"An error occurred while rescheduling tasks: {}".format(ex)) 'msg': _(u"An error occurred while rescheduling tasks: {}".format(ex))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment