]> git.parisson.com Git - timeside.git/commitdiff
On the way to fix #38
authorThomas Fillon <thomas@parisson.com>
Thu, 13 Feb 2014 18:11:51 +0000 (19:11 +0100)
committerThomas Fillon <thomas@parisson.com>
Thu, 13 Feb 2014 18:11:51 +0000 (19:11 +0100)
- Add a stream() function to the pipe to get an iterator that streams (for Telemeta HttpResponse)
- inside stream(), run pipe.run() inside a thread and get back appsink signal through a Queue

tests/sandbox/test_lolevel_streaming.py
tests/sandbox/test_lolevel_streaming_threaded.py [new file with mode: 0644]
timeside/core.py
timeside/encoder/core.py
timeside/encoder/mp3.py

index e394f0c099a8e96fd6e1ee3fed00b17e740f67ea..ee48d697ced62d26d3376cb3a33e5260834875f5 100644 (file)
@@ -1,10 +1,8 @@
 # -*- coding: utf-8 -*-
 
 from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
+from timeside.decoder import FileDecoder
+from timeside.encoder import Mp3Encoder
 
 import sys
 if len(sys.argv) > 1:
@@ -14,27 +12,37 @@ else:
     source= os.path.join (os.path.dirname(__file__),  "../samples/sweep.flac")
 
 decoder = FileDecoder(source)
+
 print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
 decoder.setup()
+
 channels  = decoder.channels()
 print 'channels :', channels
 samplerate = decoder.samplerate()
-nframes = decoder.nframes()
+#nframes = decoder.nframes()
 
 dest1 = "/tmp/test_filesink.mp3"
 dest2 = "/tmp/test_appsink.mp3"
 f = open(dest2,'w')
 
 streaming=True
-encoder = Mp3Encoder(dest1, streaming=True)
-encoder.setup(channels=channels, samplerate=samplerate)
-
+encoder = Mp3Encoder(dest1, streaming=True, overwrite=True)
+encoder.setup(channels=channels, samplerate=samplerate,
+              blocksize=decoder.blocksize(), totalframes=decoder.totalframes())
 while True:
     encoder.process(*decoder.process())
     if streaming:
         f.write(encoder.chunk)
-    if encoder.eod :
+    if encoder.eod:
         break
 
 f.close()
 print encoder.pipe
+
+import os
+dest1_size = os.path.getsize(dest1)
+dest2_size = os.path.getsize(dest2)
+
+print "sizes : %d , %d" % (dest1_size, dest2_size)
+
+assert os.path.getsize(dest1)==os.path.getsize(dest2)
diff --git a/tests/sandbox/test_lolevel_streaming_threaded.py b/tests/sandbox/test_lolevel_streaming_threaded.py
new file mode 100644 (file)
index 0000000..8777fdb
--- /dev/null
@@ -0,0 +1,49 @@
+# -*- coding: utf-8 -*-
+
+from timeside.core import *
+from timeside.decoder import FileDecoder
+from timeside.encoder import Mp3Encoder
+
+import sys
+#if len(sys.argv) > 1:
+#    source = sys.argv[1]
+#else:
+#    import os.path
+audio_file = '/home/thomas/code/timeside/TimeSide/tests/samples/sweep.flac'
+#    source= os.path.join (os.path.dirname(__file__),  audio_file)
+
+decoder = FileDecoder(audio_file)
+
+print "Creating decoder with id=%s for: %s" % (decoder.id(), audio_file)
+
+dest1 = "/tmp/test_filesink.mp3"
+dest2 = "/tmp/test_appsink.mp3"
+f = open(dest2,'w')
+
+
+streaming=True
+encoder = Mp3Encoder(dest1, streaming=streaming, overwrite=True)
+
+pipe = (decoder | encoder)
+print pipe
+#pipe.run()
+
+for chunk in pipe.stream():
+    f.write(chunk)
+#while True:
+#    encoder.process(*decoder.process())
+#    if streaming:
+#        f.write(encoder.chunk)
+#    if encoder.eod:
+#        break
+
+f.close()
+#print encoder.pipe
+
+import os
+dest1_size = os.path.getsize(dest1)
+dest2_size = os.path.getsize(dest2)
+
+print "sizes : %d , %d" % (dest1_size, dest2_size)
+
+assert os.path.getsize(dest1)==os.path.getsize(dest2)
index 93822609f40d0dc9d93d2865bb2b6ec0484ef600..afbc1d6375c31e732bec6dfe3978d34cf8f7a5c3 100644 (file)
@@ -28,6 +28,9 @@ import time
 import numpy
 import uuid
 
+import gobject
+gobject.threads_init()
+
 __all__ = ['Processor', 'MetaProcessor', 'implements', 'abstract',
            'interfacedoc', 'processors', 'get_processor', 'ProcessPipe',
            'FixedSizeInputAdapter']
@@ -241,6 +244,10 @@ class ProcessPipe(object):
 
     def __init__(self, *others):
         self.processors = []
+        self._streamer = None
+        self._stream_thread = False
+        self._is_running = False
+
         self |= others
 
         from timeside.analyzer.core import AnalyzerResultContainer
@@ -278,8 +285,7 @@ class ProcessPipe(object):
         return pipe
 
     def run(self, channels=None, samplerate=None, blocksize=None):
-        """Setup/reset all processors in cascade and stream audio data along
-        the pipe."""
+        """Setup/reset all processors in cascade"""
 
         source = self.processors[0]
         items = self.processors[1:]
@@ -295,9 +301,16 @@ class ProcessPipe(object):
                        samplerate=last.samplerate(),
                        blocksize=last.blocksize(),
                        totalframes=last.totalframes())
+            self._register_streamer(item)
             last = item
 
         # now stream audio data along the pipe
+        if self._stream_thread:
+            self._running_cond.acquire()
+            self._is_running = True
+            self._running_cond.notify()
+            self._running_cond.release()
+
         eod = False
 
         if source.id() == 'gst_live_dec':
@@ -326,3 +339,62 @@ class ProcessPipe(object):
         for item in items:
             item.release()
             self.processors.remove(item)
+
+        self._is_running = False
+
+    def stream(self):
+        self._stream_thread = True
+        import threading
+        class MainloopThread(threading.Thread):
+            def __init__(self, mainloop):
+                threading.Thread.__init__(self)
+                self.mainloop = mainloop
+
+            def run(self):
+                self.mainloop.run()
+        self.mainloop = gobject.MainLoop()
+        self.mainloopthread = MainloopThread(self.mainloop)
+        self.mainloopthread.start()
+
+        class PipeThread(threading.Thread):
+            def __init__(self, process_pipe):
+                super(PipeThread, self).__init__(name='pipe_thread')
+                self.process_pipe = process_pipe
+
+            def run(self):
+                self.process_pipe.run()
+
+        pipe_thread = PipeThread(self)
+        pipe_thread.start()
+
+        # wait for pipe thread to be ready to stream
+        self._running_cond = threading.Condition(threading.Lock())
+        self._running_cond.acquire()
+        while not self._is_running:
+            self._running_cond.wait()
+        self._running_cond.release()
+
+        if self._streamer is None:
+            raise TypeError('Function only available in streaming mode')
+        print 'streaming'
+
+        while pipe_thread.is_alive():
+            #yield count
+            chunk = self._streamer.get_stream_chunk()
+            #time.sleep(0.1)
+            if chunk is not None:
+                yield chunk  # print len(chunk)
+            else:
+                break
+
+
+        return
+
+    def _register_streamer(self, processor):
+        if hasattr(processor, 'streaming') and processor.streaming:
+            if self._streamer is None:
+                self._streamer = processor
+            else:
+                raise TypeError('More than one streaming processor in pipe')
+            print self._streamer
+
index 7b1e0d341ee737fdb690038145379ca144216109..ec18a4e8b005a0f2df29d01d700bed651aa03cca 100644 (file)
@@ -24,7 +24,17 @@ from timeside.component import implements, abstract
 from timeside.api import IEncoder
 from timeside.tools import *
 
-from gst import _gst as gst
+#from gst import _gst as gst
+import pygst
+pygst.require('0.10')
+import gst
+
+import gobject
+gobject.threads_init()
+
+# Streaming queue configuration
+QUEUE_SIZE = 10
+GST_APPSINK_MAX_BUFFERS = 10
 
 
 class GstEncoder(Processor):
@@ -56,6 +66,8 @@ class GstEncoder(Processor):
         self.metadata = None
         self.num_samples = 0
 
+        self._chunk_len = 0
+
     @interfacedoc
     def release(self):
         if hasattr(self, 'eod') and hasattr(self, 'mainloopthread'):
@@ -73,8 +85,17 @@ class GstEncoder(Processor):
         self.pipeline = gst.parse_launch(self.pipe)
         # store a pointer to appsrc in our encoder object
         self.src = self.pipeline.get_by_name('src')
-        # store a pointer to appsink in our encoder object
-        self.app = self.pipeline.get_by_name('app')
+
+        if self.streaming:
+            import Queue
+            self._streaming_queue = Queue.Queue(QUEUE_SIZE)
+            # store a pointer to appsink in our encoder object
+            self.app = self.pipeline.get_by_name('app')
+            self.app.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
+            self.app.set_property("drop", False)
+            self.app.set_property('emit-signals', True)
+            self.app.connect("new-buffer", self._on_new_buffer_streaming)
+            self.app.connect('new-preroll', self._on_new_preroll_streaming)
 
         srccaps = gst.Caps("""audio/x-raw-float,
             endianness=(int)1234,
@@ -109,6 +130,10 @@ class GstEncoder(Processor):
     def _on_message_cb(self, bus, message):
         t = message.type
         if t == gst.MESSAGE_EOS:
+
+            if self.streaming:
+                self._streaming_queue.put(gst.MESSAGE_EOS)
+
             self.end_cond.acquire()
             self.pipeline.set_state(gst.STATE_NULL)
             self.mainloop.quit()
@@ -126,6 +151,28 @@ class GstEncoder(Processor):
             self.end_cond.notify()
             self.end_cond.release()
 
+    def _on_new_buffer_streaming(self, appsink):
+        #print 'pull-buffer'
+        chunk = appsink.emit('pull-buffer')
+        if chunk == gst.MESSAGE_EOS:
+            print 'chunk is eos *************'
+            raise TypeError
+        else:
+            self._chunk_len += len(chunk)
+            print 'new buffer', self._chunk_len
+
+        if appsink.get_property('eos'):
+            print 'property EOS'
+        #print 'put buffer in queue'
+        self._streaming_queue.put(chunk)
+        #print 'qsize : %d' % self._streaming_queue.qsize()
+        #print 'put ok'
+
+    def _on_new_preroll_streaming(self, appsink):
+        print 'preroll'
+        chunk = appsink.emit('pull-preroll')
+        self._streaming_queue.put(chunk)
+
     @interfacedoc
     def process(self, frames, eod=False):
         self.eod = eod
@@ -138,9 +185,25 @@ class GstEncoder(Processor):
         if self.eod:
             self.src.emit('end-of-stream')
         if self.streaming:
-            self.chunk = self.app.emit('pull-buffer')
+            pass #self.chunk = self.app.emit('pull-buffer')
         return frames, eod
 
+    def get_stream_chunk(self):
+        if self.streaming:
+            #if not self.app.get_property('eos'):
+            #print 'get chunk from queue'
+            #print 'qsize : %d' % self._streaming_queue.qsize()
+            chunk = self._streaming_queue.get(block=True)
+            if  chunk == gst.MESSAGE_EOS:
+                return None
+            else:
+                self._streaming_queue.task_done()
+                return chunk
+
+            print 'new buffer', self._chunk_len
+
+        else:
+            raise TypeError('function only available in streaming mode')
 
 if __name__ == "__main__":
     # Run doctest from __main__ and unittest from test_analyzer_preprocessors
index 5de1bd04bedd6d784576c5d93b0ed76c7317d274..8c781ecec253a5cb5f1e3471a3217d2392471b4b 100644 (file)
@@ -48,14 +48,14 @@ class Mp3Encoder(GstEncoder):
 
         if self.filename and self.streaming:
             self.pipe += ''' ! tee name=t
-            ! queue ! filesink location=%s
-            t. ! queue ! appsink name=app sync=False
+            ! queue2 ! filesink location=%s
+            t. ! queue2 ! appsink name=app sync=False
             ''' % self.filename
 
         elif self.filename :
             self.pipe += '! filesink location=%s async=False sync=False ' % self.filename
         else:
-            self.pipe += '! queue ! appsink name=app sync=False '
+            self.pipe += '! queue ! appsink name=app sync=False async=True'
 
         self.start_pipeline(channels, samplerate)