def resolution(self):
return self.input_width
-
class FileDecoder(Decoder):
""" gstreamer-based decoder """
implements(IDecoder)
return self.tags
+
+class LiveDecoder(Decoder):
+ """ gstreamer-based decoder from live source"""
+ implements(IDecoder)
+
+ output_blocksize = 8*1024
+
+ pipeline = None
+ mainloopthread = None
+
+ # IProcessor methods
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "gst_live_dec"
+
+ def __init__(self, timeout=10):
+
+ """
+ Construct a new FileDecoder
+
+ Parameters
+ ----------
+ timeout :
+ timeout in seconds
+
+ """
+
+ super(Decoder, self).__init__()
+ self.timeout = timeout
+ self.uri = None
+ self.uri_start = 0
+ self.uri_duration = self.timeout
+ self.is_segment = False
+
+ def setup(self, channels=None, samplerate=None, blocksize=None):
+
+ self.eod = False
+ self.last_buffer = None
+
+ # a lock to wait wait for gstreamer thread to be ready
+ import threading
+ self.discovered_cond = threading.Condition(threading.Lock())
+ self.discovered = False
+
+ # the output data format we want
+ if blocksize:
+ self.output_blocksize = blocksize
+ if samplerate:
+ self.output_samplerate = int(samplerate)
+ if channels:
+ self.output_channels = int(channels)
+
+ # Create the pipe with standard Gstreamer uridecodbin
+ self.pipe = '''alsasrc
+ ! audioconvert name=audioconvert
+ ! audioresample
+ ! appsink name=sink sync=False async=True
+ '''
+
+ self.pipeline = gst.parse_launch(self.pipe)
+
+ if self.output_channels:
+ caps_channels = int(self.output_channels)
+ else:
+ caps_channels = "[ 1, 2 ]"
+ if self.output_samplerate:
+ caps_samplerate = int(self.output_samplerate)
+ else:
+ caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }"
+ sink_caps = gst.Caps("""audio/x-raw-float,
+ endianness=(int)1234,
+ channels=(int)%s,
+ width=(int)32,
+ rate=(int)%s""" % (caps_channels, caps_samplerate))
+
+ self.conv = self.pipeline.get_by_name('audioconvert')
+ self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
+
+ self.sink = self.pipeline.get_by_name('sink')
+ self.sink.set_property("caps", sink_caps)
+ self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
+ self.sink.set_property("drop", False)
+ self.sink.set_property('emit-signals', True)
+ self.sink.connect("new-buffer", self._on_new_buffer_cb)
+
+ self.bus = self.pipeline.get_bus()
+ self.bus.add_signal_watch()
+ self.bus.connect('message', self._on_message_cb)
+
+ self.queue = Queue.Queue(QUEUE_SIZE)
+
+ import threading
+
+ class MainloopThread(threading.Thread):
+ def __init__(self, mainloop):
+ threading.Thread.__init__(self)
+ self.mainloop = mainloop
+
+ def run(self):
+ self.mainloop.run()
+ self.mainloop = gobject.MainLoop()
+ self.mainloopthread = MainloopThread(self.mainloop)
+ self.mainloopthread.start()
+ #self.mainloopthread = get_loop_thread()
+ ##self.mainloop = self.mainloopthread.mainloop
+
+ # start pipeline
+ self.pipeline.set_state(gst.STATE_PLAYING)
+
+ self.discovered_cond.acquire()
+ while not self.discovered:
+ #print 'waiting'
+ self.discovered_cond.wait()
+ self.discovered_cond.release()
+
+ if not hasattr(self, 'input_samplerate'):
+ if hasattr(self, 'error_msg'):
+ raise IOError(self.error_msg)
+ else:
+ raise IOError('no known audio stream found')
+
+ def _notify_caps_cb(self, pad, args):
+ self.discovered_cond.acquire()
+
+ caps = pad.get_negotiated_caps()
+ if not caps:
+ pad.info("no negotiated caps available")
+ self.discovered = True
+ self.discovered_cond.notify()
+ self.discovered_cond.release()
+ return
+ # the caps are fixed
+ # We now get the total length of that stream
+ q = gst.query_new_duration(gst.FORMAT_TIME)
+ pad.info("sending duration query")
+ if pad.get_peer().query(q):
+ format, length = q.parse_duration()
+ if format == gst.FORMAT_TIME:
+ pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),))
+ else:
+ pad.info("got duration : %d [format:%d]" % (length, format))
+ else:
+ length = -1
+ gst.warning("duration query failed")
+
+ # We store the caps and length in the proper location
+ if "audio" in caps.to_string():
+ self.input_samplerate = caps[0]["rate"]
+ if not self.output_samplerate:
+ self.output_samplerate = self.input_samplerate
+ self.input_channels = caps[0]["channels"]
+ if not self.output_channels:
+ self.output_channels = self.input_channels
+ self.input_duration = length / 1.e9
+
+ self.input_totalframes = int(self.input_duration * self.input_samplerate)
+ if "x-raw-float" in caps.to_string():
+ self.input_width = caps[0]["width"]
+ else:
+ self.input_width = caps[0]["depth"]
+
+ self.discovered = True
+ self.discovered_cond.notify()
+ self.discovered_cond.release()
+
+ def _on_message_cb(self, bus, message):
+ t = message.type
+ if t == gst.MESSAGE_EOS:
+ self.queue.put(gst.MESSAGE_EOS)
+ self.pipeline.set_state(gst.STATE_NULL)
+ self.mainloop.quit()
+ elif t == gst.MESSAGE_ERROR:
+ self.pipeline.set_state(gst.STATE_NULL)
+ err, debug = message.parse_error()
+ self.discovered_cond.acquire()
+ self.discovered = True
+ self.mainloop.quit()
+ self.error_msg = "Error: %s" % err, debug
+ self.discovered_cond.notify()
+ self.discovered_cond.release()
+ elif t == gst.MESSAGE_TAG:
+ # TODO
+ # msg.parse_tags()
+ pass
+
+ def _on_new_buffer_cb(self, sink):
+ buf = sink.emit('pull-buffer')
+ new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
+ if buf.timestamp / gst.SECOND > self.timeout:
+ self.queue.put(gst.MESSAGE_EOS)
+ self.pipeline.set_state(gst.STATE_NULL)
+ self.mainloop.quit()
+
+ #print 'processing new buffer', new_array.shape
+ if self.last_buffer is None:
+ self.last_buffer = new_array
+ else:
+ self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0)
+ while self.last_buffer.shape[0] >= self.output_blocksize:
+ new_block = self.last_buffer[:self.output_blocksize]
+ self.last_buffer = self.last_buffer[self.output_blocksize:]
+ #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape
+ self.queue.put([new_block, False])
+
+ @interfacedoc
+ def process(self):
+ buf = self.queue.get()
+ if buf == gst.MESSAGE_EOS:
+ return self.last_buffer, True
+ frames, eod = buf
+ return frames, eod
+
+ @interfacedoc
+ def totalframes(self):
+ if self.input_samplerate == self.output_samplerate:
+ return self.input_totalframes
+ else:
+ ratio = self.output_samplerate / self.input_samplerate
+ return int(self.input_totalframes * ratio)
+
+ @interfacedoc
+ def release(self):
+ if self.stack:
+ self.stack = False
+ self.from_stack = True
+ pass
+
+ ## IDecoder methods
+
+ @interfacedoc
+ def format(self):
+ # TODO check
+ if self.mimetype == 'application/x-id3':
+ self.mimetype = 'audio/mpeg'
+ return self.mimetype
+
+ @interfacedoc
+ def encoding(self):
+ # TODO check
+ return self.mimetype.split('/')[-1]
+
+ @interfacedoc
+ def resolution(self):
+ # TODO check: width or depth?
+ return self.input_width
+
+ @interfacedoc
+ def metadata(self):
+ # TODO check
+ return self.tags
+
+
class ArrayDecoder(Decoder):
""" Decoder taking Numpy array as input"""
implements(IDecoder)