Commit cd941ead by Braden MacDonald

New SAML/Shibboleth tests - PR 8518

parent b4904adc
...@@ -368,7 +368,7 @@ def signin_user(request): ...@@ -368,7 +368,7 @@ def signin_user(request):
for msg in messages.get_messages(request): for msg in messages.get_messages(request):
if msg.extra_tags.split()[0] == "social-auth": if msg.extra_tags.split()[0] == "social-auth":
# msg may or may not be translated. Try translating [again] in case we are able to: # msg may or may not be translated. Try translating [again] in case we are able to:
third_party_auth_error = _(msg) # pylint: disable=translation-of-non-string third_party_auth_error = _(unicode(msg)) # pylint: disable=translation-of-non-string
break break
context = { context = {
......
...@@ -60,7 +60,7 @@ class Command(BaseCommand): ...@@ -60,7 +60,7 @@ class Command(BaseCommand):
self.stdout.write("\n→ Fetching {}\n".format(url)) self.stdout.write("\n→ Fetching {}\n".format(url))
if not url.lower().startswith('https'): if not url.lower().startswith('https'):
self.stdout.write("→ WARNING: This URL is not secure! It should use HTTPS.\n") self.stdout.write("→ WARNING: This URL is not secure! It should use HTTPS.\n")
response = requests.get(url, verify=True) # May raise HTTPError or SSLError response = requests.get(url, verify=True) # May raise HTTPError or SSLError or ConnectionError
response.raise_for_status() # May raise an HTTPError response.raise_for_status() # May raise an HTTPError
try: try:
...@@ -75,7 +75,7 @@ class Command(BaseCommand): ...@@ -75,7 +75,7 @@ class Command(BaseCommand):
public_key, sso_url, expires_at = self._parse_metadata_xml(xml, entity_id) public_key, sso_url, expires_at = self._parse_metadata_xml(xml, entity_id)
self._update_data(entity_id, public_key, sso_url, expires_at) self._update_data(entity_id, public_key, sso_url, expires_at)
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
self.stderr.write("→ ERROR: {}\n\n".format(err.message)) self.stderr.write(u"→ ERROR: {}\n\n".format(err.message))
@classmethod @classmethod
def _parse_metadata_xml(cls, xml, entity_id): def _parse_metadata_xml(cls, xml, entity_id):
......
...@@ -8,6 +8,7 @@ from django.conf import settings ...@@ -8,6 +8,7 @@ from django.conf import settings
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.db import models from django.db import models
from django.utils import timezone from django.utils import timezone
from django.utils.translation import ugettext as _
import json import json
import logging import logging
from social.backends.base import BaseAuth from social.backends.base import BaseAuth
...@@ -53,7 +54,7 @@ class AuthNotConfigured(SocialAuthBaseException): ...@@ -53,7 +54,7 @@ class AuthNotConfigured(SocialAuthBaseException):
self.provider_name = provider_name self.provider_name = provider_name
def __str__(self): def __str__(self):
return 'Authentication with {} is currently unavailable.'.format( return _('Authentication with {} is currently unavailable.').format(
self.provider_name self.provider_name
) )
...@@ -313,10 +314,20 @@ class SAMLConfiguration(ConfigurationModel): ...@@ -313,10 +314,20 @@ class SAMLConfiguration(ConfigurationModel):
self.org_info_str = clean_json(self.org_info_str, dict) self.org_info_str = clean_json(self.org_info_str, dict)
self.other_config_str = clean_json(self.other_config_str, dict) self.other_config_str = clean_json(self.other_config_str, dict)
self.private_key = self.private_key.replace("-----BEGIN PRIVATE KEY-----", "").strip() self.private_key = (
self.private_key = self.private_key.replace("-----END PRIVATE KEY-----", "").strip() self.private_key
self.public_key = self.public_key.replace("-----BEGIN CERTIFICATE-----", "").strip() .replace("-----BEGIN RSA PRIVATE KEY-----", "")
self.public_key = self.public_key.replace("-----END CERTIFICATE-----", "").strip() .replace("-----BEGIN PRIVATE KEY-----", "")
.replace("-----END RSA PRIVATE KEY-----", "")
.replace("-----END PRIVATE KEY-----", "")
.strip()
)
self.public_key = (
self.public_key
.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "")
.strip()
)
def get_setting(self, name): def get_setting(self, name):
""" Get the value of a setting, or raise KeyError """ """ Get the value of a setting, or raise KeyError """
......
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQDM+Nf7IeRdIIgYUke6sR3n7osHVYXwH6pb+Ovq8j3hUoy8kzT9
kJF0RB3h3Q2VJ3ZWiQtT94fZX2YYorVdoGVK2NWzjLwgpHUsgfeJq5pCjP0d2OQu
9Qvjg6YOtYP6PN3j7eK7pUcxQvIcaY9APDF57ua/zPsm3UzbjhRlJZQUewIDAQAB
AoGADWBsD/qdQaqe1x9/iOKINhuuPRNKw2n9nzT2iIW4nhzaDHB689VceL79SEE5
4rMJmQomkBtGZVxBeHgd5/dQxNy3bC9lPN1uoMuzjQs7UMk+lvy0MoHfiJcuIxPX
RdyZTV9LKN8vq+ZpVykVu6pBdDlne4psPZeQ76ynxke/24ECQQD3NX7JeluZ64la
tC6b3VHzA4Hd1qTXDWtEekh2WaR2xuKzcLyOWhqPIWprylBqVc1m+FA/LRRWQ9y6
vJMiXMk7AkEA1ELWj9DtZzk9BV1JxsDUUP0/IMAiYliVac3YrvQfys8APCY1xr9q
BAGurH4VWXuEnbx1yNXK89HqFI7kDrMtwQJAVTXtVAmHFZEosUk2X6d0He3xj8Py
4eXQObRk0daoaAC6F9weQnsweHGuOyVrfpvAx2OEVaJ2Rh3yMbPai5esDQJAS9Yh
gLqdx26M3bjJ3igQ82q3vkTHRCnwICA6la+FGFnC9LqWJg9HmmzbcqeNiy31YMHv
tzSjUV+jaXrwAkyEQQJAK/SCIVsWRhFe/ssr8hS//V+hZC4kvCv4b3NqzZK1x+Xm
7GaGMV0xEWN7shqVSRBU4O2vn/RWD6/6x3sHkU57qg==
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICsDCCAhmgAwIBAgIJAJrENr8EPgpcMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMTUwNjEzMDEwNTE0WhcNMjUwNjEyMDEwNTE0WjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQDM+Nf7IeRdIIgYUke6sR3n7osHVYXwH6pb+Ovq8j3hUoy8kzT9kJF0RB3h3Q2V
J3ZWiQtT94fZX2YYorVdoGVK2NWzjLwgpHUsgfeJq5pCjP0d2OQu9Qvjg6YOtYP6
PN3j7eK7pUcxQvIcaY9APDF57ua/zPsm3UzbjhRlJZQUewIDAQABo4GnMIGkMB0G
A1UdDgQWBBTjOyPvAuej5q4C80jlFrQmOlszmzB1BgNVHSMEbjBsgBTjOyPvAuej
5q4C80jlFrQmOlszm6FJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrENr8E
PgpcMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAV5w0SxjUTFWfL3ZG
6sgA0gKf8aV8w3AlihLt9tKCRgrK4sBK9xmfwp/fnbdxkHU58iozI894HqmrRzCi
aRLWmy3W8640E/XCa6P+i8ET7RksgNJ5cD9WtISHkGc2dnW76+2nv8d24JKeIx2w
oJAtspMywzr0SoxDIJr42N6Kvjk=
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAMoR8CP+HlvsPRwi
VCCuWxZOdNjYa4Qre3JEWPkqlUwpci1XGTBqH7DK9b2hmBXMjYoDKOnF5pL7Y453
3JSJ2+AG7D4AJGSotA3boKF18EDgeMzAWjAhDVhTprGz+/1G+W0R4SSyY5QGyBhL
Z36xF2w5HyeiqN/Iiq3QKGl2CFORAgMBAAECgYEAwH2CAudqSCqstAZHmbI99uva
B09ybD93owxUrVbRTfIVX/eeeS4+7g0JNxGebPWkxxnneXoaAV4UIn0v1RfWKMs3
QGiBsOSup1DWWwkBfvQ1hNlJfVCqgZH1QVbhPpw9M9gxhLZQaSZoI/qY/8n/54L0
zU4S6VYBH6hnkgZZmiECQQDpYUS8HgnkMUX/qcDNBJT23qHewHsZOe6uqC+7+YxQ
xKT8iCxybDbZU7hmZ1Av8Ns4iF7EvZ0faFM8Ls76wFX1AkEA3afLUMLHfTx40XwO
oU7GWrYFyLNCc3/7JeWi6ZKzwzQqiGvFderRf/QGQsCtpLQ8VoLz/knF9TkQdSh6
yuIprQJATfcmxUmruEYVwnFtbZBoS4jYvtfCyAyohkS9naiijaEEFTFQ1/D66eOk
KOG+0iU+t0YnksZdpU5u8B4bG34BuQJAXv6FhTQk+MhM40KupnUzTzcJXY1t4kAs
K36yBjZoMjWOMO83LiUX6iVz9XHMOXVBEraGySlm3IS7R+q0TXUF9QJAQ69wautf
8q1OQiLcg5WTFmSFBEXqAvVwX6FcDSxor9UnI0iHwyKBss3a2IXY9LoTPTjR5SHh
GDq2lXmP+kmbnQ==
-----END PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICWDCCAcGgAwIBAgIJAMlM2wrOvplkMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMTUwNjEzMDEyMTAwWhcNMjUwNjEyMDEyMTAwWjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQDKEfAj/h5b7D0cIlQgrlsWTnTY2GuEK3tyRFj5KpVMKXItVxkwah+wyvW9oZgV
zI2KAyjpxeaS+2OOd9yUidvgBuw+ACRkqLQN26ChdfBA4HjMwFowIQ1YU6axs/v9
RvltEeEksmOUBsgYS2d+sRdsOR8noqjfyIqt0ChpdghTkQIDAQABo1AwTjAdBgNV
HQ4EFgQUU0TNPc1yGas/W4HJl/Hgtrmdu6MwHwYDVR0jBBgwFoAUU0TNPc1yGas/
W4HJl/Hgtrmdu6MwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOBgQCE4BqJ
v2s99DS16NbZtR7tpqXDxiDaCg59VtgcHQwxN4qXcixZi5N4yRvzjYschAQN5tQ6
bofXdIK3tJY9Ynm0KPO+5l0RCv7CkhNgftTww0bWC91xaHJ/y66AqONuLpaP6s43
SZYG2D6ric57ZY4kQ6ZlUv854TPzmvapnGG7Hw==
-----END CERTIFICATE-----
"""
Test the views served by third_party_auth.
"""
# pylint: disable=no-member
import ddt
from lxml import etree
import unittest
from .testutil import AUTH_FEATURE_ENABLED, SAMLTestCase
# Define some XML namespaces:
SAML_XML_NS = 'urn:oasis:names:tc:SAML:2.0:metadata'
XMLDSIG_XML_NS = 'http://www.w3.org/2000/09/xmldsig#'
@unittest.skipUnless(AUTH_FEATURE_ENABLED, 'third_party_auth not enabled')
@ddt.ddt
class SAMLMetadataTest(SAMLTestCase):
"""
Test the SAML metadata view
"""
METADATA_URL = '/auth/saml/metadata.xml'
def test_saml_disabled(self):
""" When SAML is not enabled, the metadata view should return 404 """
self.enable_saml(enabled=False)
response = self.client.get(self.METADATA_URL)
self.assertEqual(response.status_code, 404)
@ddt.data('saml_key', 'saml_key_alt') # Test two slightly different key pair export formats
def test_metadata(self, key_name):
self.enable_saml(
private_key=self._get_private_key(key_name),
public_key=self._get_public_key(key_name),
entity_id="https://saml.example.none",
)
doc = self._fetch_metadata()
# Check the ACS URL:
acs_node = doc.find(".//{}".format(etree.QName(SAML_XML_NS, 'AssertionConsumerService')))
self.assertIsNotNone(acs_node)
self.assertEqual(acs_node.attrib['Location'], 'http://example.none/auth/complete/tpa-saml/')
def test_signed_metadata(self):
self.enable_saml(
private_key=self._get_private_key(),
public_key=self._get_public_key(),
entity_id="https://saml.example.none",
other_config_str='{"SECURITY_CONFIG": {"signMetadata": true} }',
)
doc = self._fetch_metadata()
sig_node = doc.find(".//{}".format(etree.QName(XMLDSIG_XML_NS, 'SignatureValue')))
self.assertIsNotNone(sig_node)
def _fetch_metadata(self):
""" Fetch and parse the metadata XML at self.METADATA_URL """
response = self.client.get(self.METADATA_URL)
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-Type'], 'text/xml')
# The result should be valid XML:
try:
metadata_doc = etree.fromstring(response.content)
except etree.LxmlError:
self.fail('SAML metadata must be valid XML')
self.assertEqual(metadata_doc.tag, etree.QName(SAML_XML_NS, 'EntityDescriptor'))
return metadata_doc
...@@ -8,6 +8,7 @@ from contextlib import contextmanager ...@@ -8,6 +8,7 @@ from contextlib import contextmanager
from django.conf import settings from django.conf import settings
import django.test import django.test
import mock import mock
import os.path
from third_party_auth.models import OAuth2ProviderConfig, SAMLProviderConfig, SAMLConfiguration, cache as config_cache from third_party_auth.models import OAuth2ProviderConfig, SAMLProviderConfig, SAMLConfiguration, cache as config_cache
...@@ -87,6 +88,33 @@ class TestCase(ThirdPartyAuthTestMixin, django.test.TestCase): ...@@ -87,6 +88,33 @@ class TestCase(ThirdPartyAuthTestMixin, django.test.TestCase):
pass pass
class SAMLTestCase(TestCase):
"""
Base class for SAML-related third_party_auth tests
"""
def setUp(self):
super(SAMLTestCase, self).setUp()
self.client.defaults['SERVER_NAME'] = 'example.none' # The SAML lib we use doesn't like testserver' as a domain
self.url_prefix = 'http://example.none'
@classmethod
def _get_public_key(cls, key_name='saml_key'):
""" Get a public key for use in the test. """
return cls._read_data_file('{}.pub'.format(key_name))
@classmethod
def _get_private_key(cls, key_name='saml_key'):
""" Get a private key for use in the test. """
return cls._read_data_file('{}.key'.format(key_name))
@staticmethod
def _read_data_file(filename):
""" Read the contents of a file in the data folder """
with open(os.path.join(os.path.dirname(__file__), 'data', filename)) as f:
return f.read()
@contextmanager @contextmanager
def simulate_running_pipeline(pipeline_target, backend, email=None, fullname=None, username=None): def simulate_running_pipeline(pipeline_target, backend, email=None, fullname=None, username=None):
"""Simulate that a pipeline is currently running. """Simulate that a pipeline is currently running.
......
...@@ -198,7 +198,7 @@ def _third_party_auth_context(request, redirect_to): ...@@ -198,7 +198,7 @@ def _third_party_auth_context(request, redirect_to):
for msg in messages.get_messages(request): for msg in messages.get_messages(request):
if msg.extra_tags.split()[0] == "social-auth": if msg.extra_tags.split()[0] == "social-auth":
# msg may or may not be translated. Try translating [again] in case we are able to: # msg may or may not be translated. Try translating [again] in case we are able to:
context['errorMessage'] = _(msg) # pylint: disable=translation-of-non-string context['errorMessage'] = _(unicode(msg)) # pylint: disable=translation-of-non-string
break break
return context return context
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment