Commit 5db231a0 by Michael Terry Committed by Michael Terry

Filter courses on create run POST

When creating a course run in the Publisher dashboard, only allow the
user to create runs for courses they have access to.

LEARNER-4102
parent c5d0f4da
......@@ -3,7 +3,6 @@ import logging
from dal import autocomplete
from django.apps import apps
from django.contrib.auth.mixins import LoginRequiredMixin
from guardian.shortcuts import get_objects_for_user
from rest_framework import status
from rest_framework.generics import ListAPIView, RetrieveAPIView, UpdateAPIView, get_object_or_404
from rest_framework.permissions import IsAuthenticated
......@@ -19,8 +18,7 @@ from course_discovery.apps.publisher.api.serializers import (CourseRevisionSeria
CourseUserRoleSerializer, GroupUserSerializer)
from course_discovery.apps.publisher.forms import CourseForm
from course_discovery.apps.publisher.models import (Course, CourseRun, CourseRunState, CourseState, CourseUserRole,
OrganizationExtension)
from course_discovery.apps.publisher.utils import is_internal_user, is_publisher_admin
OrganizationExtension, PublisherUser)
logger = logging.getLogger(__name__)
......@@ -117,22 +115,8 @@ class CoursesAutoComplete(LoginRequiredMixin, autocomplete.Select2QuerySetView):
def get_queryset(self):
if self.q:
user = self.request.user
if is_publisher_admin(user):
qs = Course.objects.filter(title__icontains=self.q)
elif is_internal_user(user):
qs = Course.objects.filter(title__icontains=self.q, course_user_roles__user=user).distinct()
else:
organizations = get_objects_for_user(
user,
OrganizationExtension.VIEW_COURSE,
OrganizationExtension,
use_groups=True,
with_superuser=False
).values_list('organization')
qs = Course.objects.filter(title__icontains=self.q, organizations__in=organizations)
return qs
qs = PublisherUser.get_courses(self.request.user)
return qs.filter(title__icontains=self.q)
return []
......
......@@ -210,7 +210,7 @@ class CourseSearchForm(forms.Form):
""" Course Type ahead Search Form. """
course = forms.ModelChoiceField(
label=_('Find Course'),
queryset=Course.objects.all(),
queryset=Course.objects.none(),
widget=autocomplete.ModelSelect2(
url='publisher:api:course-autocomplete',
attrs={
......@@ -221,11 +221,11 @@ class CourseSearchForm(forms.Form):
)
def __init__(self, *args, **kwargs):
qs = kwargs.pop('queryset', None)
super(CourseSearchForm, self).__init__(*args, **kwargs)
user = kwargs.pop('user', None)
super().__init__(*args, **kwargs)
if qs is not None:
self.fields['course'].queryset = qs
if user:
self.fields['course'].queryset = PublisherUser.get_courses(user)
class CourseRunForm(BaseForm):
......
......@@ -12,6 +12,7 @@ from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
from django_extensions.db.models import TimeStampedModel
from django_fsm import FSMField, transition
from guardian.shortcuts import get_objects_for_user
from simple_history.models import HistoricalRecords
from sortedm2m.fields import SortedManyToManyField
from stdimage.models import StdImageField
......@@ -26,7 +27,7 @@ from course_discovery.apps.ietf_language_tags.models import LanguageTag
from course_discovery.apps.publisher import emails
from course_discovery.apps.publisher.choices import (CourseRunStateChoices, CourseStateChoices, InternalUserRole,
PublisherUserRole)
from course_discovery.apps.publisher.utils import is_email_notification_enabled
from course_discovery.apps.publisher.utils import is_email_notification_enabled, is_internal_user, is_publisher_admin
from course_discovery.apps.publisher.validators import ImageMultiSizeValidator
logger = logging.getLogger(__name__)
......@@ -907,3 +908,22 @@ class PublisherUser(User):
class Meta:
proxy = True
@staticmethod
def get_courses(user, queryset=None):
if queryset is None:
queryset = Course.objects.all()
if is_publisher_admin(user):
return queryset
elif is_internal_user(user):
return queryset.filter(course_user_roles__user=user).distinct()
else:
organizations = get_objects_for_user(
user,
OrganizationExtension.VIEW_COURSE,
OrganizationExtension,
use_groups=True,
with_superuser=False
).values_list('organization')
return queryset.filter(organizations__in=organizations)
......@@ -4,6 +4,7 @@ import ddt
import pytest
from django.core.exceptions import ValidationError
from django.test import TestCase
from guardian.shortcuts import assign_perm
from pytz import timezone
from waffle.testutils import override_switch
......@@ -11,11 +12,12 @@ from course_discovery.apps.core.models import User
from course_discovery.apps.core.tests.factories import UserFactory
from course_discovery.apps.course_metadata.tests.factories import OrganizationFactory
from course_discovery.apps.publisher.choices import CourseRunStateChoices, PublisherUserRole
from course_discovery.apps.publisher.constants import ADMIN_GROUP_NAME, INTERNAL_USER_GROUP_NAME
from course_discovery.apps.publisher.forms import (
CourseEntitlementForm, CourseForm, CourseRunForm, CourseRunStateAdminForm, CourseStateAdminForm,
CourseEntitlementForm, CourseForm, CourseRunForm, CourseRunStateAdminForm, CourseSearchForm, CourseStateAdminForm,
PublisherUserCreationForm, SeatForm
)
from course_discovery.apps.publisher.models import CourseEntitlement, Seat
from course_discovery.apps.publisher.models import CourseEntitlement, Group, OrganizationExtension, Seat
from course_discovery.apps.publisher.tests.factories import (
CourseFactory, CourseUserRoleFactory, OrganizationExtensionFactory, SeatFactory
)
......@@ -417,3 +419,57 @@ class TestSeatForm:
form.save()
assert seat.course_run.seats.count() == 2
assert seat.course_run.seats.filter(type=Seat.AUDIT, price=0).exists()
@ddt.ddt
class CourseSearchFormTests(TestCase):
"""
Tests for publisher 'CourseSearchForm'
"""
def setUp(self):
super().setUp()
self.organization = OrganizationFactory()
self.organization_extension = OrganizationExtensionFactory()
self.user = UserFactory()
self.user.groups.add(self.organization_extension.group)
self.course = CourseFactory(title='Test course')
assign_perm(
OrganizationExtension.VIEW_COURSE, self.organization_extension.group, self.organization_extension
)
def test_no_user(self):
course_form = CourseSearchForm()
course_form.full_clean()
self.assertFalse(course_form.is_valid())
self.assertEqual(0, course_form.fields['course'].queryset.count())
def _check_form(self):
course_form = CourseSearchForm(user=self.user, data={'course': self.course.id})
course_form.full_clean()
return course_form.is_valid()
def test_unrelated_course(self):
""" Verify course search doesn't allow courses unrelated to the user. """
self.assertFalse(self._check_form())
def test_with_course_team(self):
""" Verify course search allows courses in the user's organizations. """
self.course.organizations.add(self.organization_extension.organization) # pylint: disable=no-member
self.assertTrue(self._check_form())
def test_with_admin_user(self):
""" Verify course search lets an admin access courses they aren't associated with. """
self.user.groups.add(Group.objects.get(name=ADMIN_GROUP_NAME))
self.assertTrue(self._check_form())
def test_with_internal_user(self):
""" Verify course search only lets an internal user access courses with a role for them. """
self.user.groups.add(Group.objects.get(name=INTERNAL_USER_GROUP_NAME))
# Confirm that internal users aren't granted blanket access
self.assertFalse(self._check_form())
# But it *will* work if we add a role for this user
CourseUserRoleFactory(course=self.course, user=self.user, role=PublisherUserRole.MarketingReviewer)
self.assertTrue(self._check_form())
......@@ -2094,7 +2094,7 @@ class CourseListViewPaginationTests(PaginationMixin, TestCase):
def test_pagination_for_internal_user(self):
""" Verify that pagination works for internal user. """
with mock.patch('course_discovery.apps.publisher.views.is_publisher_admin', return_value=False):
with mock.patch('course_discovery.apps.publisher.models.is_publisher_admin', return_value=False):
self.user.groups.add(Group.objects.get(name=INTERNAL_USER_GROUP_NAME))
self.course_team_role = factories.CourseUserRoleFactory(
course=self.courses[0], user=self.user, role=PublisherUserRole.CourseTeam
......@@ -2107,8 +2107,8 @@ class CourseListViewPaginationTests(PaginationMixin, TestCase):
def test_pagination_for_user_organizations(self):
""" Verify that pagination works for user organizations. """
with mock.patch('course_discovery.apps.publisher.views.is_publisher_admin', return_value=False):
with mock.patch('course_discovery.apps.publisher.views.is_internal_user', return_value=False):
with mock.patch('course_discovery.apps.publisher.models.is_publisher_admin', return_value=False):
with mock.patch('course_discovery.apps.publisher.models.is_internal_user', return_value=False):
organization_extension = factories.OrganizationExtensionFactory(
organization=self.courses[0].organizations.all()[0] # zeroX
)
......@@ -3665,6 +3665,7 @@ class CreateRunFromDashboardViewTests(SiteMixin, TestCase):
assign_perm(
OrganizationExtension.VIEW_COURSE_RUN, self.organization_extension.group, self.organization_extension
)
assign_perm(OrganizationExtension.VIEW_COURSE, self.organization_extension.group, self.organization_extension)
self.client.login(username=self.user.username, password=USER_PASSWORD)
......@@ -3710,6 +3711,15 @@ class CreateRunFromDashboardViewTests(SiteMixin, TestCase):
response = self.client.post(self.create_course_run_url, post_data)
self.assertContains(response, 'The page could not be updated. Make', status_code=400)
def test_create_course_run_without_access_to_course(self):
""" Verify that user cannot create course run for a course they don't have access to.
"""
self.course.organizations = [] # user will no longer be associated with course
self.course.save()
post_data = self._post_data()
response = self.client.post(self.create_course_run_url, post_data)
self.assertContains(response, 'The page could not be updated. Make', status_code=400)
def test_create_course_run_and_seat(self):
""" Verify that we can create a new course run with seat. """
self.assertEqual(self.course.course_runs.count(), 0)
......
......@@ -33,8 +33,8 @@ from course_discovery.apps.publisher.emails import send_email_for_published_cour
from course_discovery.apps.publisher.forms import (AdminImportCourseForm, CourseEntitlementForm, CourseForm,
CourseRunForm, CourseSearchForm, SeatForm)
from course_discovery.apps.publisher.models import (PAID_SEATS, Course, CourseEntitlement, CourseRun, CourseRunState,
CourseState, CourseUserRole, OrganizationExtension, Seat,
UserAttributes)
CourseState, CourseUserRole, OrganizationExtension, PublisherUser,
Seat, UserAttributes)
from course_discovery.apps.publisher.utils import (get_internal_users, has_role_for_course, is_internal_user,
is_project_coordinator_user, is_publisher_admin, make_bread_crumbs)
from course_discovery.apps.publisher.wrappers import CourseRunWrapper
......@@ -744,7 +744,7 @@ class CreateRunFromDashboardView(CreateCourseRunView):
def get_context_data(self, **kwargs):
context = {
'cancel_url': reverse('publisher:publisher_dashboard'),
'course_form': self.course_form(queryset=Course.objects.none()),
'course_form': self.course_form(),
'run_form': self.run_form(),
'seat_form': self.seat_form(),
'hide_seat_form': False
......@@ -752,7 +752,7 @@ class CreateRunFromDashboardView(CreateCourseRunView):
return context
def post(self, request, *args, **kwargs):
course_form = self.course_form(request.POST)
course_form = self.course_form(request.POST, user=request.user)
if not course_form.is_valid():
messages.error(
request, _('The page could not be updated. Make sure that all values are correct, then try again.')
......@@ -909,20 +909,7 @@ class CourseListView(mixins.LoginRequiredMixin, ListView):
'organizations', 'course_state', 'publisher_course_runs', 'course_user_roles'
)
if is_publisher_admin(user):
courses = courses
elif is_internal_user(user):
courses = courses.filter(course_user_roles__user=user).distinct()
else:
organizations = get_objects_for_user(
user,
OrganizationExtension.VIEW_COURSE,
OrganizationExtension,
use_groups=True,
with_superuser=False
).values_list('organization')
courses = courses.filter(organizations__in=organizations)
courses = PublisherUser.get_courses(user, queryset=courses)
courses = self.filter_queryset(courses)
courses = self.sort_queryset(courses)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment