Commit 405c6b31 by Jesse Shapiro Committed by GitHub

Merge pull request #14793 from open-craft/haikuginger/sap-sf-sso-provider

[ENT-285] SAP SuccessFactors SAML provider user metadata
parents eda5f45d da3867e8
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('third_party_auth', '0006_samlproviderconfig_automatic_refresh_enabled'),
]
operations = [
migrations.AddField(
model_name='samlproviderconfig',
name='identity_provider_type',
field=models.CharField(default=b'standard_saml_provider', help_text=b'Some SAML providers require special behavior. For example, SAP SuccessFactors SAML providers require an additional API call to retrieve user metadata not provided in the SAML response. Select the provider type which best matches your use case. If in doubt, choose the Standard SAML Provider type.', max_length=128, verbose_name=b'Identity Provider Type', choices=[(b'standard_saml_provider', b'Standard SAML provider'), (b'sap_success_factors', b'SAP SuccessFactors provider')]),
),
migrations.AlterField(
model_name='samlproviderconfig',
name='other_settings',
field=models.TextField(help_text=b'For advanced use cases, enter a JSON object with addtional configuration. The tpa-saml backend supports only {"requiredEntitlements": ["urn:..."]} which can be used to require the presence of a specific eduPersonEntitlement. Custom provider types, as selected in the "Identity Provider Type" field, may make use of the information stored in this field for configuration.', verbose_name=b'Advanced settings', blank=True),
),
]
......@@ -20,6 +20,7 @@ from social.backends.base import BaseAuth
from social.backends.oauth import OAuthAuth
from social.backends.saml import SAMLAuth, SAMLIdentityProvider
from .lti import LTIAuthBackend, LTI_PARAMS_KEY
from .saml import STANDARD_SAML_PROVIDER_KEY, get_saml_idp_choices, get_saml_idp_class
from social.exceptions import SocialAuthBaseException
from social.utils import module_member
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
......@@ -350,6 +351,14 @@ class SAMLProviderConfig(ProviderConfig):
automatic_refresh_enabled = models.BooleanField(
default=True, verbose_name="Enable automatic metadata refresh",
help_text="When checked, the SAML provider's metadata will be included in the automatic refresh job, if configured.")
identity_provider_type = models.CharField(
max_length=128, blank=False, verbose_name="Identity Provider Type", default=STANDARD_SAML_PROVIDER_KEY,
choices=get_saml_idp_choices(), help_text=(
"Some SAML providers require special behavior. For example, SAP SuccessFactors SAML providers require an "
"additional API call to retrieve user metadata not provided in the SAML response. Select the provider type "
"which best matches your use case. If in doubt, choose the Standard SAML Provider type."
)
)
debug_mode = models.BooleanField(
default=False, verbose_name="Debug Mode",
help_text=(
......@@ -362,7 +371,9 @@ class SAMLProviderConfig(ProviderConfig):
help_text=(
'For advanced use cases, enter a JSON object with addtional configuration. '
'The tpa-saml backend supports only {"requiredEntitlements": ["urn:..."]} '
'which can be used to require the presence of a specific eduPersonEntitlement.'
'which can be used to require the presence of a specific eduPersonEntitlement. '
'Custom provider types, as selected in the "Identity Provider Type" field, may make '
'use of the information stored in this field for configuration.'
))
def clean(self):
......@@ -423,7 +434,8 @@ class SAMLProviderConfig(ProviderConfig):
raise AuthNotConfigured(provider_name=self.name)
conf['x509cert'] = data.public_key
conf['url'] = data.sso_url
return SAMLIdentityProvider(self.idp_slug, **conf)
idp_class = get_saml_idp_class(self.identity_provider_type)
return idp_class(self.idp_slug, **conf)
class SAMLConfiguration(ConfigurationModel):
......
......@@ -6,9 +6,12 @@ from django.contrib.sites.models import Site
from django.http import Http404
from django.utils.functional import cached_property
from openedx.core.djangoapps.theming.helpers import get_current_request
from social.backends.saml import SAMLAuth, OID_EDU_PERSON_ENTITLEMENT
import requests
from social.backends.saml import SAMLAuth, SAMLIdentityProvider, OID_EDU_PERSON_ENTITLEMENT
from social.exceptions import AuthForbidden, AuthMissingParameter
STANDARD_SAML_PROVIDER_KEY = 'standard_saml_provider'
SAP_SUCCESSFACTORS_SAML_KEY = 'sap_success_factors'
log = logging.getLogger(__name__)
......@@ -93,3 +96,158 @@ class SAMLAuthBackend(SAMLAuth): # pylint: disable=abstract-method
def _config(self):
from .models import SAMLConfiguration
return SAMLConfiguration.current(Site.objects.get_current(get_current_request()))
class SapSuccessFactorsIdentityProvider(SAMLIdentityProvider):
"""
Customized version of SAMLIdentityProvider that knows how to retrieve user details
from the SAPSuccessFactors OData API, rather than parse them directly off the
SAML assertion that we get in response to a login attempt.
"""
required_variables = (
'sapsf_oauth_root_url',
'sapsf_private_key',
'odata_api_root_url',
'odata_company_id',
'odata_client_id',
)
@property
def sapsf_idp_url(self):
return self.conf['sapsf_oauth_root_url'] + 'idp'
@property
def sapsf_token_url(self):
return self.conf['sapsf_oauth_root_url'] + 'token'
@property
def sapsf_private_key(self):
return self.conf['sapsf_private_key']
@property
def odata_api_root_url(self):
return self.conf['odata_api_root_url']
@property
def odata_company_id(self):
return self.conf['odata_company_id']
@property
def odata_client_id(self):
return self.conf['odata_client_id']
def missing_variables(self):
"""
Check that we have all the details we need to properly retrieve rich data from the
SAP SuccessFactors OData API. If we don't, then we should log a warning indicating
the specific variables that are missing.
"""
if not all(var in self.conf for var in self.required_variables):
missing = [var for var in self.required_variables if var not in self.conf]
log.warning(
"To retrieve rich user data for an SAP SuccessFactors identity provider, the following keys in "
"'other_settings' are required, but were missing: %s",
missing
)
return missing
def get_odata_api_client(self, user_id):
"""
Get a Requests session with the headers needed to properly authenticate it with
the SAP SuccessFactors OData API.
"""
session = requests.Session()
assertion = session.post(
self.sapsf_idp_url,
data={
'client_id': self.odata_client_id,
'user_id': user_id,
'token_url': self.sapsf_token_url,
'private_key': self.sapsf_private_key,
},
timeout=10,
)
assertion.raise_for_status()
assertion = assertion.text
token = session.post(
self.sapsf_token_url,
data={
'client_id': self.odata_client_id,
'company_id': self.odata_company_id,
'grant_type': 'urn:ietf:params:oauth:grant-type:saml2-bearer',
'assertion': assertion,
},
timeout=10,
)
token.raise_for_status()
token = token.json()['access_token']
session.headers.update({'Authorization': 'Bearer {}'.format(token), 'Accept': 'application/json'})
return session
def get_user_details(self, attributes):
"""
Attempt to get rich user details from the SAP SuccessFactors OData API. If we're missing any
of the details we need to do that, fail nicely by returning the details we're able to extract
from just the SAML response and log a warning.
"""
details = super(SapSuccessFactorsIdentityProvider, self).get_user_details(attributes)
if self.missing_variables():
# If there aren't enough details to make the request, log a warning and return the details
# from the SAML assertion.
return details
username = details['username']
try:
client = self.get_odata_api_client(user_id=username)
response = client.get(
'{root_url}User(userId=\'{user_id}\')?$select=username,firstName,lastName,defaultFullName,email'.format(
root_url=self.odata_api_root_url,
user_id=username
),
timeout=10,
)
response.raise_for_status()
response = response.json()
except requests.RequestException:
# If there was an HTTP level error, log the error and return the details from the SAML assertion.
log.warning(
'Unable to retrieve user details with username %s from SAPSuccessFactors with company ID %s.',
username,
self.odata_company_id,
)
return details
return {
'username': response['d']['username'],
'first_name': response['d']['firstName'],
'last_name': response['d']['lastName'],
'fullname': response['d']['defaultFullName'],
'email': response['d']['email'],
}
def get_saml_idp_choices():
"""
Get a list of the available SAMLIdentityProvider subclasses that can be used to process
SAML requests, for use in the Django administration form.
"""
return (
(STANDARD_SAML_PROVIDER_KEY, 'Standard SAML provider'),
(SAP_SUCCESSFACTORS_SAML_KEY, 'SAP SuccessFactors provider'),
)
def get_saml_idp_class(idp_identifier_string):
"""
Given a string ID indicating the type of identity provider in use during a given request, return
the SAMLIdentityProvider subclass able to handle requests for that type of identity provider.
"""
choices = {
STANDARD_SAML_PROVIDER_KEY: SAMLIdentityProvider,
SAP_SUCCESSFACTORS_SAML_KEY: SapSuccessFactorsIdentityProvider,
}
if idp_identifier_string not in choices:
log.error(
'%s is not a valid SAMLIdentityProvider subclass; using SAMLIdentityProvider base class.',
idp_identifier_string
)
return choices.get(idp_identifier_string, SAMLIdentityProvider)
......@@ -4,8 +4,10 @@ Third_party_auth integration tests using a mock version of the TestShib provider
import ddt
import unittest
import httpretty
import json
from mock import patch
from social.apps.django_app.default.models import UserSocialAuth
from unittest import skip
from third_party_auth.saml import log as saml_log
from third_party_auth.tasks import fetch_saml_metadata
......@@ -20,11 +22,10 @@ TESTSHIB_METADATA_URL_WITH_CACHE_DURATION = 'https://mock.testshib.org/metadata/
TESTSHIB_SSO_URL = 'https://idp.testshib.org/idp/profile/SAML2/Redirect/SSO'
@ddt.ddt
@unittest.skipUnless(testutil.AUTH_FEATURE_ENABLED, 'third_party_auth not enabled')
class TestShibIntegrationTest(IntegrationTestMixin, testutil.SAMLTestCase):
class SamlIntegrationTestUtilities(object):
"""
TestShib provider Integration Test, to test SAML functionality
Class contains methods particular to SAML integration testing so that they
can be separated out from the actual test methods.
"""
PROVIDER_ID = "saml-testshib"
PROVIDER_NAME = "TestShib"
......@@ -36,7 +37,7 @@ class TestShibIntegrationTest(IntegrationTestMixin, testutil.SAMLTestCase):
USER_USERNAME = "myself"
def setUp(self):
super(TestShibIntegrationTest, self).setUp()
super(SamlIntegrationTestUtilities, self).setUp()
self.enable_saml(
private_key=self._get_private_key(),
public_key=self._get_public_key(),
......@@ -71,6 +72,55 @@ class TestShibIntegrationTest(IntegrationTestMixin, testutil.SAMLTestCase):
self.addCleanup(uid_patch.stop)
self._freeze_time(timestamp=1434326820) # This is the time when the saved request/response was recorded.
def _freeze_time(self, timestamp):
""" Mock the current time for SAML, so we can replay canned requests/responses """
now_patch = patch('onelogin.saml2.utils.OneLogin_Saml2_Utils.now', return_value=timestamp)
now_patch.start()
self.addCleanup(now_patch.stop)
def _configure_testshib_provider(self, **kwargs):
""" Enable and configure the TestShib SAML IdP as a third_party_auth provider """
fetch_metadata = kwargs.pop('fetch_metadata', True)
assert_metadata_updates = kwargs.pop('assert_metadata_updates', True)
kwargs.setdefault('name', self.PROVIDER_NAME)
kwargs.setdefault('enabled', True)
kwargs.setdefault('visible', True)
kwargs.setdefault('idp_slug', self.PROVIDER_IDP_SLUG)
kwargs.setdefault('entity_id', TESTSHIB_ENTITY_ID)
kwargs.setdefault('metadata_source', TESTSHIB_METADATA_URL)
kwargs.setdefault('icon_class', 'fa-university')
kwargs.setdefault('attr_email', 'urn:oid:1.3.6.1.4.1.5923.1.1.1.6') # eduPersonPrincipalName
self.configure_saml_provider(**kwargs)
if fetch_metadata:
self.assertTrue(httpretty.is_enabled())
num_total, num_skipped, num_attempted, num_updated, num_failed, failure_messages = fetch_saml_metadata()
if assert_metadata_updates:
self.assertEqual(num_total, 1)
self.assertEqual(num_skipped, 0)
self.assertEqual(num_attempted, 1)
self.assertEqual(num_updated, 1)
self.assertEqual(num_failed, 0)
self.assertEqual(len(failure_messages), 0)
def do_provider_login(self, provider_redirect_url):
""" Mocked: the user logs in to TestShib and then gets redirected back """
# The SAML provider (TestShib) will authenticate the user, then get the browser to POST a response:
self.assertTrue(provider_redirect_url.startswith(TESTSHIB_SSO_URL))
return self.client.post(
self.complete_url,
content_type='application/x-www-form-urlencoded',
data=self.read_data_file('testshib_response.txt'),
)
@ddt.ddt
@unittest.skipUnless(testutil.AUTH_FEATURE_ENABLED, 'third_party_auth not enabled')
class TestShibIntegrationTest(SamlIntegrationTestUtilities, IntegrationTestMixin, testutil.SAMLTestCase):
"""
TestShib provider Integration Test, to test SAML functionality
"""
def test_login_before_metadata_fetched(self):
self._configure_testshib_provider(fetch_metadata=False)
# The user goes to the login page, and sees a button to login with TestShib:
......@@ -157,43 +207,155 @@ class TestShibIntegrationTest(IntegrationTestMixin, testutil.SAMLTestCase):
self.assertEqual(num_failed, 0)
self.assertEqual(len(failure_messages), 0)
def _freeze_time(self, timestamp):
""" Mock the current time for SAML, so we can replay canned requests/responses """
now_patch = patch('onelogin.saml2.utils.OneLogin_Saml2_Utils.now', return_value=timestamp)
now_patch.start()
self.addCleanup(now_patch.stop)
def _configure_testshib_provider(self, **kwargs):
""" Enable and configure the TestShib SAML IdP as a third_party_auth provider """
fetch_metadata = kwargs.pop('fetch_metadata', True)
assert_metadata_updates = kwargs.pop('assert_metadata_updates', True)
kwargs.setdefault('name', self.PROVIDER_NAME)
kwargs.setdefault('enabled', True)
kwargs.setdefault('visible', True)
kwargs.setdefault('idp_slug', self.PROVIDER_IDP_SLUG)
kwargs.setdefault('entity_id', TESTSHIB_ENTITY_ID)
kwargs.setdefault('metadata_source', TESTSHIB_METADATA_URL)
kwargs.setdefault('icon_class', 'fa-university')
kwargs.setdefault('attr_email', 'urn:oid:1.3.6.1.4.1.5923.1.1.1.6') # eduPersonPrincipalName
self.configure_saml_provider(**kwargs)
@unittest.skipUnless(testutil.AUTH_FEATURE_ENABLED, 'third_party_auth not enabled')
class SuccessFactorsIntegrationTest(SamlIntegrationTestUtilities, IntegrationTestMixin, testutil.SAMLTestCase):
"""
Test basic SAML capability using the TestShib details, and then check that we're able
to make the proper calls using the SAP SuccessFactors API.
"""
if fetch_metadata:
self.assertTrue(httpretty.is_enabled())
num_total, num_skipped, num_attempted, num_updated, num_failed, failure_messages = fetch_saml_metadata()
if assert_metadata_updates:
self.assertEqual(num_total, 1)
self.assertEqual(num_skipped, 0)
self.assertEqual(num_attempted, 1)
self.assertEqual(num_updated, 1)
self.assertEqual(num_failed, 0)
self.assertEqual(len(failure_messages), 0)
# Note that these details are different than those that will be provided by the SAML
# assertion metadata. Rather, they will be fetched from the mocked SAPSuccessFactors API.
USER_EMAIL = "john@smith.com"
USER_NAME = "John Smith"
USER_USERNAME = "jsmith"
def do_provider_login(self, provider_redirect_url):
""" Mocked: the user logs in to TestShib and then gets redirected back """
# The SAML provider (TestShib) will authenticate the user, then get the browser to POST a response:
self.assertTrue(provider_redirect_url.startswith(TESTSHIB_SSO_URL))
return self.client.post(
self.complete_url,
content_type='application/x-www-form-urlencoded',
data=self.read_data_file('testshib_response.txt'),
def setUp(self):
"""
Mock out HTTP calls to various endpoints using httpretty.
"""
super(SuccessFactorsIntegrationTest, self).setUp()
# Mock the call to the SAP SuccessFactors assertion endpoint
SAPSF_ASSERTION_URL = 'http://successfactors.com/oauth/idp'
def assertion_callback(_request, _uri, headers):
"""
Return a fake assertion after checking that the input is what we expect.
"""
self.assertIn('private_key=fake_private_key_here', _request.body)
self.assertIn('user_id=myself', _request.body)
self.assertIn('token_url=http%3A%2F%2Fsuccessfactors.com%2Foauth%2Ftoken', _request.body)
self.assertIn('client_id=TatVotSEiCMteSNWtSOnLanCtBGwNhGB', _request.body)
return (200, headers, 'fake_saml_assertion')
httpretty.register_uri(httpretty.POST, SAPSF_ASSERTION_URL, content_type='text/plain', body=assertion_callback)
SAPSF_BAD_ASSERTION_URL = 'http://successfactors.com/oauth-fake/idp'
def bad_callback(_request, _uri, headers):
"""
Return a 404 error when someone tries to call the URL.
"""
return (404, headers, 'NOT AN ASSERTION')
httpretty.register_uri(httpretty.POST, SAPSF_BAD_ASSERTION_URL, content_type='text/plain', body=bad_callback)
# Mock the call to the SAP SuccessFactors token endpoint
SAPSF_TOKEN_URL = 'http://successfactors.com/oauth/token'
def token_callback(_request, _uri, headers):
"""
Return a fake assertion after checking that the input is what we expect.
"""
self.assertIn('assertion=fake_saml_assertion', _request.body)
self.assertIn('company_id=NCC1701D', _request.body)
self.assertIn('grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Asaml2-bearer', _request.body)
self.assertIn('client_id=TatVotSEiCMteSNWtSOnLanCtBGwNhGB', _request.body)
return (200, headers, '{"access_token": "faketoken"}')
httpretty.register_uri(httpretty.POST, SAPSF_TOKEN_URL, content_type='application/json', body=token_callback)
# Mock the call to the SAP SuccessFactors OData user endpoint
ODATA_USER_URL = (
'http://api.successfactors.com/odata/v2/User(userId=\'myself\')'
'?$select=username,firstName,lastName,defaultFullName,email'
)
def user_callback(request, _uri, headers):
auth_header = request.headers.get('Authorization')
self.assertEqual(auth_header, 'Bearer faketoken')
return (
200,
headers,
json.dumps({
'd': {
'username': 'jsmith',
'firstName': 'John',
'lastName': 'Smith',
'defaultFullName': 'John Smith',
'email': 'john@smith.com',
}
})
)
httpretty.register_uri(httpretty.GET, ODATA_USER_URL, content_type='application/json', body=user_callback)
def test_register_insufficient_sapsf_metadata(self):
"""
Configure the provider such that it doesn't have enough details to contact the SAP
SuccessFactors API, and test that it falls back to the data it receives from the SAML assertion.
"""
self._configure_testshib_provider(
identity_provider_type='sap_success_factors',
metadata_source=TESTSHIB_METADATA_URL,
other_settings='{"key_i_dont_need":"value_i_also_dont_need"}',
)
# Because we're getting details from the assertion, fall back to the initial set of details.
self.USER_EMAIL = "myself@testshib.org"
self.USER_NAME = "Me Myself And I"
self.USER_USERNAME = "myself"
super(SuccessFactorsIntegrationTest, self).test_register()
def test_register_sapsf_metadata_present(self):
"""
Configure the provider such that it can talk to a mocked-out version of the SAP SuccessFactors
API, and ensure that the data it gets that way gets passed to the registration form.
"""
self._configure_testshib_provider(
identity_provider_type='sap_success_factors',
metadata_source=TESTSHIB_METADATA_URL,
other_settings=json.dumps({
'sapsf_oauth_root_url': 'http://successfactors.com/oauth/',
'sapsf_private_key': 'fake_private_key_here',
'odata_api_root_url': 'http://api.successfactors.com/odata/v2/',
'odata_company_id': 'NCC1701D',
'odata_client_id': 'TatVotSEiCMteSNWtSOnLanCtBGwNhGB',
})
)
super(SuccessFactorsIntegrationTest, self).test_register()
def test_register_http_failure(self):
"""
Ensure that if there's an HTTP failure while fetching metadata, we continue, using the
metadata from the SAML assertion.
"""
self._configure_testshib_provider(
identity_provider_type='sap_success_factors',
metadata_source=TESTSHIB_METADATA_URL,
other_settings=json.dumps({
'sapsf_oauth_root_url': 'http://successfactors.com/oauth-fake/',
'sapsf_private_key': 'fake_private_key_here',
'odata_api_root_url': 'http://api.successfactors.com/odata/v2/',
'odata_company_id': 'NCC1701D',
'odata_client_id': 'TatVotSEiCMteSNWtSOnLanCtBGwNhGB',
})
)
# Because we're getting details from the assertion, fall back to the initial set of details.
self.USER_EMAIL = "myself@testshib.org"
self.USER_NAME = "Me Myself And I"
self.USER_USERNAME = "myself"
super(SuccessFactorsIntegrationTest, self).test_register()
@skip('Test not necessary for this subclass')
def test_get_saml_idp_class_with_fake_identifier(self):
pass
@skip('Test not necessary for this subclass')
def test_login(self):
pass
@skip('Test not necessary for this subclass')
def test_register(self):
pass
......@@ -25,6 +25,8 @@ from third_party_auth.models import (
ProviderApiPermissions,
)
from third_party_auth.saml import get_saml_idp_class, SAMLIdentityProvider
AUTH_FEATURES_KEY = 'ENABLE_THIRD_PARTY_AUTH'
AUTH_FEATURE_ENABLED = AUTH_FEATURES_KEY in settings.FEATURES
......@@ -213,6 +215,16 @@ class SAMLTestCase(TestCase):
kwargs.setdefault('entity_id', "https://saml.example.none")
super(SAMLTestCase, self).enable_saml(**kwargs)
@mock.patch('third_party_auth.saml.log')
def test_get_saml_idp_class_with_fake_identifier(self, log_mock):
error_mock = log_mock.error
idp_class = get_saml_idp_class('fake_idp_class_option')
error_mock.assert_called_once_with(
'%s is not a valid SAMLIdentityProvider subclass; using SAMLIdentityProvider base class.',
'fake_idp_class_option'
)
self.assertIs(idp_class, SAMLIdentityProvider)
@contextmanager
def simulate_running_pipeline(pipeline_target, backend, email=None, fullname=None, username=None):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment