Commit 83ea06c4 by Will Daly

Initial AI training implementation

parent 6814e508
......@@ -30,6 +30,7 @@ pip-log.txt
.tox
nosetests.xml
htmlcov
coverage.xml
# Translations
*.mo
......@@ -57,6 +58,7 @@ coverage
# tim-specific
ora2db
apps/openassessment/xblock/static/js/fixtures/*.html
storage/*
# logging
logs/*.log*
"""
Public interface for AI training and grading, used by students/course authors.
"""
import logging
from openassessment.assessment.serializers import (
deserialize_training_examples, InvalidTrainingExample, InvalidRubric
)
from openassessment.assessment.errors import (
AITrainingRequestError, AITrainingInternalError
)
from openassessment.assessment.models import AITrainingWorkflow, InvalidOptionSelection
from openassessment.assessment.worker import training as training_tasks
logger = logging.getLogger(__name__)
def submit(submission_uuid, rubric):
......@@ -46,13 +58,13 @@ def get_latest_assessment(submission_uuid):
pass
def train_classifiers(rubric, examples, algorithm_id):
def train_classifiers(rubric_dict, examples, algorithm_id):
"""
Schedule a task to train classifiers.
All training examples must match the rubric!
Args:
rubric (dict): The rubric used to assess the classifiers.
rubric_dict (dict): The rubric used to assess the classifiers.
examples (list of dict): Serialized training examples.
algorithm_id (unicode): The ID of the algorithm used to train the classifiers.
......@@ -67,7 +79,37 @@ def train_classifiers(rubric, examples, algorithm_id):
AITrainingInternalError
"""
pass
# Get or create the rubric and training examples
try:
examples = deserialize_training_examples(examples, rubric_dict)
except (InvalidRubric, InvalidTrainingExample, InvalidOptionSelection) as ex:
msg = u"Could not parse rubric and/or training examples: {ex}".format(ex=ex)
raise AITrainingRequestError(msg)
# Create the workflow model
try:
workflow = AITrainingWorkflow.start_workflow(examples, algorithm_id)
except Exception:
msg = (
u"An unexpected error occurred while creating "
u"the AI training workflow"
)
logger.exception(msg)
raise AITrainingInternalError(msg)
# Schedule the task, parametrized by the workflow UUID
try:
training_tasks.train_classifiers.apply_async(args=[workflow.uuid])
except Exception:
msg = (
u"An unexpected error occurred while scheduling "
u"the task for training workflow with UUID {}"
).format(workflow.uuid)
logger.exception(msg)
raise AITrainingInternalError(msg)
# Return the workflow UUID
return workflow.uuid
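# Illustrative usage sketch (not part of this commit; RUBRIC and EXAMPLES
# stand in for the serialized rubric/example dicts a course author would
# provide, and the algorithm ID is hypothetical):
#
#   workflow_uuid = train_classifiers(RUBRIC, EXAMPLES, u"example-algorithm")
#   workflow = AITrainingWorkflow.objects.get(uuid=workflow_uuid)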
def reschedule_unfinished_tasks(course_id=None, item_id=None, task_type=None):
......@@ -82,8 +124,7 @@ def reschedule_unfinished_tasks(course_id=None, item_id=None, task_type=None):
task_type (unicode): Either "grade" or "train". Restrict to unfinished tasks of this type.
Raises:
AIRequestError
AIInternalError
AIError
"""
pass
"""
Public interface for AI training and grading, used by workers.
"""
import logging
from django.utils.timezone import now
from django.db import DatabaseError
from openassessment.assessment.models import (
AITrainingWorkflow, AIClassifierSet,
ClassifierUploadError, ClassifierSerializeError,
IncompleteClassifierSet
)
from openassessment.assessment.errors import (
AITrainingRequestError, AITrainingInternalError
)
logger = logging.getLogger(__name__)
def get_submission(grading_workflow_uuid):
......@@ -40,7 +55,6 @@ def get_classifier_set(grading_workflow_uuid):
pass
def create_assessment(grading_workflow_uuid, assessment):
"""
Create an AI assessment (complete the AI grading task).
......@@ -75,7 +89,21 @@ def get_algorithm_id(training_workflow_uuid):
AITrainingInternalError
"""
pass
try:
workflow = AITrainingWorkflow.objects.get(uuid=training_workflow_uuid)
return workflow.algorithm_id
except AITrainingWorkflow.DoesNotExist:
msg = (
u"Could not retrieve AI training workflow with UUID {}"
).format(training_workflow_uuid)
raise AITrainingRequestError(msg)
except DatabaseError:
msg = (
u"An unexpected error occurred while retrieving "
u"the algorithm ID for training workflow with UUID {}"
).format(training_workflow_uuid)
logger.exception(msg)
raise AITrainingInternalError(msg)
def get_training_examples(training_workflow_uuid):
......@@ -86,14 +114,66 @@ def get_training_examples(training_workflow_uuid):
training_workflow_uuid (str): The UUID of the training workflow.
Returns:
list of dict: Serialized training examples.
list of dict: Serialized training examples, of the form:
Raises:
AITrainingRequestError
AITrainingInternalError
Example usage:
>>> get_training_examples('abcd1234')
[
{
"text": u"Example answer number one",
"scores": {
"vocabulary": 1,
"grammar": 2
}
},
{
"text": u"Example answer number two",
"scores": {
"vocabulary": 3,
"grammar": 1
}
}
]
"""
pass
try:
workflow = AITrainingWorkflow.objects.get(uuid=training_workflow_uuid)
returned_examples = []
for example in workflow.training_examples.all():
answer = example.answer
if isinstance(answer, dict):
text = answer.get('answer', '')
else:
text = answer
scores = {
option.criterion.name: option.points
for option in example.options_selected.all()
}
returned_examples.append({
'text': text,
'scores': scores
})
return returned_examples
except AITrainingWorkflow.DoesNotExist:
msg = (
u"Could not retrieve AI training workflow with UUID {}"
).format(training_workflow_uuid)
raise AITrainingRequestError(msg)
except DatabaseError:
msg = (
u"An unexpected error occurred while retrieving "
u"training examples for the AI training workflow with UUID {}"
).format(training_workflow_uuid)
logger.exception(msg)
raise AITrainingInternalError(msg)
def create_classifiers(training_workflow_uuid, classifier_set):
......@@ -116,4 +196,56 @@ def create_classifiers(training_workflow_uuid, classifier_set):
AITrainingInternalError
"""
pass
try:
workflow = AITrainingWorkflow.objects.get(uuid=training_workflow_uuid)
# If the task is executed multiple times, the classifier set may already
# have been created. If so, log a warning then return immediately.
if workflow.classifier_set is not None:
msg = u"AI training workflow with UUID {} already has trained classifiers."
logger.warning(msg)
return
# Retrieve the rubric model
rubric = workflow.rubric
if rubric is None:
msg = (
u"The AI training workflow with UUID {} does not have "
u"a rubric associated with it, which means it has no "
u"training examples."
).format(training_workflow_uuid)
logger.error(msg)
raise AITrainingInternalError(msg)
try:
workflow.classifier_set = AIClassifierSet.create_classifier_set(
classifier_set, rubric, workflow.algorithm_id
)
except IncompleteClassifierSet as ex:
msg = (
u"An error occurred while creating the classifier set "
u"for the training workflow with UUID {uuid}: {ex}"
).format(uuid=training_workflow_uuid, ex=ex)
raise AITrainingRequestError(msg)
except (ClassifierSerializeError, ClassifierUploadError, DatabaseError) as ex:
msg = (
u"An unexpected error occurred while creating the classifier "
u"set for training workflow UUID {uuid}: {ex}"
).format(uuid=training_workflow_uuid, ex=ex)
logger.exception(msg)
raise AITrainingInternalError(msg)
workflow.completed_at = now()
workflow.save()
except AITrainingWorkflow.DoesNotExist:
msg = (
u"Could not retrieve AI training workflow with UUID {}"
).format(training_workflow_uuid)
raise AITrainingRequestError(msg)
except DatabaseError:
msg = (
u"An unexpected error occurred while creating the classifier set "
u"for the AI training workflow with UUID {}"
).format(training_workflow_uuid)
logger.exception(msg)
raise AITrainingInternalError(msg)
......@@ -7,3 +7,4 @@ Export errors from all modules defined in this package.
from .peer import *
from .self import *
from .student_training import *
from .ai import *
......@@ -7,3 +7,4 @@ from .base import *
from .peer import *
from .training import *
from .student_training import *
from .ai import *
"""
Database models for AI assessment.
"""
from uuid import uuid4
import json
from django.core.files.base import ContentFile
from django.db import models, transaction
from django.utils.timezone import now
from django_extensions.db.fields import UUIDField
from .base import Rubric, Criterion
from .training import TrainingExample
class IncompleteClassifierSet(Exception):
"""
The classifier set is missing a classifier for a criterion in the rubric.
"""
def __init__(self, expected_criteria, actual_criteria):
"""
"""
missing_criteria = set(expected_criteria) - set(actual_criteria)
msg = (
u"Missing classifiers for the following "
u"criteria: {missing}"
).format(missing=missing_criteria)
super(IncompleteClassifierSet, self).__init__(msg)
class ClassifierUploadError(Exception):
"""
An error occurred while uploading classifier data.
"""
pass
class ClassifierSerializeError(Exception):
"""
An error occurred while serializing classifier data.
"""
pass
class AIClassifierSet(models.Model):
"""
A set of trained classifiers (immutable).
"""
class Meta:
app_label = "assessment"
# The rubric associated with this set of classifiers
# We should have one classifier for each of the criteria in the rubric.
rubric = models.ForeignKey(Rubric, related_name="+")
# Timestamp for when the classifier set was created.
# This allows us to find the most recently trained set of classifiers.
created_at = models.DateTimeField(default=now, db_index=True)
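# Note: `transaction.commit_on_success` (the pre-Django-1.6 transaction API)
# wraps `create_classifier_set` in a single transaction, so a serialization
# or upload failure rolls back the partially created classifier set.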
@classmethod
@transaction.commit_on_success
def create_classifier_set(cls, classifiers_dict, rubric, algorithm_id):
"""
Create a set of classifiers.
Args:
classifiers_dict (dict): Mapping of criterion names to
JSON-serializable classifiers.
rubric (Rubric): The rubric model.
algorithm_id (unicode): The ID of the algorithm used to train the classifiers.
Returns:
AIClassifierSet
Raises:
ClassifierSerializeError
ClassifierUploadError
DatabaseError
"""
# Create the classifier set
classifier_set = cls.objects.create(rubric=rubric)
# Retrieve the criteria for this rubric,
# then organize them by criterion name
criteria = {
criterion.name: criterion
for criterion in Criterion.objects.filter(rubric=rubric)
}
# Check that we have classifiers for all criteria in the rubric
if set(criteria.keys()) != set(classifiers_dict.keys()):
raise IncompleteClassifierSet(criteria.keys(), classifiers_dict.keys())
# Create classifiers for each criterion
for criterion_name, classifier_data in classifiers_dict.iteritems():
criterion = criteria.get(criterion_name)
classifier = AIClassifier.objects.create(
classifier_set=classifier_set,
criterion=criterion,
algorithm_id=algorithm_id
)
# Serialize the classifier data and upload
try:
contents = ContentFile(json.dumps(classifier_data))
except (TypeError, ValueError, UnicodeDecodeError) as ex:
msg = (
u"Could not serialize classifier data as JSON: {ex}"
).format(ex=ex)
raise ClassifierSerializeError(msg)
filename = uuid4().hex
try:
classifier.classifier_data.save(filename, contents)
except Exception as ex:
msg = (
u"Could not upload classifier data to {filename}: {ex}"
).format(filename=filename, ex=ex)
raise ClassifierUploadError(msg)
return classifier_set
@property
def classifiers_dict(self):
"""
Return all classifiers in this classifier set in a dictionary
that maps criteria names to classifier data.
Returns:
dict: keys are criteria names, values are JSON-serializable classifier data
If there are no classifiers in the set, returns None
"""
classifiers = list(self.classifiers.all()) # pylint: disable=E1101
if len(classifiers) == 0:
return None
else:
return {
classifier.criterion.name: classifier.download_classifier_data()
for classifier in classifiers
}
class AIClassifier(models.Model):
"""
A trained classifier (immutable).
"""
class Meta:
app_label = "assessment"
# Directory in which classifiers will be stored
# For instance, if we're using the default file system storage backend
# for local development, this will be a subdirectory.
# If using an S3 storage backend, this will be a subdirectory in
# an AWS S3 bucket.
AI_CLASSIFIER_STORAGE = "ora2_ai_classifiers"
# The set of classifiers this classifier belongs to
classifier_set = models.ForeignKey(AIClassifierSet, related_name="classifiers")
# The criterion (in the rubric) that this classifier evaluates.
criterion = models.ForeignKey(Criterion, related_name="+")
# The serialized classifier
# Because this may be large, we store it using a Django `FileField`,
# which allows us to plug in different storage backends (such as S3)
classifier_data = models.FileField(upload_to=AI_CLASSIFIER_STORAGE)
# The ID of the algorithm that was used to train this classifier.
algorithm_id = models.CharField(max_length=128, db_index=True)
def download_classifier_data(self):
"""
Download and deserialize the classifier data.
Returns:
JSON-serializable
Raises:
ValueError
IOError
"""
return json.loads(self.classifier_data.read()) # pylint:disable=E1101
class AITrainingWorkflow(models.Model):
"""
Used to track all training tasks.
Training tasks take as input an algorithm ID and a set of training examples
(which are associated with a rubric).
On successful completion, training tasks output a set of trained classifiers.
"""
class Meta:
app_label = "assessment"
# Unique identifier used to track this workflow
uuid = UUIDField(version=1, db_index=True)
# The ID of the algorithm used to train the classifiers
# This is a parameter passed to and interpreted by the workers.
# Django settings allow users to map algorithm ID strings
# to the Python classes used to perform the training.
algorithm_id = models.CharField(max_length=128, db_index=True)
# The training examples (essays + scores) used to train the classifiers.
# This is a many-to-many field because
# (a) we need multiple training examples to train a classifier, and
# (b) we may want to re-use training examples
# (for example, if a training task is executed by Celery workers multiple times)
training_examples = models.ManyToManyField(TrainingExample, related_name="+")
# The set of trained classifiers.
# Until the task completes successfully, this will be set to null.
classifier_set = models.ForeignKey(
AIClassifierSet, related_name='training_workflow',
null=True, default=None
)
# Timestamps
# The task is *scheduled* as soon as a client asks the API to
# train classifiers.
# The task is *completed* when a worker has successfully created a
# classifier set based on the training examples.
scheduled_at = models.DateTimeField(default=now, db_index=True)
completed_at = models.DateTimeField(null=True, db_index=True)
@classmethod
@transaction.commit_on_success
def start_workflow(cls, examples, algorithm_id):
"""
Start a workflow to track a training task.
Args:
examples (list of TrainingExample): The training examples used to create the classifiers.
algorithm_id (unicode): The ID of the algorithm to use for training.
Returns:
AITrainingWorkflow
"""
workflow = AITrainingWorkflow.objects.create(algorithm_id=algorithm_id)
workflow.training_examples.add(*examples)
workflow.save()
return workflow
@property
def rubric(self):
"""
Return the rubric associated with this workflow's training examples.
Returns:
Rubric or None (if no training examples are available)
"""
# We assume that all the training examples we have been provided are using
# the same rubric (this is enforced by the API call that deserializes
# the training examples).
first_example = list(self.training_examples.all()[:1]) # pylint: disable=E1101
if first_example:
return first_example[0].rubric
else:
return None
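# Minimal usage sketch (illustrative only; assumes `examples` is a list of
# TrainingExample models sharing one rubric, and the algorithm ID is
# hypothetical):
#
#   workflow = AITrainingWorkflow.start_workflow(examples, u"example-algorithm")
#   assert workflow.completed_at is None
#   assert workflow.rubric == examples[0].rubric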
# coding=utf-8
"""
Tests for AI assessment.
"""
import copy
import mock
from django.db import DatabaseError
from django.test.utils import override_settings
from openassessment.test_utils import CacheResetTest
from openassessment.assessment.api import ai as ai_api
from openassessment.assessment.models import AITrainingWorkflow
from openassessment.assessment.worker.algorithm import AIAlgorithm
from openassessment.assessment.errors import AITrainingRequestError, AITrainingInternalError
from openassessment.assessment.test.constants import RUBRIC, EXAMPLES
class StubAIAlgorithm(AIAlgorithm):
"""
Stub implementation of a supervised ML algorithm.
"""
# The format of the serialized classifier is controlled
# by the AI algorithm implementation, so we can return
# anything here as long as it's JSON-serializable
FAKE_CLASSIFIER = {
'name': u'ƒαкє ¢ℓαѕѕιƒιєя',
'binary_content': "TWFuIGlzIGRpc3Rpbmd1aX"
}
def train_classifier(self, examples):
"""
Stub implementation that returns fake classifier data.
"""
# Include the input essays in the classifier
# so we can test that the correct inputs were used
classifier = copy.copy(self.FAKE_CLASSIFIER)
classifier['examples'] = examples
return classifier
def score(self, text, classifier):
"""
Not implemented, but we need to make the abstract
method concrete.
"""
raise NotImplementedError
class AITrainingTest(CacheResetTest):
"""
Tests for AI training tasks.
"""
ALGORITHM_ID = "test-stub"
AI_ALGORITHMS = {
ALGORITHM_ID: '{module}.StubAIAlgorithm'.format(module=__name__)
}
EXPECTED_INPUT_SCORES = {
u'vøȼȺƀᵾłȺɍɏ': [1, 0],
u'ﻭɼค๓๓คɼ': [0, 2]
}
# Use a stub AI algorithm
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
def test_train_classifiers(self):
# Schedule a training task
# Because Celery is configured in "always eager" mode,
# expect the task to be executed synchronously.
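# (This assumes the test settings enable CELERY_ALWAYS_EAGER,
# so apply_async() runs the task in-process.)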
workflow_uuid = ai_api.train_classifiers(RUBRIC, EXAMPLES, self.ALGORITHM_ID)
# Retrieve the classifier set from the database
workflow = AITrainingWorkflow.objects.get(uuid=workflow_uuid)
classifier_set = workflow.classifier_set
self.assertIsNot(classifier_set, None)
# Retrieve a dictionary mapping criteria names to deserialized classifiers
classifiers = classifier_set.classifiers_dict
# Check that we have classifiers for all criteria in the rubric
criteria = set(criterion['name'] for criterion in RUBRIC['criteria'])
self.assertEqual(set(classifiers.keys()), criteria)
# Check that the classifier data matches the data from our stub AI algorithm
# Since the stub data includes the training examples, we also verify
# that the classifier was trained using the correct examples.
for criterion in RUBRIC['criteria']:
classifier = classifiers[criterion['name']]
self.assertEqual(classifier['name'], StubAIAlgorithm.FAKE_CLASSIFIER['name'])
self.assertEqual(classifier['binary_content'], StubAIAlgorithm.FAKE_CLASSIFIER['binary_content'])
# Verify that the correct essays and scores were used to create the classifier
# Our stub AI algorithm provides these for us, but they would not ordinarily
# be included in the trained classifier.
self.assertEqual(len(classifier['examples']), len(EXAMPLES))
expected_scores = self.EXPECTED_INPUT_SCORES[criterion['name']]
for data in zip(EXAMPLES, classifier['examples'], expected_scores):
sent_example, received_example, expected_score = data
received_example = AIAlgorithm.ExampleEssay(*received_example)
self.assertEqual(received_example.text, sent_example['answer'])
self.assertEqual(received_example.score, expected_score)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
def test_train_classifiers_invalid_examples(self):
# Mutate an example so it does not match the rubric
mutated_examples = copy.deepcopy(EXAMPLES)
mutated_examples[0]['options_selected'] = {'invalid': 'invalid'}
# Expect a request error
with self.assertRaises(AITrainingRequestError):
ai_api.train_classifiers(RUBRIC, mutated_examples, self.ALGORITHM_ID)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
@mock.patch.object(AITrainingWorkflow.objects, 'create')
def test_start_workflow_database_error(self, mock_create):
# Simulate a database error when creating the training workflow
mock_create.side_effect = DatabaseError("KABOOM!")
with self.assertRaises(AITrainingInternalError):
ai_api.train_classifiers(RUBRIC, EXAMPLES, self.ALGORITHM_ID)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
@mock.patch('openassessment.assessment.api.ai.training_tasks')
def test_schedule_training_error(self, mock_training_tasks):
# Simulate an exception raised when scheduling a training task
mock_training_tasks.train_classifiers.apply_async.side_effect = Exception("KABOOM!")
with self.assertRaises(AITrainingInternalError):
ai_api.train_classifiers(RUBRIC, EXAMPLES, self.ALGORITHM_ID)
# -*- coding: utf-8 -*-
"""
Tests for AI worker API calls.
"""
import copy
import datetime
import mock
from django.db import DatabaseError
from openassessment.test_utils import CacheResetTest
from openassessment.assessment.api import ai_worker as ai_worker_api
from openassessment.assessment.models import AITrainingWorkflow, AIClassifier
from openassessment.assessment.serializers import deserialize_training_examples
from openassessment.assessment.errors import (
AITrainingRequestError, AITrainingInternalError
)
from openassessment.assessment.test.constants import EXAMPLES, RUBRIC
class AIWorkerTrainingTest(CacheResetTest):
"""
Tests for the AI API calls a worker would make when
completing a training task.
"""
ALGORITHM_ID = "test-algorithm"
# Classifier data
# Since this is controlled by the AI algorithm implementation,
# we could put anything here as long as it's JSON-serializable.
CLASSIFIERS = {
u"vøȼȺƀᵾłȺɍɏ": {
'name': u'𝒕𝒆𝒔𝒕 𝒄𝒍𝒂𝒔𝒔𝒊𝒇𝒊𝒆𝒓',
'data': u'Öḧ ḷëẗ ẗḧë ṡüṅ ḅëäẗ ḋöẅṅ üṗöṅ ṁÿ ḟäċë, ṡẗäṛṡ ẗö ḟïḷḷ ṁÿ ḋṛëäṁ"'
},
u"ﻭɼค๓๓คɼ": {
'name': u'𝒕𝒆𝒔𝒕 𝒄𝒍𝒂𝒔𝒔𝒊𝒇𝒊𝒆𝒓',
'data': u"І ам а тѓаvэlэѓ оf ъотЂ тімэ аиↁ ѕрасэ, то ъэ шЂэѓэ І Ђаvэ ъээи"
}
}
def setUp(self):
"""
Create a training workflow in the database.
"""
examples = deserialize_training_examples(EXAMPLES, RUBRIC)
workflow = AITrainingWorkflow.start_workflow(examples, self.ALGORITHM_ID)
self.workflow_uuid = workflow.uuid
def test_get_algorithm_id(self):
algorithm_id = ai_worker_api.get_algorithm_id(self.workflow_uuid)
self.assertEqual(algorithm_id, self.ALGORITHM_ID)
def test_get_algorithm_id_no_workflow(self):
with self.assertRaises(AITrainingRequestError):
ai_worker_api.get_algorithm_id("invalid_uuid")
@mock.patch.object(AITrainingWorkflow.objects, 'get')
def test_get_algorithm_id_database_error(self, mock_get):
mock_get.side_effect = DatabaseError("KABOOM!")
with self.assertRaises(AITrainingInternalError):
ai_worker_api.get_algorithm_id(self.workflow_uuid)
def test_get_training_examples(self):
examples = ai_worker_api.get_training_examples(self.workflow_uuid)
expected_examples = [
{
'text': EXAMPLES[0]['answer'],
'scores': {
u"vøȼȺƀᵾłȺɍɏ": 1,
u"ﻭɼค๓๓คɼ": 0
}
},
{
'text': EXAMPLES[1]['answer'],
'scores': {
u"vøȼȺƀᵾłȺɍɏ": 0,
u"ﻭɼค๓๓คɼ": 2
}
},
]
self.assertItemsEqual(examples, expected_examples)
def test_get_training_examples_no_workflow(self):
with self.assertRaises(AITrainingRequestError):
ai_worker_api.get_training_examples("invalid_uuid")
@mock.patch.object(AITrainingWorkflow.objects, 'get')
def test_get_training_examples_database_error(self, mock_get):
mock_get.side_effect = DatabaseError("KABOOM!")
with self.assertRaises(AITrainingInternalError):
ai_worker_api.get_training_examples(self.workflow_uuid)
def test_create_classifiers(self):
ai_worker_api.create_classifiers(self.workflow_uuid, self.CLASSIFIERS)
# Expect that the workflow was marked complete
workflow = AITrainingWorkflow.objects.get(uuid=self.workflow_uuid)
self.assertIsNot(workflow.completed_at, None)
# Expect that the classifier set was created with the correct data
self.assertIsNot(workflow.classifier_set, None)
saved_classifiers = workflow.classifier_set.classifiers_dict
self.assertEqual(self.CLASSIFIERS, saved_classifiers)
def test_create_classifiers_no_workflow(self):
with self.assertRaises(AITrainingRequestError):
ai_worker_api.create_classifiers("invalid_uuid", self.CLASSIFIERS)
@mock.patch.object(AITrainingWorkflow.objects, 'get')
def test_create_classifiers_database_error(self, mock_get):
mock_get.side_effect = DatabaseError("KABOOM!")
with self.assertRaises(AITrainingInternalError):
ai_worker_api.create_classifiers(self.workflow_uuid, self.CLASSIFIERS)
def test_create_classifiers_serialize_error(self):
# Mutate the classifier data so it is NOT JSON-serializable
classifiers = copy.deepcopy(self.CLASSIFIERS)
classifiers[u"vøȼȺƀᵾłȺɍɏ"] = datetime.datetime.now()
# Expect an error when we try to create the classifiers
with self.assertRaises(AITrainingInternalError):
ai_worker_api.create_classifiers(self.workflow_uuid, classifiers)
def test_create_classifiers_missing_criteria(self):
# Remove a criterion from the classifiers dict
classifiers = copy.deepcopy(self.CLASSIFIERS)
del classifiers[u"vøȼȺƀᵾłȺɍɏ"]
# Expect an error when we try to create the classifiers
with self.assertRaises(AITrainingRequestError):
ai_worker_api.create_classifiers(self.workflow_uuid, classifiers)
def test_create_classifiers_unrecognized_criterion(self):
# Add an extra criterion to the classifiers dict
classifiers = copy.deepcopy(self.CLASSIFIERS)
classifiers[u"extra_criterion"] = copy.deepcopy(classifiers[u"vøȼȺƀᵾłȺɍɏ"])
# Expect an error when we try to create the classifiers
with self.assertRaises(AITrainingRequestError):
ai_worker_api.create_classifiers(self.workflow_uuid, classifiers)
@mock.patch.object(AIClassifier, 'classifier_data')
def test_create_classifiers_upload_error(self, mock_data):
# Simulate an error occurring when uploading the trained classifier
mock_data.save.side_effect = IOError("OH NO!!!")
with self.assertRaises(AITrainingInternalError):
ai_worker_api.create_classifiers(self.workflow_uuid, self.CLASSIFIERS)
def test_create_classifiers_twice(self):
# Simulate repeated task execution for the same workflow
# Since these are executed sequentially, the second call should
# have no effect.
ai_worker_api.create_classifiers(self.workflow_uuid, self.CLASSIFIERS)
ai_worker_api.create_classifiers(self.workflow_uuid, self.CLASSIFIERS)
# Expect that the workflow was marked complete
workflow = AITrainingWorkflow.objects.get(uuid=self.workflow_uuid)
self.assertIsNot(workflow.completed_at, None)
# Expect that the classifier set was created with the correct data
self.assertIsNot(workflow.classifier_set, None)
saved_classifiers = workflow.classifier_set.classifiers_dict
self.assertEqual(self.CLASSIFIERS, saved_classifiers)
def test_create_classifiers_no_training_examples(self):
# Create a workflow with no training examples
workflow = AITrainingWorkflow.objects.create(algorithm_id=self.ALGORITHM_ID)
# Expect an error when we try to create classifiers
with self.assertRaises(AITrainingInternalError):
ai_worker_api.create_classifiers(workflow.uuid, self.CLASSIFIERS)
# coding=utf-8
"""
Tests for AI worker tasks.
"""
from contextlib import contextmanager
import datetime
import mock
from django.test.utils import override_settings
from openassessment.test_utils import CacheResetTest
from openassessment.assessment.worker.training import train_classifiers, InvalidExample
from openassessment.assessment.api import ai_worker as ai_worker_api
from openassessment.assessment.models import AITrainingWorkflow
from openassessment.assessment.worker.algorithm import (
AIAlgorithm, UnknownAlgorithm, AlgorithmLoadError, TrainingError
)
from openassessment.assessment.serializers import deserialize_training_examples
from openassessment.assessment.errors import AITrainingRequestError
from openassessment.assessment.test.constants import EXAMPLES, RUBRIC
class StubAIAlgorithm(AIAlgorithm):
"""
Stub implementation of a supervised ML algorithm.
"""
def train_classifier(self, examples):
return {}
def score(self, text, classifier):
raise NotImplementedError
class ErrorStubAIAlgorithm(AIAlgorithm):
"""
Stub implementation that raises an exception during training.
"""
def train_classifier(self, examples):
raise TrainingError("Test error!")
def score(self, text, classifier):
raise NotImplementedError
class AITrainingTaskTest(CacheResetTest):
"""
Tests for the training task executed asynchronously by Celery workers.
"""
ALGORITHM_ID = u"test-stub"
ERROR_STUB_ALGORITHM_ID = u"error-stub"
UNDEFINED_CLASS_ALGORITHM_ID = u"undefined_class"
UNDEFINED_MODULE_ALGORITHM_ID = u"undefined_module"
AI_ALGORITHMS = {
ALGORITHM_ID: '{module}.StubAIAlgorithm'.format(module=__name__),
ERROR_STUB_ALGORITHM_ID: '{module}.ErrorStubAIAlgorithm'.format(module=__name__),
UNDEFINED_CLASS_ALGORITHM_ID: '{module}.NotDefinedAIAlgorithm'.format(module=__name__),
UNDEFINED_MODULE_ALGORITHM_ID: 'openassessment.not.valid.NotDefinedAIAlgorithm'
}
def setUp(self):
"""
Create a training workflow in the database.
"""
examples = deserialize_training_examples(EXAMPLES, RUBRIC)
workflow = AITrainingWorkflow.start_workflow(examples, self.ALGORITHM_ID)
self.workflow_uuid = workflow.uuid
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
@mock.patch('openassessment.assessment.worker.training.ai_worker_api.get_algorithm_id')
def test_get_algorithm_id_api_error(self, mock_call):
mock_call.side_effect = AITrainingRequestError("Test error!")
with self._assert_retry(train_classifiers, AITrainingRequestError):
train_classifiers(self.workflow_uuid)
def test_unknown_algorithm(self):
# Since we haven't overridden settings to configure the algorithms,
# the worker will not recognize the workflow's algorithm ID.
with self._assert_retry(train_classifiers, UnknownAlgorithm):
train_classifiers(self.workflow_uuid)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
def test_unable_to_load_algorithm_class(self):
# The algorithm is defined in the settings, but the class does not exist.
self._set_algorithm_id(self.UNDEFINED_CLASS_ALGORITHM_ID)
with self._assert_retry(train_classifiers, AlgorithmLoadError):
train_classifiers(self.workflow_uuid)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
def test_unable_to_find_algorithm_module(self):
# The algorithm is defined in the settings, but the module can't be loaded
self._set_algorithm_id(self.UNDEFINED_MODULE_ALGORITHM_ID)
with self._assert_retry(train_classifiers, AlgorithmLoadError):
train_classifiers(self.workflow_uuid)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
@mock.patch('openassessment.assessment.worker.training.ai_worker_api.get_training_examples')
def test_get_training_examples_api_error(self, mock_call):
mock_call.side_effect = AITrainingRequestError("Test error!")
with self._assert_retry(train_classifiers, AITrainingRequestError):
train_classifiers(self.workflow_uuid)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
def test_invalid_training_example_error(self):
def _mutation(examples): # pylint: disable=C0111
del examples[0]['scores'][u"ﻭɼค๓๓คɼ"]
self._assert_mutated_examples(_mutation)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
def test_training_example_missing_key(self):
def _mutation(examples): # pylint: disable=C0111
del examples[0]['scores']
self._assert_mutated_examples(_mutation)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
def test_training_example_non_numeric_score(self):
def _mutation(examples): # pylint: disable=C0111
examples[0]['scores'][u"ﻭɼค๓๓คɼ"] = "not an integer"
self._assert_mutated_examples(_mutation)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
def test_training_algorithm_error(self):
# Use a stub algorithm implementation that raises an exception during training
self._set_algorithm_id(self.ERROR_STUB_ALGORITHM_ID)
with self._assert_retry(train_classifiers, TrainingError):
train_classifiers(self.workflow_uuid)
@override_settings(ORA2_AI_ALGORITHMS=AI_ALGORITHMS)
@mock.patch('openassessment.assessment.worker.training.ai_worker_api.create_classifiers')
def test_create_classifiers_api_error(self, mock_call):
mock_call.side_effect = AITrainingRequestError("Test error!")
with self._assert_retry(train_classifiers, AITrainingRequestError):
train_classifiers(self.workflow_uuid)
def _set_algorithm_id(self, algorithm_id):
"""
Override the default algorithm ID for the training workflow.
Args:
algorithm_id (unicode): The new algorithm ID
Returns:
None
"""
workflow = AITrainingWorkflow.objects.get(uuid=self.workflow_uuid)
workflow.algorithm_id = algorithm_id
workflow.save()
def _assert_mutated_examples(self, mutate_func):
"""
Mutate the training examples returned by the API,
then check that we get the expected error.
This *may* be a little paranoid :)
Args:
mutate_func (callable): Function that accepts a single argument,
the list of example dictionaries.
Raises:
AssertionError
"""
examples = ai_worker_api.get_training_examples(self.workflow_uuid)
mutate_func(examples)
call_signature = 'openassessment.assessment.worker.training.ai_worker_api.get_training_examples'
with mock.patch(call_signature) as mock_call:
mock_call.return_value = examples
with self._assert_retry(train_classifiers, InvalidExample):
train_classifiers(self.workflow_uuid)
@contextmanager
def _assert_retry(self, task, final_exception):
"""
Context manager that asserts that the training task was retried.
Args:
task (celery.app.task.Task): The Celery task object.
final_exception (Exception): The error thrown after retrying.
Raises:
AssertionError
"""
original_retry = task.retry
task.retry = mock.MagicMock()
task.retry.side_effect = original_retry
try:
with self.assertRaises(final_exception):
yield
self.assertTrue(task.retry.called)
finally:
task.retry = original_retry
"""
Define the ML algorithms used to train text classifiers.
"""
from abc import ABCMeta, abstractmethod
from collections import namedtuple
import importlib
from django.conf import settings
class AIAlgorithmError(Exception):
"""
An error occurred when using an AI algorithm.
Superclass for more specific errors below.
"""
pass
class UnknownAlgorithm(AIAlgorithmError):
"""
Algorithm ID not found in the configuration.
"""
def __init__(self, algorithm_id):
msg = u"Could not find algorithm \"u{}\" in the configuration.".format(algorithm_id)
super(UnknownAlgorithm, self).__init__(msg)
class AlgorithmLoadError(AIAlgorithmError):
"""
Unable to load the algorithm class.
"""
def __init__(self, algorithm_id, algorithm_path):
msg = (
u"Could not load algorithm \"{algorithm_id}\" from \"{path}\""
).format(algorithm_id=algorithm_id, path=algorithm_path)
super(AlgorithmLoadError, self).__init__(msg)
class TrainingError(AIAlgorithmError):
"""
An error occurred while training a classifier from example essays.
"""
pass
class ScoreError(AIAlgorithmError):
"""
An error occurred while scoring an essay.
"""
pass
class InvalidClassifier(ScoreError):
"""
The classifier could not be used by this algorithm to score an essay.
"""
pass
class AIAlgorithm(object):
"""
Abstract base class for a supervised ML text classification algorithm.
"""
__metaclass__ = ABCMeta
# Example essay used as input to the training algorithm
# `text` is a unicode string representing a student essay submission.
# `score` is an integer score.
# Note that `score` is used as an arbitrary label, so you could
# have a set of examples with non-adjacent scores.
ExampleEssay = namedtuple('ExampleEssay', ['text', 'score'])
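# For example (hypothetical values):
#   AIAlgorithm.ExampleEssay(text=u"An essay about cats", score=2)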
@abstractmethod
def train_classifier(self, examples):
"""
Train a classifier based on example essays and scores.
Args:
examples (list of AIAlgorithm.ExampleEssay): Example essays and scores.
Returns:
JSON-serializable: The trained classifier. This MUST be JSON-serializable;
if any of the classifier data is binary, it should be base-64 encoded.
Raises:
TrainingError: The classifier could not be trained successfully.
"""
pass
@abstractmethod
def score(self, text, classifier):
"""
Score an essay using a classifier.
Args:
text (unicode): The text to classify.
classifier (JSON-serializable): A classifier, using the same format
as `train_classifier()`.
Raises:
InvalidClassifier: The provided classifier cannot be used by this algorithm.
ScoreError: An error occurred while scoring.
"""
pass
@classmethod
def algorithm_for_id(cls, algorithm_id):
"""
Load an algorithm based on Django settings configuration.
Args:
algorithm_id (unicode): The identifier for the algorithm,
which should be specified in Django settings.
Returns:
AIAlgorithm
Raises:
UnknownAlgorithm
"""
algorithms = getattr(settings, "ORA2_AI_ALGORITHMS", dict())
cls_path = algorithms.get(algorithm_id)
if cls_path is None:
raise UnknownAlgorithm(algorithm_id)
else:
module_path, _, name = cls_path.rpartition('.')
try:
algorithm_cls = getattr(importlib.import_module(module_path), name)
return algorithm_cls()
except (ImportError, AttributeError):
raise AlgorithmLoadError(algorithm_id, cls_path)
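# Minimal sketch of plugging in a custom algorithm (module, class, and
# algorithm ID below are hypothetical):
#
#   # myapp/grading.py
#   class MyAlgorithm(AIAlgorithm):
#       def train_classifier(self, examples):
#           return {"weights": []}  # must be JSON-serializable
#       def score(self, text, classifier):
#           return 0
#
#   # settings.py
#   ORA2_AI_ALGORITHMS = {"my-algorithm": "myapp.grading.MyAlgorithm"}
#
#   algorithm = AIAlgorithm.algorithm_for_id(u"my-algorithm")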
"""
Asynchronous tasks for training classifiers from examples.
"""
from collections import defaultdict
from celery import task
from celery.utils.log import get_task_logger
from openassessment.assessment.api import ai_worker as ai_worker_api
from openassessment.assessment.errors import AIError
from .algorithm import AIAlgorithm, AIAlgorithmError
MAX_RETRIES = 2
logger = get_task_logger(__name__)
class InvalidExample(Exception):
"""
The example retrieved from the AI API had an invalid format.
"""
def __init__(self, example_dict, msg):
err_msg = u"Training example \"{example}\" is not valid: {msg}".format(
example=example_dict,
msg=msg
)
super(InvalidExample, self).__init__(err_msg)
@task(max_retries=MAX_RETRIES) # pylint: disable=E1102
def train_classifiers(workflow_uuid):
"""
Asynchronous task to train classifiers for AI grading.
This task uses the AI API to retrieve task parameters
(algorithm ID and training examples) and upload
the trained classifiers.
If the task could not be completed successfully,
it is retried a few times. If it continues to fail,
it is left incomplete. Since the AI API tracks all
training tasks in the database, incomplete tasks
can always be rescheduled manually later.
Args:
workflow_uuid (str): The UUID of the workflow associated
with this training task.
Returns:
None
Raises:
AIError: An error occurred during a request to the AI API.
AIAlgorithmError: An error occurred while training the AI classifiers.
InvalidExample: The training examples provided by the AI API were not valid.
"""
# Retrieve the ML algorithm to use for training
# (based on task params and worker configuration)
try:
algorithm_id = ai_worker_api.get_algorithm_id(workflow_uuid)
algorithm = AIAlgorithm.algorithm_for_id(algorithm_id)
except AIAlgorithmError:
msg = (
u"An error occurred while loading the "
u"AI algorithm (training workflow UUID {})"
).format(workflow_uuid)
logger.exception(msg)
raise train_classifiers.retry()
except AIError:
msg = (
u"An error occurred while retrieving "
u"the algorithm ID (training workflow UUID {})"
).format(workflow_uuid)
logger.exception(msg)
raise train_classifiers.retry()
# Retrieve training examples, then transform them into the
# data structures we use internally.
try:
examples = ai_worker_api.get_training_examples(workflow_uuid)
except AIError:
msg = (
u"An error occurred while retrieving "
u"training examples for AI training "
u"(training workflow UUID {})"
).format(workflow_uuid)
logger.exception(msg)
raise train_classifiers.retry()
# Train a classifier for each criterion
# The AIAlgorithm subclass is responsible for ensuring that
# the trained classifiers are JSON-serializable.
try:
classifier_set = {
criterion_name: algorithm.train_classifier(examples_dict)
for criterion_name, examples_dict
in _examples_by_criterion(examples).iteritems()
}
except InvalidExample:
msg = (
u"Training example format was not valid "
u"(training workflow UUID {})"
).format(workflow_uuid)
logger.exception(msg)
raise train_classifiers.retry()
except AIAlgorithmError:
msg = (
u"An error occurred while training AI classifiers "
u"(training workflow UUID {})"
).format(workflow_uuid)
logger.exception(msg)
raise train_classifiers.retry()
# Upload the classifiers
# (implicitly marks the workflow complete)
try:
ai_worker_api.create_classifiers(workflow_uuid, classifier_set)
except AIError:
msg = (
u"An error occurred while uploading trained classifiers "
u"(training workflow UUID {})"
).format(workflow_uuid)
logger.exception(msg)
raise train_classifiers.retry()
def _examples_by_criterion(examples):
"""
Transform the examples returned by the AI API into our internal format.
Args:
examples (list): Training examples of the form returned by the AI API.
Each element of the list should be a dictionary with keys
'text' (the essay text) and 'scores' (a dictionary mapping
criterion names to numeric scores).
Returns:
dict: keys are the criterion names, and each value is a list of `AIAlgorithm.ExampleEssay`s
Raises:
InvalidExample: The provided training examples are not in a valid format.
"""
internal_examples = defaultdict(list)
prev_criteria = None
for example_dict in examples:
# Check that the example contains the expected keys
try:
scores_dict = example_dict['scores']
text = unicode(example_dict['text'])
except KeyError:
raise InvalidExample(example_dict, u'Example dict must have keys "scores" and "text"')
# Check that the criteria names are consistent across examples
if prev_criteria is None:
prev_criteria = set(scores_dict.keys())
else:
if prev_criteria != set(scores_dict.keys()):
msg = (
u"Example criteria do not match "
u"the previous example: {criteria}"
).format(criteria=prev_criteria)
raise InvalidExample(example_dict, msg)
for criterion_name, score in scores_dict.iteritems():
try:
score = int(score)
except ValueError:
raise InvalidExample(example_dict, u"Example score is not an integer")
else:
internal_ex = AIAlgorithm.ExampleEssay(text, score)
internal_examples[criterion_name].append(internal_ex)
return internal_examples
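# Illustration of the transformation (hypothetical data): given
#
#   examples = [
#       {'text': u'First essay', 'scores': {'vocabulary': 1, 'grammar': 0}},
#       {'text': u'Second essay', 'scores': {'vocabulary': 2, 'grammar': 2}},
#   ]
#
# _examples_by_criterion(examples)['vocabulary'] would be
# [ExampleEssay(u'First essay', 1), ExampleEssay(u'Second essay', 2)].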
......@@ -255,13 +255,10 @@ Data Model
2. **TrainingWorkflow**
a. Algorithm ID (varchar)
b. Rubric UUID (varchar)
c. Many-to-many relation with **TrainingExample**. We can re-use examples for multiple workflows.
d. ClassifierSet (Foreign Key)
e. Scheduled at (timestamp): The time the task was placed on the queue.
f. Started at (timestamp): The time the task was picked up by the worker.
g. Completed at (timestamp): The time the task was completed. If set, the task is considered complete.
h. Worker version (varchar): Identifier for the code running on the worker when the task was started. Useful for error tracking.
b. Many-to-many relation with **TrainingExample**. We can re-use examples for multiple workflows.
c. ClassifierSet (Foreign Key)
d. Scheduled at (timestamp): The time the task was placed on the queue.
e. Completed at (timestamp): The time the task was completed. If set, the task is considered complete.
3. **TrainingExample**
......@@ -272,8 +269,6 @@ Data Model
a. Rubric (Foreign Key)
b. Created at (timestamp)
c. Training example set (many-to-many)
d. Hash of rubric and training examples (varchar): Useful for quickly finding existing classifiers (e.g. for Studio authors trying out the demo problem).
5. **Classifier**
......
#!/usr/bin/env bash
cd `dirname $BASH_SOURCE` && cd ..
# Cleanup uploaded files from previous test runs (AI classifiers)
git clean -xfd "./storage/test/"
./scripts/install-python.sh test
echo "Running Python tests..."
......
......@@ -96,3 +96,7 @@ LOGGING = {
}
},
}
# Store uploaded files in a dev-specific directory
MEDIA_ROOT = os.path.join(BASE_DIR, 'storage/dev')
......@@ -34,10 +34,14 @@ DATABASES = {
TEST_RUNNER = 'django_nose.NoseTestSuiteRunner'
# Install test-specific Django apps
INSTALLED_APPS += ('django_nose',)
# Store uploaded files in a test-specific directory
MEDIA_ROOT = os.path.join(BASE_DIR, 'storage/test')
EDX_ORA2["EVENT_LOGGER"] = "openassessment.workflow.test.events.fake_event_logger"
......