From: Paul Brossier Date: Mon, 20 Aug 2012 00:41:06 +0000 (-0600) Subject: timeside/decoder/core.py: use a queue instead of blocksize to adjust size of blocks... X-Git-Tag: 0.3.4-noleekmp3~10 X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=0f564be647fa7d5a78adc96330b8ba438a62443e;p=timeside.git timeside/decoder/core.py: use a queue instead of blocksize to adjust size of blocks, make sure this works for mono and stereo, without or with resampling --- diff --git a/tests/testdecoding.py b/tests/testdecoding.py index e5c318f..c14b698 100644 --- a/tests/testdecoding.py +++ b/tests/testdecoding.py @@ -36,38 +36,42 @@ class TestDecoding(TestCase): frames, eod = decoder.process() totalframes += frames.shape[0] if eod or decoder.eod: break + self.assertEquals(frames.shape[0], decoder.nframes() ) + self.assertEquals(frames.shape[1], decoder.channels() ) - decoder.release() - + ratio = decoder.output_samplerate / float(decoder.input_samplerate) if 0: - print "input / output_samplerate:", decoder.input_samplerate, '/', decoder.output_samplerate, - ratio = decoder.output_samplerate / float(decoder.input_samplerate) - print "ratio:", ratio - print "input / output_channels:", decoder.input_channels, decoder.output_channels - print "input_duration:", decoder.input_duration - print "input_total_frames:", decoder.input_total_frames - - """ + print "input / output_samplerate:", decoder.input_samplerate, '/', decoder.output_samplerate, + print "ratio:", ratio + print "input / output_channels:", decoder.input_channels, decoder.output_channels + print "input_duration:", decoder.input_duration + print "input_total_frames:", decoder.input_total_frames + # FIXME compute actual number of frames from file - if os.path.splitext(self.source)[-1].lower() == '.mp3': - self.assertEquals(totalframes, ratio * 355969 ) - elif os.path.splitext(self.source)[-1].lower() == '.ogg': - self.assertEquals(totalframes, ratio * 352833) - else: - self.assertEquals(totalframes, ratio * 352801) - """ + if ratio == 1: + if os.path.splitext(self.source)[-1].lower() == '.mp3': + self.assertEquals(totalframes, 353664) + elif os.path.splitext(self.source)[-1].lower() == '.ogg': + self.assertEquals(totalframes, 352832) + else: + self.assertEquals(totalframes, 352800) class TestDecodingStereo(TestDecoding): def setUp(self): self.samplerate, self.channels, self.nframes = None, 2, None -class TestDecodingResampling(TestDecoding): +class TestDecodingMonoUpsampling(TestDecoding): + + def setUp(self): + self.samplerate, self.channels, self.nframes = 48000, None, None + +class TestDecodingMonoDownsampling(TestDecoding): def setUp(self): self.samplerate, self.channels, self.nframes = 16000, None, None -class TestDecodingStereoResampling(TestDecoding): +class TestDecodingStereoDownsampling(TestDecoding): def setUp(self): self.samplerate, self.channels, self.nframes = 32000, 2, None @@ -77,5 +81,10 @@ class TestDecodingStereoUpsampling(TestDecoding): def setUp(self): self.samplerate, self.channels, self.nframes = 96000, 2, None +class TestDecodingShortframes(TestDecoding): + + def setUp(self): + self.samplerate, self.channels, self.nframes = None, None, 256 + if __name__ == '__main__': unittest.main(testRunner=TestRunner()) diff --git a/timeside/decoder/core.py b/timeside/decoder/core.py index 549e531..70863d7 100644 --- a/timeside/decoder/core.py +++ b/timeside/decoder/core.py @@ -28,7 +28,10 @@ from timeside.core import Processor, implements, interfacedoc from timeside.api import IDecoder from timeside.encoder.gstutils import * +import Queue + GST_APPSINK_MAX_BUFFERS = 10 +QUEUE_SIZE = 10 class FileDecoder(Processor): """ gstreamer-based decoder """ @@ -73,11 +76,11 @@ class FileDecoder(Processor): if samplerate: self.output_samplerate = int(samplerate) if channels: self.output_channels = int(channels) uri = self.uri - blocksize = self.output_nframes + self.pipe = ''' uridecodebin uri=%(uri)s ! audioconvert ! audioresample - ! appsink name=sink blocksize=%(blocksize)s sync=False async=True + ! appsink name=sink sync=False async=True ''' % locals() self.pipeline = gst.parse_launch(self.pipe) @@ -90,13 +93,31 @@ class FileDecoder(Processor): self.sink = self.pipeline.get_by_name('sink') self.sink.set_property("caps", sink_caps) self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS) - self.sink.set_property('emit-signals', True) self.sink.set_property("drop", False) + self.sink.set_property('emit-signals', True) + self.sink.connect("new-buffer", self.on_new_buffer) self.bus = self.pipeline.get_bus() self.bus.add_signal_watch() self.bus.connect('message', self.on_message) + self.queue = Queue.Queue(QUEUE_SIZE) + + import threading + class MainloopThread(threading.Thread): + def __init__(self, mainloop): + threading.Thread.__init__(self) + self.mainloop = mainloop + + def run(self): + self.mainloop.run() + self.mainloopthread = MainloopThread(self.mainloop) + self.mainloopthread.start() + + self.eod = False + + self.last_buffer = None + # start pipeline self.pipeline.set_state(gst.STATE_PLAYING) @@ -104,28 +125,40 @@ class FileDecoder(Processor): t = message.type if t == gst.MESSAGE_EOS: self.pipeline.set_state(gst.STATE_NULL) + self.queue.put(gst.MESSAGE_EOS) + self.mainloop.quit() elif t == gst.MESSAGE_ERROR: self.pipeline.set_state(gst.STATE_NULL) err, debug = message.parse_error() + self.mainloop.quit() print "Error: %s" % err, debug elif t == gst.MESSAGE_TAG: # TODO # msg.parse_tags() pass + def on_new_buffer(self, sink): + from numpy import concatenate + buf = sink.emit('pull-buffer') + new_array = gst_buffer_to_numpy_array(buf, self.output_channels) + #print 'processing new buffer', new_array.shape + if self.last_buffer is None: + self.last_buffer = new_array + else: + self.last_buffer = concatenate((self.last_buffer, new_array), axis=0) + while self.last_buffer.shape[0] >= self.output_nframes: + new_block = self.last_buffer[:self.output_nframes] + self.last_buffer = self.last_buffer[self.output_nframes:] + #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape + self.queue.put( [new_block, False ] ) + @interfacedoc def process(self, frames = None, eod = False): - self.eod = eod - try: - buf = self.sink.emit('pull-buffer') - except SystemError, e: - # should never happen - print 'SystemError', e - return array([0.]), True - if buf == None: - return array([0.]), True - #print 'found something' - return gst_buffer_to_numpy_array(buf, self.output_channels), False #self.eod + buf = self.queue.get() + if buf == gst.MESSAGE_EOS: + return self.last_buffer, True + frames, eod = buf + return frames, eod @interfacedoc def channels(self): @@ -141,7 +174,8 @@ class FileDecoder(Processor): @interfacedoc def release(self): - pass + self.pipeline.set_state(gst.STATE_NULL) + self.mainloopthread.join() def __del__(self): self.release()