import re
class Duration(object):
-
+ """Represent a time duration"""
def __init__(self, *args, **kwargs):
if len(args) and isinstance(args[0], datetime.timedelta):
self._delta = datetime.timedelta(days=args[0].days, seconds=args[0].seconds)
# The following is based on Django TimeField
class DurationField(models.Field):
+ """Duration Django model field. Essentially the same as a TimeField, but
+ with values over 24h allowed."""
+
description = _("Duration")
__metaclass__ = models.SubfieldBase
defaults = {'form_class': forms.TimeField}
defaults.update(kwargs)
return super(DurationField, self).formfield(**defaults)
-
+
+
+class WeakForeignKey(models.ForeignKey):
+ """A weak foreign key is the same as foreign key but without cascading
+ delete. Instead the reference is set to null when the referenced record
+ get deleted. This emulates the ON DELETE SET NULL sql behaviour.
+
+ Warning: must be used in conjunction with EnhancedQuerySet, EnhancedManager,
+ and EnhancedModel
+ """
+ def __init__(self, to, **kwargs):
+ super(WeakForeignKey, self).__init__(to, **kwargs)
+
+class EnhancedQuerySet(models.query.QuerySet):
+ """QuerySet with added functionalities such as WeakForeignKey handling"""
+
+ def delete(self):
+ CHUNK=1024
+ objects = self.model._meta.get_all_related_objects()
+ ii = self.count()
+ for related in objects:
+ i = 0
+ while i < ii:
+ ids = [v[0] for v in self[i:i + CHUNK].values_list('pk')]
+ filter = {related.field.name + '__pk__in': ids}
+ q = related.model.objects.filter(**filter)
+ if isinstance(related.field, WeakForeignKey):
+ update = {related.field.name: None}
+ q.update(**update)
+ else:
+ q.delete()
+
+ i += CHUNK
+
+ super(EnhancedQuerySet, self).delete()
+
+class EnhancedManager(models.Manager):
+ """Manager which is bound to EnhancedQuerySet"""
+ def get_query_set(self):
+ return EnhancedQuerySet(self.model)
+
+
+class EnhancedModel(models.Model):
+ """Base model class with added functionality. See EnhancedQuerySet"""
+
+ objects = EnhancedManager()
+
+ def delete(self):
+ if not self.pk:
+ raise Exception("Can't delete without a primary key")
+ self.__class__.objects.filter(pk=self.pk).delete()
+
+ class Meta:
+ abstract = True
from xml.dom.minidom import getDOMImplementation
from telemeta.util.unaccent import unaccent_icmp
import re
-from telemeta.models.core import DurationField, Duration
+from telemeta.models.core import DurationField, Duration, WeakForeignKey, EnhancedModel
from telemeta.models import dublincore as dc
-class ModelCore(models.Model):
+class ModelCore(EnhancedModel):
@classmethod
def required_fields(cls):
reference = models.CharField(unique=True, max_length=250,
null=True)
- physical_format = models.ForeignKey('PhysicalFormat', related_name="collections",
- null=True)
+ physical_format = WeakForeignKey('PhysicalFormat', related_name="collections", null=True)
old_code = models.CharField(unique=True, max_length=250, null=True)
code = models.CharField(unique=True, max_length=250)
title = models.CharField(max_length=250)
alt_title = models.CharField(max_length=250, default="")
physical_items_num = models.IntegerField(default=0)
- publishing_status = models.ForeignKey('PublishingStatus', related_name="collections",
- null=True)
+ publishing_status = WeakForeignKey('PublishingStatus', related_name="collections", null=True)
creator = models.CharField(max_length=250, default="")
booklet_author = models.CharField(max_length=250, default="")
booklet_description = models.TextField(default="")
collector = models.CharField(max_length=250, default="")
collector_is_creator = models.BooleanField(default="")
- publisher = models.ForeignKey('Publisher', related_name="collections",
- null=True)
+ publisher = WeakForeignKey('Publisher', related_name="collections", null=True)
is_published = models.BooleanField(default="")
year_published = models.IntegerField(default=0)
- publisher_collection = models.ForeignKey('PublisherCollection', related_name="collections",
- null=True)
+ publisher_collection = WeakForeignKey('PublisherCollection', related_name="collections", null=True)
publisher_serial = models.CharField(max_length=250, default="")
external_references = models.TextField(default="")
- acquisition_mode = models.ForeignKey('AcquisitionMode', related_name="collections",
- null=True)
+ acquisition_mode = WeakForeignKey('AcquisitionMode', related_name="collections", null=True)
comment = models.TextField(default="")
- metadata_author = models.ForeignKey('MetadataAuthor', related_name="collections",
- null=True)
- metadata_writer = models.ForeignKey('MetadataWriter', related_name="collections",
- null=True)
- legal_rights = models.ForeignKey('LegalRight', related_name="collections",
- null=True)
+ metadata_author = WeakForeignKey('MetadataAuthor', related_name="collections", null=True)
+ metadata_writer = WeakForeignKey('MetadataWriter', related_name="collections", null=True)
+ legal_rights = WeakForeignKey('LegalRight', related_name="collections", null=True)
alt_ids = models.CharField(max_length=250, default="")
recorded_from_year = models.IntegerField(default=0)
recorded_to_year = models.IntegerField(default=0)
- recording_context = models.ForeignKey('RecordingContext', related_name="collections",
- null=True)
+ recording_context = WeakForeignKey('RecordingContext', related_name="collections", null=True)
approx_duration = DurationField(default='00:00')
doctype_code = models.IntegerField(default=0)
travail = models.CharField(max_length=250, default="")
cnrs_contributor = models.CharField(max_length=250, default="")
items_done = models.CharField(max_length=250, default="")
a_informer_07_03 = models.CharField(max_length=250, default="")
- ad_conversion = models.ForeignKey('AdConversion', related_name='collections',
- null=True)
+ ad_conversion = WeakForeignKey('AdConversion', related_name='collections', null=True)
public_access = models.CharField(choices=PUBLIC_ACCESS_CHOICES, max_length=16, default="metadata")
objects = query.MediaCollectionManager()
approx_duration = DurationField(default='00:00')
recorded_from_date = models.DateField(default=0)
recorded_to_date = models.DateField(default=0)
- location = models.ForeignKey('Location', related_name="items",
- db_column='location_name', null=True, default="")
+ location = WeakForeignKey('Location', related_name="items",
+ db_column='location_name', null=True)
location_comment = models.CharField(max_length=250, default="")
- ethnic_group = models.ForeignKey('EthnicGroup', related_name="items",
- null=True)
+ ethnic_group = WeakForeignKey('EthnicGroup', related_name="items", null=True)
title = models.CharField(max_length=250)
alt_title = models.CharField(max_length=250, default="")
author = models.CharField(max_length=250, default="")
- vernacular_style = models.ForeignKey('VernacularStyle', related_name="items",
+ vernacular_style = WeakForeignKey('VernacularStyle', related_name="items",
null=True)
context_comment = models.TextField(default="")
external_references = models.TextField(default="")
moda_execut = models.CharField(max_length=250, default="")
- copied_from_item = models.ForeignKey('self', related_name="copies",
- null=True)
+ copied_from_item = WeakForeignKey('self', related_name="copies", null=True)
collector = models.CharField(max_length=250, default="")
cultural_area = models.CharField(max_length=250, default="")
- generic_style = models.ForeignKey('GenericStyle', related_name="items",
+ generic_style = WeakForeignKey('GenericStyle', related_name="items",
null=True)
collector_selection = models.CharField(max_length=250, default="")
creator_reference = models.CharField(max_length=250, default="")
class MediaItemPerformance(ModelCore):
"Item performance"
media_item = models.ForeignKey('MediaItem', related_name="performances")
- instrument = models.ForeignKey('Instrument', related_name="performances",
- null=True)
- alias = models.ForeignKey('InstrumentAlias', related_name="performances",
- null=True)
+ instrument = WeakForeignKey('Instrument', related_name="performances", null=True)
+ alias = WeakForeignKey('InstrumentAlias', related_name="performances", null=True)
instruments_num = models.CharField(max_length=250, default="")
musicians = models.CharField(max_length=250, default="")
name = models.CharField(primary_key=True, max_length=150)
type = models.CharField(choices=TYPE_CHOICES, max_length=16)
complete_type = models.ForeignKey('LocationType', related_name="types")
- current_name = models.ForeignKey('self', related_name="past_names",
- db_column="current_name", null=True)
+ current_name = WeakForeignKey('self', related_name="past_names",
+ db_column="current_name", null=True)
is_authoritative = models.BooleanField(default=0)
def parent(self):
# David LIPSZYC <davidlipszyc@gmail.com>
from django.db.models import Manager, Q
-from django.db.models.query import QuerySet
+from telemeta.models.core import EnhancedQuerySet, EnhancedManager
import re
-class CoreQuerySet(QuerySet):
+class CoreQuerySet(EnhancedQuerySet):
"Base class for all query sets"
def none(self): # redundant with none() in recent Django svn
return self.extra(
where = ["id IN (SELECT DISTINCT element_id FROM revisions WHERE %s)" % " AND ".join(where)]);
-class CoreManager(Manager):
+class CoreManager(EnhancedManager):
"Base class for all models managers"
def none(self, *args, **kwargs):
from datetime import datetime, timedelta
class CollectionItemTestCase(unittest.TestCase):
+
def setUp(self):
"Create a test database based on objects created in Django"
- User.objects.all().delete()
self.david = User.objects.create(username="david", level="user")
self.olivier = User.objects.create(username="olivier", level="admin")
- LocationType.objects.all().delete()
self.country = LocationType.objects.create(id="country", name="Country")
self.continent = LocationType.objects.create(id="continent", name="Continent")
self.city = LocationType.objects.create(id="city", name="City")
- Location.objects.all().delete()
self.paris = Location.objects.create(name="Paris", type="other", complete_type=self.city)
self.france = Location.objects.create(name="France", type="country", complete_type=self.country)
self.europe = Location.objects.create(name="Europe", type="continent", complete_type=self.continent)
LocationRelation.objects.create(location=self.paris, parent_location=self.france)
LocationRelation.objects.create(location=self.france, parent_location=self.europe)
- EthnicGroup.objects.all().delete()
self.a = EthnicGroup.objects.create(name="a")
self.b = EthnicGroup.objects.create(name="b")
self.c = EthnicGroup.objects.create(name="c")
self.d = EthnicGroup.objects.create(name="d")
- MediaCollection.objects.all().delete()
self.persepolis = MediaCollection(id=1, code="CNRSMH_E_1970_001_002", reference="A1", title="persepolis",
creator="Abraham LINCOLN", collector="Friedrich HEINZ", year_published=2009, is_published=True,
recorded_from_year=1970, recorded_to_year=1980)
self.nicolas.save_with_revision(self.olivier)
- MediaItem.objects.all().delete()
self.item_1 = MediaItem(id=1, collection=self.persepolis, code="CNRSMH_E_1970_001_002_44",
recorded_from_date="1971-01-12", recorded_to_date="1971-02-24", location=self.paris,
ethnic_group=self.a, title="item 1", author="Mickael SHEPHERD", collector="Charles PREMIER",
self.collections = MediaCollection.objects.all()
self.items = MediaItem.objects.all()
+ def tearDown(self):
+ User.objects.all().delete()
+ LocationType.objects.all().delete()
+ Location.objects.all().delete()
+ EthnicGroup.objects.all().delete()
+ MediaCollection.objects.all().delete()
+ MediaItem.objects.all().delete()
+
def testQuickSearchOnCollections(self):
"Test quick_search property of MediaCollection class"
result = self.collections.quick_search("persepolis")
self.assertEquals(self.volonte.get_countries(), [self.belgique, self.france])
+class RelatedDeleteTestCase(unittest.TestCase):
+ def setUp(self):
+ self.publisher1 = Publisher.objects.create(id=1, value='publisher1')
+ self.publisher2 = Publisher.objects.create(id=2, value='publisher2')
+ self.pubcollection1 = PublisherCollection.objects.create(publisher=self.publisher1, value='pub1_collection1')
+
+ self.rights1 = LegalRight.objects.create(id=1, value='right1')
+
+ MediaCollection.objects.create(id=1, code='CNRSMH_I_1256_456', title='Collection1',
+ publisher=self.publisher1, publisher_collection=self.pubcollection1,
+ legal_rights=self.rights1)
+ MediaCollection.objects.create(id=2, code='CNRSMH_I_1256_123', title='Collection2',
+ publisher=self.publisher2)
+
+ def tearDown(self):
+ Publisher.objects.all().delete()
+ PublisherCollection.objects.all().delete()
+ LegalRight.objects.all().delete()
+ MediaCollection.objects.all().delete()
+
+ def testOnDeleteSetNull(self):
+ self.rights1.delete()
+ self.assertEquals(LegalRight.objects.filter(id=1).count(), 0)
+ q = MediaCollection.objects.filter(id=1)
+ self.assertEquals(q.count(), 1)
+ self.assertEquals(q[0].legal_rights, None)
+
+ def testOnDeleteCascade(self):
+ self.publisher1.delete()
+ self.assertEquals(Publisher.objects.filter(id=1).count(), 0)
+ self.assertEquals(Publisher.objects.filter(id=2).count(), 1)
+ self.assertEquals(PublisherCollection.objects.filter(id=1).count(), 0)
+
+ q = MediaCollection.objects.filter(id=1)
+ self.assertEquals(q.count(), 1)
+ self.assertEquals(q[0].publisher, None)
+ self.assertEquals(q[0].publisher_collection, None)
+
+ q = MediaCollection.objects.filter(id=2)
+ self.assertEquals(q.count(), 1)
+ self.assertEquals(q[0].publisher, self.publisher2)
+ self.assertEquals(q[0].publisher_collection, None)
+
+ def testOnDeleteCascadeMultiple(self):
+ Publisher.objects.all().delete()
+ self.assertEquals(Publisher.objects.count(), 0)
+ self.assertEquals(PublisherCollection.objects.count(), 0)
+ self.assertEquals(MediaCollection.objects.count(), 2)