From 0fae2d92e091e32632028ef740500fd1e7077fdb Mon Sep 17 00:00:00 2001 From: Thomas Fillon Date: Tue, 23 May 2017 16:22:24 +0200 Subject: [PATCH] Remove file serving during transcoding --- telemeta/static/telemeta/js/playerLoader.js | 18 +- telemeta/tasks.py | 38 ++++ telemeta/urls.py | 2 + telemeta/views/core.py | 7 +- telemeta/views/item.py | 200 +++++++++++++------- 5 files changed, 196 insertions(+), 69 deletions(-) create mode 100644 telemeta/tasks.py diff --git a/telemeta/static/telemeta/js/playerLoader.js b/telemeta/static/telemeta/js/playerLoader.js index 50250d3c..885f2708 100644 --- a/telemeta/static/telemeta/js/playerLoader.js +++ b/telemeta/static/telemeta/js/playerLoader.js @@ -491,7 +491,23 @@ function loadPlayer(analizerUrl, soundUrl, soundImgSize, itemId, visualizers, cu }); }; //and finally, load the player: - Timeside.load(timesideConfig); + function checkSoundUrlAndLoad() { + $J.get( soundUrl+"/isAvailable", function( data ) { + //check if soundUrl is available + if (!data['available']) { + setTimeout(checkSoundUrlAndLoad, 2000); + } else { + // + Timeside.load(timesideConfig); + } + }); + + } + + checkSoundUrlAndLoad(); + + + }; diff --git a/telemeta/tasks.py b/telemeta/tasks.py new file mode 100644 index 00000000..ec6cc63b --- /dev/null +++ b/telemeta/tasks.py @@ -0,0 +1,38 @@ +from __future__ import absolute_import + +from celery import shared_task + +from timeside.core import get_processor +from telemeta.models.item import MediaItem, MediaItemTranscodingFlag + +@shared_task +def task_transcode(source, media, encoder_id, + item_public_id, mime_type, + metadata=None): + # Get or Set transcoding status flag + item = MediaItem.objects.get(public_id=item_public_id) + transcoded_flag = MediaItemTranscodingFlag.objects.get( + item=item, + mime_type=mime_type) + progress_flag, c = MediaItemTranscodingFlag.objects.get_or_create( + item=item, + mime_type=mime_type + '/transcoding') + + progress_flag.value = False + progress_flag.save() + # Transcode + decoder = get_processor('file_decoder')(source) + encoder = get_processor(encoder_id)(media, + streaming=False, + overwrite=True) + if metadata: + encoder.set_metadata(metadata) + pipe = decoder | encoder + + progress_flag.value = True + progress_flag.save() + pipe.run() + + transcoded_flag.value = True + transcoded_flag.save() + progress_flag.delete() diff --git a/telemeta/urls.py b/telemeta/urls.py index 8456c858..3e5bfa4d 100644 --- a/telemeta/urls.py +++ b/telemeta/urls.py @@ -66,6 +66,8 @@ urlpatterns = patterns('', url(r'^archives/items/(?P[A-Za-z0-9._-]+)/dc/$', ItemDetailDCView.as_view(), name="telemeta-item-dublincore"), url(r'^archives/items/(?P[A-Za-z0-9._-]+)/dc/xml/$', item_view.item_detail, {'format': 'dublin_core_xml'}, name="telemeta-item-dublincore-xml"), url(r'^archives/items/download/(?P[A-Za-z0-9._-]+)\.(?P' + export_extensions + ')$', item_view.item_export, name="telemeta-item-export"), + url(r'^archives/items/download/(?P[A-Za-z0-9._-]+)\.(?P' + export_extensions + ')/isAvailable$', item_view.item_export_available, name="telemeta-item-export-available"), + url(r'^archives/items/(?P[A-Za-z0-9._-]+)/visualize/(?P[0-9a-z_]+)/(?P[0-9A-Z]+)x(?P[0-9A-Z]+)/$', item_view.item_visualize, name="telemeta-item-visualize"), url(r'^archives/items/(?P[A-Za-z0-9._-]+)/analyze/xml/$', item_view.item_analyze_xml, name="telemeta-item-analyze-xml"), url(r'^archives/items/(?P[A-Za-z0-9._-]+)/item_xspf.xml$', item_view.item_playlist, dict(template="telemeta/mediaitem_xspf.xml", mimetype="application/xspf+xml"), name="telemeta-item-xspf"), diff --git a/telemeta/views/core.py b/telemeta/views/core.py index 32aa0b81..d35d962d 100644 --- a/telemeta/views/core.py +++ b/telemeta/views/core.py @@ -117,6 +117,7 @@ def nginx_media_accel(media_path, content_type="", buffering=True): if not buffering: response['X-Accel-Buffering'] = 'no' + response['X-Accel-Limit-Rate'] = 524288 return response @@ -126,14 +127,12 @@ def render(request, template, data=None, mimetype=None): mimetype=mimetype) -def stream_from_processor(decoder, encoder, flag): +def stream_from_processor(decoder, encoder): pipe = decoder | encoder for chunk in pipe.stream(): yield chunk - flag.value = True - flag.save() - + def stream_from_file(file): chunk_size = 0x100000 f = open(file, 'r') diff --git a/telemeta/views/item.py b/telemeta/views/item.py index 2f6c73c7..e7989d3e 100644 --- a/telemeta/views/item.py +++ b/telemeta/views/item.py @@ -28,7 +28,7 @@ from telemeta.views.marker import * import timeside.core import timeside.server as ts import sys - +import time class ItemBaseMixin(TelemetaBaseMixin): @@ -284,7 +284,95 @@ class ItemView(ItemBaseMixin): list.append('mp4') return list - def item_export(self, request, public_id, extension): + def item_transcode(self, item, extension): + for encoder in self.encoders: + if encoder.file_extension() == extension: + break + + if encoder.file_extension() != extension: + raise Http404('Unknown export file extension: %s' % extension) + + mime_type = encoder.mime_type() + file = item.public_id + '.' + encoder.file_extension() + source, source_type = item.get_source() + try: + is_transcoded_flag, c = MediaItemTranscodingFlag.objects.get_or_create( + item=item, + mime_type=mime_type, + defaults={'value':False}) + except MediaItemTranscodingFlag.MultipleObjectsReturned: + flags = MediaItemTranscodingFlag.objects.filter( + item=item, + mime_type=mime_type) + value = all([f.value for f in flags]) + is_transcoded_flag = flags[0] + is_transcoded_flag.value = value + is_transcoded_flag.save() + for f in flags[1:]: + f.delete() + + + format = item.mime_type + dc_metadata = dublincore.express_item(item).to_list() + mapping = DublinCoreToFormatMetadata(extension) + if not extension in mapping.unavailable_extensions: + metadata = mapping.get_metadata(dc_metadata) + else: + metadata = None + + if mime_type in format and source_type == 'file': + # source > stream + if metadata: + proc = encoder(source, overwrite=True) + proc.set_metadata(metadata) + try: + # FIXME: should test if metadata writer is available + proc.write_metadata() + except: + pass + return (source, mime_type) + else: + media = self.cache_export.dir + os.sep + file + if not is_transcoded_flag.value: + try: + progress_flag = MediaItemTranscodingFlag.objects.get( + item=item, + mime_type=mime_type + '/transcoding') + if progress_flag.value: + # The media is being transcoded + # return None + return (None, None) + + else: + # wait for the transcode to begin + time.sleep(1) + return (None, None) #self.item_transcode(item, extension) + + except MediaItemTranscodingFlag.DoesNotExist: + pass + # source > encoder > stream + from telemeta.tasks import task_transcode + # Sent the transcoding task synchronously to the worker + task_transcode.apply_async(kwargs={'source':source, + 'media':media, + 'encoder_id':encoder.id(), + 'item_public_id':item.public_id, + 'mime_type':mime_type, + 'metadata':metadata}) + + self.cache_export.add_file(file) + if not os.path.exists(media): + return (None, None) + else: + # cache > stream + if not os.path.exists(media): + is_transcoded_flag.value = False + is_transcoded_flag.save() + return self.item_transcode(item, extension) + + return (media, mime_type) + + def item_export(self, request, public_id, extension, return_availability=False): """Export a given media item in the specified format (OGG, FLAC, ...)""" item = MediaItem.objects.get(public_id=public_id) @@ -306,6 +394,8 @@ class ItemView(ItemBaseMixin): # response['Content-Disposition'] = 'attachment' # TF : I don't know why empty attachment was set # TODO: remove if useless + if return_availability: + return True return response if 'webm' in extension: @@ -315,69 +405,37 @@ class ItemView(ItemBaseMixin): # response['Content-Disposition'] = 'attachment' # TF : I don't know why empty attachment was set, # TODO: remove if useless + if return_availability: + data = json.dumps({'available':True}) + return HttpResponse(data, content_type='application/json') + return response + + (media, mime_type) = self.item_transcode(item, extension) + #media = None + if media: + if return_availability: + data = json.dumps({'available':True}) + return HttpResponse(data, content_type='application/json') + response = serve_media(media, content_type=mime_type) return response - - for encoder in self.encoders: - if encoder.file_extension() == extension: - break - - if encoder.file_extension() != extension: - raise Http404('Unknown export file extension: %s' % extension) - - mime_type = encoder.mime_type() - file = public_id + '.' + encoder.file_extension() - source, source_type = item.get_source() - - flag = MediaItemTranscodingFlag.objects.filter(item=item, mime_type=mime_type) - if not flag: - flag = MediaItemTranscodingFlag(item=item, mime_type=mime_type) - flag.value = False - flag.save() - else: - flag = flag[0] - - format = item.mime_type - dc_metadata = dublincore.express_item(item).to_list() - mapping = DublinCoreToFormatMetadata(extension) - metadata = mapping.get_metadata(dc_metadata) - - if mime_type in format and source_type == 'file': - # source > stream - if not extension in mapping.unavailable_extensions: - proc = encoder(source, overwrite=True) - proc.set_metadata(metadata) - try: - # FIXME: should test if metadata writer is available - proc.write_metadata() - except: - pass - response = serve_media(source, content_type=mime_type) else: - media = self.cache_export.dir + os.sep + file - if not os.path.exists(media) or not flag.value: - # source > encoder > stream - if extension in mapping.unavailable_extensions: - metadata = None - - decoder = timeside.core.get_processor('file_decoder')(source) - processor = encoder(media, streaming=False, - overwrite=True) - if metadata: - processor.set_metadata(metadata) - pipe = decoder | processor - pipe.run() - - self.cache_export.add_file(file) - flag.value = True - flag.save() - - response = serve_media(media, content_type=mime_type) # , buffering=False) - - else: - # cache > stream - response = serve_media(media, content_type=mime_type) + if return_availability: + data = json.dumps({'available':False}) + return HttpResponse(data, content_type='application/json') + + mess = ugettext('Transcoding in progress') + title = ugettext('Item') + ' : ' + public_id + ' : ' + mess + description = ugettext('The media transcoding is in progress. ' + 'Please wait for the trancoding process to complete.') + messages.info(request, title) + response = render(request, 'telemeta/messages.html', {'description': description}) + from django.utils.cache import patch_cache_control + #patch_cache_control(response, no_cache=True, no_store=True, must_revalidate=True) + return response - return response + def item_export_available(self, request, public_id, extension): + return self.item_export(request, public_id, extension, return_availability=True) + def item_playlist(self, request, public_id, template, mimetype): try: @@ -687,6 +745,7 @@ class ItemDetailView(ItemViewMixin, DetailView): def item_analyze(self, item): analyses = item.analysis.all() + encoders_id = ['mp3_encoder']#, 'vorbis_encoder'] mime_type = '' if analyses: @@ -704,8 +763,9 @@ class ItemDetailView(ItemViewMixin, DetailView): analyzers = [] analyzers_sub = [] graphers_sub = [] + encoders_sub = [] - source, _ = item.get_source() + source = item.get_source()[0] if source: @@ -725,7 +785,17 @@ class ItemDetailView(ItemViewMixin, DetailView): path = self.cache_data.dir + os.sep + image_file graph = default_grapher(width=int(width), height=int(height)) graphers_sub.append({'graph': graph, 'path': path}) - pipe = pipe | graph + pipe |= graph + + for proc_id in encoders_id: + encoder_cls = timeside.core.get_processor(proc_id) + mime_type = encoder_cls.mime_type() + cache_file = item.public_id + '.' + encoder_cls.file_extension() + media = self.cache_export.dir + os.sep + cache_file + encoder = encoder_cls(output=media, overwrite=True) + encoders_sub.append(encoder) + pipe |= encoder + pipe.run() @@ -771,6 +841,8 @@ class ItemDetailView(ItemViewMixin, DetailView): analyzer_id=result.id, unit=result.unit, value=unicode(value)) analysis.save() + for encoder in encoders_sub: + pass # FIXME: parse tags on first load # tags = decoder.tags -- 2.39.5