Commit 2744df74 by Elliot Murphy

Merged team extension support from chipaca

parents d0b3d856 40a58028
...@@ -3,9 +3,13 @@ ...@@ -3,9 +3,13 @@
__metaclass__ = type __metaclass__ = type
from django.conf import settings from django.conf import settings
from django.contrib.auth.models import User from django.contrib.auth.models import User, Group
from openid.consumer.consumer import SUCCESS from openid.consumer.consumer import SUCCESS
from openid.extensions import sreg from openid.extensions import sreg
try:
from openid.extensions import teams
except ImportError:
teams = None
from django_openid_auth.models import UserOpenID from django_openid_auth.models import UserOpenID
...@@ -55,6 +59,14 @@ class OpenIDBackend: ...@@ -55,6 +59,14 @@ class OpenIDBackend:
openid_response) openid_response)
if sreg_response: if sreg_response:
self.update_user_details_from_sreg(user, sreg_response) self.update_user_details_from_sreg(user, sreg_response)
if teams is not None:
if getattr(settings, 'OPENID_UPDATE_GROUPS_FROM_LAUNCHPAD_TEAMS', False):
teams_response = teams.TeamsResponse.fromSuccessResponse(
openid_response)
if teams_response:
self.update_groups_from_teams(user, teams_response)
return user return user
def create_user_from_openid(self, openid_response): def create_user_from_openid(self, openid_response):
...@@ -121,3 +133,19 @@ class OpenIDBackend: ...@@ -121,3 +133,19 @@ class OpenIDBackend:
if email: if email:
user.email = email user.email = email
user.save() user.save()
def update_groups_from_teams(self, user, teams_response):
teams_mapping = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING', {})
resp_groups = set(Group.objects.get(name=teams_mapping[i])
for i in teams_response.is_member)
user_groups = set(
i for i in user.groups.filter(name__in=teams_mapping.values()))
# the groups the user is in that aren't reported by openid
# should be removed
for group in user_groups - resp_groups:
user.groups.remove(group)
# and viceversa
for group in resp_groups - user_groups:
user.groups.add(group)
user.save()
...@@ -4,7 +4,7 @@ import time ...@@ -4,7 +4,7 @@ import time
import unittest import unittest
from django.conf import settings from django.conf import settings
from django.contrib.auth.models import User from django.contrib.auth.models import User, Group
from django.test import TestCase from django.test import TestCase
from openid.extensions.sreg import SRegRequest, SRegResponse from openid.extensions.sreg import SRegRequest, SRegResponse
from openid.fetchers import ( from openid.fetchers import (
...@@ -13,6 +13,11 @@ from openid.oidutil import importElementTree ...@@ -13,6 +13,11 @@ from openid.oidutil import importElementTree
from openid.server.server import BROWSER_REQUEST_MODES, Server from openid.server.server import BROWSER_REQUEST_MODES, Server
from openid.store.memstore import MemoryStore from openid.store.memstore import MemoryStore
try:
from openid.extensions import teams
except ImportError:
teams = None
from django_openid_auth.models import UserOpenID from django_openid_auth.models import UserOpenID
...@@ -99,14 +104,21 @@ class RelyingPartyTests(TestCase): ...@@ -99,14 +104,21 @@ class RelyingPartyTests(TestCase):
self.old_create_users = getattr(settings, 'OPENID_CREATE_USERS', False) self.old_create_users = getattr(settings, 'OPENID_CREATE_USERS', False)
self.old_update_details = getattr(settings, 'OPENID_UPDATE_DETAILS_FROM_SREG', False) self.old_update_details = getattr(settings, 'OPENID_UPDATE_DETAILS_FROM_SREG', False)
self.old_sso_server_url = getattr(settings, 'OPENID_SSO_SERVER_URL') self.old_sso_server_url = getattr(settings, 'OPENID_SSO_SERVER_URL')
self.old_update_groups = getattr(settings, 'OPENID_UPDATE_GROUPS_FROM_LAUNCHPAD_TEAMS', False)
self.old_teams_map = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING', {})
settings.OPENID_CREATE_USERS = False settings.OPENID_CREATE_USERS = False
settings.OPENID_UPDATE_DETAILS_FROM_SREG = False settings.OPENID_UPDATE_DETAILS_FROM_SREG = False
settings.OPENID_SSO_SERVER_URL = None settings.OPENID_SSO_SERVER_URL = None
settings.OPENID_UPDATE_GROUPS_FROM_LAUNCHPAD_TEAMS = False
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {}
def tearDown(self): def tearDown(self):
settings.OPENID_CREATE_USERS = self.old_create_users settings.OPENID_CREATE_USERS = self.old_create_users
settings.OPENID_UPDATE_DETAILS_FROM_SREG = self.old_update_details settings.OPENID_UPDATE_DETAILS_FROM_SREG = self.old_update_details
settings.OPENID_SSO_SERVER_URL = self.old_sso_server_url settings.OPENID_SSO_SERVER_URL = self.old_sso_server_url
settings.OPENID_UPDATE_GROUPS_FROM_LAUNCHPAD_TEAMS = self.old_update_groups
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = self.old_teams_map
setDefaultFetcher(None) setDefaultFetcher(None)
super(RelyingPartyTests, self).tearDown() super(RelyingPartyTests, self).tearDown()
...@@ -254,6 +266,52 @@ class RelyingPartyTests(TestCase): ...@@ -254,6 +266,52 @@ class RelyingPartyTests(TestCase):
self.assertEquals(user.last_name, 'User') self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'foo@example.com') self.assertEquals(user.email, 'foo@example.com')
def test_login_teams(self):
if teams is None:
raise AssertionError, "teams extension is missing!"
settings.OPENID_UPDATE_GROUPS_FROM_LAUNCHPAD_TEAMS = True
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {'teamname': 'groupname',
'otherteam': 'othergroup'}
user = User.objects.create_user('testuser', 'someone@example.com')
group = Group(name='groupname')
group.save()
ogroup = Group(name='othergroup')
ogroup.save()
user.groups.add(ogroup)
user.save()
useropenid = UserOpenID(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser'})
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request
openid_request = self.provider.parseFormPost(response.content)
openid_response = openid_request.answer(True)
teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
teams_response = teams.TeamsResponse.extractResponse(teams_request,
'teamname')
openid_response.addExtension(teams_response)
response = self.complete(openid_response)
self.assertRedirects(response, 'http://testserver/getuser')
# And they are now logged in as testuser
response = self.client.get('/getuser')
self.assertEquals(response.content, 'testuser')
# The user's groups have been updated.
user = User.objects.get(username='testuser')
self.assertTrue(group in user.groups.all())
self.assertTrue(ogroup not in user.groups.all())
def suite(): def suite():
return unittest.TestLoader().loadTestsFromName(__name__) return unittest.TestLoader().loadTestsFromName(__name__)
...@@ -15,6 +15,11 @@ from openid.consumer.consumer import ( ...@@ -15,6 +15,11 @@ from openid.consumer.consumer import (
from openid.consumer.discover import DiscoveryFailure from openid.consumer.discover import DiscoveryFailure
from openid.extensions import sreg from openid.extensions import sreg
try:
from openid.extensions import teams
except ImportError:
teams = None
from django_openid_auth.forms import OpenIDLoginForm from django_openid_auth.forms import OpenIDLoginForm
from django_openid_auth.store import DjangoOpenIDStore from django_openid_auth.store import DjangoOpenIDStore
...@@ -110,6 +115,12 @@ def login_begin(request, template_name='openid/login.html', ...@@ -110,6 +115,12 @@ def login_begin(request, template_name='openid/login.html',
openid_request.addExtension( openid_request.addExtension(
sreg.SRegRequest(optional=['email', 'fullname', 'nickname'])) sreg.SRegRequest(optional=['email', 'fullname', 'nickname']))
if teams is not None:
# Request team info
launchpad_teams = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING',
{})
openid_request.addExtension(teams.TeamsRequest(launchpad_teams.keys()))
# Construct the request completion URL, including the page we # Construct the request completion URL, including the page we
# should redirect to. # should redirect to.
return_to = request.build_absolute_uri(reverse(login_complete)) return_to = request.build_absolute_uri(reverse(login_complete))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment