"""Admin settings"""
+from social_auth.utils import setting
-from social_auth import conf
-
-
-if conf.get_models_module().NAME == 'django_models':
-
+if setting('SOCIAL_AUTH_MODELS') in (None, 'social_auth.db.django_models'):
from django.contrib import admin
-
from social_auth.models import UserSocialAuth, Nonce, Association
-
class UserSocialAuthOption(admin.ModelAdmin):
"""Social Auth user options"""
list_display = ('id', 'user', 'provider', 'uid')
raw_id_fields = ('user',)
list_select_related = True
-
class NonceOption(admin.ModelAdmin):
"""Nonce options"""
list_display = ('id', 'server_url', 'timestamp', 'salt')
search_fields = ('server_url',)
-
class AssociationOption(admin.ModelAdmin):
"""Association options"""
list_display = ('id', 'server_url', 'assoc_type')
list_filter = ('assoc_type',)
search_fields = ('server_url',)
-
admin.site.register(UserSocialAuth, UserSocialAuthOption)
admin.site.register(Nonce, NonceOption)
admin.site.register(Association, AssociationOption)
from django.utils import simplejson
from django.utils.importlib import import_module
-from social_auth.models import get_social_auth_for_user
-from social_auth.models import get_user
+from social_auth.models import UserSocialAuth
from social_auth.utils import setting, log, model_to_ctype, ctype_to_model, \
clean_partial_pipeline
from social_auth.store import DjangoOpenIDStore
return {}
def get_user(self, user_id):
-
"""
Return user with given ID from the User model used by this backend
"""
- return get_user(user_id)
+ return UserSocialAuth.get_user(user_id)
class OAuthBackend(SocialAuthBackend):
Override if extra operations are needed.
"""
if association_id:
- get_social_auth_for_user(user).get(id=association_id).delete()
+ UserSocialAuth.get_social_auth_for_user(user)\
+ .get(id=association_id).delete()
else:
- get_social_auth_for_user(user).filter(provider=self.AUTH_BACKEND.name).delete()
+ UserSocialAuth.get_social_auth_for_user(user)\
+ .filter(provider=self.AUTH_BACKEND.name).delete()
def build_absolute_uri(self, path=None):
"""Build absolute URI for given path. Replace http:// schema with
+from django.core.exceptions import MultipleObjectsReturned, ObjectDoesNotExist
+
from social_auth.utils import setting
-from social_auth.models import get_user_by_email
+from social_auth.models import UserSocialAuth
from social_auth.backends.pipeline import warn_setting
from social_auth.backends.exceptions import AuthException
# only if it's a single object. AuthException is raised if multiple
# objects are returned
try:
- return {'user': get_user_by_email(email=email)}
- except User.MultipleObjectsReturned:
+ return {'user': UserSocialAuth.get_user_by_email(email=email)}
+ except MultipleObjectsReturned:
raise AuthException(kwargs['backend'], 'Not unique email address.')
- except User.DoesNotExist:
+ except ObjectDoesNotExist:
pass
from django.db.utils import IntegrityError
from social_auth.utils import setting
-from social_auth.models import create_social_auth
-from social_auth.models import get_social_auth
+from social_auth.models import UserSocialAuth
from social_auth.backends.pipeline import warn_setting
from social_auth.backends.exceptions import AuthException
from django.utils.translation import ugettext
Raise AuthException if UserSocialAuth entry belongs to another user.
"""
- social_user = get_social_auth(backend.name, uid)
+ social_user = UserSocialAuth.get_social_auth(backend.name, uid)
if social_user:
if user and social_user.user != user:
- raise AuthException(backend, ugettext('This %(provider)s account already in use.') % {
- 'provider':backend.name,
- })
+ msg = ugettext('This %(provider)s account already in use.')
+ raise AuthException(backend, msg % {'provider': backend.name})
elif not user:
user = social_user.user
return {'social_user': social_user, 'user': user}
return None
try:
- social = create_social_auth(user, uid, backend.name)
+ social = UserSocialAuth.create_social_auth(user, uid, backend.name)
except IntegrityError:
# Protect for possible race condition, those bastard with FTL
# clicking capabilities, check issue #131:
from uuid import uuid4
from social_auth.utils import setting
-from social_auth.models import create_user as create_user_in_db
-from social_auth.models import simple_user_exists
-from social_auth.models import USERNAME, USERNAME_MAX_LENGTH
+from social_auth.models import UserSocialAuth, USERNAME
from social_auth.backends.pipeline import warn_setting
from social_auth.signals import socialauth_not_registered, \
socialauth_registered, \
pre_update
-def get_username(details, user=None, user_exists=simple_user_exists,
+def get_username(details, user=None,
+ user_exists=UserSocialAuth.simple_user_exists,
*args, **kwargs):
"""Return an username for new user. Return current user username
if user was given.
uuid_length = setting('SOCIAL_AUTH_UUID_LENGTH', 16)
username_fixer = setting('SOCIAL_AUTH_USERNAME_FIXER', lambda u: u)
- short_username = username[:USERNAME_MAX_LENGTH - uuid_length]
- final_username = username_fixer(username)[:USERNAME_MAX_LENGTH]
+ max_length = UserSocialAuth.username_max_length()
+ short_username = username[:max_length - uuid_length]
+ final_username = username_fixer(username)[:max_length]
# Generate a unique username for current user using username
# as base but adding a unique hash at the end. Original
# username is cut to avoid any field max_length.
while user_exists(username=final_username):
username = short_username + uuid4().get_hex()[:uuid_length]
- final_username = username_fixer(username)[:USERNAME_MAX_LENGTH]
+ final_username = username_fixer(username)[:max_length]
return {'username': final_username}
email = details.get('email') or None
return {
- 'user': create_user_in_db(username=username, email=email),
+ 'user': UserSocialAuth.create_user(username=username, email=email),
'is_new': True
}
from django.utils import simplejson
-from social_auth.models import resolve_user_or_id
+from social_auth.models import UserSocialAuth
def consumer_oauth_url_request(backend, url, user_or_id, redirect_uri='/',
json=True):
"""Builds and retrieves an OAuth signed response."""
- user = resolve_user_or_id(user_or_id)
+ user = UserSocialAuth.resolve_user_or_id(user_or_id)
oauth_info = user.social_auth.filter(provider=backend.AUTH_BACKEND.name)[0]
token = Token.from_string(oauth_info.tokens['access_token'])
request = build_consumer_oauth_request(backend, token, url, redirect_uri)
def build_consumer_oauth_request(backend, token, url, redirect_uri='/',
- oauth_verifier=None, extra_params=None, method=HTTP_METHOD):
+ oauth_verifier=None, extra_params=None,
+ method=HTTP_METHOD):
"""Builds a Consumer OAuth request."""
params = {'oauth_callback': redirect_uri}
if extra_params:
+++ /dev/null
-"""Centralized definition of settings"""
-
-
-from django.utils import importlib
-from social_auth.utils import setting
-
-
-SOCIAL_AUTH_MODELS = setting('SOCIAL_AUTH_MODELS', 'social_auth.django_models')
-
-
-def get_models_module():
- """Load and return the module specified by the SOCIAL_AUTH_MODELS config
- setting.
-
- """
- return importlib.import_module(SOCIAL_AUTH_MODELS)
+from social_auth.models import UserSocialAuth
from social_auth.backends import get_backends
from social_auth.utils import group_backend_by_type
-from social_auth.models import get_social_auth_for_user
# Note: social_auth_backends, social_auth_by_type_backends and
if hasattr(user, 'is_authenticated') and user.is_authenticated():
accounts.update((assoc.provider.replace('-', '_'), assoc)
- for assoc in get_social_auth_for_user(user))
+ for assoc in UserSocialAuth.get_social_auth_for_user(user))
return {'social_auth': accounts}
# user comes from request.user usually, on /admin/ it will be an instance
# of auth.User and this code will fail if a custom User model was defined
if hasattr(user, 'is_authenticated') and user.is_authenticated():
- associated = get_social_auth_for_user(user)
+ associated = UserSocialAuth.get_social_auth_for_user(user)
not_associated = list(set(available) -
set(assoc.provider for assoc in associated))
values['associated'] = associated
--- /dev/null
+"""Models mixins for Social Auth"""
+import base64
+from datetime import datetime
+
+from django.utils.timezone import utc
+
+from openid.association import Association as OIDAssociation
+
+from social_auth.utils import setting
+
+
+class UserSocialAuthMixin(object):
+ User = None
+ user = ''
+ provider = ''
+
+ def __unicode__(self):
+ """Return associated user unicode representation"""
+ return u'%s - %s' % (unicode(self.user), self.provider.title())
+
+ @property
+ def tokens(self):
+ """Return access_token stored in extra_data or None"""
+ # Make import here to avoid recursive imports :-/
+ from social_auth.backends import get_backends
+ backend = get_backends().get(self.provider)
+ if backend:
+ return backend.AUTH_BACKEND.tokens(self)
+ else:
+ return {}
+
+ def expiration_datetime(self):
+ """Return saved session expiration seconds if any. Is returned in
+ the form of timezone-aware datetime. None is returned if there's no
+ value stored or it's malformed.
+ """
+ if self.extra_data:
+ name = setting('SOCIAL_AUTH_EXPIRATION', 'expires')
+ try:
+ return datetime.utcfromtimestamp(self.extra_data.get(name))\
+ .replace(tzinfo=utc)
+ except (ValueError, TypeError):
+ pass
+ return None
+
+ @classmethod
+ def username_max_length(cls):
+ raise NotImplementedError('Implement in subclass')
+
+ @classmethod
+ def simple_user_exists(cls, *args, **kwargs):
+ """
+ Return True/False if a User instance exists with the given arguments.
+ Arguments are directly passed to filter() manager method.
+ """
+ return cls.User.objects.filter(*args, **kwargs).count() > 0
+
+ @classmethod
+ def create_user(cls, *args, **kwargs):
+ return cls.User.objects.create(*args, **kwargs)
+
+ @classmethod
+ def get_user(cls, pk):
+ try:
+ return cls.User.objects.get(pk=pk)
+ except cls.User.DoesNotExist:
+ return None
+
+ @classmethod
+ def get_user_by_email(cls, email):
+ return cls.User.objects.get(email=email)
+
+ @classmethod
+ def resolve_user_or_id(cls, user_or_id):
+ if isinstance(user_or_id, cls.User):
+ return user_or_id
+ return cls.User.objects.get(pk=user_or_id)
+
+ @classmethod
+ def get_social_auth(cls, provider, uid):
+ try:
+ return cls.objects.get(provider=provider, uid=uid)
+ except cls.DoesNotExist:
+ return None
+
+ @classmethod
+ def get_social_auth_for_user(cls, user):
+ return user.social_auth.all()
+
+ @classmethod
+ def create_social_auth(cls, user, uid, provider):
+ return cls.objects.create(user=user, uid=uid, provider=provider)
+
+ @classmethod
+ def store_association(cls, server_url, association):
+ from social_auth.models import Association
+ args = {'server_url': server_url, 'handle': association.handle}
+ try:
+ assoc = Association.objects.get(**args)
+ except Association.DoesNotExist:
+ assoc = Association(**args)
+ assoc.secret = base64.encodestring(association.secret)
+ assoc.issued = association.issued
+ assoc.lifetime = association.lifetime
+ assoc.assoc_type = association.assoc_type
+ assoc.save()
+
+ @classmethod
+ def get_oid_associations(cls, server_url, handle=None):
+ from social_auth.models import Association
+ args = {'server_url': server_url}
+ if handle is not None:
+ args['handle'] = handle
+
+ return sorted([
+ (assoc.id,
+ OIDAssociation(assoc.handle,
+ base64.decodestring(assoc.secret),
+ assoc.issued,
+ assoc.lifetime,
+ assoc.assoc_type))
+ for assoc in Association.objects.filter(**args)
+ ], key=lambda x: x[1].issued, reverse=True)
+
+ @classmethod
+ def delete_associations(cls, ids_to_delete):
+ from social_auth.models import Association
+ Association.objects.filter(pk__in=ids_to_delete).delete()
+
+ @classmethod
+ def use_nonce(cls, server_url, timestamp, salt):
+ from social_auth.models import Nonce
+ return Nonce.objects.get_or_create(server_url=server_url,
+ timestamp=timestamp,
+ salt=salt)[1]
+
+
+class NonceMixin(object):
+ """One use numbers"""
+ server_url = ''
+ timestamp = 0
+ salt = ''
+
+ def __unicode__(self):
+ """Unicode representation"""
+ return self.server_url
+
+
+class AssociationMixin(object):
+ """OpenId account association"""
+ server_url = ''
+ handle = ''
+ secret = ''
+ issued = 0
+ lifetime = 0
+ assoc_type = ''
+
+ def __unicode__(self):
+ """Unicode representation"""
+ return '%s %s' % (self.handle, self.issued)
--- /dev/null
+"""Django ORM models for Social Auth"""
+from django.db import models
+
+from social_auth.db.base import UserSocialAuthMixin, AssociationMixin, \
+ NonceMixin
+from social_auth.fields import JSONField
+from social_auth.utils import setting
+
+
+# If User class is overridden, it *must* provide the following fields
+# and methods work with django-social-auth:
+#
+# username = CharField()
+# last_login = DateTimeField()
+# is_active = BooleanField()
+# def is_authenticated():
+# ...
+if setting('SOCIAL_AUTH_USER_MODEL'):
+ UserModel = models.get_model(*setting('SOCIAL_AUTH_USER_MODEL')\
+ .rsplit('.', 1))
+else:
+ from django.contrib.auth.models import User as UserModel
+
+
+# TODO make this a complementary config setting to SOCIAL_AUTH_USER_MODEL
+USERNAME = 'username'
+
+
+class UserSocialAuth(models.Model, UserSocialAuthMixin):
+ """Social Auth association model"""
+ User = UserModel
+ user = models.ForeignKey(UserModel, related_name='social_auth')
+ provider = models.CharField(max_length=32)
+ uid = models.CharField(max_length=255)
+ extra_data = JSONField(default='{}')
+
+ class Meta:
+ """Meta data"""
+ unique_together = ('provider', 'uid')
+ app_label = 'social_auth'
+
+ @classmethod
+ def create_user(cls, *args, **kwargs):
+ return cls.User.objects.create_user(*args, **kwargs)
+
+ @classmethod
+ def get_social_auth(cls, provider, uid):
+ try:
+ return cls.objects.select_related('user').get(provider=provider,
+ uid=uid)
+ except UserSocialAuth.DoesNotExist:
+ return None
+
+
+class Nonce(models.Model, NonceMixin):
+ """One use numbers"""
+ server_url = models.CharField(max_length=255)
+ timestamp = models.IntegerField()
+ salt = models.CharField(max_length=40)
+
+ class Meta:
+ app_label = 'social_auth'
+
+
+class Association(models.Model, AssociationMixin):
+ """OpenId account association"""
+ server_url = models.CharField(max_length=255)
+ handle = models.CharField(max_length=255)
+ secret = models.CharField(max_length=255) # Stored base64 encoded
+ issued = models.IntegerField()
+ lifetime = models.IntegerField()
+ assoc_type = models.CharField(max_length=64)
+
+ class Meta:
+ app_label = 'social_auth'
--- /dev/null
+"""
+MongoEngine models for Social Auth
+
+Requires MongoEngine 0.6.10
+"""
+from mongoengine import DictField, Document, IntField, ReferenceField, \
+ StringField
+from mongoengine.django.auth import User
+
+from social_auth.db.base import UserSocialAuthMixin, AssociationMixin, \
+ NonceMixin
+
+
+# TODO make this a complementary config setting to SOCIAL_AUTH_USER_MODEL
+USERNAME = 'username'
+
+
+class UserSocialAuth(Document, UserSocialAuthMixin):
+ """Social Auth association model"""
+ user = ReferenceField(User)
+ provider = StringField(max_length=32)
+ uid = StringField(max_length=255, unique_with='provider')
+ extra_data = DictField()
+
+ @classmethod
+ def get_social_auth_for_user(cls, user):
+ return cls.objects(user=user)
+
+ @classmethod
+ def create_social_auth(cls, user, uid, provider):
+ if not isinstance(type(uid), basestring):
+ uid = str(uid)
+ return cls.objects.create(user=user, uid=uid, provider=provider)
+
+
+class Nonce(Document, NonceMixin):
+ """One use numbers"""
+ server_url = StringField(max_length=255)
+ timestamp = IntField()
+ salt = StringField(max_length=40)
+
+
+class Association(Document, AssociationMixin):
+ """OpenId account association"""
+ server_url = StringField(max_length=255)
+ handle = StringField(max_length=255)
+ secret = StringField(max_length=255) # Stored base64 encoded
+ issued = IntField()
+ lifetime = IntField()
+ assoc_type = StringField(max_length=64)
+++ /dev/null
-"""Django ORM models for Social Auth"""
-import base64
-from datetime import datetime
-
-from django.db import models
-from django.utils.timezone import utc
-
-from openid.association import Association as OIDAssociation
-
-from social_auth.fields import JSONField
-from social_auth.utils import setting
-
-
-NAME = 'django_models'
-
-
-# If User class is overridden, it *must* provide the following fields
-# and methods work with django-social-auth:
-#
-# username = CharField()
-# last_login = DateTimeField()
-# is_active = BooleanField()
-# def is_authenticated():
-# ...
-
-if setting('SOCIAL_AUTH_USER_MODEL'):
- User = models.get_model(*setting('SOCIAL_AUTH_USER_MODEL').rsplit('.', 1))
-else:
- from django.contrib.auth.models import User
-
-
-# TODO make this a complementary config setting to SOCIAL_AUTH_USER_MODEL
-USERNAME = 'username'
-USERNAME_MAX_LENGTH = User._meta.get_field(USERNAME).max_length
-
-
-def simple_user_exists(*args, **kwargs):
- """Return True/False if a User instance exists with the given arguments.
- Arguments are directly passed to filter() manager method."""
- return User.objects.filter(*args, **kwargs).exists()
-
-
-def create_user(*args, **kwargs):
- return User.objects.create_user(*args, **kwargs)
-
-
-def get_user(user_id):
- try:
- return User.objects.get(id=user_id)
- except User.DoesNotExist:
- return None
-
-
-def get_user_by_email(email):
- return User.objects.get(email=email)
-
-
-def resolve_user_or_id(user_or_id):
- if isinstance(user_or_id, User):
- return user_or_id
- return User.objects.get(pk=user_or_id)
-
-
-def get_social_auth(provider, uid):
- try:
- return UserSocialAuth.objects.select_related('user').get(
- provider=provider, uid=uid)
- except UserSocialAuth.DoesNotExist:
- return None
-
-
-def get_social_auth_for_user(user):
- return user.social_auth.all()
-
-
-def create_social_auth(user, uid, provider):
- return UserSocialAuth.objects.create(user=user, uid=uid, provider=provider)
-
-
-def store_association(server_url, association):
- args = {'server_url': server_url, 'handle': association.handle}
- try:
- assoc = Association.objects.get(**args)
- except Association.DoesNotExist:
- assoc = Association(**args)
- assoc.secret = base64.encodestring(association.secret)
- assoc.issued = association.issued
- assoc.lifetime = association.lifetime
- assoc.assoc_type = association.assoc_type
- assoc.save()
-
-
-def get_oid_associations(server_url, handle=None):
- args = {'server_url': server_url}
- if handle is not None:
- args['handle'] = handle
-
- return sorted([
- (assoc.id,
- OIDAssociation(assoc.handle,
- base64.decodestring(assoc.secret),
- assoc.issued,
- assoc.lifetime,
- assoc.assoc_type))
- for assoc in Association.objects.filter(**args)
- ], key=lambda x: x[1].issued, reverse=True)
-
-
-def delete_associations(ids_to_delete):
- Association.objects.filter(pk__in=ids_to_delete).delete()
-
-
-def use_nonce(server_url, timestamp, salt):
- return Nonce.objects.get_or_create(server_url=server_url,
- timestamp=timestamp,
- salt=salt)[1]
-
-
-class UserSocialAuth(models.Model):
- """Social Auth association model"""
- user = models.ForeignKey(User, related_name='social_auth')
- provider = models.CharField(max_length=32)
- uid = models.CharField(max_length=255)
- extra_data = JSONField(default='{}')
-
- class Meta:
- """Meta data"""
- unique_together = ('provider', 'uid')
- app_label = 'social_auth'
-
- def __unicode__(self):
- """Return associated user unicode representation"""
- return u'%s - %s' % (unicode(self.user), self.provider.title())
-
- @property
- def tokens(self):
- """Return access_token stored in extra_data or None"""
- # Make import here to avoid recursive imports :-/
- from social_auth.backends import get_backends
- backend = get_backends().get(self.provider)
- if backend:
- return backend.AUTH_BACKEND.tokens(self)
- else:
- return {}
-
- def expiration_datetime(self):
- """Return saved session expiration seconds if any. Is returned in
- the form of timezone-aware datetime. None is returned if there's no
- value stored or it's malformed.
- """
- if self.extra_data:
- name = setting('SOCIAL_AUTH_EXPIRATION', 'expires')
- try:
- return datetime.utcfromtimestamp(self.extra_data.get(name))\
- .replace(tzinfo=utc)
- except (ValueError, TypeError):
- pass
- return None
-
-
-class Nonce(models.Model):
- """One use numbers"""
- server_url = models.CharField(max_length=255)
- timestamp = models.IntegerField()
- salt = models.CharField(max_length=40)
-
- class Meta:
- app_label = 'social_auth'
-
- def __unicode__(self):
- """Unicode representation"""
- return self.server_url
-
-
-class Association(models.Model):
- """OpenId account association"""
- server_url = models.CharField(max_length=255)
- handle = models.CharField(max_length=255)
- secret = models.CharField(max_length=255) # Stored base64 encoded
- issued = models.IntegerField()
- lifetime = models.IntegerField()
- assoc_type = models.CharField(max_length=64)
-
- class Meta:
- app_label = 'social_auth'
-
- def __unicode__(self):
- """Unicode representation"""
- return '%s %s' % (self.handle, self.issued)
"""Social auth models"""
-# TODO define protocol for implementing modules...
+from django.utils.importlib import import_module
-from social_auth import conf
+from social_auth.utils import setting
-models_module = conf.get_models_module()
+SOCIAL_AUTH_MODELS_MODULE = import_module(setting('SOCIAL_AUTH_MODELS',
+ 'social_auth.db.django_models'))
-this_module = globals()
-for key in dir(models_module):
- this_module[key] = getattr(models_module, key)
+globals().update((name, getattr(SOCIAL_AUTH_MODELS_MODULE, name))
+ for name in dir(SOCIAL_AUTH_MODELS_MODULE))
+++ /dev/null
-"""MongoEngine models for Social Auth
-
-Requires MongoEngine 0.6.10
-
-"""
-# TODO extract common code into base objects/mixins
-
-
-import base64
-from datetime import timedelta
-
-from openid.association import Association as OIDAssociation
-
-from mongoengine import DictField
-from mongoengine import Document
-from mongoengine import IntField
-from mongoengine import ReferenceField
-from mongoengine import StringField
-from social_auth.utils import setting
-
-
-NAME = 'mongoengine_models'
-
-
-# If User class is overridden, it *must* provide the following fields
-# and methods work with django-social-auth:
-#
-# username = CharField()
-# last_login = DateTimeField()
-# is_active = BooleanField()
-# def is_authenticated():
-# ...
-
-if setting('SOCIAL_AUTH_USER_MODEL'):
- User = models.get_model(*setting('SOCIAL_AUTH_USER_MODEL').rsplit('.', 1))
-else:
- from mongoengine.django.auth import User
-
-
-# TODO make this a complementary config setting to SOCIAL_AUTH_USER_MODEL
-USERNAME = 'username'
-USERNAME_MAX_LENGTH = getattr(User, USERNAME).max_length
-
-
-def simple_user_exists(*args, **kwargs):
- """Return True/False if a User instance exists with the given arguments.
- Arguments are directly passed to filter() manager method."""
- return User.objects.filter(*args, **kwargs).count()
-
-
-def create_user(*args, **kwargs):
- return User.objects.create(*args, **kwargs)
-
-
-def get_user(user_id):
- try:
- return User.objects.get(id=user_id)
- except User.DoesNotExist:
- return None
-
-
-def get_user_by_email(email):
- return User.objects.get(email=email)
-
-
-def resolve_user_or_id(user_or_id):
- if isinstance(user_or_id, User):
- return user_or_id
- return User.objects.get(pk=user_or_id)
-
-
-def get_social_auth(provider, uid):
- try:
- return UserSocialAuth.objects.get(provider=provider, uid=uid)
- except UserSocialAuth.DoesNotExist:
- return None
-
-
-def get_social_auth_for_user(user):
- return UserSocialAuth.objects(user=user)
-
-
-def create_social_auth(user, uid, provider):
- if type(uid) is not str:
- uid = str(uid)
- return UserSocialAuth.objects.create(user=user, uid=uid, provider=provider)
-
-
-def store_association(server_url, association):
- args = {'server_url': server_url, 'handle': association.handle}
- try:
- assoc = Association.objects.get(**args)
- except Association.DoesNotExist:
- assoc = Association(**args)
- assoc.secret = base64.encodestring(association.secret)
- assoc.issued = association.issued
- assoc.lifetime = association.lifetime
- assoc.assoc_type = association.assoc_type
- assoc.save()
-
-
-def get_oid_associations(server_url, handle=None):
- args = {'server_url': server_url}
- if handle is not None:
- args['handle'] = handle
-
- return sorted([
- (assoc.id,
- OIDAssociation(assoc.handle,
- base64.decodestring(assoc.secret),
- assoc.issued,
- assoc.lifetime,
- assoc.assoc_type))
- for assoc in Association.objects.filter(**args)
- ], key=lambda x: x[1].issued, reverse=True)
-
-
-def use_nonce(server_url, timestamp, salt):
- return Nonce.objects.get_or_create(server_url=server_url,
- timestamp=timestamp,
- salt=salt)[1]
-
-
-def delete_associations(ids_to_delete):
- Association.objects.filter(pk__in=ids_to_delete).delete()
-
-
-class UserSocialAuth(Document):
- """Social Auth association model"""
- user = ReferenceField(User)
- provider = StringField(max_length=32)
- uid = StringField(max_length=255, unique_with='provider')
- extra_data = DictField()
-
- def __unicode__(self):
- """Return associated user unicode representation"""
- return u'%s - %s' % (unicode(self.user), self.provider)
-
- @property
- def tokens(self):
- """Return access_token stored in extra_data or None"""
- # Make import here to avoid recursive imports :-/
- from social_auth.backends import get_backends
- backend = get_backends().get(self.provider)
- if backend:
- return backend.AUTH_BACKEND.tokens(self)
- else:
- return {}
-
- def expiration_delta(self):
- """Return saved session expiration seconds if any. Is returned in
- the form of a timedelta data type. None is returned if there's no
- value stored or it's malformed.
- """
- if self.extra_data:
- name = setting('SOCIAL_AUTH_EXPIRATION', 'expires')
- try:
- return timedelta(seconds=int(self.extra_data.get(name)))
- except (ValueError, TypeError):
- pass
- return None
-
-
-class Nonce(Document):
- """One use numbers"""
- server_url = StringField(max_length=255)
- timestamp = IntField()
- salt = StringField(max_length=40)
-
- def __unicode__(self):
- """Unicode representation"""
- return self.server_url
-
-
-class Association(Document):
- """OpenId account association"""
- server_url = StringField(max_length=255)
- handle = StringField(max_length=255)
- secret = StringField(max_length=255) # Stored base64 encoded
- issued = IntField()
- lifetime = IntField()
- assoc_type = StringField(max_length=64)
-
- def __unicode__(self):
- """Unicode representation"""
- return '%s %s' % (self.handle, self.issued)
"""OpenId storage that saves to django models"""
import time
-import base64
from openid.store.interface import OpenIDStore
from openid.store.nonce import SKEW
-from social_auth.models import delete_associations
-from social_auth.models import get_oid_associations
-from social_auth.models import store_association
-from social_auth.models import use_nonce
+from social_auth.models import UserSocialAuth
class DjangoOpenIDStore(OpenIDStore):
def storeAssociation(self, server_url, association):
"""Store new assocition if doesn't exist"""
- store_association(server_url, association)
+ UserSocialAuth.store_association(server_url, association)
def getAssociation(self, server_url, handle=None):
"""Return stored assocition"""
- oid_associations = get_oid_associations(server_url, handle)
+ oid_associations = UserSocialAuth.get_oid_associations(server_url,
+ handle)
associations = [association
for assoc_id, association in oid_associations
if association.getExpiresIn() > 0]
if association.getExpiresIn() == 0]
if expired: # clear expired associations
- delete_associations(expired)
+ UserSocialAuth.delete_associations(expired)
if associations: # return most recet association
return associations[0]
"""Generate one use number and return *if* it was created"""
if abs(timestamp - time.time()) > SKEW:
return False
- return use_nonce(server_url, timestamp, salt)
+ return UserSocialAuth.use_nonce(server_url, timestamp, salt)