Commit de539d3e by James Henstridge

Rework the OpenID consumer code to be a bit clearer, and make it

possible to set a fixed OpenID server URL.
parent 0936a94a
from django import forms
from django.utils.translation import ugettext as _
from django.conf import settings
from openid.yadis import xri
class OpenIDLoginForm(forms.Form):
openid_url = forms.CharField(max_length=255, widget=forms.widgets.TextInput(attrs={'class': 'required openid'}))
def clean_openid_url(self):
if 'openid_url' in self.cleaned_data:
openid_url = self.cleaned_data['openid_url']
if xri.identifierScheme(openid_url) == 'XRI' and getattr(
settings, 'OPENID_DISALLOW_INAMES', False
):
raise forms.ValidationError(_('i-names are not supported'))
return self.cleaned_data['openid_url']
from django.contrib.auth.models import User
from django.db import models
class Nonce(models.Model):
server_url = models.CharField(max_length=2047)
timestamp = models.IntegerField()
salt = models.CharField(max_length=40)
def __unicode__(self):
return u"Nonce: %s, %s" % (self.server_url, self.salt)
class Association(models.Model):
server_url = models.TextField(max_length=2047)
handle = models.CharField(max_length=255)
......@@ -16,7 +18,11 @@ class Association(models.Model):
issued = models.IntegerField()
lifetime = models.IntegerField()
assoc_type = models.TextField(max_length=64)
def __unicode__(self):
return u"Association: %s, %s" % (self.server_url, self.handle)
class UserOpenID(models.Model):
user = models.ForeignKey(User)
claimed_id = models.TextField(max_length=2047)
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
"http://www.w3.org/TR/html4/strict.dtd">
<html>
<head>
<title>Redirecting to your OpenID Provider</title>
</head>
<body>
<h1>Redirecting to your OpenID Provider</h1>
{{ form }}
</body>
</html>
......@@ -26,12 +26,15 @@ input.openid {
</p>
{% endif %}
<form name="fopenid" action="{{ action }}" method="post">
{{ form.next }}
<fieldset>
<legend>{% trans "Sign In Using Your OpenID" %}</legend>
<div class="form-row"><label for="id_openid_ul">{% trans "OpenId URL :" %}</label><br />{{ form.openid_url }}</div>
<div class="submit-row "><input name="bsignin" type="submit" value="{% trans "Sign in with OPENID" %}"></div>
{% if next %}
<input type="hidden" name="next" value="{{ next }}" />
{% endif %}
</fieldset>
</form>
</body>
......
from django.conf.urls.defaults import patterns
urlpatterns = patterns('django_openidconsumer.views',
(r'^login$', 'login_begin'),
(r'^complete$', 'login_complete'),
)
import base64
import md5
import operator
import time
from django.db.models.query import Q
from django.conf import settings
from openid.association import Association as OIDAssociation
from openid.store.interface import OpenIDStore
......
from django.http import HttpResponse, HttpResponseRedirect, get_host
from django.shortcuts import render_to_response as render
from django.template import RequestContext
import re
import urllib
from django.conf import settings
from django.utils.http import urlquote_plus, urlquote
from django.contrib.auth import REDIRECT_FIELD_NAME
from django.core.urlresolvers import reverse
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.safestring import SafeString
import md5, re, time, urllib
from openid.consumer.consumer import Consumer, \
SUCCESS, CANCEL, FAILURE, SETUP_NEEDED
from openid.consumer.consumer import (
Consumer, SUCCESS, CANCEL, FAILURE)
from openid.consumer.discover import DiscoveryFailure
from openid.yadis import xri
from openid.extensions import sreg
from util import OpenID, DjangoOpenIDStore, from_openid_response
from util import DjangoOpenIDStore
from forms import OpenIDLoginForm
from forms import OpenidSigninForm
from django.utils.html import escape
def get_url_host(request):
if request.is_secure():
protocol = 'https'
else:
protocol = 'http'
host = escape(get_host(request))
return '%s://%s' % (protocol, host)
def get_full_url(request):
if request.is_secure():
protocol = 'https'
else:
protocol = 'http'
host = escape(request.META['HTTP_HOST'])
return get_url_host(request) + request.get_full_path()
next_url_re = re.compile('^/[-\w/]+$')
......@@ -42,122 +28,120 @@ def is_valid_next_url(next):
# path, not a complete URL.
return bool(next_url_re.match(next))
def begin(request, sreg=None, extension_args=None, redirect_to=None,
on_failure=None):
on_failure = on_failure or default_on_failure
extension_args = extension_args or {}
next = ''
if request.GET.get('next'):
next = urllib.urlencode({
'next': request.GET['next']
})
form_signin = OpenidSigninForm(initial={'next':next})
if request.POST:
form_signin = OpenidSigninForm(request.POST)
if form_signin.is_valid():
consumer = Consumer(request.session, DjangoOpenIDStore())
try:
auth_request = consumer.begin(form_signin.cleaned_data['openid_url'])
except DiscoveryFailure:
return on_failure(request, "The OpenID was invalid")
if sreg:
extension_args['sreg.optional'] = sreg
trust_root = getattr(
settings, 'OPENID_TRUST_ROOT', get_url_host(request) + '/'
)
redirect_to = redirect_to or getattr(
settings, 'OPENID_REDIRECT_TO',
# If not explicitly set, assume current URL with complete/ appended
get_full_url(request).split('?')[0] + 'complete/'
)
# TODO: add redirect_to in form
if not redirect_to.startswith('http://'):
redirect_to = get_url_host(request) + redirect_to
if 'next' in form_signin.cleaned_data and next != "":
if '?' in redirect_to:
join = '&'
else:
join = '?'
redirect_to += join + urllib.urlencode({
'next': form_signin.cleaned_data['next']
})
# Add extension args (for things like simple registration)
for name, value in extension_args.items():
namespace, key = name.split('.', 1)
auth_request.addExtensionArg(namespace, key, value)
redirect_url = auth_request.redirectURL(trust_root, redirect_to)
return HttpResponseRedirect(redirect_url)
return render('openid_signin.html', {
'form': form_signin,
'action': request.path,
'logo': request.path + 'logo/',
'openids': request.session.get('openids', []),
})
def complete(request, on_success=None, on_failure=None):
on_success = on_success or default_on_success
on_failure = on_failure or default_on_failure
redirect_to = getattr(
settings, 'OPENID_REDIRECT_TO',
get_full_url(request).split('?')[0]
)
consumer = Consumer(request.session, DjangoOpenIDStore())
openid_response = consumer.complete(dict(request.GET.items()), redirect_to)
def sanitise_redirect_url(redirect_to):
"""Sanitise the redirection URL."""
# Light security check -- make sure redirect_to isn't garbage.
if not redirect_to or '//' in redirect_to or ' ' in redirect_to:
redirect_to = settings.LOGIN_REDIRECT_URL
return redirect_to
def make_consumer(request):
"""Create an OpenID Consumer object for the given Django request."""
# Give the OpenID library its own space in the session object.
session = request.session.setdefault('OPENID', {})
store = DjangoOpenIDStore()
return Consumer(session, store)
def render_openid_request(request, openid_request, return_to, trust_root=None,
template_name='openid/auth-request.html'):
"""Render an OpenID authentication request."""
if trust_root is None:
trust_root = getattr(settings, 'OPENID_TRUST_ROOT',
request.build_absolute_uri('/'))
if openid_request.shouldSendRedirect():
redirect_url = openid_request.redirectURL(
trust_root, return_to)
return HttpResponseRedirect(redirect_url)
else:
form_html = openid_request.htmlMarkup(
trust_root, return_to, form_tag_attrs={'id': 'openid_message'})
return render_to_response(
template_name, {'form': SafeString(form_html)},
context_instance=RequestContext(request))
def parse_openid_response(request):
"""Parse an OpenID response from a Django request."""
# Short cut if there is no request parameters.
#if len(request.REQUEST) == 0:
# return None
current_url = request.build_absolute_uri()
consumer = make_consumer(request)
return consumer.complete(dict(request.REQUEST.items()), current_url)
def login_begin(request, template_name='openid/login.html',
redirect_field_name=REDIRECT_FIELD_NAME):
"""Begin an OpenID login request, possibly asking for an identity URL."""
redirect_to = request.REQUEST.get(redirect_field_name, '')
# Get the OpenID URL to try. First see if we've been configured
# to use a fixed server URL.
openid_url = getattr(settings, 'OPENID_SSO_SERVER_URL', None)
if openid_url is None:
if request.POST:
login_form = OpenIDLoginForm(data=request.POST)
if login_form.is_valid():
openid_url = login_form.cleaned_data['openid_url']
else:
login_form = OpenIDLoginForm()
# Invalid or no form data:
if openid_url is None:
return render_to_response(template_name, {
'form': login_form,
redirect_field_name: redirect_to
}, context_instance=RequestContext(request))
error = None
consumer = make_consumer(request)
try:
openid_request = consumer.begin(openid_url)
except DiscoveryFailure, exc:
# XXX: make this a proper error page.
return HttpResponse("OpenID discovery error: %s" % (str(exc),))
# Request some user details.
openid_request.addExtension(
sreg.SRegRequest(optional=['email', 'fullname', 'nickname']))
# Construct the request completion URL, including the page we
# should redirect to.
return_to = request.build_absolute_uri(reverse(login_complete))
if redirect_to:
if '?' in return_to:
return_to += '&'
else:
return_to += '?'
return_to += urllib.urlencode({redirect_field_name: redirect_to})
return render_openid_request(request, openid_request, return_to)
def login_complete(request, redirect_field_name=REDIRECT_FIELD_NAME):
redirect_to = request.REQUEST.get(redirect_field_name, '')
openid_response = parse_openid_response(request)
if not openid_response:
return HttpResponseRedirect(sanitise_redirect_url(redirect_to))
if openid_response.status == SUCCESS:
return on_success(request, openid_response.identity_url, openid_response)
elif openid_response.status == CANCEL:
return on_failure(request, 'The request was cancelled')
return HttpResponse("Success: %s")
elif openid_response.status == FAILURE:
return on_failure(request, openid_response.message)
elif openid_response.status == SETUP_NEEDED:
return on_failure(request, 'Setup needed')
return HttpResponse("Failure: %s" % openid_response.message)
elif openid_response.status == CANCEL:
return HttpResponse("Cancel")
else:
assert False, "Bad openid status: %s" % openid_response.status
def default_on_success(request, identity_url, openid_response):
if 'openids' not in request.session.keys():
request.session['openids'] = []
# Eliminate any duplicates
request.session['openids'] = [
o for o in request.session['openids'] if o.openid != identity_url
]
request.session['openids'].append(from_openid_response(openid_response))
next = request.GET.get('next', '').strip()
if not next or not is_valid_next_url(next):
next = getattr(settings, 'OPENID_REDIRECT_NEXT', '/')
return HttpResponseRedirect(next)
def default_on_failure(request, message):
return render('openid_failure.html', {
'message': message
})
def signout(request):
request.session['openids'] = []
next = request.GET.get('next', '/')
if not is_valid_next_url(next):
next = '/'
return HttpResponseRedirect(next)
assert False, (
"Unknown OpenID response type: %r" % openid_response.status)
def logo(request):
return HttpResponse(
......
......@@ -3,12 +3,12 @@ import views
urlpatterns = patterns('',
(r'^$', views.index),
(r'^openid/$', 'django_openidconsumer.views.begin'),
(r'^openid/with-sreg/$', 'django_openidconsumer.views.begin', {
'sreg': 'email,nickname',
'redirect_to': '/openid/complete/',
}),
(r'^openid/complete/$', 'django_openidconsumer.views.complete'),
(r'^openid/signout/$', 'django_openidconsumer.views.signout'),
(r'^openid/$', 'django_openidconsumer.views.login_begin'),
#(r'^openid/with-sreg/$', 'django_openidconsumer.views.begin', {
# 'sreg': 'email,nickname',
# 'redirect_to': '/openid/complete/',
#}),
(r'^openid/complete/$', 'django_openidconsumer.views.login_complete'),
#(r'^openid/signout/$', 'django_openidconsumer.views.signout'),
(r'^next-works/$', views.next_works),
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment