]> git.parisson.com Git - timeside.git/commitdiff
Clean up code to treat log prints madness
authorThomas Fillon <thomas@parisson.com>
Thu, 13 Feb 2014 19:59:56 +0000 (20:59 +0100)
committerThomas Fillon <thomas@parisson.com>
Thu, 13 Feb 2014 19:59:56 +0000 (20:59 +0100)
tests/sandbox/test_lolevel_streaming_threaded.py
timeside/core.py
timeside/encoder/core.py

index 8777fdb0aaa4b264edc0b752bde7fe5181898740..2fa1451fd7a9a302c7e6edd35c2bb120ed2719c1 100644 (file)
@@ -5,12 +5,12 @@ from timeside.decoder import FileDecoder
 from timeside.encoder import Mp3Encoder
 
 import sys
-#if len(sys.argv) > 1:
-#    source = sys.argv[1]
-#else:
-#    import os.path
-audio_file = '/home/thomas/code/timeside/TimeSide/tests/samples/sweep.flac'
-#    source= os.path.join (os.path.dirname(__file__),  audio_file)
+if len(sys.argv) > 1:
+    source = sys.argv[1]
+else:
+    import os.path
+    audio_file = '../samples/sweep.flac'
+    source = os.path.join(os.path.dirname(__file__), audio_file)
 
 decoder = FileDecoder(audio_file)
 
@@ -18,10 +18,10 @@ print "Creating decoder with id=%s for: %s" % (decoder.id(), audio_file)
 
 dest1 = "/tmp/test_filesink.mp3"
 dest2 = "/tmp/test_appsink.mp3"
-f = open(dest2,'w')
+f = open(dest2, 'w')
 
 
-streaming=True
+streaming = True
 encoder = Mp3Encoder(dest1, streaming=streaming, overwrite=True)
 
 pipe = (decoder | encoder)
@@ -46,4 +46,9 @@ dest2_size = os.path.getsize(dest2)
 
 print "sizes : %d , %d" % (dest1_size, dest2_size)
 
-assert os.path.getsize(dest1)==os.path.getsize(dest2)
+assert os.path.getsize(dest1) == os.path.getsize(dest2)
+
+# Sometime randomly freeze
+# Appsink file is always 1 buffer longer than filesink
+# TODO : Try to transcode with a pure gstreamer pipe to see the file length
+# maybe appsink is fine but filesink not ? just to be checked
index afbc1d6375c31e732bec6dfe3978d34cf8f7a5c3..2b09243b3acc6b844b3c249c12fe66053327aeed 100644 (file)
@@ -376,20 +376,15 @@ class ProcessPipe(object):
 
         if self._streamer is None:
             raise TypeError('Function only available in streaming mode')
-        print 'streaming'
 
         while pipe_thread.is_alive():
             #yield count
             chunk = self._streamer.get_stream_chunk()
-            #time.sleep(0.1)
             if chunk is not None:
-                yield chunk  # print len(chunk)
+                yield chunk
             else:
                 break
 
-
-        return
-
     def _register_streamer(self, processor):
         if hasattr(processor, 'streaming') and processor.streaming:
             if self._streamer is None:
index ec18a4e8b005a0f2df29d01d700bed651aa03cca..dac9286fedcd2cc4689979a5a416d1891cb1d469 100644 (file)
@@ -95,7 +95,7 @@ class GstEncoder(Processor):
             self.app.set_property("drop", False)
             self.app.set_property('emit-signals', True)
             self.app.connect("new-buffer", self._on_new_buffer_streaming)
-            self.app.connect('new-preroll', self._on_new_preroll_streaming)
+            #self.app.connect('new-preroll', self._on_new_preroll_streaming)
 
         srccaps = gst.Caps("""audio/x-raw-float,
             endianness=(int)1234,
@@ -113,6 +113,7 @@ class GstEncoder(Processor):
         self.bus.connect("message", self._on_message_cb)
 
         import threading
+
         class MainloopThread(threading.Thread):
             def __init__(self, mainloop):
                 threading.Thread.__init__(self)
@@ -130,11 +131,10 @@ class GstEncoder(Processor):
     def _on_message_cb(self, bus, message):
         t = message.type
         if t == gst.MESSAGE_EOS:
-
+            self.end_cond.acquire()
             if self.streaming:
                 self._streaming_queue.put(gst.MESSAGE_EOS)
 
-            self.end_cond.acquire()
             self.pipeline.set_state(gst.STATE_NULL)
             self.mainloop.quit()
             self.end_reached = True
@@ -154,19 +154,10 @@ class GstEncoder(Processor):
     def _on_new_buffer_streaming(self, appsink):
         #print 'pull-buffer'
         chunk = appsink.emit('pull-buffer')
-        if chunk == gst.MESSAGE_EOS:
-            print 'chunk is eos *************'
-            raise TypeError
-        else:
-            self._chunk_len += len(chunk)
-            print 'new buffer', self._chunk_len
-
-        if appsink.get_property('eos'):
-            print 'property EOS'
-        #print 'put buffer in queue'
         self._streaming_queue.put(chunk)
-        #print 'qsize : %d' % self._streaming_queue.qsize()
-        #print 'put ok'
+        chunk_len = len(chunk)
+        self._chunk_len += chunk_len
+        print 'new buffer length:', self._chunk_len
 
     def _on_new_preroll_streaming(self, appsink):
         print 'preroll'
@@ -177,31 +168,26 @@ class GstEncoder(Processor):
     def process(self, frames, eod=False):
         self.eod = eod
         if eod:
-            self.num_samples +=  frames.shape[0]
+            self.num_samples += frames.shape[0]
         else:
             self.num_samples += self.blocksize()
-        buf = numpy_array_to_gst_buffer(frames, frames.shape[0],self.num_samples, self.samplerate())
+        buf = numpy_array_to_gst_buffer(frames, frames.shape[0],
+                                        self.num_samples, self.samplerate())
         self.src.emit('push-buffer', buf)
         if self.eod:
             self.src.emit('end-of-stream')
-        if self.streaming:
-            pass #self.chunk = self.app.emit('pull-buffer')
+
         return frames, eod
 
     def get_stream_chunk(self):
         if self.streaming:
-            #if not self.app.get_property('eos'):
-            #print 'get chunk from queue'
-            #print 'qsize : %d' % self._streaming_queue.qsize()
             chunk = self._streaming_queue.get(block=True)
-            if  chunk == gst.MESSAGE_EOS:
+            if chunk == gst.MESSAGE_EOS:
                 return None
             else:
                 self._streaming_queue.task_done()
                 return chunk
 
-            print 'new buffer', self._chunk_len
-
         else:
             raise TypeError('function only available in streaming mode')