items = self.processors[1:]
source.setup(channels=channels, samplerate=samplerate,
blocksize=blocksize)
-
+ source.SIG_STOP = False
last = source
# setup/reset processors and configure properties throughout the pipe
# now stream audio data along the pipe
eod = False
+
+ # Set handler for Interruption signal
+ import signal
+
+ def signal_handler(signum, frame):
+ source.stop()
+ signal.signal(signum, signal.SIG_DFL)
+
+ signal.signal(signal.SIGINT, signal_handler)
+
while not eod:
frames, eod = source.process()
for item in items:
frames, eod = item.process(frames, eod)
+ # Restore default handler for Interruption signal
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
# Post-processing
for item in items:
item.post_process()
- # Release processors
+ # Release processors
for item in items:
item.release()
self.processors.remove(item)
def resolution(self):
return self.input_width
+ def stop(self):
+ self.src.send_event(gst.event_new_eos())
+
+
class FileDecoder(Decoder):
""" gstreamer-based decoder """
implements(IDecoder)
if self.is_segment:
# Create the pipe with Gnonlin gnlurisource
- self.pipe = ''' gnlurisource uri={uri}
+ self.pipe = ''' gnlurisource name=src uri={uri}
start=0
duration={uri_duration}
media-start={uri_start}
# convert uri_start and uri_duration to nanoseconds
else:
# Create the pipe with standard Gstreamer uridecodbin
- self.pipe = ''' uridecodebin name=uridecodebin uri={uri}
+ self.pipe = ''' uridecodebin name=src uri={uri}
! audioconvert name=audioconvert
! audioresample
! appsink name=sink sync=False async=True
width=(int)32,
rate=(int)%s""" % (caps_channels, caps_samplerate))
+ self.src = self.pipeline.get_by_name('src')
self.conv = self.pipeline.get_by_name('audioconvert')
self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
return self.tags
-
class LiveDecoder(Decoder):
""" gstreamer-based decoder from live source"""
implements(IDecoder)
def id():
return "gst_live_dec"
- def __init__(self, timeout=10):
+ def __init__(self, num_buffers=-1):
"""
- Construct a new FileDecoder
+ Construct a new LiveDecoder capturing audio from alsasrc
Parameters
----------
- timeout :
- timeout in seconds
+ num_buffers :
+ Number of buffers to output before sending EOS (-1 = unlimited).
+ Allowed values: >= -1
+ Default value: -1
"""
super(Decoder, self).__init__()
- self.timeout = timeout
+ self.num_buffers = num_buffers
self.uri = None
self.uri_start = 0
- self.uri_duration = self.timeout
+ self.uri_duration = None
self.is_segment = False
def setup(self, channels=None, samplerate=None, blocksize=None):
self.output_channels = int(channels)
# Create the pipe with standard Gstreamer uridecodbin
- self.pipe = '''alsasrc
+ self.pipe = '''alsasrc num-buffers=%d name=src
! audioconvert name=audioconvert
! audioresample
! appsink name=sink sync=False async=True
- '''
+ ''' % self.num_buffers
self.pipeline = gst.parse_launch(self.pipe)
width=(int)32,
rate=(int)%s""" % (caps_channels, caps_samplerate))
+ self.src = self.pipeline.get_by_name('src')
self.conv = self.pipeline.get_by_name('audioconvert')
self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
def _on_new_buffer_cb(self, sink):
buf = sink.emit('pull-buffer')
new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
- if buf.timestamp / gst.SECOND > self.timeout:
- self.queue.put(gst.MESSAGE_EOS)
- self.pipeline.set_state(gst.STATE_NULL)
- self.mainloop.quit()
#print 'processing new buffer', new_array.shape
if self.last_buffer is None:
buf = self.queue.get()
if buf == gst.MESSAGE_EOS:
return self.last_buffer, True
+
frames, eod = buf
return frames, eod