frames, eod = decoder.process()
totalframes += frames.shape[0]
if eod or decoder.eod: break
+ self.assertEquals(frames.shape[0], decoder.nframes() )
+ self.assertEquals(frames.shape[1], decoder.channels() )
- decoder.release()
-
+ ratio = decoder.output_samplerate / float(decoder.input_samplerate)
if 0:
- print "input / output_samplerate:", decoder.input_samplerate, '/', decoder.output_samplerate,
- ratio = decoder.output_samplerate / float(decoder.input_samplerate)
- print "ratio:", ratio
- print "input / output_channels:", decoder.input_channels, decoder.output_channels
- print "input_duration:", decoder.input_duration
- print "input_total_frames:", decoder.input_total_frames
-
- """
+ print "input / output_samplerate:", decoder.input_samplerate, '/', decoder.output_samplerate,
+ print "ratio:", ratio
+ print "input / output_channels:", decoder.input_channels, decoder.output_channels
+ print "input_duration:", decoder.input_duration
+ print "input_total_frames:", decoder.input_total_frames
+
# FIXME compute actual number of frames from file
- if os.path.splitext(self.source)[-1].lower() == '.mp3':
- self.assertEquals(totalframes, ratio * 355969 )
- elif os.path.splitext(self.source)[-1].lower() == '.ogg':
- self.assertEquals(totalframes, ratio * 352833)
- else:
- self.assertEquals(totalframes, ratio * 352801)
- """
+ if ratio == 1:
+ if os.path.splitext(self.source)[-1].lower() == '.mp3':
+ self.assertEquals(totalframes, 353664)
+ elif os.path.splitext(self.source)[-1].lower() == '.ogg':
+ self.assertEquals(totalframes, 352832)
+ else:
+ self.assertEquals(totalframes, 352800)
class TestDecodingStereo(TestDecoding):
def setUp(self):
self.samplerate, self.channels, self.nframes = None, 2, None
-class TestDecodingResampling(TestDecoding):
+class TestDecodingMonoUpsampling(TestDecoding):
+
+ def setUp(self):
+ self.samplerate, self.channels, self.nframes = 48000, None, None
+
+class TestDecodingMonoDownsampling(TestDecoding):
def setUp(self):
self.samplerate, self.channels, self.nframes = 16000, None, None
-class TestDecodingStereoResampling(TestDecoding):
+class TestDecodingStereoDownsampling(TestDecoding):
def setUp(self):
self.samplerate, self.channels, self.nframes = 32000, 2, None
def setUp(self):
self.samplerate, self.channels, self.nframes = 96000, 2, None
+class TestDecodingShortframes(TestDecoding):
+
+ def setUp(self):
+ self.samplerate, self.channels, self.nframes = None, None, 256
+
if __name__ == '__main__':
unittest.main(testRunner=TestRunner())
from timeside.api import IDecoder
from timeside.encoder.gstutils import *
+import Queue
+
GST_APPSINK_MAX_BUFFERS = 10
+QUEUE_SIZE = 10
class FileDecoder(Processor):
""" gstreamer-based decoder """
if samplerate: self.output_samplerate = int(samplerate)
if channels: self.output_channels = int(channels)
uri = self.uri
- blocksize = self.output_nframes
+
self.pipe = ''' uridecodebin uri=%(uri)s
! audioconvert
! audioresample
- ! appsink name=sink blocksize=%(blocksize)s sync=False async=True
+ ! appsink name=sink sync=False async=True
''' % locals()
self.pipeline = gst.parse_launch(self.pipe)
self.sink = self.pipeline.get_by_name('sink')
self.sink.set_property("caps", sink_caps)
self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
- self.sink.set_property('emit-signals', True)
self.sink.set_property("drop", False)
+ self.sink.set_property('emit-signals', True)
+ self.sink.connect("new-buffer", self.on_new_buffer)
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect('message', self.on_message)
+ self.queue = Queue.Queue(QUEUE_SIZE)
+
+ import threading
+ class MainloopThread(threading.Thread):
+ def __init__(self, mainloop):
+ threading.Thread.__init__(self)
+ self.mainloop = mainloop
+
+ def run(self):
+ self.mainloop.run()
+ self.mainloopthread = MainloopThread(self.mainloop)
+ self.mainloopthread.start()
+
+ self.eod = False
+
+ self.last_buffer = None
+
# start pipeline
self.pipeline.set_state(gst.STATE_PLAYING)
t = message.type
if t == gst.MESSAGE_EOS:
self.pipeline.set_state(gst.STATE_NULL)
+ self.queue.put(gst.MESSAGE_EOS)
+ self.mainloop.quit()
elif t == gst.MESSAGE_ERROR:
self.pipeline.set_state(gst.STATE_NULL)
err, debug = message.parse_error()
+ self.mainloop.quit()
print "Error: %s" % err, debug
elif t == gst.MESSAGE_TAG:
# TODO
# msg.parse_tags()
pass
+ def on_new_buffer(self, sink):
+ from numpy import concatenate
+ buf = sink.emit('pull-buffer')
+ new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
+ #print 'processing new buffer', new_array.shape
+ if self.last_buffer is None:
+ self.last_buffer = new_array
+ else:
+ self.last_buffer = concatenate((self.last_buffer, new_array), axis=0)
+ while self.last_buffer.shape[0] >= self.output_nframes:
+ new_block = self.last_buffer[:self.output_nframes]
+ self.last_buffer = self.last_buffer[self.output_nframes:]
+ #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape
+ self.queue.put( [new_block, False ] )
+
@interfacedoc
def process(self, frames = None, eod = False):
- self.eod = eod
- try:
- buf = self.sink.emit('pull-buffer')
- except SystemError, e:
- # should never happen
- print 'SystemError', e
- return array([0.]), True
- if buf == None:
- return array([0.]), True
- #print 'found something'
- return gst_buffer_to_numpy_array(buf, self.output_channels), False #self.eod
+ buf = self.queue.get()
+ if buf == gst.MESSAGE_EOS:
+ return self.last_buffer, True
+ frames, eod = buf
+ return frames, eod
@interfacedoc
def channels(self):
@interfacedoc
def release(self):
- pass
+ self.pipeline.set_state(gst.STATE_NULL)
+ self.mainloopthread.join()
def __del__(self):
self.release()