]> git.parisson.com Git - timeside.git/commitdiff
timeside/decoder/core.py: use a queue instead of blocksize to adjust size of blocks...
authorPaul Brossier <piem@piem.org>
Mon, 20 Aug 2012 00:41:06 +0000 (18:41 -0600)
committerPaul Brossier <piem@piem.org>
Mon, 20 Aug 2012 00:41:06 +0000 (18:41 -0600)
tests/testdecoding.py
timeside/decoder/core.py

index e5c318f7a175c3a942012ba636a91034ebcbf072..c14b698f042aaa1d1371aa2e8fd28df6ded2b7b3 100644 (file)
@@ -36,38 +36,42 @@ class TestDecoding(TestCase):
             frames, eod = decoder.process()
             totalframes += frames.shape[0]
             if eod or decoder.eod: break
+            self.assertEquals(frames.shape[0], decoder.nframes() )
+            self.assertEquals(frames.shape[1], decoder.channels() )
 
-        decoder.release()
-
+        ratio = decoder.output_samplerate / float(decoder.input_samplerate)
         if 0:
-          print "input / output_samplerate:",   decoder.input_samplerate, '/', decoder.output_samplerate,
-          ratio = decoder.output_samplerate / float(decoder.input_samplerate)
-          print "ratio:",             ratio
-          print "input / output_channels:",   decoder.input_channels, decoder.output_channels
-          print "input_duration:",     decoder.input_duration
-          print "input_total_frames:", decoder.input_total_frames
-
-        """
+            print "input / output_samplerate:",   decoder.input_samplerate, '/', decoder.output_samplerate,
+            print "ratio:",             ratio
+            print "input / output_channels:",   decoder.input_channels, decoder.output_channels
+            print "input_duration:",     decoder.input_duration
+            print "input_total_frames:", decoder.input_total_frames
+
         # FIXME compute actual number of frames from file
-        if os.path.splitext(self.source)[-1].lower() == '.mp3':
-            self.assertEquals(totalframes, ratio * 355969 )
-        elif os.path.splitext(self.source)[-1].lower() == '.ogg':
-            self.assertEquals(totalframes, ratio * 352833)
-        else:
-            self.assertEquals(totalframes, ratio * 352801)
-        """
+        if ratio == 1:
+            if os.path.splitext(self.source)[-1].lower() == '.mp3':
+                self.assertEquals(totalframes, 353664)
+            elif os.path.splitext(self.source)[-1].lower() == '.ogg':
+                self.assertEquals(totalframes, 352832)
+            else:
+                self.assertEquals(totalframes, 352800)
 
 class TestDecodingStereo(TestDecoding):
 
     def setUp(self):
         self.samplerate, self.channels, self.nframes = None, 2, None
 
-class TestDecodingResampling(TestDecoding):
+class TestDecodingMonoUpsampling(TestDecoding):
+
+    def setUp(self):
+        self.samplerate, self.channels, self.nframes = 48000, None, None
+
+class TestDecodingMonoDownsampling(TestDecoding):
 
     def setUp(self):
         self.samplerate, self.channels, self.nframes = 16000, None, None
 
-class TestDecodingStereoResampling(TestDecoding):
+class TestDecodingStereoDownsampling(TestDecoding):
 
     def setUp(self):
         self.samplerate, self.channels, self.nframes = 32000, 2, None
@@ -77,5 +81,10 @@ class TestDecodingStereoUpsampling(TestDecoding):
     def setUp(self):
         self.samplerate, self.channels, self.nframes = 96000, 2, None
 
+class TestDecodingShortframes(TestDecoding):
+
+    def setUp(self):
+        self.samplerate, self.channels, self.nframes = None, None, 256
+
 if __name__ == '__main__':
     unittest.main(testRunner=TestRunner())
index 549e531f12dadf9be8341e61ba21e2c9499f7d6b..70863d79480f586e8dc5d927adbc2b95b2a9a5a0 100644 (file)
@@ -28,7 +28,10 @@ from timeside.core import Processor, implements, interfacedoc
 from timeside.api import IDecoder
 from timeside.encoder.gstutils import *
 
+import Queue
+
 GST_APPSINK_MAX_BUFFERS = 10
+QUEUE_SIZE = 10
 
 class FileDecoder(Processor):
     """ gstreamer-based decoder """
@@ -73,11 +76,11 @@ class FileDecoder(Processor):
         if samplerate: self.output_samplerate = int(samplerate)
         if channels:   self.output_channels   = int(channels)
         uri = self.uri
-        blocksize = self.output_nframes
+
         self.pipe = ''' uridecodebin uri=%(uri)s
                   ! audioconvert
                   ! audioresample
-                  ! appsink name=sink blocksize=%(blocksize)s sync=False async=True
+                  ! appsink name=sink sync=False async=True
                   ''' % locals()
         self.pipeline = gst.parse_launch(self.pipe)
 
@@ -90,13 +93,31 @@ class FileDecoder(Processor):
         self.sink = self.pipeline.get_by_name('sink')
         self.sink.set_property("caps", sink_caps)
         self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
-        self.sink.set_property('emit-signals', True)
         self.sink.set_property("drop", False)
+        self.sink.set_property('emit-signals', True)
+        self.sink.connect("new-buffer", self.on_new_buffer)
 
         self.bus = self.pipeline.get_bus()
         self.bus.add_signal_watch()
         self.bus.connect('message', self.on_message)
 
+        self.queue = Queue.Queue(QUEUE_SIZE)
+
+        import threading
+        class MainloopThread(threading.Thread):
+            def __init__(self, mainloop):
+                threading.Thread.__init__(self)
+                self.mainloop = mainloop
+
+            def run(self):
+                self.mainloop.run()
+        self.mainloopthread = MainloopThread(self.mainloop)
+        self.mainloopthread.start()
+
+        self.eod = False
+
+        self.last_buffer = None
+
         # start pipeline
         self.pipeline.set_state(gst.STATE_PLAYING)
 
@@ -104,28 +125,40 @@ class FileDecoder(Processor):
         t = message.type
         if t == gst.MESSAGE_EOS:
             self.pipeline.set_state(gst.STATE_NULL)
+            self.queue.put(gst.MESSAGE_EOS)
+            self.mainloop.quit()
         elif t == gst.MESSAGE_ERROR:
             self.pipeline.set_state(gst.STATE_NULL)
             err, debug = message.parse_error()
+            self.mainloop.quit()
             print "Error: %s" % err, debug
         elif t == gst.MESSAGE_TAG:
             # TODO
             # msg.parse_tags()
             pass
 
+    def on_new_buffer(self, sink):
+        from numpy import concatenate
+        buf = sink.emit('pull-buffer')
+        new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
+        #print 'processing new buffer', new_array.shape
+        if self.last_buffer is None:
+            self.last_buffer = new_array
+        else:
+            self.last_buffer = concatenate((self.last_buffer, new_array), axis=0)
+        while self.last_buffer.shape[0] >= self.output_nframes:
+            new_block = self.last_buffer[:self.output_nframes]
+            self.last_buffer = self.last_buffer[self.output_nframes:]
+            #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape
+            self.queue.put( [new_block, False ] )
+
     @interfacedoc
     def process(self, frames = None, eod = False):
-        self.eod = eod
-        try:
-            buf = self.sink.emit('pull-buffer')
-        except SystemError, e:
-            # should never happen
-            print 'SystemError', e
-            return array([0.]), True
-        if buf == None:
-            return array([0.]), True
-        #print 'found something'
-        return gst_buffer_to_numpy_array(buf, self.output_channels), False #self.eod
+        buf = self.queue.get()
+        if buf == gst.MESSAGE_EOS:
+            return self.last_buffer, True
+        frames, eod = buf
+        return frames, eod
 
     @interfacedoc
     def channels(self):
@@ -141,7 +174,8 @@ class FileDecoder(Processor):
 
     @interfacedoc
     def release(self):
-        pass
+        self.pipeline.set_state(gst.STATE_NULL)
+        self.mainloopthread.join()
 
     def __del__(self):
         self.release()