+"""
+Django-social-auth application, allows OpenId or OAuth user
+registration/authentication just adding a few configurations.
+"""
+"""Admin settings"""
from django.contrib import admin
from .models import UserSocialAuth, Nonce, Association
+"""Authentication handling class"""
import cgi
import urllib
import httplib
from django.conf import settings
from django.utils import simplejson
from django.contrib.auth import authenticate
-from django.core.urlresolvers import reverse
from .base import BaseAuth
from .store import DjangoOpenIDStore
class OpenIdAuth(BaseAuth):
+ """OpenId process handling"""
def auth_url(self):
openid_request = self.setup_request()
# Construct completion URL, including page we should redirect to
raise ValueError, 'Unknown OpenID response type: %r' % response.status
def setup_request(self):
+ """Setup request"""
openid_request = self.openid_request()
# Request some user details. If the provider advertises support
# for attribute exchange, use that.
@property
def uses_redirect(self):
+ """Return true if openid request will be handled with redirect or
+ HTML content will be returned.
+ """
if not hasattr(self, '_uses_redirect'):
setattr(self, '_uses_redirect', self.openid_request().shouldSendRedirect())
return getattr(self, '_uses_redirect', True)
def openid_request(self):
+ """Return openid request"""
if not hasattr(self, '_openid_request'):
openid_url = self.openid_url()
try:
return getattr(self, '_openid_request', None)
def openid_url(self):
+ """Return service provider URL.
+ This base class is generic accepting a POST parameter that specifies
+ provider URL."""
if self.request.method != 'POST' or OPENID_ID_FIELD not in self.request.POST:
raise ValueError, 'Missing openid identifier'
return self.request.POST[OPENID_ID_FIELD]
class GoogleAuth(OpenIdAuth):
"""Google OpenID authentication"""
def openid_url(self):
+ """Return Google OpenID service url"""
return OPENID_GOOGLE_URL
class YahooAuth(OpenIdAuth):
"""Yahoo OpenID authentication"""
def openid_url(self):
+ """Return Yahoo OpenID service url"""
return OPENID_YAHOO_URL
class BaseOAuth(BaseAuth):
+ """OAuth base class"""
def __init__(self, request, redirect):
+ """Init method"""
super(BaseOAuth, self).__init__(request, redirect)
self.redirect_uri = self.request.build_absolute_uri(self.redirect)
return authenticate(response=data, twitter=True)
def unauthorized_token(self):
+ """Return request for unauthorized token (first stage)"""
request = self.oauth_request(token=None, url=TWITTER_REQUEST_TOKEN_URL)
response = self.fetch_response(request)
return OAuthToken.from_string(response)
def access_token(self, token):
+ """Return request for access token value"""
request = self.oauth_request(token, TWITTER_ACCESS_TOKEN_URL)
return OAuthToken.from_string(self.fetch_response(request))
def user_data(self, access_token):
+ """Return user data provided"""
request = self.oauth_request(access_token, TWITTER_CHECK_AUTH)
json = self.fetch_response(request)
try:
return None
def oauth_request(self, token, url):
+ """Generate OAuth request, setups callback url"""
params = {'oauth_callback': self.redirect_uri}
if 'oauth_verifier' in self.request.GET:
params['oauth_verifier'] = self.request.GET['oauth_verifier']
return request
def fetch_response(self, request):
+ """Executes request and fetchs service response"""
self.connection.request(request.http_method, request.to_url())
response = self.connection.getresponse()
return response.read()
@property
def connection(self):
+ """Setups connection"""
conn = getattr(self, '_connection', None)
if conn is None:
conn = httplib.HTTPSConnection(TWITTER_SERVER)
@property
def consumer(self):
+ """Setups consumer"""
cons = getattr(self, '_consumer', None)
if cons is None:
cons = OAuthConsumer(settings.TWITTER_CONSUMER_KEY,
raise ValueError, 'Authentication error'
def user_data(self, access_token):
+ """Loads user data from service"""
params = {'access_token': access_token}
url = FACEBOOK_CHECK_AUTH + '?' + urllib.urlencode(params)
try:
+"""
+Authentication backeds for django.contrib.auth AUTHENTICATION_BACKENDS setting
+"""
from openid.extensions import ax, sreg
from .base import SocialAuthBackend
name = 'twitter'
def authenticate(self, **kwargs):
+ """Authenticate user only if this was a Twitter request"""
if kwargs.pop('twitter', False):
return super(TwitterBackend, self).authenticate(**kwargs)
def get_user_details(self, response):
+ """Return user details from Twitter account"""
return {'email': '', # not supplied
'username': response['screen_name'],
'fullname': response['name'],
name = 'facebook'
def authenticate(self, **kwargs):
+ """Authenticate user only if this was a Facebook request"""
if kwargs.pop('facebook', False):
return super(FacebookBackend, self).authenticate(**kwargs)
def get_user_details(self, response):
+ """Return user details from Facebook account"""
return {'email': response.get('email', ''),
'username': response['name'],
'fullname': response['name'],
return super(OpenIDBackend, self).authenticate(**kwargs)
def get_user_id(self, details, response):
+ """Return user unique id provided by service"""
return response.identity_url
def get_user_details(self, response):
+ """Return user details from an OpenID request"""
values = {'email': '',
'username': '',
'fullname': '',
+"""Some base classes"""
import os
import md5
return user
def get_username(self, details):
+ """Return an unique username, if SOCIAL_AUTH_FORCE_RANDOM_USERNAME
+ setting is True, then username will be a random 30 chars md5 hash
+ """
def get_random_username():
+ """Return hash from random string cut at 30 chars"""
return md5.md5(str(os.urandom(10))).hexdigest()[:30]
if getattr(settings, 'SOCIAL_AUTH_FORCE_RANDOM_USERNAME', False):
return username
def create_user(self, response, details):
+ """Create user with unique username"""
username = self.get_username(details)
user = User.objects.create_user(username, details.get('email', ''))
- self.update_user_details(user, details)
- self.associate_auth(user, response, details)
+ self.update_user_details(user, details) # load details
+ self.associate_auth(user, response, details) # save account association
return user
def associate_auth(self, user, response, details):
return user_oauth
def update_user_details(self, user, details):
+ """Update user details with new (maybe) data"""
first_name = details.get('firstname') or user.first_name
last_name = details.get('lastname') or user.last_name
email = details.get('email') or user.email
+"""Conf settings"""
# Twitter configuration
TWITTER_SERVER = 'api.twitter.com'
TWITTER_REQUEST_TOKEN_URL = 'https://%s/oauth/request_token' % TWITTER_SERVER
+"""Social auth models"""
from django.db import models
from django.contrib.auth.models import User
class UserSocialAuth(models.Model):
"""Social Auth association model"""
- user = models.ForeignKey(User)
+ user = models.ForeignKey(User, related_name='social_auth')
provider = models.CharField(max_length=32)
- uid = models.CharField(max_length=2048)
+ uid = models.TextField()
class Meta:
+ """Meta data"""
unique_together = ('provider', 'uid')
class Nonce(models.Model):
- server_url = models.CharField(max_length=2047)
+ """One use numbers"""
+ server_url = models.TextField()
timestamp = models.IntegerField()
salt = models.CharField(max_length=40)
class Association(models.Model):
+ """OpenId account association"""
server_url = models.TextField(max_length=2047)
handle = models.CharField(max_length=255)
secret = models.TextField(max_length=255) # Stored base64 encoded
self.signature_methods = signature_methods or {}
def set_data_store(self, oauth_data_store):
- self.data_store = data_store
+ self.data_store = oauth_data_store
def get_data_store(self):
return self.data_store
token = self._get_token(oauth_request, 'request')
except OAuthError:
# no token required for the initial token request
- version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
self._check_signature(oauth_request, consumer, None)
# fetch a new token
# process an access_token request
# returns the access token on success
def fetch_access_token(self, oauth_request):
- version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
# get the request token
token = self._get_token(oauth_request, 'request')
# verify an api call, checks all the parameters
def verify_request(self, oauth_request):
# -> consumer and token
- version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
# get the access token
token = self._get_token(oauth_request, 'access')
if not valid_sig:
key, base = signature_method.build_signature_base_string(oauth_request, consumer, token)
raise OAuthError('Invalid signature. Expected signature base string: %s' % base)
- built = signature_method.build_signature(oauth_request, consumer, token)
+ signature_method.build_signature(oauth_request, consumer, token)
def _check_timestamp(self, timestamp):
# verify that timestamp is recentish
+"""OpenId storage that saves to django models"""
import time
import base64
class DjangoOpenIDStore(OpenIDStore):
+ """Storage class"""
def __init__(self):
+ """Init method"""
+ super(DjangoOpenIDStore, self).__init__()
self.max_nonce_age = 6 * 60 * 60 # Six hours
def storeAssociation(self, server_url, association):
+ """Store new assocition if doesn't exist"""
args = {'server_url': server_url, 'handle': association.handle}
try:
assoc = Association.objects.get(**args)
assoc.save()
def getAssociation(self, server_url, handle=None):
+ """Return stored assocition"""
args = {'server_url': server_url}
if handle is not None:
args['handle'] = handle
return associations[0]
def useNonce(self, server_url, timestamp, salt):
+ """Generate one use number and return if it was created"""
if abs(timestamp - time.time()) > SKEW:
return False
- nonce, created = Nonce.objects.get_or_create(server_url=server_url,
- timestamp=timestamp,
- salt=salt)
- return created
+ return Nonce.objects.get_or_create(server_url=server_url,
+ timestamp=timestamp,
+ salt=salt)[1]
+"""URLs module"""
from django.conf.urls.defaults import patterns, url
from .views import auth, complete
+"""Views"""
from django.conf import settings
from django.http import HttpResponseRedirect, HttpResponse, \
HttpResponseServerError
from .auth import TwitterAuth, FacebookAuth, OpenIdAuth, GoogleAuth, YahooAuth
+# Authenticatin backeds
BACKENDS = {
'twitter': TwitterAuth,
'facebook': FacebookAuth,
'openid': OpenIdAuth,
}
-
def auth(request, backend):
+ """Authentication starting process"""
if backend not in BACKENDS:
return HttpResponseServerError('Incorrect authentication service')
request.session[REDIRECT_FIELD_NAME] = request.GET.get(REDIRECT_FIELD_NAME,
def complete(request, backend):
+ """Authentication complete process"""
if backend not in BACKENDS:
return HttpResponseServerError('Incorrect authentication service')
backend = BACKENDS[backend](request, request.path)