Commit eced849d by Andy Armstrong

Add group_access field to all xblocks

TNL-670
parent f24f01d2
......@@ -5,6 +5,8 @@ These are notable changes in edx-platform. This is a rolling list of changes,
in roughly chronological order, most recent first. Add your entries at or near
the top. Include a label indicating the component affected.
Platform: Add group_access field to all xblocks. TNL-670
LMS: Add support for user partitioning based on cohort. TNL-710
Platform: Add base support for cohorted group configurations. TNL-649
......
......@@ -28,9 +28,11 @@ class CourseMetadata(object):
'graded',
'hide_from_toc',
'pdf_textbooks',
'user_partitions',
'name', # from xblock
'tags', # from xblock
'visible_to_staff_only',
'group_access',
]
@classmethod
......
......@@ -106,7 +106,7 @@ class EnrollmentTest(ModuleStoreTestCase):
self.assertFalse(CourseEnrollment.is_enrolled(self.user, self.course.id))
@patch.dict(settings.FEATURES, {'ENABLE_MKTG_EMAIL_OPT_IN': True})
@patch('user_api.api.profile.update_email_opt_in')
@patch('openedx.core.djangoapps.user_api.api.profile.update_email_opt_in')
@ddt.data(
([], 'true'),
([], 'false'),
......
......@@ -109,7 +109,7 @@ from student.helpers import (
)
from xmodule.error_module import ErrorDescriptor
from shoppingcart.models import CourseRegistrationCode
from user_api.api import profile as profile_api
from openedx.core.djangoapps.user_api.api import profile as profile_api
import analytics
from eventtracking import tracker
......
"""Generate a list indicating whether users have opted in or out of receiving email from an org.
Email opt-in is stored as an org-level preference.
When reports are generated, we need to handle:
1) Org aliases: some organizations might have multiple course key "org" values.
We choose the most recently set preference among all org aliases.
Since this information isn't stored anywhere in edx-platform,
the caller needs to pass in the list of orgs and aliases.
2) No preference set: Some users may not have an opt-in preference set
if they enrolled before the preference was introduced.
These users are opted in by default.
3) Restricting to a subset of courses in an org: Some orgs have courses
that we don't want to include in the results (e.g. EdX-created test courses).
Allow the caller to explicitly specify the list of courses in the org.
The command will always use the read replica database if one is configured.
"""
import os.path
import csv
import time
import contextlib
import logging
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from django.db import connections
from opaque_keys.edx.keys import CourseKey
from xmodule.modulestore.django import modulestore
LOGGER = logging.getLogger(__name__)
class Command(BaseCommand):
"""Generate a list of email opt-in values for user enrollments. """
args = "<OUTPUT_FILENAME> <ORG_ALIASES> --courses=COURSE_ID_LIST"
help = "Generate a list of email opt-in values for user enrollments."
# Fields output in the CSV
OUTPUT_FIELD_NAMES = [
"email",
"full_name",
"course_id",
"is_opted_in_for_email",
"preference_set_date"
]
# Number of records to read at a time when making
# multiple queries over a potentially large dataset.
QUERY_INTERVAL = 1000
def handle(self, *args, **options):
"""Execute the command.
Arguments:
file_path (str): Path to the output file.
*org_list (unicode): List of organization aliases.
Keyword Arguments:
courses (unicode): Comma-separated list of course keys. If provided,
include only these courses in the results.
Raises:
CommandError
"""
file_path, org_list = self._parse_args(args)
# Retrieve all the courses for the org.
# If we were given a specific list of courses to include,
# filter out anything not in that list.
courses = self._get_courses_for_org(org_list)
only_courses = options.get("courses")
if only_courses is not None:
only_courses = [
CourseKey.from_string(course_key.strip())
for course_key in only_courses.split(",")
]
courses = list(set(courses) & set(only_courses))
# Add in organizations from the course keys, to ensure
# we're including orgs with different capitalizations
org_list = list(set(org_list) | set(course.org for course in courses))
# If no courses are found, abort
if not courses:
raise CommandError(
u"No courses found for orgs: {orgs}".format(
orgs=", ".join(org_list)
)
)
# Let the user know what's about to happen
LOGGER.info(
u"Retrieving data for courses: {courses}".format(
courses=", ".join([unicode(course) for course in courses])
)
)
# Open the output file and generate the report.
with open(file_path, "w") as file_handle:
with self._log_execution_time():
self._write_email_opt_in_prefs(file_handle, org_list, courses)
# Remind the user where the output file is
LOGGER.info(u"Output file: {file_path}".format(file_path=file_path))
def _parse_args(self, args):
"""Check and parse arguments.
Validates that the right number of args were provided
and that the output file doesn't already exist.
Arguments:
args (list): List of arguments given at the command line.
Returns:
Tuple of (file_path, org_list)
Raises:
CommandError
"""
if len(args) < 2:
raise CommandError(u"Usage: {args}".format(args=self.args))
file_path = args[0]
org_list = args[1:]
if os.path.exists(file_path):
raise CommandError("File already exists at '{path}'".format(path=file_path))
return file_path, org_list
def _get_courses_for_org(self, org_aliases):
"""Retrieve all course keys for a particular org.
Arguments:
org_aliases (list): List of aliases for the org.
Returns:
List of `CourseKey`s
"""
all_courses = modulestore().get_courses()
orgs_lowercase = [org.lower() for org in org_aliases]
return [
course.id
for course in all_courses
if course.id.org.lower() in orgs_lowercase
]
@contextlib.contextmanager
def _log_execution_time(self):
"""Context manager for measuring execution time. """
start_time = time.time()
yield
execution_time = time.time() - start_time
LOGGER.info(u"Execution time: {time} seconds".format(time=execution_time))
def _write_email_opt_in_prefs(self, file_handle, org_aliases, courses):
"""Write email opt-in preferences to the output file.
This will generate a CSV with one row for each enrollment.
This means that the user's "opt in" preference will be specified
multiple times if the user has enrolled in multiple courses
within the org. However, the values should always be the same:
if the user is listed as "opted out" for course A, she will
also be listed as "opted out" for courses B, C, and D.
Arguments:
file_handle (file): Handle to the output file.
org_aliases (list): List of aliases for the org.
courses (list): List of course keys in the org.
Returns:
None
"""
writer = csv.DictWriter(file_handle, fieldnames=self.OUTPUT_FIELD_NAMES)
cursor = self._db_cursor()
query = (
u"""
SELECT
user.`email` AS `email`,
profile.`name` AS `full_name`,
enrollment.`course_id` AS `course_id`,
(
SELECT value
FROM user_api_userorgtag
WHERE org IN ( {org_list} )
AND `key`=\"email-optin\"
AND `user_id`=user.`id`
ORDER BY modified DESC
LIMIT 1
) AS `is_opted_in_for_email`,
(
SELECT modified
FROM user_api_userorgtag
WHERE org IN ( {org_list} )
AND `key`=\"email-optin\"
AND `user_id`=user.`id`
ORDER BY modified DESC
LIMIT 1
) AS `preference_set_date`
FROM
student_courseenrollment AS enrollment
LEFT JOIN auth_user AS user ON user.id=enrollment.user_id
LEFT JOIN auth_userprofile AS profile ON profile.user_id=user.id
WHERE enrollment.course_id IN ( {course_id_list} )
"""
).format(
course_id_list=self._sql_list(courses),
org_list=self._sql_list(org_aliases)
)
cursor.execute(query)
row_count = 0
for row in self._iterate_results(cursor):
email, full_name, course_id, is_opted_in, pref_set_date = row
writer.writerow({
"email": email.encode('utf-8'),
"full_name": full_name.encode('utf-8'),
"course_id": course_id.encode('utf-8'),
"is_opted_in_for_email": is_opted_in if is_opted_in else "True",
"preference_set_date": pref_set_date,
})
row_count += 1
# Log the number of rows we processed
LOGGER.info(u"Retrieved {num_rows} records.".format(num_rows=row_count))
def _iterate_results(self, cursor):
"""Iterate through the results of a database query, fetching in chunks.
Arguments:
cursor: The database cursor
Yields:
tuple of row values from the query
"""
while True:
rows = cursor.fetchmany(self.QUERY_INTERVAL)
if not rows:
break
for row in rows:
yield row
def _sql_list(self, values):
"""Serialize a list of values for including in a SQL "IN" statement. """
return u",".join([u'"{}"'.format(val) for val in values])
def _db_cursor(self):
"""Return a database cursor to the read replica if one is available. """
# Use the read replica if one has been configured
db_alias = (
'read_replica'
if 'read_replica' in settings.DATABASES
else 'default'
)
return connections[db_alias].cursor()
# -*- coding: utf-8 -*-
"""Tests for the email opt-in list management command. """
import os.path
import tempfile
import shutil
import csv
from collections import defaultdict
from unittest import skipUnless
import ddt
from django.conf import settings
from django.test.utils import override_settings
from django.core.management.base import CommandError
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase, mixed_store_config
from xmodule.modulestore.tests.factories import CourseFactory
from student.tests.factories import UserFactory, CourseEnrollmentFactory
from student.models import CourseEnrollment
import user_api.api.profile as profile_api
from user_api.models import UserOrgTag
from user_api.management.commands import email_opt_in_list
MODULESTORE_CONFIG = mixed_store_config(settings.COMMON_TEST_DATA_ROOT, {}, include_xml=False)
@ddt.ddt
@skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
@override_settings(MODULESTORE=MODULESTORE_CONFIG)
class EmailOptInListTest(ModuleStoreTestCase):
"""Tests for the email opt-in list management command. """
USER_USERNAME = "test_user"
USER_FIRST_NAME = u"Ṫëṡẗ"
USER_LAST_NAME = u"Űśéŕ"
TEST_ORG = u"téśt_őŕǵ"
OUTPUT_FILE_NAME = "test_org_email_opt_in.csv"
OUTPUT_FIELD_NAMES = [
"email",
"full_name",
"course_id",
"is_opted_in_for_email",
"preference_set_date"
]
def setUp(self):
self.user = UserFactory.create(
username=self.USER_USERNAME,
first_name=self.USER_FIRST_NAME,
last_name=self.USER_LAST_NAME
)
self.courses = []
self.enrollments = defaultdict(list)
def test_not_enrolled(self):
self._create_courses_and_enrollments((self.TEST_ORG, False))
output = self._run_command(self.TEST_ORG)
# The user isn't enrolled in the course, so the output should be empty
self._assert_output(output)
def test_enrolled_no_pref(self):
self._create_courses_and_enrollments((self.TEST_ORG, True))
output = self._run_command(self.TEST_ORG)
# By default, if no preference is set by the user is enrolled, opt in
self._assert_output(output, (self.user, self.courses[0].id, True))
def test_enrolled_pref_opted_in(self):
self._create_courses_and_enrollments((self.TEST_ORG, True))
self._set_opt_in_pref(self.user, self.TEST_ORG, True)
output = self._run_command(self.TEST_ORG)
self._assert_output(output, (self.user, self.courses[0].id, True))
def test_enrolled_pref_opted_out(self):
self._create_courses_and_enrollments((self.TEST_ORG, True))
self._set_opt_in_pref(self.user, self.TEST_ORG, False)
output = self._run_command(self.TEST_ORG)
self._assert_output(output, (self.user, self.courses[0].id, False))
def test_opt_in_then_opt_out(self):
self._create_courses_and_enrollments((self.TEST_ORG, True))
self._set_opt_in_pref(self.user, self.TEST_ORG, True)
self._set_opt_in_pref(self.user, self.TEST_ORG, False)
output = self._run_command(self.TEST_ORG)
self._assert_output(output, (self.user, self.courses[0].id, False))
def test_exclude_non_org_courses(self):
# Enroll in a course that's not in the org
self._create_courses_and_enrollments(
(self.TEST_ORG, True),
("other_org", True)
)
# Opt out of the other course
self._set_opt_in_pref(self.user, "other_org", False)
# The first course is included in the results,
# but the second course is excluded,
# so the user should be opted in by default.
output = self._run_command(self.TEST_ORG)
self._assert_output(
output,
(self.user, self.courses[0].id, True),
expect_pref_datetime=False
)
def test_enrolled_conflicting_prefs(self):
# Enroll in two courses, both in the org
self._create_courses_and_enrollments(
(self.TEST_ORG, True),
("org_alias", True)
)
# Opt into the first course, then opt out of the second course
self._set_opt_in_pref(self.user, self.TEST_ORG, True)
self._set_opt_in_pref(self.user, "org_alias", False)
# The second preference change should take precedence
# Note that *both* courses are included in the list,
# but they should have the same value.
output = self._run_command(self.TEST_ORG, other_names=["org_alias"])
self._assert_output(
output,
(self.user, self.courses[0].id, False),
(self.user, self.courses[1].id, False)
)
# Opt into the first course
# Even though the other course still has a preference set to false,
# the newest preference takes precedence
self._set_opt_in_pref(self.user, self.TEST_ORG, True)
output = self._run_command(self.TEST_ORG, other_names=["org_alias"])
self._assert_output(
output,
(self.user, self.courses[0].id, True),
(self.user, self.courses[1].id, True)
)
@ddt.data(True, False)
def test_unenrolled_from_all_courses(self, opt_in_pref):
# Enroll in the course and set a preference
self._create_courses_and_enrollments((self.TEST_ORG, True))
self._set_opt_in_pref(self.user, self.TEST_ORG, opt_in_pref)
# Unenroll from the course
CourseEnrollment.unenroll(self.user, self.courses[0].id, skip_refund=True)
# Enrollments should still appear in the outpu
output = self._run_command(self.TEST_ORG)
self._assert_output(output, (self.user, self.courses[0].id, opt_in_pref))
def test_unenrolled_from_some_courses(self):
# Enroll in several courses in the org
self._create_courses_and_enrollments(
(self.TEST_ORG, True),
(self.TEST_ORG, True),
(self.TEST_ORG, True),
("org_alias", True)
)
# Set a preference for the aliased course
self._set_opt_in_pref(self.user, "org_alias", False)
# Unenroll from the aliased course
CourseEnrollment.unenroll(self.user, self.courses[3].id, skip_refund=True)
# Expect that the preference still applies,
# and all the enrollments should appear in the list
output = self._run_command(self.TEST_ORG, other_names=["org_alias"])
self._assert_output(
output,
(self.user, self.courses[0].id, False),
(self.user, self.courses[1].id, False),
(self.user, self.courses[2].id, False),
(self.user, self.courses[3].id, False)
)
def test_no_courses_for_org_name(self):
self._create_courses_and_enrollments((self.TEST_ORG, True))
self._set_opt_in_pref(self.user, self.TEST_ORG, True)
# No course available for this particular org
with self.assertRaisesRegexp(CommandError, "^No courses found for orgs:"):
self._run_command("other_org")
def test_specify_subset_of_courses(self):
# Create several courses in the same org
self._create_courses_and_enrollments(
(self.TEST_ORG, True),
(self.TEST_ORG, True),
(self.TEST_ORG, True),
)
# Execute the command, but exclude the second course from the list
only_courses = [self.courses[0].id, self.courses[1].id]
self._run_command(self.TEST_ORG, only_courses=only_courses)
# Choose numbers before and after the query interval boundary
@ddt.data(2, 3, 4, 5, 6, 7, 8, 9)
def test_many_users(self, num_users):
# Create many users and enroll them in the test course
course = CourseFactory.create(org=self.TEST_ORG)
usernames = []
for _ in range(num_users):
user = UserFactory.create()
usernames.append(user.username)
CourseEnrollmentFactory.create(course_id=course.id, user=user)
# Generate the report
output = self._run_command(self.TEST_ORG, query_interval=4)
# Expect that every enrollment shows up in the report
output_emails = [row["email"] for row in output]
for email in output_emails:
self.assertIn(email, output_emails)
def test_org_capitalization(self):
# Lowercase some of the org names in the course IDs
self._create_courses_and_enrollments(
("MyOrg", True),
("myorg", True)
)
# Set preferences for both courses
self._set_opt_in_pref(self.user, "MyOrg", True)
self._set_opt_in_pref(self.user, "myorg", False)
# Execute the command, expecting both enrollments to show up
# We're passing in the uppercase org, but we set the lowercase
# version more recently, so we expect the lowercase org
# preference to apply.
output = self._run_command("MyOrg")
self._assert_output(
output,
(self.user, self.courses[0].id, False),
(self.user, self.courses[1].id, False)
)
@ddt.data(0, 1)
def test_not_enough_args(self, num_args):
args = ["dummy"] * num_args
expected_msg_regex = "^Usage: <OUTPUT_FILENAME> <ORG_ALIASES> --courses=COURSE_ID_LIST$"
with self.assertRaisesRegexp(CommandError, expected_msg_regex):
email_opt_in_list.Command().handle(*args)
def test_file_already_exists(self):
temp_file = tempfile.NamedTemporaryFile(delete=True)
def _cleanup(): # pylint: disable=missing-docstring
temp_file.close()
with self.assertRaisesRegexp(CommandError, "^File already exists"):
email_opt_in_list.Command().handle(temp_file.name, self.TEST_ORG)
def _create_courses_and_enrollments(self, *args):
"""Create courses and enrollments.
Created courses and enrollments are stored in instance variables
so tests can refer to them later.
Arguments:
*args: Tuples of (course_org, should_enroll), where
course_org is the name of the org in the course key
and should_enroll is a boolean indicating whether to enroll
the user in the course.
Returns:
None
"""
for course_number, (course_org, should_enroll) in enumerate(args):
course = CourseFactory.create(org=course_org, number=str(course_number))
if should_enroll:
enrollment = CourseEnrollmentFactory.create(
is_active=True,
course_id=course.id,
user=self.user
)
self.enrollments[course.id].append(enrollment)
self.courses.append(course)
def _set_opt_in_pref(self, user, org, is_opted_in):
"""Set the email opt-in preference.
Arguments:
user (User): The user model.
org (unicode): The org in the course key.
is_opted_in (bool): Whether the user is opted in or out of emails.
Returns:
None
"""
profile_api.update_email_opt_in(user.username, org, is_opted_in)
def _latest_pref_set_date(self, user):
"""Retrieve the latest opt-in preference for the user,
across all orgs and preference keys.
Arguments:
user (User): The user whos preference was set.
Returns:
ISO-formatted date string or empty string
"""
pref = UserOrgTag.objects.filter(user=user).order_by("-modified")
return pref[0].modified.isoformat(' ') if len(pref) > 0 else ""
def _run_command(self, org, other_names=None, only_courses=None, query_interval=None):
"""Execute the management command to generate the email opt-in list.
Arguments:
org (unicode): The org to generate the report for.
Keyword Arguments:
other_names (list): List of other aliases for the org.
only_courses (list): If provided, include only these course IDs in the report.
query_interval (int): If provided, override the default query interval.
Returns:
list: The rows of the generated CSV report. Each item is a dictionary.
"""
# Create a temporary directory for the output
# Delete it when we're finished
temp_dir_path = tempfile.mkdtemp()
def _cleanup(): # pylint: disable=missing-docstring
shutil.rmtree(temp_dir_path)
self.addCleanup(_cleanup)
# Sanitize the arguments
if other_names is None:
other_names = []
output_path = os.path.join(temp_dir_path, self.OUTPUT_FILE_NAME)
org_list = [org] + other_names
if only_courses is not None:
only_courses = ",".join(unicode(course_id) for course_id in only_courses)
command = email_opt_in_list.Command()
# Override the query interval to speed up the tests
if query_interval is not None:
command.QUERY_INTERVAL = query_interval
# Execute the command
command.handle(output_path, *org_list, courses=only_courses)
# Retrieve the output from the file
try:
with open(output_path) as output_file:
reader = csv.DictReader(output_file, fieldnames=self.OUTPUT_FIELD_NAMES)
rows = [row for row in reader]
except IOError:
self.fail("Could not find or open output file at '{path}'".format(path=output_path))
# Return the output as a list of dictionaries
return rows
def _assert_output(self, output, *args, **kwargs):
"""Check the output of the report.
Arguments:
output (list): List of rows in the output CSV file.
*args: Tuples of (user, course_id, opt_in_pref)
Keyword Arguments:
expect_pref_datetime (bool): If false, expect an empty
string for the preference.
Returns:
None
Raises:
AssertionError
"""
self.assertEqual(len(output), len(args))
for user, course_id, opt_in_pref in args:
self.assertIn({
"email": user.email.encode('utf-8'),
"full_name": user.profile.name.encode('utf-8'),
"course_id": unicode(course_id).encode('utf-8'),
"is_opted_in_for_email": unicode(opt_in_pref),
"preference_set_date": (
self._latest_pref_set_date(self.user)
if kwargs.get("expect_pref_datetime", True)
else ""
)
}, output)
......@@ -57,6 +57,15 @@ class InheritanceMixin(XBlockMixin):
default=False,
scope=Scope.settings,
)
group_access = Dict(
help="A dictionary that maps which groups can be shown this block. The keys "
"are group configuration ids and the values are a list of group IDs. "
"If there is no key for a group configuration or if the list of group IDs "
"is empty then the block is considered visible to all. Note that this "
"field is ignored if the block is visible_to_staff_only.",
default={},
scope=Scope.settings,
)
course_edit_method = String(
display_name=_("Course Editor"),
help=_("Enter the method by which this course is edited (\"XML\" or \"Studio\")."),
......
......@@ -80,10 +80,6 @@ class SplitTestFields(object):
# location needs to actually match one of the children of this
# Block. (expected invariant that we'll need to test, and handle
# authoring tools that mess this up)
# TODO: is there a way to add some validation around this, to
# be run on course load or in studio or ....
group_id_to_child = ReferenceValueDict(
help=_("Which child module students in a particular group_id should see"),
scope=Scope.content
......
......@@ -40,7 +40,6 @@ class SplitTestMixin(object):
partition_id, name, description, groups, MockUserPartitionScheme("random")
).to_json()
def verify_groups(self, container, active_groups, inactive_groups, verify_missing_groups_not_present=True):
"""
Check that the groups appear and are correctly categorized as to active and inactive.
......
......@@ -17,19 +17,15 @@ from django_comment_client.tests.utils import CohortedContentTestCase
from django_comment_client.utils import strip_none
from student.tests.factories import UserFactory, CourseEnrollmentFactory
from util.testing import UrlResetMixin
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.django_utils import TEST_DATA_MOCK_MODULESTORE
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase, TEST_DATA_MOCK_MODULESTORE
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
from courseware.tests.modulestore_config import TEST_DATA_DIR
from courseware.courses import UserNotEnrolled
from nose.tools import assert_true # pylint: disable=E0611
from mock import patch, Mock, ANY, call
from openedx.core.djangoapps.course_groups.models import CourseUserGroup
TEST_DATA_MONGO_MODULESTORE = mixed_store_config(TEST_DATA_DIR, {}, include_xml=False)
log = logging.getLogger(__name__)
# pylint: disable=missing-docstring
......
......@@ -2,7 +2,7 @@ from django.test.utils import override_settings
from mock import patch
from openedx.core.djangoapps.course_groups.models import CourseUserGroup
from courseware.tests.modulestore_config import TEST_DATA_MIXED_MODULESTORE
from xmodule.modulestore.tests.django_utils import TEST_DATA_MOCK_MODULESTORE
from django_comment_common.models import Role
from django_comment_common.utils import seed_permissions_roles
from student.tests.factories import CourseEnrollmentFactory, UserFactory
......
......@@ -2,7 +2,6 @@
Tests for instructor.basic
"""
from django.test import TestCase
from student.models import CourseEnrollment
from django.core.urlresolvers import reverse
from mock import patch
......@@ -18,11 +17,9 @@ from instructor_analytics.basic import (
coupon_codes_features, AVAILABLE_FEATURES, STUDENT_FEATURES, PROFILE_FEATURES
)
from openedx.core.djangoapps.course_groups.tests.helpers import CohortFactory
from openedx.core.djangoapps.course_groups.models import CourseUserGroup
from courseware.tests.factories import InstructorFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
class TestAnalyticsBasic(ModuleStoreTestCase):
......
"""
Namespace that defines fields common to all blocks used in the LMS
"""
from xblock.fields import Boolean, Scope, String, XBlockMixin
from xblock.fields import Boolean, Scope, String, XBlockMixin, Dict
from xblock.validation import ValidationMessage
from xmodule.modulestore.inheritance import UserPartitionList
# Make '_' a no-op so we can scrape strings
_ = lambda text: text
......@@ -53,3 +55,73 @@ class LmsBlockMixin(XBlockMixin):
default=False,
scope=Scope.settings,
)
group_access = Dict(
help="A dictionary that maps which groups can be shown this block. The keys "
"are group configuration ids and the values are a list of group IDs. "
"If there is no key for a group configuration or if the list of group IDs "
"is empty then the block is considered visible to all. Note that this "
"field is ignored if the block is visible_to_staff_only.",
default={},
scope=Scope.settings,
)
# Specified here so we can see what the value set at the course-level is.
user_partitions = UserPartitionList(
help=_("The list of group configurations for partitioning students in content experiments."),
default=[],
scope=Scope.settings
)
def _get_user_partition(self, user_partition_id):
"""
Returns the user partition with the specified id, or None if there is no such partition.
"""
for user_partition in self.user_partitions:
if user_partition.id == user_partition_id:
return user_partition
return None
def is_visible_to_group(self, user_partition, group):
"""
Returns true if this xblock should be shown to a user in the specified user partition group.
This method returns true if one of the following is true:
- the xblock has no group_access dictionary specified
- if the dictionary has no key for the user partition's id
- if the value for the user partition's id is an empty list
- if the value for the user partition's id contains the specified group's id
"""
if not self.group_access:
return True
group_ids = self.group_access.get(user_partition.id, [])
if len(group_ids) == 0:
return True
return group.id in group_ids
def validate(self):
"""
Validates the state of this xblock instance.
"""
_ = self.runtime.service(self, "i18n").ugettext # pylint: disable=redefined-outer-name
validation = super(LmsBlockMixin, self).validate()
for user_partition_id, group_ids in self.group_access.iteritems():
user_partition = self._get_user_partition(user_partition_id)
if not user_partition:
validation.add(
ValidationMessage(
ValidationMessage.ERROR,
_(u"This xblock refers to a deleted or invalid content group configuration.")
)
)
else:
for group_id in group_ids:
group = user_partition.get_group(group_id)
if not group:
validation.add(
ValidationMessage(
ValidationMessage.ERROR,
_(u"This xblock refers to a deleted or invalid content group.")
)
)
return validation
"""
Tests of the LMS XBlock Mixin
"""
from xblock.validation import ValidationMessage
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.partitions.partitions import Group, UserPartition
class LmsXBlockMixinTestCase(ModuleStoreTestCase):
"""
Base class for XBlock mixin tests cases. A simple course with a single user partition is created
in setUp for all subclasses to use.
"""
def setUp(self):
super(LmsXBlockMixinTestCase, self).setUp()
self.user_partition = UserPartition(
0,
'first_partition',
'First Partition',
[
Group(0, 'alpha'),
Group(1, 'beta')
]
)
self.group1 = self.user_partition.groups[0] # pylint: disable=no-member
self.group2 = self.user_partition.groups[1] # pylint: disable=no-member
self.course = CourseFactory.create(user_partitions=[self.user_partition])
self.section = ItemFactory.create(parent=self.course, category='chapter', display_name='Test Section')
self.subsection = ItemFactory.create(parent=self.section, category='sequential', display_name='Test Subsection')
self.vertical = ItemFactory.create(parent=self.subsection, category='vertical', display_name='Test Unit')
self.video = ItemFactory.create(parent=self.subsection, category='video', display_name='Test Video')
class XBlockValidationTest(LmsXBlockMixinTestCase):
"""
Unit tests for XBlock validation
"""
def verify_validation_message(self, message, expected_message, expected_message_type):
"""
Verify that the validation message has the expected validation message and type.
"""
self.assertEqual(message.text, expected_message)
self.assertEqual(message.type, expected_message_type)
def test_validate_full_group_access(self):
"""
Test the validation messages produced for an xblock with full group access.
"""
validation = self.video.validate()
self.assertEqual(len(validation.messages), 0)
def test_validate_restricted_group_access(self):
"""
Test the validation messages produced for an xblock with a valid group access restriction
"""
self.video.group_access[self.user_partition.id] = [self.group1.id, self.group2.id] # pylint: disable=no-member
validation = self.video.validate()
self.assertEqual(len(validation.messages), 0)
def test_validate_invalid_user_partition(self):
"""
Test the validation messages produced for an xblock referring to a non-existent user partition.
"""
self.video.group_access[999] = [self.group1.id]
validation = self.video.validate()
self.assertEqual(len(validation.messages), 1)
self.verify_validation_message(
validation.messages[0],
u"This xblock refers to a deleted or invalid content group configuration.",
ValidationMessage.ERROR,
)
def test_validate_invalid_group(self):
"""
Test the validation messages produced for an xblock referring to a non-existent group.
"""
self.video.group_access[self.user_partition.id] = [self.group1.id, 999] # pylint: disable=no-member
validation = self.video.validate()
self.assertEqual(len(validation.messages), 1)
self.verify_validation_message(
validation.messages[0],
u"This xblock refers to a deleted or invalid content group.",
ValidationMessage.ERROR,
)
class XBlockGroupAccessTest(LmsXBlockMixinTestCase):
"""
Unit tests for XBlock group access.
"""
def test_is_visible_to_group(self):
"""
Test the behavior of is_visible_to_group.
"""
# All groups are visible for an unrestricted xblock
self.assertTrue(self.video.is_visible_to_group(self.user_partition, self.group1))
self.assertTrue(self.video.is_visible_to_group(self.user_partition, self.group2))
# Verify that all groups are visible if the set of group ids is empty
self.video.group_access[self.user_partition.id] = [] # pylint: disable=no-member
self.assertTrue(self.video.is_visible_to_group(self.user_partition, self.group1))
self.assertTrue(self.video.is_visible_to_group(self.user_partition, self.group2))
# Verify that only specified groups are visible
self.video.group_access[self.user_partition.id] = [self.group1.id] # pylint: disable=no-member
self.assertTrue(self.video.is_visible_to_group(self.user_partition, self.group1))
self.assertFalse(self.video.is_visible_to_group(self.user_partition, self.group2))
# Verify that having an invalid user partition does not affect group visibility of other partitions
self.video.group_access[999] = [self.group1.id]
self.assertTrue(self.video.is_visible_to_group(self.user_partition, self.group1))
self.assertFalse(self.video.is_visible_to_group(self.user_partition, self.group2))
# Verify that group access is still correct even with invalid group ids
self.video.group_access.clear()
self.video.group_access[self.user_partition.id] = [self.group2.id, 999] # pylint: disable=no-member
self.assertFalse(self.video.is_visible_to_group(self.user_partition, self.group1))
self.assertTrue(self.video.is_visible_to_group(self.user_partition, self.group2))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment