From 7ce24bed666034b9c9c542f2e39cfb9d1fb4ff71 Mon Sep 17 00:00:00 2001 From: Paul Brossier Date: Sun, 2 Sep 2012 16:41:54 -0600 Subject: [PATCH] timeside/decoder/core.py: query duration to speed things up, avoiding the use of discover --- timeside/decoder/core.py | 85 +++++++++++++++++++++++++++++++++++----- 1 file changed, 76 insertions(+), 9 deletions(-) diff --git a/timeside/decoder/core.py b/timeside/decoder/core.py index 70863d7..db99a0c 100644 --- a/timeside/decoder/core.py +++ b/timeside/decoder/core.py @@ -46,6 +46,9 @@ class FileDecoder(Processor): output_channels = 1 was_discovered = False + pipeline = None + mainloopthread = None + read_error = None # IProcessor methods @@ -77,8 +80,8 @@ class FileDecoder(Processor): if channels: self.output_channels = int(channels) uri = self.uri - self.pipe = ''' uridecodebin uri=%(uri)s - ! audioconvert + self.pipe = ''' uridecodebin name=uridecodebin uri=%(uri)s + ! audioconvert name=audioconvert ! audioresample ! appsink name=sink sync=False async=True ''' % locals() @@ -90,16 +93,24 @@ class FileDecoder(Processor): width=(int)32, rate=(int)%d""" % (int(self.output_channels), int(self.output_samplerate))) + self.decodebin = self.pipeline.get_by_name('uridecodebin') + self.decodebin.connect("pad-added", self._pad_added_cb) + self.decodebin.connect("no-more-pads", self._no_more_pads_cb) + self.decodebin.connect("unknown-type", self._unknown_type_cb) + + self.conv = self.pipeline.get_by_name('audioconvert') + self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb) + self.sink = self.pipeline.get_by_name('sink') self.sink.set_property("caps", sink_caps) self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS) self.sink.set_property("drop", False) self.sink.set_property('emit-signals', True) - self.sink.connect("new-buffer", self.on_new_buffer) + self.sink.connect("new-buffer", self._on_new_buffer_cb) self.bus = self.pipeline.get_bus() self.bus.add_signal_watch() - self.bus.connect('message', self.on_message) + self.bus.connect('message', self._on_message_cb) self.queue = Queue.Queue(QUEUE_SIZE) @@ -113,15 +124,71 @@ class FileDecoder(Processor): self.mainloop.run() self.mainloopthread = MainloopThread(self.mainloop) self.mainloopthread.start() + #self.mainloopthread = get_loop_thread() + ##self.mainloop = self.mainloopthread.mainloop self.eod = False self.last_buffer = None + self._pad_found = False + # start pipeline self.pipeline.set_state(gst.STATE_PLAYING) - def on_message(self, bus, message): + if self.read_error: + self.release() + raise self.read_error + + def _pad_added_cb(self, decodebin, pad): + caps = pad.get_caps() + if caps.to_string().startswith('audio'): + if not self.conv.get_pad('sink').is_linked(): + self._pad_found = True + pad.link(self.conv.get_pad('sink')) + + def _no_more_pads_cb(self, decodebin): + self.pipeline.info("no more pads") + if not self._pad_found: + self.read_exc = Exception("no audio stream found") + + def _unknown_type_cb(self, decodebin, pad, caps): + self.pipeline.debug("unknown type : %s" % caps.to_string()) + if not caps.to_string().startswith('audio'): + return + self.read_error = Exception("no known audio stream found") + + def _notify_caps_cb(self, pad, args): + caps = pad.get_negotiated_caps() + if not caps: + pad.info("no negotiated caps available") + return + # the caps are fixed + # We now get the total length of that stream + q = gst.query_new_duration(gst.FORMAT_TIME) + pad.info("sending duration query") + if pad.get_peer().query(q): + format, length = q.parse_duration() + if format == gst.FORMAT_TIME: + pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),)) + else: + pad.info("got duration : %d [format:%d]" % (length, format)) + else: + length = -1 + gst.warning("duration query failed") + + # We store the caps and length in the proper location + if "audio" in caps.to_string(): + self.input_samplerate = caps[0]["rate"] + self.input_channels = caps[0]["channels"] + self.input_duration = length / 1.e9 + self.input_total_frames = int(self.input_duration * self.input_samplerate) + if "x-raw-float" in caps.to_string(): + self.input_width = caps[0]["width"] + else: + self.input_width = caps[0]["depth"] + + def _on_message_cb(self, bus, message): t = message.type if t == gst.MESSAGE_EOS: self.pipeline.set_state(gst.STATE_NULL) @@ -137,7 +204,7 @@ class FileDecoder(Processor): # msg.parse_tags() pass - def on_new_buffer(self, sink): + def _on_new_buffer_cb(self, sink): from numpy import concatenate buf = sink.emit('pull-buffer') new_array = gst_buffer_to_numpy_array(buf, self.output_channels) @@ -170,12 +237,12 @@ class FileDecoder(Processor): @interfacedoc def nframes(self): - return self.output_nframes + return self.input_total_frames @interfacedoc def release(self): - self.pipeline.set_state(gst.STATE_NULL) - self.mainloopthread.join() + if self.pipeline: self.pipeline.set_state(gst.STATE_NULL) + if self.mainloopthread: self.mainloopthread.join() def __del__(self): self.release() -- 2.39.5