from .base import BaseAuth
from .store import DjangoOpenIDStore
+from .backends import TwitterBackend, OrkutBackend, FacebookBackend, \
+ OpenIDBackend
from .conf import AX_ATTRS, SREG_ATTR, OPENID_ID_FIELD, SESSION_NAME, \
OPENID_GOOGLE_URL, OPENID_YAHOO_URL, TWITTER_SERVER, \
TWITTER_REQUEST_TOKEN_URL, TWITTER_ACCESS_TOKEN_URL, \
TWITTER_AUTHORIZATION_URL, TWITTER_CHECK_AUTH, \
- TWITTER_UNAUTHORIZED_TOKEN_NAME, FACEBOOK_CHECK_AUTH, \
- FACEBOOK_AUTHORIZATION_URL, FACEBOOK_ACCESS_TOKEN_URL, \
- ORKUT_SERVER, ORKUT_SCOPE, ORKUT_UNAUTHORIZED_TOKEN_NAME, \
+ FACEBOOK_CHECK_AUTH, FACEBOOK_AUTHORIZATION_URL, \
+ FACEBOOK_ACCESS_TOKEN_URL, ORKUT_SERVER, ORKUT_SCOPE, \
ORKUT_REQUEST_TOKEN_URL, ORKUT_ACCESS_TOKEN_URL, \
- ORKUT_AUTHORIZATION_URL, ORKUT_REST_ENDPOINT, ORKUT_EXTRA_DATA
+ ORKUT_AUTHORIZATION_URL, ORKUT_REST_ENDPOINT, \
+ ORKUT_EXTRA_DATA
class OpenIdAuth(BaseAuth):
if not response:
raise ValueError, 'This is an OpenID relying party endpoint'
elif response.status == SUCCESS:
- return authenticate(response=response, openid=True)
+ return authenticate(**{'response': response,
+ OpenIDBackend.name: True})
elif response.status == FAILURE:
raise ValueError, 'OpenID authentication failed: %s' % response.message
elif response.status == CANCEL:
self.redirect_uri = self.request.build_absolute_uri(self.redirect)
-class OrkutAuth(BaseOAuth):
- """Orkut OAuth authentication mechanism"""
+class ConsumerBasedOAuth(BaseOAuth):
+ """Consumer based mechanism OAuth authentication, fill the needed
+ parameters to communicate properly with authentication service.
+
+ @AUTHORIZATION_URL Authorization service url
+ @REQUEST_TOKEN_URL Request token URL
+ @ACCESS_TOKEN_URL Access token URL
+ @SERVER_URL Authorization server URL
+ @AUTH_BACKEND Authorization backend related with
+ this service
+ """
+ AUTHORIZATION_URL = ''
+ REQUEST_TOKEN_URL = ''
+ ACCESS_TOKEN_URL = ''
+ SERVER_URL = ''
+ AUTH_BACKEND = None
+
def auth_url(self):
"""Returns redirect url"""
token = self.unauthorized_token()
- self.request.session[ORKUT_UNAUTHORIZED_TOKEN_NAME] = token.to_string()
- return self.oauth_request(token, ORKUT_AUTHORIZATION_URL).to_url()
+ name = self.AUTH_BACKEND.name + 'unauthorized_token_name'
+ self.request.session[name] = token.to_string()
+ return self.oauth_request(token, self.AUTHORIZATION_URL).to_url()
def auth_complete(self):
"""Returns user, might be logged in"""
- unauthed_token = self.request.session.get(ORKUT_UNAUTHORIZED_TOKEN_NAME)
+ name = self.AUTH_BACKEND.name + 'unauthorized_token_name'
+ unauthed_token = self.request.session.get(name)
if not unauthed_token:
raise ValueError, 'Missing unauthorized token'
token = OAuthToken.from_string(unauthed_token)
if token.key != self.request.GET.get('oauth_token', 'no-token'):
raise ValueError, 'Incorrect tokens'
+
access_token = self.access_token(token)
data = self.user_data(access_token)
if data is not None:
data['access_token'] = access_token.to_string()
- return authenticate(response=data, orkut=True)
+
+ return authenticate(**{'response': data, self.AUTH_BACKEND.name: True})
def unauthorized_token(self):
"""Return request for unauthorized token (first stage)"""
- request = self.oauth_request(token=None, url=ORKUT_REQUEST_TOKEN_URL)
+ request = self.oauth_request(token=None, url=self.REQUEST_TOKEN_URL)
response = self.fetch_response(request)
return OAuthToken.from_string(response)
- def oauth_request(self, token, url, params={}):
+ def oauth_request(self, token, url, extra_params=None):
"""Generate OAuth request, setups callback url"""
- params.update({'oauth_callback': self.redirect_uri, \
- 'scope': ORKUT_SCOPE})
+ params = {'oauth_callback': self.redirect_uri}
+ if extra_params:
+ params.update(extra_params)
+
if 'oauth_verifier' in self.request.GET:
params['oauth_verifier'] = self.request.GET['oauth_verifier']
request = OAuthRequest.from_consumer_and_token(self.consumer,
def access_token(self, token):
"""Return request for access token value"""
- request = self.oauth_request(token, ORKUT_ACCESS_TOKEN_URL)
+ request = self.oauth_request(token, self.ACCESS_TOKEN_URL)
return OAuthToken.from_string(self.fetch_response(request))
def user_data(self, access_token):
"""Loads user data from service"""
- params = {'method': 'people.get', \
- 'id': 'myself', \
- 'userId': '@me', \
- 'groupId': '@self', \
- 'fields': 'name,displayName,emails,%s' % ORKUT_EXTRA_DATA}
- request = self.oauth_request(access_token, ORKUT_REST_ENDPOINT, params)
- response = urllib.urlopen(request.to_url()).read()
- try:
- json = simplejson.loads(response)
- return json['data']
- except simplejson.JSONDecodeError:
- return None
+ raise NotImplementedError, 'Implement in subclass'
@property
def connection(self):
"""Setups connection"""
conn = getattr(self, '_connection', None)
if conn is None:
- conn = httplib.HTTPSConnection(ORKUT_SERVER)
+ conn = httplib.HTTPSConnection(self.SERVER_URL)
setattr(self, '_connection', conn)
return conn
"""Setups consumer"""
cons = getattr(self, '_consumer', None)
if cons is None:
- cons = OAuthConsumer(settings.ORKUT_CONSUMER_KEY,
- settings.ORKUT_CONSUMER_SECRET)
+ cons = OAuthConsumer(*self.get_key_and_secret())
setattr(self, '_consumer', cons)
return cons
+ def get_key_and_secret(self):
+ """Return tuple with Consumer Key and Consumer Secret for current
+ service provider. Must return (key, secret), order must be respected.
+ """
+ raise NotImplementedError, 'Implement in subclass'
-class TwitterAuth(BaseOAuth):
- """Twitter OAuth authentication mechanism"""
- def auth_url(self):
- """Returns redirect url"""
- token = self.unauthorized_token()
- self.request.session[TWITTER_UNAUTHORIZED_TOKEN_NAME] = token.to_string()
- return self.oauth_request(token, TWITTER_AUTHORIZATION_URL).to_url()
- def auth_complete(self):
- """Returns user, might be logged in"""
- unauthed_token = self.request.session.get(TWITTER_UNAUTHORIZED_TOKEN_NAME)
- if not unauthed_token:
- raise ValueError, 'Missing unauthorized token'
+class OrkutAuth(ConsumerBasedOAuth):
+ """Orkut OAuth authentication mechanism"""
+ AUTHORIZATION_URL = ORKUT_AUTHORIZATION_URL
+ REQUEST_TOKEN_URL = ORKUT_REQUEST_TOKEN_URL
+ ACCESS_TOKEN_URL = ORKUT_ACCESS_TOKEN_URL
+ SERVER_URL = ORKUT_SERVER
+ AUTH_BACKEND = OrkutBackend
- token = OAuthToken.from_string(unauthed_token)
- if token.key != self.request.GET.get('oauth_token', 'no-token'):
- raise ValueError, 'Incorrect tokens'
- access_token = self.access_token(token)
- data = self.user_data(access_token)
- if data is not None:
- data['access_token'] = access_token.to_string()
- return authenticate(response=data, twitter=True)
+ def user_data(self, access_token):
+ """Loads user data from Orkut service"""
+ fields = 'name,displayName,emails'
+ if ORKUT_EXTRA_DATA:
+ fields += ',' + ORKUT_EXTRA_DATA
+ params = {'method': 'people.get',
+ 'id': 'myself',
+ 'userId': '@me',
+ 'groupId': '@self',
+ 'fields': fields,
+ 'scope': ORKUT_SCOPE}
+ request = self.oauth_request(access_token, ORKUT_REST_ENDPOINT, params)
+ response = urllib.urlopen(request.to_url()).read()
+ try:
+ json = simplejson.loads(response)
+ return json['data']
+ except simplejson.JSONDecodeError:
+ return None
- def unauthorized_token(self):
- """Return request for unauthorized token (first stage)"""
- request = self.oauth_request(token=None, url=TWITTER_REQUEST_TOKEN_URL)
- response = self.fetch_response(request)
- return OAuthToken.from_string(response)
+ def get_key_and_secret(self):
+ """Return Orkut Consumer Key and Consumer Secret pair"""
+ return settings.ORKUT_CONSUMER_KEY, settings.ORKUT_CONSUMER_SECRET
- def access_token(self, token):
- """Return request for access token value"""
- request = self.oauth_request(token, TWITTER_ACCESS_TOKEN_URL)
- return OAuthToken.from_string(self.fetch_response(request))
+
+class TwitterAuth(ConsumerBasedOAuth):
+ """Twitter OAuth authentication mechanism"""
+ AUTHORIZATION_URL = TWITTER_AUTHORIZATION_URL
+ REQUEST_TOKEN_URL = TWITTER_REQUEST_TOKEN_URL
+ ACCESS_TOKEN_URL = TWITTER_ACCESS_TOKEN_URL
+ SERVER_URL = TWITTER_SERVER
+ AUTH_BACKEND = TwitterBackend
def user_data(self, access_token):
"""Return user data provided"""
except simplejson.JSONDecodeError:
return None
- def oauth_request(self, token, url):
- """Generate OAuth request, setups callback url"""
- params = {'oauth_callback': self.redirect_uri}
- if 'oauth_verifier' in self.request.GET:
- params['oauth_verifier'] = self.request.GET['oauth_verifier']
- request = OAuthRequest.from_consumer_and_token(self.consumer,
- token=token,
- http_url=url,
- parameters=params)
- request.sign_request(OAuthSignatureMethod_HMAC_SHA1(), self.consumer,
- token)
- return request
-
- def fetch_response(self, request):
- """Executes request and fetchs service response"""
- self.connection.request(request.http_method, request.to_url())
- response = self.connection.getresponse()
- return response.read()
-
- @property
- def connection(self):
- """Setups connection"""
- conn = getattr(self, '_connection', None)
- if conn is None:
- conn = httplib.HTTPSConnection(TWITTER_SERVER)
- setattr(self, '_connection', conn)
- return conn
-
- @property
- def consumer(self):
- """Setups consumer"""
- cons = getattr(self, '_consumer', None)
- if cons is None:
- cons = OAuthConsumer(settings.TWITTER_CONSUMER_KEY,
- settings.TWITTER_CONSUMER_SECRET)
- setattr(self, '_consumer', cons)
- return cons
+ def get_key_and_secret(self):
+ """Return Twitter Consumer Key and Consumer Secret pair"""
+ return settings.TWITTER_CONSUMER_KEY, settings.TWITTER_CONSUMER_SECRET
class FacebookAuth(BaseOAuth):
if 'error' in data:
raise ValueError, 'Authentication error'
data['access_token'] = access_token
- return authenticate(response=data, facebook=True)
+ return authenticate(**{'response': data,
+ FacebookBackend.name: True})
else:
raise ValueError, 'Authentication error'