From: Thomas Fillon Date: Thu, 13 Feb 2014 18:11:51 +0000 (+0100) Subject: On the way to fix #38 X-Git-Tag: 0.5.4~6^2~9 X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=7913c8c54233129e78e7d96a1c35a3a25171f4ba;p=timeside.git On the way to fix #38 - Add a stream() function to the pipe to get an iterator that streams (for Telemeta HttpResponse) - inside stream(), run pipe.run() inside a thread and get back appsink signal through a Queue --- diff --git a/tests/sandbox/test_lolevel_streaming.py b/tests/sandbox/test_lolevel_streaming.py index e394f0c..ee48d69 100644 --- a/tests/sandbox/test_lolevel_streaming.py +++ b/tests/sandbox/test_lolevel_streaming.py @@ -1,10 +1,8 @@ # -*- coding: utf-8 -*- from timeside.core import * -from timeside.decoder import * -from timeside.analyzer import * -from timeside.encoder import * -from timeside.api import * +from timeside.decoder import FileDecoder +from timeside.encoder import Mp3Encoder import sys if len(sys.argv) > 1: @@ -14,27 +12,37 @@ else: source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac") decoder = FileDecoder(source) + print "Creating decoder with id=%s for: %s" % (decoder.id(), source) decoder.setup() + channels = decoder.channels() print 'channels :', channels samplerate = decoder.samplerate() -nframes = decoder.nframes() +#nframes = decoder.nframes() dest1 = "/tmp/test_filesink.mp3" dest2 = "/tmp/test_appsink.mp3" f = open(dest2,'w') streaming=True -encoder = Mp3Encoder(dest1, streaming=True) -encoder.setup(channels=channels, samplerate=samplerate) - +encoder = Mp3Encoder(dest1, streaming=True, overwrite=True) +encoder.setup(channels=channels, samplerate=samplerate, + blocksize=decoder.blocksize(), totalframes=decoder.totalframes()) while True: encoder.process(*decoder.process()) if streaming: f.write(encoder.chunk) - if encoder.eod : + if encoder.eod: break f.close() print encoder.pipe + +import os +dest1_size = os.path.getsize(dest1) +dest2_size = os.path.getsize(dest2) + +print "sizes : %d , %d" % (dest1_size, dest2_size) + +assert os.path.getsize(dest1)==os.path.getsize(dest2) diff --git a/tests/sandbox/test_lolevel_streaming_threaded.py b/tests/sandbox/test_lolevel_streaming_threaded.py new file mode 100644 index 0000000..8777fdb --- /dev/null +++ b/tests/sandbox/test_lolevel_streaming_threaded.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- + +from timeside.core import * +from timeside.decoder import FileDecoder +from timeside.encoder import Mp3Encoder + +import sys +#if len(sys.argv) > 1: +# source = sys.argv[1] +#else: +# import os.path +audio_file = '/home/thomas/code/timeside/TimeSide/tests/samples/sweep.flac' +# source= os.path.join (os.path.dirname(__file__), audio_file) + +decoder = FileDecoder(audio_file) + +print "Creating decoder with id=%s for: %s" % (decoder.id(), audio_file) + +dest1 = "/tmp/test_filesink.mp3" +dest2 = "/tmp/test_appsink.mp3" +f = open(dest2,'w') + + +streaming=True +encoder = Mp3Encoder(dest1, streaming=streaming, overwrite=True) + +pipe = (decoder | encoder) +print pipe +#pipe.run() + +for chunk in pipe.stream(): + f.write(chunk) +#while True: +# encoder.process(*decoder.process()) +# if streaming: +# f.write(encoder.chunk) +# if encoder.eod: +# break + +f.close() +#print encoder.pipe + +import os +dest1_size = os.path.getsize(dest1) +dest2_size = os.path.getsize(dest2) + +print "sizes : %d , %d" % (dest1_size, dest2_size) + +assert os.path.getsize(dest1)==os.path.getsize(dest2) diff --git a/timeside/core.py b/timeside/core.py index 9382260..afbc1d6 100644 --- a/timeside/core.py +++ b/timeside/core.py @@ -28,6 +28,9 @@ import time import numpy import uuid +import gobject +gobject.threads_init() + __all__ = ['Processor', 'MetaProcessor', 'implements', 'abstract', 'interfacedoc', 'processors', 'get_processor', 'ProcessPipe', 'FixedSizeInputAdapter'] @@ -241,6 +244,10 @@ class ProcessPipe(object): def __init__(self, *others): self.processors = [] + self._streamer = None + self._stream_thread = False + self._is_running = False + self |= others from timeside.analyzer.core import AnalyzerResultContainer @@ -278,8 +285,7 @@ class ProcessPipe(object): return pipe def run(self, channels=None, samplerate=None, blocksize=None): - """Setup/reset all processors in cascade and stream audio data along - the pipe.""" + """Setup/reset all processors in cascade""" source = self.processors[0] items = self.processors[1:] @@ -295,9 +301,16 @@ class ProcessPipe(object): samplerate=last.samplerate(), blocksize=last.blocksize(), totalframes=last.totalframes()) + self._register_streamer(item) last = item # now stream audio data along the pipe + if self._stream_thread: + self._running_cond.acquire() + self._is_running = True + self._running_cond.notify() + self._running_cond.release() + eod = False if source.id() == 'gst_live_dec': @@ -326,3 +339,62 @@ class ProcessPipe(object): for item in items: item.release() self.processors.remove(item) + + self._is_running = False + + def stream(self): + self._stream_thread = True + import threading + class MainloopThread(threading.Thread): + def __init__(self, mainloop): + threading.Thread.__init__(self) + self.mainloop = mainloop + + def run(self): + self.mainloop.run() + self.mainloop = gobject.MainLoop() + self.mainloopthread = MainloopThread(self.mainloop) + self.mainloopthread.start() + + class PipeThread(threading.Thread): + def __init__(self, process_pipe): + super(PipeThread, self).__init__(name='pipe_thread') + self.process_pipe = process_pipe + + def run(self): + self.process_pipe.run() + + pipe_thread = PipeThread(self) + pipe_thread.start() + + # wait for pipe thread to be ready to stream + self._running_cond = threading.Condition(threading.Lock()) + self._running_cond.acquire() + while not self._is_running: + self._running_cond.wait() + self._running_cond.release() + + if self._streamer is None: + raise TypeError('Function only available in streaming mode') + print 'streaming' + + while pipe_thread.is_alive(): + #yield count + chunk = self._streamer.get_stream_chunk() + #time.sleep(0.1) + if chunk is not None: + yield chunk # print len(chunk) + else: + break + + + return + + def _register_streamer(self, processor): + if hasattr(processor, 'streaming') and processor.streaming: + if self._streamer is None: + self._streamer = processor + else: + raise TypeError('More than one streaming processor in pipe') + print self._streamer + diff --git a/timeside/encoder/core.py b/timeside/encoder/core.py index 7b1e0d3..ec18a4e 100644 --- a/timeside/encoder/core.py +++ b/timeside/encoder/core.py @@ -24,7 +24,17 @@ from timeside.component import implements, abstract from timeside.api import IEncoder from timeside.tools import * -from gst import _gst as gst +#from gst import _gst as gst +import pygst +pygst.require('0.10') +import gst + +import gobject +gobject.threads_init() + +# Streaming queue configuration +QUEUE_SIZE = 10 +GST_APPSINK_MAX_BUFFERS = 10 class GstEncoder(Processor): @@ -56,6 +66,8 @@ class GstEncoder(Processor): self.metadata = None self.num_samples = 0 + self._chunk_len = 0 + @interfacedoc def release(self): if hasattr(self, 'eod') and hasattr(self, 'mainloopthread'): @@ -73,8 +85,17 @@ class GstEncoder(Processor): self.pipeline = gst.parse_launch(self.pipe) # store a pointer to appsrc in our encoder object self.src = self.pipeline.get_by_name('src') - # store a pointer to appsink in our encoder object - self.app = self.pipeline.get_by_name('app') + + if self.streaming: + import Queue + self._streaming_queue = Queue.Queue(QUEUE_SIZE) + # store a pointer to appsink in our encoder object + self.app = self.pipeline.get_by_name('app') + self.app.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS) + self.app.set_property("drop", False) + self.app.set_property('emit-signals', True) + self.app.connect("new-buffer", self._on_new_buffer_streaming) + self.app.connect('new-preroll', self._on_new_preroll_streaming) srccaps = gst.Caps("""audio/x-raw-float, endianness=(int)1234, @@ -109,6 +130,10 @@ class GstEncoder(Processor): def _on_message_cb(self, bus, message): t = message.type if t == gst.MESSAGE_EOS: + + if self.streaming: + self._streaming_queue.put(gst.MESSAGE_EOS) + self.end_cond.acquire() self.pipeline.set_state(gst.STATE_NULL) self.mainloop.quit() @@ -126,6 +151,28 @@ class GstEncoder(Processor): self.end_cond.notify() self.end_cond.release() + def _on_new_buffer_streaming(self, appsink): + #print 'pull-buffer' + chunk = appsink.emit('pull-buffer') + if chunk == gst.MESSAGE_EOS: + print 'chunk is eos *************' + raise TypeError + else: + self._chunk_len += len(chunk) + print 'new buffer', self._chunk_len + + if appsink.get_property('eos'): + print 'property EOS' + #print 'put buffer in queue' + self._streaming_queue.put(chunk) + #print 'qsize : %d' % self._streaming_queue.qsize() + #print 'put ok' + + def _on_new_preroll_streaming(self, appsink): + print 'preroll' + chunk = appsink.emit('pull-preroll') + self._streaming_queue.put(chunk) + @interfacedoc def process(self, frames, eod=False): self.eod = eod @@ -138,9 +185,25 @@ class GstEncoder(Processor): if self.eod: self.src.emit('end-of-stream') if self.streaming: - self.chunk = self.app.emit('pull-buffer') + pass #self.chunk = self.app.emit('pull-buffer') return frames, eod + def get_stream_chunk(self): + if self.streaming: + #if not self.app.get_property('eos'): + #print 'get chunk from queue' + #print 'qsize : %d' % self._streaming_queue.qsize() + chunk = self._streaming_queue.get(block=True) + if chunk == gst.MESSAGE_EOS: + return None + else: + self._streaming_queue.task_done() + return chunk + + print 'new buffer', self._chunk_len + + else: + raise TypeError('function only available in streaming mode') if __name__ == "__main__": # Run doctest from __main__ and unittest from test_analyzer_preprocessors diff --git a/timeside/encoder/mp3.py b/timeside/encoder/mp3.py index 5de1bd0..8c781ec 100644 --- a/timeside/encoder/mp3.py +++ b/timeside/encoder/mp3.py @@ -48,14 +48,14 @@ class Mp3Encoder(GstEncoder): if self.filename and self.streaming: self.pipe += ''' ! tee name=t - ! queue ! filesink location=%s - t. ! queue ! appsink name=app sync=False + ! queue2 ! filesink location=%s + t. ! queue2 ! appsink name=app sync=False ''' % self.filename elif self.filename : self.pipe += '! filesink location=%s async=False sync=False ' % self.filename else: - self.pipe += '! queue ! appsink name=app sync=False ' + self.pipe += '! queue ! appsink name=app sync=False async=True' self.start_pipeline(channels, samplerate)