Commit ba64f06c by gradyward

Added points where information about AI workflows is sent to datadog

parent b4a0f027
...@@ -4,12 +4,14 @@ Database models for AI assessment. ...@@ -4,12 +4,14 @@ Database models for AI assessment.
from uuid import uuid4 from uuid import uuid4
import json import json
import logging import logging
import itertools
from django.conf import settings from django.conf import settings
from django.core.files.base import ContentFile from django.core.files.base import ContentFile
from django.db import models, transaction, DatabaseError from django.db import models, transaction, DatabaseError
from django.utils.timezone import now from django.utils.timezone import now
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django_extensions.db.fields import UUIDField from django_extensions.db.fields import UUIDField
from dogapi import dog_stats_api
from submissions import api as sub_api from submissions import api as sub_api
from openassessment.assessment.serializers import rubric_from_dict from openassessment.assessment.serializers import rubric_from_dict
from openassessment.assessment.errors.ai import AIError from openassessment.assessment.errors.ai import AIError
...@@ -307,6 +309,7 @@ class AIWorkflow(models.Model): ...@@ -307,6 +309,7 @@ class AIWorkflow(models.Model):
""" """
self.completed_at = now() self.completed_at = now()
self.save() self.save()
self._log_complete_workflow()
@classmethod @classmethod
def get_incomplete_workflows(cls, course_id, item_id): def get_incomplete_workflows(cls, course_id, item_id):
...@@ -344,6 +347,70 @@ class AIWorkflow(models.Model): ...@@ -344,6 +347,70 @@ class AIWorkflow(models.Model):
logger.exception(msg) logger.exception(msg)
raise AIError(ex) raise AIError(ex)
def _log_start_workflow(self):
    """
    Record the start of this AI workflow's life.

    Emits an info log line and increments the datadog scheduled-task
    counter for whichever task type (training or grading) this workflow
    represents, tagged by course and item.
    """
    # Select the datadog metric path and display name from the concrete
    # workflow subclass.
    if isinstance(self, AITrainingWorkflow):
        data_path = 'openassessment.assessment.ai_task.train'
        name = u"Training"
    elif isinstance(self, AIGradingWorkflow):
        data_path = 'openassessment.assessment.ai_task.grade'
        name = u"Grading"
    else:
        data_path = None
        name = None

    # Identity tags let datadog metrics be sliced by course and item.
    identity_tags = [
        u"course_id:{course_id}".format(course_id=self.course_id),
        u"item_id:{item_id}".format(item_id=self.item_id),
    ]

    logger.info(u"AI{name} workflow with uuid {uuid} was started.".format(name=name, uuid=self.uuid))
    dog_stats_api.increment(data_path + '.scheduled_count', tags=identity_tags)
def _log_complete_workflow(self):
    """
    Record the completion of this AI workflow's life.

    Reports the total turnaround time (scheduled -> completed) to datadog
    as a histogram, increments the completed-task counter, and writes an
    info log line.
    """
    # Select the datadog metric path and display name from the concrete
    # workflow subclass.
    if isinstance(self, AITrainingWorkflow):
        data_path = 'openassessment.assessment.ai_task.train'
        name = u"Training"
    elif isinstance(self, AIGradingWorkflow):
        data_path = 'openassessment.assessment.ai_task.grade'
        name = u"Grading"
    else:
        data_path = None
        name = None

    # Identity tags let datadog metrics be sliced by course and item.
    identity_tags = [
        u"course_id:{course_id}".format(course_id=self.course_id),
        u"item_id:{item_id}".format(item_id=self.item_id),
    ]

    # Turnaround time is measured from scheduling to completion.
    elapsed = self.completed_at - self.scheduled_at
    dog_stats_api.histogram(
        data_path + '.turnaround_time',
        elapsed.total_seconds(),
        tags=identity_tags
    )
    dog_stats_api.increment(data_path + '.completed_count', tags=identity_tags)

    logger.info(
        (
            u"AI{name} workflow with uuid {uuid} completed its workflow successfully "
            u"in {seconds} seconds."
        ).format(name=name, uuid=self.uuid, seconds=elapsed.total_seconds())
    )
class AITrainingWorkflow(AIWorkflow): class AITrainingWorkflow(AIWorkflow):
""" """
...@@ -389,6 +456,7 @@ class AITrainingWorkflow(AIWorkflow): ...@@ -389,6 +456,7 @@ class AITrainingWorkflow(AIWorkflow):
workflow = AITrainingWorkflow.objects.create(algorithm_id=algorithm_id, item_id=item_id, course_id=course_id) workflow = AITrainingWorkflow.objects.create(algorithm_id=algorithm_id, item_id=item_id, course_id=course_id)
workflow.training_examples.add(*examples) workflow.training_examples.add(*examples)
workflow.save() workflow.save()
workflow._log_start_workflow()
return workflow return workflow
@property @property
...@@ -512,7 +580,7 @@ class AIGradingWorkflow(AIWorkflow): ...@@ -512,7 +580,7 @@ class AIGradingWorkflow(AIWorkflow):
essay_text = unicode(submission) essay_text = unicode(submission)
# Create the workflow # Create the workflow
return cls.objects.create( workflow = cls.objects.create(
submission_uuid=submission_uuid, submission_uuid=submission_uuid,
essay_text=essay_text, essay_text=essay_text,
algorithm_id=algorithm_id, algorithm_id=algorithm_id,
...@@ -522,6 +590,10 @@ class AIGradingWorkflow(AIWorkflow): ...@@ -522,6 +590,10 @@ class AIGradingWorkflow(AIWorkflow):
rubric=rubric rubric=rubric
) )
workflow._log_start_workflow()
return workflow
@transaction.commit_on_success @transaction.commit_on_success
def complete(self, criterion_scores): def complete(self, criterion_scores):
""" """
......
...@@ -2,9 +2,11 @@ ...@@ -2,9 +2,11 @@
Asynchronous tasks for grading essays using text classifiers. Asynchronous tasks for grading essays using text classifiers.
""" """
import datetime
from celery import task from celery import task
from django.db import DatabaseError from django.db import DatabaseError
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from dogapi import dog_stats_api
from openassessment.assessment.api import ai_worker as ai_worker_api from openassessment.assessment.api import ai_worker as ai_worker_api
from openassessment.assessment.errors import AIError, AIGradingInternalError, AIGradingRequestError from openassessment.assessment.errors import AIError, AIGradingInternalError, AIGradingRequestError
from .algorithm import AIAlgorithm, AIAlgorithmError from .algorithm import AIAlgorithm, AIAlgorithmError
...@@ -100,6 +102,11 @@ def reschedule_grading_tasks(course_id, item_id): ...@@ -100,6 +102,11 @@ def reschedule_grading_tasks(course_id, item_id):
course_id (unicode): The course item that we will be rerunning the rescheduling on. course_id (unicode): The course item that we will be rerunning the rescheduling on.
item_id (unicode): The item that the rescheduling will be running on item_id (unicode): The item that the rescheduling will be running on
""" """
# Logs the start of the rescheduling process and records the start time so that total time can be calculated later.
_log_start_reschedule_grading(course_id=course_id, item_id=item_id)
start_time = datetime.datetime.now()
# Finds all incomplete grading workflows # Finds all incomplete grading workflows
grading_workflows = AIGradingWorkflow.get_incomplete_workflows(course_id, item_id) grading_workflows = AIGradingWorkflow.get_incomplete_workflows(course_id, item_id)
...@@ -174,6 +181,11 @@ def reschedule_grading_tasks(course_id, item_id): ...@@ -174,6 +181,11 @@ def reschedule_grading_tasks(course_id, item_id):
logger.exception(msg) logger.exception(msg)
failures += 1 failures += 1
# Logs the data from our rescheduling attempt
time_delta = datetime.datetime.now() - start_time
_log_complete_reschedule_grading(
course_id=course_id, item_id=item_id, seconds=time_delta.total_seconds(), success=(failures == 0)
)
# If one or more of these failed, we want to retry rescheduling. Note that this retry is executed in such a way # If one or more of these failed, we want to retry rescheduling. Note that this retry is executed in such a way
# that if it fails, an AIGradingInternalError will be raised with the number of failures on the last attempt (i.e. # that if it fails, an AIGradingInternalError will be raised with the number of failures on the last attempt (i.e.
# the total number of workflows matching these critera that still have left to be graded). # the total number of workflows matching these critera that still have left to be graded).
...@@ -184,3 +196,51 @@ def reschedule_grading_tasks(course_id, item_id): ...@@ -184,3 +196,51 @@ def reschedule_grading_tasks(course_id, item_id):
) )
except AIGradingInternalError as ex: except AIGradingInternalError as ex:
raise reschedule_grading_tasks.retry() raise reschedule_grading_tasks.retry()
def _log_start_reschedule_grading(course_id=None, item_id=None):
    """
    Sends data about the start of a rescheduling-grading task to datadog.

    Args:
        course_id (unicode): the course id to associate with the log start
        item_id (unicode): the item id to tag with the log start
    """
    tags = [
        u"course_id:{}".format(course_id),
        u"item_id:{}".format(item_id),
    ]
    # Bug fix: tags must be passed by keyword. The second positional
    # parameter of dog_stats_api.increment is the counter value, not the
    # tag list (the workflow-model logging in this change already uses
    # tags=tags).
    dog_stats_api.increment(
        'openassessment.assessment.ai_task.reschedule_grade.scheduled_count',
        tags=tags
    )

    msg = u"Rescheduling of incomplete grading tasks began for course_id={cid} and item_id={iid}"
    logger.info(msg.format(cid=course_id, iid=item_id))
def _log_complete_reschedule_grading(course_id=None, item_id=None, seconds=-1, success=False):
    """
    Sends the total time the rescheduling of grading tasks took to datadog.
    (Just the time taken to reschedule tasks, not the time necessary to complete them.)

    Note that this function may be invoked multiple times per call to
    reschedule_grading_tasks, because the time for EACH ATTEMPT is taken
    (i.e. if we fail (by error) to schedule grading once, we log the time
    elapsed before trying again).

    Args:
        course_id (unicode): the course_id to tag the task with
        item_id (unicode): the item_id to tag the task with
        seconds (int): the number of seconds that elapsed during the rescheduling task.
        success (bool): indicates whether or not all attempts to reschedule were successful
    """
    tags = [
        u"course_id:{}".format(course_id),
        u"item_id:{}".format(item_id),
        u"success:{}".format(success),
    ]

    # Bug fix: tags must be passed by keyword. Positionally, histogram's
    # third parameter is a timestamp and increment's second parameter is
    # the counter value — neither is the tag list.
    dog_stats_api.histogram(
        'openassessment.assessment.ai_task.reschedule_grade.turnaround_time',
        seconds,
        tags=tags
    )
    dog_stats_api.increment(
        'openassessment.assessment.ai_task.reschedule_grade.completed_count',
        tags=tags
    )

    msg = u"Rescheduling of incomplete grading tasks for course_id={cid} and item_id={iid} completed in {s} seconds."
    if not success:
        msg += u" At least one grading task failed due to internal error."
    # Bug fix: str.format returns a new string; the original discarded the
    # result and logged the raw template with literal {cid}/{iid}/{s}.
    logger.info(msg.format(cid=course_id, iid=item_id, s=seconds))
\ No newline at end of file
""" """
Asynchronous tasks for training classifiers from examples. Asynchronous tasks for training classifiers from examples.
""" """
import datetime
from collections import defaultdict from collections import defaultdict
from celery import task from celery import task
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from dogapi import dog_stats_api
from django.conf import settings from django.conf import settings
from openassessment.assessment.api import ai_worker as ai_worker_api from openassessment.assessment.api import ai_worker as ai_worker_api
from openassessment.assessment.errors import AIError from openassessment.assessment.errors import AIError
...@@ -153,6 +155,10 @@ def reschedule_training_tasks(course_id, item_id): ...@@ -153,6 +155,10 @@ def reschedule_training_tasks(course_id, item_id):
course_id (unicode): The course that we are going to search for unfinished training workflows course_id (unicode): The course that we are going to search for unfinished training workflows
item_id (unicode): The specific item within that course that we will reschedule unfinished workflows for item_id (unicode): The specific item within that course that we will reschedule unfinished workflows for
""" """
# Starts logging the details of the rescheduling
_log_start_reschedule_training(course_id=course_id, item_id=item_id)
start_time = datetime.datetime.now()
# Run a query to find the incomplete training workflows # Run a query to find the incomplete training workflows
training_workflows = AITrainingWorkflow.get_incomplete_workflows(course_id, item_id) training_workflows = AITrainingWorkflow.get_incomplete_workflows(course_id, item_id)
...@@ -168,8 +174,19 @@ def reschedule_training_tasks(course_id, item_id): ...@@ -168,8 +174,19 @@ def reschedule_training_tasks(course_id, item_id):
u"An unexpected error occurred while scheduling the task for training workflow with UUID {}" u"An unexpected error occurred while scheduling the task for training workflow with UUID {}"
).format(target_workflow.uuid) ).format(target_workflow.uuid)
logger.exception(msg) logger.exception(msg)
time_delta = datetime.datetime.now() - start_time
_log_complete_reschedule_training(
course_id=course_id, item_id=item_id, seconds=time_delta.total_seconds(), success=False
)
raise reschedule_training_tasks.retry() raise reschedule_training_tasks.retry()
# Logs the total time to reschedule all training of classifiers if not logged beforehand by exception.
time_delta = datetime.datetime.now() - start_time
_log_complete_reschedule_training(
course_id=course_id, item_id=item_id, seconds=time_delta.total_seconds(), success=True
)
def _examples_by_criterion(examples): def _examples_by_criterion(examples):
""" """
Transform the examples returned by the AI API into our internal format. Transform the examples returned by the AI API into our internal format.
...@@ -218,3 +235,50 @@ def _examples_by_criterion(examples): ...@@ -218,3 +235,50 @@ def _examples_by_criterion(examples):
internal_ex = AIAlgorithm.ExampleEssay(text, score) internal_ex = AIAlgorithm.ExampleEssay(text, score)
internal_examples[criterion_name].append(internal_ex) internal_examples[criterion_name].append(internal_ex)
return internal_examples return internal_examples
def _log_start_reschedule_training(course_id=None, item_id=None):
    """
    Sends data about the start of a rescheduling-training task to datadog.

    Args:
        course_id (unicode): the course id to associate with the log start
        item_id (unicode): the item id to tag with the log start
    """
    tags = [
        u"course_id:{}".format(course_id),
        u"item_id:{}".format(item_id),
    ]
    # Bug fix: tags must be passed by keyword. The second positional
    # parameter of dog_stats_api.increment is the counter value, not the
    # tag list (the workflow-model logging in this change already uses
    # tags=tags).
    dog_stats_api.increment(
        'openassessment.assessment.ai_task.reschedule_train.scheduled_count',
        tags=tags
    )

    msg = u"Rescheduling of incomplete training tasks began for course_id={cid} and item_id={iid}"
    logger.info(msg.format(cid=course_id, iid=item_id))
def _log_complete_reschedule_training(course_id=None, item_id=None, seconds=-1, success=False):
    """
    Sends the total time the rescheduling of training tasks took to datadog.

    Note that this function may be invoked multiple times per call to
    reschedule_training_tasks, because the time for EACH ATTEMPT is taken
    (i.e. if we fail (by error) to schedule training once, we log the time
    elapsed before trying again).

    Args:
        course_id (unicode): the course_id to tag the task with
        item_id (unicode): the item_id to tag the task with
        seconds (int): the number of seconds that elapsed during the rescheduling task.
        success (bool): indicates whether or not all attempts to reschedule were successful
    """
    tags = [
        u"course_id:{}".format(course_id),
        u"item_id:{}".format(item_id),
        u"success:{}".format(success),
    ]

    # Bug fix: tags must be passed by keyword. Positionally, histogram's
    # third parameter is a timestamp and increment's second parameter is
    # the counter value — neither is the tag list.
    dog_stats_api.histogram(
        'openassessment.assessment.ai_task.reschedule_train.turnaround_time',
        seconds,
        tags=tags
    )
    dog_stats_api.increment(
        'openassessment.assessment.ai_task.reschedule_train.completed_count',
        tags=tags
    )

    msg = u"Rescheduling of incomplete training tasks for course_id={cid} and item_id={iid} completed in {s} seconds."
    if not success:
        msg += u" At least one rescheduling task failed due to internal error."
    # Bug fix: str.format returns a new string; the original discarded the
    # result and logged the raw template with literal {cid}/{iid}/{s}.
    logger.info(msg.format(cid=course_id, iid=item_id, s=seconds))
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment