]> git.parisson.com Git - timeside.git/commitdiff
LiveDecoder : first prototype for a live decoder. The process can be interrupted...
authorThomas Fillon <thomas@parisson.com>
Mon, 6 Jan 2014 12:11:38 +0000 (13:11 +0100)
committerThomas Fillon <thomas@parisson.com>
Mon, 6 Jan 2014 12:11:38 +0000 (13:11 +0100)
timeside/core.py
timeside/decoder/core.py

index e817e252ea6848296353bfa20380d885833fdf8a..50f1f3b3c1508e2a1cc994cc678c790135c3c0d7 100644 (file)
@@ -284,7 +284,7 @@ class ProcessPipe(object):
         items = self.processors[1:]
         source.setup(channels=channels, samplerate=samplerate,
                      blocksize=blocksize)
-
+        source.SIG_STOP = False
         last = source
 
         # setup/reset processors and configure properties throughout the pipe
@@ -298,16 +298,29 @@ class ProcessPipe(object):
 
         # now stream audio data along the pipe
         eod = False
+
+        # Set handler for Interruption signal
+        import signal
+
+        def signal_handler(signum, frame):
+            source.stop()
+            signal.signal(signum, signal.SIG_DFL)
+
+        signal.signal(signal.SIGINT, signal_handler)
+
         while not eod:
             frames, eod = source.process()
             for item in items:
                 frames, eod = item.process(frames, eod)
 
+        # Restore default handler for Interruption signal
+        signal.signal(signal.SIGINT, signal.SIG_DFL)
+
         # Post-processing
         for item in items:
             item.post_process()
 
-        # Release processors
+       # Release processors
         for item in items:
             item.release()
             self.processors.remove(item)
index acd8de5ce87619da445081745b1c7d8c6ff60b05..c86e134c39e8d1ec179aaf2c20a079549def1164 100644 (file)
@@ -120,6 +120,10 @@ class Decoder(Processor):
     def resolution(self):
         return self.input_width
 
+    def stop(self):
+        self.src.send_event(gst.event_new_eos())
+
+
 class FileDecoder(Decoder):
     """ gstreamer-based decoder """
     implements(IDecoder)
@@ -193,7 +197,7 @@ class FileDecoder(Decoder):
 
         if self.is_segment:
             # Create the pipe with Gnonlin gnlurisource
-            self.pipe = ''' gnlurisource uri={uri}
+            self.pipe = ''' gnlurisource name=src uri={uri}
                             start=0
                             duration={uri_duration}
                             media-start={uri_start}
@@ -207,7 +211,7 @@ class FileDecoder(Decoder):
                                        # convert uri_start and uri_duration to nanoseconds
         else:
             # Create the pipe with standard Gstreamer uridecodbin
-            self.pipe = ''' uridecodebin name=uridecodebin uri={uri}
+            self.pipe = ''' uridecodebin name=src uri={uri}
                            ! audioconvert name=audioconvert
                            ! audioresample
                            ! appsink name=sink sync=False async=True
@@ -229,6 +233,7 @@ class FileDecoder(Decoder):
             width=(int)32,
             rate=(int)%s""" % (caps_channels, caps_samplerate))
 
+        self.src = self.pipeline.get_by_name('src')
         self.conv = self.pipeline.get_by_name('audioconvert')
         self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
 
@@ -402,7 +407,6 @@ class FileDecoder(Decoder):
         return self.tags
 
 
-
 class LiveDecoder(Decoder):
     """ gstreamer-based decoder from live source"""
     implements(IDecoder)
@@ -419,23 +423,25 @@ class LiveDecoder(Decoder):
     def id():
         return "gst_live_dec"
 
-    def __init__(self, timeout=10):
+    def __init__(self, num_buffers=-1):
 
         """
-        Construct a new FileDecoder
+        Construct a new LiveDecoder capturing audio from alsasrc
 
         Parameters
         ----------
-        timeout :
-            timeout in seconds
+        num_buffers :
+            Number of buffers to output before sending EOS (-1 = unlimited).
+            Allowed values: >= -1
+            Default value: -1
 
         """
 
         super(Decoder, self).__init__()
-        self.timeout = timeout
+        self.num_buffers = num_buffers
         self.uri = None
         self.uri_start = 0
-        self.uri_duration = self.timeout
+        self.uri_duration = None
         self.is_segment = False
 
     def setup(self, channels=None, samplerate=None, blocksize=None):
@@ -457,11 +463,11 @@ class LiveDecoder(Decoder):
             self.output_channels = int(channels)
 
         # Create the pipe with standard Gstreamer uridecodbin
-        self.pipe = '''alsasrc
+        self.pipe = '''alsasrc num-buffers=%d name=src
                        ! audioconvert name=audioconvert
                        ! audioresample
                        ! appsink name=sink sync=False async=True
-                       '''
+                       ''' % self.num_buffers
 
         self.pipeline = gst.parse_launch(self.pipe)
 
@@ -479,6 +485,7 @@ class LiveDecoder(Decoder):
             width=(int)32,
             rate=(int)%s""" % (caps_channels, caps_samplerate))
 
+        self.src = self.pipeline.get_by_name('src')
         self.conv = self.pipeline.get_by_name('audioconvert')
         self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
 
@@ -592,10 +599,6 @@ class LiveDecoder(Decoder):
     def _on_new_buffer_cb(self, sink):
         buf = sink.emit('pull-buffer')
         new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
-        if buf.timestamp / gst.SECOND > self.timeout:
-            self.queue.put(gst.MESSAGE_EOS)
-            self.pipeline.set_state(gst.STATE_NULL)
-            self.mainloop.quit()
 
         #print 'processing new buffer', new_array.shape
         if self.last_buffer is None:
@@ -613,6 +616,7 @@ class LiveDecoder(Decoder):
         buf = self.queue.get()
         if buf == gst.MESSAGE_EOS:
             return self.last_buffer, True
+
         frames, eod = buf
         return frames, eod