output_channels = 1
was_discovered = False
+ pipeline = None
+ mainloopthread = None
+ read_error = None
# IProcessor methods
if channels: self.output_channels = int(channels)
uri = self.uri
- self.pipe = ''' uridecodebin uri=%(uri)s
- ! audioconvert
+ self.pipe = ''' uridecodebin name=uridecodebin uri=%(uri)s
+ ! audioconvert name=audioconvert
! audioresample
! appsink name=sink sync=False async=True
''' % locals()
width=(int)32,
rate=(int)%d""" % (int(self.output_channels), int(self.output_samplerate)))
+ self.decodebin = self.pipeline.get_by_name('uridecodebin')
+ self.decodebin.connect("pad-added", self._pad_added_cb)
+ self.decodebin.connect("no-more-pads", self._no_more_pads_cb)
+ self.decodebin.connect("unknown-type", self._unknown_type_cb)
+
+ self.conv = self.pipeline.get_by_name('audioconvert')
+ self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
+
self.sink = self.pipeline.get_by_name('sink')
self.sink.set_property("caps", sink_caps)
self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
self.sink.set_property("drop", False)
self.sink.set_property('emit-signals', True)
- self.sink.connect("new-buffer", self.on_new_buffer)
+ self.sink.connect("new-buffer", self._on_new_buffer_cb)
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
- self.bus.connect('message', self.on_message)
+ self.bus.connect('message', self._on_message_cb)
self.queue = Queue.Queue(QUEUE_SIZE)
self.mainloop.run()
self.mainloopthread = MainloopThread(self.mainloop)
self.mainloopthread.start()
+ #self.mainloopthread = get_loop_thread()
+ ##self.mainloop = self.mainloopthread.mainloop
self.eod = False
self.last_buffer = None
+ self._pad_found = False
+
# start pipeline
self.pipeline.set_state(gst.STATE_PLAYING)
- def on_message(self, bus, message):
+ if self.read_error:
+ self.release()
+ raise self.read_error
+
+ def _pad_added_cb(self, decodebin, pad):
+ caps = pad.get_caps()
+ if caps.to_string().startswith('audio'):
+ if not self.conv.get_pad('sink').is_linked():
+ self._pad_found = True
+ pad.link(self.conv.get_pad('sink'))
+
+ def _no_more_pads_cb(self, decodebin):
+ self.pipeline.info("no more pads")
+ if not self._pad_found:
+ self.read_exc = Exception("no audio stream found")
+
+ def _unknown_type_cb(self, decodebin, pad, caps):
+ self.pipeline.debug("unknown type : %s" % caps.to_string())
+ if not caps.to_string().startswith('audio'):
+ return
+ self.read_error = Exception("no known audio stream found")
+
+ def _notify_caps_cb(self, pad, args):
+ caps = pad.get_negotiated_caps()
+ if not caps:
+ pad.info("no negotiated caps available")
+ return
+ # the caps are fixed
+ # We now get the total length of that stream
+ q = gst.query_new_duration(gst.FORMAT_TIME)
+ pad.info("sending duration query")
+ if pad.get_peer().query(q):
+ format, length = q.parse_duration()
+ if format == gst.FORMAT_TIME:
+ pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),))
+ else:
+ pad.info("got duration : %d [format:%d]" % (length, format))
+ else:
+ length = -1
+ gst.warning("duration query failed")
+
+ # We store the caps and length in the proper location
+ if "audio" in caps.to_string():
+ self.input_samplerate = caps[0]["rate"]
+ self.input_channels = caps[0]["channels"]
+ self.input_duration = length / 1.e9
+ self.input_total_frames = int(self.input_duration * self.input_samplerate)
+ if "x-raw-float" in caps.to_string():
+ self.input_width = caps[0]["width"]
+ else:
+ self.input_width = caps[0]["depth"]
+
+ def _on_message_cb(self, bus, message):
t = message.type
if t == gst.MESSAGE_EOS:
self.pipeline.set_state(gst.STATE_NULL)
# msg.parse_tags()
pass
- def on_new_buffer(self, sink):
+ def _on_new_buffer_cb(self, sink):
from numpy import concatenate
buf = sink.emit('pull-buffer')
new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
@interfacedoc
def nframes(self):
- return self.output_nframes
+ return self.input_total_frames
@interfacedoc
def release(self):
- self.pipeline.set_state(gst.STATE_NULL)
- self.mainloopthread.join()
+ if self.pipeline: self.pipeline.set_state(gst.STATE_NULL)
+ if self.mainloopthread: self.mainloopthread.join()
def __del__(self):
self.release()