]> git.parisson.com Git - telemeta.git/commitdiff
emulate sql ON DELETE SET NULL using WeakForeignKey and enhanced model/manager/queryset
authorolivier <>
Mon, 25 Jan 2010 15:13:46 +0000 (15:13 +0000)
committerolivier <>
Mon, 25 Jan 2010 15:13:46 +0000 (15:13 +0000)
telemeta/models/core.py
telemeta/models/crem.py
telemeta/models/cremquery.py
telemeta/tests/model_tests.py

index a220409bfaef9df536c091da06227c2704334959..c634943e358320672cb60948c9c5c02ba2ac41f3 100644 (file)
@@ -38,7 +38,7 @@ from django.utils.translation import ugettext_lazy as _
 import re
 
 class Duration(object):
-
+    """Represent a time duration"""
     def __init__(self, *args, **kwargs):
         if len(args) and isinstance(args[0], datetime.timedelta):
             self._delta = datetime.timedelta(days=args[0].days, seconds=args[0].seconds)
@@ -94,6 +94,9 @@ class Duration(object):
             
 # The following is based on Django TimeField
 class DurationField(models.Field):
+    """Duration Django model field. Essentially the same as a TimeField, but
+    with values over 24h allowed."""
+
     description = _("Duration")
 
     __metaclass__ = models.SubfieldBase
@@ -140,4 +143,57 @@ class DurationField(models.Field):
         defaults = {'form_class': forms.TimeField}
         defaults.update(kwargs)
         return super(DurationField, self).formfield(**defaults)
-            
+           
+
+class WeakForeignKey(models.ForeignKey):
+    """A weak foreign key is the same as foreign key but without cascading
+    delete. Instead the reference is set to null when the referenced record
+    get deleted. This emulates the ON DELETE SET NULL sql behaviour.
+    
+    Warning: must be used in conjunction with EnhancedQuerySet, EnhancedManager,
+    and EnhancedModel
+    """
+    def __init__(self, to, **kwargs):
+        super(WeakForeignKey, self).__init__(to, **kwargs)
+       
+class EnhancedQuerySet(models.query.QuerySet):
+    """QuerySet with added functionalities such as WeakForeignKey handling"""
+
+    def delete(self):
+        CHUNK=1024
+        objects = self.model._meta.get_all_related_objects()
+        ii = self.count()
+        for related in objects:
+            i = 0
+            while i < ii:
+                ids = [v[0] for v in self[i:i + CHUNK].values_list('pk')]
+                filter = {related.field.name + '__pk__in': ids}
+                q = related.model.objects.filter(**filter)
+                if isinstance(related.field, WeakForeignKey):
+                    update = {related.field.name: None}
+                    q.update(**update)
+                else:
+                    q.delete()
+
+                i += CHUNK
+        
+        super(EnhancedQuerySet, self).delete()
+             
+class EnhancedManager(models.Manager):
+    """Manager which is bound to EnhancedQuerySet"""
+    def get_query_set(self):
+        return EnhancedQuerySet(self.model)
+    
+
+class EnhancedModel(models.Model):
+    """Base model class with added functionality. See EnhancedQuerySet"""
+
+    objects = EnhancedManager()
+
+    def delete(self):
+        if not self.pk:
+            raise Exception("Can't delete without a primary key")
+        self.__class__.objects.filter(pk=self.pk).delete()            
+
+    class Meta:
+        abstract = True
index 2c98ffd804023e1c8844c19da2b0600eddc35c6e..0db0975de8fa9d2efdf0a51a77693331cc3d02bd 100755 (executable)
@@ -39,10 +39,10 @@ import cremquery as query
 from xml.dom.minidom import getDOMImplementation
 from telemeta.util.unaccent import unaccent_icmp
 import re
-from telemeta.models.core import DurationField, Duration
+from telemeta.models.core import DurationField, Duration, WeakForeignKey, EnhancedModel
 from telemeta.models import dublincore as dc
 
-class ModelCore(models.Model):
+class ModelCore(EnhancedModel):
 
     @classmethod
     def required_fields(cls):
@@ -133,42 +133,33 @@ class MediaCollection(MediaResource):
 
     reference             = models.CharField(unique=True, max_length=250,
                                              null=True)
-    physical_format       = models.ForeignKey('PhysicalFormat', related_name="collections", 
-                                              null=True)
+    physical_format       = WeakForeignKey('PhysicalFormat', related_name="collections", null=True)
     old_code              = models.CharField(unique=True, max_length=250, null=True)
     code                  = models.CharField(unique=True, max_length=250)
     title                 = models.CharField(max_length=250)
     alt_title             = models.CharField(max_length=250, default="")
     physical_items_num    = models.IntegerField(default=0)
-    publishing_status     = models.ForeignKey('PublishingStatus', related_name="collections",
-                                              null=True)
+    publishing_status     = WeakForeignKey('PublishingStatus', related_name="collections", null=True)
     creator               = models.CharField(max_length=250, default="")
     booklet_author        = models.CharField(max_length=250, default="")
     booklet_description   = models.TextField(default="")
     collector             = models.CharField(max_length=250, default="")
     collector_is_creator  = models.BooleanField(default="")
-    publisher             = models.ForeignKey('Publisher', related_name="collections",
-                                              null=True)     
+    publisher             = WeakForeignKey('Publisher', related_name="collections", null=True)     
     is_published          = models.BooleanField(default="")
     year_published        = models.IntegerField(default=0)
-    publisher_collection  = models.ForeignKey('PublisherCollection', related_name="collections",
-                                              null=True)
+    publisher_collection  = WeakForeignKey('PublisherCollection', related_name="collections", null=True)
     publisher_serial      = models.CharField(max_length=250, default="")
     external_references   = models.TextField(default="")
-    acquisition_mode      = models.ForeignKey('AcquisitionMode', related_name="collections",
-                                              null=True)
+    acquisition_mode      = WeakForeignKey('AcquisitionMode', related_name="collections", null=True)
     comment               = models.TextField(default="")
-    metadata_author       = models.ForeignKey('MetadataAuthor', related_name="collections",
-                                              null=True)
-    metadata_writer       = models.ForeignKey('MetadataWriter', related_name="collections",
-                                              null=True)
-    legal_rights          = models.ForeignKey('LegalRight', related_name="collections",
-                                              null=True)
+    metadata_author       = WeakForeignKey('MetadataAuthor', related_name="collections", null=True)
+    metadata_writer       = WeakForeignKey('MetadataWriter', related_name="collections", null=True)
+    legal_rights          = WeakForeignKey('LegalRight', related_name="collections", null=True)
     alt_ids               = models.CharField(max_length=250, default="")
     recorded_from_year    = models.IntegerField(default=0)
     recorded_to_year      = models.IntegerField(default=0)
-    recording_context     = models.ForeignKey('RecordingContext', related_name="collections",
-                                              null=True)
+    recording_context     = WeakForeignKey('RecordingContext', related_name="collections", null=True)
     approx_duration       = DurationField(default='00:00')
     doctype_code          = models.IntegerField(default=0)
     travail               = models.CharField(max_length=250, default="")
@@ -176,8 +167,7 @@ class MediaCollection(MediaResource):
     cnrs_contributor      = models.CharField(max_length=250, default="")
     items_done            = models.CharField(max_length=250, default="")
     a_informer_07_03      = models.CharField(max_length=250, default="")
-    ad_conversion         = models.ForeignKey('AdConversion', related_name='collections',
-                                              null=True)
+    ad_conversion         = WeakForeignKey('AdConversion', related_name='collections', null=True)
     public_access         = models.CharField(choices=PUBLIC_ACCESS_CHOICES, max_length=16, default="metadata")
 
     objects               = query.MediaCollectionManager()
@@ -256,24 +246,22 @@ class MediaItem(MediaResource):
     approx_duration       = DurationField(default='00:00')
     recorded_from_date    = models.DateField(default=0)
     recorded_to_date      = models.DateField(default=0)
-    location              = models.ForeignKey('Location', related_name="items",
-                                              db_column='location_name', null=True, default="")
+    location              = WeakForeignKey('Location', related_name="items",
+                                           db_column='location_name', null=True)
     location_comment      = models.CharField(max_length=250, default="")
-    ethnic_group          = models.ForeignKey('EthnicGroup', related_name="items",
-                                              null=True)
+    ethnic_group          = WeakForeignKey('EthnicGroup', related_name="items", null=True)
     title                 = models.CharField(max_length=250)
     alt_title             = models.CharField(max_length=250, default="")
     author                = models.CharField(max_length=250, default="")
-    vernacular_style      = models.ForeignKey('VernacularStyle', related_name="items",
+    vernacular_style      = WeakForeignKey('VernacularStyle', related_name="items",
                                               null=True)
     context_comment       = models.TextField(default="")
     external_references   = models.TextField(default="")
     moda_execut           = models.CharField(max_length=250, default="")
-    copied_from_item      = models.ForeignKey('self', related_name="copies",
-                                              null=True)
+    copied_from_item      = WeakForeignKey('self', related_name="copies", null=True)
     collector             = models.CharField(max_length=250, default="")
     cultural_area         = models.CharField(max_length=250, default="")
-    generic_style         = models.ForeignKey('GenericStyle', related_name="items",
+    generic_style         = WeakForeignKey('GenericStyle', related_name="items",
                                               null=True)
     collector_selection   = models.CharField(max_length=250, default="")
     creator_reference     = models.CharField(max_length=250, default="")
@@ -456,10 +444,8 @@ class InstrumentAliasRelation(ModelCore):
 class MediaItemPerformance(ModelCore):
     "Item performance"
     media_item      = models.ForeignKey('MediaItem', related_name="performances")
-    instrument      = models.ForeignKey('Instrument', related_name="performances",
-                                        null=True)
-    alias           = models.ForeignKey('InstrumentAlias', related_name="performances",
-                                        null=True)
+    instrument      = WeakForeignKey('Instrument', related_name="performances", null=True)
+    alias           = WeakForeignKey('InstrumentAlias', related_name="performances", null=True)
     instruments_num = models.CharField(max_length=250, default="")
     musicians       = models.CharField(max_length=250, default="")
 
@@ -512,8 +498,8 @@ class Location(ModelCore):
     name             = models.CharField(primary_key=True, max_length=150)
     type             = models.CharField(choices=TYPE_CHOICES, max_length=16)
     complete_type    = models.ForeignKey('LocationType', related_name="types")
-    current_name     = models.ForeignKey('self', related_name="past_names", 
-                                         db_column="current_name", null=True) 
+    current_name     = WeakForeignKey('self', related_name="past_names", 
+                                      db_column="current_name", null=True) 
     is_authoritative = models.BooleanField(default=0)
 
     def parent(self):
index c46a309a347ca6f19e89511a76dbe936260c2522..8c3477bf42c224e7e7a5e885596d7cafadc7ea11 100644 (file)
 #          David LIPSZYC <davidlipszyc@gmail.com>
 
 from django.db.models import Manager, Q
-from django.db.models.query import QuerySet
+from telemeta.models.core import EnhancedQuerySet, EnhancedManager
 import re
 
-class CoreQuerySet(QuerySet):
+class CoreQuerySet(EnhancedQuerySet):
     "Base class for all query sets"
 
     def none(self): # redundant with none() in recent Django svn
@@ -67,7 +67,7 @@ class CoreQuerySet(QuerySet):
         return self.extra(
             where = ["id IN (SELECT DISTINCT element_id FROM revisions WHERE %s)" % " AND ".join(where)]);
 
-class CoreManager(Manager):
+class CoreManager(EnhancedManager):
     "Base class for all models managers"
 
     def none(self, *args, **kwargs):
index 45edeeb49b686ef971011310b5ce711ac129dfdb..db45bae25734cb0af981ca2984d1f70be9294d90 100644 (file)
@@ -38,19 +38,17 @@ from telemeta.models import *
 from datetime import datetime, timedelta
 
 class CollectionItemTestCase(unittest.TestCase):
+
     def setUp(self):
         "Create a test database based on objects created in Django"
    
-        User.objects.all().delete() 
         self.david   = User.objects.create(username="david", level="user")
         self.olivier = User.objects.create(username="olivier", level="admin")    
 
-        LocationType.objects.all().delete()
         self.country = LocationType.objects.create(id="country", name="Country")
         self.continent = LocationType.objects.create(id="continent", name="Continent")
         self.city = LocationType.objects.create(id="city", name="City")
 
-        Location.objects.all().delete()        
         self.paris = Location.objects.create(name="Paris", type="other", complete_type=self.city)
         self.france = Location.objects.create(name="France", type="country", complete_type=self.country)
         self.europe = Location.objects.create(name="Europe", type="continent", complete_type=self.continent)
@@ -59,13 +57,11 @@ class CollectionItemTestCase(unittest.TestCase):
         LocationRelation.objects.create(location=self.paris, parent_location=self.france)
         LocationRelation.objects.create(location=self.france, parent_location=self.europe)
 
-        EthnicGroup.objects.all().delete()
         self.a = EthnicGroup.objects.create(name="a")
         self.b = EthnicGroup.objects.create(name="b")
         self.c = EthnicGroup.objects.create(name="c")
         self.d = EthnicGroup.objects.create(name="d")
 
-        MediaCollection.objects.all().delete()
         self.persepolis = MediaCollection(id=1, code="CNRSMH_E_1970_001_002", reference="A1", title="persepolis", 
             creator="Abraham LINCOLN", collector="Friedrich HEINZ", year_published=2009, is_published=True,
             recorded_from_year=1970, recorded_to_year=1980)
@@ -84,7 +80,6 @@ class CollectionItemTestCase(unittest.TestCase):
                                    
         self.nicolas.save_with_revision(self.olivier)
      
-        MediaItem.objects.all().delete()        
         self.item_1 = MediaItem(id=1, collection=self.persepolis, code="CNRSMH_E_1970_001_002_44", 
             recorded_from_date="1971-01-12", recorded_to_date="1971-02-24", location=self.paris, 
             ethnic_group=self.a, title="item 1", author="Mickael SHEPHERD", collector="Charles PREMIER",  
@@ -130,6 +125,14 @@ class CollectionItemTestCase(unittest.TestCase):
         self.collections = MediaCollection.objects.all()
         self.items       = MediaItem.objects.all()
 
+    def tearDown(self):
+        User.objects.all().delete() 
+        LocationType.objects.all().delete()
+        Location.objects.all().delete()        
+        EthnicGroup.objects.all().delete()
+        MediaCollection.objects.all().delete()
+        MediaItem.objects.all().delete()        
+
     def testQuickSearchOnCollections(self):
         "Test quick_search property of MediaCollection class"
         result = self.collections.quick_search("persepolis")
@@ -276,4 +279,52 @@ class CollectionItemTestCase(unittest.TestCase):
         self.assertEquals(self.volonte.get_countries(), [self.belgique, self.france])
 
         
+class RelatedDeleteTestCase(unittest.TestCase):
+    def setUp(self):
+        self.publisher1 = Publisher.objects.create(id=1, value='publisher1')
+        self.publisher2 = Publisher.objects.create(id=2, value='publisher2')
+        self.pubcollection1 = PublisherCollection.objects.create(publisher=self.publisher1, value='pub1_collection1')
+   
+        self.rights1 = LegalRight.objects.create(id=1, value='right1')
+
+        MediaCollection.objects.create(id=1, code='CNRSMH_I_1256_456', title='Collection1',
+                                       publisher=self.publisher1, publisher_collection=self.pubcollection1,
+                                       legal_rights=self.rights1)
+        MediaCollection.objects.create(id=2, code='CNRSMH_I_1256_123', title='Collection2',
+                                       publisher=self.publisher2)
+
+    def tearDown(self):
+        Publisher.objects.all().delete()
+        PublisherCollection.objects.all().delete()
+        LegalRight.objects.all().delete()
+        MediaCollection.objects.all().delete()
+
+    def testOnDeleteSetNull(self):
+        self.rights1.delete()
+        self.assertEquals(LegalRight.objects.filter(id=1).count(), 0)
+        q = MediaCollection.objects.filter(id=1)
+        self.assertEquals(q.count(), 1)
+        self.assertEquals(q[0].legal_rights, None)
+
+    def testOnDeleteCascade(self):
+        self.publisher1.delete()
+        self.assertEquals(Publisher.objects.filter(id=1).count(), 0)
+        self.assertEquals(Publisher.objects.filter(id=2).count(), 1)
+        self.assertEquals(PublisherCollection.objects.filter(id=1).count(), 0)
+
+        q = MediaCollection.objects.filter(id=1)
+        self.assertEquals(q.count(), 1)
+        self.assertEquals(q[0].publisher, None)
+        self.assertEquals(q[0].publisher_collection, None)
+
+        q = MediaCollection.objects.filter(id=2)
+        self.assertEquals(q.count(), 1)
+        self.assertEquals(q[0].publisher, self.publisher2)
+        self.assertEquals(q[0].publisher_collection, None)
+
+    def testOnDeleteCascadeMultiple(self):
+        Publisher.objects.all().delete()
+        self.assertEquals(Publisher.objects.count(), 0)
+        self.assertEquals(PublisherCollection.objects.count(), 0)
+        self.assertEquals(MediaCollection.objects.count(), 2)