From ee63244073e19244714327d1adce57dc096b3d3c Mon Sep 17 00:00:00 2001 From: olivier <> Date: Mon, 25 Jan 2010 15:13:46 +0000 Subject: [PATCH] emulate sql ON DELETE SET NULL using WeakForeignKey and enhanced model/manager/queryset --- telemeta/models/core.py | 60 +++++++++++++++++++++++++++++++-- telemeta/models/crem.py | 58 ++++++++++++-------------------- telemeta/models/cremquery.py | 6 ++-- telemeta/tests/model_tests.py | 63 +++++++++++++++++++++++++++++++---- 4 files changed, 140 insertions(+), 47 deletions(-) diff --git a/telemeta/models/core.py b/telemeta/models/core.py index a220409b..c634943e 100644 --- a/telemeta/models/core.py +++ b/telemeta/models/core.py @@ -38,7 +38,7 @@ from django.utils.translation import ugettext_lazy as _ import re class Duration(object): - + """Represent a time duration""" def __init__(self, *args, **kwargs): if len(args) and isinstance(args[0], datetime.timedelta): self._delta = datetime.timedelta(days=args[0].days, seconds=args[0].seconds) @@ -94,6 +94,9 @@ class Duration(object): # The following is based on Django TimeField class DurationField(models.Field): + """Duration Django model field. Essentially the same as a TimeField, but + with values over 24h allowed.""" + description = _("Duration") __metaclass__ = models.SubfieldBase @@ -140,4 +143,57 @@ class DurationField(models.Field): defaults = {'form_class': forms.TimeField} defaults.update(kwargs) return super(DurationField, self).formfield(**defaults) - + + +class WeakForeignKey(models.ForeignKey): + """A weak foreign key is the same as foreign key but without cascading + delete. Instead the reference is set to null when the referenced record + get deleted. This emulates the ON DELETE SET NULL sql behaviour. + + Warning: must be used in conjunction with EnhancedQuerySet, EnhancedManager, + and EnhancedModel + """ + def __init__(self, to, **kwargs): + super(WeakForeignKey, self).__init__(to, **kwargs) + +class EnhancedQuerySet(models.query.QuerySet): + """QuerySet with added functionalities such as WeakForeignKey handling""" + + def delete(self): + CHUNK=1024 + objects = self.model._meta.get_all_related_objects() + ii = self.count() + for related in objects: + i = 0 + while i < ii: + ids = [v[0] for v in self[i:i + CHUNK].values_list('pk')] + filter = {related.field.name + '__pk__in': ids} + q = related.model.objects.filter(**filter) + if isinstance(related.field, WeakForeignKey): + update = {related.field.name: None} + q.update(**update) + else: + q.delete() + + i += CHUNK + + super(EnhancedQuerySet, self).delete() + +class EnhancedManager(models.Manager): + """Manager which is bound to EnhancedQuerySet""" + def get_query_set(self): + return EnhancedQuerySet(self.model) + + +class EnhancedModel(models.Model): + """Base model class with added functionality. See EnhancedQuerySet""" + + objects = EnhancedManager() + + def delete(self): + if not self.pk: + raise Exception("Can't delete without a primary key") + self.__class__.objects.filter(pk=self.pk).delete() + + class Meta: + abstract = True diff --git a/telemeta/models/crem.py b/telemeta/models/crem.py index 2c98ffd8..0db0975d 100755 --- a/telemeta/models/crem.py +++ b/telemeta/models/crem.py @@ -39,10 +39,10 @@ import cremquery as query from xml.dom.minidom import getDOMImplementation from telemeta.util.unaccent import unaccent_icmp import re -from telemeta.models.core import DurationField, Duration +from telemeta.models.core import DurationField, Duration, WeakForeignKey, EnhancedModel from telemeta.models import dublincore as dc -class ModelCore(models.Model): +class ModelCore(EnhancedModel): @classmethod def required_fields(cls): @@ -133,42 +133,33 @@ class MediaCollection(MediaResource): reference = models.CharField(unique=True, max_length=250, null=True) - physical_format = models.ForeignKey('PhysicalFormat', related_name="collections", - null=True) + physical_format = WeakForeignKey('PhysicalFormat', related_name="collections", null=True) old_code = models.CharField(unique=True, max_length=250, null=True) code = models.CharField(unique=True, max_length=250) title = models.CharField(max_length=250) alt_title = models.CharField(max_length=250, default="") physical_items_num = models.IntegerField(default=0) - publishing_status = models.ForeignKey('PublishingStatus', related_name="collections", - null=True) + publishing_status = WeakForeignKey('PublishingStatus', related_name="collections", null=True) creator = models.CharField(max_length=250, default="") booklet_author = models.CharField(max_length=250, default="") booklet_description = models.TextField(default="") collector = models.CharField(max_length=250, default="") collector_is_creator = models.BooleanField(default="") - publisher = models.ForeignKey('Publisher', related_name="collections", - null=True) + publisher = WeakForeignKey('Publisher', related_name="collections", null=True) is_published = models.BooleanField(default="") year_published = models.IntegerField(default=0) - publisher_collection = models.ForeignKey('PublisherCollection', related_name="collections", - null=True) + publisher_collection = WeakForeignKey('PublisherCollection', related_name="collections", null=True) publisher_serial = models.CharField(max_length=250, default="") external_references = models.TextField(default="") - acquisition_mode = models.ForeignKey('AcquisitionMode', related_name="collections", - null=True) + acquisition_mode = WeakForeignKey('AcquisitionMode', related_name="collections", null=True) comment = models.TextField(default="") - metadata_author = models.ForeignKey('MetadataAuthor', related_name="collections", - null=True) - metadata_writer = models.ForeignKey('MetadataWriter', related_name="collections", - null=True) - legal_rights = models.ForeignKey('LegalRight', related_name="collections", - null=True) + metadata_author = WeakForeignKey('MetadataAuthor', related_name="collections", null=True) + metadata_writer = WeakForeignKey('MetadataWriter', related_name="collections", null=True) + legal_rights = WeakForeignKey('LegalRight', related_name="collections", null=True) alt_ids = models.CharField(max_length=250, default="") recorded_from_year = models.IntegerField(default=0) recorded_to_year = models.IntegerField(default=0) - recording_context = models.ForeignKey('RecordingContext', related_name="collections", - null=True) + recording_context = WeakForeignKey('RecordingContext', related_name="collections", null=True) approx_duration = DurationField(default='00:00') doctype_code = models.IntegerField(default=0) travail = models.CharField(max_length=250, default="") @@ -176,8 +167,7 @@ class MediaCollection(MediaResource): cnrs_contributor = models.CharField(max_length=250, default="") items_done = models.CharField(max_length=250, default="") a_informer_07_03 = models.CharField(max_length=250, default="") - ad_conversion = models.ForeignKey('AdConversion', related_name='collections', - null=True) + ad_conversion = WeakForeignKey('AdConversion', related_name='collections', null=True) public_access = models.CharField(choices=PUBLIC_ACCESS_CHOICES, max_length=16, default="metadata") objects = query.MediaCollectionManager() @@ -256,24 +246,22 @@ class MediaItem(MediaResource): approx_duration = DurationField(default='00:00') recorded_from_date = models.DateField(default=0) recorded_to_date = models.DateField(default=0) - location = models.ForeignKey('Location', related_name="items", - db_column='location_name', null=True, default="") + location = WeakForeignKey('Location', related_name="items", + db_column='location_name', null=True) location_comment = models.CharField(max_length=250, default="") - ethnic_group = models.ForeignKey('EthnicGroup', related_name="items", - null=True) + ethnic_group = WeakForeignKey('EthnicGroup', related_name="items", null=True) title = models.CharField(max_length=250) alt_title = models.CharField(max_length=250, default="") author = models.CharField(max_length=250, default="") - vernacular_style = models.ForeignKey('VernacularStyle', related_name="items", + vernacular_style = WeakForeignKey('VernacularStyle', related_name="items", null=True) context_comment = models.TextField(default="") external_references = models.TextField(default="") moda_execut = models.CharField(max_length=250, default="") - copied_from_item = models.ForeignKey('self', related_name="copies", - null=True) + copied_from_item = WeakForeignKey('self', related_name="copies", null=True) collector = models.CharField(max_length=250, default="") cultural_area = models.CharField(max_length=250, default="") - generic_style = models.ForeignKey('GenericStyle', related_name="items", + generic_style = WeakForeignKey('GenericStyle', related_name="items", null=True) collector_selection = models.CharField(max_length=250, default="") creator_reference = models.CharField(max_length=250, default="") @@ -456,10 +444,8 @@ class InstrumentAliasRelation(ModelCore): class MediaItemPerformance(ModelCore): "Item performance" media_item = models.ForeignKey('MediaItem', related_name="performances") - instrument = models.ForeignKey('Instrument', related_name="performances", - null=True) - alias = models.ForeignKey('InstrumentAlias', related_name="performances", - null=True) + instrument = WeakForeignKey('Instrument', related_name="performances", null=True) + alias = WeakForeignKey('InstrumentAlias', related_name="performances", null=True) instruments_num = models.CharField(max_length=250, default="") musicians = models.CharField(max_length=250, default="") @@ -512,8 +498,8 @@ class Location(ModelCore): name = models.CharField(primary_key=True, max_length=150) type = models.CharField(choices=TYPE_CHOICES, max_length=16) complete_type = models.ForeignKey('LocationType', related_name="types") - current_name = models.ForeignKey('self', related_name="past_names", - db_column="current_name", null=True) + current_name = WeakForeignKey('self', related_name="past_names", + db_column="current_name", null=True) is_authoritative = models.BooleanField(default=0) def parent(self): diff --git a/telemeta/models/cremquery.py b/telemeta/models/cremquery.py index c46a309a..8c3477bf 100644 --- a/telemeta/models/cremquery.py +++ b/telemeta/models/cremquery.py @@ -34,10 +34,10 @@ # David LIPSZYC from django.db.models import Manager, Q -from django.db.models.query import QuerySet +from telemeta.models.core import EnhancedQuerySet, EnhancedManager import re -class CoreQuerySet(QuerySet): +class CoreQuerySet(EnhancedQuerySet): "Base class for all query sets" def none(self): # redundant with none() in recent Django svn @@ -67,7 +67,7 @@ class CoreQuerySet(QuerySet): return self.extra( where = ["id IN (SELECT DISTINCT element_id FROM revisions WHERE %s)" % " AND ".join(where)]); -class CoreManager(Manager): +class CoreManager(EnhancedManager): "Base class for all models managers" def none(self, *args, **kwargs): diff --git a/telemeta/tests/model_tests.py b/telemeta/tests/model_tests.py index 45edeeb4..db45bae2 100644 --- a/telemeta/tests/model_tests.py +++ b/telemeta/tests/model_tests.py @@ -38,19 +38,17 @@ from telemeta.models import * from datetime import datetime, timedelta class CollectionItemTestCase(unittest.TestCase): + def setUp(self): "Create a test database based on objects created in Django" - User.objects.all().delete() self.david = User.objects.create(username="david", level="user") self.olivier = User.objects.create(username="olivier", level="admin") - LocationType.objects.all().delete() self.country = LocationType.objects.create(id="country", name="Country") self.continent = LocationType.objects.create(id="continent", name="Continent") self.city = LocationType.objects.create(id="city", name="City") - Location.objects.all().delete() self.paris = Location.objects.create(name="Paris", type="other", complete_type=self.city) self.france = Location.objects.create(name="France", type="country", complete_type=self.country) self.europe = Location.objects.create(name="Europe", type="continent", complete_type=self.continent) @@ -59,13 +57,11 @@ class CollectionItemTestCase(unittest.TestCase): LocationRelation.objects.create(location=self.paris, parent_location=self.france) LocationRelation.objects.create(location=self.france, parent_location=self.europe) - EthnicGroup.objects.all().delete() self.a = EthnicGroup.objects.create(name="a") self.b = EthnicGroup.objects.create(name="b") self.c = EthnicGroup.objects.create(name="c") self.d = EthnicGroup.objects.create(name="d") - MediaCollection.objects.all().delete() self.persepolis = MediaCollection(id=1, code="CNRSMH_E_1970_001_002", reference="A1", title="persepolis", creator="Abraham LINCOLN", collector="Friedrich HEINZ", year_published=2009, is_published=True, recorded_from_year=1970, recorded_to_year=1980) @@ -84,7 +80,6 @@ class CollectionItemTestCase(unittest.TestCase): self.nicolas.save_with_revision(self.olivier) - MediaItem.objects.all().delete() self.item_1 = MediaItem(id=1, collection=self.persepolis, code="CNRSMH_E_1970_001_002_44", recorded_from_date="1971-01-12", recorded_to_date="1971-02-24", location=self.paris, ethnic_group=self.a, title="item 1", author="Mickael SHEPHERD", collector="Charles PREMIER", @@ -130,6 +125,14 @@ class CollectionItemTestCase(unittest.TestCase): self.collections = MediaCollection.objects.all() self.items = MediaItem.objects.all() + def tearDown(self): + User.objects.all().delete() + LocationType.objects.all().delete() + Location.objects.all().delete() + EthnicGroup.objects.all().delete() + MediaCollection.objects.all().delete() + MediaItem.objects.all().delete() + def testQuickSearchOnCollections(self): "Test quick_search property of MediaCollection class" result = self.collections.quick_search("persepolis") @@ -276,4 +279,52 @@ class CollectionItemTestCase(unittest.TestCase): self.assertEquals(self.volonte.get_countries(), [self.belgique, self.france]) +class RelatedDeleteTestCase(unittest.TestCase): + def setUp(self): + self.publisher1 = Publisher.objects.create(id=1, value='publisher1') + self.publisher2 = Publisher.objects.create(id=2, value='publisher2') + self.pubcollection1 = PublisherCollection.objects.create(publisher=self.publisher1, value='pub1_collection1') + + self.rights1 = LegalRight.objects.create(id=1, value='right1') + + MediaCollection.objects.create(id=1, code='CNRSMH_I_1256_456', title='Collection1', + publisher=self.publisher1, publisher_collection=self.pubcollection1, + legal_rights=self.rights1) + MediaCollection.objects.create(id=2, code='CNRSMH_I_1256_123', title='Collection2', + publisher=self.publisher2) + + def tearDown(self): + Publisher.objects.all().delete() + PublisherCollection.objects.all().delete() + LegalRight.objects.all().delete() + MediaCollection.objects.all().delete() + + def testOnDeleteSetNull(self): + self.rights1.delete() + self.assertEquals(LegalRight.objects.filter(id=1).count(), 0) + q = MediaCollection.objects.filter(id=1) + self.assertEquals(q.count(), 1) + self.assertEquals(q[0].legal_rights, None) + + def testOnDeleteCascade(self): + self.publisher1.delete() + self.assertEquals(Publisher.objects.filter(id=1).count(), 0) + self.assertEquals(Publisher.objects.filter(id=2).count(), 1) + self.assertEquals(PublisherCollection.objects.filter(id=1).count(), 0) + + q = MediaCollection.objects.filter(id=1) + self.assertEquals(q.count(), 1) + self.assertEquals(q[0].publisher, None) + self.assertEquals(q[0].publisher_collection, None) + + q = MediaCollection.objects.filter(id=2) + self.assertEquals(q.count(), 1) + self.assertEquals(q[0].publisher, self.publisher2) + self.assertEquals(q[0].publisher_collection, None) + + def testOnDeleteCascadeMultiple(self): + Publisher.objects.all().delete() + self.assertEquals(Publisher.objects.count(), 0) + self.assertEquals(PublisherCollection.objects.count(), 0) + self.assertEquals(MediaCollection.objects.count(), 2) -- 2.39.5