git.parisson.com Git - timeside.git/commitdiff
Atomize tasks by (experience, item), fix worker start, cleanup composition
author    Guillaume Pellerin <yomguy@parisson.com>
Fri, 20 Mar 2015 16:04:24 +0000 (17:04 +0100)
committer Guillaume Pellerin <yomguy@parisson.com>
Fri, 20 Mar 2015 16:04:24 +0000 (17:04 +0100)
docker-compose.yml
examples/deploy/celery_app.sh
examples/deploy/start_app.sh
timeside/server/management/commands/timeside-create-boilerplate.py [new file with mode: 0644]
timeside/server/management/commands/timeside-tests-boilerplate.py [deleted file]
timeside/server/models.py
timeside/server/tasks.py

diff --git a/docker-compose.yml b/docker-compose.yml
index b051292566657c33764f95cd9345bb98a839a645..7f9482b49328cf3dd1fbad01392d6a2c89a3a817 100644 (file)
@@ -36,14 +36,6 @@ log:
     - /var/log/uwsgi
   command: /bin/true
 
-lib:
-  image: debian:wheezy
-  volumes:
-    - /var/lib/rabbitmq
-    - /var/lib/postgres
-    - /var/lib/mysql
-  command: /bin/true
-
 home:
   image: debian:wheezy
   volumes:
@@ -53,7 +45,6 @@ home:
 db:
   image: mysql
   volumes_from:
-    - lib
     - log
   environment:
     - MYSQL_ROOT_PASSWORD=mysecretpassword
@@ -89,11 +80,12 @@ worker:
   build: .
   volumes_from:
     - app
+    - home
+    - log
   command: /bin/sh /opt/TimeSide/examples/deploy/celery_app.sh
   links:
     - rabbitmq
     - db
-    - app
 
 nginx:
   image: nginx
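
Note: taken together, the docker-compose.yml hunks above drop the dedicated lib data-volume container (and its mount in the db service), while the worker container now also mounts the home and log volumes and no longer links to app (it only reuses its volumes). A rough sketch of the resulting worker service, reconstructed from the hunks above:

    worker:
      build: .
      volumes_from:
        - app
        - home
        - log
      command: /bin/sh /opt/TimeSide/examples/deploy/celery_app.sh
      links:
        - rabbitmq
        - db
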
diff --git a/examples/deploy/celery_app.sh b/examples/deploy/celery_app.sh
index 8be32e159ee63cb2263815f26c8f3c5a8e83c7d1..1ff1f1090a130a5f398997198285365df13e1021 100644 (file)
@@ -1,11 +1,11 @@
 #!/bin/sh
 
 # paths
-app='/opt/TimeSide/'
-sandbox='/home/sandbox/'
-manage=$sandbox'manage.py'
+app_dir='/opt/TimeSide'
+sandbox='/home/sandbox'
+manage=$sandbox'/manage.py'
 
-sh $app/examples/deploy/wait.sh
+sh $app_dir'/examples/deploy/wait.sh'
 
 # Starting celery worker with the --autoreload option will enable the worker to watch for file system changes
 # This is an experimental feature intended for use in development only
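
Note: the celery_app.sh hunk renames the app variable to app_dir and moves the trailing slash out of the path variables, so concatenated paths no longer carry a double slash. A minimal sketch of how the new assignments expand under POSIX sh:

    app_dir='/opt/TimeSide'
    sandbox='/home/sandbox'
    manage=$sandbox'/manage.py'              # expands to /home/sandbox/manage.py
    sh $app_dir'/examples/deploy/wait.sh'    # runs /opt/TimeSide/examples/deploy/wait.sh
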
diff --git a/examples/deploy/start_app.sh b/examples/deploy/start_app.sh
index 3b6528be7446216b7d86e243cc7d78a3a85e9520..f0b6d47517a6383b2107d57e8c07089787f3a292 100644 (file)
@@ -1,19 +1,20 @@
 #!/bin/sh
 
 # paths
-app='/opt/TimeSide/'
-static=$app'timeside/player/static/'
-sandbox='/home/sandbox/'
-manage=$sandbox'manage.py'
-wsgi=$sandbox'wsgi.py'
+app_dir='/opt/TimeSide'
+static=$app_dir'/timeside/player/static/'
+sandbox='/home/sandbox'
+manage=$sandbox'/manage.py'
+wsgi=$sandbox'/wsgi.py'
 
-sh $app/examples/deploy/wait.sh
+sh $app_dir/examples/deploy/wait.sh
 
 # django init
 python $manage syncdb --noinput
 python $manage migrate --noinput
 python $manage collectstatic --noinput
 python $manage timeside-create-admin-user
+python $manage timeside-create-boilerplate
 
 # static files auto update
 watchmedo shell-command --patterns="*.js;*.css" --recursive \
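
Note: besides the same path-variable cleanup, start_app.sh now seeds the database right after creating the admin user by calling the new timeside-create-boilerplate command (which replaces the deleted timeside-tests-boilerplate below). Since that command relies on get_or_create and only populates data when the 'Tests' selection is first created, re-running it on every container start should be safe. The resulting init sequence, taken from the hunk above:

    # django init
    python $manage syncdb --noinput
    python $manage migrate --noinput
    python $manage collectstatic --noinput
    python $manage timeside-create-admin-user
    python $manage timeside-create-boilerplate
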
diff --git a/timeside/server/management/commands/timeside-create-boilerplate.py b/timeside/server/management/commands/timeside-create-boilerplate.py
new file mode 100644 (file)
index 0000000..b919cdf
--- /dev/null
@@ -0,0 +1,62 @@
+from optparse import make_option
+from django.conf import settings
+from django.core.management.base import BaseCommand, CommandError
+from django.contrib.auth.models import User
+from django.template.defaultfilters import slugify
+
+import os
+import timeside.core
+from timeside.server.models import *
+from timeside.core.tools.test_samples import generateSamples
+
+
+class Command(BaseCommand):
+    help = "Setup and run a boilerplate for testing"
+    cleanup =  True
+
+    def processor_cleanup(self):
+        for processor in Processor.objects.all():
+            processor.delete()
+
+    def result_cleanup(self):
+        for result in Result.objects.all():
+            result.delete()
+
+    def handle(self, *args, **options):
+        selection, c = Selection.objects.get_or_create(title='Tests')
+        if c:
+            presets = []
+            blacklist =['decoder', 'live', 'gain', 'vamp']
+            processors = timeside.core.processor.processors(timeside.core.api.IProcessor)
+            for proc in processors:
+                trig = True
+                for black in blacklist:
+                    if black in proc.id():
+                        trig = False
+                if trig:
+                    processor, c = Processor.objects.get_or_create(pid=proc.id())
+                    preset, c = Preset.objects.get_or_create(processor=processor, parameters='{}')
+                    presets.append(preset)
+
+            media_dir = 'items' + os.sep + 'tests'
+            samples_dir = settings.MEDIA_ROOT + media_dir
+            samples = generateSamples(samples_dir=samples_dir)
+
+            for sample in samples.iteritems():
+                filename, path = sample
+                title = os.path.splitext(filename)[0]
+                path = media_dir + os.sep + filename
+                item, c = Item.objects.get_or_create(title=title, file=path)
+                if not item in selection.items.all():
+                    selection.items.add(item)
+                if self.cleanup:
+                    for result in item.results.all():
+                        result.delete()
+
+            experience, c = Experience.objects.get_or_create(title='All')
+            for preset in presets:
+                if not preset in experience.presets.all():
+                    experience.presets.add(preset)
+
+            task = Task(experience=experience, selection=selection)
+            task.save()
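
Note: compared to the deleted timeside-tests-boilerplate command below, this new command only builds the presets, the sample items and the 'All' experience when the 'Tests' selection is first created (the "if c:" guard), adds 'vamp' to the processor blacklist, and saves the final Task without immediately switching it to the running state (no status_setter(2) call). A minimal sketch of invoking it by hand, using the manage.py path defined in start_app.sh:

    python /home/sandbox/manage.py timeside-create-boilerplate
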
diff --git a/timeside/server/management/commands/timeside-tests-boilerplate.py b/timeside/server/management/commands/timeside-tests-boilerplate.py
deleted file mode 100644 (file)
index fbb4f2a..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-from optparse import make_option
-from django.conf import settings
-from django.core.management.base import BaseCommand, CommandError
-from django.contrib.auth.models import User
-from django.template.defaultfilters import slugify
-
-import os
-import timeside.core
-from timeside.server.models import *
-from timeside.core.tools.test_samples import generateSamples
-
-
-class Command(BaseCommand):
-    help = "Setup and run a boilerplate for testing"
-    cleanup =  True
-
-    def processor_cleanup(self):
-        for processor in Processor.objects.all():
-            processor.delete()
-
-    def result_cleanup(self):
-        for result in Result.objects.all():
-            result.delete()
-
-    def handle(self, *args, **options):
-        presets = []
-        blacklist =['decoder', 'live', 'gain']
-        processors = timeside.core.processor.processors(timeside.core.api.IProcessor)
-        for proc in processors:
-            trig = True
-            for black in blacklist:
-                if black in proc.id():
-                    trig = False
-            if trig:
-                processor, c = Processor.objects.get_or_create(pid=proc.id())
-                preset, c = Preset.objects.get_or_create(processor=processor, parameters='{}')
-                presets.append(preset)
-
-        media_dir = 'items' + os.sep + 'tests'
-        samples_dir = settings.MEDIA_ROOT + media_dir
-        samples = generateSamples(samples_dir=samples_dir)
-        selection, c = Selection.objects.get_or_create(title='Tests')
-
-        for sample in samples.iteritems():
-            filename, path = sample
-            title = os.path.splitext(filename)[0]
-            path = media_dir + os.sep + filename
-            item, c = Item.objects.get_or_create(title=title, file=path)
-            if not item in selection.items.all():
-                selection.items.add(item)
-            if self.cleanup:
-                for result in item.results.all():
-                    result.delete()
-
-        experience, c = Experience.objects.get_or_create(title='All')
-        for preset in presets:
-            if not preset in experience.presets.all():
-                experience.presets.add(preset)
-
-        task = Task(experience=experience, selection=selection)
-        task.status_setter(2)
diff --git a/timeside/server/models.py b/timeside/server/models.py
index 36226f382989927ee4389418a2ff872391a3a0ff..b6e585af030d7f288f7a0eb554354143b6db102c 100644 (file)
@@ -186,6 +186,79 @@ class Item(DocBaseResource):
     def get_results_path(self):
         return os.path.join(results_path, self.uuid)
 
+    def run(self, experience):
+        result_path = self.get_results_path()
+        if not os.path.exists(result_path):
+            os.makedirs(result_path)
+
+        pipe = timeside.plugins.decoder.file.FileDecoder(self.file.path,
+                                                 sha1=self.sha1)
+        presets = {}
+        for preset in experience.presets.all():
+            proc = get_processor(preset.processor.pid)
+            if proc.type == 'encoder':
+                result, c = Result.objects.get_or_create(preset=preset,
+                                                         item=self)
+                media_file = '.'.join([str(result.uuid),
+                                       proc.file_extension()])
+                result.file = os.path.join(result_path, media_file)
+                result.save()
+                proc = proc(result.file.path, overwrite=True,
+                            streaming=False)
+            else:
+                proc = proc()
+            if proc.type == 'analyzer':
+                proc.set_parameters(preset.parameters)
+            presets[preset] = proc
+            pipe = pipe | proc
+
+        # item.lock_setter(True)
+
+        if not self.hdf5:
+            hdf5_file = str(experience.uuid) + '.hdf5'
+            self.hdf5 = os.path.join(result_path, hdf5_file)
+            self.save()
+
+        pipe.run()
+
+        for preset, proc in presets.iteritems():
+            if proc.type == 'analyzer':
+                for result_id in proc.results.keys():
+                    parameters = proc.results[result_id].parameters
+                    preset, c = Preset.objects.get_or_create(
+                        processor=preset.processor,
+                        parameters=unicode(parameters))
+                    result, c = Result.objects.get_or_create(preset=preset,
+                                                             item=self)
+                    hdf5_file = str(result.uuid) + '.hdf5'
+                    result.hdf5 = os.path.join(result_path, hdf5_file)
+                    # while result.lock:
+                    #     time.sleep(3)
+                    # result.lock_setter(True)
+                    proc.results.to_hdf5(result.hdf5.path)
+                    # result.lock_setter(False)
+                    result.status_setter(_DONE)
+
+            elif proc.type == 'grapher':
+                parameters = {}
+                result, c = Result.objects.get_or_create(preset=preset,
+                                                         item=self)
+                image_file = str(result.uuid) + '.png'
+                result.file = os.path.join(result_path, image_file)
+                proc.render(output=result.file.path)
+                result.mime_type_setter(get_mime_type(result.file.path))
+                result.status_setter(_DONE)
+
+            elif proc.type == 'encoder':
+                result = Result.objects.get(preset=preset, item=self)
+                result.mime_type_setter(get_mime_type(result.file.path))
+                result.status_setter(_DONE)
+
+            del proc
+
+        del pipe
+        # item.lock_setter(False)
+
 
 class Experience(DocBaseResource):
 
@@ -263,6 +336,7 @@ class Result(BaseResource):
     author = models.ForeignKey(User, related_name="results",
                                verbose_name=_('author'), blank=True, null=True,
                                on_delete=models.SET_NULL)
+    # lock = models.BooleanField(default=False)
 
     class Meta(MetaCore):
         db_table = app + '_results'
@@ -277,6 +351,10 @@ class Result(BaseResource):
         self.mime_type = mime_type
         self.save()
 
+    def lock_setter(self, lock):
+        self.lock = lock
+        self.save()
+
     def __unicode__(self):
         return '_'.join([self.item.title, unicode(self.preset.processor)])
 
@@ -306,101 +384,12 @@ class Task(BaseResource):
         self.status = status
         self.save()
 
-    def post_run(self, item, presets):
-        item.lock_setter(True)
-        item_path = item.get_results_path()
-
-        # pipe.results.to_hdf5(item.hdf5.path)
-        for preset in presets.keys():
-            proc = presets[preset]
-            if proc.type == 'analyzer':
-                for result_id in proc.results.keys():
-                    parameters = proc.results[result_id].parameters
-                    preset, c = Preset.objects.get_or_create(
-                        processor=preset.processor,
-                        parameters=unicode(parameters))
-                    result, c = Result.objects.get_or_create(preset=preset,
-                                                             item=item)
-                    hdf5_file = str(result.uuid) + '.hdf5'
-                    result.hdf5 = os.path.join(item_path, hdf5_file)
-                    proc.results.to_hdf5(result.hdf5.path)
-                    result.status_setter(_DONE)
-            elif proc.type == 'grapher':
-                parameters = {}
-                result, c = Result.objects.get_or_create(preset=preset,
-                                                         item=item)
-                image_file = str(result.uuid) + '.png'
-                result.file = os.path.join(item_path, image_file)
-                proc.render(output=result.file.path)
-                result.mime_type_setter(get_mime_type(result.file.path))
-                result.status_setter(_DONE)
-            elif proc.type == 'encoder':
-                result = Result.objects.get(preset=preset, item=item)
-                result.mime_type_setter(get_mime_type(result.file.path))
-                result.status_setter(_DONE)
-            del proc
-
-        item.lock_setter(False)
-
     def run(self, streaming=False):
+        from timeside.server.tasks import experience_run
         self.status_setter(_RUNNING)
-        
-        for item in self.selection. get_all_items():
-            item_path = item.get_results_path()
-            if not os.path.exists(item_path):
-                os.makedirs(item_path)
-
-            pipe = timeside.plugins.decoder.file.FileDecoder(item.file.path,
-                                                     sha1=item.sha1)
-
-            presets = {}
-            for preset in self.experience.presets.all():
-                proc = get_processor(preset.processor.pid)
-                if proc.type == 'encoder':
-                    result, c = Result.objects.get_or_create(preset=preset,
-                                                             item=item)
-                    media_file = '.'.join([str(result.uuid),
-                                           proc.file_extension()])
-                    result.file = os.path.join(item_path, media_file)
-                    result.save()
-                    proc = proc(result.file.path, overwrite=True,
-                                streaming=streaming)
-                else:
-                    proc = proc()
-                if proc.type == 'analyzer':
-                    proc.set_parameters(preset.parameters)
-                presets[preset] = proc
-                pipe = pipe | proc
-
-            # while item.lock:
-            #     time.sleep(30)
-
-            if not item.hdf5:
-                hdf5_file = str(self.experience.uuid) + '.hdf5'
-                item.hdf5 = os.path.join(item_path, hdf5_file)
-                item.save()
-
-            def stream_task(pipe, item, presets):
-                for chunk in pipe.stream():
-                    yield chunk
-                self.post_run(item, presets)
-                self.status_setter(_DONE)
-                del pipe
-
-            if streaming:
-                return stream_task(pipe, item, presets)
-            else:
-                pipe.run()
-
-            self.post_run(item, presets)
-
-            # except:
-            #     self.status_setter(0)
-            #     item.lock_setter(False)
-            #     break
-
+        for item in self.selection.get_all_items():
+            experience_run.delay(self.experience.id, item.id)
         self.status_setter(_DONE)
-        del pipe
 
 
 def set_mimetype(sender, **kwargs):
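
Note: the per-item pipeline logic that used to live in Task.run and Task.post_run now lives in Item.run(experience), so a single (experience, item) pair can be processed on its own; Task.run only queues one Celery job per item of its selection and marks the task done once every job has been dispatched (the actual processing completes asynchronously in the workers). A minimal usage sketch with the objects created by timeside-create-boilerplate, assuming they already exist in the database:

    from timeside.server.models import Selection, Experience

    experience = Experience.objects.get(title='All')
    selection = Selection.objects.get(title='Tests')
    for item in selection.items.all():
        # builds the decoder -> processors pipe and stores the Results synchronously
        item.run(experience)
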
diff --git a/timeside/server/tasks.py b/timeside/server/tasks.py
index f70c495ff4f17b1c2396074b50cc51f2cda5cae6..a94067a3597bb94daedac6f069f88a31d4ebf1d7 100644 (file)
@@ -1,7 +1,7 @@
 from __future__ import absolute_import
 
 from celery import shared_task
-from .models import Item, Selection, Preset, Task
+from .models import Item, Selection, Preset, Experience, Task
 
 
 @shared_task
@@ -9,23 +9,8 @@ def task_run(id):
     task = Task.objects.get(id=id)
     task.run()
 
-
-def experience_run(experience_id, item_id):
-    item = Item.objects.get(id=item_id)
-    experience = Experience.objects.get(id=experience_id)
-    experience.run(item)
-
-
 @shared_task
-def add(x, y):
-    return x + y
-
-
-@shared_task
-def mul(x, y):
-    return x * y
-
-
-@shared_task
-def xsum(numbers):
-    return sum(numbers)
\ No newline at end of file
+def experience_run(exp_id, item_id):
+    item = Item.objects.get(id=item_id)
+    experience = Experience.objects.get(id=exp_id)
+    item.run(experience)
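
Note: experience_run is now the Celery shared task that a worker executes for one (experience, item) pair, delegating to Item.run; the worker containers started via celery_app.sh consume these jobs from the rabbitmq service they are linked to in docker-compose.yml. A minimal dispatch sketch, assuming task_id, experience_id and item_id are existing primary keys:

    from timeside.server.tasks import task_run, experience_run

    # queue a whole Task: Task.run then fans out one experience_run job per item
    task_run.delay(task_id)

    # or queue a single (experience, item) pair directly
    experience_run.delay(experience_id, item_id)
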