#-*- encoding=utf-8 -*-
'''
Created on Jan 18, 2013

@author: brian
'''
import openid
from openid.fetchers import HTTPFetcher, HTTPResponse
from urlparse import parse_qs, urlparse

from django.conf import settings
from django.test import TestCase, LiveServerTestCase
from django.core.cache import cache
from django.test.utils import override_settings
from django.core.urlresolvers import reverse
from django.test.client import RequestFactory
from unittest import skipUnless

from student.tests.factories import UserFactory
from external_auth.views import provider_login


class MyFetcher(HTTPFetcher):
    """A fetcher that uses server-internal calls for performing HTTP
    requests.
    """

    def __init__(self, client):
        """@param client: A test client object"""

        super(MyFetcher, self).__init__()
        self.client = client

    def fetch(self, url, body=None, headers=None):
        """Perform an HTTP request

        @raises Exception: Any exception that can be raised by Django

        @see: C{L{HTTPFetcher.fetch}}
        """
        if body:
            # method = 'POST'
            # undo the URL encoding of the POST arguments
            data = parse_qs(body)
            response = self.client.post(url, data)
        else:
            # method = 'GET'
            data = {}
            if headers and 'Accept' in headers:
                data['CONTENT_TYPE'] = headers['Accept']
            response = self.client.get(url, data)

        # Translate the test client response to the fetcher's HTTP response abstraction
        content = response.content
        final_url = url
        response_headers = {}
        if 'Content-Type' in response:
            response_headers['content-type'] = response['Content-Type']
        if 'X-XRDS-Location' in response:
            response_headers['x-xrds-location'] = response['X-XRDS-Location']
        status = response.status_code

        return HTTPResponse(
            body=content,
            final_url=final_url,
            headers=response_headers,
            status=status,
        )


class OpenIdProviderTest(TestCase):
    """
    Tests of the OpenId login
    """
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
    def test_begin_login_with_xrds_url(self):

        # the provider URL must be converted to an absolute URL in order to be
        # used as an openid provider.
        provider_url = reverse('openid-provider-xrds')
        factory = RequestFactory()
        request = factory.request()
        abs_provider_url = request.build_absolute_uri(location=provider_url)

        # In order for this absolute URL to work (i.e. to get xrds, then authentication)
        # in the test environment, we either need a live server that works with the default
        # fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
        # Here we do the latter:
        fetcher = MyFetcher(self.client)
        openid.fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)

        # now we can begin the login process by invoking a local openid client,
        # with a pointer to the (also-local) openid provider:
        with self.settings(OPENID_SSO_SERVER_URL=abs_provider_url):

            url = reverse('openid-login')
            resp = self.client.post(url)
            code = 200
            self.assertEqual(resp.status_code, code,
                             "got code {0} for url '{1}'. Expected code {2}"
                             .format(resp.status_code, url, code))

    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
    def test_begin_login_with_login_url(self):

        # the provider URL must be converted to an absolute URL in order to be
        # used as an openid provider.
        provider_url = reverse('openid-provider-login')
        factory = RequestFactory()
        request = factory.request()
        abs_provider_url = request.build_absolute_uri(location=provider_url)

        # In order for this absolute URL to work (i.e. to get xrds, then authentication)
        # in the test environment, we either need a live server that works with the default
        # fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
        # Here we do the latter:
        fetcher = MyFetcher(self.client)
        openid.fetchers.setDefaultFetcher(fetcher, wrap_exceptions=False)

        # now we can begin the login process by invoking a local openid client,
        # with a pointer to the (also-local) openid provider:
        with self.settings(OPENID_SSO_SERVER_URL=abs_provider_url):
            url = reverse('openid-login')
            resp = self.client.post(url)
            code = 200
            self.assertEqual(resp.status_code, code,
                             "got code {0} for url '{1}'. Expected code {2}"
                             .format(resp.status_code, url, code))
            self.assertContains(resp, '<input name="openid.mode" type="hidden" value="checkid_setup" />', html=True)
            self.assertContains(resp, '<input name="openid.ns" type="hidden" value="http://specs.openid.net/auth/2.0" />', html=True)
            self.assertContains(resp, '<input name="openid.identity" type="hidden" value="http://specs.openid.net/auth/2.0/identifier_select" />', html=True)
            self.assertContains(resp, '<input name="openid.claimed_id" type="hidden" value="http://specs.openid.net/auth/2.0/identifier_select" />', html=True)
            self.assertContains(resp, '<input name="openid.ns.ax" type="hidden" value="http://openid.net/srv/ax/1.0" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.mode" type="hidden" value="fetch_request" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.required" type="hidden" value="email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.fullname" type="hidden" value="http://axschema.org/namePerson" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.lastname" type="hidden" value="http://axschema.org/namePerson/last" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.firstname" type="hidden" value="http://axschema.org/namePerson/first" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.nickname" type="hidden" value="http://axschema.org/namePerson/friendly" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.email" type="hidden" value="http://axschema.org/contact/email" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.old_email" type="hidden" value="http://schema.openid.net/contact/email" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.old_nickname" type="hidden" value="http://schema.openid.net/namePerson/friendly" />', html=True)
            self.assertContains(resp, '<input name="openid.ax.type.old_fullname" type="hidden" value="http://schema.openid.net/namePerson" />', html=True)
            self.assertContains(resp, '<input type="submit" value="Continue" />', html=True)
            # this should work on the server:
            self.assertContains(resp, '<input name="openid.realm" type="hidden" value="http://testserver/" />', html=True)

            # not included here are elements that will vary from run to run:
            # <input name="openid.return_to" type="hidden" value="http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H" />
            # <input name="openid.assoc_handle" type="hidden" value="{HMAC-SHA1}{50ff8120}{rh87+Q==}" />

    def attempt_login(self, expected_code, login_method='POST', **kwargs):
        """ Attempt to log in through the open id provider login """
        url = reverse('openid-provider-login')
        args = {
            "openid.mode": "checkid_setup",
            "openid.return_to": "http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H",
            "openid.assoc_handle": "{HMAC-SHA1}{50ff8120}{rh87+Q==}",
            "openid.claimed_id": "http://specs.openid.net/auth/2.0/identifier_select",
            "openid.ns": "http://specs.openid.net/auth/2.0",
            "openid.realm": "http://testserver/",
            "openid.identity": "http://specs.openid.net/auth/2.0/identifier_select",
            "openid.ns.ax": "http://openid.net/srv/ax/1.0",
            "openid.ax.mode": "fetch_request",
            "openid.ax.required": "email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname",
            "openid.ax.type.fullname": "http://axschema.org/namePerson",
            "openid.ax.type.lastname": "http://axschema.org/namePerson/last",
            "openid.ax.type.firstname": "http://axschema.org/namePerson/first",
            "openid.ax.type.nickname": "http://axschema.org/namePerson/friendly",
            "openid.ax.type.email": "http://axschema.org/contact/email",
            "openid.ax.type.old_email": "http://schema.openid.net/contact/email",
            "openid.ax.type.old_nickname": "http://schema.openid.net/namePerson/friendly",
            "openid.ax.type.old_fullname": "http://schema.openid.net/namePerson",
        }
        # override the default args with any given arguments
        for key in kwargs:
            args["openid." + key] = kwargs[key]

        if login_method == 'POST':
            resp = self.client.post(url, args)
        elif login_method == 'GET':
            resp = self.client.get(url, args)
        else:
            self.fail('Invalid login method')

        code = expected_code
        self.assertEqual(resp.status_code, code,
                         "got code {0} for url '{1}'. Expected code {2}"
                         .format(resp.status_code, url, code))

    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
    def test_open_id_setup(self):
        """ Attempt a standard successful login """
        self.attempt_login(200)

    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
    def test_invalid_namespace(self):
        """ Test for 403 error code when the namespace of the request is invalid"""
        self.attempt_login(403, ns="http%3A%2F%2Fspecs.openid.net%2Fauth%2F2.0")

    @override_settings(OPENID_PROVIDER_TRUSTED_ROOTS=['http://apps.cs50.edx.org'])
    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
    def test_invalid_return_url(self):
        """ Test for 403 error code when the url"""
        self.attempt_login(403, return_to="http://apps.cs50.edx.or")

    def _send_bad_redirection_login(self):
        """
        Attempt to log in to the provider with setup parameters

        Intentionally fail the login to force a redirect
        """
        user = UserFactory()

        factory = RequestFactory()
        post_params = {'email': user.email, 'password': 'password'}
        fake_url = 'fake url'
        request = factory.post(reverse('openid-provider-login'), post_params)
        openid_setup = {
            'request': factory.request(),
            'url': fake_url,
            'post_params': {}
        }
        request.session = {
            'openid_setup': openid_setup
        }
        response = provider_login(request)
        return response

    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
    def test_login_openid_handle_redirection(self):
        """ Test to see that we can handle login redirection properly"""
        response = self._send_bad_redirection_login()
        self.assertEquals(response.status_code, 302)

    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
    def test_login_openid_handle_redirection_ratelimited(self):
        # try logging in 30 times, the default limit in the number of failed
        # log in attempts before the rate gets limited
        for _ in xrange(30):
            self._send_bad_redirection_login()

        response = self._send_bad_redirection_login()
        # verify that we are not returning the default 403
        self.assertEquals(response.status_code, 302)
        # clear the ratelimit cache so that we don't fail other logins
        cache.clear()

    def _attempt_login_and_perform_final_response(self, user, profile_name):
        """
        Performs full procedure of a successful OpenID provider login for user,
        all required data is taken form ``user`` attribute which is an instance
        of ``User`` model. As a convenience this method will also set
        ``profile.name`` for the user.
        """
        url = reverse('openid-provider-login')

        # login to the client so that we can persist session information
        user.profile.name = profile_name
        user.profile.save()
        # It is asssumed that user's password is test (default for UserFactory)
        self.client.login(username=user.username, password='test')
        # login once to get the right session information
        self.attempt_login(200)
        post_args = {
            'email': user.email,
            'password': 'test'
        }

        # call url again, this time with username and password
        return self.client.post(url, post_args)

    @skipUnless(
        settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'), 'OpenID not enabled')
    def test_provider_login_can_handle_unicode_email(self):
        user = UserFactory(email=u"user.ąęł@gmail.com")
        resp = self._attempt_login_and_perform_final_response(user, u"Jan ĄĘŁ")
        location = resp['Location']
        parsed_url = urlparse(location)
        parsed_qs = parse_qs(parsed_url.query)
        self.assertEquals(parsed_qs['openid.ax.type.ext1'][0], 'http://axschema.org/contact/email')
        self.assertEquals(parsed_qs['openid.ax.type.ext0'][0], 'http://axschema.org/namePerson')
        self.assertEquals(parsed_qs['openid.ax.value.ext0.1'][0],
                          user.profile.name.encode('utf-8'))  # pylint: disable=no-member
        self.assertEquals(parsed_qs['openid.ax.value.ext1.1'][0],
                          user.email.encode('utf-8'))  # pylint: disable=no-member

    @skipUnless(
        settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'), 'OpenID not enabled')
    def test_provider_login_can_handle_unicode_email_invalid_password(self):
        user = UserFactory(email=u"user.ąęł@gmail.com")
        url = reverse('openid-provider-login')

        # login to the client so that we can persist session information
        user.profile.name = u"Jan ĄĘ"
        user.profile.save()
        # It is asssumed that user's password is test (default for UserFactory)
        self.client.login(username=user.username, password='test')
        # login once to get the right session information
        self.attempt_login(200)
        # We trigger situation where user password is invalid at last phase
        # of openid login
        post_args = {
            'email': user.email,
            'password': 'invalid-password'
        }

        # call url again, this time with username and password
        return self.client.post(url, post_args)

    @skipUnless(
        settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'), 'OpenID not enabled')
    def test_provider_login_can_handle_unicode_email_inactive_account(self):
        user = UserFactory(email=u"user.ąęł@gmail.com", username=u"ąęół")
        url = reverse('openid-provider-login')

        # login to the client so that we can persist session information
        user.profile.name = u'Jan ĄĘ'
        user.profile.save()  # pylint: disable=no-member
        self.client.login(username=user.username, password='test')
        # login once to get the right session information
        self.attempt_login(200)
        # We trigger situation where user is not active at final phase of
        # OpenId login.
        user.is_active = False
        user.save()  # pylint: disable=no-member
        post_args = {
            'email': user.email,
            'password': 'test'
        }
        # call url again, this time with username and password
        self.client.post(url, post_args)

    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
    def test_openid_final_response(self):

        user = UserFactory()

        # login to the client so that we can persist session information
        for name in ['Robot 33', '☃']:
            resp = self._attempt_login_and_perform_final_response(user, name)
            # all information is embedded in the redirect url
            location = resp['Location']
            # parse the url
            parsed_url = urlparse(location)
            parsed_qs = parse_qs(parsed_url.query)
            self.assertEquals(parsed_qs['openid.ax.type.ext1'][0], 'http://axschema.org/contact/email')
            self.assertEquals(parsed_qs['openid.ax.type.ext0'][0], 'http://axschema.org/namePerson')
            self.assertEquals(parsed_qs['openid.ax.value.ext1.1'][0], user.email)
            self.assertEquals(parsed_qs['openid.ax.value.ext0.1'][0], user.profile.name)

    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
    def test_openid_invalid_password(self):

        url = reverse('openid-provider-login')
        user = UserFactory()

        # login to the client so that we can persist session information
        for method in ['POST', 'GET']:
            self.client.login(username=user.username, password='test')
            self.attempt_login(200, method)
            openid_setup = self.client.session['openid_setup']
            self.assertIn('post_params', openid_setup)
            post_args = {
                'email': user.email,
                'password': 'bad_password',
            }

            # call url again, this time with username and password
            resp = self.client.post(url, post_args)
            self.assertEquals(resp.status_code, 302)
            redirect_url = resp['Location']
            parsed_url = urlparse(redirect_url)
            query_params = parse_qs(parsed_url[4])
            self.assertIn('openid.return_to', query_params)
            self.assertTrue(
                query_params['openid.return_to'][0].startswith('http://testserver/openid/complete/')
            )


class OpenIdProviderLiveServerTest(LiveServerTestCase):
    """
    In order for this absolute URL to work (i.e. to get xrds, then authentication)
    in the test environment, we either need a live server that works with the default
    fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
    Here we do the former.
    """

    @skipUnless(settings.FEATURES.get('AUTH_USE_OPENID') and
                settings.FEATURES.get('AUTH_USE_OPENID_PROVIDER'),
                'OpenID not enabled')
    def test_begin_login(self):
        # the provider URL must be converted to an absolute URL in order to be
        # used as an openid provider.
        provider_url = reverse('openid-provider-xrds')
        factory = RequestFactory()
        request = factory.request()
        abs_provider_url = request.build_absolute_uri(location=provider_url)

        # now we can begin the login process by invoking a local openid client,
        # with a pointer to the (also-local) openid provider:
        with self.settings(OPENID_SSO_SERVER_URL=abs_provider_url):
            url = reverse('openid-login')
            resp = self.client.post(url)
            code = 200
            self.assertEqual(resp.status_code, code,
                             "got code {0} for url '{1}'. Expected code {2}"
                             .format(resp.status_code, url, code))

    @classmethod
    def tearDownClass(cls):
        """
        Workaround for a runtime error that occurs
        intermittently when the server thread doesn't shut down
        within 2 seconds.

        Since the server is running in a Django thread and will
        be terminated when the test suite terminates,
        this shouldn't cause a resource allocation issue.
        """
        try:
            super(OpenIdProviderLiveServerTest, cls).tearDownClass()
        except RuntimeError:
            print "Warning: Could not shut down test server."