Commit 8b210641 by bmedx

Update repo from upstream revision 128

- Previous pull was revision 115
- Diff of versions: http://bazaar.launchpad.net/~ubuntuone-pqm-team/django-openid-auth/trunk/revision/128?remember=115&compare_revid=115
- Should get us Py3 and Django up to 1.10 support
parent 86b822c2
# Created by .ignore support plugin (hsz.mobi)
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Django stuff:
*.log
local_settings.py
# dotenv
.env
# virtualenv
.venv
venv/
ENV/
.gitignore
.idea/
django_openid_auth.egg-info/
......@@ -26,3 +26,6 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import sys
PY3 = sys.version_info.major >= 3
......@@ -29,8 +29,11 @@
from __future__ import unicode_literals
from urllib import urlencode
from urlparse import parse_qsl, urlparse
try:
from urllib.parse import parse_qsl, urlencode, urlparse
except ImportError:
from urllib import urlencode
from urlparse import parse_qsl, urlparse
from django.conf import settings
from django.contrib import admin
......@@ -50,6 +53,7 @@ class NonceAdmin(admin.ModelAdmin):
self.message_user(request, "%d expired nonces removed" % count)
cleanup_nonces.short_description = "Clean up expired nonces"
admin.site.register(Nonce, NonceAdmin)
......@@ -65,6 +69,7 @@ class AssociationAdmin(admin.ModelAdmin):
self.message_user(request, "%d expired associations removed" % count)
cleanup_associations.short_description = "Clean up expired associations"
admin.site.register(Association, AssociationAdmin)
......@@ -73,6 +78,7 @@ class UserOpenIDAdmin(admin.ModelAdmin):
list_display = ('user', 'claimed_id')
search_fields = ('claimed_id',)
admin.site.register(UserOpenID, UserOpenIDAdmin)
......
......@@ -30,13 +30,12 @@
from __future__ import unicode_literals
__metaclass__ = type
import re
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group, Permission
from django.core.exceptions import ImproperlyConfigured
from openid.consumer.consumer import SUCCESS
from openid.extensions import ax, sreg, pape
......@@ -49,12 +48,51 @@ from django_openid_auth.exceptions import (
MissingPhysicalMultiFactor,
RequiredAttributeNotReturned,
)
from django_openid_auth.signals import openid_duplicate_username
User = get_user_model()
class OpenIDBackend:
def get_user_group_model():
"""Returns the model used for mapping users to groups."""
user_group_model_name = getattr(settings, 'AUTH_USER_GROUP_MODEL', None)
if user_group_model_name is None:
return User.groups.through
else:
try:
# django.apps available starting from django 1.7
from django.apps import apps
get_model = apps.get_model
args = (user_group_model_name,)
except ImportError:
# if we can't import, then it must be django 1.6, still using
# the old django.db.models.loading code
from django.db.models.loading import get_model
app_label, model_name = user_group_model_name.split('.', 1)
args = (app_label, model_name)
try:
model = get_model(*args)
if model is None:
# in django 1.6 referring to a non-installed app will
# return None for get_model, but in 1.7 onwards it will
# raise a LookupError exception.
raise LookupError()
return model
except ValueError:
raise ImproperlyConfigured(
"AUTH_USER_GROUP_MODEL must be of the form "
"'app_label.model_name'")
except LookupError:
raise ImproperlyConfigured(
"AUTH_USER_GROUP_MODEL refers to model '%s' that has not been "
"installed" % user_group_model_name)
UserGroup = get_user_group_model()
class OpenIDBackend(object):
"""A django.contrib.auth backend that authenticates the user based on
an OpenID response."""
......@@ -116,8 +154,9 @@ class OpenIDBackend:
teams_mapping = self.get_teams_mapping()
groups_required = [group for team, group in teams_mapping.items()
if team in teams_required]
user_groups = UserGroup.objects.filter(user=user)
matches = set(groups_required).intersection(
user.groups.values_list('name', flat=True))
user_groups.values_list('group__name', flat=True))
if not matches:
name = 'OPENID_EMAIL_WHITELIST_REGEXP_LIST'
whitelist_regexp_list = getattr(settings, name, [])
......@@ -194,28 +233,19 @@ class OpenIDBackend:
return suggestion
return 'openiduser'
def _get_available_username(self, nickname, identity_url):
# If we're being strict about usernames, throw an error if we didn't
# get one back from the provider
if getattr(settings, 'OPENID_STRICT_USERNAMES', False):
if nickname is None or nickname == '':
raise MissingUsernameViolation()
def _get_available_username_for_nickname(self, nickname, identity_url):
# If we don't have a nickname, and we're not being strict, use a
# default
nickname = nickname or 'openiduser'
# See if we already have this nickname assigned to a username
try:
User.objects.get(username__exact=nickname)
except User.DoesNotExist:
# No conflict, we can use this nickname
if not User.objects.filter(username=nickname).exists():
return nickname
# Check if we already have nickname+i for this identity_url
try:
user_openid = UserOpenID.objects.get(
claimed_id__exact=identity_url,
claimed_id=identity_url,
user__username__startswith=nickname)
# No exception means we have an existing user for this identity
# that starts with this nickname.
......@@ -239,28 +269,48 @@ class OpenIDBackend:
# No user associated with this identity_url
pass
if getattr(settings, 'OPENID_STRICT_USERNAMES', False):
if User.objects.filter(username__exact=nickname).count() > 0:
raise DuplicateUsernameViolation(
"The username (%s) with which you tried to log in is "
"already in use for a different account." % nickname)
# Pick a username for the user based on their nickname,
# checking for conflicts. Start with number of existing users who's
# username starts with this nickname to avoid having to iterate over
# all of the existing ones.
i = User.objects.filter(username__startswith=nickname).count() + 1
while True:
username = nickname
if i > 1:
username += str(i)
try:
User.objects.get(username__exact=username)
except User.DoesNotExist:
break
while User.objects.filter(username=username).exists():
username = nickname + str(i)
i += 1
return username
def _ensure_available_username(self, nickname, identity_url):
if not nickname:
raise MissingUsernameViolation()
# As long as the `QuerySet` does not get evaluated, no
# caching should be involved in our multiple `exists()`
# calls. See docs for details: http://bit.ly/2aYCmkw
user_with_same_username = User.objects.exclude(
useropenid__claimed_id=identity_url
).filter(username=nickname)
if user_with_same_username.exists():
# Notify any listeners that a duplicated username was
# found and give the opportunity to handle conflict.
openid_duplicate_username.send(sender=User, username=nickname)
# Check for conflicts again as the signal could have handled it.
if user_with_same_username.exists():
raise DuplicateUsernameViolation(
"The username (%s) with which you tried to log in is "
"already in use for a different account." % nickname)
def _get_available_username(self, nickname, identity_url):
if getattr(settings, 'OPENID_STRICT_USERNAMES', False):
self._ensure_available_username(nickname, identity_url)
else:
nickname = self._get_available_username_for_nickname(
nickname, identity_url)
return nickname
def create_user_from_openid(self, openid_response):
details = self._extract_user_details(openid_response)
required_attrs = getattr(settings, 'OPENID_SREG_REQUIRED_FIELDS', [])
......@@ -357,13 +407,17 @@ class OpenIDBackend:
mapping = [
teams_mapping[lp_team] for lp_team in teams_response.is_member
if lp_team in teams_mapping]
user_groups = UserGroup.objects.filter(user=user)
matching_groups = user_groups.filter(
group__name__in=teams_mapping.values())
current_groups = set(
user.groups.filter(name__in=teams_mapping.values()))
user_group.group for user_group in matching_groups)
desired_groups = set(Group.objects.filter(name__in=mapping))
for group in current_groups - desired_groups:
user.groups.remove(group)
for group in desired_groups - current_groups:
user.groups.add(group)
groups_to_remove = current_groups - desired_groups
groups_to_add = desired_groups - current_groups
user_groups.filter(group__in=groups_to_remove).delete()
for group in groups_to_add:
UserGroup.objects.create(user=user, group=group)
def update_staff_status_from_teams(self, user, teams_response):
if not hasattr(settings, 'OPENID_LAUNCHPAD_STAFF_TEAMS'):
......
......@@ -38,6 +38,8 @@ from django.conf import settings
from openid.yadis import xri
from django_openid_auth import PY3
def teams_new_unicode(self):
"""
......@@ -52,8 +54,13 @@ def teams_new_unicode(self):
else:
return name
Group.unicode_before_teams = Group.__unicode__
Group.__unicode__ = teams_new_unicode
if PY3:
Group.unicode_before_teams = Group.__str__
Group.__str__ = teams_new_unicode
else:
Group.unicode_before_teams = Group.__unicode__
Group.__unicode__ = teams_new_unicode
class UserChangeFormWithTeamRestriction(UserChangeForm):
......@@ -72,6 +79,7 @@ class UserChangeFormWithTeamRestriction(UserChangeForm):
"You cannot assign it manually." % group.name)
return data
UserAdmin.form = UserChangeFormWithTeamRestriction
......
......@@ -34,3 +34,4 @@ import django.dispatch
openid_login_complete = django.dispatch.Signal(providing_args=[
'request', 'openid_response'])
openid_duplicate_username = django.dispatch.Signal(providing_args=['username'])
......@@ -36,6 +36,7 @@ from openid.association import Association as OIDAssociation
from openid.store.interface import OpenIDStore
from openid.store.nonce import SKEW
from django_openid_auth import PY3
from django_openid_auth.models import Association, Nonce
......@@ -75,10 +76,15 @@ class DjangoOpenIDStore(OpenIDStore):
expired = []
for assoc in assocs:
association = OIDAssociation(
assoc.handle, base64.decodestring(assoc.secret), assoc.issued,
assoc.lifetime, assoc.assoc_type
assoc.handle,
base64.decodestring(assoc.secret.encode('utf-8')),
assoc.issued, assoc.lifetime, assoc.assoc_type
)
if association.getExpiresIn() == 0:
if PY3:
expires_in = association.expiresIn
else:
expires_in = association.getExpiresIn()
if expires_in == 0:
expired.append(assoc)
else:
associations.append((association.issued, association))
......
......@@ -72,6 +72,7 @@ from openid.message import (
registerNamespaceAlias,
NamespaceAliasRegistrationError,
)
from six import string_types
__all__ = [
'TeamsRequest',
......@@ -84,7 +85,7 @@ ns_uri = 'http://ns.launchpad.net/2007/openid-teams'
try:
registerNamespaceAlias(ns_uri, 'lp')
except NamespaceAliasRegistrationError, e:
except NamespaceAliasRegistrationError as e:
oidutil.log(
'registerNamespaceAlias(%r, %r) failed: %s' % (ns_uri, 'lp', str(e)))
......@@ -139,7 +140,7 @@ def getTeamsNS(message):
# There is no alias, so try to add one. (OpenID version 1)
try:
message.namespaces.addAlias(ns_uri, 'lp')
except KeyError, why:
except KeyError as why:
# An alias for the string 'lp' already exists, but it's
# defined for something other than Launchpad teams
raise TeamsNamespaceError(why[0])
......@@ -287,7 +288,7 @@ class TeamsRequest(Extension):
@raise ValueError: when a team requested is not a string
or strict is set and a team was requested more than once
"""
if isinstance(query_membership, basestring):
if isinstance(query_membership, string_types):
raise TypeError('Teams should be passed as a list of '
'strings (not %r)' % (type(query_membership),))
......
{% load i18n %}
{% load url from future %}
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
<html>
<head>
......
from django.conf import settings
from django.contrib.auth.models import Group
from django.db import models
class UserGroup(models.Model):
user = models.ForeignKey(settings.AUTH_USER_MODEL)
group = models.ForeignKey(Group)
......@@ -28,56 +28,69 @@
from __future__ import unicode_literals
import re
try:
from urllib.parse import urljoin
except ImportError:
from urlparse import urljoin
from django.conf import settings
from django.contrib.auth.models import Group, Permission, User
from django.core.exceptions import ImproperlyConfigured
from django.test import TestCase
from django.test.utils import override_settings
from openid.consumer.consumer import SuccessResponse
from openid.consumer.consumer import (
CancelResponse,
FailureResponse,
SetupNeededResponse,
SuccessResponse,
)
from openid.consumer.discover import OpenIDServiceEndpoint
from openid.extensions import pape
from openid.message import Message, OPENID2_NS
from django_openid_auth.auth import OpenIDBackend
from django_openid_auth.auth import OpenIDBackend, get_user_group_model
from django_openid_auth.exceptions import (
DuplicateUsernameViolation,
MissingPhysicalMultiFactor,
MissingUsernameViolation,
RequiredAttributeNotReturned,
)
from django_openid_auth.models import UserOpenID
from django_openid_auth.signals import openid_duplicate_username
from django_openid_auth.teams import ns_uri as TEAMS_NS
from django_openid_auth.tests.helpers import override_session_serializer
SREG_NS = "http://openid.net/sreg/1.0"
AX_NS = "http://openid.net/srv/ax/1.0"
SERVER_URL = 'http://example.com'
@override_session_serializer
@override_settings(
OPENID_USE_EMAIL_FOR_USERNAME=False,
OPENID_LAUNCHPAD_TEAMS_REQUIRED=[],
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=False,
OPENID_EMAIL_WHITELIST_REGEXP_LIST=[])
class OpenIDBackendTests(TestCase):
def make_claimed_id(id_):
return urljoin(SERVER_URL, id_)
def setUp(self):
super(OpenIDBackendTests, self).setUp()
self.backend = OpenIDBackend()
def make_openid_response(self, sreg_args=None, teams_args=None):
endpoint = OpenIDServiceEndpoint()
endpoint.claimed_id = 'some-id'
message = Message(OPENID2_NS)
if sreg_args is not None:
for key, value in sreg_args.items():
message.setArg(SREG_NS, key, value)
if teams_args is not None:
for key, value in teams_args.items():
message.setArg(TEAMS_NS, key, value)
response = SuccessResponse(
endpoint, message, signed_fields=message.toPostArgs().keys())
return response
def make_response_ax(
self, schema="http://axschema.org/",
fullname="Some User", nickname="someuser", email="foo@example.com",
first=None, last=None, verified=False):
class TestMessage(Message):
"""Convenience class to construct test OpenID messages and responses."""
def __init__(self, openid_namespace=OPENID2_NS):
super(TestMessage, self).__init__(openid_namespace=openid_namespace)
endpoint = OpenIDServiceEndpoint()
message = Message(OPENID2_NS)
endpoint.claimed_id = make_claimed_id('some-id')
endpoint.server_url = SERVER_URL
self.endpoint = endpoint
def set_ax_args(
self,
email="foo@example.com",
first=None,
fullname="Some User",
last=None,
nickname="someuser",
schema="http://axschema.org/",
verified=False):
attributes = [
("nickname", schema + "namePerson/friendly", nickname),
("fullname", schema + "namePerson", fullname),
......@@ -93,53 +106,79 @@ class OpenIDBackendTests(TestCase):
attributes.append(
("last", "http://axschema.org/namePerson/last", last))
message.setArg(AX_NS, "mode", "fetch_response")
self.setArg(AX_NS, "mode", "fetch_response")
for (alias, uri, value) in attributes:
message.setArg(AX_NS, "type.%s" % alias, uri)
message.setArg(AX_NS, "value.%s" % alias, value)
self.setArg(AX_NS, "type.%s" % alias, uri)
self.setArg(AX_NS, "value.%s" % alias, value)
def set_pape_args(self, *auth_policies):
self.setArg(pape.ns_uri, 'auth_policies', ' '.join(auth_policies))
def _set_args(self, ns, **kwargs):
for key, value in kwargs.items():
if value is not None:
self.setArg(ns, key, value)
elif self.hasKey(ns, key):
self.delArg(ns, key)
def set_sreg_args(self, **kwargs):
self._set_args(SREG_NS, **kwargs)
def set_team_args(self, **kwargs):
self._set_args(TEAMS_NS, **kwargs)
def to_response(self):
return SuccessResponse(
endpoint, message, signed_fields=message.toPostArgs().keys())
self.endpoint, self, signed_fields=self.toPostArgs().keys())
def make_user_openid(self, user=None,
claimed_id='http://example.com/existing_identity',
display_id='http://example.com/existing_identity'):
@override_session_serializer
@override_settings(
OPENID_USE_EMAIL_FOR_USERNAME=False,
OPENID_LAUNCHPAD_TEAMS_REQUIRED=[],
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=False,
OPENID_EMAIL_WHITELIST_REGEXP_LIST=[])
class OpenIDBackendTests(TestCase):
def setUp(self):
super(OpenIDBackendTests, self).setUp()
self.backend = OpenIDBackend()
self.message = TestMessage()
def make_user_openid(
self, user=None, claimed_id=make_claimed_id('existing_identity')):
if user is None:
user = User.objects.create_user(
username='someuser', email='someuser@example.com',
password='12345678')
user_openid, created = UserOpenID.objects.get_or_create(
user=user, claimed_id=claimed_id, display_id=display_id)
user=user, claimed_id=claimed_id, display_id=claimed_id)
return user_openid
def assert_account_verified(self, user, initially_verified, verified):
# set user's verification status
def _assert_account_verified(self, user, expected):
permission = Permission.objects.get(codename='account_verified')
if initially_verified:
user.user_permissions.add(permission)
else:
user.user_permissions.remove(permission)
perm_label = '%s.%s' % (permission.content_type.app_label,
permission.codename)
# Always invalidate the per-request perm cache
attrs = list(user.__dict__.keys())
for attr in attrs:
if attr.endswith('_perm_cache'):
delattr(user, attr)
user = User.objects.get(pk=user.pk)
has_perm = user.has_perm('django_openid_auth.account_verified')
assert has_perm == initially_verified
self.assertEqual(user.has_perm(perm_label), expected)
if hasattr(user, '_perm_cache'):
del user._perm_cache
def assert_account_not_verified(self, user):
self._assert_account_verified(user, False)
# get a response including verification status
response = self.make_response_ax()
data = dict(first_name=u"Some56789012345678901234567890123",
last_name=u"User56789012345678901234567890123",
email=u"someotheruser@example.com",
account_verified=verified)
self.backend.update_user_details(user, data, response)
def assert_account_verified(self, user):
self._assert_account_verified(user, True)
# refresh object from the database
user = User.objects.get(pk=user.pk)
# check the verification status
self.assertEqual(
user.has_perm('django_openid_auth.account_verified'), verified)
def assert_no_users_created(self, expected_count=0):
current_count = User.objects.count()
msg = 'New users found (expected: %i, current: %i)' % (
expected_count, current_count)
self.assertEqual(current_count, expected_count, msg)
def test_extract_user_details_sreg(self):
expected = {
......@@ -155,16 +194,19 @@ class OpenIDBackendTests(TestCase):
expected['last_name']),
'email': expected['email'],
}
response = self.make_openid_response(sreg_args=data)
self.message.set_sreg_args(**data)
details = self.backend._extract_user_details(response)
details = self.backend._extract_user_details(
self.message.to_response())
self.assertEqual(details, expected)
def test_extract_user_details_ax(self):
response = self.make_response_ax(
fullname="Some User", nickname="someuser", email="foo@example.com")
data = self.backend._extract_user_details(response)
self.message.set_ax_args(
email="foo@example.com",
fullname="Some User",
nickname="someuser",
)
data = self.backend._extract_user_details(self.message.to_response())
self.assertEqual(data, {"nickname": "someuser",
"first_name": "Some",
......@@ -175,10 +217,9 @@ class OpenIDBackendTests(TestCase):
def test_extract_user_details_ax_split_name(self):
# Include fullname too to show that the split data takes
# precedence.
response = self.make_response_ax(
self.message.set_ax_args(
fullname="Bad Data", first="Some", last="User")
data = self.backend._extract_user_details(response)
data = self.backend._extract_user_details(self.message.to_response())
self.assertEqual(data, {"nickname": "someuser",
"first_name": "Some",
......@@ -187,11 +228,10 @@ class OpenIDBackendTests(TestCase):
"account_verified": False})
def test_extract_user_details_ax_broken_myopenid(self):
response = self.make_response_ax(
self.message.set_ax_args(
schema="http://schema.openid.net/", fullname="Some User",
nickname="someuser", email="foo@example.com")
data = self.backend._extract_user_details(response)
data = self.backend._extract_user_details(self.message.to_response())
self.assertEqual(data, {"nickname": "someuser",
"first_name": "Some",
......@@ -200,7 +240,7 @@ class OpenIDBackendTests(TestCase):
"account_verified": False})
def test_update_user_details_long_names(self):
response = self.make_response_ax()
self.message.set_ax_args()
user = User.objects.create_user(
'someuser', 'someuser@example.com', password=None)
user_openid, created = UserOpenID.objects.get_or_create(
......@@ -212,49 +252,92 @@ class OpenIDBackendTests(TestCase):
last_name=u"User56789012345678901234567890123",
email=u"someotheruser@example.com", account_verified=False)
self.backend.update_user_details(user, data, response)
self.backend.update_user_details(
user, data, self.message.to_response())
self.assertEqual("Some56789012345678901234567890", user.first_name)
self.assertEqual("User56789012345678901234567890", user.last_name)
def _test_update_user_perms_account_verified(
self, user, initially_verified, verified):
# set user's verification status
permission = Permission.objects.get(codename='account_verified')
if initially_verified:
user.user_permissions.add(permission)
else:
user.user_permissions.remove(permission)
if initially_verified:
self.assert_account_verified(user)
else:
self.assert_account_not_verified(user)
# get a response including verification status
self.message.set_ax_args()
data = dict(first_name=u"Some56789012345678901234567890123",
last_name=u"User56789012345678901234567890123",
email=u"someotheruser@example.com",
account_verified=verified)
self.backend.update_user_details(
user, data, self.message.to_response())
# refresh object from the database
user = User.objects.get(pk=user.pk)
if verified:
self.assert_account_verified(user)
else:
self.assert_account_not_verified(user)
def test_update_user_perms_initially_verified_then_verified(self):
self.assert_account_verified(
self._test_update_user_perms_account_verified(
self.make_user_openid().user,
initially_verified=True, verified=True)
def test_update_user_perms_initially_verified_then_unverified(self):
self.assert_account_verified(
self._test_update_user_perms_account_verified(
self.make_user_openid().user,
initially_verified=True, verified=False)
def test_update_user_perms_initially_not_verified_then_verified(self):
self.assert_account_verified(
self._test_update_user_perms_account_verified(
self.make_user_openid().user,
initially_verified=False, verified=True)
def test_update_user_perms_initially_not_verified_then_unverified(self):
self.assert_account_verified(
self._test_update_user_perms_account_verified(
self.make_user_openid().user,
initially_verified=False, verified=False)
def test_extract_user_details_name_with_trailing_space(self):
response = self.make_response_ax(fullname="SomeUser ")
self.message.set_ax_args(fullname="SomeUser ")
data = self.backend._extract_user_details(response)
data = self.backend._extract_user_details(self.message.to_response())
self.assertEqual("", data['first_name'])
self.assertEqual("SomeUser", data['last_name'])
def test_extract_user_details_name_with_thin_space(self):
response = self.make_response_ax(fullname=u"Some\u2009User")
self.message.set_ax_args(fullname=u"Some\u2009User")
data = self.backend._extract_user_details(response)
data = self.backend._extract_user_details(self.message.to_response())
self.assertEqual("Some", data['first_name'])
self.assertEqual("User", data['last_name'])
@override_settings(OPENID_CREATE_USERS=True)
def test_auth_username_when_no_nickname(self):
self.message.set_sreg_args(nickname='')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
self.assertEqual(
user.username, 'openiduser',
"username must default to 'openiduser'")
@override_settings(OPENID_USE_EMAIL_FOR_USERNAME=True)
def test_preferred_username_email_munging(self):
def test_auth_username_email_munging(self):
for nick, email, expected in [
('nickcomesfirst', 'foo@example.com', 'nickcomesfirst'),
('', 'foo@example.com', 'fooexamplecom'),
......@@ -262,11 +345,17 @@ class OpenIDBackendTests(TestCase):
('', '@%.-', 'openiduser'),
('', '', 'openiduser'),
(None, None, 'openiduser')]:
self.assertEqual(
expected,
self.backend._get_preferred_username(nick, email))
self.message.set_sreg_args(nickname=nick, email=email)
user = self.backend.authenticate(
openid_response=self.message.to_response())
# Cleanup user for further tests
user.delete()
self.assertIsNotNone(user)
self.assertEqual(user.username, expected)
def test_preferred_username_no_email_munging(self):
@override_settings(OPENID_USE_EMAIL_FOR_USERNAME=False)
def test_auth_username_no_email_munging(self):
for nick, email, expected in [
('nickcomesfirst', 'foo@example.com', 'nickcomesfirst'),
('', 'foo@example.com', 'openiduser'),
......@@ -274,9 +363,89 @@ class OpenIDBackendTests(TestCase):
('', '@%.-', 'openiduser'),
('', '', 'openiduser'),
(None, None, 'openiduser')]:
self.message.set_sreg_args(nickname=nick, email=email)
user = self.backend.authenticate(
openid_response=self.message.to_response())
# Cleanup user for further tests
user.delete()
self.assertIsNotNone(user)
self.assertEqual(user.username, expected)
@override_settings(
OPENID_CREATE_USERS=True,
OPENID_FOLLOW_RENAMES=False,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_username_duplicate_numbering(self):
# Setup existing user to conflict with
User.objects.create_user('testuser')
self.message.set_sreg_args(nickname='testuser')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
self.assertEqual(
expected,
self.backend._get_preferred_username(nick, email))
user.username, 'testuser2',
'Username must contain numeric suffix to avoid collisions.')
def test_auth_username_duplicate_numbering_with_conflicts(self):
# Setup existing users to conflict with
User.objects.create_user('testuser')
User.objects.create_user('testuser3')
self.message.set_sreg_args(nickname='testuser')
user = self.backend.authenticate(
openid_response=self.message.to_response())
# Since this username is already taken by someone else, we go through
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 2. i should
# start at 3, which already exists, so it should skip to 4.
self.assertIsNotNone(user)
self.assertEqual(
user.username, 'testuser4',
'Username must contain numeric suffix to avoid collisions.')
def test_auth_username_duplicate_numbering_with_holes(self):
# Setup existing users to conflict with
User.objects.create_user('testuser')
User.objects.create_user('testuser1')
User.objects.create_user('testuser6')
User.objects.create_user('testuser7')
User.objects.create_user('testuser8')
self.message.set_sreg_args(nickname='testuser')
user = self.backend.authenticate(
openid_response=self.message.to_response())
# Since this username is already taken by someone else, we go through
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 5. i should
# start at 6, and increment until it reaches 9.
self.assertIsNotNone(user)
self.assertEqual(
user.username, 'testuser9',
'Username must contain numeric suffix to avoid collisions.')
def test_auth_username_duplicate_numbering_with_nonsequential_matches(
self):
# Setup existing users to conflict with
User.objects.create_user('testuser')
User.objects.create_user('testuserfoo')
self.message.set_sreg_args(nickname='testuser')
user = self.backend.authenticate(
openid_response=self.message.to_response())
# Since this username is already taken by someone else, we go through
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 2. i should
# start at 3, which will be available.
self.assertIsNotNone(user)
self.assertEqual(
user.username, 'testuser3',
'Username must contain numeric suffix to avoid collisions.')
@override_settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
......@@ -284,10 +453,10 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_when_not_member_of_teams_required(self):
Group.objects.create(name='team')
response = self.make_openid_response(
sreg_args=dict(nickname='someuser'),
teams_args=dict(is_member='foo'))
user = self.backend.authenticate(openid_response=response)
self.message.set_sreg_args(nickname='someuser')
self.message.set_team_args(is_member='foo')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNone(user)
......@@ -297,10 +466,10 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_when_no_group_mapping_to_required_team(self):
assert Group.objects.filter(name='team').count() == 0
response = self.make_openid_response(
sreg_args=dict(nickname='someuser'),
teams_args=dict(is_member='foo'))
user = self.backend.authenticate(openid_response=response)
self.message.set_sreg_args(nickname='someuser')
self.message.set_team_args(is_member='foo')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNone(user)
......@@ -310,19 +479,19 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_when_member_of_teams_required(self):
Group.objects.create(name='team')
response = self.make_openid_response(
sreg_args=dict(nickname='someuser'),
teams_args=dict(is_member='foo,team'))
user = self.backend.authenticate(openid_response=response)
self.message.set_sreg_args(nickname='someuser')
self.message.set_team_args(is_member='foo,team')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
@override_settings(OPENID_LAUNCHPAD_TEAMS_REQUIRED=[])
def test_authenticate_when_no_teams_required(self):
response = self.make_openid_response(
sreg_args=dict(nickname='someuser'),
teams_args=dict(is_member='team'))
user = self.backend.authenticate(openid_response=response)
self.message.set_sreg_args(nickname='someuser')
self.message.set_team_args(is_member='team')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
......@@ -332,10 +501,10 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_when_member_of_at_least_one_team(self):
Group.objects.create(name='team1')
response = self.make_openid_response(
sreg_args=dict(nickname='someuser'),
teams_args=dict(is_member='foo,team1'))
user = self.backend.authenticate(openid_response=response)
self.message.set_sreg_args(nickname='someuser')
self.message.set_team_args(is_member='foo,team1')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
......@@ -347,17 +516,17 @@ class OpenIDBackendTests(TestCase):
self):
assert Group.objects.filter(name='team').count() == 0
response = self.make_openid_response(
sreg_args=dict(nickname='someuser', email='foo@foo.com'),
teams_args=dict(is_member='foo'))
user = self.backend.authenticate(openid_response=response)
self.message.set_sreg_args(
nickname='someuser', email='foo@foo.com')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
response = self.make_openid_response(
sreg_args=dict(nickname='someuser', email='foo+bar@foo.com'),
teams_args=dict(is_member='foo'))
user = self.backend.authenticate(openid_response=response)
self.message.set_sreg_args(
nickname='someuser', email='foo+bar@foo.com')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
......@@ -368,10 +537,9 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_whitelisted_email_multiple_patterns(self):
assert Group.objects.filter(name='team').count() == 0
response = self.make_openid_response(
sreg_args=dict(nickname='someuser', email='bar@foo.com'),
teams_args=dict(is_member='foo'))
user = self.backend.authenticate(openid_response=response)
self.message.set_sreg_args(nickname='someuser', email='bar@foo.com')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
......@@ -382,9 +550,530 @@ class OpenIDBackendTests(TestCase):
def test_authenticate_whitelisted_email_not_match(self):
assert Group.objects.filter(name='team').count() == 0
response = self.make_openid_response(
sreg_args=dict(nickname='someuser', email='bar@foo.com'),
teams_args=dict(is_member='foo'))
user = self.backend.authenticate(openid_response=response)
self.message.set_sreg_args(nickname='someuser', email='bar@foo.com')
self.message.set_team_args(is_member='foo')
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNone(user)
def test_auth_no_response(self):
self.assertIsNone(self.backend.authenticate())
self.assert_no_users_created()
def test_auth_cancel_response(self):
response = CancelResponse(OpenIDServiceEndpoint())
self.assertIsNone(self.backend.authenticate(openid_response=response))
self.assert_no_users_created()
def test_auth_failure_response(self):
response = FailureResponse(OpenIDServiceEndpoint())
self.assertIsNone(self.backend.authenticate(openid_response=response))
self.assert_no_users_created()
def test_auth_setup_needed_response(self):
response = SetupNeededResponse(OpenIDServiceEndpoint())
self.assertIsNone(self.backend.authenticate(openid_response=response))
self.assert_no_users_created()
@override_settings(OPENID_CREATE_USERS=False)
def test_auth_no_create_users(self):
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNone(user)
self.assert_no_users_created()
@override_settings(OPENID_CREATE_USERS=False)
def test_auth_no_create_users_existing_user(self):
existing_openid = self.make_user_openid(
claimed_id=self.message.endpoint.claimed_id)
expected_user_count = User.objects.count()
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
self.assertEqual(user, existing_openid.user)
self.assert_no_users_created(expected_count=expected_user_count)
@override_settings(
OPENID_UPDATE_DETAILS_FROM_SREG=True,
OPENID_VALID_VERIFICATION_SCHEMES={
SERVER_URL: {'token_via_email'}})
def test_auth_update_details_from_sreg(self):
first_name = 'a' * 31
last_name = 'b' * 31
email = 'new@email.com'
self.message.set_ax_args(
fullname=first_name + ' ' + last_name,
nickname='newnickname',
email=email,
first=first_name,
last=last_name,
verified=True,
)
existing_openid = self.make_user_openid(
claimed_id=self.message.endpoint.claimed_id)
original_username = existing_openid.user.username
expected_user_count = User.objects.count()
self.assert_account_not_verified(existing_openid.user)
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertEqual(user, existing_openid.user)
self.assertEqual(
user.username, original_username,
'Username must not be updated unless OPENID_FOLLOW_RENAMES=True.')
self.assertEqual(user.email, email)
self.assertEqual(user.first_name, first_name[:30])
self.assertEqual(user.last_name, last_name[:30])
self.assert_account_verified(user)
self.assert_no_users_created(expected_count=expected_user_count)
@override_settings(
OPENID_UPDATE_DETAILS_FROM_SREG=True,
OPENID_VALID_VERIFICATION_SCHEMES={
SERVER_URL: {'token_via_email'}})
def test_auth_update_details_from_sreg_unverifies_account(self):
first_name = 'a' * 31
last_name = 'b' * 31
email = 'new@email.com'
kwargs = dict(
fullname=first_name + ' ' + last_name,
nickname='newnickname',
email=email,
first=first_name,
last=last_name,
verified=True,
)
self.message.set_ax_args(**kwargs)
verified_user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assert_account_verified(verified_user)
expected_user_count = User.objects.count()
kwargs['verified'] = False
self.message.set_ax_args(**kwargs)
unverified_user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertEqual(verified_user, unverified_user)
self.assert_account_not_verified(unverified_user)
self.assert_no_users_created(expected_count=expected_user_count)
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
def test_physical_multifactor_required_not_given(self):
response = self.message.to_response()
with self.assertRaises(MissingPhysicalMultiFactor):
self.backend.authenticate(openid_response=response)
self.assertTrue(
UserOpenID.objects.filter(
claimed_id=self.message.endpoint.claimed_id).exists(),
'User must be created anyways.')
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
def test_physical_multifactor_required_invalid_auth_policy(self):
self.message.set_pape_args(
pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT)
with self.assertRaises(MissingPhysicalMultiFactor):
self.backend.authenticate(
openid_response=self.message.to_response())
self.assertTrue(
UserOpenID.objects.filter(
claimed_id=self.message.endpoint.claimed_id).exists(),
'User must be created anyways.')
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
def test_physical_multifactor_required_valid_auth_policy(self):
self.message.set_pape_args(
pape.AUTH_MULTI_FACTOR, pape.AUTH_MULTI_FACTOR_PHYSICAL,
pape.AUTH_PHISHING_RESISTANT)
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
@override_settings(OPENID_STRICT_USERNAMES=True)
def test_auth_strict_usernames(self):
username = 'nickname'
self.message.set_sreg_args(nickname=username)
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user, 'User must be created')
self.assertEqual(user.username, username)
@override_settings(OPENID_STRICT_USERNAMES=True)
def test_auth_strict_usernames_no_nickname(self):
self.message.set_sreg_args(nickname='')
msg = re.escape(
"An attribute required for logging in was not returned (nickname)")
with self.assertRaisesRegexp(RequiredAttributeNotReturned, msg):
self.backend.authenticate(
openid_response=self.message.to_response())
self.assert_no_users_created()
@override_settings(
OPENID_STRICT_USERNAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_strict_usernames_conflict(self):
existing_openid = self.make_user_openid()
expected_user_count = User.objects.count()
self.message.set_sreg_args(
nickname=existing_openid.user.username)
with self.assertRaises(DuplicateUsernameViolation):
self.backend.authenticate(
openid_response=self.message.to_response())
self.assert_no_users_created(expected_count=expected_user_count)
@override_settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_follow_renames(self):
new_username = 'new'
self.message.set_sreg_args(nickname='username')
user = self.backend.authenticate(
openid_response=self.message.to_response())
expected_user_count = User.objects.count()
self.assertIsNotNone(user, 'User must be created')
self.message.set_sreg_args(nickname=new_username)
renamed_user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertEqual(user.pk, renamed_user.pk)
self.assertEqual(renamed_user.username, new_username)
self.assert_no_users_created(expected_count=expected_user_count)
@override_settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_STRICT_USERNAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_follow_renames_strict_usernames_no_nickname(self):
self.message.set_sreg_args(nickname='nickame')
user = self.backend.authenticate(
openid_response=self.message.to_response())
expected_user_count = User.objects.count()
self.assertIsNotNone(user, 'User must be created')
self.message.set_sreg_args(nickname='')
# XXX: Check possibilities to normalize this error into a
# `RequiredAttributeNotReturned`.
with self.assertRaises(MissingUsernameViolation):
self.backend.authenticate(
openid_response=self.message.to_response())
self.assert_no_users_created(expected_count=expected_user_count)
@override_settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_STRICT_USERNAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_follow_renames_strict_usernames_rename_conflict(self):
# Setup existing user to conflict with
User.objects.create_user('testuser')
self.message.set_sreg_args(nickname='nickname')
user = self.backend.authenticate(
openid_response=self.message.to_response())
expected_user_count = User.objects.count()
self.assertIsNotNone(user, 'First request should succeed')
self.message.set_sreg_args(nickname='testuser')
with self.assertRaises(DuplicateUsernameViolation):
self.backend.authenticate(
openid_response=self.message.to_response())
db_user = User.objects.get(pk=user.pk)
self.assertEqual(db_user.username, 'nickname')
self.assert_no_users_created(expected_count=expected_user_count)
@override_settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_STRICT_USERNAMES=False,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_follow_renames_to_conflict(self):
# Setup existing user to conflict with
User.objects.create_user('testuser')
# Setup user to rename
user = User.objects.create_user('nickname')
self.make_user_openid(
user=user, claimed_id=self.message.endpoint.claimed_id)
# Trigger rename
self.message.set_sreg_args(nickname='testuser')
renamed_user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertEqual(renamed_user.pk, user.pk)
self.assertEqual(
renamed_user.username, 'testuser2',
'Username must have a numbered suffix to avoid conflict.')
@override_settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_follow_renames_no_change(self):
# Setup user to rename
user = User.objects.create_user('username')
self.make_user_openid(
user=user, claimed_id=self.message.endpoint.claimed_id)
expected_user_count = User.objects.count()
# Trigger rename
self.message.set_sreg_args(nickname=user.username)
renamed_user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertEqual(renamed_user.pk, user.pk)
self.assertEqual(
renamed_user.username, user.username,
'No numeric suffix should be appended for username owner.')
self.assert_no_users_created(expected_count=expected_user_count)
@override_settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_follow_renames_to_numbered_suffix(self):
# Setup user to rename to numbered suffix pattern
user = User.objects.create_user('testuser2000eight')
self.make_user_openid(
user=user, claimed_id=self.message.endpoint.claimed_id)
# Trigger rename
self.message.set_sreg_args(nickname='testuser2')
renamed_user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertEqual(renamed_user.pk, user.pk)
self.assertEqual(
renamed_user.username, 'testuser2',
'The numbered suffix must be kept.')
@override_settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_follow_renames_to_numbered_suffix_with_existing(self):
# Setup existing user to conflict with
User.objects.create_user('testuser')
# Setup user to rename to numbered suffix pattern
user = User.objects.create_user('testuser2000eight')
self.make_user_openid(
user=user, claimed_id=self.message.endpoint.claimed_id)
# Trigger rename
self.message.set_sreg_args(nickname='testuser3')
renamed_user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertEqual(renamed_user.pk, user.pk)
self.assertEqual(
renamed_user.username, 'testuser3',
'Username must be kept as there are no conflicts.')
@override_settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_follow_renames_from_numbered_suffix_to_conflict(self):
# Setup existing user to conflict with
User.objects.create_user('testuser')
# Setup user with numbered suffix pattern
user = User.objects.create_user('testuser2000')
self.make_user_openid(
user=user, claimed_id=self.message.endpoint.claimed_id)
# Trigger rename
self.message.set_sreg_args(nickname='testuser')
renamed_user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertEqual(renamed_user.pk, user.pk)
self.assertEqual(
user.username, 'testuser2000',
'Since testuser conflicts, username must remain unchanged as it '
'maches the number suffix pattern.')
@override_settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True)
def test_auth_follow_renames_from_numbered_suffix_no_conflict(self):
# Setup user with numbered suffix pattern
user = User.objects.create_user('testuser2')
self.make_user_openid(
user=user, claimed_id=self.message.endpoint.claimed_id)
# Trigger rename
self.message.set_sreg_args(nickname='testuser')
renamed_user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertEqual(renamed_user.pk, user.pk)
self.assertEqual(
renamed_user.username, 'testuser',
'Username must be updated as there are no conflicts.')
@override_settings(OPENID_STRICT_USERNAMES=True)
def test_auth_duplicate_username_signal_is_sent(self):
existing_openid = self.make_user_openid()
expected_user_count = User.objects.count()
signal_kwargs = {}
def duplicate_username_handler(sender, **kwargs):
signal_kwargs.update(kwargs)
self.addCleanup(
openid_duplicate_username.disconnect,
duplicate_username_handler, sender=User, dispatch_uid='testing')
openid_duplicate_username.connect(
duplicate_username_handler, sender=User, weak=False,
dispatch_uid='testing')
self.message.set_sreg_args(
nickname=existing_openid.user.username)
with self.assertRaises(DuplicateUsernameViolation):
self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIn('username', signal_kwargs)
self.assertEqual(
signal_kwargs['username'], existing_openid.user.username)
self.assert_no_users_created(expected_count=expected_user_count)
@override_settings(OPENID_STRICT_USERNAMES=True)
def test_auth_duplicate_username_signal_can_prevent_duplicate_error(self):
existing_openid = self.make_user_openid()
def duplicate_username_handler(sender, **kwargs):
existing_user = existing_openid.user
existing_user.username += '_other'
existing_user.save()
self.addCleanup(
openid_duplicate_username.disconnect,
duplicate_username_handler, sender=User, dispatch_uid='testing')
openid_duplicate_username.connect(
duplicate_username_handler, sender=User, weak=False,
dispatch_uid='testing')
self.message.set_sreg_args(
nickname=existing_openid.user.username)
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
self.assertNotEqual(user, existing_openid.user)
@override_settings(OPENID_STRICT_USERNAMES=True)
def test_auth_duplicate_username_is_not_called_if_no_conflict(self):
def duplicate_username_handler(sender, **kwargs):
assert False, 'This should never have been called.'
self.addCleanup(
openid_duplicate_username.disconnect,
duplicate_username_handler, sender=User, dispatch_uid='testing')
openid_duplicate_username.connect(
duplicate_username_handler, sender=User, weak=False,
dispatch_uid='testing')
self.message.set_sreg_args(nickname='nickname')
self.backend.authenticate(openid_response=self.message.to_response())
@override_settings(OPENID_STRICT_USERNAMES=False)
def test_auth_duplicate_username_is_not_called_if_not_strict(self):
existing_openid = self.make_user_openid()
def duplicate_username_handler(sender, **kwargs):
assert False, 'This should never have been called.'
self.addCleanup(
openid_duplicate_username.disconnect,
duplicate_username_handler, sender=User, dispatch_uid='testing')
openid_duplicate_username.connect(
duplicate_username_handler, sender=User, weak=False,
dispatch_uid='testing')
self.message.set_sreg_args(nickname=existing_openid.user.username)
self.backend.authenticate(openid_response=self.message.to_response())
@override_settings(OPENID_STRICT_USERNAMES=True)
def test_auth_duplicate_username_handling_bypass_numbered_suffix(self):
nickname = 'nickname87'
existing_openid = self.make_user_openid(
user=User.objects.create_user(nickname))
def duplicate_username_handler(sender, **kwargs):
existing_user = existing_openid.user
existing_user.username += '00'
existing_user.save()
self.addCleanup(
openid_duplicate_username.disconnect,
duplicate_username_handler, sender=User, dispatch_uid='testing')
openid_duplicate_username.connect(
duplicate_username_handler, sender=User, weak=False,
dispatch_uid='testing')
self.message.set_sreg_args(nickname=existing_openid.user.username)
user = self.backend.authenticate(
openid_response=self.message.to_response())
self.assertIsNotNone(user)
self.assertNotEqual(user, existing_openid.user)
self.assertEqual(
user.username, nickname,
'In strict mode, when conflicts are handled, the username must '
'be kept unmodified without numbered suffixes.')
class GetGroupModelTestCase(TestCase):
def setUp(self):
super(GetGroupModelTestCase, self).setUp()
self.inject_test_models()
def inject_test_models(self):
installed_apps = settings.INSTALLED_APPS + (
'django_openid_auth.tests',
)
p = self.settings(INSTALLED_APPS=installed_apps)
p.enable()
self.addCleanup(p.disable)
self.clear_app_cache()
def clear_app_cache(self):
try:
from django.apps import apps
apps.clear_cache()
except ImportError:
from django.db.models.loading import cache
cache.loaded = False
def test_default_group_model(self):
model = get_user_group_model()
self.assertEqual(model, User.groups.through)
@override_settings(AUTH_USER_GROUP_MODEL='tests.UserGroup')
def test_custom_group_model(self):
from django_openid_auth.tests.models import UserGroup
model = get_user_group_model()
self.assertEqual(model, UserGroup)
@override_settings(
AUTH_USER_GROUP_MODEL='django_openid_auth.models.UserGroup')
def test_improperly_configured_invalid_name(self):
self.assertRaises(ImproperlyConfigured, get_user_group_model)
@override_settings(
AUTH_USER_GROUP_MODEL='invalid.UserGroup')
def test_improperly_configured_invalid_app(self):
self.assertRaises(ImproperlyConfigured, get_user_group_model)
......@@ -28,6 +28,7 @@
from __future__ import unicode_literals
import base64
import time
from django.test import TestCase
......@@ -52,7 +53,8 @@ class OpenIDStoreTests(TestCase):
server_url='server-url', handle='handle')
self.assertEquals(dbassoc.server_url, 'server-url')
self.assertEquals(dbassoc.handle, 'handle')
self.assertEquals(dbassoc.secret, 'secret'.encode('base-64'))
self.assertEquals(
dbassoc.secret, base64.encodestring(b'secret').decode('utf-8'))
self.assertEquals(dbassoc.issued, 42)
self.assertEquals(dbassoc.lifetime, 600)
self.assertEquals(dbassoc.assoc_type, 'HMAC-SHA1')
......@@ -66,7 +68,8 @@ class OpenIDStoreTests(TestCase):
self.store.storeAssociation('server-url', assoc)
dbassoc = Association.objects.get(
server_url='server-url', handle='handle')
self.assertEqual(dbassoc.secret, 'secret2'.encode('base-64'))
self.assertEqual(
dbassoc.secret, base64.encodestring(b'secret2').decode('utf-8'))
self.assertEqual(dbassoc.issued, 420)
self.assertEqual(dbassoc.lifetime, 900)
self.assertEqual(dbassoc.assoc_type, 'HMAC-SHA256')
......@@ -80,7 +83,7 @@ class OpenIDStoreTests(TestCase):
self.assertTrue(isinstance(assoc, OIDAssociation))
self.assertEquals(assoc.handle, 'handle')
self.assertEquals(assoc.secret, 'secret')
self.assertEquals(assoc.secret, b'secret')
self.assertEquals(assoc.issued, timestamp)
self.assertEquals(assoc.lifetime, 600)
self.assertEquals(assoc.assoc_type, 'HMAC-SHA1')
......
......@@ -31,13 +31,17 @@ from __future__ import unicode_literals
import cgi
from urlparse import parse_qs
try:
from urllib.parse import parse_qs
except ImportError:
from urlparse import parse_qs
from django.conf import settings
from django.contrib.auth.models import User, Group, Permission
from django.core.urlresolvers import reverse
from django.http import HttpRequest, HttpResponse
from django.test import TestCase
from django.test.client import RequestFactory
from django.test.utils import override_settings
from mock import patch
from openid.consumer.consumer import Consumer, SuccessResponse
......@@ -54,8 +58,9 @@ from django_openid_auth import teams
from django_openid_auth.models import UserOpenID
from django_openid_auth.tests.helpers import override_session_serializer
from django_openid_auth.views import (
sanitise_redirect_url,
get_request_data,
make_consumer,
sanitise_redirect_url,
)
from django_openid_auth.signals import openid_login_complete
from django_openid_auth.store import DjangoOpenIDStore
......@@ -123,8 +128,8 @@ class StubOpenIDProvider(HTTPFetcher):
def parseFormPost(self, content):
"""Parse an HTML form post to create an OpenID request."""
# Hack to make the javascript XML compliant ...
content = content.replace('i < elements.length',
'i &lt; elements.length')
content = content.replace(
'i < elements.length', 'i &lt; elements.length')
tree = ET.XML(content)
form = tree.find('.//form')
assert form is not None, 'No form in document'
......@@ -135,8 +140,7 @@ class StubOpenIDProvider(HTTPFetcher):
for input in form.findall('input'):
if input.get('type') != 'hidden':
continue
query[input.get('name').encode('UTF-8')] = \
input.get('value').encode('UTF-8')
query[input.get('name')] = input.get('value')
self.last_request = self.server.decodeRequest(query)
return self.last_request
......@@ -163,13 +167,6 @@ class DummyDjangoRequest(object):
def build_absolute_uri(self):
return self.META['SCRIPT_NAME'] + self.request_path
def _combined_request(self):
request = {}
request.update(self.POST)
request.update(self.GET)
return request
REQUEST = property(_combined_request)
@override_session_serializer
@override_settings(
......@@ -184,10 +181,10 @@ class DummyDjangoRequest(object):
OPENID_SREG_REQUIRED_FIELDS=[],
OPENID_USE_EMAIL_FOR_USERNAME=False,
OPENID_VALID_VERIFICATION_SCHEMES={},
ROOT_URLCONF='django_openid_auth.tests.urls',
)
class RelyingPartyTests(TestCase):
urls = 'django_openid_auth.tests.urls'
login_url = reverse('openid-login')
def setUp(self):
......@@ -241,7 +238,8 @@ class RelyingPartyTests(TestCase):
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
self.assertEqual(openid_request.mode, 'checkid_setup')
self.assertTrue(openid_request.return_to.startswith(
'http://testserver/openid/complete/'))
......@@ -253,7 +251,7 @@ class RelyingPartyTests(TestCase):
# And they are now logged in:
response = self.client.get('/getuser/')
self.assertEqual(response.content, 'someuser')
self.assertEqual(response.content.decode('utf-8'), 'someuser')
def test_login_with_nonascii_return_to(self):
"""Ensure non-ascii characters can be used for the 'next' arg."""
......@@ -275,7 +273,8 @@ class RelyingPartyTests(TestCase):
self.assertContains(response, 'OpenID transaction in progress')
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
self.assertEqual(openid_request.mode, 'checkid_setup')
self.assertTrue(openid_request.return_to.startswith(
'http://testserver/openid/complete/'))
......@@ -302,7 +301,8 @@ class RelyingPartyTests(TestCase):
self.assertEqual(response.status_code, 200)
self.assertContains(response, 'OpenID transaction in progress')
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
self.assertEqual(openid_request.mode, 'checkid_setup')
self.assertTrue(openid_request.return_to.startswith(
'http://testserver/openid/complete/'))
......@@ -314,7 +314,7 @@ class RelyingPartyTests(TestCase):
# And they are now logged in:
response = self.client.get('/getuser/')
self.assertEqual(response.content, 'someuser')
self.assertEqual(response.content.decode('utf-8'), 'someuser')
def test_login_create_users(self):
# Create a user with the same name as we'll pass back via sreg.
......@@ -326,7 +326,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
......@@ -340,7 +341,7 @@ class RelyingPartyTests(TestCase):
# And they are now logged in as a new user (they haven't taken
# over the existing "someuser" user).
response = self.client.get('/getuser/')
self.assertEqual(response.content, 'someuser2')
self.assertEqual(response.content.decode('utf-8'), 'someuser2')
# Check the details of the new user.
user = User.objects.get(username='someuser2')
......@@ -364,7 +365,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
return openid_request
def _get_login_response(self, openid_request, resp_data, use_sreg,
......@@ -388,7 +390,8 @@ class RelyingPartyTests(TestCase):
self.provider.type_uris.append(pape.ns_uri)
response = self.client.post(self.login_url, self.openid_req)
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
request_auth = openid_request.message.getArg(
'http://specs.openid.net/extensions/pape/1.0',
......@@ -436,7 +439,7 @@ class RelyingPartyTests(TestCase):
query['openid.pape.auth_policies'], [preferred_auth])
response = self.client.get('/getuser/')
self.assertEqual(response.content, 'testuser')
self.assertEqual(response.content.decode('utf-8'), 'testuser')
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
def test_login_physical_multifactor_not_provided(self):
......@@ -552,10 +555,11 @@ class RelyingPartyTests(TestCase):
response = self.client.get('/getuser/')
# username defaults to 'openiduser'
self.assertEqual(response.content, 'openiduser')
username = response.content.decode('utf-8')
self.assertEqual(username, 'openiduser')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Openid')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
......@@ -570,7 +574,7 @@ class RelyingPartyTests(TestCase):
response = self.client.get('/getuser/')
# username defaults to a munged version of the email
self.assertEqual(response.content, 'fooexamplecom')
self.assertEqual(response.content.decode('utf-8'), 'fooexamplecom')
def test_login_duplicate_username_numbering(self):
# Setup existing user who's name we're going to conflict with
......@@ -587,7 +591,7 @@ class RelyingPartyTests(TestCase):
# Since this username is already taken by someone else, we go through
# the process of adding +i to it, and get testuser2.
self.assertEqual(response.content, 'testuser2')
self.assertEqual(response.content.decode('utf-8'), 'testuser2')
def test_login_duplicate_username_numbering_with_conflicts(self):
# Setup existing user who's name we're going to conflict with
......@@ -607,7 +611,7 @@ class RelyingPartyTests(TestCase):
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 2. i should
# start at 3, which already exists, so it should skip to 4.
self.assertEqual(response.content, 'testuser4')
self.assertEqual(response.content.decode('utf-8'), 'testuser4')
def test_login_duplicate_username_numbering_with_holes(self):
# Setup existing user who's name we're going to conflict with
......@@ -630,7 +634,7 @@ class RelyingPartyTests(TestCase):
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 5. i should
# start at 6, and increment until it reaches 9.
self.assertEqual(response.content, 'testuser9')
self.assertEqual(response.content.decode('utf-8'), 'testuser9')
def test_login_duplicate_username_numbering_with_nonsequential_matches(
self):
......@@ -651,7 +655,7 @@ class RelyingPartyTests(TestCase):
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 2. i should
# start at 3, which will be available.
self.assertEqual(response.content, 'testuser3')
self.assertEqual(response.content.decode('utf-8'), 'testuser3')
def test_login_follow_rename(self):
user = User.objects.create_user('testuser', 'someone@example.com')
......@@ -671,10 +675,11 @@ class RelyingPartyTests(TestCase):
# If OPENID_FOLLOW_RENAMES, they are logged in as
# someuser (the passed in nickname has changed the username)
self.assertEqual(response.content, 'someuser')
username = response.content.decode('utf-8')
self.assertEqual(username, 'someuser')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Some')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
......@@ -696,10 +701,11 @@ class RelyingPartyTests(TestCase):
response = self.client.get('/getuser/')
# Username should not have changed
self.assertEqual(response.content, 'testuser')
username = response.content.decode('utf-8')
self.assertEqual(username, 'testuser')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Some')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
......@@ -735,10 +741,11 @@ class RelyingPartyTests(TestCase):
# If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
# but since that username is already taken by someone else, we go
# through the process of adding +i to it, and get testuser2.
self.assertEqual(response.content, 'testuser2')
username = response.content.decode('utf-8')
self.assertEqual(username, 'testuser2')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Rename')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'rename@example.com')
......@@ -777,10 +784,11 @@ class RelyingPartyTests(TestCase):
# the username follows the nickname+i scheme, it has non-numbers in the
# suffix, so it's not an auto-generated one. The regular process of
# renaming to 'testuser' has a conflict, so we get +2 at the end.
self.assertEqual(response.content, 'testuser2')
username = response.content.decode('utf-8')
self.assertEqual(username, 'testuser2')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Rename')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'rename@example.com')
......@@ -817,10 +825,11 @@ class RelyingPartyTests(TestCase):
# but since that username is already taken by someone else, we go
# through the process of adding +i to it. Since the user for this
# identity url already has a name matching that pattern, check if first
self.assertEqual(response.content, 'testuser2000')
username = response.content.decode('utf-8')
self.assertEqual(username, 'testuser2000')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Rename')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'rename@example.com')
......@@ -847,10 +856,11 @@ class RelyingPartyTests(TestCase):
# If OPENID_FOLLOW_RENAMES, username should be changed to 'testuser'
# because it wasn't currently taken
self.assertEqual(response.content, 'testuser')
username = response.content.decode('utf-8')
self.assertEqual(username, 'testuser')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Same')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'same@example.com')
......@@ -865,7 +875,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
......@@ -903,7 +914,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
......@@ -932,7 +944,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
......@@ -974,7 +987,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
......@@ -997,7 +1011,8 @@ class RelyingPartyTests(TestCase):
# Complete the request, passing back some simple registration
# data. The user is redirected to the next URL.
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
......@@ -1033,10 +1048,11 @@ class RelyingPartyTests(TestCase):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
self.assertEqual(response.content, 'testuser')
username = response.content.decode('utf-8')
self.assertEqual(username, 'testuser')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
user = User.objects.get(username=username)
self.assertEqual(user.first_name, 'Some')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
......@@ -1052,7 +1068,8 @@ class RelyingPartyTests(TestCase):
with self.settings(OPENID_SREG_EXTRA_FIELDS=('language',)):
response = self.client.post(self.login_url, self.openid_req)
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
for field in ('email', 'fullname', 'nickname', 'language'):
self.assertTrue(field in sreg_request)
......@@ -1069,7 +1086,8 @@ class RelyingPartyTests(TestCase):
with self.settings(OPENID_SREG_REQUIRED_FIELDS=('email', 'language')):
response = self.client.post(self.login_url, self.openid_req)
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
self.assertEqual(['email', 'language'], sreg_request.required)
......@@ -1091,7 +1109,8 @@ class RelyingPartyTests(TestCase):
# The resulting OpenID request uses the Attribute Exchange
# extension rather than the Simple Registration extension.
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
self.assertEqual(sreg_request.required, [])
self.assertEqual(sreg_request.optional, [])
......@@ -1137,7 +1156,7 @@ class RelyingPartyTests(TestCase):
assert not settings.OPENID_FOLLOW_RENAMES, (
'OPENID_FOLLOW_RENAMES must be False')
response = self.client.get('/getuser/')
self.assertEqual(response.content, 'testuser')
self.assertEqual(response.content.decode('utf-8'), 'testuser')
# The user's full name and email have been updated.
user = User.objects.get(username='testuser')
......@@ -1221,7 +1240,8 @@ class RelyingPartyTests(TestCase):
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
openid_response = openid_request.answer(True)
teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
teams_response = teams.TeamsResponse.extractResponse(
......@@ -1236,7 +1256,7 @@ class RelyingPartyTests(TestCase):
# And they are now logged in as testuser
response = self.client.get('/getuser/')
self.assertEqual(response.content, 'testuser')
self.assertEqual(response.content.decode('utf-8'), 'testuser')
# The user's groups have been updated.
User.objects.get(username='testuser')
......@@ -1268,7 +1288,8 @@ class RelyingPartyTests(TestCase):
OPENID_LAUNCHPAD_TEAMS_MAPPING=mapping,
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO_BLACKLIST=blacklist):
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
openid_request.answer(True)
teams.TeamsRequest.fromOpenIDRequest(openid_request)
......@@ -1321,7 +1342,8 @@ class RelyingPartyTests(TestCase):
response = self.client.post(self.login_url, self.openid_req_no_next)
# Complete the request
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
openid_response = openid_request.answer(True)
teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
teams_response = teams.TeamsResponse.extractResponse(
......@@ -1340,7 +1362,8 @@ class RelyingPartyTests(TestCase):
display_id='http://example.com/identity')
response = self.client.post(self.login_url, self.openid_req_no_next)
openid_request = self.provider.parseFormPost(response.content)
openid_request = self.provider.parseFormPost(
response.content.decode('utf-8'))
openid_response = openid_request.answer(True)
# Use a closure to test whether the signal handler was called.
self.signal_handler_called = False
......@@ -1388,3 +1411,24 @@ class HelperFunctionsTest(TestCase):
self.assertEqual(url, sanitised)
else:
self.assertEqual(settings.LOGIN_REDIRECT_URL, sanitised)
def test_get_request_data_from_post(self):
request = RequestFactory().post('/', data={'foo': 'bar'})
data = get_request_data(request)
self.assertEqual(dict(data), {'foo': ['bar']})
def test_get_request_data_from_get(self):
request = RequestFactory().get('/', data={'foo': 'bar'})
data = get_request_data(request)
self.assertEqual(dict(data), {'foo': ['bar']})
def test_get_request_data_merged(self):
request = RequestFactory().post('/?baz=42', data={'foo': 'bar'})
data = get_request_data(request)
self.assertEqual(dict(data), {'foo': ['bar'], 'baz': ['42']})
def test_get_request_data_override_order(self):
request = RequestFactory().post('/?foo=42', data={'foo': 'bar'})
data = get_request_data(request)
self.assertEqual(dict(data), {'foo': ['42', 'bar']})
self.assertEqual(data['foo'], 'bar')
......@@ -28,7 +28,7 @@
from __future__ import unicode_literals
from django.conf.urls import patterns, include
from django.conf.urls import include, url
from django.http import HttpResponse
......@@ -36,8 +36,7 @@ def get_user(request):
return HttpResponse(request.user.username)
urlpatterns = patterns(
'',
(r'^getuser/$', get_user),
(r'^openid/', include('django_openid_auth.urls')),
)
urlpatterns = [
url(r'^getuser/$', get_user),
url(r'^openid/', include('django_openid_auth.urls')),
]
......@@ -29,11 +29,17 @@
from __future__ import unicode_literals
from django.conf.urls import patterns, url
from django.conf.urls import url
urlpatterns = patterns(
'django_openid_auth.views',
url(r'^login/$', 'login_begin', name='openid-login'),
url(r'^complete/$', 'login_complete', name='openid-complete'),
url(r'^logo.gif$', 'logo', name='openid-logo'),
from django_openid_auth.views import (
login_begin,
login_complete,
logo,
)
urlpatterns = [
url(r'^login/$', login_begin, name='openid-login'),
url(r'^complete/$', login_complete, name='openid-complete'),
url(r'^logo.gif$', logo, name='openid-logo'),
]
......@@ -30,8 +30,11 @@
from __future__ import unicode_literals
import re
import urllib
from urlparse import urlsplit
try:
from urllib.parse import urlencode, urlsplit
except ImportError:
from urllib import urlencode
from urlparse import urlsplit
from django.conf import settings
from django.contrib.auth import (
......@@ -39,6 +42,7 @@ from django.contrib.auth import (
from django.contrib.auth.models import Group
from django.core.urlresolvers import reverse
from django.http import HttpResponse, HttpResponseRedirect
from django.http.request import QueryDict
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.template.loader import render_to_string
......@@ -129,22 +133,27 @@ def default_render_failure(request, message, status=403,
template_name='openid/failure.html',
exception=None):
"""Render an error page to the user."""
data = render_to_string(
template_name, dict(message=message, exception=exception),
context_instance=RequestContext(request))
context = RequestContext(request)
context.update(dict(message=message, exception=exception))
data = render_to_string(template_name, context)
return HttpResponse(data, status=status)
def parse_openid_response(request):
"""Parse an OpenID response from a Django request."""
# Short cut if there is no request parameters.
# if len(request.REQUEST) == 0:
# return None
current_url = request.build_absolute_uri()
consumer = make_consumer(request)
return consumer.complete(dict(request.REQUEST.items()), current_url)
data = get_request_data(request)
return consumer.complete(data, current_url)
def get_request_data(request):
# simulate old request.REQUEST for backwards compatibility
data = QueryDict(query_string=None, mutable=True)
data.update(request.GET)
data.update(request.POST)
return data
def login_begin(request, template_name='openid/login.html',
......@@ -153,7 +162,8 @@ def login_begin(request, template_name='openid/login.html',
render_failure=default_render_failure,
redirect_field_name=REDIRECT_FIELD_NAME):
"""Begin an OpenID login request, possibly asking for an identity URL."""
redirect_to = request.REQUEST.get(redirect_field_name, '')
data = get_request_data(request)
redirect_to = data.get(redirect_field_name, '')
# Get the OpenID URL to try. First see if we've been configured
# to use a fixed server URL.
......@@ -169,10 +179,12 @@ def login_begin(request, template_name='openid/login.html',
# Invalid or no form data:
if openid_url is None:
context = {'form': login_form, redirect_field_name: redirect_to}
return render_to_response(
template_name, context,
context_instance=RequestContext(request))
context = RequestContext(request)
context.update({
'form': login_form,
redirect_field_name: redirect_to,
})
return render_to_response(template_name, context)
consumer = make_consumer(request)
try:
......@@ -268,7 +280,7 @@ def login_begin(request, template_name='openid/login.html',
# Django gives us Unicode, which is great. We must encode URI.
# urllib enforces str. We can't trust anything about the default
# encoding inside str(foo) , so we must explicitly make foo a str.
return_to += urllib.urlencode(
return_to += urlencode(
{redirect_field_name: redirect_to.encode("UTF-8")})
return render_openid_request(request, openid_request, return_to)
......@@ -277,7 +289,8 @@ def login_begin(request, template_name='openid/login.html',
@csrf_exempt
def login_complete(request, redirect_field_name=REDIRECT_FIELD_NAME,
render_failure=None):
redirect_to = request.REQUEST.get(redirect_field_name, '')
data = get_request_data(request)
redirect_to = data.get(redirect_field_name, '')
render_failure = (
render_failure or getattr(settings, 'OPENID_RENDER_FAILURE', None) or
default_render_failure)
......@@ -290,8 +303,9 @@ def login_complete(request, redirect_field_name=REDIRECT_FIELD_NAME,
if openid_response.status == SUCCESS:
try:
user = authenticate(openid_response=openid_response)
except DjangoOpenIDException, e:
return render_failure(request, e.message, exception=e)
except DjangoOpenIDException as e:
return render_failure(
request, getattr(e, 'message', str(e)), exception=e)
if user is not None:
if user.is_active:
......@@ -325,6 +339,7 @@ def logo(request):
OPENID_LOGO_BASE_64.decode('base64'), mimetype='image/gif'
)
# Logo from http://openid.net/login-bg.gif
# Embedded here for convenience; you should serve this as a static file
OPENID_LOGO_BASE_64 = """
......
......@@ -54,6 +54,24 @@ SECRET_KEY = '34958734985734985734985798437'
DEBUG = True
TEMPLATE_DEBUG = True
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.contrib.auth.context_processors.auth',
'django.template.context_processors.debug',
'django.template.context_processors.i18n',
'django.template.context_processors.media',
'django.template.context_processors.static',
'django.template.context_processors.tz',
'django.contrib.messages.context_processors.messages',
]
}
}
]
ALLOWED_HOSTS = []
......
......@@ -27,20 +27,20 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from django.conf.urls import patterns, include, url
from django.conf.urls import include, url
from django.contrib import admin
from django.contrib.auth import views as auth_views
import views
from example_consumer import views
admin.autodiscover()
urlpatterns = patterns(
'',
urlpatterns = [
url(r'^$', views.index),
url(r'^openid/', include('django_openid_auth.urls')),
url(r'^logout/$', 'django.contrib.auth.views.logout'),
url(r'^logout/$', auth_views.logout),
url(r'^private/$', views.require_authentication),
url(r'^admin/', include(admin.site.urls)),
)
]
......@@ -39,21 +39,28 @@ library also includes the following features:
info.
"""
import sys
from setuptools import find_packages, setup
PY3 = sys.version_info.major >= 3
description, long_description = __doc__.split('\n\n', 1)
VERSION = '0.8'
VERSION = '0.14'
install_requires = ['django>=1.6', 'six']
if PY3:
install_requires.append('python3-openid')
else:
install_requires.append('python-openid>=2.2.0')
setup(
name='django-openid-auth',
version=VERSION,
packages=find_packages(),
install_requires=[
'django>=1.5',
'python-openid>=2.2.0',
],
install_requires=install_requires,
package_data={
'django_openid_auth': ['templates/openid/*.html'],
},
......
[tox]
envlist =
py2.7-django1.5, py2.7-django1.6, py2.7-django1.7, py2.7-django1.8
py27-django{1.8,1.9,1.10}
# py3-django{1.11}
[testenv]
commands = python manage.py test django_openid_auth
deps=
deps =
mock
python-openid
[testenv:py2.7-django1.5]
[testenv:py27]
basepython = python2.7
deps =
django >= 1.5, < 1.6
python-openid
{[testenv]deps}
south==1.0
[testenv:py2.7-django1.6]
basepython = python2.7
[testenv:py3]
basepython = python3
deps =
django >= 1.6, < 1.7
python3-openid
{[testenv]deps}
south==1.0
[testenv:py2.7-django1.7]
basepython = python2.7
[testenv:py27-django1.8]
deps =
django >= 1.7, < 1.8
{[testenv]deps}
django >= 1.8, < 1.9
{[testenv:py27]deps}
[testenv:py2.7-django1.8]
basepython = python2.7
[testenv:py27-django1.9]
deps =
django >= 1.8, < 1.9
{[testenv]deps}
django >= 1.9, < 1.10
{[testenv:py27]deps}
[testenv:py27-django1.10]
deps =
django >= 1.10, < 1.11
{[testenv:py27]deps}
[testenv:py27-django1.11]
deps =
django >= 1.11, < 2
{[testenv:py27]deps}
[testenv:py3-django1.11]
deps =
django >= 1.11, < 2
{[testenv:py3]deps}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment