Commit cc5ab9f3 by Will Daly

Merge pull request #456 from edx/will/ai-cache-classifiers-to-disk

AI: Celery workers lose their cache after an exception, causing many hits to S3
parents a0c5c11d 65bbf48d
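
This merge replaces the single in-memory classifier cache with a two-tier lookup: check the in-memory cache first, fall back to a file-based cache that survives worker process restarts, and download from S3 only when both tiers miss. A minimal sketch of that read-through pattern, assuming two Django cache objects and a download callable (the names `mem_cache`, `file_cache`, and `download` are placeholders, not identifiers from this commit):

    # Sketch of the two-tier read-through used by classifier_data_by_criterion below.
    # mem_cache/file_cache stand in for CLASSIFIERS_CACHE_IN_MEM / CLASSIFIERS_CACHE_IN_FILE;
    # download stands in for fetching the serialized classifier data from S3.
    def cached_classifier_data(cache_key, mem_cache, file_cache, download):
        data = mem_cache.get(cache_key)
        if data is None:
            # The in-memory cache is empty after a worker process restarts
            # (for example when max retries are exceeded), so try the file tier.
            data = file_cache.get(cache_key)
        if data is None:
            # Miss in both tiers: download from S3 and repopulate both caches.
            data = download()
            mem_cache.set(cache_key, data)
            file_cache.set(cache_key, data)
        return data

Note that a hit in the file tier is returned as-is; both tiers are written only on a full miss, matching the change below.
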
@@ -67,19 +67,11 @@ def get_grading_task_params(grading_workflow_uuid):
         raise AIGradingInternalError(msg)

     try:
-        classifiers = list(classifier_set.classifiers.select_related().all())
         return {
             'essay_text': workflow.essay_text,
-            'classifier_set': {
-                classifier.criterion.name: classifier.download_classifier_data()
-                for classifier in classifiers
-            },
+            'classifier_set': workflow.classifier_set.classifier_data_by_criterion,
             'algorithm_id': workflow.algorithm_id,
-            'valid_scores': {
-                classifier.criterion.name: classifier.valid_scores
-                for classifier in classifiers
-            }
+            'valid_scores': workflow.classifier_set.valid_scores_by_criterion,
         }
     except (
         DatabaseError, ClassifierSerializeError, IncompleteClassifierSet,
...
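
With this change the parameters handed to the grading task keep the same shape; only how the two dictionaries are produced differs. A hypothetical example of the returned structure (criterion names, algorithm id, and classifier payloads are illustrative, not taken from the commit):

    # Hypothetical shape of the value returned by get_grading_task_params():
    example_params = {
        'essay_text': u"The text of the student's essay...",
        'classifier_set': {
            u"vocabulary": {u"coefficients": [0.1, -0.2]},  # deserialized classifier data
            u"grammar": {u"coefficients": [0.3, 0.4]},
        },
        'algorithm_id': u"example-algorithm-id",
        'valid_scores': {
            u"vocabulary": [0, 1, 2],
            u"grammar": [0, 1, 2, 3],
        },
    }
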
@@ -24,14 +24,22 @@ logger = logging.getLogger(__name__)

 # Use an in-memory cache to hold classifier data, but allow settings to override this.
 # The classifier data will generally be larger than memcached's default max size
-CLASSIFIERS_CACHE = getattr(
-    settings, 'ORA2_CLASSIFIERS_CACHE',
+CLASSIFIERS_CACHE_IN_MEM = getattr(
+    settings, 'ORA2_CLASSIFIERS_CACHE_IN_MEM',
     get_cache(
         'django.core.cache.backends.locmem.LocMemCache',
         LOCATION='openassessment.ai.classifiers_dict'
     )
 )

+CLASSIFIERS_CACHE_IN_FILE = getattr(
+    settings, 'ORA2_CLASSIFIERS_CACHE_IN_FILE',
+    get_cache(
+        'django.core.cache.backends.filebased.FileBasedCache',
+        LOCATION='/tmp/ora2_classifier_cache'
+    )
+)
+

 class IncompleteClassifierSet(Exception):
     """
@@ -263,13 +271,10 @@ class AIClassifierSet(models.Model):
         # If we get to this point, no classifiers exist with this rubric and algorithm.
         return None

-    # Number of seconds to store downloaded classifiers in the in-memory cache.
-    DEFAULT_CLASSIFIER_CACHE_TIMEOUT = 300
-
     @property
-    def classifiers_dict(self):
+    def classifier_data_by_criterion(self):
         """
-        Return all classifiers in this classifier set in a dictionary
+        Return info for all classifiers in this classifier set in a dictionary
         that maps criteria names to classifier data.

         Returns:
@@ -286,23 +291,79 @@ class AIClassifierSet(models.Model):
         # We use an in-memory cache because the classifier data will most often
         # be several megabytes, which exceeds the default memcached size limit.
         # If we find it, we can avoid calls to the database, S3, and json.
-        cache_key = unicode(self.id)
-        classifiers_dict = CLASSIFIERS_CACHE.get(cache_key)
+        cache_key = self._cache_key("classifier_data_by_criterion")
+        classifiers_dict = CLASSIFIERS_CACHE_IN_MEM.get(cache_key)
+
+        # If we can't find the classifier in-memory, check the filesystem cache
+        # We can't always rely on the in-memory cache because worker processes
+        # terminate when max retries are exceeded.
+        if classifiers_dict is None:
+            msg = (
+                u"Could not find classifiers dict in the in-memory "
+                u"cache for key {key}. Falling back to the file-based cache."
+            ).format(key=cache_key)
+            logger.info(msg)
+            classifiers_dict = CLASSIFIERS_CACHE_IN_FILE.get(cache_key)
+        else:
+            msg = (
+                u"Found classifiers dict in the in-memory cache "
+                u"(cache key was {key})"
+            ).format(key=cache_key)
+            logger.info(msg)

         # If we can't find the classifiers dict in the cache,
         # we need to look up the classifiers in the database,
         # then download the classifier data.
         if classifiers_dict is None:
-            classifiers = list(self.classifiers.all())  # pylint: disable=E1101
             classifiers_dict = {
                 classifier.criterion.name: classifier.download_classifier_data()
-                for classifier in classifiers
+                for classifier in self.classifiers.select_related().all()  # pylint: disable=E1101
             }
-            timeout = getattr(settings, 'ORA2_CLASSIFIER_CACHE_TIMEOUT', self.DEFAULT_CLASSIFIER_CACHE_TIMEOUT)
-            CLASSIFIERS_CACHE.set(cache_key, classifiers_dict, timeout)
+            CLASSIFIERS_CACHE_IN_MEM.set(cache_key, classifiers_dict)
+            CLASSIFIERS_CACHE_IN_FILE.set(cache_key, classifiers_dict)
+            msg = (
+                u"Could not find classifiers dict in either the in-memory "
+                u"or file-based cache. Downloaded the data from S3 and cached "
+                u"it using key {key}"
+            ).format(key=cache_key)
+            logger.info(msg)

         return classifiers_dict if classifiers_dict else None

+    @property
+    def valid_scores_by_criterion(self):
+        """
+        Return the valid scores for each classifier in this classifier set.
+
+        Returns:
+            dict: maps rubric criterion names to lists of valid scores.
+
+        """
+        cache_key = self._cache_key("valid_scores_by_criterion")
+        valid_scores_by_criterion = cache.get(cache_key)
+        if valid_scores_by_criterion is None:
+            valid_scores_by_criterion = {
+                classifier.criterion.name: classifier.valid_scores
+                for classifier in self.classifiers.select_related().all()  # pylint: disable=E1101
+            }
+            cache.set(cache_key, valid_scores_by_criterion)
+        return valid_scores_by_criterion
+
+    def _cache_key(self, data_name):
+        """
+        Return a cache key for this classifier set.
+
+        Args:
+            data_name (unicode): Name for the data associated with this key.
+
+        Returns:
+            unicode
+
+        """
+        return u"openassessment.assessment.ai.classifier_set.{pk}.{data_name}".format(
+            pk=self.pk, data_name=data_name
+        )
+
     # Directory in which classifiers will be stored
     # For instance, if we're using the default file system storage backend
@@ -384,12 +445,7 @@ class AIClassifier(models.Model):
             list of integer scores, in ascending order.

         """
-        cache_key = u"openassessment.assessment.ai.classifier.{pk}.valid_scores".format(pk=self.pk)
-        valid_scores = cache.get(cache_key)
-        if valid_scores is None:
-            valid_scores = sorted([option.points for option in self.criterion.options.all()])
-            cache.set(cache_key, valid_scores)
-        return valid_scores
+        return sorted([option.points for option in self.criterion.options.all()])


 class AIWorkflow(models.Model):
...
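
The two dictionaries are cached under separate keys built by the new `_cache_key` helper: the large classifier payload goes through the in-memory and file-based caches, while the small valid-scores dictionary uses the default Django cache. For a classifier set with primary key 42 (an arbitrary example value), the keys would look like:

    # Example cache keys for a classifier set with pk=42 (illustrative pk):
    # openassessment.assessment.ai.classifier_set.42.classifier_data_by_criterion
    # openassessment.assessment.ai.classifier_set.42.valid_scores_by_criterion
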
@@ -107,7 +107,7 @@ class AITrainingTest(CacheResetTest):
         self.assertIsNot(classifier_set, None)

         # Retrieve a dictionary mapping criteria names to deserialized classifiers
-        classifiers = classifier_set.classifiers_dict
+        classifiers = classifier_set.classifier_data_by_criterion

         # Check that we have classifiers for all criteria in the rubric
         criteria = set(criterion['name'] for criterion in RUBRIC['criteria'])
...
@@ -6,7 +6,8 @@ import copy
 from django.test.utils import override_settings
 from openassessment.test_utils import CacheResetTest
 from openassessment.assessment.models import (
-    AIClassifierSet, AIClassifier, AIGradingWorkflow, AI_CLASSIFIER_STORAGE
+    AIClassifierSet, AIClassifier, AIGradingWorkflow, AI_CLASSIFIER_STORAGE,
+    CLASSIFIERS_CACHE_IN_MEM
 )
 from openassessment.assessment.serializers import rubric_from_dict
 from .constants import RUBRIC
@@ -58,6 +59,7 @@ class AIClassifierSetTest(CacheResetTest):
     Tests for the AIClassifierSet model.
     """
     def setUp(self):
+        super(AIClassifierSetTest, self).setUp()
         rubric = rubric_from_dict(RUBRIC)
         self.classifier_set = AIClassifierSet.create_classifier_set(
             CLASSIFIERS_DICT, rubric, "test_algorithm", COURSE_ID, ITEM_ID
@@ -67,15 +69,33 @@
         # Retrieve the classifier dict twice, which should hit the caching code.
         # We can check that we're using the cache by asserting that
         # the number of database queries decreases.
-        with self.assertNumQueries(3):
-            first = self.classifier_set.classifiers_dict
+        with self.assertNumQueries(1):
+            first = self.classifier_set.classifier_data_by_criterion

         with self.assertNumQueries(0):
-            second = self.classifier_set.classifiers_dict
+            second = self.classifier_set.classifier_data_by_criterion

         # Verify that we got the same value both times
         self.assertEqual(first, second)

+    def test_file_cache_downloads(self):
+        # Retrieve the classifiers dict, which should be cached
+        # both in memory and on the file system
+        first = self.classifier_set.classifier_data_by_criterion
+
+        # Clear the in-memory cache
+        # This simulates what happens when a worker process dies
+        # after exceeding the maximum number of retries.
+        CLASSIFIERS_CACHE_IN_MEM.clear()
+
+        # We should still be able to retrieve the classifiers dict
+        # from the on-disk cache, even if memory has been cleared
+        with self.assertNumQueries(0):
+            second = self.classifier_set.classifier_data_by_criterion
+
+        # Verify that we got the correct classifiers dict back
+        self.assertEqual(first, second)
+

 class AIGradingWorkflowTest(CacheResetTest):
     """
...
@@ -117,7 +117,7 @@ class AIWorkerTrainingTest(CacheResetTest):
         # Expect that the classifier set was created with the correct data
         self.assertIsNot(workflow.classifier_set, None)
-        saved_classifiers = workflow.classifier_set.classifiers_dict
+        saved_classifiers = workflow.classifier_set.classifier_data_by_criterion
         self.assertItemsEqual(CLASSIFIERS, saved_classifiers)

     def test_create_classifiers_no_workflow(self):
@@ -177,7 +177,7 @@ class AIWorkerTrainingTest(CacheResetTest):
         # Expect that the classifier set was created with the correct data
         self.assertIsNot(workflow.classifier_set, None)
-        saved_classifiers = workflow.classifier_set.classifiers_dict
+        saved_classifiers = workflow.classifier_set.classifier_data_by_criterion
         self.assertItemsEqual(CLASSIFIERS, saved_classifiers)

     def test_create_classifiers_no_training_examples(self):
@@ -250,12 +250,12 @@ class AIWorkerGradingTest(CacheResetTest):
         self.assertItemsEqual(params, expected_params)

     def test_get_grading_task_params_num_queries(self):
-        with self.assertNumQueries(5):
+        with self.assertNumQueries(6):
             ai_worker_api.get_grading_task_params(self.workflow_uuid)

         # The second time through we should be caching the queries
         # to determine the valid scores for a classifier
-        with self.assertNumQueries(3):
+        with self.assertNumQueries(2):
             ai_worker_api.get_grading_task_params(self.workflow_uuid)

     def test_get_grading_task_params_no_workflow(self):
...
""" """
Test utilities Test utilities
""" """
from django.core.cache import cache, get_cache from django.core.cache import cache
from django.test import TestCase from django.test import TestCase
from openassessment.assessment.models.ai import (
CLASSIFIERS_CACHE_IN_MEM, CLASSIFIERS_CACHE_IN_FILE
)
class CacheResetTest(TestCase): class CacheResetTest(TestCase):
@@ -22,7 +25,5 @@ class CacheResetTest(TestCase):
         Clear the default cache and any custom caches.
         """
         cache.clear()
-        get_cache(
-            'django.core.cache.backends.locmem.LocMemCache',
-            LOCATION='openassessment.ai.classifiers_dict'
-        ).clear()
+        CLASSIFIERS_CACHE_IN_MEM.clear()
+        CLASSIFIERS_CACHE_IN_FILE.clear()