Commit 82f3c0df by Nimisha Asthagiri

Blocks Transformers: Visibility, Start Date, User Partitions

parent 8407cac1
"""
...
"""
# TODO 8874: Finish docstrings for all of lms_course_cache/
# Importing signals is necessary to active the course publish/delete signal handlers.
import signals # pylint: disable=unused-import
"""
Signal handler for invalidating cached courses.
"""
from django.dispatch.dispatcher import receiver
from xmodule.modulestore.django import SignalHandler
from api import clear_course_from_cache
@receiver(SignalHandler.course_published)
def _listen_for_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument
"""
Catches the signal that a course has been published in the module store and
invalidates the corresponding cache entry if one exists.
"""
clear_course_from_cache(course_key)
@receiver(SignalHandler.course_deleted)
def _listen_for_course_delete(sender, course_key, **kwargs): # pylint: disable=unused-argument
"""
Catches the signal that a course has been deleted from the module store and
invalidates the corresponding cache entry if one exists.
"""
clear_course_from_cache(course_key)
"""
Tests for the LMS side of the course cache.
"""
# TODO 8874: Write tests to cover api.py and signals.py.
"""
...
"""
from openedx.core.lib.block_cache.transformer import BlockStructureTransformer
from xmodule.course_metadata_utils import DEFAULT_START_DATE
from courseware.access_utils import check_start_date
class StartDateTransformer(BlockStructureTransformer):
"""
...
"""
VERSION = 1
MERGED_START_DATE = 'merged_start_date'
@classmethod
def get_merged_start_date(cls, block_structure, block_key):
return block_structure.get_transformer_block_data(
block_key, cls, cls.MERGED_START_DATE, False
)
@classmethod
def collect(cls, block_structure):
"""
Collects any information that's necessary to execute this transformer's
transform method.
"""
block_structure.request_xblock_fields('days_early_for_beta')
for block_key in block_structure.topological_traversal():
# compute merged value of start date from all parents
parents = block_structure.get_parents(block_key)
min_all_parents_start_date = min(
cls.get_merged_start_date(block_structure, parent_key)
for parent_key in parents
) if parents else None
# set the merged value for this block
block_start = block_structure.get_xblock(block_key).start
block_structure.set_transformer_block_data(
block_key,
cls,
cls.MERGED_START_DATE,
# max of merged-start-from-all-parents and this block
max(min_all_parents_start_date or block_start, block_start)
)
def transform(self, user_info, block_structure):
"""
Mutates block_structure based on the given user_info.
"""
if user_info.has_staff_access:
return
block_structure.remove_block_if(
lambda block_key: not check_start_date(
user_info.user,
block_structure.get_xblock_field(block_key, 'days_early_for_beta'),
self.get_merged_start_date(block_structure, block_key),
user_info.course_key,
)
)
"""
...
"""
from student.tests.factories import CourseEnrollmentFactory, UserFactory
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from courseware.access import has_access
from course_blocks.api import get_course_blocks
class CourseStructureTestCase(ModuleStoreTestCase):
"""
Helper for test cases that need to build course structures.
"""
def build_course(self, course_hierarchy):
"""
Build a hierarchy of XBlocks.
Arguments:
course_hierarchy (BlockStructureDict): Definition of course hierarchy.
where a BlockStructureDict is a dict in the form {
'key1': 'value1',
...
'keyN': 'valueN',
'#type': block_type,
'#ref': short_string_for_referencing_block,
'#children': list[BlockStructureDict]
}
Special keys start with '#'; the rest just get passed as kwargs to
Factory.create.
Returns:
dict[str: XBlock]: Mapping from '#ref' values to their XBlocks.
"""
block_map = {}
def build_xblock(block_hierarchy, parent):
"""
Build an XBlock, add it to result_dict, and call build_xblock on the
children defined in block_dict.
Arguments:
block_hierarchy (BlockStructureDict): Definition of hierarchy,
from this block down.
is_root (bool): Whether this is the course's root XBlock.
"""
block_type = block_hierarchy['#type']
block_ref = block_hierarchy['#ref']
factory = (CourseFactory if block_type == 'course' else ItemFactory)
kwargs = {key: value for key, value in block_hierarchy.iteritems() if key[0] != '#'}
if block_type != 'course':
kwargs['category'] = block_type
if parent:
kwargs['parent'] = parent
xblock = factory.create(
display_name='{} {}'.format(block_type, block_ref),
publish_item=True,
**kwargs
)
block_map[block_ref] = xblock
for child_hierarchy in block_hierarchy.get('#children', []):
build_xblock(child_hierarchy, xblock)
if '#type' not in course_hierarchy:
course_hierarchy['#type'] = 'course'
build_xblock(course_hierarchy, None)
return block_map
class BlockParentsMapTestCase(ModuleStoreTestCase):
# Tree formed by parent_map:
# 0
# / \
# 1 2
# / \ / \
# 3 4 / 5
# \ /
# 6
# Note the parents must always have lower indices than their children.
parents_map = [[], [0], [0], [1], [1], [2], [2, 4]]
# TODO change this to setupClass style
def setUp(self, **kwargs):
super(BlockParentsMapTestCase, self).setUp()
self.course = CourseFactory.create()
self.xblock_keys = [self.course.location]
for i, parents_index in enumerate(self.parents_map):
if i == 0:
continue # course already created
self.xblock_keys.append(
ItemFactory.create(
parent=self.get_block(parents_index[0]),
category="vertical",
).location
)
# add additional parents
if len(parents_index) > 1:
for index in range(1, len(parents_index)):
parent_index = parents_index[index]
parent_block = self.get_block(parent_index)
parent_block.children.append(self.xblock_keys[i])
self.update_block(parent_block)
self.password = 'test'
self.student = UserFactory.create(is_staff=False, password=self.password)
self.staff = UserFactory.create(is_staff=True, password=self.password)
CourseEnrollmentFactory.create(is_active=True, mode='honor', user=self.student, course_id=self.course.id)
def check_transformer_results(
self, expected_student_accessible_blocks, blocks_with_differing_student_access, transformers=None
):
def check_results(user, expected_accessible_blocks, blocks_with_differing_access):
self.client.login(username=user.username, password=self.password)
block_structure = get_course_blocks(
user, self.course.id, self.course.location, transformers=transformers
)
for i, xblock_key in enumerate(self.xblock_keys):
block_structure_result = block_structure.has_block(xblock_key)
has_access_result = bool(has_access(user, 'load', self.get_block(i)))
self.assertEquals(
block_structure_result,
i in expected_accessible_blocks,
"block_structure return value {0} not equal to expected value for block {1}".format(
block_structure_result, i
))
if i in blocks_with_differing_access:
self.assertNotEqual(
block_structure_result,
has_access_result,
"block structure and has_access results are equal for block {0}".format(i)
)
else:
self.assertEquals(
block_structure_result,
has_access_result,
"block structure and has_access results are not equal for block {0}".format(i)
)
self.client.logout()
check_results(self.student, expected_student_accessible_blocks, blocks_with_differing_student_access)
check_results(self.staff, {0, 1, 2, 3, 4, 5, 6}, {})
def get_block(self, i):
return modulestore().get_item(self.xblock_keys[i])
def update_block(self, block):
return modulestore().update_item(block, 'test_user')
\ No newline at end of file
"""
Tests for StartDateTransformer.
"""
import ddt
from datetime import timedelta
from django.utils.timezone import now
from mock import patch
from courseware.access import has_access
from xmodule.course_metadata_utils import DEFAULT_START_DATE
from ..start_date import StartDateTransformer
from .test_helpers import BlockParentsMapTestCase
from xmodule.modulestore.django import modulestore
@ddt.ddt
class StartDateTransformerTestCase(BlockParentsMapTestCase):
"""
...
"""
class StartDateType(object):
"""
Use constant enum types for deterministic ddt test method names (rather than dynamically generated timestamps)
"""
released = 1,
future = 2,
default = 3
TODAY = now()
LAST_MONTH = TODAY - timedelta(days=30)
NEXT_MONTH = TODAY + timedelta(days=30)
@classmethod
def start(cls, enum_value):
if enum_value == cls.released:
return cls.LAST_MONTH
elif enum_value == cls.future:
return cls.NEXT_MONTH
else:
return DEFAULT_START_DATE
# Following test cases are based on BlockParentsMapTestCase.parents_map
@patch.dict('django.conf.settings.FEATURES', {'DISABLE_START_DATES': False})
@ddt.data(
({}, {}, {}),
({0: StartDateType.released}, {0, 1, 2, 3, 4, 5, 6}, {}),
({1: StartDateType.released}, {}, {1, 3, 4, 6}),
({1: StartDateType.released, 2: StartDateType.released}, {}, {1, 2, 3, 4, 5, 6}),
({0: StartDateType.released, 4: StartDateType.future}, {0, 1, 2, 3, 5}, {}),
)
@ddt.unpack
def test_block_start_date(
self, start_date_type_values, expected_student_visible_blocks, blocks_with_differing_student_access
):
for i, start_date_type in start_date_type_values.iteritems():
block = self.get_block(i)
block.start = self.StartDateType.start(start_date_type)
self.update_block(block)
self.check_transformer_results(
expected_student_visible_blocks, blocks_with_differing_student_access, [StartDateTransformer()]
)
"""
Tests for UserPartitionTransformation.
"""
from openedx.core.djangoapps.course_groups.partition_scheme import CohortPartitionScheme
from openedx.core.djangoapps.course_groups.tests.helpers import CohortFactory, config_course_cohorts
from openedx.core.djangoapps.course_groups.cohorts import add_user_to_cohort
from openedx.core.djangoapps.course_groups.views import link_cohort_to_partition_group
from student.tests.factories import UserFactory
from student.tests.factories import CourseEnrollmentFactory
from xmodule.modulestore.django import modulestore
from xmodule.partitions.partitions import Group, UserPartition
from course_blocks.transformers.user_partitions import UserPartitionTransformer
from course_blocks.api import get_course_blocks, clear_course_from_cache
from test_helpers import CourseStructureTestCase
class UserPartitionTransformerTestCase(CourseStructureTestCase):
"""
...
"""
def setUp(self):
super(UserPartitionTransformerTestCase, self).setUp()
# Set up user partitions and groups.
self.groups = [Group(1, 'Group 1'), Group(2, 'Group 2')]
self.content_groups = [1, 2]
self.user_partition = UserPartition(
id=0,
name='Partition 1',
description='This is partition 1',
groups=self.groups,
scheme=CohortPartitionScheme
)
self.user_partition.scheme.name = "cohort"
# Build course.
self.course_hierarchy = self.get_test_course_hierarchy()
self.blocks = self.build_course(self.course_hierarchy)
self.course = self.blocks['course']
clear_course_from_cache(self.course.id)
# Set up user and enroll in course.
self.password = 'test'
self.user = UserFactory.create(password=self.password)
CourseEnrollmentFactory.create(user=self.user, course_id=self.course.id, is_active=True)
# Set up cohorts.
config_course_cohorts(self.course, is_cohorted=True)
self.cohorts = [CohortFactory(course_id=self.course.id) for __ in enumerate(self.groups)]
self.add_user_to_cohort_group(self.cohorts[0], self.groups[0])
def get_test_course_hierarchy(self):
"""
Get a course hierarchy to test with.
Assumes self.user_partition has already been initialized.
"""
return {
'org': 'UserPartitionTransformation',
'course': 'UP101F',
'run': 'test_run',
'user_partitions': [self.user_partition],
'#ref': 'course',
'#children': [
{
'#type': 'chapter',
'#ref': 'chapter1',
'#children': [
{
'metadata': {
'group_access': {0: [0, 1, 2]},
},
'#type': 'sequential',
'#ref': 'lesson1',
'#children': [
{
'#type': 'vertical',
'#ref': 'vertical1',
'#children': [
{
'metadata': {'group_access': {0: [0]}},
'#type': 'html',
'#ref': 'html1',
},
{
'metadata': {'group_access': {0: [1]}},
'#type': 'html',
'#ref': 'html2',
}
],
}
],
}
],
}
]
}
def add_user_to_cohort_group(self, cohort, group):
"""
Add user to cohort, link cohort to content group, and update blocks.
"""
add_user_to_cohort(cohort, self.user.username)
link_cohort_to_partition_group(
cohort,
self.user_partition.id,
group.id,
)
def get_block_key_set(self, *refs):
"""
Gets the set of usage keys that correspond to the list of
#ref values as defined on self.blocks.
Returns: set[UsageKey]
"""
xblocks = (self.blocks[ref] for ref in refs)
return set([xblock.location for xblock in xblocks])
def test_course_structure_with_user_partition(self):
self.transformation = UserPartitionTransformer()
raw_block_structure = get_course_blocks(
self.user,
self.course.id,
self.course.location,
transformers={}
)
self.assertEqual(len(list(raw_block_structure.get_block_keys())), len(self.blocks))
clear_course_from_cache(self.course.id)
trans_block_structure = get_course_blocks(
self.user,
self.course.id,
self.course.location,
transformers={self.transformation}
)
self.assertSetEqual(
set(trans_block_structure.get_block_keys()),
self.get_block_key_set('course', 'chapter1', 'lesson1', 'vertical1', 'html2')
)
"""
Tests for VisibilityTransformer.
"""
import ddt
from course_blocks.transformers.visibility import VisibilityTransformer
from .test_helpers import BlockParentsMapTestCase
@ddt.ddt
class VisibilityTransformerTestCase(BlockParentsMapTestCase):
"""
...
"""
# Following test cases are based on BlockParentsMapTestCase.parents_map
@ddt.data(
({}, {0, 1, 2, 3, 4, 5, 6}, {}),
({0}, {}, {1, 2, 3, 4, 5, 6}),
({1}, {0, 2, 5, 6}, {3, 4}),
({2}, {0, 1, 3, 4, 6}, {5}),
({3}, {0, 1, 2, 4, 5, 6}, {}),
({4}, {0, 1, 2, 3, 5, 6}, {}),
({5}, {0, 1, 2, 3, 4, 6}, {}),
({6}, {0, 1, 2, 3, 4, 5}, {}),
({1, 2}, {0}, {3, 4, 5, 6}),
({2, 4}, {0, 1, 3}, {5, 6}),
({1, 2, 3, 4, 5, 6}, {0}, {}),
)
@ddt.unpack
def test_block_visibility(
self, staff_only_blocks, expected_student_visible_blocks, blocks_with_differing_student_access
):
for i, _ in enumerate(self.parents_map):
block = self.get_block(i)
block.visible_to_staff_only = (i in staff_only_blocks)
self.update_block(block)
self.check_transformer_results(
expected_student_visible_blocks, blocks_with_differing_student_access, [VisibilityTransformer()]
)
"""
...
"""
from courseware.access import _has_access_to_course
from openedx.core.lib.block_cache.transformer import BlockStructureTransformer
from openedx.core.djangoapps.user_api.partition_schemes import RandomUserPartitionScheme
from openedx.core.djangoapps.course_groups.partition_scheme import CohortPartitionScheme
# TODO 8874: Make it so we support all schemes instead of manually declaring them here.
INCLUDE_SCHEMES = [CohortPartitionScheme, RandomUserPartitionScheme,]
SCHEME_SUPPORTS_ASSIGNMENT = [RandomUserPartitionScheme,]
class MergedGroupAccess(object):
"""
...
"""
# TODO 8874: Make it so LmsBlockMixin.merged_group_access use MergedGroupAccess
def __init__(self, user_partitions, xblock, merged_parent_access_list):
"""
Arguments:
user_partitions (list[UserPartition])
xblock (XBlock)
merged_parent_access_list (list[MergedGroupAccess])
"""
# How group access restrictions are represented within an XBlock:
# - group_access not defined => No group access restrictions.
# - For each partition:
# - partition.id not in group_access => All groups have access for this partition
# - group_access[partition_id] is None => All groups have access for this partition
# - group_access[partition_id] == [] => All groups have access for this partition
# - group_access[partition_id] == [group1..groupN] => groups 1..N have access for this partition
#
# We internally represent the restrictions in a simplified way:
# - self._access == {} => No group access restrictions.
# - For each partition:
# - partition.id not in _access => All groups have access for this partition
# - _access[partition_id] == set() => No groups have access for this partition
# - _access[partition_id] == set(group1..groupN) => groups 1..N have access for this partition
#
# Note that a user must have access to all partitions in group_access
# or _access in order to access a block.
block_group_access = getattr(xblock, 'group_access', {})
self._access = {} # { partition.id: set(IDs of groups that can access partition }
for partition in user_partitions:
# Within this loop, None <=> Universe set <=> "No access restriction"
block_group_ids = set(block_group_access.get(partition.id, [])) or None
parents_group_ids = [
merged_parent_access._access[partition.id]
for merged_parent_access in merged_parent_access_list
if partition.id in merged_parent_access._access
]
merged_parent_group_ids = (
set().union(*parents_group_ids)
if parents_group_ids != []
else None
)
merged_group_ids = MergedGroupAccess._intersection(block_group_ids, merged_parent_group_ids)
if merged_group_ids is not None:
self._access[partition.id] = merged_group_ids
@staticmethod
def _intersection(*sets):
"""
Compute an intersection of sets, interpreting None as the Universe set.
This makes __init__ a bit more elegant.
Arguments:
sets (list[set or None]), where None represents the Universe set.
Returns:
set or None, where None represents the Universe set.
"""
non_universe_sets = [set_ for set_ in sets if set_ is not None]
if non_universe_sets:
first, rest = non_universe_sets[0], non_universe_sets[1:]
return first.intersection(*rest)
else:
return None
def check_group_access(self, user_groups):
"""
Arguments:
dict[int: Group]: Given a user, a mapping from user partition IDs
to the group to which the user belongs in each partition.
Returns:
bool: Whether said user has group access.
"""
for partition_id, allowed_group_ids in self._access.iteritems():
# If the user is not assigned to a group for this partition, deny access.
# TODO 8874: Ensure that denying access to users who aren't in a group is the correct action.
if partition_id not in user_groups:
return False
# If the user belongs to one of the allowed groups for this partition,
# then move and check the next partition.
elif user_groups[partition_id].id in allowed_group_ids:
continue
# Else, deny access.
else:
return False
# If the user has access for every partition, grant access.
else:
return True
class UserPartitionTransformer(BlockStructureTransformer):
"""
...
"""
VERSION = 1
@staticmethod
def _get_user_partition_groups(course_key, user_partitions, user):
"""
Collect group ID for each partition in this course for this user.
Arguments:
course_key (CourseKey)
user_partitions (list[UserPartition])
user (User)
Returns:
dict[int: Group]: Mapping from user partitions to the group to which
the user belongs in each partition. If the user isn't in a group
for a particular partition, then that partition's ID will not be
in the dict.
"""
partition_groups = {}
for partition in user_partitions:
if partition.scheme not in INCLUDE_SCHEMES:
continue
group = partition.scheme.get_group_for_user(
course_key,
user,
partition,
**({'assign': False} if partition.scheme in SCHEME_SUPPORTS_ASSIGNMENT else {})
)
if group is not None:
partition_groups[partition.id] = group
return partition_groups
@classmethod
def collect(cls, block_structure):
"""
Computes any information for each XBlock that's necessary to execute
this transformation's apply method.
Arguments:
course_key (CourseKey)
block_structure (BlockStructure)
xblock_dict (dict[UsageKey: XBlock])
Returns:
dict[UsageKey: dict]
"""
# Because user partitions are course-wide, only store data for them on the root block.
root_block = block_structure.get_xblock(block_structure.root_block_key)
user_partitions = getattr(root_block, 'user_partitions', []) or []
block_structure.set_transformer_data(cls, 'user_partitions', user_partitions)
# If there are no user partitions, this transformation is a no-op,
# so there is nothing to collect.
if not user_partitions:
return
# For each block, compute merged group access. Because this is a
# topological sort, we know a block's parents are guaranteed to
# already have merged group access computed before the block itself.
for block_key in block_structure.topological_traversal():
xblock = block_structure.get_xblock(block_key)
parent_keys = block_structure.get_parents(block_key)
parent_access = [
block_structure.get_transformer_block_data(parent_key, cls, 'merged_group_access')
for parent_key in parent_keys
]
merged_group_access = MergedGroupAccess(user_partitions, xblock, parent_access)
block_structure.set_transformer_block_data(block_key, cls, 'merged_group_access', merged_group_access)
def transform(self, user_info, block_structure):
"""
Mutates block_structure and block_data based on the given user_info.
"""
# TODO 8874: Factor out functionality of UserPartitionTransformation.apply and access._has_group_access into a common utility function.
user_partitions = block_structure.get_transformer_data(self, 'user_partitions')
# If there are no user partitions, this transformation is a no-op,
# so there is nothing to apply.
if not user_partitions:
return
user_groups = self._get_user_partition_groups(
user_info.course_key, user_partitions, user_info.user
)
if not user_info.has_staff_access:
block_structure.remove_block_if(
lambda block_key: not block_structure.get_transformer_block_data(
block_key, self, 'merged_group_access'
).check_group_access(user_groups)
)
"""
...
"""
from openedx.core.lib.block_cache.transformer import BlockStructureTransformer
class VisibilityTransformer(BlockStructureTransformer):
"""
...
"""
VERSION = 1
MERGED_VISIBLE_TO_STAFF_ONLY = 'merged_visible_to_staff_only'
@classmethod
def get_visible_to_staff_only(cls, block_structure, block_key):
return block_structure.get_transformer_block_data(
block_key, cls, cls.MERGED_VISIBLE_TO_STAFF_ONLY, False
)
@classmethod
def collect(cls, block_structure):
"""
Collects any information that's necessary to execute this transformer's
transform method.
"""
for block_key in block_structure.topological_traversal():
# compute merged value of visible_to_staff_only from all parents
parents = block_structure.get_parents(block_key)
all_parents_visible_to_staff_only = all(
cls.get_visible_to_staff_only(block_structure, parent_key)
for parent_key in parents
) if parents else False
# set the merged value for this block
block_structure.set_transformer_block_data(
block_key,
cls,
cls.MERGED_VISIBLE_TO_STAFF_ONLY,
# merge visible_to_staff_only from all parents and this block
(
all_parents_visible_to_staff_only or
block_structure.get_xblock(block_key).visible_to_staff_only
)
)
def transform(self, user_info, block_structure):
"""
Mutates block_structure based on the given user_info.
"""
if user_info.has_staff_access:
return
block_structure.remove_block_if(
lambda block_key: self.get_visible_to_staff_only(block_structure, block_key)
)
"""
...
"""
from openedx.core.lib.block_cache.user_info import UserInfo
from courseware.access import _has_access_to_course
class CourseUserInfo(UserInfo):
"""
...
"""
def __init__(self, course_key, user):
super(CourseUserInfo, self).__init__()
self.user = user
self.course_key = course_key
self._has_staff_access = None
@property
def has_staff_access(self):
if self._has_staff_access is None:
self._has_staff_access = _has_access_to_course(self.user, 'staff', self.course_key)
return self._has_staff_access
......@@ -1945,8 +1945,12 @@ INSTALLED_APPS = (
'lms.djangoapps.lms_xblock',
# Course data caching
'openedx.core.djangoapps.content.course_overviews',
'openedx.core.djangoapps.content.course_structures',
'lms.djangoapps.course_blocks',
# Old course structure API
'course_structure_api',
# Mailchimp Syncing
......
......@@ -219,6 +219,9 @@ CACHES = {
'course_structure_cache': {
'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
},
'lms.course_blocks': {
'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
},
}
# Dummy secret key for dev
......
......@@ -48,5 +48,10 @@ setup(
"cohort = openedx.core.djangoapps.course_groups.partition_scheme:CohortPartitionScheme",
"verification = openedx.core.djangoapps.credit.partition_schemes:VerificationPartitionScheme",
],
"openedx.block_structure_transformer": [
"visibility = lms.djangoapps.course_blocks.transformers.visibility:VisibilityTransformer",
"start_date = lms.djangoapps.course_blocks.transformers.start_date:StartDateTransformer",
"user_partitions = lms.djangoapps.course_blocks.transformers.user_partitions:UserPartitionTransformer",
],
}
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment