from uuid import uuid4
from urllib2 import Request, urlopen
from urllib import urlencode
-from httplib import HTTPSConnection
+from urlparse import urlsplit
from openid.consumer.consumer import Consumer, SUCCESS, CANCEL, FAILURE
from openid.consumer.discover import DiscoveryFailure
"""Completes loging process, must return user instance"""
raise NotImplementedError('Implement in subclass')
+ def auth_extra_arguments(self):
+ """Return extra argumens needed on auth process, setting is per bancked
+ and defined by <backend name in uppercase>_AUTH_EXTRA_ARGUMENTS.
+ """
+ name = self.AUTH_BACKEND.name.upper() + '_AUTH_EXTRA_ARGUMENTS'
+ return getattr(settings, name, {})
+
@property
def uses_redirect(self):
"""Return True if this provider uses redirect url method,
def auth_url(self):
"""Return auth URL returned by service"""
- openid_request = self.setup_request()
+ openid_request = self.setup_request(self.auth_extra_arguments())
# Construct completion URL, including page we should redirect to
return_to = self.request.build_absolute_uri(self.redirect)
return openid_request.redirectURL(self.trust_root(), return_to)
def auth_html(self):
"""Return auth HTML returned by service"""
- openid_request = self.setup_request()
+ openid_request = self.setup_request(self.auth_extra_arguments())
return_to = self.request.build_absolute_uri(self.redirect)
form_tag = {'id': 'openid_message'}
return openid_request.htmlMarkup(self.trust_root(), return_to,
raise ValueError('Unknown OpenID response type: %r' % \
response.status)
- def setup_request(self):
+ def setup_request(self, extra_params=None):
"""Setup request"""
- openid_request = self.openid_request()
+ openid_request = self.openid_request(extra_params)
# Request some user details. Use attribute exchange if provider
# advertises support.
if openid_request.endpoint.supportsType(ax.AXMessage.ns_uri):
"""Return true if openid request will be handled with redirect or
HTML content will be returned.
"""
- if not hasattr(self, '_uses_redirect'):
- setattr(self, '_uses_redirect',
- self.openid_request().shouldSendRedirect())
- return getattr(self, '_uses_redirect', True)
+ return self.openid_request().shouldSendRedirect()
- def openid_request(self):
+ def openid_request(self, extra_params=None):
"""Return openid request"""
- if not hasattr(self, '_openid_request'):
- openid_url = self.openid_url()
- try:
- openid_request = self.consumer().begin(openid_url)
- except DiscoveryFailure, err:
- raise ValueError('OpenID discovery error: %s' % err)
- else:
- setattr(self, '_openid_request', openid_request)
- return getattr(self, '_openid_request', None)
+ openid_url = self.openid_url()
+ if extra_params:
+ query = urlsplit(openid_url).query
+ openid_url += (query and '&' or '?') + urlencode(extra_params)
+
+ try:
+ return self.consumer().begin(openid_url)
+ except DiscoveryFailure, err:
+ raise ValueError('OpenID discovery error: %s' % err)
def openid_url(self):
"""Return service provider URL.
token = self.unauthorized_token()
name = self.AUTH_BACKEND.name + 'unauthorized_token_name'
self.request.session[name] = token.to_string()
- return self.oauth_request(token, self.AUTHORIZATION_URL).to_url()
+ return self.oauth_request(token, self.AUTHORIZATION_URL,
+ self.auth_extra_arguments()).to_url()
def auth_complete(self, *args, **kwargs):
"""Return user, might be logged in"""
def fetch_response(self, request):
"""Executes request and fetchs service response"""
- connection = HTTPSConnection(self.SERVER_URL)
- connection.request(request.method.upper(), request.to_url())
- return connection.getresponse().read()
+ response = urlopen(request.to_url())
+ return '\n'.join(response.readlines())
def access_token(self, token):
"""Return request for access token value"""
@property
def consumer(self):
"""Setups consumer"""
- consumer = getattr(self, '_consumer', None)
- if consumer is None:
- consumer = OAuthConsumer(*self.get_key_and_secret())
- setattr(self, '_consumer', consumer)
- return consumer
+ return OAuthConsumer(*self.get_key_and_secret())
def get_key_and_secret(self):
"""Return tuple with Consumer Key and Consumer Secret for current
'scope': ' '.join(self.get_scope()),
'redirect_uri': self.redirect_uri,
'response_type': 'code'} # requesting code
+ args.update(self.auth_extra_arguments())
return self.AUTHORIZATION_URL + '?' + urlencode(args)
def auth_complete(self, *args, **kwargs):
"""Completes loging process, must return user instance"""
+ if self.data.get('error'):
+ error = self.data.get('error_description') or self.data['error']
+ raise ValueError('OAuth2 authentication failed: %s' % error)
+
client_id, client_secret = self.get_key_and_secret()
params = {'grant_type': 'authorization_code', # request auth code
'code': self.data.get('code', ''), # server response code