Commit 8f9dd4c8 by gradyward

Merge pull request #388 from edx/gradyward/datadog

Information about AI workflows gets sent to datadog
parents b4a0f027 ba64f06c
......@@ -4,12 +4,14 @@ Database models for AI assessment.
from uuid import uuid4
import json
import logging
import itertools
from django.conf import settings
from django.core.files.base import ContentFile
from django.db import models, transaction, DatabaseError
from django.utils.timezone import now
from django.core.exceptions import ObjectDoesNotExist
from django_extensions.db.fields import UUIDField
from dogapi import dog_stats_api
from submissions import api as sub_api
from openassessment.assessment.serializers import rubric_from_dict
from openassessment.assessment.errors.ai import AIError
......@@ -307,6 +309,7 @@ class AIWorkflow(models.Model):
"""
self.completed_at = now()
self.save()
self._log_complete_workflow()
@classmethod
def get_incomplete_workflows(cls, course_id, item_id):
......@@ -344,6 +347,70 @@ class AIWorkflow(models.Model):
logger.exception(msg)
raise AIError(ex)
def _log_start_workflow(self):
    """
    A logging operation called at the beginning of an AI Workflow's life.

    Emits an info-level log line and increments the datadog counter for
    the workflow's kind (training or grading).
    """
    # Pick the metric path and human-readable label for this workflow type.
    if isinstance(self, AITrainingWorkflow):
        data_path, name = 'openassessment.assessment.ai_task.train', u"Training"
    elif isinstance(self, AIGradingWorkflow):
        data_path, name = 'openassessment.assessment.ai_task.grade', u"Grading"
    else:
        data_path, name = None, None

    # Identity tags let datadog metrics be sliced by course and item.
    tags = [
        u"course_id:{course_id}".format(course_id=self.course_id),
        u"item_id:{item_id}".format(item_id=self.item_id),
    ]

    logger.info(u"AI{name} workflow with uuid {uuid} was started.".format(name=name, uuid=self.uuid))

    dog_stats_api.increment(data_path + '.scheduled_count', tags=tags)
def _log_complete_workflow(self):
    """
    A logging operation called at the end of an AI Workflow's life.

    Reports the total turnaround time of the task to datadog and logs
    a completion message.
    """
    # Pick the metric path and human-readable label for this workflow type.
    if isinstance(self, AITrainingWorkflow):
        data_path, name = 'openassessment.assessment.ai_task.train', u"Training"
    elif isinstance(self, AIGradingWorkflow):
        data_path, name = 'openassessment.assessment.ai_task.grade', u"Grading"
    else:
        data_path, name = None, None

    # Identity tags let datadog metrics be sliced by course and item.
    tags = [
        u"course_id:{course_id}".format(course_id=self.course_id),
        u"item_id:{item_id}".format(item_id=self.item_id),
    ]

    # Wall-clock time from scheduling to completion, reported to datadog.
    time_delta = self.completed_at - self.scheduled_at
    elapsed_seconds = time_delta.total_seconds()

    dog_stats_api.histogram(
        data_path + '.turnaround_time',
        elapsed_seconds,
        tags=tags
    )
    dog_stats_api.increment(data_path + '.completed_count', tags=tags)

    logger.info(
        (
            u"AI{name} workflow with uuid {uuid} completed its workflow successfully "
            u"in {seconds} seconds."
        ).format(name=name, uuid=self.uuid, seconds=elapsed_seconds)
    )
class AITrainingWorkflow(AIWorkflow):
"""
......@@ -389,6 +456,7 @@ class AITrainingWorkflow(AIWorkflow):
workflow = AITrainingWorkflow.objects.create(algorithm_id=algorithm_id, item_id=item_id, course_id=course_id)
workflow.training_examples.add(*examples)
workflow.save()
workflow._log_start_workflow()
return workflow
@property
......@@ -512,7 +580,7 @@ class AIGradingWorkflow(AIWorkflow):
essay_text = unicode(submission)
# Create the workflow
return cls.objects.create(
workflow = cls.objects.create(
submission_uuid=submission_uuid,
essay_text=essay_text,
algorithm_id=algorithm_id,
......@@ -522,6 +590,10 @@ class AIGradingWorkflow(AIWorkflow):
rubric=rubric
)
workflow._log_start_workflow()
return workflow
@transaction.commit_on_success
def complete(self, criterion_scores):
"""
......
......@@ -2,9 +2,11 @@
Asynchronous tasks for grading essays using text classifiers.
"""
import datetime
from celery import task
from django.db import DatabaseError
from celery.utils.log import get_task_logger
from dogapi import dog_stats_api
from openassessment.assessment.api import ai_worker as ai_worker_api
from openassessment.assessment.errors import AIError, AIGradingInternalError, AIGradingRequestError
from .algorithm import AIAlgorithm, AIAlgorithmError
......@@ -100,6 +102,11 @@ def reschedule_grading_tasks(course_id, item_id):
course_id (unicode): The course item that we will be rerunning the rescheduling on.
item_id (unicode): The item that the rescheduling will be running on
"""
# Logs the start of the rescheduling process and records the start time so that total time can be calculated later.
_log_start_reschedule_grading(course_id=course_id, item_id=item_id)
start_time = datetime.datetime.now()
# Finds all incomplete grading workflows
grading_workflows = AIGradingWorkflow.get_incomplete_workflows(course_id, item_id)
......@@ -174,6 +181,11 @@ def reschedule_grading_tasks(course_id, item_id):
logger.exception(msg)
failures += 1
# Logs the data from our rescheduling attempt
time_delta = datetime.datetime.now() - start_time
_log_complete_reschedule_grading(
course_id=course_id, item_id=item_id, seconds=time_delta.total_seconds(), success=(failures == 0)
)
# If one or more of these failed, we want to retry rescheduling. Note that this retry is executed in such a way
# that if it fails, an AIGradingInternalError will be raised with the number of failures on the last attempt (i.e.
# the total number of workflows matching these criteria that still remain to be graded).
......@@ -183,4 +195,52 @@ def reschedule_grading_tasks(course_id, item_id):
u"In an attempt to reschedule grading workflows, there were {} failures.".format(failures)
)
except AIGradingInternalError as ex:
raise reschedule_grading_tasks.retry()
\ No newline at end of file
raise reschedule_grading_tasks.retry()
def _log_start_reschedule_grading(course_id=None, item_id=None):
    """
    Sends data about the rescheduling_grading task to datadog.

    Args:
        course_id (unicode): the course id to associate with the log start
        item_id (unicode): the item id to tag with the log start
    """
    tags = [
        u"course_id:{}".format(course_id),
        u"item_id:{}".format(item_id),
    ]
    # Tags must be passed by keyword: in dogapi, increment()'s second
    # positional parameter is the counter value, not the tag list, so a
    # positional `tags` would be silently misinterpreted.
    dog_stats_api.increment('openassessment.assessment.ai_task.reschedule_grade.scheduled_count', tags=tags)

    msg = u"Rescheduling of incomplete grading tasks began for course_id={cid} and item_id={iid}"
    logger.info(msg.format(cid=course_id, iid=item_id))
def _log_complete_reschedule_grading(course_id=None, item_id=None, seconds=-1, success=False):
    """
    Sends the total time the rescheduling of grading tasks took to datadog
    (just the time taken to reschedule tasks, not the time necessary to complete them).

    Note that this function may be invoked multiple times per call to reschedule_grading_tasks,
    because the time for EACH ATTEMPT is taken (i.e. if we fail (by error) to schedule grading once,
    we log the time elapsed before trying again.)

    Args:
        course_id (unicode): the course_id to tag the task with
        item_id (unicode): the item_id to tag the task with
        seconds (int): the number of seconds that elapsed during the rescheduling task.
        success (bool): indicates whether or not all attempts to reschedule were successful
    """
    tags = [
        u"course_id:{}".format(course_id),
        u"item_id:{}".format(item_id),
        u"success:{}".format(success),
    ]

    # Tags must be passed by keyword: in dogapi, the positional parameters
    # after the metric name in histogram()/increment() are value/timestamp,
    # not the tag list.
    dog_stats_api.histogram('openassessment.assessment.ai_task.reschedule_grade.turnaround_time', seconds, tags=tags)
    dog_stats_api.increment('openassessment.assessment.ai_task.reschedule_grade.completed_count', tags=tags)

    msg = u"Rescheduling of incomplete grading tasks for course_id={cid} and item_id={iid} completed in {s} seconds."
    if not success:
        msg += u" At least one grading task failed due to internal error."
    # str.format returns a new string; the original discarded this result and
    # logged the raw template with un-substituted {cid}/{iid}/{s} placeholders.
    msg = msg.format(cid=course_id, iid=item_id, s=seconds)
    logger.info(msg)
\ No newline at end of file
"""
Asynchronous tasks for training classifiers from examples.
"""
import datetime
from collections import defaultdict
from celery import task
from celery.utils.log import get_task_logger
from dogapi import dog_stats_api
from django.conf import settings
from openassessment.assessment.api import ai_worker as ai_worker_api
from openassessment.assessment.errors import AIError
......@@ -153,6 +155,10 @@ def reschedule_training_tasks(course_id, item_id):
course_id (unicode): The course that we are going to search for unfinished training workflows
item_id (unicode): The specific item within that course that we will reschedule unfinished workflows for
"""
# Starts logging the details of the rescheduling
_log_start_reschedule_training(course_id=course_id, item_id=item_id)
start_time = datetime.datetime.now()
# Run a query to find the incomplete training workflows
training_workflows = AITrainingWorkflow.get_incomplete_workflows(course_id, item_id)
......@@ -168,8 +174,19 @@ def reschedule_training_tasks(course_id, item_id):
u"An unexpected error occurred while scheduling the task for training workflow with UUID {}"
).format(target_workflow.uuid)
logger.exception(msg)
time_delta = datetime.datetime.now() - start_time
_log_complete_reschedule_training(
course_id=course_id, item_id=item_id, seconds=time_delta.total_seconds(), success=False
)
raise reschedule_training_tasks.retry()
# Logs the total time to reschedule all training of classifiers if not logged beforehand by exception.
time_delta = datetime.datetime.now() - start_time
_log_complete_reschedule_training(
course_id=course_id, item_id=item_id, seconds=time_delta.total_seconds(), success=True
)
def _examples_by_criterion(examples):
"""
Transform the examples returned by the AI API into our internal format.
......@@ -218,3 +235,50 @@ def _examples_by_criterion(examples):
internal_ex = AIAlgorithm.ExampleEssay(text, score)
internal_examples[criterion_name].append(internal_ex)
return internal_examples
def _log_start_reschedule_training(course_id=None, item_id=None):
    """
    Sends data about the rescheduling_training task to datadog.

    Args:
        course_id (unicode): the course id to associate with the log start
        item_id (unicode): the item id to tag with the log start
    """
    tags = [
        u"course_id:{}".format(course_id),
        u"item_id:{}".format(item_id),
    ]
    # Tags must be passed by keyword: in dogapi, increment()'s second
    # positional parameter is the counter value, not the tag list, so a
    # positional `tags` would be silently misinterpreted.
    dog_stats_api.increment('openassessment.assessment.ai_task.reschedule_train.scheduled_count', tags=tags)

    msg = u"Rescheduling of incomplete training tasks began for course_id={cid} and item_id={iid}"
    logger.info(msg.format(cid=course_id, iid=item_id))
def _log_complete_reschedule_training(course_id=None, item_id=None, seconds=-1, success=False):
    """
    Sends the total time the rescheduling of training tasks took to datadog.

    Note that this function may be invoked multiple times per call to reschedule_training_tasks,
    because the time for EACH ATTEMPT is taken (i.e. if we fail (by error) to schedule training once,
    we log the time elapsed before trying again.)

    Args:
        course_id (unicode): the course_id to tag the task with
        item_id (unicode): the item_id to tag the task with
        seconds (int): the number of seconds that elapsed during the rescheduling task.
        success (bool): indicates whether or not all attempts to reschedule were successful
    """
    tags = [
        u"course_id:{}".format(course_id),
        u"item_id:{}".format(item_id),
        u"success:{}".format(success),
    ]

    # Tags must be passed by keyword: in dogapi, the positional parameters
    # after the metric name in histogram()/increment() are value/timestamp,
    # not the tag list.
    dog_stats_api.histogram('openassessment.assessment.ai_task.reschedule_train.turnaround_time', seconds, tags=tags)
    dog_stats_api.increment('openassessment.assessment.ai_task.reschedule_train.completed_count', tags=tags)

    msg = u"Rescheduling of incomplete training tasks for course_id={cid} and item_id={iid} completed in {s} seconds."
    if not success:
        msg += u" At least one rescheduling task failed due to internal error."
    # str.format returns a new string; the original discarded this result and
    # logged the raw template with un-substituted {cid}/{iid}/{s} placeholders.
    msg = msg.format(cid=course_id, iid=item_id, s=seconds)
    logger.info(msg)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment