test_openid_provider.py 15.8 KB
Newer Older
1
#-*- encoding=utf-8 -*-
2 3 4 5 6 7
'''
Created on Jan 18, 2013

@author: brian
'''
import openid
8
import json
9
from openid.fetchers import HTTPFetcher, HTTPResponse
10
from urlparse import parse_qs, urlparse
11 12 13

from django.conf import settings
from django.test import TestCase, LiveServerTestCase
Diana Huang committed
14
from django.core.cache import cache
15
from django.test.utils import override_settings
16 17
from django.core.urlresolvers import reverse
from django.test.client import RequestFactory
18
from unittest import skipUnless
19

Diana Huang committed
20 21 22
from student.tests.factories import UserFactory
from external_auth.views import provider_login

Calen Pennington committed
23

24 25
class MyFetcher(HTTPFetcher):
    """A fetcher that uses server-internal calls for performing HTTP
Calen Pennington committed
26
    requests.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
    """

    def __init__(self, client):
        """@param client: A test client object"""

        super(MyFetcher, self).__init__()
        self.client = client

    def fetch(self, url, body=None, headers=None):
        """Perform an HTTP request

        @raises Exception: Any exception that can be raised by Django

        @see: C{L{HTTPFetcher.fetch}}
        """
        if body:
            # method = 'POST'
            # undo the URL encoding of the POST arguments
            data = parse_qs(body)
            response = self.client.post(url, data)
        else:
            # method = 'GET'
            data = {}
            if headers and 'Accept' in headers:
                data['CONTENT_TYPE'] = headers['Accept']
            response = self.client.get(url, data)
Calen Pennington committed
53

54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
        # Translate the test client response to the fetcher's HTTP response abstraction
        content = response.content
        final_url = url
        response_headers = {}
        if 'Content-Type' in response:
            response_headers['content-type'] = response['Content-Type']
        if 'X-XRDS-Location' in response:
            response_headers['x-xrds-location'] = response['X-XRDS-Location']
        status = response.status_code

        return HTTPResponse(
            body=content,
            final_url=final_url,
            headers=response_headers,
            status=status,
69
        )
70

Calen Pennington committed
71

72
class OpenIdProviderTest(TestCase):
73 74 75
    """
    Tests of the OpenId login
    """
76

Diana Huang committed
77
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
78 79
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
80
    def test_begin_login_with_xrds_url(self):
81 82 83 84 85 86

        # the provider URL must be converted to an absolute URL in order to be
        # used as an openid provider.
        provider_url = reverse('openid-provider-xrds')
        factory = RequestFactory()
        request = factory.request()
Calen Pennington committed
87
        abs_provider_url = request.build_absolute_uri(location=provider_url)
88 89 90 91 92 93 94

        # In order for this absolute URL to work (i.e. to get xrds, then authentication)
        # in the test environment, we either need a live server that works with the default
        # fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
        # Here we do the latter:
        fetcher = MyFetcher(self.client)
        openid.fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)
Calen Pennington committed
95

96 97
        # now we can begin the login process by invoking a local openid client,
        # with a pointer to the (also-local) openid provider:
Calen Pennington committed
98
        with self.settings(OPENID_SSO_SERVER_URL=abs_provider_url):
99

100 101 102 103 104 105 106
            url = reverse('openid-login')
            resp = self.client.post(url)
            code = 200
            self.assertEqual(resp.status_code, code,
                             "got code {0} for url '{1}'. Expected code {2}"
                             .format(resp.status_code, url, code))

Diana Huang committed
107
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
108 109
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
110
    def test_begin_login_with_login_url(self):
111 112 113 114 115 116

        # the provider URL must be converted to an absolute URL in order to be
        # used as an openid provider.
        provider_url = reverse('openid-provider-login')
        factory = RequestFactory()
        request = factory.request()
Calen Pennington committed
117
        abs_provider_url = request.build_absolute_uri(location=provider_url)
118 119 120 121 122 123 124

        # In order for this absolute URL to work (i.e. to get xrds, then authentication)
        # in the test environment, we either need a live server that works with the default
        # fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
        # Here we do the latter:
        fetcher = MyFetcher(self.client)
        openid.fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)
Calen Pennington committed
125

126 127
        # now we can begin the login process by invoking a local openid client,
        # with a pointer to the (also-local) openid provider:
Calen Pennington committed
128
        with self.settings(OPENID_SSO_SERVER_URL=abs_provider_url):
129 130 131 132 133 134
            url = reverse('openid-login')
            resp = self.client.post(url)
            code = 200
            self.assertEqual(resp.status_code, code,
                             "got code {0} for url '{1}'. Expected code {2}"
                             .format(resp.status_code, url, code))
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
            self.assertContains(resp, '<input name="openid.mode" type="hidden" value="checkid_setup" />', html=True)
            self.assertContains(resp, '<input name="openid.ns" type="hidden" value="http://specs.openid.net/auth/2.0" />', html=True)
            self.assertContains(resp, '<input name="openid.identity" type="hidden" value="http://specs.openid.net/auth/2.0/identifier_select" />', html=True)
            self.assertContains(resp, '<input name="openid.claimed_id" type="hidden" value="http://specs.openid.net/auth/2.0/identifier_select" />', html=True)
            self.assertContains(resp, '<input name="openid.ns.ax" type="hidden" value="http://openid.net/srv/ax/1.0" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.mode" type="hidden" value="fetch_request" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.required" type="hidden" value="email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.fullname" type="hidden" value="http://axschema.org/namePerson" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.lastname" type="hidden" value="http://axschema.org/namePerson/last" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.firstname" type="hidden" value="http://axschema.org/namePerson/first" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.nickname" type="hidden" value="http://axschema.org/namePerson/friendly" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.email" type="hidden" value="http://axschema.org/contact/email" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.old_email" type="hidden" value="http://schema.openid.net/contact/email" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.old_nickname" type="hidden" value="http://schema.openid.net/namePerson/friendly" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.old_fullname" type="hidden" value="http://schema.openid.net/namePerson" />', html=True)
            self.assertContains(resp, '<input type="submit" value="Continue" />', html=True)
            # this should work on the server:
            self.assertContains(resp, '<input name="openid.realm" type="hidden" value="http://testserver/" />', html=True)
Calen Pennington committed
153

154 155 156
            # not included here are elements that will vary from run to run:
            # <input name="openid.return_to" type="hidden" value="http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H" />
            # <input name="openid.assoc_handle" type="hidden" value="{HMAC-SHA1}{50ff8120}{rh87+Q==}" />
Calen Pennington committed
157

158 159
    def attempt_login(self, expected_code, **kwargs):
        """ Attempt to log in through the open id provider login """
160 161
        url = reverse('openid-provider-login')
        post_args = {
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
            "openid.mode": "checkid_setup",
            "openid.return_to": "http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H",
            "openid.assoc_handle": "{HMAC-SHA1}{50ff8120}{rh87+Q==}",
            "openid.claimed_id": "http://specs.openid.net/auth/2.0/identifier_select",
            "openid.ns": "http://specs.openid.net/auth/2.0",
            "openid.realm": "http://testserver/",
            "openid.identity": "http://specs.openid.net/auth/2.0/identifier_select",
            "openid.ns.ax": "http://openid.net/srv/ax/1.0",
            "openid.ax.mode": "fetch_request",
            "openid.ax.required": "email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname",
            "openid.ax.type.fullname": "http://axschema.org/namePerson",
            "openid.ax.type.lastname": "http://axschema.org/namePerson/last",
            "openid.ax.type.firstname": "http://axschema.org/namePerson/first",
            "openid.ax.type.nickname": "http://axschema.org/namePerson/friendly",
            "openid.ax.type.email": "http://axschema.org/contact/email",
            "openid.ax.type.old_email": "http://schema.openid.net/contact/email",
            "openid.ax.type.old_nickname": "http://schema.openid.net/namePerson/friendly",
            "openid.ax.type.old_fullname": "http://schema.openid.net/namePerson",
        }
181 182 183 184
        # override the default args with any given arguments
        for key in kwargs:
            post_args["openid." + key] = kwargs[key]

185
        resp = self.client.post(url, post_args)
186
        code = expected_code
187 188 189
        self.assertEqual(resp.status_code, code,
                         "got code {0} for url '{1}'. Expected code {2}"
                         .format(resp.status_code, url, code))
Calen Pennington committed
190

Diana Huang committed
191
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
192 193
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
194 195 196 197
    def test_open_id_setup(self):
        """ Attempt a standard successful login """
        self.attempt_login(200)

Diana Huang committed
198
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
199 200
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
201 202
    def test_invalid_namespace(self):
        """ Test for 403 error code when the namespace of the request is invalid"""
203
        self.attempt_login(403, ns="http%3A%2F%2Fspecs.openid.net%2Fauth%2F2.0")
204 205

    @override_settings(OPENID_PROVIDER_TRUSTED_ROOTS=['http://apps.cs50.edx.org'])
Diana Huang committed
206
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
207 208
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
209 210
    def test_invalid_return_url(self):
        """ Test for 403 error code when the url"""
211
        self.attempt_login(403, return_to="http://apps.cs50.edx.or")
212

Diana Huang committed
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
    def _send_bad_redirection_login(self):
        """
        Attempt to log in to the provider with setup parameters

        Intentionally fail the login to force a redirect
        """
        user = UserFactory()

        factory = RequestFactory()
        post_params = {'email': user.email, 'password': 'password'}
        fake_url = 'fake url'
        request = factory.post(reverse('openid-provider-login'), post_params)
        openid_setup = {
            'request': factory.request(),
            'url': fake_url
        }
        request.session = {
            'openid_setup': openid_setup
        }
        response = provider_login(request)
        return response

Diana Huang committed
235
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
236 237
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
Diana Huang committed
238 239 240 241 242
    def test_login_openid_handle_redirection(self):
        """ Test to see that we can handle login redirection properly"""
        response = self._send_bad_redirection_login()
        self.assertEquals(response.status_code, 302)

Diana Huang committed
243
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
244 245
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
Diana Huang committed
246 247 248 249 250 251 252 253 254 255 256 257
    def test_login_openid_handle_redirection_ratelimited(self):
        # try logging in 30 times, the default limit in the number of failed
        # log in attempts before the rate gets limited
        for _ in xrange(30):
            self._send_bad_redirection_login()

        response = self._send_bad_redirection_login()
        # verify that we are not returning the default 403
        self.assertEquals(response.status_code, 302)
        # clear the ratelimit cache so that we don't fail other logins
        cache.clear()

Diana Huang committed
258
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
                'OpenID not enabled')
    def test_openid_final_response(self):

        url = reverse('openid-provider-login')
        user = UserFactory()

        # login to the client so that we can persist session information
        for name in ['Robot 33', '☃']:
            user.profile.name = name
            user.profile.save()
            self.client.login(username=user.username, password='test')
            # login once to get the right session information
            self.attempt_login(200)
            post_args = {
                'email': user.email,
                'password': 'test',
            }

            # call url again, this time with username and password
            resp = self.client.post(url, post_args)
            # all information is embedded in the redirect url
            location = resp['Location']
            # parse the url
            parsed_url = urlparse(location)
            parsed_qs = parse_qs(parsed_url.query)
            self.assertEquals(parsed_qs['openid.ax.type.ext1'][0], 'http://axschema.org/contact/email')
            self.assertEquals(parsed_qs['openid.ax.type.ext0'][0], 'http://axschema.org/namePerson')
            self.assertEquals(parsed_qs['openid.ax.value.ext1.1'][0], user.email)
            self.assertEquals(parsed_qs['openid.ax.value.ext0.1'][0], user.profile.name)

Calen Pennington committed
289

290
class OpenIdProviderLiveServerTest(LiveServerTestCase):
291 292 293 294 295 296
    """
    In order for this absolute URL to work (i.e. to get xrds, then authentication)
    in the test environment, we either need a live server that works with the default
    fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
    Here we do the former.
    """
297

Diana Huang committed
298
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
299 300
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
301
    def test_begin_login(self):
302 303 304 305 306
        # the provider URL must be converted to an absolute URL in order to be
        # used as an openid provider.
        provider_url = reverse('openid-provider-xrds')
        factory = RequestFactory()
        request = factory.request()
Calen Pennington committed
307
        abs_provider_url = request.build_absolute_uri(location=provider_url)
308 309 310

        # now we can begin the login process by invoking a local openid client,
        # with a pointer to the (also-local) openid provider:
Calen Pennington committed
311
        with self.settings(OPENID_SSO_SERVER_URL=abs_provider_url):
312 313 314 315 316 317
            url = reverse('openid-login')
            resp = self.client.post(url)
            code = 200
            self.assertEqual(resp.status_code, code,
                             "got code {0} for url '{1}'. Expected code {2}"
                             .format(resp.status_code, url, code))
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333

    @classmethod
    def tearDownClass(cls):
        """
        Workaround for a runtime error that occurs
        intermittently when the server thread doesn't shut down
        within 2 seconds.

        Since the server is running in a Django thread and will
        be terminated when the test suite terminates,
        this shouldn't cause a resource allocation issue.
        """
        try:
            super(OpenIdProviderLiveServerTest, cls).tearDownClass()
        except RuntimeError:
            print "Warning: Could not shut down test server."