From 17391ccd54e816073bc9cc0c54be22097f2aa216 Mon Sep 17 00:00:00 2001 From: Thomas Fillon Date: Mon, 6 Jan 2014 13:11:38 +0100 Subject: [PATCH] LiveDecoder : first prototype for a live decoder. The process can be interrupted by pressing CTRL+C two times --- timeside/core.py | 17 +++++++++++++++-- timeside/decoder/core.py | 34 +++++++++++++++++++--------------- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/timeside/core.py b/timeside/core.py index e817e25..50f1f3b 100644 --- a/timeside/core.py +++ b/timeside/core.py @@ -284,7 +284,7 @@ class ProcessPipe(object): items = self.processors[1:] source.setup(channels=channels, samplerate=samplerate, blocksize=blocksize) - + source.SIG_STOP = False last = source # setup/reset processors and configure properties throughout the pipe @@ -298,16 +298,29 @@ class ProcessPipe(object): # now stream audio data along the pipe eod = False + + # Set handler for Interruption signal + import signal + + def signal_handler(signum, frame): + source.stop() + signal.signal(signum, signal.SIG_DFL) + + signal.signal(signal.SIGINT, signal_handler) + while not eod: frames, eod = source.process() for item in items: frames, eod = item.process(frames, eod) + # Restore default handler for Interruption signal + signal.signal(signal.SIGINT, signal.SIG_DFL) + # Post-processing for item in items: item.post_process() - # Release processors + # Release processors for item in items: item.release() self.processors.remove(item) diff --git a/timeside/decoder/core.py b/timeside/decoder/core.py index acd8de5..c86e134 100644 --- a/timeside/decoder/core.py +++ b/timeside/decoder/core.py @@ -120,6 +120,10 @@ class Decoder(Processor): def resolution(self): return self.input_width + def stop(self): + self.src.send_event(gst.event_new_eos()) + + class FileDecoder(Decoder): """ gstreamer-based decoder """ implements(IDecoder) @@ -193,7 +197,7 @@ class FileDecoder(Decoder): if self.is_segment: # Create the pipe with Gnonlin gnlurisource - self.pipe = ''' gnlurisource uri={uri} + self.pipe = ''' gnlurisource name=src uri={uri} start=0 duration={uri_duration} media-start={uri_start} @@ -207,7 +211,7 @@ class FileDecoder(Decoder): # convert uri_start and uri_duration to nanoseconds else: # Create the pipe with standard Gstreamer uridecodbin - self.pipe = ''' uridecodebin name=uridecodebin uri={uri} + self.pipe = ''' uridecodebin name=src uri={uri} ! audioconvert name=audioconvert ! audioresample ! appsink name=sink sync=False async=True @@ -229,6 +233,7 @@ class FileDecoder(Decoder): width=(int)32, rate=(int)%s""" % (caps_channels, caps_samplerate)) + self.src = self.pipeline.get_by_name('src') self.conv = self.pipeline.get_by_name('audioconvert') self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb) @@ -402,7 +407,6 @@ class FileDecoder(Decoder): return self.tags - class LiveDecoder(Decoder): """ gstreamer-based decoder from live source""" implements(IDecoder) @@ -419,23 +423,25 @@ class LiveDecoder(Decoder): def id(): return "gst_live_dec" - def __init__(self, timeout=10): + def __init__(self, num_buffers=-1): """ - Construct a new FileDecoder + Construct a new LiveDecoder capturing audio from alsasrc Parameters ---------- - timeout : - timeout in seconds + num_buffers : + Number of buffers to output before sending EOS (-1 = unlimited). + Allowed values: >= -1 + Default value: -1 """ super(Decoder, self).__init__() - self.timeout = timeout + self.num_buffers = num_buffers self.uri = None self.uri_start = 0 - self.uri_duration = self.timeout + self.uri_duration = None self.is_segment = False def setup(self, channels=None, samplerate=None, blocksize=None): @@ -457,11 +463,11 @@ class LiveDecoder(Decoder): self.output_channels = int(channels) # Create the pipe with standard Gstreamer uridecodbin - self.pipe = '''alsasrc + self.pipe = '''alsasrc num-buffers=%d name=src ! audioconvert name=audioconvert ! audioresample ! appsink name=sink sync=False async=True - ''' + ''' % self.num_buffers self.pipeline = gst.parse_launch(self.pipe) @@ -479,6 +485,7 @@ class LiveDecoder(Decoder): width=(int)32, rate=(int)%s""" % (caps_channels, caps_samplerate)) + self.src = self.pipeline.get_by_name('src') self.conv = self.pipeline.get_by_name('audioconvert') self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb) @@ -592,10 +599,6 @@ class LiveDecoder(Decoder): def _on_new_buffer_cb(self, sink): buf = sink.emit('pull-buffer') new_array = gst_buffer_to_numpy_array(buf, self.output_channels) - if buf.timestamp / gst.SECOND > self.timeout: - self.queue.put(gst.MESSAGE_EOS) - self.pipeline.set_state(gst.STATE_NULL) - self.mainloop.quit() #print 'processing new buffer', new_array.shape if self.last_buffer is None: @@ -613,6 +616,7 @@ class LiveDecoder(Decoder): buf = self.queue.get() if buf == gst.MESSAGE_EOS: return self.last_buffer, True + frames, eod = buf return frames, eod -- 2.39.5