permissions.py 7.47 KB
Newer Older
1 2 3 4
"""
Module for checking permissions with the comment_client backend
"""

Mike Chen committed
5
import logging
6
from types import NoneType
7 8

from request_cache.middleware import RequestCache
9
from lms.lib.comment_client import Thread
10
from opaque_keys.edx.keys import CourseKey
11

12
from django_comment_common.models import all_permissions_for_user_in_course
13
from teams.models import CourseTeam
14

Calen Pennington committed
15

16
def has_permission(user, permission, course_id=None):
17
    assert isinstance(course_id, (NoneType, CourseKey))
18 19 20 21 22 23 24 25 26 27 28
    request_cache_dict = RequestCache.get_request_cache().data
    cache_key = "django_comment_client.permissions.has_permission.all_permissions.{}.{}".format(
        user.id, course_id
    )
    if cache_key in request_cache_dict:
        all_permissions = request_cache_dict[cache_key]
    else:
        all_permissions = all_permissions_for_user_in_course(user, course_id)
        request_cache_dict[cache_key] = all_permissions

    return permission in all_permissions
Mike Chen committed
29 30


31
CONDITIONS = ['is_open', 'is_author', 'is_question_author', 'is_team_member_if_applicable']
Calen Pennington committed
32 33


34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
def get_team(commentable_id):
    """ Returns the team that the commentable_id belongs to if it exists. Returns None otherwise. """
    request_cache_dict = RequestCache.get_request_cache().data
    cache_key = "django_comment_client.team_commentable.{}".format(commentable_id)
    if cache_key in request_cache_dict:
        return request_cache_dict[cache_key]

    try:
        team = CourseTeam.objects.get(discussion_topic_id=commentable_id)
    except CourseTeam.DoesNotExist:
        team = None

    request_cache_dict[cache_key] = team
    return team


50
def _check_condition(user, condition, content):
51 52 53
    """ Check whether or not the given condition applies for the given user and content. """
    def check_open(_user, content):
        """ Check whether the content is open. """
Rocky Duan committed
54
        try:
55
            return content and not content['closed']
Rocky Duan committed
56 57
        except KeyError:
            return False
Mike Chen committed
58

59
    def check_author(user, content):
60
        """ Check if the given user is the author of the content. """
Rocky Duan committed
61
        try:
62
            return content and content['user_id'] == str(user.id)
Rocky Duan committed
63 64
        except KeyError:
            return False
Mike Chen committed
65

66
    def check_question_author(user, content):
67
        """ Check if the given user is the author of the original question for both threads and comments. """
68 69 70 71 72 73 74 75 76 77 78
        if not content:
            return False
        try:
            if content["type"] == "thread":
                return content["thread_type"] == "question" and content["user_id"] == str(user.id)
            else:
                # N.B. This will trigger a comments service query
                return check_question_author(user, Thread(id=content["thread_id"]).to_dict())
        except KeyError:
            return False

79 80 81 82 83 84 85 86 87 88 89 90 91
    def check_team_member(user, content):
        """
        If the content has a commentable_id, verifies that either it is not associated with a team,
        or if it is, that the user is a member of that team.
        """
        if not content:
            return False
        try:
            commentable_id = content['commentable_id']
            request_cache_dict = RequestCache.get_request_cache().data
            cache_key = "django_comment_client.check_team_member.{}.{}".format(user.id, commentable_id)
            if cache_key in request_cache_dict:
                return request_cache_dict[cache_key]
92 93 94 95 96
            team = get_team(commentable_id)
            if team is None:
                passes_condition = True
            else:
                passes_condition = team.users.filter(id=user.id).exists()
97 98 99 100 101 102 103
            request_cache_dict[cache_key] = passes_condition
        except KeyError:
            # We do not expect KeyError in production-- it usually indicates an improper test mock.
            logging.warning("Did not find key commentable_id in content.")
            passes_condition = False
        return passes_condition

104
    handlers = {
Calen Pennington committed
105 106
        'is_open': check_open,
        'is_author': check_author,
107
        'is_question_author': check_question_author,
108
        'is_team_member_if_applicable': check_team_member
109
    }
Mike Chen committed
110

111
    return handlers[condition](user, content)
112

113

114
def _check_conditions_permissions(user, permissions, course_id, content):
115 116
    """
    Accepts a list of permissions and proceed if any of the permission is valid.
117
    Note that ["can_view", "can_edit"] will proceed if the user has either
118 119
    "can_view" or "can_edit" permission. To use AND operator in between, wrap them in
    a list.
120
    """
121 122 123 124

    def test(user, per, operator="or"):
        if isinstance(per, basestring):
            if per in CONDITIONS:
125
                return _check_condition(user, per, content)
126
            return has_permission(user, per, course_id=course_id)
127 128
        elif isinstance(per, list) and operator in ["and", "or"]:
            results = [test(user, x, operator="and") for x in per]
129 130 131
            if operator == "or":
                return True in results
            elif operator == "and":
David Baumgold committed
132
                return False not in results
Kevin Chugh committed
133
    return test(user, permissions, operator="or")
134 135


136 137
# Note: 'edit_content' is being used as a generic way of telling if someone is a privileged user
# (forum Moderator/Admin/TA), because there is a desire that team membership does not impact privileged users.
138
VIEW_PERMISSIONS = {
139
    'update_thread': ['edit_content', ['update_thread', 'is_open', 'is_author']],
140
    'create_comment': ['edit_content', ["create_comment", "is_open", "is_team_member_if_applicable"]],
141 142
    'delete_thread': ['delete_thread', ['update_thread', 'is_author']],
    'update_comment': ['edit_content', ['update_comment', 'is_open', 'is_author']],
143
    'endorse_comment': ['endorse_comment', 'is_question_author'],
144
    'openclose_thread': ['openclose_thread'],
145
    'create_sub_comment': ['edit_content', ['create_sub_comment', 'is_open', 'is_team_member_if_applicable']],
146
    'delete_comment': ['delete_comment', ['update_comment', 'is_open', 'is_author']],
147 148 149 150 151 152 153 154
    'vote_for_comment': ['edit_content', ['vote', 'is_open', 'is_team_member_if_applicable']],
    'undo_vote_for_comment': ['edit_content', ['unvote', 'is_open', 'is_team_member_if_applicable']],
    'vote_for_thread': ['edit_content', ['vote', 'is_open', 'is_team_member_if_applicable']],
    'flag_abuse_for_thread': ['edit_content', ['vote', 'is_team_member_if_applicable']],
    'un_flag_abuse_for_thread': ['edit_content', ['vote', 'is_team_member_if_applicable']],
    'flag_abuse_for_comment': ['edit_content', ['vote', 'is_team_member_if_applicable']],
    'un_flag_abuse_for_comment': ['edit_content', ['vote', 'is_team_member_if_applicable']],
    'undo_vote_for_thread': ['edit_content', ['unvote', 'is_open', 'is_team_member_if_applicable']],
155 156
    'pin_thread': ['openclose_thread'],
    'un_pin_thread': ['openclose_thread'],
157 158 159 160 161
    'follow_thread': ['edit_content', ['follow_thread', 'is_team_member_if_applicable']],
    'follow_commentable': ['edit_content', ['follow_commentable', 'is_team_member_if_applicable']],
    'unfollow_thread': ['edit_content', ['unfollow_thread', 'is_team_member_if_applicable']],
    'unfollow_commentable': ['edit_content', ['unfollow_commentable', 'is_team_member_if_applicable']],
    'create_thread': ['edit_content', ['create_thread', 'is_team_member_if_applicable']],
162 163
}

164 165

def check_permissions_by_view(user, course_id, content, name):
166
    assert isinstance(course_id, CourseKey)
167 168 169
    try:
        p = VIEW_PERMISSIONS[name]
    except KeyError:
170
        logging.warning("Permission for view named %s does not exist in permissions.py", name)
171
    return _check_conditions_permissions(user, p, course_id, content)