]> git.parisson.com Git - timeside.git/commitdiff
add old FileDecoder as FileDecoderOld wihtout the gst adapter
authoryomguy <yomguy@parisson.com>
Thu, 14 Apr 2011 21:58:16 +0000 (21:58 +0000)
committeryomguy <yomguy@parisson.com>
Thu, 14 Apr 2011 21:58:16 +0000 (21:58 +0000)
timeside/decoder/core.py
timeside/tests/testgraphers.py

index c81c4a71ede863e2653ebf58358040664e56902d..697a7b9bdd85ca3f7ed14a4030dc4446718a80d0 100644 (file)
@@ -255,6 +255,145 @@ class FileDecoder(Processor):
         samples.resize([len(samples)/chan, chan])
         return samples
 
+class FileDecoderOld(Processor):
+    """ gstreamer-based decoder """
+    implements(IDecoder)
+
+    # duration ms, before discovery process times out
+    MAX_DISCOVERY_TIME = 3000
+
+    audioformat = None
+    audiochannels = None
+    audiorate = None
+    audionframes = None
+    mimetype = ''
+
+    # IProcessor methods
+
+    @staticmethod
+    @interfacedoc
+    def id():
+        return "gstreamerdecold"
+
+    def setup(self, channels = None, samplerate = None, nframes = None):
+        # the output data format we want
+        caps = "audio/x-raw-float, width=32"
+        pipeline = gst.parse_launch('''uridecodebin uri="%s"
+            ! audioconvert
+            ! %s
+            ! appsink name=sink sync=False ''' % (self.uri, caps))
+        # store a pointer to appsink in our decoder object
+        self.sink = pipeline.get_by_name('sink')
+        # adjust length of emitted buffers
+        # self.sink.set_property('blocksize', 0x10000)
+        # start pipeline
+        pipeline.set_state(gst.STATE_PLAYING)
+
+    @interfacedoc
+    def channels(self):
+        return  self.audiochannels
+
+    @interfacedoc
+    def samplerate(self):
+        return self.audiorate
+
+    @interfacedoc
+    def nframes(self):
+        return self.audionframes
+
+    @interfacedoc
+    def process(self, frames = None, eod = False):
+        try:
+            buf = self.sink.emit('pull-buffer')
+        except SystemError, e:
+            # should never happen
+            print 'SystemError', e
+            return array([0.]), True
+        if buf == None:
+            return array([0.]), True
+        return self.gst_buffer_to_numpy_array(buf), False
+
+    @interfacedoc
+    def release(self):
+        # nothing to do for now
+        pass
+
+    ## IDecoder methods
+
+    @interfacedoc
+    def __init__(self, uri):
+        # is this a file?
+        import os.path
+        if os.path.exists(uri):
+            # get the absolute path
+            uri = os.path.abspath(uri)
+            # first run the file/uri through the discover pipeline
+            self.discover(uri)
+            # and make a uri of it
+            from urllib import quote
+            self.uri = 'file://'+quote(uri)
+        else:
+            self.uri = uri
+
+    @interfacedoc
+    def format(self):
+        # TODO check
+        if self.mimetype == 'application/x-id3':
+            self.mimetype = 'audio/mpeg'
+        return self.mimetype
+
+    @interfacedoc
+    def encoding(self):
+        # TODO check
+        return self.mimetype.split('/')[-1]
+
+    @interfacedoc
+    def resolution(self):
+        # TODO check: width or depth?
+        return self.audiowidth
+
+    @interfacedoc
+    def metadata(self):
+        # TODO check
+        return self.tags
+
+    ## gst.extend discoverer
+
+    def discover(self, path):
+        """ gstreamer based helper function to get file attributes """
+        from gst.extend import discoverer
+        d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME)
+        d.connect('discovered', self.discovered)
+        self.mainloop = gobject.MainLoop()
+        d.discover()
+        self.mainloop.run()
+
+    def discovered(self, d, is_media):
+        """ gstreamer based helper executed upon discover() completion """
+        if is_media and d.is_audio:
+            # copy the discoverer attributes to self
+            self.audiorate = d.audiorate
+            self.mimetype= d.mimetype
+            self.audiochannels = d.audiochannels
+            self.audiowidth = d.audiowidth
+            # conversion from time in nanoseconds to frames
+            from math import ceil
+            duration = d.audiorate * d.audiolength * 1.e-9
+            self.audionframes = int (ceil ( duration ) )
+            self.tags = d.tags
+        elif not d.is_audio:
+            print "error, no audio found!"
+        else:
+            print "fail", path
+        self.mainloop.quit()
+
+    def gst_buffer_to_numpy_array(self, buf):
+        """ gstreamer buffer to numpy array conversion """
+        chan = self.audiochannels
+        samples = frombuffer(buf.data, dtype=float32)
+        samples.resize([len(samples)/chan, chan])
+        return samples
+
 
 class SubProcessPipe:
 
index ec24418a8fa7a83a9c1e423768278ee23c24d50f..082156c4526209392770202e8e3efdfb759ac030 100644 (file)
@@ -45,6 +45,31 @@ class TestGraphers(TestCase):
         self.image = "/tmp/test_waveform_sweep_ogg.png"
         self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
     
+    # WAVEFORMS JOYDIV
+    def testWav2WaveformJoyDiv(self):
+        "Test WAV to WaveformJoyDiv"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
+        self.image = "/tmp/test_waveformjoydiv_sweep_wav.png"
+        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+    def testFlac2WaveformJoyDiv(self):
+        "Test FLAC to WaveformJoyDiv"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
+        self.image = "/tmp/test_waveformjoydiv_sweep_flac.png"
+        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    
+    def testMp32WaveformJoyDiv(self):
+        "Test MP3 to WaveformJoyDiv"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.mp3")
+        self.image = "/tmp/test_waveformjoydiv_sweep_mp3.png"
+        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    
+    def testOgg2WaveformJoyDiv(self):
+        "Test OGG to WaveformJoyDiv"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.ogg")
+        self.image = "/tmp/test_waveformjoydiv_sweep_ogg.png"
+        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+        
     # SPECTROGRAMS
     def testWav2Spectrogram(self):
         "Test WAV to Spectrogram"