Commit 04294b04 by Alexander Kryklia

Adds oauth checking to acceptance tests

parent b5dc03ec
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
import json
import urllib
import urlparse import urlparse
import threading from requests.packages.oauthlib.oauth1.rfc5849 import signature
import mock
from logging import getLogger from logging import getLogger
logger = getLogger(__name__) logger = getLogger(__name__)
# todo - implement oauth
class MockLTIRequestHandler(BaseHTTPRequestHandler): class MockLTIRequestHandler(BaseHTTPRequestHandler):
''' '''
A handler for LTI POST requests. A handler for LTI POST requests.
...@@ -28,34 +24,41 @@ class MockLTIRequestHandler(BaseHTTPRequestHandler): ...@@ -28,34 +24,41 @@ class MockLTIRequestHandler(BaseHTTPRequestHandler):
post_dict = self._post_dict() # Retrieve the POST data post_dict = self._post_dict() # Retrieve the POST data
# Log the request
logger.debug("LTI provider received POST request {} to path {}".format( logger.debug("LTI provider received POST request {} to path {}".format(
str(post_dict), str(post_dict),
self.path) self.path)
) ) # Log the request
# Respond only to requests with correct lti endpoint: # Respond only to requests with correct lti endpoint:
if self._is_correct_lti_request(): if self._is_correct_lti_request():
correct_dict = { correct_keys = [
'user_id': 'default_user_id', 'user_id',
'oauth_nonce': '22990037033121997701377766132', 'oauth_nonce',
'oauth_timestamp': '1377766132', 'oauth_timestamp',
'oauth_consumer_key': 'client_key', 'oauth_consumer_key',
'lti_version': 'LTI-1p0', 'lti_version',
'oauth_signature_method': 'HMAC-SHA1', 'oauth_signature_method',
'oauth_version': '1.0', 'oauth_version',
'oauth_signature': 'HGYMAU/G5EMxd0CDOvWubsqxLIY=', 'oauth_signature',
'lti_message_type': 'basic-lti-launch-request', 'lti_message_type',
'oauth_callback': 'about:blank' 'oauth_callback',
} 'lis_outcome_service_url',
'lis_result_sourcedid',
if sorted(correct_dict.keys()) != sorted(post_dict.keys()): 'launch_presentation_return_url'
error_message = "Incorrect LTI header" ]
if sorted(correct_keys) != sorted(post_dict.keys()):
status_message = "Incorrect LTI header"
else: else:
error_message = "This is LTI tool." params = {k: v for k, v in post_dict.items() if k != 'oauth_signature'}
if self.server.check_oauth_signature(params, post_dict['oauth_signature']):
status_message = "This is LTI tool."
else:
status_message = "Wrong LTI signature"
else: else:
error_message = "Invalid request URL" status_message = "Invalid request URL"
self._send_response(error_message) self._send_response(status_message)
def _send_head(self): def _send_head(self):
''' '''
...@@ -75,7 +78,7 @@ class MockLTIRequestHandler(BaseHTTPRequestHandler): ...@@ -75,7 +78,7 @@ class MockLTIRequestHandler(BaseHTTPRequestHandler):
''' '''
try: try:
length = int(self.headers.getheader('content-length')) length = int(self.headers.getheader('content-length'))
post_dict = urlparse.parse_qs(self.rfile.read(length)) post_dict = urlparse.parse_qs(self.rfile.read(length), keep_blank_values=True)
# The POST dict will contain a list of values for each key. # The POST dict will contain a list of values for each key.
# None of our parameters are lists, however, so we map [val] --> val. # None of our parameters are lists, however, so we map [val] --> val.
#I f the list contains multiple entries, we pick the first one #I f the list contains multiple entries, we pick the first one
...@@ -110,8 +113,8 @@ class MockLTIRequestHandler(BaseHTTPRequestHandler): ...@@ -110,8 +113,8 @@ class MockLTIRequestHandler(BaseHTTPRequestHandler):
self.wfile.write(response_str) self.wfile.write(response_str)
def _is_correct_lti_request(self): def _is_correct_lti_request(self):
'''If url to get LTI is correct.''' '''If url to LTI tool is correct.'''
return 'correct_lti_endpoint' in self.path return self.server.oauth_settings['lti_endpoint'] in self.path
class MockLTIServer(HTTPServer): class MockLTIServer(HTTPServer):
...@@ -120,22 +123,13 @@ class MockLTIServer(HTTPServer): ...@@ -120,22 +123,13 @@ class MockLTIServer(HTTPServer):
to POST requests to localhost. to POST requests to localhost.
''' '''
def __init__(self, port_num, oauth={}): def __init__(self, address):
''' '''
Initialize the mock XQueue server instance. Initialize the mock XQueue server instance.
*port_num* is the localhost port to listen to *address* is the (host, host's port to listen to) tuple.
*grade_response_dict* is a dictionary that will be JSON-serialized
and sent in response to XQueue grading requests.
''' '''
self.clent_key = oauth.get('client_key', '')
self.clent_secret = oauth.get('client_secret', '')
self.check_oauth()
handler = MockLTIRequestHandler handler = MockLTIRequestHandler
address = ('', port_num)
HTTPServer.__init__(self, address, handler) HTTPServer.__init__(self, address, handler)
def shutdown(self): def shutdown(self):
...@@ -144,15 +138,34 @@ class MockLTIServer(HTTPServer): ...@@ -144,15 +138,34 @@ class MockLTIServer(HTTPServer):
''' '''
# First call superclass shutdown() # First call superclass shutdown()
HTTPServer.shutdown(self) HTTPServer.shutdown(self)
# We also need to manually close the socket # We also need to manually close the socket
self.socket.close() self.socket.close()
def get_oauth_signature(self): def check_oauth_signature(self, params, client_signature):
'''test''' '''
return self._signature Checks oauth signature from client.
`params` are params from post request except signature,
`client_signature` is signature from request.
Builds mocked request and verifies hmac-sha1 signing::
1. builds string to sign from `params`, `url` and `http_method`.
2. signs it with `client_secret` which comes from server settings.
3. obtains signature after sign and then compares it with request.signature
(request signature comes form client in request)
Returns `True` if signatures are correct, otherwise `False`.
'''
client_secret = unicode(self.oauth_settings['client_secret'])
url = self.oauth_settings['lti_base'] + self.oauth_settings['lti_endpoint']
request = mock.Mock()
request.params = [(unicode(k), unicode(v)) for k, v in params.items()]
request.uri = unicode(url)
request.http_method = u'POST'
request.signature = unicode(client_signature)
def check_oauth(self): return signature.verify_hmac_sha1(request, client_secret)
''' generate oauth signature '''
self._signature = '12345'
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment