......@@ -4,8 +4,17 @@ Admin site models for managing :class:`.ConfigurationModel` subclasses
from django.forms import models
from django.contrib import admin
from django.http import HttpResponseRedirect
from django.contrib.admin import ListFilter
from django.core.cache import get_cache, InvalidCacheBackendError
from django.core.urlresolvers import reverse
from django.shortcuts import get_object_or_404
from django.utils.translation import ugettext_lazy as _
cache = get_cache('configuration') # pylint: disable=invalid-name
except InvalidCacheBackendError:
from django.core.cache import cache
# pylint: disable=protected-access
......@@ -18,7 +27,7 @@ class ConfigurationModelAdmin(admin.ModelAdmin):
def get_actions(self, request):
return {
'revert': (ConfigurationModelAdmin.revert, 'revert', 'Revert to the selected configuration')
'revert': (ConfigurationModelAdmin.revert, 'revert', _('Revert to the selected configuration'))
def get_list_display(self, request):
......@@ -55,19 +64,21 @@ class ConfigurationModelAdmin(admin.ModelAdmin):
def save_model(self, request, obj, form, change):
obj.changed_by = request.user
super(ConfigurationModelAdmin, self).save_model(request, obj, form, change)
cache.delete(obj.cache_key_name(*(getattr(obj, key_name) for key_name in obj.KEY_FIELDS)))
def revert(self, request, queryset):
Admin action to revert a configuration back to the selected value
if queryset.count() != 1:
self.message_user(request, "Please select a single configuration to revert to.")
self.message_user(request, _("Please select a single configuration to revert to."))
target = queryset[0] = None
self.save_model(request, target, None, False)
self.message_user(request, "Reverted configuration.")
self.message_user(request, _("Reverted configuration."))
return HttpResponseRedirect(
......@@ -78,3 +89,99 @@ class ConfigurationModelAdmin(admin.ModelAdmin):
class ShowHistoryFilter(ListFilter):
Admin change view filter to show only the most recent (i.e. the "current") row for each
unique key value.
title = _('Status')
parameter_name = 'show_history'
def __init__(self, request, params, model, model_admin):
super(ShowHistoryFilter, self).__init__(request, params, model, model_admin)
if self.parameter_name in params:
value = params.pop(self.parameter_name)
self.used_parameters[self.parameter_name] = value
def has_output(self):
""" Should this filter be shown? """
return True
def choices(self, cl):
""" Returns choices ready to be output in the template. """
show_all = self.used_parameters.get(self.parameter_name) == "1"
return (
'display': _('Current Configuration'),
'selected': not show_all,
'query_string': cl.get_query_string({}, [self.parameter_name]),
'display': _('All (Show History)'),
'selected': show_all,
'query_string': cl.get_query_string({self.parameter_name: "1"}, []),
def queryset(self, request, queryset):
""" Filter the queryset. No-op since it's done by KeyedConfigurationModelAdmin """
return queryset
def expected_parameters(self):
""" List the query string params used by this filter """
return [self.parameter_name]
class KeyedConfigurationModelAdmin(ConfigurationModelAdmin):
:class:`~django.contrib.admin.ModelAdmin` for :class:`.ConfigurationModel` subclasses that
use extra keys (i.e. they have KEY_FIELDS set).
date_hierarchy = None
list_filter = (ShowHistoryFilter, )
def queryset(self, request):
Annote the queryset with an 'is_active' property that's true iff that row is the most
recently added row for that particular set of KEY_FIELDS values.
Filter the queryset to show only is_active rows by default.
if request.GET.get(ShowHistoryFilter.parameter_name) == '1':
queryset = self.model.objects.with_active_flag()
# Show only the most recent row for each key.
queryset = self.model.objects.current_set()
ordering = self.get_ordering(request)
if ordering:
return queryset.order_by(*ordering)
return queryset
def get_list_display(self, request):
""" Add a link to each row for creating a new row using the chosen row as a template """
return self.model._meta.get_all_field_names() + ['edit_link']
def add_view(self, request, form_url='', extra_context=None):
# Prepopulate new configuration entries with the value of the current config, if given:
if 'source' in request.GET:
get = request.GET.copy()
source_id = int(get.pop('source')[0])
source = get_object_or_404(self.model, pk=source_id)
request.GET = get
# Call our grandparent's add_view, skipping the parent code
# because the parent code has a different way to prepopulate new configuration entries
# with the value of the latest config, which doesn't make sense for keyed models.
# pylint: disable=bad-super-call
return super(ConfigurationModelAdmin, self).add_view(request, form_url, extra_context)
def edit_link(self, inst):
""" Edit link for the change view """
if not inst.is_active:
return u'--'
update_url = reverse('admin:{}_{}_add'.format(self.model._meta.app_label, self.model._meta.module_name))
update_url += "?source={}".format(
return u'<a href="{}">{}</a>'.format(update_url, _('Update'))
edit_link.allow_tags = True
edit_link.short_description = _('Update')
Django Model baseclass for database-backed configuration.
from django.db import models
from django.db import connection, models
from django.contrib.auth.models import User
from django.core.cache import get_cache, InvalidCacheBackendError
from django.utils.translation import ugettext_lazy as _
cache = get_cache('configuration') # pylint: disable=invalid-name
......@@ -11,6 +12,52 @@ except InvalidCacheBackendError:
from django.core.cache import cache
class ConfigurationModelManager(models.Manager):
Query manager for ConfigurationModel
def _current_ids_subquery(self):
Internal helper method to return an SQL string that will get the IDs of
all the current entries (i.e. the most recent entry for each unique set
of key values). Only useful if KEY_FIELDS is set.
key_fields_escaped = [connection.ops.quote_name(name) for name in self.model.KEY_FIELDS]
# The following assumes that the rows with the most recent date also have the highest IDs
return "SELECT MAX(id) FROM {table_name} GROUP BY {key_fields}".format(
key_fields=', '.join(key_fields_escaped),
table_name=self.model._meta.db_table # pylint: disable=protected-access
def current_set(self):
A queryset for the active configuration entries only. Only useful if KEY_FIELDS is set.
Active means the means recent entries for each unique combination of keys. It does not
necessaryily mean enbled.
assert self.model.KEY_FIELDS != (), "Just use model.current() if there are no KEY_FIELDS"
return self.get_query_set().extra(
where=["id IN ({subquery})".format(subquery=self._current_ids_subquery())],
select={'is_active': 1}, # This annotation is used by the admin changelist. sqlite requires '1', not 'True'
def with_active_flag(self):
A query set where each result is annotated with an 'is_active' field that indicates
if it's the most recent entry for that combination of keys.
if self.model.KEY_FIELDS:
subquery = self._current_ids_subquery()
return self.get_query_set().extra(
select={'is_active': "id IN ({subquery})".format(subquery=subquery)}
return self.get_query_set().extra(
select={'is_active': "id = {pk}".format(pk=self.model.current().pk)}
class ConfigurationModel(models.Model):
Abstract base class for model-based configuration
......@@ -22,46 +69,108 @@ class ConfigurationModel(models.Model):
class Meta(object): # pylint: disable=missing-docstring
abstract = True
ordering = ("-change_date", )
objects = ConfigurationModelManager()
# The number of seconds
cache_timeout = 600
change_date = models.DateTimeField(auto_now_add=True)
changed_by = models.ForeignKey(User, editable=False, null=True, on_delete=models.PROTECT)
enabled = models.BooleanField(default=False)
change_date = models.DateTimeField(auto_now_add=True, verbose_name=_("Change date"))
changed_by = models.ForeignKey(
# Translators: this label indicates the name of the user who made this change:
verbose_name=_("Changed by"),
enabled = models.BooleanField(default=False, verbose_name=_("Enabled"))
def save(self, *args, **kwargs):
Clear the cached value when saving a new configuration entry
super(ConfigurationModel, self).save(*args, **kwargs)
cache.delete(self.cache_key_name(*[getattr(self, key) for key in self.KEY_FIELDS]))
if self.KEY_FIELDS:
def cache_key_name(cls):
def cache_key_name(cls, *args):
"""Return the name of the key to use to cache the current configuration"""
return 'configuration/{}/current'.format(cls.__name__)
if cls.KEY_FIELDS != ():
if len(args) != len(cls.KEY_FIELDS):
raise TypeError(
"cache_key_name() takes exactly {} arguments ({} given)".format(len(cls.KEY_FIELDS), len(args))
return u'configuration/{}/current/{}'.format(cls.__name__, u','.join(unicode(arg) for arg in args))
return 'configuration/{}/current'.format(cls.__name__)
def current(cls):
def current(cls, *args):
Return the active configuration entry, either from cache,
from the database, or by creating a new empty entry (which is not
cached = cache.get(cls.cache_key_name())
cached = cache.get(cls.cache_key_name(*args))
if cached is not None:
return cached
key_dict = dict(zip(cls.KEY_FIELDS, args))
current = cls.objects.order_by('-change_date')[0]
current = cls.objects.filter(**key_dict).order_by('-change_date')[0]
except IndexError:
current = cls()
current = cls(**key_dict)
cache.set(cls.cache_key_name(), current, cls.cache_timeout)
cache.set(cls.cache_key_name(*args), current, cls.cache_timeout)
return current
def is_enabled(cls):
"""Returns True if this feature is configured as enabled, else False."""
return cls.current().enabled
def key_values_cache_key_name(cls, *key_fields):
""" Key for fetching unique key values from the cache """
key_fields = key_fields or cls.KEY_FIELDS
return 'configuration/{}/key_values/{}'.format(cls.__name__, ','.join(key_fields))
def key_values(cls, *key_fields, **kwargs):
Get the set of unique values in the configuration table for the given
key[s]. Calling cls.current(*value) for each value in the resulting
list should always produce an entry, though any such entry may have
key_fields: The positional arguments are the KEY_FIELDS to return. For example if
you had a course embargo configuration where each entry was keyed on (country,
course), then you might want to know "What countries have embargoes configured?"
with cls.key_values('country'), or "Which courses have country restrictions?"
with cls.key_values('course'). You can also leave this unspecified for the
default, which returns the distinct combinations of all keys.
flat: If you pass flat=True as a kwarg, it has the same effect as in Django's
'values_list' method: Instead of returning a list of lists, you'll get one list
of values. This makes sense to use whenever there is only one key being queried.
Return value:
List of lists of each combination of keys found in the database.
e.g. [("Italy", "course-v1:SomeX+some+2015"), ...] for the course embargo example
flat = kwargs.pop('flat', False)
assert not kwargs, "'flat' is the only kwarg accepted"
key_fields = key_fields or cls.KEY_FIELDS
cache_key = cls.key_values_cache_key_name(*key_fields)
cached = cache.get(cache_key)
if cached is not None:
return cached
values = list(cls.objects.values_list(*key_fields, flat=flat).order_by().distinct())
cache.set(cache_key, values, cls.cache_timeout)
return values
# -*- coding: utf-8 -*-
Tests of ConfigurationModel
import ddt
from django.contrib.auth.models import User
from django.db import models
from django.test import TestCase
from freezegun import freeze_time
from mock import patch
......@@ -75,3 +76,209 @@ class ConfigurationModelTests(TestCase):
mock_cache.set.assert_called_with(ExampleConfig.cache_key_name(), first, 300)
def test_active_annotation(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
rows = ExampleConfig.objects.with_active_flag().order_by('-change_date')
self.assertEqual(len(rows), 2)
self.assertEqual(rows[0].string_field, 'second')
self.assertEqual(rows[0].is_active, True)
self.assertEqual(rows[1].string_field, 'first')
self.assertEqual(rows[1].is_active, False)
class ExampleKeyedConfig(ConfigurationModel):
Test model for testing ``ConfigurationModels`` with keyed configuration.
Does not inherit from ExampleConfig due to how Django handles model inheritance.
cache_timeout = 300
KEY_FIELDS = ('left', 'right')
left = models.CharField(max_length=30)
right = models.CharField(max_length=30)
string_field = models.TextField()
int_field = models.IntegerField(default=10)
class KeyedConfigurationModelTests(TestCase):
Tests for ``ConfigurationModels`` with keyed configuration.
def setUp(self):
super(KeyedConfigurationModelTests, self).setUp()
self.user = User()'a', 'b'), ('c', 'd'))
def test_cache_key_name(self, left, right, _mock_cache):
ExampleKeyedConfig.cache_key_name(left, right),
'configuration/ExampleKeyedConfig/current/{},{}'.format(left, right)
((), 'left,right'),
(('left', 'right'), 'left,right'),
(('left', ), 'left')
def test_key_values_cache_key_name(self, args, expected_key, _mock_cache):
'configuration/ExampleKeyedConfig/key_values/{}'.format(expected_key))'a', 'b'), ('c', 'd'))
def test_no_config_empty_cache(self, left, right, mock_cache):
mock_cache.get.return_value = None
current = ExampleKeyedConfig.current(left, right)
self.assertEquals(current.int_field, 10)
self.assertEquals(current.string_field, '')
mock_cache.set.assert_called_with(ExampleKeyedConfig.cache_key_name(left, right), current, 300)'a', 'b'), ('c', 'd'))
def test_no_config_full_cache(self, left, right, mock_cache):
current = ExampleKeyedConfig.current(left, right)
self.assertEquals(current, mock_cache.get.return_value)
def test_config_ordering(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
self.assertEquals(ExampleKeyedConfig.current('left_a', 'right_a').string_field, 'second_a')
self.assertEquals(ExampleKeyedConfig.current('left_b', 'right_b').string_field, 'second_b')
def test_cache_set(self, mock_cache):
mock_cache.get.return_value = None
first = ExampleKeyedConfig(
ExampleKeyedConfig.current('left', 'right')
mock_cache.set.assert_called_with(ExampleKeyedConfig.cache_key_name('left', 'right'), first, 300)
def test_key_values(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()
ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()
unique_key_pairs = ExampleKeyedConfig.key_values()
self.assertEquals(len(unique_key_pairs), 2)
self.assertEquals(set(unique_key_pairs), set([('left_a', 'right_a'), ('left_b', 'right_b')]))
unique_left_keys = ExampleKeyedConfig.key_values('left', flat=True)
self.assertEquals(len(unique_left_keys), 2)
self.assertEquals(set(unique_left_keys), set(['left_a', 'left_b']))
def test_key_string_values(self, mock_cache):
""" Ensure str() vs unicode() doesn't cause duplicate cache entries """
ExampleKeyedConfig(left='left', right=u'〉☃', enabled=True, int_field=10, changed_by=self.user).save()
mock_cache.get.return_value = None
entry = ExampleKeyedConfig.current('left', u'〉☃')
key = mock_cache.get.call_args[0][0]
self.assertEqual(entry.int_field, 10)
self.assertEqual(mock_cache.set.call_args[0][0], key)
entry = ExampleKeyedConfig.current(u'left', u'〉☃')
self.assertEqual(entry.int_field, 10)
def test_current_set(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
ExampleKeyedConfig(left='left_a', right='right_a', int_field=0, changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', int_field=0, changed_by=self.user).save()
ExampleKeyedConfig(left='left_a', right='right_a', int_field=1, changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', int_field=2, changed_by=self.user).save()
queryset = ExampleKeyedConfig.objects.current_set()
self.assertEqual(len(queryset.all()), 2)
set(queryset.order_by('int_field').values_list('int_field', flat=True)),
set([1, 2])
def test_active_annotation(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
ExampleKeyedConfig.objects.create(left='left_a', right='right_a', string_field='first')
ExampleKeyedConfig.objects.create(left='left_b', right='right_b', string_field='first')
ExampleKeyedConfig.objects.create(left='left_a', right='right_a', string_field='second')
rows = ExampleKeyedConfig.objects.with_active_flag()
self.assertEqual(len(rows), 3)
for row in rows:
if row.left == 'left_a':
self.assertEqual(row.is_active, row.string_field == 'second')
self.assertEqual(row.left, 'left_b')
self.assertEqual(row.string_field, 'first')
self.assertEqual(row.is_active, True)
def test_key_values_cache(self, mock_cache):
mock_cache.get.return_value = None
self.assertEquals(ExampleKeyedConfig.key_values(), [])
mock_cache.set.assert_called_with(ExampleKeyedConfig.key_values_cache_key_name(), [], 300)
fake_result = [('a', 'b'), ('c', 'd')]
mock_cache.get.return_value = fake_result
self.assertEquals(ExampleKeyedConfig.key_values(), fake_result)
