From 73547ecde9a0a60bda9d95abf0ccc45c52612afa Mon Sep 17 00:00:00 2001 From: Thomas Fillon Date: Thu, 26 Dec 2013 22:26:28 +0100 Subject: [PATCH] Decoder : Add a live decoder using Gstreamer alsasrc --- timeside/decoder/core.py | 255 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 254 insertions(+), 1 deletion(-) diff --git a/timeside/decoder/core.py b/timeside/decoder/core.py index 5b193c2..acd8de5 100644 --- a/timeside/decoder/core.py +++ b/timeside/decoder/core.py @@ -120,7 +120,6 @@ class Decoder(Processor): def resolution(self): return self.input_width - class FileDecoder(Decoder): """ gstreamer-based decoder """ implements(IDecoder) @@ -403,6 +402,260 @@ class FileDecoder(Decoder): return self.tags + +class LiveDecoder(Decoder): + """ gstreamer-based decoder from live source""" + implements(IDecoder) + + output_blocksize = 8*1024 + + pipeline = None + mainloopthread = None + + # IProcessor methods + + @staticmethod + @interfacedoc + def id(): + return "gst_live_dec" + + def __init__(self, timeout=10): + + """ + Construct a new FileDecoder + + Parameters + ---------- + timeout : + timeout in seconds + + """ + + super(Decoder, self).__init__() + self.timeout = timeout + self.uri = None + self.uri_start = 0 + self.uri_duration = self.timeout + self.is_segment = False + + def setup(self, channels=None, samplerate=None, blocksize=None): + + self.eod = False + self.last_buffer = None + + # a lock to wait wait for gstreamer thread to be ready + import threading + self.discovered_cond = threading.Condition(threading.Lock()) + self.discovered = False + + # the output data format we want + if blocksize: + self.output_blocksize = blocksize + if samplerate: + self.output_samplerate = int(samplerate) + if channels: + self.output_channels = int(channels) + + # Create the pipe with standard Gstreamer uridecodbin + self.pipe = '''alsasrc + ! audioconvert name=audioconvert + ! audioresample + ! appsink name=sink sync=False async=True + ''' + + self.pipeline = gst.parse_launch(self.pipe) + + if self.output_channels: + caps_channels = int(self.output_channels) + else: + caps_channels = "[ 1, 2 ]" + if self.output_samplerate: + caps_samplerate = int(self.output_samplerate) + else: + caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }" + sink_caps = gst.Caps("""audio/x-raw-float, + endianness=(int)1234, + channels=(int)%s, + width=(int)32, + rate=(int)%s""" % (caps_channels, caps_samplerate)) + + self.conv = self.pipeline.get_by_name('audioconvert') + self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb) + + self.sink = self.pipeline.get_by_name('sink') + self.sink.set_property("caps", sink_caps) + self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS) + self.sink.set_property("drop", False) + self.sink.set_property('emit-signals', True) + self.sink.connect("new-buffer", self._on_new_buffer_cb) + + self.bus = self.pipeline.get_bus() + self.bus.add_signal_watch() + self.bus.connect('message', self._on_message_cb) + + self.queue = Queue.Queue(QUEUE_SIZE) + + import threading + + class MainloopThread(threading.Thread): + def __init__(self, mainloop): + threading.Thread.__init__(self) + self.mainloop = mainloop + + def run(self): + self.mainloop.run() + self.mainloop = gobject.MainLoop() + self.mainloopthread = MainloopThread(self.mainloop) + self.mainloopthread.start() + #self.mainloopthread = get_loop_thread() + ##self.mainloop = self.mainloopthread.mainloop + + # start pipeline + self.pipeline.set_state(gst.STATE_PLAYING) + + self.discovered_cond.acquire() + while not self.discovered: + #print 'waiting' + self.discovered_cond.wait() + self.discovered_cond.release() + + if not hasattr(self, 'input_samplerate'): + if hasattr(self, 'error_msg'): + raise IOError(self.error_msg) + else: + raise IOError('no known audio stream found') + + def _notify_caps_cb(self, pad, args): + self.discovered_cond.acquire() + + caps = pad.get_negotiated_caps() + if not caps: + pad.info("no negotiated caps available") + self.discovered = True + self.discovered_cond.notify() + self.discovered_cond.release() + return + # the caps are fixed + # We now get the total length of that stream + q = gst.query_new_duration(gst.FORMAT_TIME) + pad.info("sending duration query") + if pad.get_peer().query(q): + format, length = q.parse_duration() + if format == gst.FORMAT_TIME: + pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),)) + else: + pad.info("got duration : %d [format:%d]" % (length, format)) + else: + length = -1 + gst.warning("duration query failed") + + # We store the caps and length in the proper location + if "audio" in caps.to_string(): + self.input_samplerate = caps[0]["rate"] + if not self.output_samplerate: + self.output_samplerate = self.input_samplerate + self.input_channels = caps[0]["channels"] + if not self.output_channels: + self.output_channels = self.input_channels + self.input_duration = length / 1.e9 + + self.input_totalframes = int(self.input_duration * self.input_samplerate) + if "x-raw-float" in caps.to_string(): + self.input_width = caps[0]["width"] + else: + self.input_width = caps[0]["depth"] + + self.discovered = True + self.discovered_cond.notify() + self.discovered_cond.release() + + def _on_message_cb(self, bus, message): + t = message.type + if t == gst.MESSAGE_EOS: + self.queue.put(gst.MESSAGE_EOS) + self.pipeline.set_state(gst.STATE_NULL) + self.mainloop.quit() + elif t == gst.MESSAGE_ERROR: + self.pipeline.set_state(gst.STATE_NULL) + err, debug = message.parse_error() + self.discovered_cond.acquire() + self.discovered = True + self.mainloop.quit() + self.error_msg = "Error: %s" % err, debug + self.discovered_cond.notify() + self.discovered_cond.release() + elif t == gst.MESSAGE_TAG: + # TODO + # msg.parse_tags() + pass + + def _on_new_buffer_cb(self, sink): + buf = sink.emit('pull-buffer') + new_array = gst_buffer_to_numpy_array(buf, self.output_channels) + if buf.timestamp / gst.SECOND > self.timeout: + self.queue.put(gst.MESSAGE_EOS) + self.pipeline.set_state(gst.STATE_NULL) + self.mainloop.quit() + + #print 'processing new buffer', new_array.shape + if self.last_buffer is None: + self.last_buffer = new_array + else: + self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0) + while self.last_buffer.shape[0] >= self.output_blocksize: + new_block = self.last_buffer[:self.output_blocksize] + self.last_buffer = self.last_buffer[self.output_blocksize:] + #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape + self.queue.put([new_block, False]) + + @interfacedoc + def process(self): + buf = self.queue.get() + if buf == gst.MESSAGE_EOS: + return self.last_buffer, True + frames, eod = buf + return frames, eod + + @interfacedoc + def totalframes(self): + if self.input_samplerate == self.output_samplerate: + return self.input_totalframes + else: + ratio = self.output_samplerate / self.input_samplerate + return int(self.input_totalframes * ratio) + + @interfacedoc + def release(self): + if self.stack: + self.stack = False + self.from_stack = True + pass + + ## IDecoder methods + + @interfacedoc + def format(self): + # TODO check + if self.mimetype == 'application/x-id3': + self.mimetype = 'audio/mpeg' + return self.mimetype + + @interfacedoc + def encoding(self): + # TODO check + return self.mimetype.split('/')[-1] + + @interfacedoc + def resolution(self): + # TODO check: width or depth? + return self.input_width + + @interfacedoc + def metadata(self): + # TODO check + return self.tags + + class ArrayDecoder(Decoder): """ Decoder taking Numpy array as input""" implements(IDecoder) -- 2.39.5