Commit 1a9fd76c by Natalia

Fully test suite passing on djangos from 1.4 to 1.8.

parent 6f4e845e
./django
./MANIFEST
./build
./dist
./sqlite.db
./.tox/
MANIFEST
build
dist
db.sqlite3
.tox
check:
PYTHONPATH=$(shell pwd) python example_consumer/manage.py test \
--verbosity=2 django_openid_auth
PYTHONPATH=$(shell pwd) python manage.py test --verbosity=2 django_openid_auth
run-example-consumer:
PYTHONPATH=$(shell pwd) python example_consumer/manage.py syncdb
PYTHONPATH=$(shell pwd) python example_consumer/manage.py runserver
PYTHONPATH=$(shell pwd) python manage.py syncdb --migrate
PYTHONPATH=$(shell pwd) python manage.py runserver
.PHONY: check run-example-consumer
......@@ -26,4 +26,3 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
......@@ -27,6 +27,9 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from urllib import urlencode
from urlparse import parse_qsl, urlparse
from django.conf import settings
from django.contrib import admin
from django_openid_auth.models import Nonce, Association, UserOpenID
......@@ -69,22 +72,39 @@ class UserOpenIDAdmin(admin.ModelAdmin):
admin.site.register(UserOpenID, UserOpenIDAdmin)
# Support for allowing openid authentication for /admin (django.contrib.admin)
if getattr(settings, 'OPENID_USE_AS_ADMIN_LOGIN', False):
from django.http import HttpResponseRedirect
from django_openid_auth import views
def _openid_login(self, request, error_message='', extra_context=None):
if request.user.is_authenticated():
if not request.user.is_staff:
return views.default_render_failure(
request, "User %s does not have admin access."
% request.user.username)
assert error_message, "Unknown Error: %s" % error_message
else:
# Redirect to openid login path,
return HttpResponseRedirect(
settings.LOGIN_URL + "?next=" + request.get_full_path())
# Overide the standard admin login form.
admin.sites.AdminSite.login = _openid_login
# override a single time
original_admin_login = None
if original_admin_login is None:
original_admin_login = admin.sites.AdminSite.login
from django.http import HttpResponseRedirect
from django_openid_auth import views
def _openid_login(instance, request, error_message='', extra_context=None):
# Support for allowing openid authentication for /admin
# (django.contrib.admin)
if not getattr(settings, 'OPENID_USE_AS_ADMIN_LOGIN', False):
return original_admin_login(
instance, request, extra_context=extra_context)
if not request.user.is_authenticated():
# Redirect to openid login path,
_, _, path, _, query, _ = urlparse(request.get_full_path())
qs = dict(parse_qsl(query))
qs.setdefault('next', path)
return HttpResponseRedirect(
settings.LOGIN_URL + "?" + urlencode(qs))
if not request.user.is_staff:
return views.default_render_failure(
request, "User %s does not have admin/staff access."
% request.user.username)
# No error message was supplied
assert error_message, "Unknown Error: %s" % error_message
# Overide the standard admin login form.
admin.sites.AdminSite.login = _openid_login
......@@ -93,8 +93,9 @@ class OpenIDBackend:
if getattr(settings, 'OPENID_PHYSICAL_MULTIFACTOR_REQUIRED', False):
pape_response = pape.Response.fromSuccessResponse(openid_response)
if pape_response is None or \
pape.AUTH_MULTI_FACTOR_PHYSICAL not in pape_response.auth_policies:
key = pape.AUTH_MULTI_FACTOR_PHYSICAL
if (pape_response is None or
key not in pape_response.auth_policies):
raise MissingPhysicalMultiFactor()
teams_response = teams.TeamsResponse.fromSuccessResponse(
......@@ -194,12 +195,12 @@ class OpenIDBackend:
if nickname is None or nickname == '':
raise MissingUsernameViolation()
# If we don't have a nickname, and we're not being strict, use a default
# If we don't have a nickname, and we're not being strict, use default
nickname = nickname or 'openiduser'
# See if we already have this nickname assigned to a username
try:
user = User.objects.get(username__exact=nickname)
User.objects.get(username__exact=nickname)
except User.DoesNotExist:
# No conflict, we can use this nickname
return nickname
......@@ -231,7 +232,6 @@ class OpenIDBackend:
# No user associated with this identity_url
pass
if getattr(settings, 'OPENID_STRICT_USERNAMES', False):
if User.objects.filter(username__exact=nickname).count() > 0:
raise DuplicateUsernameViolation(
......@@ -248,7 +248,7 @@ class OpenIDBackend:
if i > 1:
username += str(i)
try:
user = User.objects.get(username__exact=username)
User.objects.get(username__exact=username)
except User.DoesNotExist:
break
i += 1
......@@ -266,12 +266,12 @@ class OpenIDBackend:
"An attribute required for logging in was not "
"returned ({0}).".format(required_attr))
nickname = self._get_preferred_username(details['nickname'],
details['email'])
nickname = self._get_preferred_username(
details['nickname'], details['email'])
email = details['email'] or ''
username = self._get_available_username(nickname,
openid_response.identity_url)
username = self._get_available_username(
nickname, openid_response.identity_url)
user = User.objects.create_user(username, email, password=None)
self.associate_openid(user, openid_response)
......@@ -328,13 +328,16 @@ class OpenIDBackend:
user.save()
def get_teams_mapping(self):
teams_mapping_auto = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO', False)
teams_mapping_auto_blacklist = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO_BLACKLIST', [])
teams_mapping_auto = getattr(
settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO', False)
teams_mapping_auto_blacklist = getattr(
settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO_BLACKLIST', [])
teams_mapping = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING', {})
if teams_mapping_auto:
#ignore teams_mapping. use all django-groups
# ignore teams_mapping. use all django-groups
teams_mapping = dict()
all_groups = Group.objects.exclude(name__in=teams_mapping_auto_blacklist)
all_groups = Group.objects.exclude(
name__in=teams_mapping_auto_blacklist)
for group in all_groups:
teams_mapping[group.name] = group.name
return teams_mapping
......@@ -344,12 +347,12 @@ class OpenIDBackend:
if len(teams_mapping) == 0:
return
current_groups = set(user.groups.filter(
name__in=teams_mapping.values()))
desired_groups = set(Group.objects.filter(
name__in=[teams_mapping[lp_team]
for lp_team in teams_response.is_member
if lp_team in teams_mapping]))
mapping = [
teams_mapping[lp_team] for lp_team in teams_response.is_member
if lp_team in teams_mapping]
current_groups = set(
user.groups.filter(name__in=teams_mapping.values()))
desired_groups = set(Group.objects.filter(name__in=mapping))
for group in current_groups - desired_groups:
user.groups.remove(group)
for group in desired_groups - current_groups:
......
......@@ -28,20 +28,25 @@
"""Exception classes thrown by OpenID Authentication and Validation."""
class DjangoOpenIDException(Exception):
pass
class RequiredAttributeNotReturned(DjangoOpenIDException):
pass
class IdentityAlreadyClaimed(DjangoOpenIDException):
def __init__(self, message=None):
if message is None:
self.message = "Another user already exists for your selected OpenID"
self.message = (
"Another user already exists for your selected OpenID")
else:
self.message = message
class DuplicateUsernameViolation(DjangoOpenIDException):
def __init__(self, message=None):
......@@ -50,6 +55,7 @@ class DuplicateUsernameViolation(DjangoOpenIDException):
else:
self.message = message
class MissingUsernameViolation(DjangoOpenIDException):
def __init__(self, message=None):
......@@ -58,11 +64,12 @@ class MissingUsernameViolation(DjangoOpenIDException):
else:
self.message = message
class MissingPhysicalMultiFactor(DjangoOpenIDException):
def __init__(self, message=None):
if message is None:
self.message = "Login requires physical multi-factor authentication."
self.message = (
"Login requires physical multi-factor authentication.")
else:
self.message = message
......@@ -49,6 +49,7 @@ def teams_new_unicode(self):
return "%s -> %s" % (name, ", ".join(group_teams))
else:
return name
Group.unicode_before_teams = Group.__unicode__
Group.__unicode__ = teams_new_unicode
......@@ -64,9 +65,11 @@ class UserChangeFormWithTeamRestriction(UserChangeForm):
user_groups = self.instance.groups.all()
for group in data:
if group.name in known_teams and group not in user_groups:
raise forms.ValidationError("""The group %s is mapped to an
external team. You cannot assign it manually.""" % group.name)
raise forms.ValidationError(
"The group %s is mapped to an external team. "
"You cannot assign it manually." % group.name)
return data
UserAdmin.form = UserChangeFormWithTeamRestriction
......@@ -78,10 +81,7 @@ class OpenIDLoginForm(forms.Form):
def clean_openid_identifier(self):
if 'openid_identifier' in self.cleaned_data:
openid_identifier = self.cleaned_data['openid_identifier']
if xri.identifierScheme(openid_identifier) == 'XRI' and getattr(
settings, 'OPENID_DISALLOW_INAMES', False
):
if (xri.identifierScheme(openid_identifier) == 'XRI' and
getattr(settings, 'OPENID_DISALLOW_INAMES', False)):
raise forms.ValidationError(_('i-names are not supported'))
return self.cleaned_data['openid_identifier']
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding model 'Nonce'
db.create_table(u'django_openid_auth_nonce', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('server_url', self.gf('django.db.models.fields.CharField')(max_length=2047)),
('timestamp', self.gf('django.db.models.fields.IntegerField')()),
('salt', self.gf('django.db.models.fields.CharField')(max_length=40)),
))
db.send_create_signal(u'django_openid_auth', ['Nonce'])
# Adding model 'Association'
db.create_table(u'django_openid_auth_association', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('server_url', self.gf('django.db.models.fields.TextField')(max_length=2047)),
('handle', self.gf('django.db.models.fields.CharField')(max_length=255)),
('secret', self.gf('django.db.models.fields.TextField')(max_length=255)),
('issued', self.gf('django.db.models.fields.IntegerField')()),
('lifetime', self.gf('django.db.models.fields.IntegerField')()),
('assoc_type', self.gf('django.db.models.fields.TextField')(max_length=64)),
))
db.send_create_signal(u'django_openid_auth', ['Association'])
# Adding model 'UserOpenID'
db.create_table(u'django_openid_auth_useropenid', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('user', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['auth.User'])),
('claimed_id', self.gf('django.db.models.fields.TextField')(unique=True, max_length=2047)),
('display_id', self.gf('django.db.models.fields.TextField')(max_length=2047)),
))
db.send_create_signal(u'django_openid_auth', ['UserOpenID'])
def backwards(self, orm):
# Deleting model 'Nonce'
db.delete_table(u'django_openid_auth_nonce')
# Deleting model 'Association'
db.delete_table(u'django_openid_auth_association')
# Deleting model 'UserOpenID'
db.delete_table(u'django_openid_auth_useropenid')
models = {
u'auth.group': {
'Meta': {'object_name': 'Group'},
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '80'}),
'permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': u"orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'})
},
u'auth.permission': {
'Meta': {'ordering': "(u'content_type__app_label', u'content_type__model', u'codename')", 'unique_together': "((u'content_type', u'codename'),)", 'object_name': 'Permission'},
'codename': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
'content_type': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['contenttypes.ContentType']"}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '50'})
},
u'auth.user': {
'Meta': {'object_name': 'User'},
'date_joined': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
'email': ('django.db.models.fields.EmailField', [], {'max_length': '75', 'blank': 'True'}),
'first_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
'groups': ('django.db.models.fields.related.ManyToManyField', [], {'to': u"orm['auth.Group']", 'symmetrical': 'False', 'blank': 'True'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'is_active': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
'is_staff': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'is_superuser': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'last_login': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
'last_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
'password': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
'user_permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': u"orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}),
'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '30'})
},
u'contenttypes.contenttype': {
'Meta': {'ordering': "('name',)", 'unique_together': "(('app_label', 'model'),)", 'object_name': 'ContentType', 'db_table': "'django_content_type'"},
'app_label': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'model': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '100'})
},
u'django_openid_auth.association': {
'Meta': {'object_name': 'Association'},
'assoc_type': ('django.db.models.fields.TextField', [], {'max_length': '64'}),
'handle': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'issued': ('django.db.models.fields.IntegerField', [], {}),
'lifetime': ('django.db.models.fields.IntegerField', [], {}),
'secret': ('django.db.models.fields.TextField', [], {'max_length': '255'}),
'server_url': ('django.db.models.fields.TextField', [], {'max_length': '2047'})
},
u'django_openid_auth.nonce': {
'Meta': {'object_name': 'Nonce'},
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'salt': ('django.db.models.fields.CharField', [], {'max_length': '40'}),
'server_url': ('django.db.models.fields.CharField', [], {'max_length': '2047'}),
'timestamp': ('django.db.models.fields.IntegerField', [], {})
},
u'django_openid_auth.useropenid': {
'Meta': {'object_name': 'UserOpenID'},
'claimed_id': ('django.db.models.fields.TextField', [], {'unique': 'True', 'max_length': '2047'}),
'display_id': ('django.db.models.fields.TextField', [], {'max_length': '2047'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'user': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['auth.User']"})
}
}
complete_apps = ['django_openid_auth']
\ No newline at end of file
from __future__ import unicode_literals
from django.db import models, migrations
from django.conf import settings
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='Association',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('server_url', models.TextField(max_length=2047)),
('handle', models.CharField(max_length=255)),
('secret', models.TextField(max_length=255)),
('issued', models.IntegerField()),
('lifetime', models.IntegerField()),
('assoc_type', models.TextField(max_length=64)),
],
options={
},
bases=(models.Model,),
),
migrations.CreateModel(
name='Nonce',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('server_url', models.CharField(max_length=2047)),
('timestamp', models.IntegerField()),
('salt', models.CharField(max_length=40)),
],
options={
},
bases=(models.Model,),
),
migrations.CreateModel(
name='UserOpenID',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('claimed_id', models.TextField(unique=True, max_length=2047)),
('display_id', models.TextField(max_length=2047)),
('user', models.ForeignKey(to=settings.AUTH_USER_MODEL)),
],
options={
'permissions': (('account_verified', 'The OpenID has been verified'),),
},
bases=(models.Model,),
),
]
......@@ -46,7 +46,7 @@ class Nonce(models.Model):
class Association(models.Model):
server_url = models.TextField(max_length=2047)
handle = models.CharField(max_length=255)
secret = models.TextField(max_length=255) # Stored base64 encoded
secret = models.TextField(max_length=255) # Stored base64 encoded
issued = models.IntegerField()
lifetime = models.IntegerField()
assoc_type = models.TextField(max_length=64)
......
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding model 'Nonce'
db.create_table(u'django_openid_auth_nonce', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('server_url', self.gf('django.db.models.fields.CharField')(max_length=2047)),
('timestamp', self.gf('django.db.models.fields.IntegerField')()),
('salt', self.gf('django.db.models.fields.CharField')(max_length=40)),
))
db.send_create_signal(u'django_openid_auth', ['Nonce'])
# Adding model 'Association'
db.create_table(u'django_openid_auth_association', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('server_url', self.gf('django.db.models.fields.TextField')(max_length=2047)),
('handle', self.gf('django.db.models.fields.CharField')(max_length=255)),
('secret', self.gf('django.db.models.fields.TextField')(max_length=255)),
('issued', self.gf('django.db.models.fields.IntegerField')()),
('lifetime', self.gf('django.db.models.fields.IntegerField')()),
('assoc_type', self.gf('django.db.models.fields.TextField')(max_length=64)),
))
db.send_create_signal(u'django_openid_auth', ['Association'])
# Adding model 'UserOpenID'
db.create_table(u'django_openid_auth_useropenid', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('user', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['auth.User'])),
('claimed_id', self.gf('django.db.models.fields.TextField')(unique=True, max_length=2047)),
('display_id', self.gf('django.db.models.fields.TextField')(max_length=2047)),
))
db.send_create_signal(u'django_openid_auth', ['UserOpenID'])
def backwards(self, orm):
# Deleting model 'Nonce'
db.delete_table(u'django_openid_auth_nonce')
# Deleting model 'Association'
db.delete_table(u'django_openid_auth_association')
# Deleting model 'UserOpenID'
db.delete_table(u'django_openid_auth_useropenid')
models = {
u'auth.group': {
'Meta': {'object_name': 'Group'},
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '80'}),
'permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': u"orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'})
},
u'auth.permission': {
'Meta': {'ordering': "(u'content_type__app_label', u'content_type__model', u'codename')", 'unique_together': "((u'content_type', u'codename'),)", 'object_name': 'Permission'},
'codename': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
'content_type': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['contenttypes.ContentType']"}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '50'})
},
u'auth.user': {
'Meta': {'object_name': 'User'},
'date_joined': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
'email': ('django.db.models.fields.EmailField', [], {'max_length': '75', 'blank': 'True'}),
'first_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
'groups': ('django.db.models.fields.related.ManyToManyField', [], {'to': u"orm['auth.Group']", 'symmetrical': 'False', 'blank': 'True'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'is_active': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
'is_staff': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'is_superuser': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'last_login': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
'last_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
'password': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
'user_permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': u"orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}),
'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '30'})
},
u'contenttypes.contenttype': {
'Meta': {'ordering': "('name',)", 'unique_together': "(('app_label', 'model'),)", 'object_name': 'ContentType', 'db_table': "'django_content_type'"},
'app_label': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'model': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '100'})
},
u'django_openid_auth.association': {
'Meta': {'object_name': 'Association'},
'assoc_type': ('django.db.models.fields.TextField', [], {'max_length': '64'}),
'handle': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'issued': ('django.db.models.fields.IntegerField', [], {}),
'lifetime': ('django.db.models.fields.IntegerField', [], {}),
'secret': ('django.db.models.fields.TextField', [], {'max_length': '255'}),
'server_url': ('django.db.models.fields.TextField', [], {'max_length': '2047'})
},
u'django_openid_auth.nonce': {
'Meta': {'object_name': 'Nonce'},
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'salt': ('django.db.models.fields.CharField', [], {'max_length': '40'}),
'server_url': ('django.db.models.fields.CharField', [], {'max_length': '2047'}),
'timestamp': ('django.db.models.fields.IntegerField', [], {})
},
u'django_openid_auth.useropenid': {
'Meta': {'object_name': 'UserOpenID'},
'claimed_id': ('django.db.models.fields.TextField', [], {'unique': 'True', 'max_length': '2047'}),
'display_id': ('django.db.models.fields.TextField', [], {'max_length': '2047'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'user': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['auth.User']"})
}
}
complete_apps = ['django_openid_auth']
\ No newline at end of file
......@@ -38,8 +38,10 @@ from django_openid_auth.models import Association, Nonce
class DjangoOpenIDStore(OpenIDStore):
def __init__(self):
self.max_nonce_age = 6 * 60 * 60 # Six hours
super(DjangoOpenIDStore, self).__init__()
self.max_nonce_age = 6 * 60 * 60 # Six hours
def storeAssociation(self, server_url, association):
try:
......
......@@ -64,31 +64,28 @@ will be provided:
@since: 2.1.1
"""
from openid.message import registerNamespaceAlias, \
NamespaceAliasRegistrationError
from openid.extension import Extension
from openid import oidutil
try:
basestring #pylint:disable-msg=W0104
except NameError:
# For Python 2.2
basestring = (str, unicode) #pylint:disable-msg=W0622
from openid.extension import Extension
from openid.message import (
registerNamespaceAlias,
NamespaceAliasRegistrationError,
)
__all__ = [
'TeamsRequest',
'TeamsResponse',
'ns_uri',
'supportsTeams',
]
]
ns_uri = 'http://ns.launchpad.net/2007/openid-teams'
try:
registerNamespaceAlias(ns_uri, 'lp')
except NamespaceAliasRegistrationError, e:
oidutil.log('registerNamespaceAlias(%r, %r) failed: %s' % (ns_uri,
'lp', str(e),))
oidutil.log(
'registerNamespaceAlias(%r, %r) failed: %s' % (ns_uri, 'lp', str(e)))
def supportsTeams(endpoint):
"""Does the given endpoint advertise support for Launchpad Teams?
......@@ -101,6 +98,7 @@ def supportsTeams(endpoint):
"""
return endpoint.usesExtension(ns_uri)
class TeamsNamespaceError(ValueError):
"""The Launchpad teams namespace was not found and could not
be created using the expected name (there's another extension
......@@ -115,6 +113,7 @@ class TeamsNamespaceError(ValueError):
the message that is being processed.
"""
def getTeamsNS(message):
"""Extract the Launchpad teams namespace URI from the given
OpenID message.
......@@ -145,7 +144,8 @@ def getTeamsNS(message):
# we know that ns_uri defined, because it's defined in the
# else clause of the loop as well, so disable the warning
return ns_uri #pylint:disable-msg=W0631
return ns_uri
class TeamsRequest(Extension):
"""An object to hold the state of a Launchpad teams request.
......@@ -154,7 +154,8 @@ class TeamsRequest(Extension):
names that the RP is interested in.
@type required: [str]
@group Consumer: requestField, requestTeams, getExtensionArgs, addToOpenIDRequest
@group Consumer: requestField, requestTeams, getExtensionArgs,
addToOpenIDRequest
@group Server: fromOpenIDRequest, parseExtensionArgs
"""
......@@ -308,6 +309,7 @@ class TeamsRequest(Extension):
return args
class TeamsResponse(Extension):
"""Represents the data returned in a Launchpad teams response
inside of an OpenID C{id_res} response. This object will be
......@@ -394,7 +396,6 @@ class TeamsResponse(Extension):
if "is_member" in args:
is_member_str = args["is_member"]
self.is_member = is_member_str.split(',')
#self.is_member = args["is_member"]
return self
......@@ -406,6 +407,5 @@ class TeamsResponse(Extension):
@see: openid.extension
"""
ns_args = {'is_member': ','.join(self.is_member),}
ns_args = {'is_member': ','.join(self.is_member)}
return ns_args
......@@ -26,18 +26,8 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import unittest
from test_views import *
from test_settings import *
from test_store import *
from test_auth import *
from test_admin import *
def suite():
suite = unittest.TestSuite()
for name in ['test_auth', 'test_models', 'test_settings', 'test_store',
'test_views', 'test_admin']:
mod = __import__('%s.%s' % (__name__, name), {}, {}, ['suite'])
suite.addTest(mod.suite())
return suite
from .test_views import * # flake8: noqa
from .test_settings import *
from .test_store import *
from .test_auth import *
from .test_admin import *
......@@ -25,35 +25,17 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Tests for the django_openid_auth Admin login form replacement.
"""
import unittest
"""Tests for the django_openid_auth Admin login form replacement."""
from django.conf import settings
from django.contrib.auth.models import User, AnonymousUser
settings.OPENID_USE_AS_ADMIN_LOGIN = True
from django.contrib.auth.models import User
from django.test import TestCase
from django.test.utils import override_settings
def create_user(is_staff=False, authenticated=True):
"""
Create and return a user, either the AnonymousUser or a normal Django user,
setting the is_staff attribute if appropriate.
"""
if not authenticated:
return AnonymousUser()
else:
user = User(
username=u'testing', email='testing@example.com',
is_staff=is_staff)
user.set_password(u'test')
user.save()
@override_settings(OPENID_USE_AS_ADMIN_LOGIN=True)
class SiteAdminTests(TestCase):
"""
TestCase for accessing /admin/ when the django_openid_auth form replacement
......@@ -65,23 +47,21 @@ class SiteAdminTests(TestCase):
If the request has an authenticated user, who is not flagged as a
staff member, then they get a failure response.
"""
create_user()
self.client.login(username='testing', password='test')
response = self.client.get('/admin/')
self.assertTrue('User testing does not have admin access.' in
response.content, 'Missing error message in response')
User.objects.create_user(
username=u'testing', email='testing@example.com', password=u'test')
assert self.client.login(username='testing', password='test')
response = self.client.get('/admin/', follow=True)
self.assertContains(
response,
'User testing does not have admin/staff access.', status_code=403)
def test_admin_site_with_openid_login_non_authenticated_user(self):
"""
Unauthenticated users accessing the admin page should be directed to
the OpenID login url.
"""
response = self.client.get('/admin/')
self.assertEqual(302, response.status_code)
self.assertEqual('http://testserver' + getattr(settings, 'LOGIN_URL',
'/openid/login') + '?next=/admin/',
response['Location'])
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
response = self.client.get('/admin/', follow=True)
self.assertRedirects(
response,
getattr(settings, 'LOGIN_URL', '/openid/login') +
'?next=%2Fadmin%2F')
......@@ -26,80 +26,39 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import unittest
from django.conf import settings
from django.contrib.auth.models import (
Group,
Permission,
User,
)
from django.contrib.auth.models import Group, Permission, User
from django.test import TestCase
from django.test.utils import override_settings
from django_openid_auth.auth import OpenIDBackend
from django_openid_auth.models import UserOpenID
from django_openid_auth.teams import ns_uri as TEAMS_NS
from django_openid_auth.tests.helpers import override_session_serializer
from openid.consumer.consumer import SuccessResponse
from openid.consumer.discover import OpenIDServiceEndpoint
from openid.message import Message, OPENID2_NS
from django_openid_auth.auth import OpenIDBackend
from django_openid_auth.models import UserOpenID
from django_openid_auth.teams import ns_uri as TEAMS_NS
from django_openid_auth.tests.helpers import override_session_serializer
SREG_NS = "http://openid.net/sreg/1.0"
AX_NS = "http://openid.net/srv/ax/1.0"
@override_session_serializer
@override_settings(
OPENID_USE_EMAIL_FOR_USERNAME=False,
OPENID_LAUNCHPAD_TEAMS_REQUIRED=[],
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=False,
OPENID_EMAIL_WHITELIST_REGEXP_LIST=[])
class OpenIDBackendTests(TestCase):
def setUp(self):
super(OpenIDBackendTests, self).setUp()
self.backend = OpenIDBackend()
self.old_openid_use_email_for_username = getattr(settings,
'OPENID_USE_EMAIL_FOR_USERNAME', False)
self.old_openid_launchpad_teams_required = getattr(settings,
'OPENID_LAUNCHPAD_TEAMS_REQUIRED', [])
self.old_openid_launchpad_teams_mapping_auto = getattr(settings,
'OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO', False)
self.old_openid_email_whitelist_regexp_list = getattr(settings,
'OPENID_EMAIL_WHITELIST_REGEXP_LIST', [])
def tearDown(self):
settings.OPENID_USE_EMAIL_FOR_USERNAME = \
self.old_openid_use_email_for_username
settings.OPENID_LAUNCHPAD_TEAMS_REQUIRED = (
self.old_openid_launchpad_teams_required)
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = (
self.old_openid_launchpad_teams_mapping_auto)
settings.OPENID_EMAIL_WHITELIST_REGEXP_LIST = (
self.old_openid_email_whitelist_regexp_list)
def test_extract_user_details_sreg(self):
expected = {
'nickname': 'someuser',
'first_name': 'Some',
'last_name': 'User',
'email': 'foo@example.com',
'account_verified': False,
}
data = {
'nickname': expected['nickname'],
'fullname': "%s %s" % (expected['first_name'],
expected['last_name']),
'email': expected['email'],
}
response = self.make_response_sreg(**data)
details = self.backend._extract_user_details(response)
self.assertEqual(details, expected)
def make_fake_openid_endpoint(self, claimed_id=None):
def make_openid_response(self, sreg_args=None, teams_args=None):
endpoint = OpenIDServiceEndpoint()
endpoint.claimed_id = claimed_id
return endpoint
endpoint.claimed_id = 'some-id'
def make_openid_response(self, sreg_args=None, teams_args=None):
endpoint = self.make_fake_openid_endpoint(claimed_id='some-id')
message = Message(OPENID2_NS)
if sreg_args is not None:
for key, value in sreg_args.items():
......@@ -111,11 +70,8 @@ class OpenIDBackendTests(TestCase):
endpoint, message, signed_fields=message.toPostArgs().keys())
return response
def make_response_sreg(self, **kwargs):
response = self.make_openid_response(sreg_args=kwargs)
return response
def make_response_ax(self, schema="http://axschema.org/",
def make_response_ax(
self, schema="http://axschema.org/",
fullname="Some User", nickname="someuser", email="foo@example.com",
first=None, last=None, verified=False):
endpoint = OpenIDServiceEndpoint()
......@@ -142,9 +98,69 @@ class OpenIDBackendTests(TestCase):
return SuccessResponse(
endpoint, message, signed_fields=message.toPostArgs().keys())
def make_user_openid(self, user=None,
claimed_id='http://example.com/existing_identity',
display_id='http://example.com/existing_identity'):
if user is None:
user = User.objects.create_user(
username='someuser', email='someuser@example.com',
password='12345678')
user_openid, created = UserOpenID.objects.get_or_create(
user=user, claimed_id=claimed_id, display_id=display_id)
return user_openid
def assert_account_verified(self, user, initially_verified, verified):
# set user's verification status
permission = Permission.objects.get(codename='account_verified')
if initially_verified:
user.user_permissions.add(permission)
else:
user.user_permissions.remove(permission)
user = User.objects.get(pk=user.pk)
has_perm = user.has_perm('django_openid_auth.account_verified')
assert has_perm == initially_verified
if hasattr(user, '_perm_cache'):
del user._perm_cache
# get a response including verification status
response = self.make_response_ax()
data = dict(first_name=u"Some56789012345678901234567890123",
last_name=u"User56789012345678901234567890123",
email=u"someotheruser@example.com",
account_verified=verified)
self.backend.update_user_details(user, data, response)
# refresh object from the database
user = User.objects.get(pk=user.pk)
# check the verification status
self.assertEqual(
user.has_perm('django_openid_auth.account_verified'), verified)
def test_extract_user_details_sreg(self):
expected = {
'nickname': 'someuser',
'first_name': 'Some',
'last_name': 'User',
'email': 'foo@example.com',
'account_verified': False,
}
data = {
'nickname': expected['nickname'],
'fullname': "%s %s" % (expected['first_name'],
expected['last_name']),
'email': expected['email'],
}
response = self.make_openid_response(sreg_args=data)
details = self.backend._extract_user_details(response)
self.assertEqual(details, expected)
def test_extract_user_details_ax(self):
response = self.make_response_ax(fullname="Some User",
nickname="someuser", email="foo@example.com")
response = self.make_response_ax(
fullname="Some User", nickname="someuser", email="foo@example.com")
data = self.backend._extract_user_details(response)
......@@ -183,13 +199,14 @@ class OpenIDBackendTests(TestCase):
def test_update_user_details_long_names(self):
response = self.make_response_ax()
user = User.objects.create_user('someuser', 'someuser@example.com',
password=None)
user = User.objects.create_user(
'someuser', 'someuser@example.com', password=None)
user_openid, created = UserOpenID.objects.get_or_create(
user=user,
claimed_id='http://example.com/existing_identity',
display_id='http://example.com/existing_identity')
data = dict(first_name=u"Some56789012345678901234567890123",
data = dict(
first_name=u"Some56789012345678901234567890123",
last_name=u"User56789012345678901234567890123",
email=u"someotheruser@example.com", account_verified=False)
......@@ -198,58 +215,25 @@ class OpenIDBackendTests(TestCase):
self.assertEqual("Some56789012345678901234567890", user.first_name)
self.assertEqual("User56789012345678901234567890", user.last_name)
def make_user(self, username='someuser', email='someuser@example.com',
password=None):
user = User.objects.create_user(username, email, password=password)
return user
def make_user_openid(self, user=None,
claimed_id='http://example.com/existing_identity',
display_id='http://example.com/existing_identity'):
if user is None:
user = self.make_user()
user_openid, created = UserOpenID.objects.get_or_create(
user=user, claimed_id=claimed_id, display_id=display_id)
return user_openid
def _test_account_verified(self, user, initially_verified, expected):
# set user's verification status
permission = Permission.objects.get(codename='account_verified')
if initially_verified:
user.user_permissions.add(permission)
else:
user.user_permissions.remove(permission)
def test_update_user_perms_initially_verified_then_verified(self):
self.assert_account_verified(
self.make_user_openid().user,
initially_verified=True, verified=True)
if hasattr(user, '_perm_cache'):
del user._perm_cache
def test_update_user_perms_initially_verified_then_unverified(self):
self.assert_account_verified(
self.make_user_openid().user,
initially_verified=True, verified=False)
# get a response including verification status
response = self.make_response_ax()
data = dict(first_name=u"Some56789012345678901234567890123",
last_name=u"User56789012345678901234567890123",
email=u"someotheruser@example.com",
account_verified=expected)
self.backend.update_user_details(user, data, response)
# refresh object from the database
user = User.objects.get(pk=user.pk)
# check the verification status
self.assertEqual(user.has_perm('django_openid_auth.account_verified'),
expected)
def test_update_user_perms_unverified(self):
user_openid = self.make_user_openid()
for initially_verified in (False, True):
self._test_account_verified(
user_openid.user, initially_verified, expected=False)
def test_update_user_perms_verified(self):
user_openid = self.make_user_openid()
def test_update_user_perms_initially_not_verified_then_verified(self):
self.assert_account_verified(
self.make_user_openid().user,
initially_verified=False, verified=True)
for initially_verified in (False, True):
self._test_account_verified(
user_openid.user, initially_verified, expected=True)
def test_update_user_perms_initially_not_verified_then_unverified(self):
self.assert_account_verified(
self.make_user_openid().user,
initially_verified=False, verified=False)
def test_extract_user_details_name_with_trailing_space(self):
response = self.make_response_ax(fullname="SomeUser ")
......@@ -267,32 +251,35 @@ class OpenIDBackendTests(TestCase):
self.assertEqual("Some", data['first_name'])
self.assertEqual("User", data['last_name'])
@override_settings(OPENID_USE_EMAIL_FOR_USERNAME=True)
def test_preferred_username_email_munging(self):
settings.OPENID_USE_EMAIL_FOR_USERNAME = True
for nick, email, expected in [
('nickcomesfirst', 'foo@example.com', 'nickcomesfirst'),
('', 'foo@example.com', 'fooexamplecom'),
('noemail', '', 'noemail'),
('', '@%.-', 'openiduser'),
('', '', 'openiduser'),
(None, None, 'openiduser')]:
self.assertEqual(expected,
('nickcomesfirst', 'foo@example.com', 'nickcomesfirst'),
('', 'foo@example.com', 'fooexamplecom'),
('noemail', '', 'noemail'),
('', '@%.-', 'openiduser'),
('', '', 'openiduser'),
(None, None, 'openiduser')]:
self.assertEqual(
expected,
self.backend._get_preferred_username(nick, email))
def test_preferred_username_no_email_munging(self):
for nick, email, expected in [
('nickcomesfirst', 'foo@example.com', 'nickcomesfirst'),
('', 'foo@example.com', 'openiduser'),
('noemail', '', 'noemail'),
('', '@%.-', 'openiduser'),
('', '', 'openiduser'),
(None, None, 'openiduser')]:
self.assertEqual(expected,
('nickcomesfirst', 'foo@example.com', 'nickcomesfirst'),
('', 'foo@example.com', 'openiduser'),
('noemail', '', 'noemail'),
('', '@%.-', 'openiduser'),
('', '', 'openiduser'),
(None, None, 'openiduser')]:
self.assertEqual(
expected,
self.backend._get_preferred_username(nick, email))
@override_settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
OPENID_LAUNCHPAD_TEAMS_REQUIRED=['team'])
def test_authenticate_when_not_member_of_teams_required(self):
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = True
settings.OPENID_LAUNCHPAD_TEAMS_REQUIRED = ['team']
Group.objects.create(name='team')
response = self.make_openid_response(
......@@ -302,9 +289,10 @@ class OpenIDBackendTests(TestCase):
self.assertIsNone(user)
@override_settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
OPENID_LAUNCHPAD_TEAMS_REQUIRED=['team'])
def test_authenticate_when_no_group_mapping_to_required_team(self):
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = True
settings.OPENID_LAUNCHPAD_TEAMS_REQUIRED = ['team']
assert Group.objects.filter(name='team').count() == 0
response = self.make_openid_response(
......@@ -314,9 +302,10 @@ class OpenIDBackendTests(TestCase):
self.assertIsNone(user)
@override_settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
OPENID_LAUNCHPAD_TEAMS_REQUIRED=['team'])
def test_authenticate_when_member_of_teams_required(self):
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = True
settings.OPENID_LAUNCHPAD_TEAMS_REQUIRED = ['team']
Group.objects.create(name='team')
response = self.make_openid_response(
......@@ -326,9 +315,8 @@ class OpenIDBackendTests(TestCase):
self.assertIsNotNone(user)
@override_settings(OPENID_LAUNCHPAD_TEAMS_REQUIRED=[])
def test_authenticate_when_no_teams_required(self):
settings.OPENID_LAUNCHPAD_TEAMS_REQUIRED = []
response = self.make_openid_response(
sreg_args=dict(nickname='someuser'),
teams_args=dict(is_member='team'))
......@@ -336,9 +324,10 @@ class OpenIDBackendTests(TestCase):
self.assertIsNotNone(user)
@override_settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
OPENID_LAUNCHPAD_TEAMS_REQUIRED=['team1', 'team2'])
def test_authenticate_when_member_of_at_least_one_team(self):
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = True
settings.OPENID_LAUNCHPAD_TEAMS_REQUIRED = ['team1', 'team2']
Group.objects.create(name='team1')
response = self.make_openid_response(
......@@ -348,12 +337,12 @@ class OpenIDBackendTests(TestCase):
self.assertIsNotNone(user)
def test_authenticate_when_not_in_required_team_but_email_whitelisted(self):
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = True
settings.OPENID_LAUNCHPAD_TEAMS_REQUIRED = ['team']
settings.OPENID_EMAIL_WHITELIST_REGEXP_LIST = [
'foo(\+[^@]*)?@foo.com',
]
@override_settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
OPENID_LAUNCHPAD_TEAMS_REQUIRED=['team'],
OPENID_EMAIL_WHITELIST_REGEXP_LIST=['foo(\+[^@]*)?@foo.com'])
def test_authenticate_when_not_in_required_team_but_email_whitelisted(
self):
assert Group.objects.filter(name='team').count() == 0
response = self.make_openid_response(
......@@ -370,12 +359,11 @@ class OpenIDBackendTests(TestCase):
self.assertIsNotNone(user)
@override_settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
OPENID_LAUNCHPAD_TEAMS_REQUIRED=['team'],
OPENID_EMAIL_WHITELIST_REGEXP_LIST=['foo@foo.com', 'bar@foo.com'])
def test_authenticate_whitelisted_email_multiple_patterns(self):
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = True
settings.OPENID_LAUNCHPAD_TEAMS_REQUIRED = ['team']
settings.OPENID_EMAIL_WHITELIST_REGEXP_LIST = [
'foo@foo.com', 'bar@foo.com',
]
assert Group.objects.filter(name='team').count() == 0
response = self.make_openid_response(
......@@ -385,12 +373,11 @@ class OpenIDBackendTests(TestCase):
self.assertIsNotNone(user)
@override_settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
OPENID_LAUNCHPAD_TEAMS_REQUIRED=['team'],
OPENID_EMAIL_WHITELIST_REGEXP_LIST=['foo@foo.com'])
def test_authenticate_whitelisted_email_not_match(self):
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = True
settings.OPENID_LAUNCHPAD_TEAMS_REQUIRED = ['team']
settings.OPENID_EMAIL_WHITELIST_REGEXP_LIST = [
'foo@foo.com',
]
assert Group.objects.filter(name='team').count() == 0
response = self.make_openid_response(
......@@ -399,7 +386,3 @@ class OpenIDBackendTests(TestCase):
user = self.backend.authenticate(openid_response=response)
self.assertIsNone(user)
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
......@@ -26,8 +26,6 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import unittest
from django.contrib.auth.models import User
from django.test import TestCase
......@@ -72,7 +70,3 @@ class UserOpenIDModelTestCase(TestCase):
self.assertFalse(
User.objects.get(username='someuser').has_perm(
'django_openid_auth.account_verified'))
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
from unittest import skipIf, TestLoader
from unittest import skipIf
from django import VERSION
from django.conf import settings
......@@ -16,20 +16,8 @@ class SessionSerializerTest(TestCase):
[0] https://bit.ly/1myzetd
[1] https://github.com/openid/python-openid/issues/17
"""
@skipIf(VERSION >= (1, 6, 0), "Old versions used the pickle serializer.")
def test_not_using_json_session_serializer(self):
# We use getattr because this setting did not exist in Django
# 1.4 (pickle serialization was hard coded)
serializer = getattr(settings, 'SESSION_SERIALIZER', '')
self.assertNotEqual(
serializer, 'django.contrib.sessions.serializers.JSONSerializer')
@skipIf(VERSION < (1, 6, 0), "Newer versions use JSON by default.")
@skipIf(VERSION < (1, 5), "Django 1.4 does not provide SESSION_SERIALIZER")
def test_using_json_session_serializer(self):
serializer = getattr(settings, 'SESSION_SERIALIZER', '')
self.assertEqual(
serializer, 'django.contrib.sessions.serializers.JSONSerializer')
def suite():
return TestLoader().loadTestsFromName(__name__)
serializer, 'django.contrib.sessions.serializers.PickleSerializer')
......@@ -27,7 +27,6 @@
# POSSIBILITY OF SUCH DAMAGE.
import time
import unittest
from django.test import TestCase
from openid.association import Association as OIDAssociation
......@@ -187,7 +186,3 @@ class OpenIDStoreTests(TestCase):
# The second (non-expired) association is left behind.
self.assertNotEqual(self.store.getAssociation('server-url', 'handle2'),
None)
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
......@@ -28,13 +28,16 @@
# POSSIBILITY OF SUCH DAMAGE.
import cgi
import unittest
from urllib import quote_plus
from urlparse import parse_qs
from django.conf import settings
from django.contrib.auth.models import User, Group, Permission
from django.core.urlresolvers import reverse
from django.http import HttpRequest, HttpResponse
from django.test import TestCase
from django.test.utils import override_settings
from mock import patch
from openid.consumer.consumer import Consumer, SuccessResponse
from openid.consumer.discover import OpenIDServiceEndpoint
from openid.extensions import ax, sreg, pape
......@@ -67,6 +70,7 @@ ET = importElementTree()
class StubOpenIDProvider(HTTPFetcher):
def __init__(self, base_url):
super(StubOpenIDProvider, self).__init__()
self.store = MemoryStore()
self.identity_url = base_url + 'identity'
self.localid_url = base_url + 'localid'
......@@ -136,7 +140,9 @@ class StubOpenIDProvider(HTTPFetcher):
class DummyDjangoRequest(object):
def __init__(self, request_path):
super(DummyDjangoRequest, self).__init__()
self.request_path = request_path
self.META = {
'HTTP_HOST': "localhost",
......@@ -164,8 +170,23 @@ class DummyDjangoRequest(object):
@override_session_serializer
@override_settings(
OPENID_CREATE_USERS=False,
OPENID_STRICT_USERNAMES=False,
OPENID_UPDATE_DETAILS_FROM_SREG=False,
OPENID_SSO_SERVER_URL=None,
OPENID_LAUNCHPAD_TEAMS_MAPPING={},
OPENID_USE_AS_ADMIN_LOGIN=False,
OPENID_FOLLOW_RENAMES=False,
OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=False,
OPENID_SREG_REQUIRED_FIELDS=[],
OPENID_USE_EMAIL_FOR_USERNAME=False,
OPENID_VALID_VERIFICATION_SCHEMES={},
)
class RelyingPartyTests(TestCase):
urls = 'django_openid_auth.tests.urls'
login_url = reverse('openid-login')
def setUp(self):
super(RelyingPartyTests, self).setUp()
......@@ -178,70 +199,16 @@ class RelyingPartyTests(TestCase):
self.consumer = make_consumer(self.req)
self.server = Server(DjangoOpenIDStore(), op_endpoint=server_url)
setDefaultFetcher(self.provider, wrap_exceptions=False)
self.addCleanup(setDefaultFetcher, None)
self.old_login_redirect_url = getattr(
settings, 'LOGIN_REDIRECT_URL', '/accounts/profile/')
self.old_create_users = getattr(
settings, 'OPENID_CREATE_USERS', False)
self.old_strict_usernames = getattr(
settings, 'OPENID_STRICT_USERNAMES', False)
self.old_update_details = getattr(
settings, 'OPENID_UPDATE_DETAILS_FROM_SREG', False)
self.old_sso_server_url = getattr(
settings, 'OPENID_SSO_SERVER_URL', None)
self.old_teams_map = getattr(
settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING', {})
self.old_use_as_admin_login = getattr(
settings, 'OPENID_USE_AS_ADMIN_LOGIN', False)
self.old_follow_renames = getattr(
settings, 'OPENID_FOLLOW_RENAMES', False)
self.old_physical_multifactor = getattr(
settings, 'OPENID_PHYSICAL_MULTIFACTOR_REQUIRED', False)
self.old_login_render_failure = getattr(
settings, 'OPENID_RENDER_FAILURE', None)
self.old_openid_use_email_for_username = getattr(
settings,
'OPENID_USE_EMAIL_FOR_USERNAME', False)
self.old_required_fields = getattr(
settings, 'OPENID_SREG_REQUIRED_FIELDS', [])
self.old_valid_verification_schemes = getattr(
settings, 'OPENID_VALID_VERIFICATION_SCHEMES', {})
self.old_consumer_complete = Consumer.complete
settings.OPENID_CREATE_USERS = False
settings.OPENID_STRICT_USERNAMES = False
settings.OPENID_UPDATE_DETAILS_FROM_SREG = False
settings.OPENID_SSO_SERVER_URL = None
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {}
settings.OPENID_USE_AS_ADMIN_LOGIN = False
settings.OPENID_FOLLOW_RENAMES = False
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = False
settings.OPENID_SREG_REQUIRED_FIELDS = []
settings.OPENID_USE_EMAIL_FOR_USERNAME = False
settings.OPENID_VALID_VERIFICATION_SCHEMES = {}
def tearDown(self):
settings.LOGIN_REDIRECT_URL = self.old_login_redirect_url
settings.OPENID_CREATE_USERS = self.old_create_users
settings.OPENID_STRICT_USERNAMES = self.old_strict_usernames
settings.OPENID_UPDATE_DETAILS_FROM_SREG = self.old_update_details
settings.OPENID_SSO_SERVER_URL = self.old_sso_server_url
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = self.old_teams_map
settings.OPENID_USE_AS_ADMIN_LOGIN = self.old_use_as_admin_login
settings.OPENID_FOLLOW_RENAMES = self.old_follow_renames
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = (
self.old_physical_multifactor)
settings.OPENID_RENDER_FAILURE = self.old_login_render_failure
Consumer.complete = self.old_consumer_complete
settings.OPENID_SREG_REQUIRED_FIELDS = self.old_required_fields
settings.OPENID_USE_EMAIL_FOR_USERNAME = (
self.old_openid_use_email_for_username)
settings.OPENID_VALID_VERIFICATION_SCHEMES = (
self.old_valid_verification_schemes)
setDefaultFetcher(None)
super(RelyingPartyTests, self).tearDown()
self.openid_req_no_next = {
'openid_identifier': 'http://example.com/identity'}
self.openid_req = {
'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
self.openid_resp = {
'nickname': 'testuser', 'fullname': 'Openid User',
'email': 'test@example.com'}
def complete(self, openid_response):
"""Complete an OpenID authentication request."""
......@@ -249,36 +216,33 @@ class RelyingPartyTests(TestCase):
# here. For simplicity, force generation of a redirect.
openid_response.whichEncoding = lambda: ENCODE_URL
webresponse = self.provider.server.encodeResponse(openid_response)
self.assertEquals(webresponse.code, 302)
self.assertEqual(webresponse.code, 302)
redirect_to = webresponse.headers['location']
self.assertTrue(redirect_to.startswith(
'http://testserver/openid/complete/'))
return self.client.get(
'/openid/complete/',
reverse('openid-complete'),
dict(cgi.parse_qsl(redirect_to.split('?', 1)[1])))
def test_login(self):
user = User.objects.create_user('someuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
# The login form is displayed:
response = self.client.get('/openid/login/')
response = self.client.get(self.login_url)
self.assertTemplateUsed(response, 'openid/login.html')
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
openid_request = self.provider.parseFormPost(response.content)
self.assertEquals(openid_request.mode, 'checkid_setup')
self.assertEqual(openid_request.mode, 'checkid_setup')
self.assertTrue(openid_request.return_to.startswith(
'http://testserver/openid/complete/'))
'http://testserver/openid/complete/'))
# Complete the request. The user is redirected to the next URL.
openid_response = openid_request.answer(True)
......@@ -287,11 +251,12 @@ class RelyingPartyTests(TestCase):
# And they are now logged in:
response = self.client.get('/getuser/')
self.assertEquals(response.content, 'someuser')
self.assertEqual(response.content, 'someuser')
def test_login_with_nonascii_return_to(self):
"""Ensure non-ascii characters can be used for the 'next' arg."""
response = self.client.post('/openid/login/',
response = self.client.post(
self.login_url,
{'openid_identifier': 'http://example.com/identity',
'next': u'/files/ñandú.jpg'.encode('utf-8')})
self.assertContains(response, 'OpenID transaction in progress')
......@@ -299,47 +264,46 @@ class RelyingPartyTests(TestCase):
def test_login_no_next(self):
"""Logins with no next parameter redirect to LOGIN_REDIRECT_URL."""
user = User.objects.create_user('someuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
settings.LOGIN_REDIRECT_URL = '/getuser/'
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity'})
response = self.client.post(self.login_url, self.openid_req_no_next)
self.assertContains(response, 'OpenID transaction in progress')
openid_request = self.provider.parseFormPost(response.content)
self.assertEquals(openid_request.mode, 'checkid_setup')
self.assertEqual(openid_request.mode, 'checkid_setup')
self.assertTrue(openid_request.return_to.startswith(
'http://testserver/openid/complete/'))
'http://testserver/openid/complete/'))
# Complete the request. The user is redirected to the next URL.
openid_response = openid_request.answer(True)
response = self.complete(openid_response)
self.assertRedirects(
response, 'http://testserver' + settings.LOGIN_REDIRECT_URL)
with self.settings(LOGIN_REDIRECT_URL='/getuser/'):
response = self.complete(openid_response)
self.assertRedirects(response, 'http://testserver/getuser/')
def test_login_sso(self):
settings.OPENID_SSO_SERVER_URL = 'http://example.com/identity'
user = User.objects.create_user('someuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
# Requesting the login form immediately begins an
# authentication request.
response = self.client.get('/openid/login/', {'next': '/getuser/'})
self.assertEquals(response.status_code, 200)
with self.settings(
OPENID_SSO_SERVER_URL='http://example.com/identity'):
response = self.client.get(self.login_url, {'next': '/getuser/'})
self.assertEqual(response.status_code, 200)
self.assertContains(response, 'OpenID transaction in progress')
openid_request = self.provider.parseFormPost(response.content)
self.assertEquals(openid_request.mode, 'checkid_setup')
self.assertEqual(openid_request.mode, 'checkid_setup')
self.assertTrue(openid_request.return_to.startswith(
'http://testserver/openid/complete/'))
'http://testserver/openid/complete/'))
# Complete the request. The user is redirected to the next URL.
openid_response = openid_request.answer(True)
......@@ -348,17 +312,14 @@ class RelyingPartyTests(TestCase):
# And they are now logged in:
response = self.client.get('/getuser/')
self.assertEquals(response.content, 'someuser')
self.assertEqual(response.content, 'someuser')
def test_login_create_users(self):
settings.OPENID_CREATE_USERS = True
# Create a user with the same name as we'll pass back via sreg.
User.objects.create_user('someuser', 'someone@example.com')
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request, passing back some simple registration
......@@ -370,19 +331,20 @@ class RelyingPartyTests(TestCase):
sreg_request, {'nickname': 'someuser', 'fullname': 'Some User',
'email': 'foo@example.com'})
openid_response.addExtension(sreg_response)
response = self.complete(openid_response)
with self.settings(OPENID_CREATE_USERS=True):
response = self.complete(openid_response)
self.assertRedirects(response, 'http://testserver/getuser/')
# And they are now logged in as a new user (they haven't taken
# over the existing "someuser" user).
response = self.client.get('/getuser/')
self.assertEquals(response.content, 'someuser2')
self.assertEqual(response.content, 'someuser2')
# Check the details of the new user.
user = User.objects.get(username='someuser2')
self.assertEquals(user.first_name, 'Some')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'foo@example.com')
self.assertEqual(user.first_name, 'Some')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
def _do_user_login(self, req_data, resp_data, use_sreg=True,
use_pape=None):
......@@ -395,7 +357,7 @@ class RelyingPartyTests(TestCase):
def _get_login_request(self, req_data):
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/', req_data)
response = self.client.post(self.login_url, req_data)
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request, passing back some simple registration
......@@ -413,27 +375,17 @@ class RelyingPartyTests(TestCase):
sreg_request, resp_data)
openid_response.addExtension(sreg_response)
if use_pape is not None:
policies = [
use_pape
]
policies = [use_pape]
pape_response = pape.Response(auth_policies=policies)
openid_response.addExtension(pape_response)
return openid_response
def parse_query_string(self, query_str):
query_items = map(tuple,
[item.split('=') for item in query_str.split('&')])
query = dict(query_items)
return query
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
def test_login_physical_multifactor_request(self):
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
self.provider.type_uris.append(pape.ns_uri)
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
response = self.client.post('/openid/login/', openid_req)
response = self.client.post(self.login_url, self.openid_req)
openid_request = self.provider.parseFormPost(response.content)
request_auth = openid_request.message.getArg(
......@@ -442,85 +394,82 @@ class RelyingPartyTests(TestCase):
)
self.assertEqual(request_auth, preferred_auth)
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
def test_login_physical_multifactor_response(self):
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
self.provider.type_uris.append(pape.ns_uri)
def mock_complete(this, request_args, return_to):
request = {'openid.mode': 'checkid_setup',
'openid.trust_root': 'http://localhost/',
'openid.return_to': 'http://localhost/',
'openid.identity': IDENTIFIER_SELECT,
'openid.ns.pape' : pape.ns_uri,
'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
request = {
'openid.mode': 'checkid_setup',
'openid.trust_root': 'http://localhost/',
'openid.return_to': 'http://localhost/',
'openid.identity': IDENTIFIER_SELECT,
'openid.ns.pape': pape.ns_uri,
'openid.pape.auth_policies': request_args.get(
'openid.pape.auth_policies', pape.AUTH_NONE),
}
openid_server = self.provider.server
orequest = openid_server.decodeRequest(request)
response = SuccessResponse(
self.endpoint, orequest.message,
signed_fields=['openid.pape.auth_policies',])
signed_fields=['openid.pape.auth_policies'])
return response
Consumer.complete = mock_complete
patch.object(Consumer, 'complete', mock_complete)
user = User.objects.create_user('testuser', 'test@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
openid_resp = {'nickname': 'testuser', 'fullname': 'Openid User',
'email': 'test@example.com'}
response = self._do_user_login(openid_req, openid_resp, use_pape=pape.AUTH_MULTI_FACTOR_PHYSICAL)
response = self._do_user_login(
self.openid_req, self.openid_resp,
use_pape=pape.AUTH_MULTI_FACTOR_PHYSICAL)
query = self.parse_query_string(response.request['QUERY_STRING'])
query = parse_qs(response.request['QUERY_STRING'])
self.assertTrue('openid.pape.auth_policies' in query)
self.assertEqual(query['openid.pape.auth_policies'],
quote_plus(preferred_auth))
self.assertEqual(
query['openid.pape.auth_policies'], [preferred_auth])
response = self.client.get('/getuser/')
self.assertEqual(response.content, 'testuser')
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
def test_login_physical_multifactor_not_provided(self):
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
self.provider.type_uris.append(pape.ns_uri)
def mock_complete(this, request_args, return_to):
request = {'openid.mode': 'checkid_setup',
'openid.trust_root': 'http://localhost/',
'openid.return_to': 'http://localhost/',
'openid.identity': IDENTIFIER_SELECT,
'openid.ns.pape' : pape.ns_uri,
'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
request = {
'openid.mode': 'checkid_setup',
'openid.trust_root': 'http://localhost/',
'openid.return_to': 'http://localhost/',
'openid.identity': IDENTIFIER_SELECT,
'openid.ns.pape': pape.ns_uri,
'openid.pape.auth_policies': request_args.get(
'openid.pape.auth_policies', pape.AUTH_NONE),
}
openid_server = self.provider.server
orequest = openid_server.decodeRequest(request)
response = SuccessResponse(
self.endpoint, orequest.message,
signed_fields=['openid.pape.auth_policies',])
signed_fields=['openid.pape.auth_policies'])
return response
Consumer.complete = mock_complete
patch.object(Consumer, 'complete', mock_complete)
user = User.objects.create_user('testuser', 'test@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
openid_resp = {'nickname': 'testuser', 'fullname': 'Openid User',
'email': 'test@example.com'}
openid_request = self._get_login_request(openid_req)
openid_response = self._get_login_response(openid_request, openid_req, openid_resp, use_pape=pape.AUTH_NONE)
openid_request = self._get_login_request(self.openid_req)
openid_response = self._get_login_response(
openid_request, self.openid_req, self.openid_resp,
use_pape=pape.AUTH_NONE)
response_auth = openid_request.message.getArg(
'http://specs.openid.net/extensions/pape/1.0',
......@@ -529,12 +478,16 @@ class RelyingPartyTests(TestCase):
self.assertNotEqual(response_auth, preferred_auth)
response = self.complete(openid_response)
self.assertEquals(403, response.status_code)
self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
self.assertContains(response, '<p>Login requires physical multi-factor authentication.</p>', status_code=403)
self.assertEqual(403, response.status_code)
self.assertContains(
response, '<h1>OpenID failed</h1>', status_code=403)
self.assertContains(
response,
'<p>Login requires physical multi-factor authentication.</p>',
status_code=403)
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
def test_login_physical_multifactor_not_provided_override(self):
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
self.provider.type_uris.append(pape.ns_uri)
......@@ -542,40 +495,39 @@ class RelyingPartyTests(TestCase):
def mock_login_failure_handler(request, message, status=403,
template_name=None,
exception=None):
self.assertTrue(isinstance(exception, MissingPhysicalMultiFactor))
return HttpResponse('Test Failure Override', status=200)
settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
self.assertIsInstance(exception, MissingPhysicalMultiFactor)
return HttpResponse('Test Failure Override', status=200)
def mock_complete(this, request_args, return_to):
request = {'openid.mode': 'checkid_setup',
'openid.trust_root': 'http://localhost/',
'openid.return_to': 'http://localhost/',
'openid.identity': IDENTIFIER_SELECT,
'openid.ns.pape' : pape.ns_uri,
'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
pape_policy = request_args.get(
'openid.pape.auth_policies', pape.AUTH_NONE)
request = {
'openid.mode': 'checkid_setup',
'openid.trust_root': 'http://localhost/',
'openid.return_to': 'http://localhost/',
'openid.identity': IDENTIFIER_SELECT,
'openid.ns.pape': pape.ns_uri,
'openid.pape.auth_policies': pape_policy,
}
openid_server = self.provider.server
orequest = openid_server.decodeRequest(request)
response = SuccessResponse(
self.endpoint, orequest.message,
signed_fields=['openid.pape.auth_policies',])
signed_fields=['openid.pape.auth_policies'])
return response
Consumer.complete = mock_complete
patch.object(Consumer, 'complete', mock_complete)
user = User.objects.create_user('testuser', 'test@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
openid_resp = {'nickname': 'testuser', 'fullname': 'Openid User',
'email': 'test@example.com'}
openid_request = self._get_login_request(openid_req)
openid_response = self._get_login_response(openid_request, openid_req, openid_resp, use_pape=pape.AUTH_NONE)
openid_request = self._get_login_request(self.openid_req)
openid_response = self._get_login_response(
openid_request, self.openid_req, self.openid_resp,
use_pape=pape.AUTH_NONE)
response_auth = openid_request.message.getArg(
'http://specs.openid.net/extensions/pape/1.0',
......@@ -583,193 +535,174 @@ class RelyingPartyTests(TestCase):
)
self.assertNotEqual(response_auth, preferred_auth)
# Status code should be 200, since we over-rode the login_failure handler
response = self.complete(openid_response)
self.assertEquals(200, response.status_code)
# Status code should be 200, since we over-rode the login_failure
with self.settings(OPENID_RENDER_FAILURE=mock_login_failure_handler):
response = self.complete(openid_response)
self.assertEqual(200, response.status_code)
self.assertContains(response, 'Test Failure Override')
def test_login_without_nickname(self):
settings.OPENID_CREATE_USERS = True
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
openid_resp = {'nickname': '', 'fullname': 'Openid User',
'email': 'foo@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
self.openid_resp = {
'nickname': '', 'fullname': 'Openid User',
'email': 'foo@example.com'}
with self.settings(OPENID_CREATE_USERS=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# username defaults to 'openiduser'
self.assertEquals(response.content, 'openiduser')
self.assertEqual(response.content, 'openiduser')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
self.assertEquals(user.first_name, 'Openid')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'foo@example.com')
self.assertEqual(user.first_name, 'Openid')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
def test_login_without_nickname_with_email_suggestion(self):
settings.OPENID_CREATE_USERS = True
settings.OPENID_USE_EMAIL_FOR_USERNAME = True
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
openid_resp = {'nickname': '', 'fullname': 'Openid User',
'email': 'foo@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
self.openid_resp = {
'nickname': '', 'fullname': 'Openid User',
'email': 'foo@example.com'}
with self.settings(
OPENID_CREATE_USERS=True, OPENID_USE_EMAIL_FOR_USERNAME=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# username defaults to a munged version of the email
self.assertEquals(response.content, 'fooexamplecom')
self.assertEqual(response.content, 'fooexamplecom')
def test_login_duplicate_username_numbering(self):
settings.OPENID_FOLLOW_RENAMES = False
settings.OPENID_CREATE_USERS = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
# Setup existing user who's name we're going to conflict with
user = User.objects.create_user('testuser', 'someone@example.com')
User.objects.create_user('testuser', 'someone@example.com')
# identity url is for 'renameuser'
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
# but returned username is for 'testuser', which already exists for another identity
openid_resp = {'nickname': 'testuser', 'fullname': 'Test User',
'email': 'test@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
# but returned username is for 'testuser', which already exists for
# another identity
with self.settings(
OPENID_FOLLOW_RENAMES=False, OPENID_CREATE_USERS=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# Since this username is already taken by someone else, we go through
# the process of adding +i to it, and get testuser2.
self.assertEquals(response.content, 'testuser2')
self.assertEqual(response.content, 'testuser2')
def test_login_duplicate_username_numbering_with_conflicts(self):
settings.OPENID_FOLLOW_RENAMES = False
settings.OPENID_CREATE_USERS = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
# Setup existing user who's name we're going to conflict with
user = User.objects.create_user('testuser', 'someone@example.com')
user = User.objects.create_user('testuser3', 'someone@example.com')
User.objects.create_user('testuser', 'someone@example.com')
User.objects.create_user('testuser3', 'someone@example.com')
# identity url is for 'renameuser'
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
# but returned username is for 'testuser', which already exists for another identity
openid_resp = {'nickname': 'testuser', 'fullname': 'Test User',
'email': 'test@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
# but returned username is for 'testuser', which already exists for
# another identity
with self.settings(
OPENID_FOLLOW_RENAMES=False, OPENID_CREATE_USERS=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# Since this username is already taken by someone else, we go through
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 2. i should
# start at 3, which already exists, so it should skip to 4.
self.assertEquals(response.content, 'testuser4')
self.assertEqual(response.content, 'testuser4')
def test_login_duplicate_username_numbering_with_holes(self):
settings.OPENID_FOLLOW_RENAMES = False
settings.OPENID_CREATE_USERS = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
# Setup existing user who's name we're going to conflict with
user = User.objects.create_user('testuser', 'someone@example.com')
user = User.objects.create_user('testuser1', 'someone@example.com')
user = User.objects.create_user('testuser6', 'someone@example.com')
user = User.objects.create_user('testuser7', 'someone@example.com')
user = User.objects.create_user('testuser8', 'someone@example.com')
User.objects.create_user('testuser', 'someone@example.com')
User.objects.create_user('testuser1', 'someone@example.com')
User.objects.create_user('testuser6', 'someone@example.com')
User.objects.create_user('testuser7', 'someone@example.com')
User.objects.create_user('testuser8', 'someone@example.com')
# identity url is for 'renameuser'
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
# but returned username is for 'testuser', which already exists for another identity
openid_resp = {'nickname': 'testuser', 'fullname': 'Test User',
'email': 'test@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
# but returned username is for 'testuser', which already exists for
# another identity
with self.settings(
OPENID_FOLLOW_RENAMES=False, OPENID_CREATE_USERS=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# Since this username is already taken by someone else, we go through
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 5. i should
# start at 6, and increment until it reaches 9.
self.assertEquals(response.content, 'testuser9')
self.assertEqual(response.content, 'testuser9')
def test_login_duplicate_username_numbering_with_nonsequential_matches(self):
settings.OPENID_FOLLOW_RENAMES = False
settings.OPENID_CREATE_USERS = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
def test_login_duplicate_username_numbering_with_nonsequential_matches(
self):
# Setup existing user who's name we're going to conflict with
user = User.objects.create_user('testuser', 'someone@example.com')
user = User.objects.create_user('testuserfoo', 'someone@example.com')
User.objects.create_user('testuser', 'someone@example.com')
User.objects.create_user('testuserfoo', 'someone@example.com')
# identity url is for 'renameuser'
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
# but returned username is for 'testuser', which already exists for another identity
openid_resp = {'nickname': 'testuser', 'fullname': 'Test User',
'email': 'test@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
# but returned username is for 'testuser', which already exists for
# another identity
with self.settings(
OPENID_FOLLOW_RENAMES=False, OPENID_CREATE_USERS=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# Since this username is already taken by someone else, we go through
# the process of adding +i to it starting with the count of users with
# username starting with 'testuser', of which there are 2. i should
# start at 3, which will be available.
self.assertEquals(response.content, 'testuser3')
self.assertEqual(response.content, 'testuser3')
def test_login_follow_rename(self):
settings.OPENID_FOLLOW_RENAMES = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
user = User.objects.create_user('testuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
openid_resp = {'nickname': 'someuser', 'fullname': 'Some User',
'email': 'foo@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
self.openid_resp = {
'nickname': 'someuser', 'fullname': 'Some User',
'email': 'foo@example.com'}
with self.settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# If OPENID_FOLLOW_RENAMES, they are logged in as
# someuser (the passed in nickname has changed the username)
self.assertEquals(response.content, 'someuser')
self.assertEqual(response.content, 'someuser')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
self.assertEquals(user.first_name, 'Some')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'foo@example.com')
self.assertEqual(user.first_name, 'Some')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
def test_login_follow_rename_without_nickname_change(self):
settings.OPENID_FOLLOW_RENAMES = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
settings.OPENID_STRICT_USERNAMES = True
user = User.objects.create_user('testuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
openid_resp = {'nickname': 'testuser', 'fullname': 'Some User',
'email': 'foo@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
self.openid_resp = {
'nickname': 'testuser', 'fullname': 'Some User',
'email': 'foo@example.com'}
with self.settings(
OPENID_FOLLOW_RENAMES=True, OPENID_STRICT_USERNAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# Username should not have changed
self.assertEquals(response.content, 'testuser')
self.assertEqual(response.content, 'testuser')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
self.assertEquals(user.first_name, 'Some')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'foo@example.com')
self.assertEqual(user.first_name, 'Some')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
def test_login_follow_rename_conflict(self):
settings.OPENID_FOLLOW_RENAMES = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
# Setup existing user who's name we're going to switch to
user = User.objects.create_user('testuser', 'someone@example.com')
UserOpenID.objects.get_or_create(
......@@ -778,35 +711,37 @@ class RelyingPartyTests(TestCase):
display_id='http://example.com/existing_identity')
# Setup user who is going to try to change username to 'testuser'
renamed_user = User.objects.create_user('renameuser', 'someone@example.com')
renamed_user = User.objects.create_user(
'renameuser', 'someone@example.com')
UserOpenID.objects.get_or_create(
user=renamed_user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
# identity url is for 'renameuser'
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
# but returned username is for 'testuser', which already exists for another identity
openid_resp = {'nickname': 'testuser', 'fullname': 'Rename User',
'email': 'rename@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
# but returned username is for 'testuser', which already exists for
# another identity
self.openid_resp = {
'nickname': 'testuser', 'fullname': 'Rename User',
'email': 'rename@example.com'}
with self.settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
# but since that username is already taken by someone else, we go through
# the process of adding +i to it, and get testuser2.
self.assertEquals(response.content, 'testuser2')
# but since that username is already taken by someone else, we go
# through the process of adding +i to it, and get testuser2.
self.assertEqual(response.content, 'testuser2')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
self.assertEquals(user.first_name, 'Rename')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'rename@example.com')
self.assertEqual(user.first_name, 'Rename')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'rename@example.com')
def test_login_follow_rename_false_onlyonce(self):
settings.OPENID_FOLLOW_RENAMES = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
# Setup existing user who's name we're going to switch to
user = User.objects.create_user('testuser', 'someone@example.com')
UserOpenID.objects.get_or_create(
......@@ -815,38 +750,40 @@ class RelyingPartyTests(TestCase):
display_id='http://example.com/existing_identity')
# Setup user who is going to try to change username to 'testuser'
renamed_user = User.objects.create_user('testuser2000eight', 'someone@example.com')
renamed_user = User.objects.create_user(
'testuser2000eight', 'someone@example.com')
UserOpenID.objects.get_or_create(
user=renamed_user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
# identity url is for 'testuser2000eight'
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
# but returned username is for 'testuser', which already exists for another identity
openid_resp = {'nickname': 'testuser2', 'fullname': 'Rename User',
'email': 'rename@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
# but returned username is for 'testuser', which already exists for
# another identity
self.openid_resp = {
'nickname': 'testuser2', 'fullname': 'Rename User',
'email': 'rename@example.com'}
with self.settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
# but since that username is already taken by someone else, we go through
# the process of adding +i to it. Even though it looks like the username
# follows the nickname+i scheme, it has non-numbers in the suffix, so
# it's not an auto-generated one. The regular process of renaming to
# 'testuser' has a conflict, so we get +2 at the end.
self.assertEquals(response.content, 'testuser2')
# but since that username is already taken by someone else, we go
# through the process of adding +i to it. Even though it looks like
# the username follows the nickname+i scheme, it has non-numbers in the
# suffix, so it's not an auto-generated one. The regular process of
# renaming to 'testuser' has a conflict, so we get +2 at the end.
self.assertEqual(response.content, 'testuser2')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
self.assertEquals(user.first_name, 'Rename')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'rename@example.com')
self.assertEqual(user.first_name, 'Rename')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'rename@example.com')
def test_login_follow_rename_conflict_onlyonce(self):
settings.OPENID_FOLLOW_RENAMES = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
# Setup existing user who's name we're going to switch to
user = User.objects.create_user('testuser', 'someone@example.com')
UserOpenID.objects.get_or_create(
......@@ -855,36 +792,38 @@ class RelyingPartyTests(TestCase):
display_id='http://example.com/existing_identity')
# Setup user who is going to try to change username to 'testuser'
renamed_user = User.objects.create_user('testuser2000', 'someone@example.com')
renamed_user = User.objects.create_user(
'testuser2000', 'someone@example.com')
UserOpenID.objects.get_or_create(
user=renamed_user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
# identity url is for 'testuser2000'
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
# but returned username is for 'testuser', which already exists for another identity
openid_resp = {'nickname': 'testuser', 'fullname': 'Rename User',
'email': 'rename@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
# but returned username is for 'testuser', which already exists for
# another identity
self.openid_resp = {
'nickname': 'testuser', 'fullname': 'Rename User',
'email': 'rename@example.com'}
with self.settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
# but since that username is already taken by someone else, we go through
# the process of adding +i to it. Since the user for this identity url
# already has a name matching that pattern, check if first.
self.assertEquals(response.content, 'testuser2000')
# but since that username is already taken by someone else, we go
# through the process of adding +i to it. Since the user for this
# identity url already has a name matching that pattern, check if first
self.assertEqual(response.content, 'testuser2000')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
self.assertEquals(user.first_name, 'Rename')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'rename@example.com')
self.assertEqual(user.first_name, 'Rename')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'rename@example.com')
def test_login_follow_rename_false_conflict(self):
settings.OPENID_FOLLOW_RENAMES = True
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
# Setup existing user who's username matches the name+i pattern
user = User.objects.create_user('testuser2', 'someone@example.com')
UserOpenID.objects.get_or_create(
......@@ -893,34 +832,33 @@ class RelyingPartyTests(TestCase):
display_id='http://example.com/identity')
# identity url is for 'testuser2'
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
# but returned username is for 'testuser', which looks like we've done
# a username+1 for them already, but 'testuser' isn't actually taken
openid_resp = {'nickname': 'testuser', 'fullname': 'Same User',
'email': 'same@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
self.openid_resp = {
'nickname': 'testuser', 'fullname': 'Same User',
'email': 'same@example.com'}
with self.settings(
OPENID_FOLLOW_RENAMES=True,
OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
# If OPENID_FOLLOW_RENAMES, username should be changed to 'testuser'
# because it wasn't currently taken
self.assertEquals(response.content, 'testuser')
self.assertEqual(response.content, 'testuser')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
self.assertEquals(user.first_name, 'Same')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'same@example.com')
self.assertEqual(user.first_name, 'Same')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'same@example.com')
@override_settings(
OPENID_CREATE_USERS=True, OPENID_STRICT_USERNAMES=True,
OPENID_SREG_REQUIRED_FIELDS=[])
def test_strict_username_no_nickname(self):
settings.OPENID_CREATE_USERS = True
settings.OPENID_STRICT_USERNAMES = True
settings.OPENID_SREG_REQUIRED_FIELDS = []
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request, passing back some simple registration
......@@ -929,35 +867,36 @@ class RelyingPartyTests(TestCase):
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
sreg_request, {'nickname': '', # No nickname
sreg_request, {'nickname': '', # No nickname
'fullname': 'Some User',
'email': 'foo@example.com'})
openid_response.addExtension(sreg_response)
response = self.complete(openid_response)
# Status code should be 403: Forbidden
self.assertEquals(403, response.status_code)
self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
self.assertContains(response, "An attribute required for logging in was not returned "
"(nickname)", status_code=403)
self.assertEqual(403, response.status_code)
self.assertContains(
response, '<h1>OpenID failed</h1>', status_code=403)
self.assertContains(
response,
"An attribute required for logging in was not returned (nickname)",
status_code=403)
@override_settings(
OPENID_CREATE_USERS=True, OPENID_STRICT_USERNAMES=True,
OPENID_SREG_REQUIRED_FIELDS=[])
def test_strict_username_no_nickname_override(self):
settings.OPENID_CREATE_USERS = True
settings.OPENID_STRICT_USERNAMES = True
settings.OPENID_SREG_REQUIRED_FIELDS = []
# Override the login_failure handler
def mock_login_failure_handler(request, message, status=403,
template_name=None,
exception=None):
self.assertTrue(isinstance(exception, (RequiredAttributeNotReturned, MissingUsernameViolation)))
return HttpResponse('Test Failure Override', status=200)
settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
self.assertIsInstance(
exception,
(RequiredAttributeNotReturned, MissingUsernameViolation))
return HttpResponse('Test Failure Override', status=200)
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request, passing back some simple registration
......@@ -966,31 +905,27 @@ class RelyingPartyTests(TestCase):
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
openid_response = openid_request.answer(True)
sreg_response = sreg.SRegResponse.extractResponse(
sreg_request, {'nickname': '', # No nickname
sreg_request, {'nickname': '', # No nickname
'fullname': 'Some User',
'email': 'foo@example.com'})
openid_response.addExtension(sreg_response)
response = self.complete(openid_response)
with self.settings(OPENID_RENDER_FAILURE=mock_login_failure_handler):
response = self.complete(openid_response)
# Status code should be 200, since we over-rode the login_failure handler
self.assertEquals(200, response.status_code)
# Status code should be 200, since we over-rode the login_failure
self.assertEqual(200, response.status_code)
self.assertContains(response, 'Test Failure Override')
def test_strict_username_duplicate_user(self):
settings.OPENID_CREATE_USERS = True
settings.OPENID_STRICT_USERNAMES = True
# Create a user with the same name as we'll pass back via sreg.
user = User.objects.create_user('someuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/different_identity',
display_id='http://example.com/different_identity')
useropenid.save()
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request, passing back some simple registration
......@@ -1002,40 +937,37 @@ class RelyingPartyTests(TestCase):
sreg_request, {'nickname': 'someuser', 'fullname': 'Some User',
'email': 'foo@example.com'})
openid_response.addExtension(sreg_response)
response = self.complete(openid_response)
with self.settings(
OPENID_CREATE_USERS=True, OPENID_STRICT_USERNAMES=True):
response = self.complete(openid_response)
# Status code should be 403: Forbidden
self.assertEquals(403, response.status_code)
self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
self.assertContains(response,
self.assertEqual(403, response.status_code)
self.assertContains(
response, '<h1>OpenID failed</h1>', status_code=403)
self.assertContains(
response,
"The username (someuser) with which you tried to log in is "
"already in use for a different account.",
status_code=403)
def test_strict_username_duplicate_user_override(self):
settings.OPENID_CREATE_USERS = True
settings.OPENID_STRICT_USERNAMES = True
# Override the login_failure handler
def mock_login_failure_handler(request, message, status=403,
template_name=None,
exception=None):
self.assertTrue(isinstance(exception, DuplicateUsernameViolation))
return HttpResponse('Test Failure Override', status=200)
settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
self.assertIsInstance(exception, DuplicateUsernameViolation)
return HttpResponse('Test Failure Override', status=200)
# Create a user with the same name as we'll pass back via sreg.
user = User.objects.create_user('someuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/different_identity',
display_id='http://example.com/different_identity')
useropenid.save()
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request, passing back some simple registration
......@@ -1047,21 +979,18 @@ class RelyingPartyTests(TestCase):
sreg_request, {'nickname': 'someuser', 'fullname': 'Some User',
'email': 'foo@example.com'})
openid_response.addExtension(sreg_response)
response = self.complete(openid_response)
with self.settings(
OPENID_RENDER_FAILURE=mock_login_failure_handler,
OPENID_CREATE_USERS=True, OPENID_STRICT_USERNAMES=True):
response = self.complete(openid_response)
# Status code should be 200, since we over-rode the login_failure handler
self.assertEquals(200, response.status_code)
# Status code should be 200, since we over-rode the login_failure
self.assertEqual(200, response.status_code)
self.assertContains(response, 'Test Failure Override')
def test_login_requires_sreg_required_fields(self):
# If any required attributes are not included in the response,
# we fail with a forbidden.
settings.OPENID_CREATE_USERS = True
settings.OPENID_SREG_REQUIRED_FIELDS = ('email', 'language')
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request, passing back some simple registration
......@@ -1074,52 +1003,52 @@ class RelyingPartyTests(TestCase):
'fullname': 'Some User',
'email': 'foo@example.com'})
openid_response.addExtension(sreg_response)
response = self.complete(openid_response)
# If any required attributes are not included in the response,
# we fail with a forbidden.
with self.settings(
OPENID_CREATE_USERS=True,
OPENID_SREG_REQUIRED_FIELDS=('email', 'language')):
response = self.complete(openid_response)
# Status code should be 403: Forbidden as we didn't include
# a required field - language.
self.assertContains(response,
"An attribute required for logging in was not returned "
"(language)", status_code=403)
self.assertContains(
response,
"An attribute required for logging in was not returned (language)",
status_code=403)
def test_login_update_details(self):
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
user = User.objects.create_user('testuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
openid_req = {'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'}
openid_resp = {'nickname': 'testuser', 'fullname': 'Some User',
'email': 'foo@example.com'}
self._do_user_login(openid_req, openid_resp)
response = self.client.get('/getuser/')
self.openid_resp = {
'nickname': 'testuser', 'fullname': 'Some User',
'email': 'foo@example.com'}
with self.settings(OPENID_UPDATE_DETAILS_FROM_SREG=True):
self._do_user_login(self.openid_req, self.openid_resp)
response = self.client.get('/getuser/')
self.assertEquals(response.content, 'testuser')
self.assertEqual(response.content, 'testuser')
# The user's full name and email have been updated.
user = User.objects.get(username=response.content)
self.assertEquals(user.first_name, 'Some')
self.assertEquals(user.last_name, 'User')
self.assertEquals(user.email, 'foo@example.com')
self.assertEqual(user.first_name, 'Some')
self.assertEqual(user.last_name, 'User')
self.assertEqual(user.email, 'foo@example.com')
def test_login_uses_sreg_extra_fields(self):
# The configurable sreg attributes are used in the request.
settings.OPENID_SREG_EXTRA_FIELDS = ('language',)
user = User.objects.create_user('testuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
with self.settings(OPENID_SREG_EXTRA_FIELDS=('language',)):
response = self.client.post(self.login_url, self.openid_req)
openid_request = self.provider.parseFormPost(response.content)
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
......@@ -1128,18 +1057,15 @@ class RelyingPartyTests(TestCase):
def test_login_uses_sreg_required_fields(self):
# The configurable sreg attributes are used in the request.
settings.OPENID_SREG_REQUIRED_FIELDS = ('email', 'language')
user = User.objects.create_user('testuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
with self.settings(OPENID_SREG_REQUIRED_FIELDS=('email', 'language')):
response = self.client.post(self.login_url, self.openid_req)
openid_request = self.provider.parseFormPost(response.content)
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
......@@ -1149,20 +1075,16 @@ class RelyingPartyTests(TestCase):
def check_login_attribute_exchange(self, validation_type, is_verified,
request_account_verified=True):
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
user = User.objects.create_user('testuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
# Configure the provider to advertise attribute exchange
# protocol and start the authentication process:
self.provider.type_uris.append('http://openid.net/srv/ax/1.0')
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
# The resulting OpenID request uses the Attribute Exchange
......@@ -1173,28 +1095,20 @@ class RelyingPartyTests(TestCase):
self.assertEqual(sreg_request.optional, [])
fetch_request = ax.FetchRequest.fromOpenIDRequest(openid_request)
self.assertTrue(fetch_request.has_key(
'http://axschema.org/contact/email'))
self.assertTrue(fetch_request.has_key(
'http://axschema.org/namePerson'))
self.assertTrue(fetch_request.has_key(
'http://axschema.org/namePerson/first'))
self.assertTrue(fetch_request.has_key(
'http://axschema.org/namePerson/last'))
self.assertTrue(fetch_request.has_key(
'http://axschema.org/namePerson/friendly'))
self.assertIn('http://axschema.org/contact/email', fetch_request)
self.assertIn('http://axschema.org/namePerson', fetch_request)
self.assertIn('http://axschema.org/namePerson/first', fetch_request)
self.assertIn('http://axschema.org/namePerson/last', fetch_request)
self.assertIn('http://axschema.org/namePerson/friendly', fetch_request)
# myOpenID compatibilty attributes:
self.assertTrue(fetch_request.has_key(
'http://schema.openid.net/contact/email'))
self.assertTrue(fetch_request.has_key(
'http://schema.openid.net/namePerson'))
self.assertTrue(fetch_request.has_key(
'http://schema.openid.net/namePerson/friendly'))
self.assertIn('http://schema.openid.net/contact/email', fetch_request)
self.assertIn('http://schema.openid.net/namePerson', fetch_request)
self.assertIn(
'http://schema.openid.net/namePerson/friendly', fetch_request)
# Account verification:
self.assertEqual(
fetch_request.has_key(
'http://ns.login.ubuntu.com/2013/validation/account'),
request_account_verified)
validation = 'http://ns.login.ubuntu.com/2013/validation/account'
self.assertEqual(validation in fetch_request, request_account_verified)
# Build up a response including AX data.
openid_response = openid_request.answer(True)
......@@ -1212,36 +1126,40 @@ class RelyingPartyTests(TestCase):
'http://ns.login.ubuntu.com/2013/validation/account',
validation_type)
openid_response.addExtension(fetch_response)
response = self.complete(openid_response)
with self.settings(OPENID_UPDATE_DETAILS_FROM_SREG=True):
response = self.complete(openid_response)
self.assertRedirects(response, 'http://testserver/getuser/')
# And they are now logged in as testuser (the passed in
# nickname has not caused the username to change), because
# settings.OPENID_FOLLOW_RENAMES is False.
assert not settings.OPENID_FOLLOW_RENAMES, (
'OPENID_FOLLOW_RENAMES must be False')
response = self.client.get('/getuser/')
self.assertEquals(response.content, 'testuser')
self.assertEqual(response.content, 'testuser')
# The user's full name and email have been updated.
user = User.objects.get(username='testuser')
self.assertEquals(user.first_name, 'Firstname')
self.assertEquals(user.last_name, 'Lastname')
self.assertEquals(user.email, 'foo@example.com')
self.assertEqual(user.first_name, 'Firstname')
self.assertEqual(user.last_name, 'Lastname')
self.assertEqual(user.email, 'foo@example.com')
# So have the user's permissions
self.assertEqual(
user.has_perm('django_openid_auth.account_verified'), is_verified)
def test_login_attribute_exchange_with_verification(self):
settings.OPENID_VALID_VERIFICATION_SCHEMES = {
schemes = {
self.provider.endpoint_url: ('token_via_email',),
}
self.check_login_attribute_exchange('token_via_email',
is_verified=True)
with self.settings(OPENID_VALID_VERIFICATION_SCHEMES=schemes):
self.check_login_attribute_exchange('token_via_email',
is_verified=True)
def test_login_attribute_exchange_without_verification(self):
settings.OPENID_VALID_VERIFICATION_SCHEMES = {
schemes = {
self.provider.endpoint_url: ('token_via_email',),
}
self.check_login_attribute_exchange(None, is_verified=False)
with self.settings(OPENID_VALID_VERIFICATION_SCHEMES=schemes):
self.check_login_attribute_exchange(None, is_verified=False)
def test_login_attribute_exchange_without_account_verified(self):
# don't request account_verified attribute in AX request (as there are
......@@ -1252,32 +1170,32 @@ class RelyingPartyTests(TestCase):
request_account_verified=False)
def test_login_attribute_exchange_unrecognised_verification(self):
settings.OPENID_VALID_VERIFICATION_SCHEMES = {
schemes = {
self.provider.endpoint_url: ('token_via_email',),
}
self.check_login_attribute_exchange('unrecognised_scheme',
is_verified=False)
with self.settings(OPENID_VALID_VERIFICATION_SCHEMES=schemes):
self.check_login_attribute_exchange('unrecognised_scheme',
is_verified=False)
def test_login_attribute_exchange_different_default_verification(self):
settings.OPENID_VALID_VERIFICATION_SCHEMES = {
schemes = {
None: ('token_via_email', 'sms'),
'http://otherprovider/': ('unrecognised_scheme',),
}
self.check_login_attribute_exchange('unrecognised_scheme',
is_verified=False)
with self.settings(OPENID_VALID_VERIFICATION_SCHEMES=schemes):
self.check_login_attribute_exchange('unrecognised_scheme',
is_verified=False)
def test_login_attribute_exchange_matched_default_verification(self):
settings.OPENID_VALID_VERIFICATION_SCHEMES = {
schemes = {
None: ('token_via_email',),
'http://otherprovider/': ('unrecognised_scheme',),
}
self.check_login_attribute_exchange('token_via_email',
is_verified=True)
with self.settings(OPENID_VALID_VERIFICATION_SCHEMES=schemes):
self.check_login_attribute_exchange('token_via_email',
is_verified=True)
def test_login_teams(self):
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = False
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {'teamname': 'groupname',
'otherteam': 'othergroup'}
user = User.objects.create_user('testuser', 'someone@example.com')
group = Group(name='groupname')
group.save()
......@@ -1291,16 +1209,13 @@ class RelyingPartyTests(TestCase):
Permission.objects.get(codename='add_useropenid'))
user.groups.add(ogroup)
user.save()
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request
......@@ -1310,23 +1225,23 @@ class RelyingPartyTests(TestCase):
teams_response = teams.TeamsResponse.extractResponse(
teams_request, 'teamname,some-other-team')
openid_response.addExtension(teams_response)
response = self.complete(openid_response)
mapping = {'teamname': 'groupname', 'otherteam': 'othergroup'}
with self.settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING=mapping,
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=False):
response = self.complete(openid_response)
self.assertRedirects(response, 'http://testserver/getuser/')
# And they are now logged in as testuser
response = self.client.get('/getuser/')
self.assertEquals(response.content, 'testuser')
self.assertEqual(response.content, 'testuser')
# The user's groups have been updated.
user = User.objects.get(username='testuser')
self.assertTrue(group in user.groups.all())
self.assertTrue(ogroup not in user.groups.all())
User.objects.get(username='testuser')
self.assertIn(group, user.groups.all())
self.assertNotIn(ogroup, user.groups.all())
def test_login_teams_automapping(self):
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {'teamname': 'groupname',
'otherteam': 'othergroup'}
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = True
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO_BLACKLIST = ['django-group1', 'django-group2']
user = User.objects.create_user('testuser', 'someone@example.com')
group1 = Group(name='django-group1')
group1.save()
......@@ -1335,26 +1250,29 @@ class RelyingPartyTests(TestCase):
group3 = Group(name='django-group3')
group3.save()
user.save()
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity',
'next': '/getuser/'})
response = self.client.post(self.login_url, self.openid_req)
self.assertContains(response, 'OpenID transaction in progress')
# Complete the request
openid_request = self.provider.parseFormPost(response.content)
openid_response = openid_request.answer(True)
teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
self.assertEqual(group1 in user.groups.all(), False)
self.assertEqual(group2 in user.groups.all(), False)
self.assertTrue(group3 not in user.groups.all())
mapping = {'teamname': 'groupname', 'otherteam': 'othergroup'}
blacklist = ['django-group1', 'django-group2']
with self.settings(
OPENID_LAUNCHPAD_TEAMS_MAPPING=mapping,
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=True,
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO_BLACKLIST=blacklist):
openid_request = self.provider.parseFormPost(response.content)
openid_request.answer(True)
teams.TeamsRequest.fromOpenIDRequest(openid_request)
self.assertNotIn(group1, user.groups.all())
self.assertNotIn(group2, user.groups.all())
self.assertNotIn(group3, user.groups.all())
def test_login_teams_staff_not_defined(self):
assert getattr(settings, 'OPENID_LAUNCHPAD_STAFF_TEAMS', None) is None
......@@ -1363,39 +1281,42 @@ class RelyingPartyTests(TestCase):
user.save()
self.assertTrue(user.is_staff)
user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
user = self.get_openid_authed_user_with_teams(
user, 'teamname,some-other-team')
self.assertTrue(user.is_staff)
def test_login_teams_staff_assignment(self):
settings.OPENID_LAUNCHPAD_STAFF_TEAMS = ('teamname',)
user = User.objects.create_user('testuser', 'someone@example.com')
user.is_staff = False
user.save()
self.assertFalse(user.is_staff)
user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
with self.settings(
OPENID_LAUNCHPAD_STAFF_TEAMS=('teamname',)):
user = self.get_openid_authed_user_with_teams(
user, 'teamname,some-other-team')
self.assertTrue(user.is_staff)
def test_login_teams_staff_unassignment(self):
settings.OPENID_LAUNCHPAD_STAFF_TEAMS = ('different-teamname',)
user = User.objects.create_user('testuser', 'someone@example.com')
user.is_staff = True
user.save()
self.assertTrue(user.is_staff)
user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
with self.settings(
OPENID_LAUNCHPAD_STAFF_TEAMS=('different-teamname',)):
user = self.get_openid_authed_user_with_teams(
user, 'teamname,some-other-team')
self.assertFalse(user.is_staff)
def get_openid_authed_user_with_teams(self, user, teams_str):
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
# Posting in an identity URL begins the authentication request:
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity'})
response = self.client.post(self.login_url, self.openid_req_no_next)
# Complete the request
openid_request = self.provider.parseFormPost(response.content)
......@@ -1411,23 +1332,24 @@ class RelyingPartyTests(TestCase):
# An oauth_login_complete signal is emitted including the
# request and sreg_response.
user = User.objects.create_user('someuser', 'someone@example.com')
useropenid = UserOpenID(
UserOpenID.objects.create(
user=user,
claimed_id='http://example.com/identity',
display_id='http://example.com/identity')
useropenid.save()
response = self.client.post('/openid/login/',
{'openid_identifier': 'http://example.com/identity'})
response = self.client.post(self.login_url, self.openid_req_no_next)
openid_request = self.provider.parseFormPost(response.content)
openid_response = openid_request.answer(True)
# Use a closure to test whether the signal handler was called.
self.signal_handler_called = False
def login_callback(sender, **kwargs):
self.assertTrue(isinstance(
kwargs.get('request', None), HttpRequest))
self.assertTrue(isinstance(
kwargs.get('openid_response', None), SuccessResponse))
self.assertIsInstance(
kwargs.get('request', None), HttpRequest)
self.assertIsInstance(
kwargs.get('openid_response', None), SuccessResponse)
self.signal_handler_called = True
openid_login_complete.connect(login_callback)
response = self.complete(openid_response)
......@@ -1438,9 +1360,11 @@ class RelyingPartyTests(TestCase):
@override_session_serializer
class HelperFunctionsTest(TestCase):
domains = ["example.com", "example.org"]
@override_settings(ALLOWED_EXTERNAL_OPENID_REDIRECT_DOMAINS=domains)
def test_sanitise_redirect_url(self):
settings.ALLOWED_EXTERNAL_OPENID_REDIRECT_DOMAINS = [
"example.com", "example.org"]
# list of URLs and whether they should be passed or not
urls = [
("http://example.com", True),
......@@ -1462,6 +1386,3 @@ class HelperFunctionsTest(TestCase):
self.assertEqual(url, sanitised)
else:
self.assertEqual(settings.LOGIN_REDIRECT_URL, sanitised)
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
......@@ -26,14 +26,16 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from django.conf.urls import patterns, include
from django.http import HttpResponse
from django.conf.urls import *
def get_user(request):
return HttpResponse(request.user.username)
urlpatterns = patterns('',
urlpatterns = patterns(
'',
(r'^getuser/$', get_user),
(r'^openid/', include('django_openid_auth.urls')),
)
......@@ -27,9 +27,10 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from django.conf.urls import *
from django.conf.urls import patterns, url
urlpatterns = patterns('django_openid_auth.views',
urlpatterns = patterns(
'django_openid_auth.views',
url(r'^login/$', 'login_begin', name='openid-login'),
url(r'^complete/$', 'login_complete', name='openid-complete'),
url(r'^logo.gif$', 'logo', name='openid-logo'),
......
......@@ -62,6 +62,7 @@ from django_openid_auth.exceptions import (
next_url_re = re.compile('^/[-\w/]+$')
def is_valid_next_url(next):
# When we allow this:
# /openid/?next=/welcome/
......@@ -78,8 +79,8 @@ def sanitise_redirect_url(redirect_to):
is_valid = False
elif '//' in redirect_to:
# Allow the redirect URL to be external if it's a permitted domain
allowed_domains = getattr(settings,
"ALLOWED_EXTERNAL_OPENID_REDIRECT_DOMAINS", [])
allowed_domains = getattr(
settings, "ALLOWED_EXTERNAL_OPENID_REDIRECT_DOMAINS", [])
s, netloc, p, q, f = urlsplit(redirect_to)
# allow it if netloc is blank or if the domain is allowed
if netloc:
......@@ -113,11 +114,13 @@ def render_openid_request(request, openid_request, return_to, trust_root=None):
if openid_request.shouldSendRedirect():
redirect_url = openid_request.redirectURL(
trust_root, return_to)
return HttpResponseRedirect(redirect_url)
response = HttpResponseRedirect(redirect_url)
else:
form_html = openid_request.htmlMarkup(
trust_root, return_to, form_tag_attrs={'id': 'openid_message'})
return HttpResponse(form_html, content_type='text/html;charset=UTF-8')
response = HttpResponse(
form_html, content_type='text/html;charset=UTF-8')
return response
def default_render_failure(request, message, status=403,
......@@ -133,7 +136,7 @@ def default_render_failure(request, message, status=403,
def parse_openid_response(request):
"""Parse an OpenID response from a Django request."""
# Short cut if there is no request parameters.
#if len(request.REQUEST) == 0:
# if len(request.REQUEST) == 0:
# return None
current_url = request.build_absolute_uri()
......@@ -164,15 +167,15 @@ def login_begin(request, template_name='openid/login.html',
# Invalid or no form data:
if openid_url is None:
return render_to_response(template_name, {
'form': login_form,
redirect_field_name: redirect_to
}, context_instance=RequestContext(request))
context = {'form': login_form, redirect_field_name: redirect_to}
return render_to_response(
template_name, context,
context_instance=RequestContext(request))
consumer = make_consumer(request)
try:
openid_request = consumer.begin(openid_url)
except DiscoveryFailure, exc:
except DiscoveryFailure as exc:
return render_failure(
request, "OpenID discovery error: %s" % (str(exc),), status=500,
exception=exc)
......@@ -222,11 +225,11 @@ def login_begin(request, template_name='openid/login.html',
sreg_optional_fields.extend(
getattr(settings, 'OPENID_SREG_EXTRA_FIELDS', []))
sreg_optional_fields = [
field for field in sreg_optional_fields if (
not field in sreg_required_fields)]
field for field in sreg_optional_fields
if field not in sreg_required_fields]
openid_request.addExtension(
sreg.SRegRequest(optional=sreg_optional_fields,
required=sreg_required_fields))
required=sreg_required_fields))
if getattr(settings, 'OPENID_PHYSICAL_MULTIFACTOR_REQUIRED', False):
preferred_auth = [
......@@ -236,13 +239,16 @@ def login_begin(request, template_name='openid/login.html',
openid_request.addExtension(pape_request)
# Request team info
teams_mapping_auto = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO', False)
teams_mapping_auto_blacklist = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO_BLACKLIST', [])
teams_mapping_auto = getattr(
settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO', False)
teams_mapping_auto_blacklist = getattr(
settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO_BLACKLIST', [])
launchpad_teams = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING', {})
if teams_mapping_auto:
#ignore launchpad teams. use all django-groups
# ignore launchpad teams. use all django-groups
launchpad_teams = dict()
all_groups = Group.objects.exclude(name__in=teams_mapping_auto_blacklist)
all_groups = Group.objects.exclude(
name__in=teams_mapping_auto_blacklist)
for group in all_groups:
launchpad_teams[group.name] = group.name
......@@ -270,9 +276,9 @@ def login_begin(request, template_name='openid/login.html',
def login_complete(request, redirect_field_name=REDIRECT_FIELD_NAME,
render_failure=None):
redirect_to = request.REQUEST.get(redirect_field_name, '')
render_failure = render_failure or \
getattr(settings, 'OPENID_RENDER_FAILURE', None) or \
default_render_failure
render_failure = (
render_failure or getattr(settings, 'OPENID_RENDER_FAILURE', None) or
default_render_failure)
openid_response = parse_openid_response(request)
if not openid_response:
......@@ -288,10 +294,12 @@ def login_complete(request, redirect_field_name=REDIRECT_FIELD_NAME,
if user is not None:
if user.is_active:
auth_login(request, user)
response = HttpResponseRedirect(sanitise_redirect_url(redirect_to))
response = HttpResponseRedirect(
sanitise_redirect_url(redirect_to))
# Notify any listeners that we successfully logged in.
openid_login_complete.send(sender=UserOpenID, request=request,
openid_login_complete.send(
sender=UserOpenID, request=request,
openid_response=openid_response)
return response
......
......@@ -27,91 +27,44 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# Django settings for example project.
import django
django_version = django.get_version()
DEBUG = True
TEMPLATE_DEBUG = DEBUG
"""
Django settings for example_consumer project.
ADMINS = (
# ('Your Name', 'your_email@domain.com'),
)
For more information on this file, see
https://docs.djangoproject.com/en/1.7/topics/settings/
MANAGERS = ADMINS
For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.7/ref/settings/
"""
if django_version >= "1.2":
csrf_middleware = 'django.middleware.csrf.CsrfViewMiddleware'
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': 'sqlite.db',
}
}
TEMPLATE_LOADERS = (
'django.template.loaders.filesystem.Loader',
'django.template.loaders.app_directories.Loader',
)
else:
csrf_middleware = 'django.contrib.csrf.middleware.CsrfViewMiddleware'
TEMPLATE_LOADERS = (
'django.template.loaders.filesystem.load_template_source',
'django.template.loaders.app_directories.load_template_source',
)
DATABASE_ENGINE = 'sqlite3' # 'postgresql_psycopg2', 'postgresql', 'mysql', 'sqlite3' or 'ado_mssql'.
DATABASE_NAME = 'sqlite.db' # Or path to database file if using sqlite3.
DATABASE_USER = '' # Not used with sqlite3.
DATABASE_PASSWORD = '' # Not used with sqlite3.
DATABASE_HOST = '' # Set to empty string for localhost. Not used with sqlite3.
DATABASE_PORT = '' # Set to empty string for default. Not used with sqlite3.
# Local time zone for this installation. Choices can be found here:
# http://www.postgresql.org/docs/8.1/static/datetime-keywords.html#DATETIME-TIMEZONE-SET-TABLE
# although not all variations may be possible on all operating systems.
# If running in a Windows environment this must be set to the same as your
# system time zone.
TIME_ZONE = 'America/Chicago'
# Language code for this installation. All choices can be found here:
# http://www.w3.org/TR/REC-html40/struct/dirlang.html#langcodes
# http://blogs.law.harvard.edu/tech/stories/storyReader$15
LANGUAGE_CODE = 'en-us'
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
import os
import django
SITE_ID = 1
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
# If you set this to False, Django will make some optimizations so as not
# to load the internationalization machinery.
USE_I18N = True
# Absolute path to the directory that holds media.
# Example: "/home/media/media.lawrence.com/"
MEDIA_ROOT = ''
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.7/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '34958734985734985734985798437'
# URL that handles the media served from MEDIA_ROOT.
# Example: "http://media.lawrence.com"
MEDIA_URL = ''
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
# URL prefix for admin media -- CSS, JavaScript and images. Make sure to use a
# trailing slash.
# Examples: "http://foo.com/media/", "/media/".
ADMIN_MEDIA_PREFIX = '/media/'
TEMPLATE_DEBUG = True
# Make this unique, and don't share it with anybody.
SECRET_KEY = '34958734985734985734985798437'
ALLOWED_HOSTS = []
MIDDLEWARE_CLASSES = (
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
csrf_middleware,
'django.middleware.csrf.CsrfViewMiddleware',
)
ROOT_URLCONF = 'example_consumer.urls'
TEMPLATE_DIRS = (
# Put strings here, like "/home/html/django_templates" or "C:/www/django/templates".
# Always use forward slashes, even on Windows.
# Don't forget to use absolute paths, not relative paths.
)
# Application definition
INSTALLED_APPS = (
'django.contrib.auth',
......@@ -119,9 +72,43 @@ INSTALLED_APPS = (
'django.contrib.sessions',
'django.contrib.admin',
'django_openid_auth',
'south',
)
if django.VERSION < (1, 7):
INSTALLED_APPS += ('south',)
ROOT_URLCONF = 'example_consumer.urls'
WSGI_APPLICATION = 'example_consumer.wsgi.application'
# Database
# https://docs.djangoproject.com/en/1.7/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
}
}
# Internationalization
# https://docs.djangoproject.com/en/1.7/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.7/howto/static-files/
STATIC_URL = '/static/'
SESSION_SERIALIZER = 'django.contrib.sessions.serializers.PickleSerializer'
AUTHENTICATION_BACKENDS = (
'django_openid_auth.auth.OpenIDBackend',
'django.contrib.auth.backends.ModelBackend',
......@@ -144,11 +131,11 @@ OPENID_VALID_VERIFICATION_SCHEMES = {
# If set, always use this as the identity URL rather than asking the
# user. This only makes sense if it is a server URL.
OPENID_SSO_SERVER_URL = 'https://login.launchpad.net/'
OPENID_SSO_SERVER_URL = 'https://login.ubuntu.com/'
# Tell django.contrib.auth to use the OpenID signin URLs.
LOGIN_URL = '/openid/login/'
LOGIN_REDIRECT_URL = '/'
# Should django_auth_openid be used to sign into the admin interface?
OPENID_USE_AS_ADMIN_LOGIN = False
OPENID_USE_AS_ADMIN_LOGIN = True
......@@ -27,7 +27,7 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from django.conf.urls import *
from django.conf.urls import patterns, include, url
from django.contrib import admin
import views
......@@ -35,11 +35,12 @@ import views
admin.autodiscover()
urlpatterns = patterns('',
(r'^$', views.index),
(r'^openid/', include('django_openid_auth.urls')),
(r'^logout/$', 'django.contrib.auth.views.logout'),
(r'^private/$', views.require_authentication),
urlpatterns = patterns(
'',
url(r'^$', views.index),
url(r'^openid/', include('django_openid_auth.urls')),
url(r'^logout/$', 'django.contrib.auth.views.logout'),
url(r'^private/$', views.require_authentication),
(r'^admin/', include(admin.site.urls)),
url(r'^admin/', include(admin.site.urls)),
)
"""
WSGI config for demo project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/1.7/howto/deployment/wsgi/
"""
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "demo.settings")
from django.core.wsgi import get_wsgi_application
application = get_wsgi_application()
......@@ -3,7 +3,7 @@ import os
import sys
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "example_consumer.settings")
from django.core.management import execute_from_command_line
......
[tox]
envlist =
py2.7-django1.4, py2.7-django1.5, py2.7-django1.6
py2.7-django1.4, py2.7-django1.5, py2.7-django1.6, py2.7-django1.7, py2.7-django1.8
[testenv]
commands = make check
commands = python manage.py test django_openid_auth
deps=
mock
python-openid
# Python 2.7
[testenv:py2.7-django1.4]
basepython = python2.7
deps = django >= 1.4, < 1.5
python-openid
south
deps =
django >= 1.4, < 1.5
{[testenv]deps}
south==1.0
[testenv:py2.7-django1.5]
basepython = python2.7
deps = django >= 1.5, < 1.6
python-openid
south
deps =
django >= 1.5, < 1.6
{[testenv]deps}
south==1.0
[testenv:py2.7-django1.6]
basepython = python2.7
deps = django >= 1.6, < 1.7
python-openid
south
deps =
django >= 1.6, < 1.7
{[testenv]deps}
south==1.0
[testenv:py2.7-django1.7]
basepython = python2.7
deps =
django >= 1.7, < 1.8
{[testenv]deps}
[testenv:py2.7-django1.8]
basepython = python2.7
deps =
django >= 1.8, < 1.9
{[testenv]deps}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment