Commit 4fb3da23 by John Eskew

Remove config_models & use external config_models repo instead.

parent 5cb129e2
"""
Model-Based Configuration
=========================
This app allows other apps to easily define a configuration model
that can be hooked into the admin site to allow configuration management
with auditing.
Installation
------------
Add ``config_models`` to your ``INSTALLED_APPS`` list.
Usage
-----
Create a subclass of ``ConfigurationModel``, with fields for each
value that needs to be configured::
class MyConfiguration(ConfigurationModel):
frobble_timeout = IntField(default=10)
frazzle_target = TextField(defalut="debug")
This is a normal django model, so it must be synced and migrated as usual.
The default values for the fields in the ``ConfigurationModel`` will be
used if no configuration has yet been created.
Register that class with the Admin site, using the ``ConfigurationAdminModel``::
from django.contrib import admin
from config_models.admin import ConfigurationModelAdmin
admin.site.register(MyConfiguration, ConfigurationModelAdmin)
Use the configuration in your code::
def my_view(self, request):
config = MyConfiguration.current()
fire_the_missiles(config.frazzle_target, timeout=config.frobble_timeout)
Use the admin site to add new configuration entries. The most recently created
entry is considered to be ``current``.
Configuration
-------------
The current ``ConfigurationModel`` will be cached in the ``configuration`` django cache,
or in the ``default`` cache if ``configuration`` doesn't exist. You can specify the cache
timeout in each ``ConfigurationModel`` by setting the ``cache_timeout`` property.
You can change the name of the cache key used by the ``ConfigurationModel`` by overriding
the ``cache_key_name`` function.
Extension
---------
``ConfigurationModels`` are just django models, so they can be extended with new fields
and migrated as usual. Newly added fields must have default values and should be nullable,
so that rollbacks to old versions of configuration work correctly.
"""
"""
Admin site models for managing :class:`.ConfigurationModel` subclasses
"""
from django.forms import models
from django.contrib import admin
from django.contrib.admin import ListFilter
from django.core.cache import caches, InvalidCacheBackendError
from django.core.files.base import File
from django.core.urlresolvers import reverse
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.utils.translation import ugettext_lazy as _
try:
cache = caches['configuration'] # pylint: disable=invalid-name
except InvalidCacheBackendError:
from django.core.cache import cache
# pylint: disable=protected-access
class ConfigurationModelAdmin(admin.ModelAdmin):
"""
:class:`~django.contrib.admin.ModelAdmin` for :class:`.ConfigurationModel` subclasses
"""
date_hierarchy = 'change_date'
def get_actions(self, request):
return {
'revert': (ConfigurationModelAdmin.revert, 'revert', _('Revert to the selected configuration'))
}
def get_list_display(self, request):
return self.get_displayable_field_names()
def get_displayable_field_names(self):
"""
Return all field names, excluding reverse foreign key relationships.
"""
return [
f.name
for f in self.model._meta.get_fields()
if not f.one_to_many
]
# Don't allow deletion of configuration
def has_delete_permission(self, request, obj=None):
return False
# Make all fields read-only when editing an object
def get_readonly_fields(self, request, obj=None):
if obj: # editing an existing object
return self.get_displayable_field_names()
return self.readonly_fields
def add_view(self, request, form_url='', extra_context=None):
# Prepopulate new configuration entries with the value of the current config
get = request.GET.copy()
get.update(models.model_to_dict(self.model.current()))
request.GET = get
return super(ConfigurationModelAdmin, self).add_view(request, form_url, extra_context)
# Hide the save buttons in the change view
def change_view(self, request, object_id, form_url='', extra_context=None):
extra_context = extra_context or {}
extra_context['readonly'] = True
return super(ConfigurationModelAdmin, self).change_view(
request,
object_id,
form_url,
extra_context=extra_context
)
def save_model(self, request, obj, form, change):
obj.changed_by = request.user
super(ConfigurationModelAdmin, self).save_model(request, obj, form, change)
cache.delete(obj.cache_key_name(*(getattr(obj, key_name) for key_name in obj.KEY_FIELDS)))
cache.delete(obj.key_values_cache_key_name())
def revert(self, request, queryset):
"""
Admin action to revert a configuration back to the selected value
"""
if queryset.count() != 1:
self.message_user(request, _("Please select a single configuration to revert to."))
return
target = queryset[0]
target.id = None
self.save_model(request, target, None, False)
self.message_user(request, _("Reverted configuration."))
return HttpResponseRedirect(
reverse(
'admin:{}_{}_change'.format(
self.model._meta.app_label,
self.model._meta.model_name,
),
args=(target.id,),
)
)
class ShowHistoryFilter(ListFilter):
"""
Admin change view filter to show only the most recent (i.e. the "current") row for each
unique key value.
"""
title = _('Status')
parameter_name = 'show_history'
def __init__(self, request, params, model, model_admin):
super(ShowHistoryFilter, self).__init__(request, params, model, model_admin)
if self.parameter_name in params:
value = params.pop(self.parameter_name)
self.used_parameters[self.parameter_name] = value
def has_output(self):
""" Should this filter be shown? """
return True
def choices(self, cl):
""" Returns choices ready to be output in the template. """
show_all = self.used_parameters.get(self.parameter_name) == "1"
return (
{
'display': _('Current Configuration'),
'selected': not show_all,
'query_string': cl.get_query_string({}, [self.parameter_name]),
},
{
'display': _('All (Show History)'),
'selected': show_all,
'query_string': cl.get_query_string({self.parameter_name: "1"}, []),
}
)
def queryset(self, request, queryset):
""" Filter the queryset. No-op since it's done by KeyedConfigurationModelAdmin """
return queryset
def expected_parameters(self):
""" List the query string params used by this filter """
return [self.parameter_name]
class KeyedConfigurationModelAdmin(ConfigurationModelAdmin):
"""
:class:`~django.contrib.admin.ModelAdmin` for :class:`.ConfigurationModel` subclasses that
use extra keys (i.e. they have KEY_FIELDS set).
"""
date_hierarchy = None
list_filter = (ShowHistoryFilter, )
def get_queryset(self, request):
"""
Annote the queryset with an 'is_active' property that's true iff that row is the most
recently added row for that particular set of KEY_FIELDS values.
Filter the queryset to show only is_active rows by default.
"""
if request.GET.get(ShowHistoryFilter.parameter_name) == '1':
queryset = self.model.objects.with_active_flag()
else:
# Show only the most recent row for each key.
queryset = self.model.objects.current_set()
ordering = self.get_ordering(request)
if ordering:
return queryset.order_by(*ordering)
return queryset
def get_list_display(self, request):
""" Add a link to each row for creating a new row using the chosen row as a template """
return self.get_displayable_field_names() + ['edit_link']
def add_view(self, request, form_url='', extra_context=None):
# Prepopulate new configuration entries with the value of the current config, if given:
if 'source' in request.GET:
get = request.GET.copy()
source_id = int(get.pop('source')[0])
source = get_object_or_404(self.model, pk=source_id)
source_dict = models.model_to_dict(source)
for field_name, field_value in source_dict.items():
# read files into request.FILES, if:
# * user hasn't ticked the "clear" checkbox
# * user hasn't uploaded a new file
if field_value and isinstance(field_value, File):
clear_checkbox_name = '{0}-clear'.format(field_name)
if request.POST.get(clear_checkbox_name) != 'on':
request.FILES.setdefault(field_name, field_value)
get[field_name] = field_value
request.GET = get
# Call our grandparent's add_view, skipping the parent code
# because the parent code has a different way to prepopulate new configuration entries
# with the value of the latest config, which doesn't make sense for keyed models.
# pylint: disable=bad-super-call
return super(ConfigurationModelAdmin, self).add_view(request, form_url, extra_context)
def edit_link(self, inst):
""" Edit link for the change view """
if not inst.is_active:
return u'--'
update_url = reverse('admin:{}_{}_add'.format(self.model._meta.app_label, self.model._meta.model_name))
update_url += "?source={}".format(inst.pk)
return u'<a href="{}">{}</a>'.format(update_url, _('Update'))
edit_link.allow_tags = True
edit_link.short_description = _('Update')
"""Decorators for model-based configuration. """
from functools import wraps
from django.http import HttpResponseNotFound
def require_config(config_model):
"""View decorator that enables/disables a view based on configuration.
Arguments:
config_model (ConfigurationModel subclass): The class of the configuration
model to check.
Returns:
HttpResponse: 404 if the configuration model is disabled,
otherwise returns the response from the decorated view.
"""
def _decorator(func):
@wraps(func)
def _inner(*args, **kwargs):
if not config_model.current().enabled:
return HttpResponseNotFound()
else:
return func(*args, **kwargs)
return _inner
return _decorator
"""
Populates a ConfigurationModel by deserializing JSON data contained in a file.
"""
import os
from optparse import make_option
from django.core.management.base import BaseCommand, CommandError
from django.utils.translation import ugettext_lazy as _
from config_models.utils import deserialize_json
class Command(BaseCommand):
"""
This command will deserialize the JSON data in the supplied file to populate
a ConfigurationModel. Note that this will add new entries to the model, but it
will not delete any entries (ConfigurationModel entries are read-only).
"""
help = """
Populates a ConfigurationModel by deserializing the supplied JSON.
JSON should be in a file, with the following format:
{ "model": "config_models.ExampleConfigurationModel",
"data":
[
{ "enabled": True,
"color": "black"
...
},
{ "enabled": False,
"color": "yellow"
...
},
...
]
}
A username corresponding to an existing user must be specified to indicate who
is executing the command.
$ ... populate_model -f path/to/file.json -u username
"""
option_list = BaseCommand.option_list + (
make_option('-f', '--file',
metavar='JSON_FILE',
dest='file',
default=False,
help='JSON file to import ConfigurationModel data'),
make_option('-u', '--username',
metavar='USERNAME',
dest='username',
default=False,
help='username to specify who is executing the command'),
)
def handle(self, *args, **options):
if 'file' not in options or not options['file']:
raise CommandError(_("A file containing JSON must be specified."))
if 'username' not in options or not options['username']:
raise CommandError(_("A valid username must be specified."))
json_file = options['file']
if not os.path.exists(json_file):
raise CommandError(_("File {0} does not exist").format(json_file))
self.stdout.write(_("Importing JSON data from file {0}").format(json_file))
with open(json_file) as data:
created_entries = deserialize_json(data, options['username'])
self.stdout.write(_("Import complete, {0} new entries created").format(created_entries))
"""
Django Model baseclass for database-backed configuration.
"""
from django.db import connection, models
from django.contrib.auth.models import User
from django.core.cache import caches, InvalidCacheBackendError
from django.utils.translation import ugettext_lazy as _
from rest_framework.utils import model_meta
try:
cache = caches['configuration'] # pylint: disable=invalid-name
except InvalidCacheBackendError:
from django.core.cache import cache
class ConfigurationModelManager(models.Manager):
"""
Query manager for ConfigurationModel
"""
def _current_ids_subquery(self):
"""
Internal helper method to return an SQL string that will get the IDs of
all the current entries (i.e. the most recent entry for each unique set
of key values). Only useful if KEY_FIELDS is set.
"""
key_fields_escaped = [connection.ops.quote_name(name) for name in self.model.KEY_FIELDS]
# The following assumes that the rows with the most recent date also have the highest IDs
return "SELECT MAX(id) FROM {table_name} GROUP BY {key_fields}".format(
key_fields=', '.join(key_fields_escaped),
table_name=self.model._meta.db_table # pylint: disable=protected-access
)
def current_set(self):
"""
A queryset for the active configuration entries only. Only useful if KEY_FIELDS is set.
Active means the means recent entries for each unique combination of keys. It does not
necessaryily mean enbled.
"""
assert self.model.KEY_FIELDS != (), "Just use model.current() if there are no KEY_FIELDS"
return self.get_queryset().extra( # pylint: disable=no-member
where=["{table_name}.id IN ({subquery})".format(
table_name=self.model._meta.db_table, # pylint: disable=protected-access, no-member
subquery=self._current_ids_subquery(),
)],
select={'is_active': 1}, # This annotation is used by the admin changelist. sqlite requires '1', not 'True'
)
def with_active_flag(self):
"""
A query set where each result is annotated with an 'is_active' field that indicates
if it's the most recent entry for that combination of keys.
"""
if self.model.KEY_FIELDS:
subquery = self._current_ids_subquery()
return self.get_queryset().extra( # pylint: disable=no-member
select={'is_active': "{table_name}.id IN ({subquery})".format(
table_name=self.model._meta.db_table, # pylint: disable=protected-access, no-member
subquery=subquery,
)}
)
else:
return self.get_queryset().extra( # pylint: disable=no-member
select={'is_active': "{table_name}.id = {pk}".format(
table_name=self.model._meta.db_table, # pylint: disable=protected-access, no-member
pk=self.model.current().pk, # pylint: disable=no-member
)}
)
class ConfigurationModel(models.Model):
"""
Abstract base class for model-based configuration
Properties:
cache_timeout (int): The number of seconds that this configuration
should be cached
"""
class Meta(object):
abstract = True
ordering = ("-change_date", )
objects = ConfigurationModelManager()
KEY_FIELDS = ()
# The number of seconds
cache_timeout = 600
change_date = models.DateTimeField(auto_now_add=True, verbose_name=_("Change date"))
changed_by = models.ForeignKey(
User,
editable=False,
null=True,
on_delete=models.PROTECT,
# Translators: this label indicates the name of the user who made this change:
verbose_name=_("Changed by"),
)
enabled = models.BooleanField(default=False, verbose_name=_("Enabled"))
def save(self, *args, **kwargs):
"""
Clear the cached value when saving a new configuration entry
"""
# Always create a new entry, instead of updating an existing model
self.pk = None # pylint: disable=invalid-name
super(ConfigurationModel, self).save(*args, **kwargs)
cache.delete(self.cache_key_name(*[getattr(self, key) for key in self.KEY_FIELDS]))
if self.KEY_FIELDS:
cache.delete(self.key_values_cache_key_name())
@classmethod
def cache_key_name(cls, *args):
"""Return the name of the key to use to cache the current configuration"""
if cls.KEY_FIELDS != ():
if len(args) != len(cls.KEY_FIELDS):
raise TypeError(
"cache_key_name() takes exactly {} arguments ({} given)".format(len(cls.KEY_FIELDS), len(args))
)
return u'configuration/{}/current/{}'.format(cls.__name__, u','.join(unicode(arg) for arg in args))
else:
return 'configuration/{}/current'.format(cls.__name__)
@classmethod
def current(cls, *args):
"""
Return the active configuration entry, either from cache,
from the database, or by creating a new empty entry (which is not
persisted).
"""
cached = cache.get(cls.cache_key_name(*args))
if cached is not None:
return cached
key_dict = dict(zip(cls.KEY_FIELDS, args))
try:
current = cls.objects.filter(**key_dict).order_by('-change_date')[0]
except IndexError:
current = cls(**key_dict)
cache.set(cls.cache_key_name(*args), current, cls.cache_timeout)
return current
@classmethod
def is_enabled(cls, *key_fields):
"""
Returns True if this feature is configured as enabled, else False.
Arguments:
key_fields: The positional arguments are the KEY_FIELDS used to identify the
configuration to be checked.
"""
return cls.current(*key_fields).enabled
@classmethod
def key_values_cache_key_name(cls, *key_fields):
""" Key for fetching unique key values from the cache """
key_fields = key_fields or cls.KEY_FIELDS
return 'configuration/{}/key_values/{}'.format(cls.__name__, ','.join(key_fields))
@classmethod
def key_values(cls, *key_fields, **kwargs):
"""
Get the set of unique values in the configuration table for the given
key[s]. Calling cls.current(*value) for each value in the resulting
list should always produce an entry, though any such entry may have
enabled=False.
Arguments:
key_fields: The positional arguments are the KEY_FIELDS to return. For example if
you had a course embargo configuration where each entry was keyed on (country,
course), then you might want to know "What countries have embargoes configured?"
with cls.key_values('country'), or "Which courses have country restrictions?"
with cls.key_values('course'). You can also leave this unspecified for the
default, which returns the distinct combinations of all keys.
flat: If you pass flat=True as a kwarg, it has the same effect as in Django's
'values_list' method: Instead of returning a list of lists, you'll get one list
of values. This makes sense to use whenever there is only one key being queried.
Return value:
List of lists of each combination of keys found in the database.
e.g. [("Italy", "course-v1:SomeX+some+2015"), ...] for the course embargo example
"""
flat = kwargs.pop('flat', False)
assert not kwargs, "'flat' is the only kwarg accepted"
key_fields = key_fields or cls.KEY_FIELDS
cache_key = cls.key_values_cache_key_name(*key_fields)
cached = cache.get(cache_key)
if cached is not None:
return cached
values = list(cls.objects.values_list(*key_fields, flat=flat).order_by().distinct())
cache.set(cache_key, values, cls.cache_timeout)
return values
def fields_equal(self, instance, fields_to_ignore=("id", "change_date", "changed_by")):
"""
Compares this instance's fields to the supplied instance to test for equality.
This will ignore any fields in `fields_to_ignore`.
Note that this method ignores many-to-many fields.
Args:
instance: the model instance to compare
fields_to_ignore: List of fields that should not be compared for equality. By default
includes `id`, `change_date`, and `changed_by`.
Returns: True if the checked fields are all equivalent, else False
"""
for field in self._meta.get_fields():
if not field.many_to_many and field.name not in fields_to_ignore:
if getattr(instance, field.name) != getattr(self, field.name):
return False
return True
@classmethod
def equal_to_current(cls, json, fields_to_ignore=("id", "change_date", "changed_by")):
"""
Compares for equality this instance to a model instance constructed from the supplied JSON.
This will ignore any fields in `fields_to_ignore`.
Note that this method cannot handle fields with many-to-many associations, as those can only
be set on a saved model instance (and saving the model instance will create a new entry).
All many-to-many field entries will be removed before the equality comparison is done.
Args:
json: json representing an entry to compare
fields_to_ignore: List of fields that should not be compared for equality. By default
includes `id`, `change_date`, and `changed_by`.
Returns: True if the checked fields are all equivalent, else False
"""
# Remove many-to-many relationships from json.
# They require an instance to be already saved.
info = model_meta.get_field_info(cls)
for field_name, relation_info in info.relations.items():
if relation_info.to_many and (field_name in json):
json.pop(field_name)
new_instance = cls(**json)
key_field_args = tuple(getattr(new_instance, key) for key in cls.KEY_FIELDS)
current = cls.current(*key_field_args)
# If current.id is None, no entry actually existed and the "current" method created it.
if current.id is not None:
return current.fields_equal(new_instance, fields_to_ignore)
return False
"""
Override the submit_row template tag to remove all save buttons from the
admin dashboard change view if the context has readonly marked in it.
"""
from django.contrib.admin.templatetags.admin_modify import register
from django.contrib.admin.templatetags.admin_modify import submit_row as original_submit_row
@register.inclusion_tag('admin/submit_line.html', takes_context=True)
def submit_row(context):
"""
Overrides 'django.contrib.admin.templatetags.admin_modify.submit_row'.
Manipulates the context going into that function by hiding all of the buttons
in the submit row if the key `readonly` is set in the context.
"""
ctx = original_submit_row(context)
if context.get('readonly', False):
ctx.update({
'show_delete_link': False,
'show_save_as_new': False,
'show_save_and_add_another': False,
'show_save_and_continue': False,
'show_save': False,
})
else:
return ctx
{
"model": "config_models.ExampleDeserializeConfig",
"data": [
{
"name": "betty",
"enabled": true,
"int_field": 5
},
{
"name": "fred",
"enabled": false
}
]
}
"""
Tests of the populate_model management command and its helper utils.deserialize_json method.
"""
import textwrap
import os.path
from django.utils import timezone
from django.utils.six import BytesIO
from django.contrib.auth.models import User
from django.core.management.base import CommandError
from django.db import models
from config_models.management.commands import populate_model
from config_models.models import ConfigurationModel
from config_models.utils import deserialize_json
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase
class ExampleDeserializeConfig(ConfigurationModel):
"""
Test model for testing deserialization of ``ConfigurationModels`` with keyed configuration.
"""
KEY_FIELDS = ('name',)
name = models.TextField()
int_field = models.IntegerField(default=10)
def __unicode__(self):
return "ExampleDeserializeConfig(enabled={}, name={}, int_field={})".format(
self.enabled, self.name, self.int_field
)
class DeserializeJSONTests(CacheIsolationTestCase):
"""
Tests of deserializing the JSON representation of ConfigurationModels.
"""
def setUp(self):
super(DeserializeJSONTests, self).setUp()
self.test_username = 'test_worker'
User.objects.create_user(username=self.test_username)
self.fixture_path = os.path.join(os.path.dirname(__file__), 'data', 'data.json')
def test_deserialize_models(self):
"""
Tests the "happy path", where 2 instances of the test model should be created.
A valid username is supplied for the operation.
"""
start_date = timezone.now()
with open(self.fixture_path) as data:
entries_created = deserialize_json(data, self.test_username)
self.assertEquals(2, entries_created)
self.assertEquals(2, ExampleDeserializeConfig.objects.count())
betty = ExampleDeserializeConfig.current('betty')
self.assertTrue(betty.enabled)
self.assertEquals(5, betty.int_field)
self.assertGreater(betty.change_date, start_date)
self.assertEquals(self.test_username, betty.changed_by.username)
fred = ExampleDeserializeConfig.current('fred')
self.assertFalse(fred.enabled)
self.assertEquals(10, fred.int_field)
self.assertGreater(fred.change_date, start_date)
self.assertEquals(self.test_username, fred.changed_by.username)
def test_existing_entries_not_removed(self):
"""
Any existing configuration model entries are retained
(though they may be come history)-- deserialize_json is purely additive.
"""
ExampleDeserializeConfig(name="fred", enabled=True).save()
ExampleDeserializeConfig(name="barney", int_field=200).save()
with open(self.fixture_path) as data:
entries_created = deserialize_json(data, self.test_username)
self.assertEquals(2, entries_created)
self.assertEquals(4, ExampleDeserializeConfig.objects.count())
self.assertEquals(3, len(ExampleDeserializeConfig.objects.current_set()))
self.assertEquals(5, ExampleDeserializeConfig.current('betty').int_field)
self.assertEquals(200, ExampleDeserializeConfig.current('barney').int_field)
# The JSON file changes "enabled" to False for Fred.
fred = ExampleDeserializeConfig.current('fred')
self.assertFalse(fred.enabled)
def test_duplicate_entries_not_made(self):
"""
If there is no change in an entry (besides changed_by and change_date),
a new entry is not made.
"""
with open(self.fixture_path) as data:
entries_created = deserialize_json(data, self.test_username)
self.assertEquals(2, entries_created)
with open(self.fixture_path) as data:
entries_created = deserialize_json(data, self.test_username)
self.assertEquals(0, entries_created)
# Importing twice will still only result in 2 records (second import a no-op).
self.assertEquals(2, ExampleDeserializeConfig.objects.count())
# Change Betty.
betty = ExampleDeserializeConfig.current('betty')
betty.int_field = -8
betty.save()
self.assertEquals(3, ExampleDeserializeConfig.objects.count())
self.assertEquals(-8, ExampleDeserializeConfig.current('betty').int_field)
# Now importing will add a new entry for Betty.
with open(self.fixture_path) as data:
entries_created = deserialize_json(data, self.test_username)
self.assertEquals(1, entries_created)
self.assertEquals(4, ExampleDeserializeConfig.objects.count())
self.assertEquals(5, ExampleDeserializeConfig.current('betty').int_field)
def test_bad_username(self):
"""
Tests the error handling when the specified user does not exist.
"""
test_json = textwrap.dedent("""
{
"model": "config_models.ExampleDeserializeConfig",
"data": [{"name": "dino"}]
}
""")
with self.assertRaisesRegexp(Exception, "User matching query does not exist"):
deserialize_json(BytesIO(test_json), "unknown_username")
def test_invalid_json(self):
"""
Tests the error handling when there is invalid JSON.
"""
test_json = textwrap.dedent("""
{
"model": "config_models.ExampleDeserializeConfig",
"data": [{"name": "dino"
""")
with self.assertRaisesRegexp(Exception, "JSON parse error"):
deserialize_json(BytesIO(test_json), self.test_username)
def test_invalid_model(self):
"""
Tests the error handling when the configuration model specified does not exist.
"""
test_json = textwrap.dedent("""
{
"model": "xxx.yyy",
"data":[{"name": "dino"}]
}
""")
with self.assertRaisesRegexp(Exception, "No installed app"):
deserialize_json(BytesIO(test_json), self.test_username)
class PopulateModelTestCase(CacheIsolationTestCase):
"""
Tests of populate model management command.
"""
def setUp(self):
super(PopulateModelTestCase, self).setUp()
self.file_path = os.path.join(os.path.dirname(__file__), 'data', 'data.json')
self.test_username = 'test_management_worker'
User.objects.create_user(username=self.test_username)
def test_run_command(self):
"""
Tests the "happy path", where 2 instances of the test model should be created.
A valid username is supplied for the operation.
"""
_run_command(file=self.file_path, username=self.test_username)
self.assertEquals(2, ExampleDeserializeConfig.objects.count())
betty = ExampleDeserializeConfig.current('betty')
self.assertEquals(self.test_username, betty.changed_by.username)
fred = ExampleDeserializeConfig.current('fred')
self.assertEquals(self.test_username, fred.changed_by.username)
def test_no_user_specified(self):
"""
Tests that a username must be specified.
"""
with self.assertRaisesRegexp(CommandError, "A valid username must be specified"):
_run_command(file=self.file_path)
def test_bad_user_specified(self):
"""
Tests that a username must be specified.
"""
with self.assertRaisesRegexp(Exception, "User matching query does not exist"):
_run_command(file=self.file_path, username="does_not_exist")
def test_no_file_specified(self):
"""
Tests the error handling when no JSON file is supplied.
"""
with self.assertRaisesRegexp(CommandError, "A file containing JSON must be specified"):
_run_command(username=self.test_username)
def test_bad_file_specified(self):
"""
Tests the error handling when the path to the JSON file is incorrect.
"""
with self.assertRaisesRegexp(CommandError, "File does/not/exist.json does not exist"):
_run_command(file="does/not/exist.json", username=self.test_username)
def _run_command(*args, **kwargs):
"""Run the management command to deserializer JSON ConfigurationModel data. """
command = populate_model.Command()
return command.handle(*args, **kwargs)
# -*- coding: utf-8 -*-
"""
Tests of ConfigurationModel
"""
import ddt
from django.contrib.auth.models import User
from django.db import models
from django.test import TestCase
from rest_framework.test import APIRequestFactory
from freezegun import freeze_time
from mock import patch, Mock
from config_models.models import ConfigurationModel
from config_models.views import ConfigurationModelCurrentAPIView
class ExampleConfig(ConfigurationModel):
"""
Test model for testing ``ConfigurationModels``.
"""
cache_timeout = 300
string_field = models.TextField()
int_field = models.IntegerField(default=10)
def __unicode__(self):
return "ExampleConfig(enabled={}, string_field={}, int_field={})".format(
self.enabled, self.string_field, self.int_field
)
class ManyToManyExampleConfig(ConfigurationModel):
"""
Test model configuration with a many-to-many field.
"""
cache_timeout = 300
string_field = models.TextField()
many_user_field = models.ManyToManyField(User, related_name='topic_many_user_field')
def __unicode__(self):
return "ManyToManyExampleConfig(enabled={}, string_field={})".format(self.enabled, self.string_field)
@patch('config_models.models.cache')
class ConfigurationModelTests(TestCase):
"""
Tests of ConfigurationModel
"""
def setUp(self):
super(ConfigurationModelTests, self).setUp()
self.user = User()
self.user.save()
def test_cache_deleted_on_save(self, mock_cache):
ExampleConfig(changed_by=self.user).save()
mock_cache.delete.assert_called_with(ExampleConfig.cache_key_name())
def test_cache_key_name(self, __):
self.assertEquals(ExampleConfig.cache_key_name(), 'configuration/ExampleConfig/current')
def test_no_config_empty_cache(self, mock_cache):
mock_cache.get.return_value = None
current = ExampleConfig.current()
self.assertEquals(current.int_field, 10)
self.assertEquals(current.string_field, '')
mock_cache.set.assert_called_with(ExampleConfig.cache_key_name(), current, 300)
def test_no_config_full_cache(self, mock_cache):
current = ExampleConfig.current()
self.assertEquals(current, mock_cache.get.return_value)
def test_config_ordering(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
first = ExampleConfig(changed_by=self.user)
first.string_field = 'first'
first.save()
second = ExampleConfig(changed_by=self.user)
second.string_field = 'second'
second.save()
self.assertEquals(ExampleConfig.current().string_field, 'second')
def test_cache_set(self, mock_cache):
mock_cache.get.return_value = None
first = ExampleConfig(changed_by=self.user)
first.string_field = 'first'
first.save()
ExampleConfig.current()
mock_cache.set.assert_called_with(ExampleConfig.cache_key_name(), first, 300)
def test_active_annotation(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
ExampleConfig.objects.create(string_field='first')
ExampleConfig.objects.create(string_field='second')
rows = ExampleConfig.objects.with_active_flag().order_by('-change_date')
self.assertEqual(len(rows), 2)
self.assertEqual(rows[0].string_field, 'second')
self.assertEqual(rows[0].is_active, True)
self.assertEqual(rows[1].string_field, 'first')
self.assertEqual(rows[1].is_active, False)
def test_always_insert(self, __):
config = ExampleConfig(changed_by=self.user, string_field='first')
config.save()
config.string_field = 'second'
config.save()
self.assertEquals(2, ExampleConfig.objects.all().count())
def test_equality(self, mock_cache):
mock_cache.get.return_value = None
config = ExampleConfig(changed_by=self.user, string_field='first')
config.save()
self.assertTrue(ExampleConfig.equal_to_current({"string_field": "first"}))
self.assertTrue(ExampleConfig.equal_to_current({"string_field": "first", "enabled": False}))
self.assertTrue(ExampleConfig.equal_to_current({"string_field": "first", "int_field": 10}))
self.assertFalse(ExampleConfig.equal_to_current({"string_field": "first", "enabled": True}))
self.assertFalse(ExampleConfig.equal_to_current({"string_field": "first", "int_field": 20}))
self.assertFalse(ExampleConfig.equal_to_current({"string_field": "second"}))
self.assertFalse(ExampleConfig.equal_to_current({}))
def test_equality_custom_fields_to_ignore(self, mock_cache):
mock_cache.get.return_value = None
config = ExampleConfig(changed_by=self.user, string_field='first')
config.save()
# id, change_date, and changed_by will all be different for a newly created entry
self.assertTrue(ExampleConfig.equal_to_current({"string_field": "first"}))
self.assertFalse(
ExampleConfig.equal_to_current({"string_field": "first"}, fields_to_ignore=("change_date", "changed_by"))
)
self.assertFalse(
ExampleConfig.equal_to_current({"string_field": "first"}, fields_to_ignore=("id", "changed_by"))
)
self.assertFalse(
ExampleConfig.equal_to_current({"string_field": "first"}, fields_to_ignore=("change_date", "id"))
)
# Test the ability to ignore a different field ("int_field").
self.assertFalse(ExampleConfig.equal_to_current({"string_field": "first", "int_field": 20}))
self.assertTrue(
ExampleConfig.equal_to_current(
{"string_field": "first", "int_field": 20},
fields_to_ignore=("id", "change_date", "changed_by", "int_field")
)
)
def test_equality_ignores_many_to_many(self, mock_cache):
mock_cache.get.return_value = None
config = ManyToManyExampleConfig(changed_by=self.user, string_field='first')
config.save()
second_user = User(username="second_user")
second_user.save()
config.many_user_field.add(second_user) # pylint: disable=no-member
config.save()
# The many-to-many field is ignored in comparison.
self.assertTrue(
ManyToManyExampleConfig.equal_to_current({"string_field": "first", "many_user_field": "removed"})
)
class ExampleKeyedConfig(ConfigurationModel):
"""
Test model for testing ``ConfigurationModels`` with keyed configuration.
Does not inherit from ExampleConfig due to how Django handles model inheritance.
"""
cache_timeout = 300
KEY_FIELDS = ('left', 'right')
left = models.CharField(max_length=30)
right = models.CharField(max_length=30)
string_field = models.TextField()
int_field = models.IntegerField(default=10)
def __unicode__(self):
return "ExampleKeyedConfig(enabled={}, left={}, right={}, string_field={}, int_field={})".format(
self.enabled, self.left, self.right, self.string_field, self.int_field
)
@ddt.ddt
@patch('config_models.models.cache')
class KeyedConfigurationModelTests(TestCase):
"""
Tests for ``ConfigurationModels`` with keyed configuration.
"""
def setUp(self):
super(KeyedConfigurationModelTests, self).setUp()
self.user = User()
self.user.save()
@ddt.data(('a', 'b'), ('c', 'd'))
@ddt.unpack
def test_cache_key_name(self, left, right, _mock_cache):
self.assertEquals(
ExampleKeyedConfig.cache_key_name(left, right),
'configuration/ExampleKeyedConfig/current/{},{}'.format(left, right)
)
@ddt.data(
((), 'left,right'),
(('left', 'right'), 'left,right'),
(('left', ), 'left')
)
@ddt.unpack
def test_key_values_cache_key_name(self, args, expected_key, _mock_cache):
self.assertEquals(
ExampleKeyedConfig.key_values_cache_key_name(*args),
'configuration/ExampleKeyedConfig/key_values/{}'.format(expected_key))
@ddt.data(('a', 'b'), ('c', 'd'))
@ddt.unpack
def test_no_config_empty_cache(self, left, right, mock_cache):
mock_cache.get.return_value = None
current = ExampleKeyedConfig.current(left, right)
self.assertEquals(current.int_field, 10)
self.assertEquals(current.string_field, '')
mock_cache.set.assert_called_with(ExampleKeyedConfig.cache_key_name(left, right), current, 300)
@ddt.data(('a', 'b'), ('c', 'd'))
@ddt.unpack
def test_no_config_full_cache(self, left, right, mock_cache):
current = ExampleKeyedConfig.current(left, right)
self.assertEquals(current, mock_cache.get.return_value)
def test_config_ordering(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
ExampleKeyedConfig(
changed_by=self.user,
left='left_a',
right='right_a',
string_field='first_a',
).save()
ExampleKeyedConfig(
changed_by=self.user,
left='left_b',
right='right_b',
string_field='first_b',
).save()
ExampleKeyedConfig(
changed_by=self.user,
left='left_a',
right='right_a',
string_field='second_a',
).save()
ExampleKeyedConfig(
changed_by=self.user,
left='left_b',
right='right_b',
string_field='second_b',
).save()
self.assertEquals(ExampleKeyedConfig.current('left_a', 'right_a').string_field, 'second_a')
self.assertEquals(ExampleKeyedConfig.current('left_b', 'right_b').string_field, 'second_b')
def test_cache_set(self, mock_cache):
mock_cache.get.return_value = None
first = ExampleKeyedConfig(
changed_by=self.user,
left='left',
right='right',
string_field='first',
)
first.save()
ExampleKeyedConfig.current('left', 'right')
mock_cache.set.assert_called_with(ExampleKeyedConfig.cache_key_name('left', 'right'), first, 300)
def test_key_values(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()
ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()
unique_key_pairs = ExampleKeyedConfig.key_values()
self.assertEquals(len(unique_key_pairs), 2)
self.assertEquals(set(unique_key_pairs), set([('left_a', 'right_a'), ('left_b', 'right_b')]))
unique_left_keys = ExampleKeyedConfig.key_values('left', flat=True)
self.assertEquals(len(unique_left_keys), 2)
self.assertEquals(set(unique_left_keys), set(['left_a', 'left_b']))
def test_key_string_values(self, mock_cache):
""" Ensure str() vs unicode() doesn't cause duplicate cache entries """
ExampleKeyedConfig(left='left', right=u'〉☃', enabled=True, int_field=10, changed_by=self.user).save()
mock_cache.get.return_value = None
entry = ExampleKeyedConfig.current('left', u'〉☃')
key = mock_cache.get.call_args[0][0]
self.assertEqual(entry.int_field, 10)
mock_cache.get.assert_called_with(key)
self.assertEqual(mock_cache.set.call_args[0][0], key)
mock_cache.get.reset_mock()
entry = ExampleKeyedConfig.current(u'left', u'〉☃')
self.assertEqual(entry.int_field, 10)
mock_cache.get.assert_called_with(key)
def test_current_set(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
ExampleKeyedConfig(left='left_a', right='right_a', int_field=0, changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', int_field=0, changed_by=self.user).save()
ExampleKeyedConfig(left='left_a', right='right_a', int_field=1, changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', int_field=2, changed_by=self.user).save()
queryset = ExampleKeyedConfig.objects.current_set()
self.assertEqual(len(queryset.all()), 2)
self.assertEqual(
set(queryset.order_by('int_field').values_list('int_field', flat=True)),
set([1, 2])
)
def test_active_annotation(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
ExampleKeyedConfig.objects.create(left='left_a', right='right_a', string_field='first')
ExampleKeyedConfig.objects.create(left='left_b', right='right_b', string_field='first')
ExampleKeyedConfig.objects.create(left='left_a', right='right_a', string_field='second')
rows = ExampleKeyedConfig.objects.with_active_flag()
self.assertEqual(len(rows), 3)
for row in rows:
if row.left == 'left_a':
self.assertEqual(row.is_active, row.string_field == 'second')
else:
self.assertEqual(row.left, 'left_b')
self.assertEqual(row.string_field, 'first')
self.assertEqual(row.is_active, True)
def test_key_values_cache(self, mock_cache):
mock_cache.get.return_value = None
self.assertEquals(ExampleKeyedConfig.key_values(), [])
mock_cache.set.assert_called_with(ExampleKeyedConfig.key_values_cache_key_name(), [], 300)
fake_result = [('a', 'b'), ('c', 'd')]
mock_cache.get.return_value = fake_result
self.assertEquals(ExampleKeyedConfig.key_values(), fake_result)
def test_equality(self, mock_cache):
mock_cache.get.return_value = None
config1 = ExampleKeyedConfig(left='left_a', right='right_a', int_field=1, changed_by=self.user)
config1.save()
config2 = ExampleKeyedConfig(left='left_b', right='right_b', int_field=2, changed_by=self.user, enabled=True)
config2.save()
config3 = ExampleKeyedConfig(left='left_c', changed_by=self.user)
config3.save()
self.assertTrue(
ExampleKeyedConfig.equal_to_current({"left": "left_a", "right": "right_a", "int_field": 1})
)
self.assertTrue(
ExampleKeyedConfig.equal_to_current({"left": "left_b", "right": "right_b", "int_field": 2, "enabled": True})
)
self.assertTrue(
ExampleKeyedConfig.equal_to_current({"left": "left_c"})
)
self.assertFalse(
ExampleKeyedConfig.equal_to_current(
{"left": "left_a", "right": "right_a", "int_field": 1, "string_field": "foo"}
)
)
self.assertFalse(
ExampleKeyedConfig.equal_to_current({"left": "left_a", "int_field": 1})
)
self.assertFalse(
ExampleKeyedConfig.equal_to_current({"left": "left_b", "right": "right_b", "int_field": 2})
)
self.assertFalse(
ExampleKeyedConfig.equal_to_current({"left": "left_c", "int_field": 11})
)
self.assertFalse(ExampleKeyedConfig.equal_to_current({}))
@ddt.ddt
class ConfigurationModelAPITests(TestCase):
"""
Tests for the configuration model API.
"""
def setUp(self):
super(ConfigurationModelAPITests, self).setUp()
self.factory = APIRequestFactory()
self.user = User.objects.create_user(
username='test_user',
email='test_user@example.com',
password='test_pass',
)
self.user.is_superuser = True
self.user.save()
self.current_view = ConfigurationModelCurrentAPIView.as_view(model=ExampleConfig)
# Disable caching while testing the API
patcher = patch('config_models.models.cache', Mock(get=Mock(return_value=None)))
patcher.start()
self.addCleanup(patcher.stop)
def test_insert(self):
self.assertEquals("", ExampleConfig.current().string_field)
request = self.factory.post('/config/ExampleConfig', {"string_field": "string_value"})
request.user = self.user
__ = self.current_view(request)
self.assertEquals("string_value", ExampleConfig.current().string_field)
self.assertEquals(self.user, ExampleConfig.current().changed_by)
def test_multiple_inserts(self):
for i in xrange(3):
self.assertEquals(i, ExampleConfig.objects.all().count())
request = self.factory.post('/config/ExampleConfig', {"string_field": str(i)})
request.user = self.user
response = self.current_view(request)
self.assertEquals(201, response.status_code)
self.assertEquals(i + 1, ExampleConfig.objects.all().count())
self.assertEquals(str(i), ExampleConfig.current().string_field)
def test_get_current(self):
request = self.factory.get('/config/ExampleConfig')
request.user = self.user
response = self.current_view(request)
self.assertEquals('', response.data['string_field'])
self.assertEquals(10, response.data['int_field'])
self.assertEquals(None, response.data['changed_by'])
self.assertEquals(False, response.data['enabled'])
self.assertEquals(None, response.data['change_date'])
ExampleConfig(string_field='string_value', int_field=20).save()
response = self.current_view(request)
self.assertEquals('string_value', response.data['string_field'])
self.assertEquals(20, response.data['int_field'])
@ddt.data(
('get', [], 200),
('post', [{'string_field': 'string_value', 'int_field': 10}], 201),
)
@ddt.unpack
def test_permissions(self, method, args, status_code):
request = getattr(self.factory, method)('/config/ExampleConfig', *args)
request.user = User.objects.create_user(
username='no-perms',
email='no-perms@example.com',
password='no-perms',
)
response = self.current_view(request)
self.assertEquals(403, response.status_code)
request.user = self.user
response = self.current_view(request)
self.assertEquals(status_code, response.status_code)
"""
Utilities for working with ConfigurationModels.
"""
from django.apps import apps
from rest_framework.parsers import JSONParser
from rest_framework.serializers import ModelSerializer
from django.contrib.auth.models import User
def get_serializer_class(configuration_model):
""" Returns a ConfigurationModel serializer class for the supplied configuration_model. """
class AutoConfigModelSerializer(ModelSerializer):
"""Serializer class for configuration models."""
class Meta(object):
"""Meta information for AutoConfigModelSerializer."""
model = configuration_model
def create(self, validated_data):
if "changed_by_username" in self.context:
validated_data['changed_by'] = User.objects.get(username=self.context["changed_by_username"])
return super(AutoConfigModelSerializer, self).create(validated_data)
return AutoConfigModelSerializer
def deserialize_json(stream, username):
"""
Given a stream containing JSON, deserializers the JSON into ConfigurationModel instances.
The stream is expected to be in the following format:
{ "model": "config_models.ExampleConfigurationModel",
"data":
[
{ "enabled": True,
"color": "black"
...
},
{ "enabled": False,
"color": "yellow"
...
},
...
]
}
If the provided stream does not contain valid JSON for the ConfigurationModel specified,
an Exception will be raised.
Arguments:
stream: The stream of JSON, as described above.
username: The username of the user making the change. This must match an existing user.
Returns: the number of created entries
"""
parsed_json = JSONParser().parse(stream)
serializer_class = get_serializer_class(apps.get_model(parsed_json["model"]))
list_serializer = serializer_class(data=parsed_json["data"], context={"changed_by_username": username}, many=True)
if list_serializer.is_valid():
model_class = serializer_class.Meta.model
for data in reversed(list_serializer.validated_data):
if model_class.equal_to_current(data):
list_serializer.validated_data.remove(data)
entries_created = len(list_serializer.validated_data)
list_serializer.save()
return entries_created
else:
raise Exception(list_serializer.error_messages)
"""
API view to allow manipulation of configuration models.
"""
from rest_framework.generics import CreateAPIView, RetrieveAPIView
from rest_framework.permissions import DjangoModelPermissions
from rest_framework.authentication import SessionAuthentication
from django.db import transaction
from config_models.utils import get_serializer_class
class ReadableOnlyByAuthors(DjangoModelPermissions):
"""Only allow access by users with `add` permissions on the model."""
perms_map = DjangoModelPermissions.perms_map.copy()
perms_map['GET'] = perms_map['OPTIONS'] = perms_map['HEAD'] = perms_map['POST']
class AtomicMixin(object):
"""Mixin to provide atomic transaction for as_view."""
@classmethod
def create_atomic_wrapper(cls, wrapped_func):
"""Returns a wrapped function."""
def _create_atomic_wrapper(*args, **kwargs):
"""Actual wrapper."""
# When a view call fails due to a permissions error, it raises an exception.
# An uncaught exception breaks the DB transaction for any following DB operations
# unless it's wrapped in a atomic() decorator or context manager.
with transaction.atomic():
return wrapped_func(*args, **kwargs)
return _create_atomic_wrapper
@classmethod
def as_view(cls, **initkwargs):
"""Overrides as_view to add atomic transaction."""
view = super(AtomicMixin, cls).as_view(**initkwargs)
return cls.create_atomic_wrapper(view)
class ConfigurationModelCurrentAPIView(AtomicMixin, CreateAPIView, RetrieveAPIView):
"""
This view allows an authenticated user with the appropriate model permissions
to read and write the current configuration for the specified `model`.
Like other APIViews, you can use this by using a url pattern similar to the following::
url(r'config/example_config$', ConfigurationModelCurrentAPIView.as_view(model=ExampleConfig))
"""
authentication_classes = (SessionAuthentication,)
permission_classes = (ReadableOnlyByAuthors,)
model = None
def get_queryset(self):
return self.model.objects.all()
def get_object(self):
# Return the currently active configuration
return self.model.current()
def get_serializer_class(self):
if self.serializer_class is None:
self.serializer_class = get_serializer_class(self.model)
return self.serializer_class
def perform_create(self, serializer):
# Set the requesting user as the one who is updating the configuration
serializer.save(changed_by=self.request.user)
......@@ -92,6 +92,7 @@ git+https://github.com/edx/xblock-utils.git@v1.0.3#egg=xblock-utils==1.0.3
git+https://github.com/edx/edx-user-state-client.git@1.0.1#egg=edx-user-state-client==1.0.1
git+https://github.com/edx/xblock-lti-consumer.git@v1.0.9#egg=xblock-lti-consumer==1.0.9
git+https://github.com/edx/edx-proctoring.git@0.14.0#egg=edx-proctoring==0.14.0
git+https://github.com/edx/django-config-models.git@0.1.0#egg=config_models==0.1.0
# Third Party XBlocks
-e git+https://github.com/mitodl/edx-sga@172a90fd2738f8142c10478356b2d9ed3e55334a#egg=edx-sga
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment