Commit 6fb3daa7 by jsa Committed by Andy Armstrong

support group_access. JIRA: TNL-649

parent 17d892c5
......@@ -57,15 +57,6 @@ class InheritanceMixin(XBlockMixin):
default=False,
scope=Scope.settings,
)
group_access = Dict(
help="A dictionary that maps which groups can be shown this block. The keys "
"are group configuration ids and the values are a list of group IDs. "
"If there is no key for a group configuration or if the list of group IDs "
"is empty then the block is considered visible to all. Note that this "
"field is ignored if the block is visible_to_staff_only.",
default={},
scope=Scope.settings,
)
course_edit_method = String(
display_name=_("Course Editor"),
help=_("Enter the method by which this course is edited (\"XML\" or \"Studio\")."),
......
......@@ -475,11 +475,6 @@ class ParentLocationCache(dict):
def set(self, key, value):
self[key] = value
@contract(key=unicode)
def delete(self, key):
if key in self:
del self[key]
@contract(value="BlockUsageLocator")
def delete_by_value(self, value):
keys_to_delete = [k for k, v in self.iteritems() if v == value]
......
......@@ -10,7 +10,21 @@ from stevedore.extension import ExtensionManager
class UserPartitionError(Exception):
"""
An error was found regarding user partitions.
Base Exception for when an error was found regarding user partitions.
"""
pass
class NoSuchUserPartitionError(UserPartitionError):
"""
Exception to be raised when looking up a UserPartition by its ID fails.
"""
pass
class NoSuchUserPartitionGroupError(UserPartitionError):
"""
Exception to be raised when looking up a UserPartition Group by its ID fails.
"""
pass
......@@ -171,9 +185,14 @@ class UserPartition(namedtuple("UserPartition", "id name description groups sche
def get_group(self, group_id):
"""
Returns the group with the specified id.
Returns the group with the specified id. Raises NoSuchUserPartitionGroupError if not found.
"""
for group in self.groups: # pylint: disable=no-member
# pylint: disable=no-member
for group in self.groups:
if group.id == group_id:
return group
return None
raise NoSuchUserPartitionGroupError(
"could not find a Group with ID [{}] in UserPartition [{}]".format(group_id, self.id)
)
......@@ -8,7 +8,9 @@ from mock import Mock
from opaque_keys.edx.locations import SlashSeparatedCourseKey
from stevedore.extension import Extension, ExtensionManager
from xmodule.partitions.partitions import Group, UserPartition, UserPartitionError, USER_PARTITION_SCHEME_NAMESPACE
from xmodule.partitions.partitions import (
Group, UserPartition, UserPartitionError, NoSuchUserPartitionGroupError, USER_PARTITION_SCHEME_NAMESPACE
)
from xmodule.partitions.partitions_service import PartitionService
......@@ -259,6 +261,23 @@ class TestUserPartition(PartitionTestCase):
user_partition = UserPartition.from_json(jsonified)
self.assertNotIn("programmer", user_partition.to_json())
def test_get_group(self):
"""
UserPartition.get_group correctly returns the group referenced by the
`group_id` parameter, or raises NoSuchUserPartitionGroupError when
the lookup fails.
"""
self.assertEqual(
self.user_partition.get_group(self.TEST_GROUPS[0].id), # pylint: disable=no-member
self.TEST_GROUPS[0]
)
self.assertEqual(
self.user_partition.get_group(self.TEST_GROUPS[1].id), # pylint: disable=no-member
self.TEST_GROUPS[1]
)
with self.assertRaises(NoSuchUserPartitionGroupError):
self.user_partition.get_group(3)
class StaticPartitionService(PartitionService):
"""
......
......@@ -15,6 +15,7 @@ from xmodule.error_module import ErrorDescriptor
from xmodule.x_module import XModule
from xblock.core import XBlock
from xmodule.partitions.partitions import NoSuchUserPartitionError, NoSuchUserPartitionGroupError
from external_auth.models import ExternalAuthMap
from courseware.masquerade import is_masquerading_as_student
......@@ -301,6 +302,71 @@ def _has_access_error_desc(user, action, descriptor, course_key):
return _dispatch(checkers, action, user, descriptor)
def _has_group_access(descriptor, user, course_key):
"""
This function returns a boolean indicating whether or not `user` has
sufficient group memberships to "load" a block (the `descriptor`)
"""
if len(descriptor.user_partitions) == 0:
# short circuit the process, since there are no defined user partitions
return True
# use merged_group_access which takes group access on the block's
# parents / ancestors into account
merged_access = descriptor.merged_group_access
# check for False in merged_access, which indicates that at least one
# partition's group list excludes all students.
if False in merged_access.values():
log.warning("Group access check excludes all students, access will be denied.", exc_info=True)
return False
# resolve the partition IDs in group_access to actual
# partition objects, skipping those which contain empty group directives.
# if a referenced partition could not be found, access will be denied.
try:
partitions = [
descriptor._get_user_partition(partition_id) # pylint:disable=protected-access
for partition_id, group_ids in merged_access.items()
if group_ids is not None
]
except NoSuchUserPartitionError:
log.warning("Error looking up user partition, access will be denied.", exc_info=True)
return False
# next resolve the group IDs specified within each partition
partition_groups = []
try:
for partition in partitions:
groups = [
partition.get_group(group_id)
for group_id in merged_access[partition.id]
]
if groups:
partition_groups.append((partition, groups))
except NoSuchUserPartitionGroupError:
log.warning("Error looking up referenced user partition group, access will be denied.", exc_info=True)
return False
# look up the user's group for each partition
user_groups = {}
for partition, groups in partition_groups:
user_groups[partition.id] = partition.scheme.get_group_for_user(
course_key,
user,
partition,
)
# finally: check that the user has a satisfactory group assignment
# for each partition.
if not all(
user_groups.get(partition.id) in groups for partition, groups in partition_groups
):
return False
# all checks passed.
return True
def _has_access_descriptor(user, action, descriptor, course_key=None):
"""
Check if user has access to this descriptor.
......@@ -323,6 +389,12 @@ def _has_access_descriptor(user, action, descriptor, course_key=None):
if descriptor.visible_to_staff_only and not _has_staff_access_to_descriptor(user, descriptor, course_key):
return False
# enforce group access
if not _has_group_access(descriptor, user, course_key):
# if group_access check failed, deny access unless the requestor is staff,
# in which case immediately grant access.
return _has_staff_access_to_descriptor(user, descriptor, course_key)
# If start dates are off, can always load
if settings.FEATURES['DISABLE_START_DATES'] and not is_masquerading_as_student(user):
debug("Allow: DISABLE_START_DATES")
......
......@@ -105,7 +105,7 @@ class AccessTestCase(TestCase):
def test__has_access_descriptor(self):
# TODO: override DISABLE_START_DATES and test the start date branch of the method
user = Mock()
descriptor = Mock()
descriptor = Mock(user_partitions=[])
# Always returns true because DISABLE_START_DATES is set in test.py
self.assertTrue(access._has_access_descriptor(user, 'load', descriptor))
......@@ -118,7 +118,7 @@ class AccessTestCase(TestCase):
"""
Tests that "visible_to_staff_only" overrides start date.
"""
mock_unit = Mock()
mock_unit = Mock(user_partitions=[])
mock_unit._class_tags = {} # Needed for detached check in _has_access_descriptor
def verify_access(student_should_have_access):
......
"""
Namespace that defines fields common to all blocks used in the LMS
"""
from lazy import lazy
from xblock.fields import Boolean, Scope, String, XBlockMixin, Dict
from xblock.validation import ValidationMessage
from xmodule.modulestore.inheritance import UserPartitionList
from xmodule.partitions.partitions import NoSuchUserPartitionError, NoSuchUserPartitionGroupError
# Make '_' a no-op so we can scrape strings
_ = lambda text: text
class GroupAccessDict(Dict):
"""Special Dict class for serializing the group_access field"""
def from_json(self, access_dict):
if access_dict is not None:
return {int(k): access_dict[k] for k in access_dict}
def to_json(self, access_dict):
if access_dict is not None:
return {unicode(k): access_dict[k] for k in access_dict}
class LmsBlockMixin(XBlockMixin):
"""
Mixin that defines fields common to all blocks used in the LMS
......@@ -55,16 +69,56 @@ class LmsBlockMixin(XBlockMixin):
default=False,
scope=Scope.settings,
)
group_access = Dict(
help="A dictionary that maps which groups can be shown this block. The keys "
"are group configuration ids and the values are a list of group IDs. "
"If there is no key for a group configuration or if the list of group IDs "
"is empty then the block is considered visible to all. Note that this "
"field is ignored if the block is visible_to_staff_only.",
group_access = GroupAccessDict(
help=_(
"A dictionary that maps which groups can be shown this block. The keys "
"are group configuration ids and the values are a list of group IDs. "
"If there is no key for a group configuration or if the set of group IDs "
"is empty then the block is considered visible to all. Note that this "
"field is ignored if the block is visible_to_staff_only."
),
default={},
scope=Scope.settings,
)
@lazy
def merged_group_access(self):
"""
This computes access to a block's group_access rules in the context of its position
within the courseware structure, in the form of a lazily-computed attribute.
Each block's group_access rule is merged recursively with its parent's, guaranteeing
that any rule in a parent block will be enforced on descendants, even if a descendant
also defined its own access rules. The return value is always a dict, with the same
structure as that of the group_access field.
When merging access rules results in a case where all groups are denied access in a
user partition (which effectively denies access to that block for all students),
the special value False will be returned for that user partition key.
"""
parent = self.get_parent()
if not parent:
return self.group_access or {}
merged_access = parent.merged_group_access.copy()
if self.group_access is not None:
for partition_id, group_ids in self.group_access.items():
if group_ids: # skip if the "local" group_access for this partition is None or empty.
if partition_id in merged_access:
if merged_access[partition_id] is False:
# special case - means somewhere up the hierarchy, merged access rules have eliminated
# all group_ids from this partition, so there's no possible intersection.
continue
# otherwise, if the parent defines group access rules for this partition,
# intersect with the local ones.
merged_access[partition_id] = list(
set(merged_access[partition_id]).intersection(group_ids)
) or False
else:
# add the group access rules for this partition to the merged set of rules.
merged_access[partition_id] = group_ids
return merged_access
# Specified here so we can see what the value set at the course-level is.
user_partitions = UserPartitionList(
help=_("The list of group configurations for partitioning students in content experiments."),
......@@ -74,29 +128,14 @@ class LmsBlockMixin(XBlockMixin):
def _get_user_partition(self, user_partition_id):
"""
Returns the user partition with the specified id, or None if there is no such partition.
Returns the user partition with the specified id. Raises
`NoSuchUserPartitionError` if the lookup fails.
"""
for user_partition in self.user_partitions:
if user_partition.id == user_partition_id:
return user_partition
return None
def is_visible_to_group(self, user_partition, group):
"""
Returns true if this xblock should be shown to a user in the specified user partition group.
This method returns true if one of the following is true:
- the xblock has no group_access dictionary specified
- if the dictionary has no key for the user partition's id
- if the value for the user partition's id is an empty list
- if the value for the user partition's id contains the specified group's id
"""
if not self.group_access:
return True
group_ids = self.group_access.get(user_partition.id, [])
if len(group_ids) == 0:
return True
return group.id in group_ids
raise NoSuchUserPartitionError("could not find a UserPartition with ID [{}]".format(user_partition_id))
def validate(self):
"""
......@@ -105,8 +144,9 @@ class LmsBlockMixin(XBlockMixin):
_ = self.runtime.service(self, "i18n").ugettext # pylint: disable=redefined-outer-name
validation = super(LmsBlockMixin, self).validate()
for user_partition_id, group_ids in self.group_access.iteritems():
user_partition = self._get_user_partition(user_partition_id)
if not user_partition:
try:
user_partition = self._get_user_partition(user_partition_id)
except NoSuchUserPartitionError:
validation.add(
ValidationMessage(
ValidationMessage.ERROR,
......@@ -115,8 +155,9 @@ class LmsBlockMixin(XBlockMixin):
)
else:
for group_id in group_ids:
group = user_partition.get_group(group_id)
if not group:
try:
user_partition.get_group(group_id)
except NoSuchUserPartitionGroupError:
validation.add(
ValidationMessage(
ValidationMessage.ERROR,
......
......@@ -242,3 +242,142 @@ class XBlockGetParentTest(LmsXBlockMixinTestCase):
old_parent_location,
video.get_parent().location.for_branch(None)
)
class RenamedTuple(tuple): # pylint: disable=incomplete-protocol
"""
This class is only used to allow overriding __name__ on the tuples passed
through ddt, in order to have the generated test names make sense.
"""
pass
def ddt_named(parent, child):
"""
Helper to get more readable dynamically-generated test names from ddt.
"""
args = RenamedTuple([parent, child])
setattr(args, '__name__', 'parent_{}_child_{}'.format(parent, child))
return args
@ddt.ddt
class XBlockMergedGroupAccessTest(LmsXBlockMixinTestCase):
"""
Test that XBlock.merged_group_access is computed correctly according to
our access control rules.
"""
PARTITION_1 = 1
PARTITION_1_GROUP_1 = 11
PARTITION_1_GROUP_2 = 12
PARTITION_2 = 2
PARTITION_2_GROUP_1 = 21
PARTITION_2_GROUP_2 = 22
PARENT_CHILD_PAIRS = (
ddt_named('section', 'subsection'),
ddt_named('section', 'vertical'),
ddt_named('section', 'video'),
ddt_named('subsection', 'vertical'),
ddt_named('subsection', 'video'),
)
def setUp(self):
super(XBlockMergedGroupAccessTest, self).setUp()
self.build_course()
def set_group_access(self, block, access_dict):
"""
DRY helper.
"""
block.group_access = access_dict
block.runtime.modulestore.update_item(block, 1)
@ddt.data(*PARENT_CHILD_PAIRS)
@ddt.unpack
def test_intersecting_groups(self, parent, child):
"""
When merging group_access on a block, the resulting group IDs for each
partition is the intersection of the group IDs defined for that
partition across all ancestor blocks (including this one).
"""
parent_block = getattr(self, parent)
child_block = getattr(self, child)
self.set_group_access(parent_block, {self.PARTITION_1: [self.PARTITION_1_GROUP_1, self.PARTITION_1_GROUP_2]})
self.set_group_access(child_block, {self.PARTITION_1: [self.PARTITION_1_GROUP_2]})
self.assertEqual(
parent_block.merged_group_access,
{self.PARTITION_1: [self.PARTITION_1_GROUP_1, self.PARTITION_1_GROUP_2]},
)
self.assertEqual(
child_block.merged_group_access,
{self.PARTITION_1: [self.PARTITION_1_GROUP_2]},
)
@ddt.data(*PARENT_CHILD_PAIRS)
@ddt.unpack
def test_disjoint_groups(self, parent, child):
"""
When merging group_access on a block, if the intersection of group IDs
for a partition is empty, the merged value for that partition is False.
"""
parent_block = getattr(self, parent)
child_block = getattr(self, child)
self.set_group_access(parent_block, {self.PARTITION_1: [self.PARTITION_1_GROUP_1]})
self.set_group_access(child_block, {self.PARTITION_1: [self.PARTITION_1_GROUP_2]})
self.assertEqual(
parent_block.merged_group_access,
{self.PARTITION_1: [self.PARTITION_1_GROUP_1]},
)
self.assertEqual(
child_block.merged_group_access,
{self.PARTITION_1: False},
)
def test_disjoint_groups_no_override(self):
"""
Special case of the above test - ensures that `False` propagates down
to the block being queried even if blocks further down in the hierarchy
try to override it.
"""
self.set_group_access(self.section, {self.PARTITION_1: [self.PARTITION_1_GROUP_1]})
self.set_group_access(self.subsection, {self.PARTITION_1: [self.PARTITION_1_GROUP_2]})
self.set_group_access(self.vertical, {self.PARTITION_1: [self.PARTITION_1_GROUP_1, self.PARTITION_1_GROUP_2]})
self.assertEqual(
self.vertical.merged_group_access,
{self.PARTITION_1: False},
)
self.assertEqual(
self.video.merged_group_access,
{self.PARTITION_1: False},
)
@ddt.data(*PARENT_CHILD_PAIRS)
@ddt.unpack
def test_union_partitions(self, parent, child):
"""
When merging group_access on a block, the result's keys (partitions)
are the union of all partitions specified across all ancestor blocks
(including this one).
"""
parent_block = getattr(self, parent)
child_block = getattr(self, child)
self.set_group_access(parent_block, {self.PARTITION_1: [self.PARTITION_1_GROUP_1]})
self.set_group_access(child_block, {self.PARTITION_2: [self.PARTITION_1_GROUP_2]})
self.assertEqual(
parent_block.merged_group_access,
{self.PARTITION_1: [self.PARTITION_1_GROUP_1]},
)
self.assertEqual(
child_block.merged_group_access,
{self.PARTITION_1: [self.PARTITION_1_GROUP_1], self.PARTITION_2: [self.PARTITION_1_GROUP_2]},
)
......@@ -3,6 +3,8 @@ Provides a UserPartition driver for cohorts.
"""
import logging
from xmodule.partitions.partitions import NoSuchUserPartitionGroupError
from .cohorts import get_cohort, get_partition_group_id_for_cohort
log = logging.getLogger(__name__)
......@@ -56,8 +58,9 @@ class CohortPartitionScheme(object):
# fail silently
return None
group = user_partition.get_group(group_id)
if group is None:
try:
return user_partition.get_group(group_id)
except NoSuchUserPartitionGroupError:
# if we have a match but the group doesn't exist in the partition,
# it means the mapping is invalid. the previous state of the
# partition configuration may have been modified.
......@@ -67,9 +70,8 @@ class CohortPartitionScheme(object):
"requested_partition_id": user_partition.id,
"requested_group_id": group_id,
"cohort_id": cohort.id,
}
},
exc_info=True
)
# fail silently
return None
return group
"""
Provides partition support to the user service.
"""
import logging
import random
import api.course_tag as course_tag_api
from xmodule.partitions.partitions import UserPartitionError
from xmodule.partitions.partitions import UserPartitionError, NoSuchUserPartitionGroupError
log = logging.getLogger(__name__)
class RandomUserPartitionScheme(object):
......@@ -22,7 +24,22 @@ class RandomUserPartitionScheme(object):
"""
partition_key = cls._key_for_partition(user_partition)
group_id = course_tag_api.get_course_tag(user, course_id, partition_key)
group = user_partition.get_group(int(group_id)) if not group_id is None else None
group = None
if group_id is not None:
# attempt to look up the presently assigned group
try:
group = user_partition.get_group(int(group_id))
except NoSuchUserPartitionGroupError:
# jsa: we can turn off warnings here if this is an expected case.
log.warn(
"group not found in RandomUserPartitionScheme: %r",
{
"requested_partition_id": user_partition.id,
"requested_group_id": group_id,
},
exc_info=True
)
if group is None and assign:
if not user_partition.groups:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment