]> git.parisson.com Git - timeside.git/commitdiff
timeside/decoder/core.py: query duration to speed things up, avoiding the use of...
authorPaul Brossier <piem@piem.org>
Sun, 2 Sep 2012 22:41:54 +0000 (16:41 -0600)
committerPaul Brossier <piem@piem.org>
Sun, 2 Sep 2012 22:41:54 +0000 (16:41 -0600)
timeside/decoder/core.py

index 70863d79480f586e8dc5d927adbc2b95b2a9a5a0..db99a0c1eafd4bbc7c419823b374669ff2b50a6a 100644 (file)
@@ -46,6 +46,9 @@ class FileDecoder(Processor):
     output_channels   = 1
 
     was_discovered    = False
+    pipeline          = None
+    mainloopthread    = None
+    read_error        = None
 
     # IProcessor methods
 
@@ -77,8 +80,8 @@ class FileDecoder(Processor):
         if channels:   self.output_channels   = int(channels)
         uri = self.uri
 
-        self.pipe = ''' uridecodebin uri=%(uri)s
-                  ! audioconvert
+        self.pipe = ''' uridecodebin name=uridecodebin uri=%(uri)s
+                  ! audioconvert name=audioconvert
                   ! audioresample
                   ! appsink name=sink sync=False async=True
                   ''' % locals()
@@ -90,16 +93,24 @@ class FileDecoder(Processor):
             width=(int)32,
             rate=(int)%d""" % (int(self.output_channels), int(self.output_samplerate)))
 
+        self.decodebin = self.pipeline.get_by_name('uridecodebin')
+        self.decodebin.connect("pad-added", self._pad_added_cb)
+        self.decodebin.connect("no-more-pads", self._no_more_pads_cb)
+        self.decodebin.connect("unknown-type", self._unknown_type_cb)
+
+        self.conv = self.pipeline.get_by_name('audioconvert')
+        self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
+
         self.sink = self.pipeline.get_by_name('sink')
         self.sink.set_property("caps", sink_caps)
         self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
         self.sink.set_property("drop", False)
         self.sink.set_property('emit-signals', True)
-        self.sink.connect("new-buffer", self.on_new_buffer)
+        self.sink.connect("new-buffer", self._on_new_buffer_cb)
 
         self.bus = self.pipeline.get_bus()
         self.bus.add_signal_watch()
-        self.bus.connect('message', self.on_message)
+        self.bus.connect('message', self._on_message_cb)
 
         self.queue = Queue.Queue(QUEUE_SIZE)
 
@@ -113,15 +124,71 @@ class FileDecoder(Processor):
                 self.mainloop.run()
         self.mainloopthread = MainloopThread(self.mainloop)
         self.mainloopthread.start()
+        #self.mainloopthread = get_loop_thread()
+        ##self.mainloop = self.mainloopthread.mainloop
 
         self.eod = False
 
         self.last_buffer = None
 
+        self._pad_found = False
+
         # start pipeline
         self.pipeline.set_state(gst.STATE_PLAYING)
 
-    def on_message(self, bus, message):
+        if self.read_error:
+            self.release()
+            raise self.read_error
+
+    def _pad_added_cb(self, decodebin, pad):
+        caps = pad.get_caps()
+        if caps.to_string().startswith('audio'):
+            if not self.conv.get_pad('sink').is_linked():
+                self._pad_found = True
+                pad.link(self.conv.get_pad('sink'))
+
+    def _no_more_pads_cb(self, decodebin):
+        self.pipeline.info("no more pads")
+        if not self._pad_found:
+            self.read_exc = Exception("no audio stream found")
+
+    def _unknown_type_cb(self, decodebin, pad, caps):
+        self.pipeline.debug("unknown type : %s" % caps.to_string())
+        if not caps.to_string().startswith('audio'):
+            return
+        self.read_error = Exception("no known audio stream found")
+
+    def _notify_caps_cb(self, pad, args):
+        caps = pad.get_negotiated_caps()
+        if not caps:
+            pad.info("no negotiated caps available")
+            return
+        # the caps are fixed
+        # We now get the total length of that stream
+        q = gst.query_new_duration(gst.FORMAT_TIME)
+        pad.info("sending duration query")
+        if pad.get_peer().query(q):
+            format, length = q.parse_duration()
+            if format == gst.FORMAT_TIME:
+                pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),))
+            else:
+                pad.info("got duration : %d [format:%d]" % (length, format))
+        else:
+            length = -1
+            gst.warning("duration query failed")
+
+        # We store the caps and length in the proper location
+        if "audio" in caps.to_string():
+            self.input_samplerate = caps[0]["rate"]
+            self.input_channels = caps[0]["channels"]
+            self.input_duration = length / 1.e9
+            self.input_total_frames = int(self.input_duration * self.input_samplerate)
+            if "x-raw-float" in caps.to_string():
+                self.input_width = caps[0]["width"]
+            else:
+                self.input_width = caps[0]["depth"]
+
+    def _on_message_cb(self, bus, message):
         t = message.type
         if t == gst.MESSAGE_EOS:
             self.pipeline.set_state(gst.STATE_NULL)
@@ -137,7 +204,7 @@ class FileDecoder(Processor):
             # msg.parse_tags()
             pass
 
-    def on_new_buffer(self, sink):
+    def _on_new_buffer_cb(self, sink):
         from numpy import concatenate
         buf = sink.emit('pull-buffer')
         new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
@@ -170,12 +237,12 @@ class FileDecoder(Processor):
 
     @interfacedoc
     def nframes(self):
-        return self.output_nframes
+        return self.input_total_frames
 
     @interfacedoc
     def release(self):
-        self.pipeline.set_state(gst.STATE_NULL)
-        self.mainloopthread.join()
+        if self.pipeline: self.pipeline.set_state(gst.STATE_NULL)
+        if self.mainloopthread: self.mainloopthread.join()
 
     def __del__(self):
         self.release()