Commit 3ebac807 by Don Mitchell

Replace authz fns with roles.py ones

STUD-1006
parent 858e354a
# pylint: disable=C0111
# pylint: disable=W0621
import time
import os
from lettuce import world, step
from nose.tools import assert_true, assert_in, assert_false # pylint: disable=E0611
from auth.authz import get_user_by_email, get_course_groupname_for_role
from nose.tools import assert_true, assert_in # pylint: disable=no-name-in-module
from django.conf import settings
from student.roles import CourseRole, CourseStaffRole, CourseInstructorRole
from student.models import get_user
from selenium.webdriver.common.keys import Keys
import time
import os
from django.contrib.auth.models import Group
from logging import getLogger
from student.tests.factories import AdminFactory
from student import auth
logger = getLogger(__name__)
from terrain.browser import reset_data
......@@ -158,11 +160,9 @@ def add_course_author(user, course):
Add the user to the instructor group of the course
so they will have the permissions to see it in studio
"""
for role in ("staff", "instructor"):
groupname = get_course_groupname_for_role(course.location, role)
group, __ = Group.objects.get_or_create(name=groupname)
user.groups.add(group)
user.save()
global_admin = AdminFactory()
for role in (CourseStaffRole, CourseInstructorRole):
auth.add_users(global_admin, role(course.location), user)
def create_a_course():
......@@ -171,7 +171,7 @@ def create_a_course():
user = world.scenario_dict.get("USER")
if not user:
user = get_user_by_email('robot+studio@edx.org')
user = get_user('robot+studio@edx.org')
add_course_author(user, course)
......@@ -358,7 +358,7 @@ def other_user_login(step, name):
login_form.find_by_name('submit').click()
world.retry_on_exception(fill_login_form)
assert_true(world.is_css_present('.new-course-button'))
world.scenario_dict['USER'] = get_user_by_email(name + '@edx.org')
world.scenario_dict['USER'] = get_user(name + '@edx.org')
@step(u'the user "([^"]*)" exists( as a course (admin|staff member|is_staff))?$')
......@@ -368,18 +368,17 @@ def create_other_user(_step, name, has_extra_perms, role_name):
if has_extra_perms:
if role_name == "is_staff":
user.is_staff = True
user.save()
else:
if role_name == "admin":
# admins get staff privileges, as well
roles = ("staff", "instructor")
roles = (CourseStaffRole, CourseInstructorRole)
else:
roles = ("staff",)
roles = (CourseStaffRole,)
location = world.scenario_dict["COURSE"].location
global_admin = AdminFactory()
for role in roles:
groupname = get_course_groupname_for_role(location, role)
group, __ = Group.objects.get_or_create(name=groupname)
user.groups.add(group)
user.save()
auth.add_users(global_admin, role(location), user)
@step('I log out')
......@@ -393,7 +392,7 @@ def i_edit_a_draft(_step):
@step(u'I click on "replace with draft"$')
def i_edit_a_draft(_step):
def i_replace_w_draft(_step):
world.css_click("a.publish-draft")
......
......@@ -6,8 +6,7 @@ from xmodule.modulestore.store_utilities import clone_course
from xmodule.modulestore.django import modulestore
from xmodule.contentstore.django import contentstore
from xmodule.course_module import CourseDescriptor
from auth.authz import _copy_course_group
from student.roles import CourseInstructorRole, CourseStaffRole
#
......@@ -20,7 +19,7 @@ class Command(BaseCommand):
def handle(self, *args, **options):
"Execute the command"
if len(args) != 2:
raise CommandError("clone requires two arguments: <source-course_id> <dest-course_id>")
raise CommandError("clone requires 2 arguments: <source-course_id> <dest-course_id>")
source_course_id = args[0]
dest_course_id = args[1]
......@@ -28,7 +27,7 @@ class Command(BaseCommand):
mstore = modulestore('direct')
cstore = contentstore()
org, course_num, run = dest_course_id.split("/")
org, course_num, _ = dest_course_id.split("/")
mstore.ignore_write_events_on_courses.append('{0}/{1}'.format(org, course_num))
print("Cloning course {0} to {1}".format(source_course_id, dest_course_id))
......@@ -41,4 +40,10 @@ class Command(BaseCommand):
mstore.refresh_cached_metadata_inheritance_tree(dest_location)
print("copying User permissions...")
_copy_course_group(source_location, dest_location)
# purposely avoids auth.add_user b/c it doesn't have a caller to authorize
CourseInstructorRole(dest_location).add_users(
*CourseInstructorRole(source_location).users_with_role()
)
CourseStaffRole(dest_location).add_users(
*CourseStaffRole(source_location).users_with_role()
)
......@@ -20,8 +20,6 @@ from django.dispatch import Signal
from contentstore.utils import get_modulestore
from contentstore.tests.utils import parse_json, AjaxEnabledTestClient
from auth.authz import add_user_to_creator_group
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from contentstore.tests.modulestore_config import TEST_MODULESTORE
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
......@@ -57,6 +55,8 @@ import re
from contentstore.utils import delete_course_and_groups
from xmodule.modulestore.django import loc_mapper
from student.roles import CourseCreatorRole
from student import auth
TEST_DATA_CONTENTSTORE = copy.deepcopy(settings.CONTENTSTORE)
TEST_DATA_CONTENTSTORE['DOC_STORE_CONFIG']['db'] = 'test_xcontent_%s' % uuid4().hex
......@@ -1530,7 +1530,7 @@ class ContentStoreTest(ModuleStoreTestCase):
def test_create_course_with_course_creator(self):
"""Test new course creation -- use course creator group"""
with mock.patch.dict('django.conf.settings.FEATURES', {"ENABLE_CREATOR_GROUP": True}):
add_user_to_creator_group(self.user, self.user)
auth.add_users(self.user, CourseCreatorRole(), self.user)
self.assert_created_course()
def assert_course_permission_denied(self):
......
"""
Test CRUD for authorization.
"""
import copy
from django.test.utils import override_settings
from django.contrib.auth.models import User, Group
......@@ -9,10 +11,9 @@ from contentstore.tests.modulestore_config import TEST_MODULESTORE
from contentstore.tests.utils import AjaxEnabledTestClient
from xmodule.modulestore.django import loc_mapper
from xmodule.modulestore import Location
from auth.authz import INSTRUCTOR_ROLE_NAME, STAFF_ROLE_NAME
from auth import authz
import copy
from contentstore.views.access import has_access
from student.roles import CourseInstructorRole, CourseStaffRole
from contentstore.views.access import has_course_access
from student import auth
@override_settings(MODULESTORE=TEST_MODULESTORE)
......@@ -87,30 +88,35 @@ class TestCourseAccess(ModuleStoreTestCase):
Test getting all authors for a course where their permissions run the gamut of allowed group
types.
"""
# first check the groupname for the course creator.
# first check the course creator.has explicit access (don't use has_access as is_staff
# will trump the actual test)
self.assertTrue(
self.user.groups.filter(
name="{}_{}".format(INSTRUCTOR_ROLE_NAME, self.course_locator.package_id)
).exists(),
CourseInstructorRole(self.course_locator).has_user(self.user),
"Didn't add creator as instructor."
)
users = copy.copy(self.users)
# doesn't use role.users_with_role b/c it's verifying the roles.py behavior
user_by_role = {}
# add the misc users to the course in different groups
for role in [INSTRUCTOR_ROLE_NAME, STAFF_ROLE_NAME]:
for role in [CourseInstructorRole, CourseStaffRole]:
user_by_role[role] = []
groupnames, _ = authz.get_all_course_role_groupnames(self.course_locator, role)
# pylint: disable=protected-access
groupnames = role(self.course_locator)._group_names
self.assertGreater(len(groupnames), 1, "Only 0 or 1 groupname for {}".format(role.ROLE))
# NOTE: this loop breaks the roles.py abstraction by purposely assigning
# users to one of each possible groupname in order to test that has_course_access
# and remove_user work
for groupname in groupnames:
group, _ = Group.objects.get_or_create(name=groupname)
user = users.pop()
user_by_role[role].append(user)
user.groups.add(group)
user.save()
self.assertTrue(has_access(user, self.course_locator), "{} does not have access".format(user))
self.assertTrue(has_access(user, self.course_location), "{} does not have access".format(user))
self.assertTrue(has_course_access(user, self.course_locator), "{} does not have access".format(user))
self.assertTrue(has_course_access(user, self.course_location), "{} does not have access".format(user))
response = self.client.get_html(self.course_locator.url_reverse('course_team'))
for role in [INSTRUCTOR_ROLE_NAME, STAFF_ROLE_NAME]:
for role in [CourseInstructorRole, CourseStaffRole]:
for user in user_by_role[role]:
self.assertContains(response, user.email)
......@@ -119,9 +125,23 @@ class TestCourseAccess(ModuleStoreTestCase):
copy_course_locator = loc_mapper().translate_location(
copy_course_location.course_id, copy_course_location, False, True
)
# pylint: disable=protected-access
authz._copy_course_group(self.course_locator, copy_course_locator)
for role in [INSTRUCTOR_ROLE_NAME, STAFF_ROLE_NAME]:
for role in [CourseInstructorRole, CourseStaffRole]:
auth.add_users(
self.user,
role(copy_course_locator),
*role(self.course_locator).users_with_role()
)
# verify access in copy course and verify that removal from source course w/ the various
# groupnames works
for role in [CourseInstructorRole, CourseStaffRole]:
for user in user_by_role[role]:
self.assertTrue(has_access(user, copy_course_locator), "{} no copy access".format(user))
self.assertTrue(has_access(user, copy_course_location), "{} no copy access".format(user))
# forcefully decache the groups: premise is that any real request will not have
# multiple objects repr the same user but this test somehow uses different instance
# in above add_users call
if hasattr(user, '_groups'):
del user._groups
self.assertTrue(has_course_access(user, copy_course_locator), "{} no copy access".format(user))
self.assertTrue(has_course_access(user, copy_course_location), "{} no copy access".format(user))
auth.remove_users(self.user, role(self.course_locator), user)
self.assertFalse(has_course_access(user, self.course_locator), "{} remove didn't work".format(user))
......@@ -13,10 +13,10 @@ from xmodule.modulestore import Location
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.exceptions import ItemNotFoundError
from django_comment_common.utils import unseed_permissions_roles
from auth.authz import _delete_course_group
from xmodule.modulestore.store_utilities import delete_course
from xmodule.course_module import CourseDescriptor
from xmodule.modulestore.draft import DIRECT_ONLY_CATEGORIES
from student.roles import CourseInstructorRole, CourseStaffRole
log = logging.getLogger(__name__)
......@@ -35,7 +35,7 @@ def delete_course_and_groups(course_id, commit=False):
module_store = modulestore('direct')
content_store = contentstore()
org, course_num, run = course_id.split("/")
org, course_num, _ = course_id.split("/")
module_store.ignore_write_events_on_courses.append('{0}/{1}'.format(org, course_num))
loc = CourseDescriptor.id_to_location(course_id)
......@@ -47,7 +47,10 @@ def delete_course_and_groups(course_id, commit=False):
# in the django layer, we need to remove all the user permissions groups associated with this course
if commit:
try:
_delete_course_group(loc)
staff_role = CourseStaffRole(loc)
staff_role.remove_users(*staff_role.users_with_role())
instructor_role = CourseInstructorRole(loc)
instructor_role.remove_users(*instructor_role.users_with_role())
except Exception as err:
log.error("Error in deleting course groups for {0}: {1}".format(loc, err))
......
from auth.authz import STAFF_ROLE_NAME, INSTRUCTOR_ROLE_NAME
from auth.authz import is_user_in_course_group_role
from ..utils import get_course_location_for_item
from xmodule.modulestore.locator import CourseLocator
from student.roles import CourseStaffRole, GlobalStaff
from student import auth
def has_access(user, location, role=STAFF_ROLE_NAME):
def has_course_access(user, location, role=CourseStaffRole):
"""
Return True if user allowed to access this piece of data
Note that the CMS permissions model is with respect to courses
......@@ -14,14 +14,9 @@ def has_access(user, location, role=STAFF_ROLE_NAME):
will not be in both INSTRUCTOR and STAFF groups, so we have to cascade our
queries here as INSTRUCTOR has all the rights that STAFF do
"""
if GlobalStaff().has_user(user):
return True
if not isinstance(location, CourseLocator):
# this can be expensive if location is not category=='course'
location = get_course_location_for_item(location)
_has_access = is_user_in_course_group_role(user, location, role)
# if we're not in STAFF, perhaps we're in INSTRUCTOR groups
if not _has_access and role == STAFF_ROLE_NAME:
_has_access = is_user_in_course_group_role(
user,
location,
INSTRUCTOR_ROLE_NAME
)
return _has_access
return auth.has_access(user, role(location))
......@@ -21,13 +21,13 @@ from xmodule.modulestore import InvalidLocationError
from xmodule.exceptions import NotFoundError
from django.core.exceptions import PermissionDenied
from xmodule.modulestore.django import loc_mapper
from .access import has_access
from xmodule.modulestore.locator import BlockUsageLocator
from util.json_request import JsonResponse
from django.http import HttpResponseNotFound
from django.utils.translation import ugettext as _
from pymongo import ASCENDING, DESCENDING
from .access import has_course_access
__all__ = ['assets_handler']
......@@ -56,7 +56,7 @@ def assets_handler(request, tag=None, package_id=None, branch=None, version_guid
json: delete an asset
"""
location = BlockUsageLocator(package_id=package_id, branch=branch, version_guid=version_guid, block_id=block)
if not has_access(request.user, location):
if not has_course_access(request.user, location):
raise PermissionDenied()
response_format = request.REQUEST.get('format', 'html')
......
......@@ -15,7 +15,7 @@ from xmodule.modulestore.inheritance import own_metadata
from ..utils import get_modulestore
from .access import has_access
from .access import has_course_access
from xmodule.course_module import CourseDescriptor
from xmodule.modulestore.locator import BlockUsageLocator
......@@ -37,7 +37,7 @@ def checklists_handler(request, tag=None, package_id=None, branch=None, version_
json: updates the checked state for items within a particular checklist. checklist_index is required.
"""
location = BlockUsageLocator(package_id=package_id, branch=branch, version_guid=version_guid, block_id=block)
if not has_access(request.user, location):
if not has_course_access(request.user, location):
raise PermissionDenied()
old_location = loc_mapper().translate_locator_to_location(location)
......
......@@ -29,7 +29,7 @@ from contentstore.utils import get_lms_link_for_item, compute_unit_state, UnitSt
from models.settings.course_grading import CourseGradingModel
from .access import has_access
from .access import has_course_access
__all__ = ['OPEN_ENDED_COMPONENT_TYPES',
'ADVANCED_COMPONENT_POLICY_KEY',
......@@ -311,7 +311,7 @@ def _get_item_in_course(request, locator):
Verifies that the caller has permission to access this item.
"""
if not has_access(request.user, locator):
if not has_course_access(request.user, locator):
raise PermissionDenied()
old_location = loc_mapper().translate_locator_to_location(locator)
......
......@@ -35,10 +35,9 @@ from models.settings.course_details import CourseDetails, CourseSettingsEncoder
from models.settings.course_grading import CourseGradingModel
from models.settings.course_metadata import CourseMetadata
from auth.authz import create_all_course_groups, is_user_in_creator_group
from util.json_request import expect_json
from .access import has_access
from .access import has_course_access
from .tabs import initialize_course_tabs
from .component import (
OPEN_ENDED_COMPONENT_TYPES, NOTE_COMPONENT_TYPES,
......@@ -52,6 +51,8 @@ from xmodule.html_module import AboutDescriptor
from xmodule.modulestore.locator import BlockUsageLocator
from course_creators.views import get_course_creator_status, add_user_with_status_unrequested
from contentstore import utils
from student.roles import CourseInstructorRole, CourseStaffRole, CourseCreatorRole
from student import auth
__all__ = ['course_info_handler', 'course_handler', 'course_info_update_handler',
'settings_handler',
......@@ -66,7 +67,7 @@ def _get_locator_and_course(package_id, branch, version_guid, block_id, user, de
for the view functions in this file.
"""
locator = BlockUsageLocator(package_id=package_id, branch=branch, version_guid=version_guid, block_id=block_id)
if not has_access(user, locator):
if not has_course_access(user, locator):
raise PermissionDenied()
course_location = loc_mapper().translate_locator_to_location(locator)
course_module = modulestore().get_item(course_location, depth=depth)
......@@ -102,7 +103,7 @@ def course_handler(request, tag=None, package_id=None, branch=None, version_guid
raise NotImplementedError('coming soon')
elif request.method == 'POST': # not sure if this is only post. If one will have ids, it goes after access
return create_new_course(request)
elif not has_access(
elif not has_course_access(
request.user,
BlockUsageLocator(package_id=package_id, branch=branch, version_guid=version_guid, block_id=block)
):
......@@ -136,7 +137,7 @@ def course_listing(request):
"""
Get courses to which this user has access
"""
return (has_access(request.user, course.location)
return (has_course_access(request.user, course.location)
# pylint: disable=fixme
# TODO remove this condition when templates purged from db
and course.location.course != 'templates'
......@@ -207,7 +208,7 @@ def create_new_course(request):
Returns the URL for the course overview page.
"""
if not is_user_in_creator_group(request.user):
if not auth.has_access(request.user, CourseCreatorRole()):
raise PermissionDenied()
org = request.json.get('org')
......@@ -297,7 +298,10 @@ def create_new_course(request):
initialize_course_tabs(new_course)
new_location = loc_mapper().translate_location(new_course.location.course_id, new_course.location, False, True)
create_all_course_groups(request.user, new_location)
# can't use auth.add_users here b/c it requires request.user to already have Instructor perms in this course
# however, we can assume that b/c this user had authority to create the course, the user can add themselves
CourseInstructorRole(new_location).add_users(request.user)
auth.add_users(request.user, CourseStaffRole(new_location), request.user)
# seed the forums
seed_permissions_roles(new_course.location.course_id)
......@@ -370,7 +374,7 @@ def course_info_update_handler(request, tag=None, package_id=None, branch=None,
provided_id = None
# check that logged in user has permissions to this item (GET shouldn't require this level?)
if not has_access(request.user, updates_location):
if not has_course_access(request.user, updates_location):
raise PermissionDenied()
if request.method == 'GET':
......
......@@ -22,7 +22,6 @@ from django.views.decorators.http import require_http_methods, require_GET
from django.utils.translation import ugettext as _
from edxmako.shortcuts import render_to_response
from auth.authz import create_all_course_groups
from xmodule.modulestore.xml_importer import import_from_xml
from xmodule.contentstore.django import contentstore
......@@ -31,10 +30,12 @@ from xmodule.modulestore.django import modulestore, loc_mapper
from xmodule.exceptions import SerializationError
from xmodule.modulestore.locator import BlockUsageLocator
from .access import has_access
from .access import has_course_access
from util.json_request import JsonResponse
from extract_tar import safetar_extractall
from student.roles import CourseInstructorRole, CourseStaffRole
from student import auth
__all__ = ['import_handler', 'import_status_handler', 'export_handler']
......@@ -61,7 +62,7 @@ def import_handler(request, tag=None, package_id=None, branch=None, version_guid
json: import a course via the .tar.gz file specified in request.FILES
"""
location = BlockUsageLocator(package_id=package_id, branch=branch, version_guid=version_guid, block_id=block)
if not has_access(request.user, location):
if not has_course_access(request.user, location):
raise PermissionDenied()
old_location = loc_mapper().translate_locator_to_location(location)
......@@ -225,13 +226,15 @@ def import_handler(request, tag=None, package_id=None, branch=None, version_guid
draft_store=modulestore()
)
logging.debug('new course at {0}'.format(course_items[0].location))
new_location = course_items[0].location
logging.debug('new course at {0}'.format(new_location))
session_status[key] = 3
request.session.modified = True
create_all_course_groups(request.user, course_items[0].location)
logging.debug('created all course groups at {0}'.format(course_items[0].location))
auth.add_users(request.user, CourseInstructorRole(new_location), request.user)
auth.add_users(request.user, CourseStaffRole(new_location), request.user)
logging.debug('created all course groups at {0}'.format(new_location))
# Send errors to client with stage at which error occured.
except Exception as exception: # pylint: disable=W0703
......@@ -272,7 +275,7 @@ def import_status_handler(request, tag=None, package_id=None, branch=None, versi
"""
location = BlockUsageLocator(package_id=package_id, branch=branch, version_guid=version_guid, block_id=block)
if not has_access(request.user, location):
if not has_course_access(request.user, location):
raise PermissionDenied()
try:
......@@ -303,7 +306,7 @@ def export_handler(request, tag=None, package_id=None, branch=None, version_guid
which describes the error.
"""
location = BlockUsageLocator(package_id=package_id, branch=branch, version_guid=version_guid, block_id=block)
if not has_access(request.user, location):
if not has_course_access(request.user, location):
raise PermissionDenied()
old_location = loc_mapper().translate_locator_to_location(location)
......
......@@ -28,7 +28,7 @@ from ..transcripts_utils import manage_video_subtitles_save
from ..utils import get_modulestore
from .access import has_access
from .access import has_course_access
from .helpers import _xmodule_recurse
from preview import handler_prefix, get_preview_html
from edxmako.shortcuts import render_to_response, render_to_string
......@@ -81,7 +81,7 @@ def xblock_handler(request, tag=None, package_id=None, branch=None, version_guid
"""
if package_id is not None:
locator = BlockUsageLocator(package_id=package_id, branch=branch, version_guid=version_guid, block_id=block)
if not has_access(request.user, locator):
if not has_course_access(request.user, locator):
raise PermissionDenied()
old_location = loc_mapper().translate_locator_to_location(locator)
......@@ -250,7 +250,7 @@ def _create_item(request):
display_name = request.json.get('display_name')
if not has_access(request.user, parent_location):
if not has_course_access(request.user, parent_location):
raise PermissionDenied()
parent = get_modulestore(category).get_item(parent_location)
......@@ -335,7 +335,7 @@ def orphan_handler(request, tag=None, package_id=None, branch=None, version_guid
# DHM: when split becomes back-end, move or conditionalize this conversion
old_location = loc_mapper().translate_locator_to_location(location)
if request.method == 'GET':
if has_access(request.user, old_location):
if has_course_access(request.user, old_location):
return JsonResponse(modulestore().get_orphans(old_location, DETACHED_CATEGORIES, 'draft'))
else:
raise PermissionDenied()
......
"""
Views related to course tabs
"""
from access import has_access
from access import has_course_access
from util.json_request import expect_json, JsonResponse
from django.http import HttpResponseNotFound
......@@ -63,7 +63,7 @@ def tabs_handler(request, tag=None, package_id=None, branch=None, version_guid=N
Instead use the general xblock URL (see item.xblock_handler).
"""
locator = BlockUsageLocator(package_id=package_id, branch=branch, version_guid=version_guid, block_id=block)
if not has_access(request.user, locator):
if not has_course_access(request.user, locator):
raise PermissionDenied()
old_location = loc_mapper().translate_locator_to_location(locator)
......
......@@ -37,7 +37,7 @@ from ..transcripts_utils import (
TranscriptsRequestValidationException
)
from .access import has_access
from .access import has_course_access
__all__ = [
'upload_transcripts',
......@@ -546,11 +546,11 @@ def _get_item(request, data):
locator = BlockUsageLocator(data.get('locator'))
old_location = loc_mapper().translate_locator_to_location(locator)
# This is placed before has_access() to validate the location,
# because has_access() raises InvalidLocationError if location is invalid.
# This is placed before has_course_access() to validate the location,
# because has_course_access() raises InvalidLocationError if location is invalid.
item = modulestore().get_item(old_location)
if not has_access(request.user, locator):
if not has_course_access(request.user, locator):
raise PermissionDenied()
return item
import json
from django.core.exceptions import PermissionDenied
from django.contrib.auth.models import User, Group
from django.contrib.auth.models import User
from django.contrib.auth.decorators import login_required
from django.views.decorators.http import require_http_methods
from django.utils.translation import ugettext as _
......@@ -10,17 +9,15 @@ from edxmako.shortcuts import render_to_response
from xmodule.modulestore.django import modulestore, loc_mapper
from util.json_request import JsonResponse, expect_json
from auth.authz import (
STAFF_ROLE_NAME, INSTRUCTOR_ROLE_NAME, get_course_groupname_for_role,
get_course_role_users
)
from student.roles import CourseRole, CourseInstructorRole, CourseStaffRole, GlobalStaff
from course_creators.views import user_requested_access
from .access import has_access
from .access import has_course_access
from student.models import CourseEnrollment
from xmodule.modulestore.locator import BlockUsageLocator
from django.http import HttpResponseNotFound
from student import auth
__all__ = ['request_course_creator', 'course_team_handler']
......@@ -53,7 +50,7 @@ def course_team_handler(request, tag=None, package_id=None, branch=None, version
json: remove a particular course team member from the course team (email is required).
"""
location = BlockUsageLocator(package_id=package_id, branch=branch, version_guid=version_guid, block_id=block)
if not has_access(request.user, location):
if not has_course_access(request.user, location):
raise PermissionDenied()
if 'application/json' in request.META.get('HTTP_ACCEPT', 'application/json'):
......@@ -71,19 +68,19 @@ def _manage_users(request, locator):
old_location = loc_mapper().translate_locator_to_location(locator)
# check that logged in user has permissions to this item
if not has_access(request.user, locator):
if not has_course_access(request.user, locator):
raise PermissionDenied()
course_module = modulestore().get_item(old_location)
instructors = get_course_role_users(locator, INSTRUCTOR_ROLE_NAME)
instructors = CourseInstructorRole(locator).users_with_role()
# the page only lists staff and assumes they're a superset of instructors. Do a union to ensure.
staff = set(get_course_role_users(locator, STAFF_ROLE_NAME)).union(instructors)
staff = set(CourseStaffRole(locator).users_with_role()).union(instructors)
return render_to_response('manage_users.html', {
'context_course': course_module,
'staff': staff,
'instructors': instructors,
'allow_actions': has_access(request.user, locator, role=INSTRUCTOR_ROLE_NAME),
'allow_actions': has_course_access(request.user, locator, role=CourseInstructorRole),
})
......@@ -93,10 +90,10 @@ def _course_team_user(request, locator, email):
Handle the add, remove, promote, demote requests ensuring the requester has authority
"""
# check that logged in user has permissions to this item
if has_access(request.user, locator, role=INSTRUCTOR_ROLE_NAME):
if has_course_access(request.user, locator, role=CourseInstructorRole):
# instructors have full permissions
pass
elif has_access(request.user, locator, role=STAFF_ROLE_NAME) and email == request.user.email:
elif has_course_access(request.user, locator, role=CourseStaffRole) and email == request.user.email:
# staff can only affect themselves
pass
else:
......@@ -107,15 +104,13 @@ def _course_team_user(request, locator, email):
try:
user = User.objects.get(email=email)
except:
except Exception:
msg = {
"error": _("Could not find user by email address '{email}'.").format(email=email),
}
return JsonResponse(msg, 404)
# role hierarchy: "instructor" has more permissions than "staff" (in a course)
roles = ["instructor", "staff"]
# role hierarchy: globalstaff > "instructor" > "staff" (in a course)
if request.method == "GET":
# just return info about the user
msg = {
......@@ -123,12 +118,10 @@ def _course_team_user(request, locator, email):
"active": user.is_active,
"role": None,
}
# what's the highest role that this user has?
groupnames = set(g.name for g in user.groups.all())
for role in roles:
role_groupname = get_course_groupname_for_role(locator, role)
if role_groupname in groupnames:
msg["role"] = role
# what's the highest role that this user has? (How should this report global staff?)
for role in [CourseInstructorRole(locator), CourseStaffRole(locator)]:
if role.has_user(user):
msg["role"] = role.ROLE
break
return JsonResponse(msg)
......@@ -139,29 +132,20 @@ def _course_team_user(request, locator, email):
}
return JsonResponse(msg, 400)
# make sure that the role groups exist
groups = {}
for role in roles:
groupname = get_course_groupname_for_role(locator, role)
group, __ = Group.objects.get_or_create(name=groupname)
groups[role] = group
if request.method == "DELETE":
# remove all roles in this course from this user: but fail if the user
# is the last instructor in the course team
instructors = set(groups["instructor"].user_set.all())
staff = set(groups["staff"].user_set.all())
if user in instructors and len(instructors) == 1:
msg = {
"error": _("You may not remove the last instructor from a course")
}
return JsonResponse(msg, 400)
instructors = CourseInstructorRole(locator)
if instructors.has_user(user):
if instructors.users_with_role().count() == 1:
msg = {
"error": _("You may not remove the last instructor from a course")
}
return JsonResponse(msg, 400)
else:
instructors.remove_users(request.user, user)
if user in instructors:
user.groups.remove(groups["instructor"])
if user in staff:
user.groups.remove(groups["staff"])
user.save()
auth.remove_users(request.user, CourseStaffRole(locator), user)
return JsonResponse()
# all other operations require the requesting user to specify a role
......@@ -171,28 +155,30 @@ def _course_team_user(request, locator, email):
old_location = loc_mapper().translate_locator_to_location(locator)
if role == "instructor":
if not has_access(request.user, locator, role=INSTRUCTOR_ROLE_NAME):
if not has_course_access(request.user, locator, role=CourseInstructorRole):
msg = {
"error": _("Only instructors may create other instructors")
}
return JsonResponse(msg, 400)
user.groups.add(groups["instructor"])
user.save()
auth.add_users(request.user, CourseInstructorRole(locator), user)
# auto-enroll the course creator in the course so that "View Live" will work.
CourseEnrollment.enroll(user, old_location.course_id)
elif role == "staff":
# add to staff regardless (can't do after removing from instructors as will no longer
# be allowed)
auth.add_users(request.user, CourseStaffRole(locator), user)
# if we're trying to downgrade a user from "instructor" to "staff",
# make sure we have at least one other instructor in the course team.
instructors = set(groups["instructor"].user_set.all())
if user in instructors:
if len(instructors) == 1:
instructors = CourseInstructorRole(locator)
if instructors.has_user(user):
if instructors.users_with_role().count() == 1:
msg = {
"error": _("You may not remove the last instructor from a course")
}
return JsonResponse(msg, 400)
user.groups.remove(groups["instructor"])
user.groups.add(groups["staff"])
user.save()
else:
instructors.remove_users(request.user, user)
# auto-enroll the course creator in the course so that "View Live" will work.
CourseEnrollment.enroll(user, old_location.course_id)
......
......@@ -10,8 +10,9 @@ import mock
from course_creators.admin import CourseCreatorAdmin
from course_creators.models import CourseCreator
from auth.authz import is_user_in_creator_group
from django.core import mail
from student.roles import CourseCreatorRole
from student import auth
def mock_render_to_string(template_name, context):
......@@ -54,7 +55,7 @@ class CourseCreatorAdminTest(TestCase):
def change_state_and_verify_email(state, is_creator):
""" Changes user state, verifies creator status, and verifies e-mail is sent based on transition """
self._change_state(state)
self.assertEqual(is_creator, is_user_in_creator_group(self.user))
self.assertEqual(is_creator, auth.has_access(self.user, CourseCreatorRole()))
context = {'studio_request_email': self.studio_request_email}
if state == CourseCreator.GRANTED:
......@@ -72,7 +73,7 @@ class CourseCreatorAdminTest(TestCase):
with mock.patch.dict('django.conf.settings.FEATURES', self.enable_creator_group_patch):
# User is initially unrequested.
self.assertFalse(is_user_in_creator_group(self.user))
self.assertFalse(auth.has_access(self.user, CourseCreatorRole()))
change_state_and_verify_email(CourseCreator.GRANTED, True)
......
......@@ -8,9 +8,9 @@ from django.core.exceptions import PermissionDenied
from course_creators.views import add_user_with_status_unrequested, add_user_with_status_granted
from course_creators.views import get_course_creator_status, update_course_creator_group, user_requested_access
from course_creators.models import CourseCreator
from auth.authz import is_user_in_creator_group
import mock
from student.roles import CourseCreatorRole
from student import auth
class CourseCreatorView(TestCase):
......@@ -48,7 +48,7 @@ class CourseCreatorView(TestCase):
def test_add_granted(self):
with mock.patch.dict('django.conf.settings.FEATURES', {"ENABLE_CREATOR_GROUP": True}):
# Calling add_user_with_status_granted impacts is_user_in_course_group_role.
self.assertFalse(is_user_in_creator_group(self.user))
self.assertFalse(auth.has_access(self.user, CourseCreatorRole()))
add_user_with_status_granted(self.admin, self.user)
self.assertEqual('granted', get_course_creator_status(self.user))
......@@ -57,15 +57,15 @@ class CourseCreatorView(TestCase):
add_user_with_status_unrequested(self.user)
self.assertEqual('granted', get_course_creator_status(self.user))
self.assertTrue(is_user_in_creator_group(self.user))
self.assertTrue(auth.has_access(self.user, CourseCreatorRole()))
def test_update_creator_group(self):
with mock.patch.dict('django.conf.settings.FEATURES', {"ENABLE_CREATOR_GROUP": True}):
self.assertFalse(is_user_in_creator_group(self.user))
self.assertFalse(auth.has_access(self.user, CourseCreatorRole()))
update_course_creator_group(self.admin, self.user, True)
self.assertTrue(is_user_in_creator_group(self.user))
self.assertTrue(auth.has_access(self.user, CourseCreatorRole()))
update_course_creator_group(self.admin, self.user, False)
self.assertFalse(is_user_in_creator_group(self.user))
self.assertFalse(auth.has_access(self.user, CourseCreatorRole()))
def test_user_requested_access(self):
add_user_with_status_unrequested(self.user)
......
......@@ -2,8 +2,8 @@
Methods for interacting programmatically with the user creator table.
"""
from course_creators.models import CourseCreator
from auth.authz import add_user_to_creator_group, remove_user_from_creator_group
from student.roles import CourseCreatorRole
from student import auth
def add_user_with_status_unrequested(user):
......@@ -43,9 +43,9 @@ def update_course_creator_group(caller, user, add):
Caller must have staff permissions.
"""
if add:
add_user_to_creator_group(caller, user)
auth.add_users(caller, CourseCreatorRole(), user)
else:
remove_user_from_creator_group(caller, user)
auth.remove_users(caller, CourseCreatorRole(), user)
def get_course_creator_status(user):
......
......@@ -203,7 +203,7 @@ TEMPLATE_DEBUG = False
# Site info
SITE_ID = 1
SITE_NAME = "localhost:8000"
SITE_NAME = "localhost:8001"
HTTPS = 'on'
ROOT_URLCONF = 'cms.urls'
IGNORABLE_404_ENDS = ('favicon.ico')
......
<%! import json %>
<%! from django.utils.translation import ugettext as _ %>
<%! from django.core.urlresolvers import reverse %>
<%! from auth.authz import is_user_in_course_group_role %>
<%! import json %>
<%! from student.roles import CourseInstructorRole %>
<%! from student.auth import has_access %>
<%! from xmodule.modulestore.django import loc_mapper %>
<%inherit file="base.html" />
<%block name="title">${_("Course Team Settings")}</%block>
......@@ -65,7 +66,7 @@
<li class="user-item" data-email="${user.email}" data-url="${new_location.url_reverse('course_team/', user.email) }">
<% is_instuctor = is_user_in_course_group_role(user, context_course.location, 'instructor', check_staff=False) %>
<% is_instuctor = has_access(user, CourseInstructorRole(context_course.location)) %>
% if is_instuctor:
<span class="wrapper-ui-badge">
<span class="flag flag-role flag-role-admin is-hanging">
......@@ -120,7 +121,7 @@
% endfor
</ol>
<% user_is_instuctor = is_user_in_course_group_role(request.user, context_course.location, 'instructor', check_staff=False) %>
<% user_is_instuctor = has_access(request.user, CourseInstructorRole(context_course.location)) %>
% if user_is_instuctor and len(staff) == 1:
<div class="notice notice-incontext notice-create has-actions">
<div class="msg">
......
"""
The application interface to roles which checks whether any user trying to change
authorization has authorization to do so, which infers authorization via role hierarchy
(GlobalStaff is superset of auths of course instructor, ...), which consults the config
to decide whether to check course creator role, and other such functions.
"""
from django.core.exceptions import PermissionDenied
from django.conf import settings
from student.roles import GlobalStaff, CourseCreatorRole, CourseStaffRole, CourseInstructorRole, CourseRole, \
CourseBetaTesterRole
def has_access(user, role):
"""
Check whether this user has access to this role (either direct or implied)
:param user:
:param role: an AccessRole
"""
if not user.is_active:
return False
# do cheapest check first even tho it's not the direct one
if GlobalStaff().has_user(user):
return True
# CourseCreator is odd b/c it can be disabled via config
if isinstance(role, CourseCreatorRole):
# completely shut down course creation setting
if settings.FEATURES.get('DISABLE_COURSE_CREATION', False):
return False
# wide open course creation setting
if not settings.FEATURES.get('ENABLE_CREATOR_GROUP', False):
return True
if role.has_user(user):
return True
# if not, then check inferred permissions
if (isinstance(role, (CourseStaffRole, CourseBetaTesterRole)) and
CourseInstructorRole(role.location).has_user(user)):
return True
return False
def add_users(caller, role, *users):
"""
The caller requests adding the given users to the role. Checks that the caller
has sufficient authority.
:param caller: a user
:param role: an AccessRole
"""
_check_caller_authority(caller, role)
role.add_users(*users)
def remove_users(caller, role, *users):
"""
The caller requests removing the given users from the role. Checks that the caller
has sufficient authority.
:param caller: a user
:param role: an AccessRole
"""
# can always remove self (at this layer)
if not(len(users) == 1 and caller == users[0]):
_check_caller_authority(caller, role)
role.remove_users(*users)
def _check_caller_authority(caller, role):
"""
Internal function to check whether the caller has authority to manipulate this role
:param caller: a user
:param role: an AccessRole
"""
if not (caller.is_authenticated and caller.is_active):
raise PermissionDenied
# superuser
if GlobalStaff().has_user(caller):
return
if isinstance(role, (GlobalStaff, CourseCreatorRole)):
raise PermissionDenied
elif isinstance(role, CourseRole): # instructors can change the roles w/in their course
if not has_access(caller, CourseInstructorRole(role.location)):
raise PermissionDenied
......@@ -64,11 +64,13 @@ class GlobalStaff(AccessRole):
def add_users(self, *users):
for user in users:
user.is_staff = True
user.save()
if (user.is_authenticated and user.is_active):
user.is_staff = True
user.save()
def remove_users(self, *users):
for user in users:
# don't check is_authenticated nor is_active on purpose
user.is_staff = False
user.save()
......@@ -96,10 +98,10 @@ class GroupBasedRole(AccessRole):
"""
Return whether the supplied django user has access to this role.
"""
# pylint: disable=protected-access
if not user.is_authenticated():
if not (user.is_authenticated and user.is_active):
return False
# pylint: disable=protected-access
if not hasattr(user, '_groups'):
user._groups = set(name.lower() for name in user.groups.values_list('name', flat=True))
......@@ -109,8 +111,12 @@ class GroupBasedRole(AccessRole):
"""
Add the supplied django users to this role.
"""
# silently ignores anonymous and inactive users so that any that are
# legit get updated.
users = [user for user in users if user.is_authenticated and user.is_active]
group, _ = Group.objects.get_or_create(name=self._group_names[0])
group.user_set.add(*users)
# remove cache
for user in users:
if hasattr(user, '_groups'):
del user._groups
......@@ -119,8 +125,10 @@ class GroupBasedRole(AccessRole):
"""
Remove the supplied django users from this role.
"""
group, _ = Group.objects.get_or_create(name=self._group_names[0])
group.user_set.remove(*users)
groups = Group.objects.filter(name__in=self._group_names)
for group in groups:
group.user_set.remove(*users)
# remove cache
for user in users:
if hasattr(user, '_groups'):
del user._groups
......@@ -144,31 +152,33 @@ class CourseRole(GroupBasedRole):
"""
# TODO: figure out how to make the group name generation lazy so it doesn't force the
# loc mapping?
location = Locator.to_locator_or_location(location)
self.location = Locator.to_locator_or_location(location)
self.role = role
# direct copy from auth.authz.get_all_course_role_groupnames will refactor to one impl asap
groupnames = []
# pylint: disable=no-member
if isinstance(location, Location):
if isinstance(self.location, Location):
try:
groupnames.append('{0}_{1}'.format(role, location.course_id))
groupnames.append('{0}_{1}'.format(role, self.location.course_id))
course_context = self.location.course_id # course_id is valid for translation
except InvalidLocationError: # will occur on old locations where location is not of category course
if course_context is None:
raise CourseContextRequired()
else:
groupnames.append('{0}_{1}'.format(role, course_context))
try:
locator = loc_mapper().translate_location(location.course_id, location, False, False)
locator = loc_mapper().translate_location(course_context, self.location, False, False)
groupnames.append('{0}_{1}'.format(role, locator.package_id))
except (InvalidLocationError, ItemNotFoundError):
# if it's never been mapped, the auth won't be via the Locator syntax
pass
# least preferred legacy role_course format
groupnames.append('{0}_{1}'.format(role, location.course))
elif isinstance(location, CourseLocator):
groupnames.append('{0}_{1}'.format(role, location.package_id))
groupnames.append('{0}_{1}'.format(role, self.location.course))
elif isinstance(self.location, CourseLocator):
groupnames.append('{0}_{1}'.format(role, self.location.package_id))
# handle old Location syntax
old_location = loc_mapper().translate_locator_to_location(location, get_course=True)
old_location = loc_mapper().translate_locator_to_location(self.location, get_course=True)
if old_location:
# the slashified version of the course_id (myu/mycourse/myrun)
groupnames.append('{0}_{1}'.format(role, old_location.course_id))
......@@ -191,20 +201,23 @@ class OrgRole(GroupBasedRole):
class CourseStaffRole(CourseRole):
"""A Staff member of a course"""
ROLE = 'staff'
def __init__(self, *args, **kwargs):
super(CourseStaffRole, self).__init__('staff', *args, **kwargs)
super(CourseStaffRole, self).__init__(self.ROLE, *args, **kwargs)
class CourseInstructorRole(CourseRole):
"""A course Instructor"""
ROLE = 'instructor'
def __init__(self, *args, **kwargs):
super(CourseInstructorRole, self).__init__('instructor', *args, **kwargs)
super(CourseInstructorRole, self).__init__(self.ROLE, *args, **kwargs)
class CourseBetaTesterRole(CourseRole):
"""A course Beta Tester"""
ROLE = 'beta_testers'
def __init__(self, *args, **kwargs):
super(CourseBetaTesterRole, self).__init__('beta_testers', *args, **kwargs)
super(CourseBetaTesterRole, self).__init__(self.ROLE, *args, **kwargs)
class OrgStaffRole(OrgRole):
......@@ -217,3 +230,13 @@ class OrgInstructorRole(OrgRole):
"""An organization instructor"""
def __init__(self, *args, **kwargs):
super(OrgInstructorRole, self).__init__('instructor', *args, **kwargs)
class CourseCreatorRole(GroupBasedRole):
"""
This is the group of people who have permission to create new courses (we may want to eventually
make this an org based role).
"""
ROLE = "course_creator_group"
def __init__(self, *args, **kwargs):
super(CourseCreatorRole, self).__init__(self.ROLE, *args, **kwargs)
......@@ -53,7 +53,8 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
self.instructor = InstructorFactory(course=self.course.location)
# Create staff
self.staff = [StaffFactory(course=self.course.location) for _ in xrange(STAFF_COUNT)]
self.staff = [StaffFactory(course=self.course.location)
for _ in xrange(STAFF_COUNT)]
# Create students
self.students = [UserFactory() for _ in xrange(STUDENT_COUNT)]
......
......@@ -25,7 +25,10 @@ class TestWikiAccessBase(ModuleStoreTestCase):
self.wiki = get_or_create_root()
self.course_math101 = CourseFactory.create(org='org', number='math101', display_name='Course')
self.course_math101_staff = [InstructorFactory(course=self.course_math101.location), StaffFactory(course=self.course_math101.location)]
self.course_math101_staff = [
InstructorFactory(course=self.course_math101.location),
StaffFactory(course=self.course_math101.location)
]
wiki_math101 = self.create_urlpath(self.wiki, course_wiki_slug(self.course_math101))
wiki_math101_page = self.create_urlpath(wiki_math101, 'Child')
......@@ -44,9 +47,15 @@ class TestWikiAccess(TestWikiAccessBase):
super(TestWikiAccess, self).setUp()
self.course_310b = CourseFactory.create(org='org', number='310b', display_name='Course')
self.course_310b_staff = [InstructorFactory(course=self.course_310b.location), StaffFactory(course=self.course_310b.location)]
self.course_310b_staff = [
InstructorFactory(course=self.course_310b.location),
StaffFactory(course=self.course_310b.location)
]
self.course_310b_ = CourseFactory.create(org='org', number='310b_', display_name='Course')
self.course_310b__staff = [InstructorFactory(course=self.course_310b_.location), StaffFactory(course=self.course_310b_.location)]
self.course_310b__staff = [
InstructorFactory(course=self.course_310b_.location),
StaffFactory(course=self.course_310b_.location)
]
self.wiki_310b = self.create_urlpath(self.wiki, course_wiki_slug(self.course_310b))
self.wiki_310b_ = self.create_urlpath(self.wiki, course_wiki_slug(self.course_310b_))
......@@ -105,7 +114,10 @@ class TestWikiAccessForNumericalCourseNumber(TestWikiAccessBase):
super(TestWikiAccessForNumericalCourseNumber, self).setUp()
self.course_200 = CourseFactory.create(org='org', number='200', display_name='Course')
self.course_200_staff = [InstructorFactory(course=self.course_200.location), StaffFactory(course=self.course_200.location)]
self.course_200_staff = [
InstructorFactory(course=self.course_200.location),
StaffFactory(course=self.course_200.location)
]
wiki_200 = self.create_urlpath(self.wiki, course_wiki_slug(self.course_200))
wiki_200_page = self.create_urlpath(wiki_200, 'Child')
......@@ -126,7 +138,10 @@ class TestWikiAccessForOldFormatCourseStaffGroups(TestWikiAccessBase):
self.course_math101c = CourseFactory.create(org='org', number='math101c', display_name='Course')
Group.objects.get_or_create(name='instructor_math101c')
self.course_math101c_staff = [InstructorFactory(course=self.course_math101c.location), StaffFactory(course=self.course_math101c.location)]
self.course_math101c_staff = [
InstructorFactory(course=self.course_math101c.location),
StaffFactory(course=self.course_math101c.location)
]
wiki_math101c = self.create_urlpath(self.wiki, course_wiki_slug(self.course_math101c))
wiki_math101c_page = self.create_urlpath(wiki_math101c, 'Child')
......
......@@ -3,7 +3,6 @@ import pytz
from mock import patch
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse
from django.test.utils import override_settings
......@@ -128,6 +127,7 @@ class TestViewAuth(ModuleStoreTestCase, LoginEnrollmentTestCase):
display_name='Welcome'
)
self.global_staff_user = GlobalStaffFactory()
self.unenrolled_user = UserFactory(last_name="Unenrolled")
self.enrolled_user = UserFactory(last_name="Enrolled")
......@@ -135,10 +135,11 @@ class TestViewAuth(ModuleStoreTestCase, LoginEnrollmentTestCase):
CourseEnrollmentFactory(user=self.enrolled_user, course_id=self.test_course.id)
self.staff_user = StaffFactory(course=self.course.location)
self.instructor_user = InstructorFactory(course=self.course.location)
self.instructor_user = InstructorFactory(
course=self.course.location)
self.org_staff_user = OrgStaffFactory(course=self.course.location)
self.org_instructor_user = OrgInstructorFactory(course=self.course.location)
self.global_staff_user = GlobalStaffFactory()
self.org_instructor_user = OrgInstructorFactory(
course=self.course.location)
def test_redirection_unenrolled(self):
"""
......
......@@ -44,11 +44,7 @@ class TestInstructorDashboardAnonCSV(ModuleStoreTestCase, LoginEnrollmentTestCas
self.activate_user(self.student)
self.activate_user(self.instructor)
def make_instructor(course):
""" Create an instructor for the course."""
CourseStaffRole(course.location).add_users(User.objects.get(email=self.instructor))
make_instructor(self.toy)
CourseStaffRole(self.toy.location).add_users(User.objects.get(email=self.instructor))
self.logout()
self.login(self.instructor, self.password)
......
......@@ -41,11 +41,7 @@ class TestInstructorDashboardGradeDownloadCSV(ModuleStoreTestCase, LoginEnrollme
self.activate_user(self.student)
self.activate_user(self.instructor)
def make_instructor(course):
""" Create an instructor for the course. """
CourseStaffRole(course.location).add_users(User.objects.get(email=self.instructor))
make_instructor(self.toy)
CourseStaffRole(self.toy.location).add_users(User.objects.get(email=self.instructor))
self.logout()
self.login(self.instructor, self.password)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment