From: Guillaume Pellerin Date: Fri, 20 Mar 2015 16:04:24 +0000 (+0100) Subject: Atomize tasks by (experience, item), fix worker start, cleanup composition X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=f431131297465c5455018e7496c15a9479911f21;p=timeside.git Atomize tasks by (experience, item), fix worker start, cleanup composition --- diff --git a/docker-compose.yml b/docker-compose.yml index b051292..7f9482b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,14 +36,6 @@ log: - /var/log/uwsgi command: /bin/true -lib: - image: debian:wheezy - volumes: - - /var/lib/rabbitmq - - /var/lib/postgres - - /var/lib/mysql - command: /bin/true - home: image: debian:wheezy volumes: @@ -53,7 +45,6 @@ home: db: image: mysql volumes_from: - - lib - log environment: - MYSQL_ROOT_PASSWORD=mysecretpassword @@ -89,11 +80,12 @@ worker: build: . volumes_from: - app + - home + - log command: /bin/sh /opt/TimeSide/examples/deploy/celery_app.sh links: - rabbitmq - db - - app nginx: image: nginx diff --git a/examples/deploy/celery_app.sh b/examples/deploy/celery_app.sh index 8be32e1..1ff1f10 100644 --- a/examples/deploy/celery_app.sh +++ b/examples/deploy/celery_app.sh @@ -1,11 +1,11 @@ #!/bin/sh # paths -app='/opt/TimeSide/' -sandbox='/home/sandbox/' -manage=$sandbox'manage.py' +app_dir='/opt/TimeSide' +sandbox='/home/sandbox' +manage=$sandbox'/manage.py' -sh $app/examples/deploy/wait.sh +sh $app_dir'/examples/deploy/wait.sh' # Starting celery worker with the --autoreload option will enable the worker to watch for file system changes # This is an experimental feature intended for use in development only diff --git a/examples/deploy/start_app.sh b/examples/deploy/start_app.sh index 3b6528b..f0b6d47 100644 --- a/examples/deploy/start_app.sh +++ b/examples/deploy/start_app.sh @@ -1,19 +1,20 @@ #!/bin/sh # paths -app='/opt/TimeSide/' -static=$app'timeside/player/static/' -sandbox='/home/sandbox/' -manage=$sandbox'manage.py' -wsgi=$sandbox'wsgi.py' +app_dir='/opt/TimeSide' +static=$app_dir'/timeside/player/static/' +sandbox='/home/sandbox' +manage=$sandbox'/manage.py' +wsgi=$sandbox'/wsgi.py' -sh $app/examples/deploy/wait.sh +sh $app_dir/examples/deploy/wait.sh # django init python $manage syncdb --noinput python $manage migrate --noinput python $manage collectstatic --noinput python $manage timeside-create-admin-user +python $manage timeside-create-boilerplate # static files auto update watchmedo shell-command --patterns="*.js;*.css" --recursive \ diff --git a/timeside/server/management/commands/timeside-create-boilerplate.py b/timeside/server/management/commands/timeside-create-boilerplate.py new file mode 100644 index 0000000..b919cdf --- /dev/null +++ b/timeside/server/management/commands/timeside-create-boilerplate.py @@ -0,0 +1,62 @@ +from optparse import make_option +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError +from django.contrib.auth.models import User +from django.template.defaultfilters import slugify + +import os +import timeside.core +from timeside.server.models import * +from timeside.core.tools.test_samples import generateSamples + + +class Command(BaseCommand): + help = "Setup and run a boilerplate for testing" + cleanup = True + + def processor_cleanup(self): + for processor in Processor.objects.all(): + processor.delete() + + def result_cleanup(self): + for result in Result.objects.all(): + result.delete() + + def handle(self, *args, **options): + selection, c = Selection.objects.get_or_create(title='Tests') + if c: + presets = [] + blacklist =['decoder', 'live', 'gain', 'vamp'] + processors = timeside.core.processor.processors(timeside.core.api.IProcessor) + for proc in processors: + trig = True + for black in blacklist: + if black in proc.id(): + trig = False + if trig: + processor, c = Processor.objects.get_or_create(pid=proc.id()) + preset, c = Preset.objects.get_or_create(processor=processor, parameters='{}') + presets.append(preset) + + media_dir = 'items' + os.sep + 'tests' + samples_dir = settings.MEDIA_ROOT + media_dir + samples = generateSamples(samples_dir=samples_dir) + + for sample in samples.iteritems(): + filename, path = sample + title = os.path.splitext(filename)[0] + path = media_dir + os.sep + filename + item, c = Item.objects.get_or_create(title=title, file=path) + if not item in selection.items.all(): + selection.items.add(item) + if self.cleanup: + for result in item.results.all(): + result.delete() + + experience, c = Experience.objects.get_or_create(title='All') + for preset in presets: + if not preset in experience.presets.all(): + experience.presets.add(preset) + + task = Task(experience=experience, selection=selection) + task.save() diff --git a/timeside/server/management/commands/timeside-tests-boilerplate.py b/timeside/server/management/commands/timeside-tests-boilerplate.py deleted file mode 100644 index fbb4f2a..0000000 --- a/timeside/server/management/commands/timeside-tests-boilerplate.py +++ /dev/null @@ -1,61 +0,0 @@ -from optparse import make_option -from django.conf import settings -from django.core.management.base import BaseCommand, CommandError -from django.contrib.auth.models import User -from django.template.defaultfilters import slugify - -import os -import timeside.core -from timeside.server.models import * -from timeside.core.tools.test_samples import generateSamples - - -class Command(BaseCommand): - help = "Setup and run a boilerplate for testing" - cleanup = True - - def processor_cleanup(self): - for processor in Processor.objects.all(): - processor.delete() - - def result_cleanup(self): - for result in Result.objects.all(): - result.delete() - - def handle(self, *args, **options): - presets = [] - blacklist =['decoder', 'live', 'gain'] - processors = timeside.core.processor.processors(timeside.core.api.IProcessor) - for proc in processors: - trig = True - for black in blacklist: - if black in proc.id(): - trig = False - if trig: - processor, c = Processor.objects.get_or_create(pid=proc.id()) - preset, c = Preset.objects.get_or_create(processor=processor, parameters='{}') - presets.append(preset) - - media_dir = 'items' + os.sep + 'tests' - samples_dir = settings.MEDIA_ROOT + media_dir - samples = generateSamples(samples_dir=samples_dir) - selection, c = Selection.objects.get_or_create(title='Tests') - - for sample in samples.iteritems(): - filename, path = sample - title = os.path.splitext(filename)[0] - path = media_dir + os.sep + filename - item, c = Item.objects.get_or_create(title=title, file=path) - if not item in selection.items.all(): - selection.items.add(item) - if self.cleanup: - for result in item.results.all(): - result.delete() - - experience, c = Experience.objects.get_or_create(title='All') - for preset in presets: - if not preset in experience.presets.all(): - experience.presets.add(preset) - - task = Task(experience=experience, selection=selection) - task.status_setter(2) diff --git a/timeside/server/models.py b/timeside/server/models.py index 36226f3..b6e585a 100644 --- a/timeside/server/models.py +++ b/timeside/server/models.py @@ -186,6 +186,79 @@ class Item(DocBaseResource): def get_results_path(self): return os.path.join(results_path, self.uuid) + def run(self, experience): + result_path = self.get_results_path() + if not os.path.exists(result_path): + os.makedirs(result_path) + + pipe = timeside.plugins.decoder.file.FileDecoder(self.file.path, + sha1=self.sha1) + presets = {} + for preset in experience.presets.all(): + proc = get_processor(preset.processor.pid) + if proc.type == 'encoder': + result, c = Result.objects.get_or_create(preset=preset, + item=self) + media_file = '.'.join([str(result.uuid), + proc.file_extension()]) + result.file = os.path.join(result_path, media_file) + result.save() + proc = proc(result.file.path, overwrite=True, + streaming=False) + else: + proc = proc() + if proc.type == 'analyzer': + proc.set_parameters(preset.parameters) + presets[preset] = proc + pipe = pipe | proc + + # item.lock_setter(True) + + if not self.hdf5: + hdf5_file = str(experience.uuid) + '.hdf5' + self.hdf5 = os.path.join(result_path, hdf5_file) + self.save() + + pipe.run() + + for preset, proc in presets.iteritems(): + if proc.type == 'analyzer': + for result_id in proc.results.keys(): + parameters = proc.results[result_id].parameters + preset, c = Preset.objects.get_or_create( + processor=preset.processor, + parameters=unicode(parameters)) + result, c = Result.objects.get_or_create(preset=preset, + item=self) + hdf5_file = str(result.uuid) + '.hdf5' + result.hdf5 = os.path.join(result_path, hdf5_file) + # while result.lock: + # time.sleep(3) + # result.lock_setter(True) + proc.results.to_hdf5(result.hdf5.path) + # result.lock_setter(False) + result.status_setter(_DONE) + + elif proc.type == 'grapher': + parameters = {} + result, c = Result.objects.get_or_create(preset=preset, + item=self) + image_file = str(result.uuid) + '.png' + result.file = os.path.join(result_path, image_file) + proc.render(output=result.file.path) + result.mime_type_setter(get_mime_type(result.file.path)) + result.status_setter(_DONE) + + elif proc.type == 'encoder': + result = Result.objects.get(preset=preset, item=self) + result.mime_type_setter(get_mime_type(result.file.path)) + result.status_setter(_DONE) + + del proc + + del pipe + # item.lock_setter(False) + class Experience(DocBaseResource): @@ -263,6 +336,7 @@ class Result(BaseResource): author = models.ForeignKey(User, related_name="results", verbose_name=_('author'), blank=True, null=True, on_delete=models.SET_NULL) + # lock = models.BooleanField(default=False) class Meta(MetaCore): db_table = app + '_results' @@ -277,6 +351,10 @@ class Result(BaseResource): self.mime_type = mime_type self.save() + def lock_setter(self, lock): + self.lock = lock + self.save() + def __unicode__(self): return '_'.join([self.item.title, unicode(self.preset.processor)]) @@ -306,101 +384,12 @@ class Task(BaseResource): self.status = status self.save() - def post_run(self, item, presets): - item.lock_setter(True) - item_path = item.get_results_path() - - # pipe.results.to_hdf5(item.hdf5.path) - for preset in presets.keys(): - proc = presets[preset] - if proc.type == 'analyzer': - for result_id in proc.results.keys(): - parameters = proc.results[result_id].parameters - preset, c = Preset.objects.get_or_create( - processor=preset.processor, - parameters=unicode(parameters)) - result, c = Result.objects.get_or_create(preset=preset, - item=item) - hdf5_file = str(result.uuid) + '.hdf5' - result.hdf5 = os.path.join(item_path, hdf5_file) - proc.results.to_hdf5(result.hdf5.path) - result.status_setter(_DONE) - elif proc.type == 'grapher': - parameters = {} - result, c = Result.objects.get_or_create(preset=preset, - item=item) - image_file = str(result.uuid) + '.png' - result.file = os.path.join(item_path, image_file) - proc.render(output=result.file.path) - result.mime_type_setter(get_mime_type(result.file.path)) - result.status_setter(_DONE) - elif proc.type == 'encoder': - result = Result.objects.get(preset=preset, item=item) - result.mime_type_setter(get_mime_type(result.file.path)) - result.status_setter(_DONE) - del proc - - item.lock_setter(False) - def run(self, streaming=False): + from timeside.server.tasks import experience_run self.status_setter(_RUNNING) - - for item in self.selection. get_all_items(): - item_path = item.get_results_path() - if not os.path.exists(item_path): - os.makedirs(item_path) - - pipe = timeside.plugins.decoder.file.FileDecoder(item.file.path, - sha1=item.sha1) - - presets = {} - for preset in self.experience.presets.all(): - proc = get_processor(preset.processor.pid) - if proc.type == 'encoder': - result, c = Result.objects.get_or_create(preset=preset, - item=item) - media_file = '.'.join([str(result.uuid), - proc.file_extension()]) - result.file = os.path.join(item_path, media_file) - result.save() - proc = proc(result.file.path, overwrite=True, - streaming=streaming) - else: - proc = proc() - if proc.type == 'analyzer': - proc.set_parameters(preset.parameters) - presets[preset] = proc - pipe = pipe | proc - - # while item.lock: - # time.sleep(30) - - if not item.hdf5: - hdf5_file = str(self.experience.uuid) + '.hdf5' - item.hdf5 = os.path.join(item_path, hdf5_file) - item.save() - - def stream_task(pipe, item, presets): - for chunk in pipe.stream(): - yield chunk - self.post_run(item, presets) - self.status_setter(_DONE) - del pipe - - if streaming: - return stream_task(pipe, item, presets) - else: - pipe.run() - - self.post_run(item, presets) - - # except: - # self.status_setter(0) - # item.lock_setter(False) - # break - + for item in self.selection.get_all_items(): + experience_run.delay(self.experience.id, item.id) self.status_setter(_DONE) - del pipe def set_mimetype(sender, **kwargs): diff --git a/timeside/server/tasks.py b/timeside/server/tasks.py index f70c495..a94067a 100644 --- a/timeside/server/tasks.py +++ b/timeside/server/tasks.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from celery import shared_task -from .models import Item, Selection, Preset, Task +from .models import Item, Selection, Preset, Experience, Task @shared_task @@ -9,23 +9,8 @@ def task_run(id): task = Task.objects.get(id=id) task.run() - -def experience_run(experience_id, item_id): - item = Item.objects.get(id=item_id) - experience = Experience.objects.get(id=experience_id) - experience.run(item) - - @shared_task -def add(x, y): - return x + y - - -@shared_task -def mul(x, y): - return x * y - - -@shared_task -def xsum(numbers): - return sum(numbers) \ No newline at end of file +def experience_run(exp_id, item_id): + item = Item.objects.get(id=item_id) + experience = Experience.objects.get(id=exp_id) + item.run(experience)