From 1a7cf80019d48cb0c4fa21cec66044c7c799a0fe Mon Sep 17 00:00:00 2001 From: Thomas Fillon Date: Thu, 13 Feb 2014 20:59:56 +0100 Subject: [PATCH] Clean up code to treat log prints madness --- .../test_lolevel_streaming_threaded.py | 23 +++++++----- timeside/core.py | 7 +--- timeside/encoder/core.py | 36 ++++++------------- 3 files changed, 26 insertions(+), 40 deletions(-) diff --git a/tests/sandbox/test_lolevel_streaming_threaded.py b/tests/sandbox/test_lolevel_streaming_threaded.py index 8777fdb..2fa1451 100644 --- a/tests/sandbox/test_lolevel_streaming_threaded.py +++ b/tests/sandbox/test_lolevel_streaming_threaded.py @@ -5,12 +5,12 @@ from timeside.decoder import FileDecoder from timeside.encoder import Mp3Encoder import sys -#if len(sys.argv) > 1: -# source = sys.argv[1] -#else: -# import os.path -audio_file = '/home/thomas/code/timeside/TimeSide/tests/samples/sweep.flac' -# source= os.path.join (os.path.dirname(__file__), audio_file) +if len(sys.argv) > 1: + source = sys.argv[1] +else: + import os.path + audio_file = '../samples/sweep.flac' + source = os.path.join(os.path.dirname(__file__), audio_file) decoder = FileDecoder(audio_file) @@ -18,10 +18,10 @@ print "Creating decoder with id=%s for: %s" % (decoder.id(), audio_file) dest1 = "/tmp/test_filesink.mp3" dest2 = "/tmp/test_appsink.mp3" -f = open(dest2,'w') +f = open(dest2, 'w') -streaming=True +streaming = True encoder = Mp3Encoder(dest1, streaming=streaming, overwrite=True) pipe = (decoder | encoder) @@ -46,4 +46,9 @@ dest2_size = os.path.getsize(dest2) print "sizes : %d , %d" % (dest1_size, dest2_size) -assert os.path.getsize(dest1)==os.path.getsize(dest2) +assert os.path.getsize(dest1) == os.path.getsize(dest2) + +# Sometime randomly freeze +# Appsink file is always 1 buffer longer than filesink +# TODO : Try to transcode with a pure gstreamer pipe to see the file length +# maybe appsink is fine but filesink not ? just to be checked diff --git a/timeside/core.py b/timeside/core.py index afbc1d6..2b09243 100644 --- a/timeside/core.py +++ b/timeside/core.py @@ -376,20 +376,15 @@ class ProcessPipe(object): if self._streamer is None: raise TypeError('Function only available in streaming mode') - print 'streaming' while pipe_thread.is_alive(): #yield count chunk = self._streamer.get_stream_chunk() - #time.sleep(0.1) if chunk is not None: - yield chunk # print len(chunk) + yield chunk else: break - - return - def _register_streamer(self, processor): if hasattr(processor, 'streaming') and processor.streaming: if self._streamer is None: diff --git a/timeside/encoder/core.py b/timeside/encoder/core.py index ec18a4e..dac9286 100644 --- a/timeside/encoder/core.py +++ b/timeside/encoder/core.py @@ -95,7 +95,7 @@ class GstEncoder(Processor): self.app.set_property("drop", False) self.app.set_property('emit-signals', True) self.app.connect("new-buffer", self._on_new_buffer_streaming) - self.app.connect('new-preroll', self._on_new_preroll_streaming) + #self.app.connect('new-preroll', self._on_new_preroll_streaming) srccaps = gst.Caps("""audio/x-raw-float, endianness=(int)1234, @@ -113,6 +113,7 @@ class GstEncoder(Processor): self.bus.connect("message", self._on_message_cb) import threading + class MainloopThread(threading.Thread): def __init__(self, mainloop): threading.Thread.__init__(self) @@ -130,11 +131,10 @@ class GstEncoder(Processor): def _on_message_cb(self, bus, message): t = message.type if t == gst.MESSAGE_EOS: - + self.end_cond.acquire() if self.streaming: self._streaming_queue.put(gst.MESSAGE_EOS) - self.end_cond.acquire() self.pipeline.set_state(gst.STATE_NULL) self.mainloop.quit() self.end_reached = True @@ -154,19 +154,10 @@ class GstEncoder(Processor): def _on_new_buffer_streaming(self, appsink): #print 'pull-buffer' chunk = appsink.emit('pull-buffer') - if chunk == gst.MESSAGE_EOS: - print 'chunk is eos *************' - raise TypeError - else: - self._chunk_len += len(chunk) - print 'new buffer', self._chunk_len - - if appsink.get_property('eos'): - print 'property EOS' - #print 'put buffer in queue' self._streaming_queue.put(chunk) - #print 'qsize : %d' % self._streaming_queue.qsize() - #print 'put ok' + chunk_len = len(chunk) + self._chunk_len += chunk_len + print 'new buffer length:', self._chunk_len def _on_new_preroll_streaming(self, appsink): print 'preroll' @@ -177,31 +168,26 @@ class GstEncoder(Processor): def process(self, frames, eod=False): self.eod = eod if eod: - self.num_samples += frames.shape[0] + self.num_samples += frames.shape[0] else: self.num_samples += self.blocksize() - buf = numpy_array_to_gst_buffer(frames, frames.shape[0],self.num_samples, self.samplerate()) + buf = numpy_array_to_gst_buffer(frames, frames.shape[0], + self.num_samples, self.samplerate()) self.src.emit('push-buffer', buf) if self.eod: self.src.emit('end-of-stream') - if self.streaming: - pass #self.chunk = self.app.emit('pull-buffer') + return frames, eod def get_stream_chunk(self): if self.streaming: - #if not self.app.get_property('eos'): - #print 'get chunk from queue' - #print 'qsize : %d' % self._streaming_queue.qsize() chunk = self._streaming_queue.get(block=True) - if chunk == gst.MESSAGE_EOS: + if chunk == gst.MESSAGE_EOS: return None else: self._streaming_queue.task_done() return chunk - print 'new buffer', self._chunk_len - else: raise TypeError('function only available in streaming mode') -- 2.39.5