Commit 862d847c by Tim Babych

TNL-782 Check for valid ID Token

parent 3e5e4b7b
Oleg Marshev <oleg@edx.org>
Tim Babych <tim.babych@gmail.com>
PACKAGES = notesserver notesapi
.PHONY: requirements
validate: test.requirements test coverage
......
import jwt
import logging
from django.conf import settings
from rest_framework.permissions import BasePermission
logger = logging.getLogger(__name__)
class TokenWrongIssuer(Exception):
pass
class HasAccessToken(BasePermission):
"""
Allow requests having valid ID Token.
https://tools.ietf.org/html/draft-ietf-oauth-json-web-token-31
Expected Token:
Header {
"alg": "HS256",
"typ": "JWT"
}
Claims {
"sub": "<USER_ID>",
"exp": <EXPIRATION TIMESTAMP>,
"iat": <ISSUED TIMESTAMP>,
"aud": "<CLIENT ID"
}
Should be signed with CLIENT_SECRET
"""
def has_permission(self, request, view):
return True
if getattr(settings, 'DISABLE_TOKEN_CHECK', False):
return True
token = request.META.get('HTTP_X_ANNOTATOR_AUTH_TOKEN', '')
if not token:
logger.debug("No token found in headers")
return False
try:
data = jwt.decode(token, settings.CLIENT_SECRET)
auth_user = data['sub']
if data['aud'] != settings.CLIENT_ID:
raise TokenWrongIssuer
for request_field in ('GET', 'POST', 'DATA'):
if 'user' in getattr(request, request_field):
req_user = getattr(request, request_field)['user']
if req_user == auth_user:
return True
else:
logger.debug("Token user {auth_user} did not match {field} user {req_user}".format(
auth_user=auth_user, field=request_field, req_user=req_user
))
return False
logger.info("No user was present to compare in GET, POST or DATA")
except jwt.ExpiredSignature:
logger.debug("Token was expired: {}".format(token))
except jwt.DecodeError:
logger.debug("Could not decode token {}".format(token))
except TokenWrongIssuer:
logger.debug("Token has wrong issuer {}".format(token))
return False
\ No newline at end of file
class MockConsumer(object):
def __init__(self, key='mockconsumer'):
self.key = key
self.secret = 'top-secret'
self.ttl = 86400
class MockUser(object):
def __init__(self, id='alice', consumer=None):
self.id = id
self.consumer = MockConsumer(consumer if consumer is not None else 'mockconsumer')
self.is_admin = False
class MockAuthenticator(object):
def request_user(self, request):
return MockUser()
def mock_authorizer(*args, **kwargs):
return True
import jwt
from calendar import timegm
from datetime import datetime, timedelta
from django.conf import settings
def get_id_token(user):
now = datetime.utcnow()
return jwt.encode({
'aud': settings.CLIENT_ID,
'sub': user,
'iat': timegm(now.utctimetuple()),
'exp': timegm((now + timedelta(seconds=300)).utctimetuple()),
}, settings.CLIENT_SECRET)
import unittest
import jwt
from calendar import timegm
from datetime import datetime, timedelta
from mock import patch
import unittest
from django.core.urlresolvers import reverse
from django.conf import settings
......@@ -9,12 +12,13 @@ from rest_framework.test import APITestCase
from annotator import es, auth
from annotator.annotation import Annotation
from .helpers import MockUser
from .helpers import get_id_token
TEST_USER = "test-user-id"
class AnnotationViewTests(APITestCase):
class BaseAnnotationViewTests(APITestCase):
"""
Tests for annotation views.
Abstract class for testing annotation views.
"""
def setUp(self):
assert Annotation.es.host == settings.ELASTICSEARCH_URL
......@@ -23,13 +27,12 @@ class AnnotationViewTests(APITestCase):
Annotation.create_all()
es.conn.cluster.health(wait_for_status='yellow')
self.user = MockUser()
payload = {'consumerKey': self.user.consumer.key, 'userId': self.user.id}
token = auth.encode_token(payload, self.user.consumer.secret)
self.headers = {'x-annotator-auth-token': token}
token = get_id_token(TEST_USER)
self.client.credentials(HTTP_X_ANNOTATOR_AUTH_TOKEN=token)
self.headers = {"user": TEST_USER}
self.payload = {
"user": "test-user-id",
"user": TEST_USER,
"usage_id": "test-usage-id",
"course_id": "test-course-id",
"text": "test note text",
......@@ -47,7 +50,7 @@ class AnnotationViewTests(APITestCase):
self.expected_note = {
"created": "2014-11-26T00:00:00+00:00",
"updated": "2014-11-26T00:00:00+00:00",
"user": "test-user-id",
"user": TEST_USER,
"usage_id": "test-usage-id",
"course_id": "test-course-id",
"text": "test note text",
......@@ -71,8 +74,7 @@ class AnnotationViewTests(APITestCase):
Create annotation directly in elasticsearch.
"""
opts = {
'user': self.user.id,
'consumer': self.user.consumer.key
'user': TEST_USER,
}
opts.update(kwargs)
annotation = Annotation(**opts)
......@@ -89,17 +91,15 @@ class AnnotationViewTests(APITestCase):
"""
Helper for search method.
"""
url = reverse('api:v1:annotations_search') + '?{}'.format(qs)
result = self.client.get(url, **self.headers)
url = reverse('api:v1:annotations_search') + '?user=' + TEST_USER + '&{}'.format(qs)
result = self.client.get(url)
return result.data
def test_create_no_payload(self):
"""
Test if no payload is sent when creating a note.
"""
url = reverse('api:v1:annotations')
response = self.client.post(url, {}, format='json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
class AnnotationViewTests(BaseAnnotationViewTests):
"""
Test annotation views, checking permissions
"""
@patch('annotator.elasticsearch.datetime')
def test_create_note(self, mock_datetime):
......@@ -124,15 +124,14 @@ class AnnotationViewTests(APITestCase):
"the response should have a Location header with the URL to read the annotation that was created"
)
# TODO: self.assertEqual(self.user.id, response.data['user'])
# TODO: self.assertEqual(self.user.consumer.key, response.data['consumer'])
self.assertEqual(response.data['user'], TEST_USER)
def test_create_ignore_created(self):
"""
Test if annotation 'created' field is not used by API.
"""
self.payload['created'] = 'abc'
response = self.client.post(reverse('api:v1:annotations'), self.payload, format='json', **self.headers)
response = self.client.post(reverse('api:v1:annotations'), self.payload, format='json')
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
annotation = self._get_annotation(response.data['id'])
......@@ -143,36 +142,27 @@ class AnnotationViewTests(APITestCase):
Test if annotation 'updated' field is not used by API.
"""
self.payload['updated'] = 'abc'
response = self.client.post(reverse('api:v1:annotations'), self.payload, format='json', **self.headers)
payload = self.payload
payload.update(self.headers)
response = self.client.post(reverse('api:v1:annotations'), self.payload, format='json')
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
annotation = self._get_annotation(response.data['id'])
self.assertNotEqual(annotation['updated'], 'abc', "annotation 'updated' field should not be used by API")
@unittest.skip("Unskip when auth will be done.")
def test_create_ignore_auth_in_payload(self):
"""
Test if annotation 'user' and 'consumer' fields are not used by API.
"""
payload = {'user': 'jenny', 'consumer': 'myconsumer'}
response = self.client.post(reverse('api:v1:annotations'), payload, format='json', **self.headers)
annotation = self._get_annotation(response.data['id'])
self.assertEqual(annotation['user'], self.user.id, "'user' field should not be used by API")
self.assertEqual(annotation['consumer'], self.user.consumer.key, "'consumer' field should not be used by API")
def test_create_must_not_update(self):
"""
Create must not update annotations.
"""
payload = {'name': 'foo'}
response = self.client.post(reverse('api:v1:annotations'), payload, format='json', **self.headers)
payload.update(self.headers)
response = self.client.post(reverse('api:v1:annotations'), payload, format='json')
annotation_id = response.data['id']
# Try to update the annotation using the create API.
update_payload = {'name': 'bar', 'id': annotation_id}
response = self.client.post(reverse('api:v1:annotations'), update_payload, format='json', **self.headers)
update_payload.update(self.headers)
response = self.client.post(reverse('api:v1:annotations'), update_payload, format='json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
......@@ -191,11 +181,10 @@ class AnnotationViewTests(APITestCase):
self._create_annotation(**note)
url = reverse('api:v1:annotations_detail', kwargs={'annotation_id': "test_id"})
response = self.client.get(url, **self.headers)
response = self.client.get(url, self.headers)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.expected_note['id'] = 'test_id'
self.expected_note['consumer'] = 'mockconsumer'
self.assertEqual(response.data, self.expected_note)
def test_read_notfound(self):
......@@ -203,7 +192,7 @@ class AnnotationViewTests(APITestCase):
Case when no annotation is present with specific id.
"""
url = reverse('api:v1:annotations_detail', kwargs={'annotation_id': 123})
response = self.client.get(url, **self.headers)
response = self.client.get(url, self.headers)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND, "response should be 404 NOT FOUND")
def test_update(self):
......@@ -212,6 +201,7 @@ class AnnotationViewTests(APITestCase):
"""
self._create_annotation(text=u"Foo", id='123', created='2014-10-10')
payload = {'id': '123', 'text': 'Bar'}
payload.update(self.headers)
url = reverse('api:v1:annotations_detail', kwargs={'annotation_id': 123})
response = self.client.put(url, payload, format='json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
......@@ -229,6 +219,7 @@ class AnnotationViewTests(APITestCase):
self._create_annotation(text=u"Foo", id='123')
payload = {'text': 'Bar'}
payload.update(self.headers)
url = reverse('api:v1:annotations_detail', kwargs={'annotation_id': 123})
response = self.client.put(url, payload, format='json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
......@@ -246,6 +237,7 @@ class AnnotationViewTests(APITestCase):
url = reverse('api:v1:annotations_detail', kwargs={'annotation_id': 123})
payload = {'text': 'Bar', 'id': 'abc'}
payload.update(self.headers)
response = self.client.put(url, payload, format='json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
......@@ -257,6 +249,7 @@ class AnnotationViewTests(APITestCase):
Test if annotation not exists with specified id and update was attempted on it.
"""
payload = {'id': '123', 'text': 'Bar'}
payload.update(self.headers)
url = reverse('api:v1:annotations_detail', kwargs={'annotation_id': 123})
response = self.client.put(url, payload, format='json')
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
......@@ -268,7 +261,7 @@ class AnnotationViewTests(APITestCase):
kwargs = dict(text=u"Bar", id='456')
self._create_annotation(**kwargs)
url = reverse('api:v1:annotations_detail', kwargs={'annotation_id': 456})
response = self.client.delete(url, **self.headers)
response = self.client.delete(url, self.headers)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT, "response should be 204 NO CONTENT")
self.assertEqual(self._get_annotation('456'), None, "annotation wasn't deleted in db")
......@@ -278,16 +271,16 @@ class AnnotationViewTests(APITestCase):
Case when no annotation is present with specific id when trying to delete.
"""
url = reverse('api:v1:annotations_detail', kwargs={'annotation_id': 123})
response = self.client.delete(url, **self.headers)
response = self.client.delete(url, self.headers)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND, "response should be 404 NOT FOUND")
def test_search(self):
"""
Tests for search method.
"""
note_1 = self._create_annotation(text=u'First one', user=u'user_3')
note_2 = self._create_annotation(text=u'Second note', user=u'user_2')
note_3 = self._create_annotation(text=u'Third note', user=u'user_3')
note_1 = self._create_annotation(text=u'First one')
note_2 = self._create_annotation(text=u'Second note')
note_3 = self._create_annotation(text=u'Third note')
results = self._get_search_results()
self.assertEqual(results['total'], 3)
......@@ -307,9 +300,9 @@ class AnnotationViewTests(APITestCase):
Sorting is by descending order (most recent first).
"""
note_1 = self._create_annotation(text=u'First one', user=u'user_3')
note_2 = self._create_annotation(text=u'Second note', user=u'user_3')
note_3 = self._create_annotation(text=u'Third note', user=u'user_3')
note_1 = self._create_annotation(text=u'First one')
note_2 = self._create_annotation(text=u'Second note')
note_3 = self._create_annotation(text=u'Third note')
results = self._get_search_results()
self.assertEqual(results['rows'][0]['text'], 'Third note')
......@@ -372,7 +365,7 @@ class AnnotationViewTests(APITestCase):
Tests list all annotations endpoint when no annotations are present in elasticsearch.
"""
url = reverse('api:v1:annotations')
response = self.client.get(url, **self.headers)
response = self.client.get(url, self.headers)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 0, "no annotation should be returned in response")
......@@ -387,6 +380,97 @@ class AnnotationViewTests(APITestCase):
es.conn.indices.refresh(es.index)
url = reverse('api:v1:annotations')
response = self.client.get(url, **self.headers)
response = self.client.get(url, self.headers)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 5, "five annotations should be returned in response")
@patch('django.conf.settings.DISABLE_TOKEN_CHECK', True)
class AllowAllAnnotationViewTests(BaseAnnotationViewTests):
"""
Test annotator behavior when authorization is not enforced
"""
def test_create_no_payload(self):
"""
Test if no payload is sent when creating a note.
"""
url = reverse('api:v1:annotations')
response = self.client.post(url, {}, format='json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
class TokenTests(BaseAnnotationViewTests):
"""
Test token interactions
"""
url = reverse('api:v1:annotations')
token_data = {
'aud': settings.CLIENT_ID,
'sub': TEST_USER,
'iat': timegm(datetime.utcnow().utctimetuple()),
'exp': timegm((datetime.utcnow() + timedelta(seconds=300)).utctimetuple()),
}
def _assert_403(self, token):
"""
Asserts that request with this token will fail
"""
self.client.credentials(HTTP_X_ANNOTATOR_AUTH_TOKEN=token)
response = self.client.get(self.url, self.headers)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_200(self):
"""
Ensure we can read list of annotations
"""
response = self.client.get(self.url, self.headers)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_no_token(self):
"""
403 when no token is provided
"""
self.client._credentials = {}
response = self.client.get(self.url, self.headers)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_malformed_token(self):
"""
403 when token can not be decoded
"""
self._assert_403("kuku")
def test_expired_token(self):
"""
403 when token is expired
"""
token = self.token_data.copy()
token['exp'] = 1
token = jwt.encode(token, settings.CLIENT_SECRET)
self._assert_403(token)
def test_wrong_issuer(self):
"""
403 when token's issuer is wrong
"""
token = self.token_data.copy()
token['aud'] = 'not Edx-notes'
token = jwt.encode(token, settings.CLIENT_SECRET)
self._assert_403(token)
def test_wrong_user(self):
"""
403 when token's user is wrong
"""
token = self.token_data.copy()
token['sub'] = 'joe'
token = jwt.encode(token, settings.CLIENT_SECRET)
self._assert_403(token)
def test_wrong_secret(self):
"""
403 when token is signed by wrong secret
"""
token = jwt.encode(self.token_data, "some secret")
self._assert_403(token)
......@@ -2,7 +2,6 @@ from django.conf import settings
from django.core.urlresolvers import reverse
from rest_framework import status
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView
......@@ -16,7 +15,6 @@ class AnnotationSearchView(APIView):
"""
Search annotations.
"""
permission_classes = (AllowAny,)
def get(self, *args, **kwargs): # pylint: disable=unused-argument
"""
......@@ -52,7 +50,6 @@ class AnnotationListView(APIView):
"""
List all annotations or create.
"""
permission_classes = (AllowAny,)
def get(self, *args, **kwargs): # pylint: disable=unused-argument
"""
......@@ -90,7 +87,6 @@ class AnnotationDetailView(APIView):
"""
Annotation detail view.
"""
permission_classes = (AllowAny,)
UPDATE_FILTER_FIELDS = ('updated', 'created', 'user', 'consumer')
......
......@@ -4,12 +4,18 @@ import sys
DEBUG = False
TEMPLATE_DEBUG = False
DISABLE_TOKEN_CHECK = False
USE_TZ = True
TIME_ZONE = 'UTC'
# This value needs to be overriden in production.
SECRET_KEY = '*^owi*4%!%9=#h@app!l^$jz8(c*q297^)4&4yn^#_m#fq=z#l'
# ID and Secret used for authenticating JWT Auth Tokens
# should match those configured for `edx-notes` Client in EdX's /admin/oauth2/client/
CLIENT_ID = 'edx-notes-id'
CLIENT_SECRET = 'edx-notes-secret'
ROOT_URLCONF = 'notesserver.urls'
MIDDLEWARE_CLASSES = (
......@@ -78,12 +84,17 @@ LOGGING = {
'level': 'DEBUG',
'propagate': True
},
'notesapi.v1.permissions': {
'handlers': ['console'],
'level': 'DEBUG',
'propagate': True
},
},
}
REST_FRAMEWORK = {
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated'
'notesapi.v1.permissions.HasAccessToken'
]
}
......
......@@ -16,11 +16,11 @@ def root(request): # pylint: disable=unused-argument
})
@permission_classes([AllowAny])
class StatusView(APIView):
"""
Determine if server is alive.
"""
permission_classes = (AllowAny,)
def get(self, *args, **kwargs): # pylint: disable=unused-argument
"""
......
......@@ -5,3 +5,4 @@ django-rest-swagger==0.2.0
elasticsearch==1.2.0
annotator==0.12.0
django-cors-headers==0.13
PyJWT==0.3.0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment