]> git.parisson.com Git - timeside.git/commitdiff
moved tests outside timeside module
authorPaul Brossier <piem@piem.org>
Tue, 22 Nov 2011 03:30:55 +0000 (19:30 -0800)
committerPaul Brossier <piem@piem.org>
Tue, 22 Nov 2011 03:30:55 +0000 (19:30 -0800)
61 files changed:
tests/alltests.py [new file with mode: 0644]
tests/api/__init__.py [new file with mode: 0644]
tests/api/examples.py [new file with mode: 0644]
tests/api/gstreamer.py [new file with mode: 0644]
tests/api/test_analyzer.py [new file with mode: 0644]
tests/api/test_analyzer3.py [new file with mode: 0644]
tests/api/test_flac.py [new file with mode: 0644]
tests/api/test_lolevel.py [new file with mode: 0644]
tests/api/test_lolevel_streaming.py [new file with mode: 0644]
tests/api/test_lolevel_streaming_bad.py [new file with mode: 0644]
tests/api/test_lolevel_streaming_vorbis.py [new file with mode: 0644]
tests/api/test_mp3.py [new file with mode: 0644]
tests/api/test_pipe.py [new file with mode: 0644]
tests/api/test_pipe_spectrogram.py [new file with mode: 0644]
tests/api/test_pipe_waveform.py [new file with mode: 0644]
tests/api/test_vorbis.py [new file with mode: 0644]
tests/api/test_wav.py [new file with mode: 0644]
tests/listprocessors.py [new file with mode: 0644]
tests/samples/guitar.wav [new file with mode: 0644]
tests/samples/sweep.flac [new file with mode: 0644]
tests/samples/sweep.mp3 [new file with mode: 0644]
tests/samples/sweep.ogg [new file with mode: 0644]
tests/samples/sweep.wav [new file with mode: 0644]
tests/samples/sweep_source.wav [new file with mode: 0644]
tests/testcomponent.py [new file with mode: 0644]
tests/testdecoding.py [new file with mode: 0644]
tests/testgraphers.py [new file with mode: 0644]
tests/testinputadapter.py [new file with mode: 0644]
tests/testtranscoding.py [new file with mode: 0644]
tests/unit_timeside.py [new file with mode: 0644]
timeside/__init__.py
timeside/tests/__init__.py [deleted file]
timeside/tests/alltests.py [deleted file]
timeside/tests/api/__init__.py [deleted file]
timeside/tests/api/examples.py [deleted file]
timeside/tests/api/gstreamer.py [deleted file]
timeside/tests/api/test_analyzer.py [deleted file]
timeside/tests/api/test_analyzer3.py [deleted file]
timeside/tests/api/test_flac.py [deleted file]
timeside/tests/api/test_lolevel.py [deleted file]
timeside/tests/api/test_lolevel_streaming.py [deleted file]
timeside/tests/api/test_lolevel_streaming_bad.py [deleted file]
timeside/tests/api/test_lolevel_streaming_vorbis.py [deleted file]
timeside/tests/api/test_mp3.py [deleted file]
timeside/tests/api/test_pipe.py [deleted file]
timeside/tests/api/test_pipe_spectrogram.py [deleted file]
timeside/tests/api/test_pipe_waveform.py [deleted file]
timeside/tests/api/test_vorbis.py [deleted file]
timeside/tests/api/test_wav.py [deleted file]
timeside/tests/listprocessors.py [deleted file]
timeside/tests/samples/guitar.wav [deleted file]
timeside/tests/samples/sweep.flac [deleted file]
timeside/tests/samples/sweep.mp3 [deleted file]
timeside/tests/samples/sweep.ogg [deleted file]
timeside/tests/samples/sweep.wav [deleted file]
timeside/tests/samples/sweep_source.wav [deleted file]
timeside/tests/testcomponent.py [deleted file]
timeside/tests/testdecoding.py [deleted file]
timeside/tests/testgraphers.py [deleted file]
timeside/tests/testinputadapter.py [deleted file]
timeside/tests/testtranscoding.py [deleted file]

diff --git a/tests/alltests.py b/tests/alltests.py
new file mode 100644 (file)
index 0000000..4543878
--- /dev/null
@@ -0,0 +1,8 @@
+from testcomponent import *
+from testinputadapter import *
+from testgraphers import *
+from testdecoding import *
+from testtranscoding import *
+from unit_timeside import *
+
+unittest.main(testRunner=TestRunner())
diff --git a/tests/api/__init__.py b/tests/api/__init__.py
new file mode 100644 (file)
index 0000000..40a96af
--- /dev/null
@@ -0,0 +1 @@
+# -*- coding: utf-8 -*-
diff --git a/tests/api/examples.py b/tests/api/examples.py
new file mode 100644 (file)
index 0000000..747c197
--- /dev/null
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+
+from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.api import *
+import numpy
+
+
+class Gain(Processor):
+    implements(IEffect)
+
+    @interfacedoc
+    def __init__(self, gain=1.0):
+        self.gain = gain
+
+    @staticmethod
+    @interfacedoc
+    def id():
+        return "test_gain"
+
+    @staticmethod
+    @interfacedoc
+    def name():
+        return "Gain test effect"
+
+    def process(self, frames, eod=False):
+        return numpy.multiply(frames, self.gain), eod
+
+
+class FixedInputProcessor(Processor):
+    """Processor which does absolutely nothing except illustrating the use
+    of the FixedInputSizeAdapter. It also tests things a bit."""
+
+    implements(IProcessor)
+
+    BUFFER_SIZE = 1024
+
+    @staticmethod
+    @interfacedoc
+    def id():
+        return "test_fixed"
+
+    @interfacedoc
+    def setup(self, channels, samplerate, nframes):
+        super(FixedInputProcessor, self).setup(channels, samplerate, nframes)
+        self.adapter = FixedSizeInputAdapter(self.BUFFER_SIZE, channels, pad=True)
+
+    @interfacedoc
+    def process(self, frames, eod=False):
+        for buffer, end in self.adapter.process(frames, eod):
+            # Test that the adapter is actually doing the job:
+            if len(buffer) != self.BUFFER_SIZE:
+                raise Exception("Bad buffer size from adapter")
+
+        return frames, eod
+
+
+
+
+
diff --git a/tests/api/gstreamer.py b/tests/api/gstreamer.py
new file mode 100644 (file)
index 0000000..897ac42
--- /dev/null
@@ -0,0 +1,238 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2007-2009 Parisson
+# Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com>
+# Copyright (c) 2007-2009 Guillaume Pellerin <pellerin@parisson.com>
+#
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide.  If not, see <http://www.gnu.org/licenses/>.
+
+# Author: Paul Brossier <piem@piem.org>
+
+from timeside.core import Processor, implements, interfacedoc
+from timeside.api import IDecoder, IEncoder
+from numpy import array, frombuffer, getbuffer, float32
+
+import pygst
+pygst.require('0.10')
+import gst
+import gobject
+gobject.threads_init ()
+
+class FileDecoder(Processor):
+    """ gstreamer-based decoder """
+    implements(IDecoder)
+
+    # duration ms, before discovery process times out
+    MAX_DISCOVERY_TIME = 3000
+
+    audioformat = None
+    audiochannels = None
+    audiorate = None
+    audionframes = None
+    mimetype = ''
+
+    # IProcessor methods
+
+    @staticmethod
+    @interfacedoc
+    def id():
+        return "test_gstreamerdec"
+
+    def setup(self, channels = None, samplerate = None, nframes = None):
+        # the output data format we want
+        caps = "audio/x-raw-float, width=32"
+        pipeline = gst.parse_launch('''uridecodebin uri=%s
+            ! audioconvert
+            ! %s
+            ! appsink name=sink sync=False ''' % (self.uri, caps))
+        # store a pointer to appsink in our decoder object 
+        self.sink = pipeline.get_by_name('sink')
+        # adjust length of emitted buffers
+        # self.sink.set_property('blocksize', 0x10000)
+        # start pipeline
+        pipeline.set_state(gst.STATE_PLAYING)
+
+    @interfacedoc
+    def channels(self):
+        return  self.audiochannels
+
+    @interfacedoc
+    def samplerate(self):
+        return self.audiorate 
+
+    @interfacedoc
+    def nframes(self):
+        return self.audionframes
+
+    @interfacedoc
+    def process(self, frames = None, eod = False):
+        try:
+            buf = self.sink.emit('pull-buffer')
+        except SystemError, e:
+            # should never happen
+            print 'SystemError', e
+            return array([0.]), True
+        if buf == None:
+            return array([0.]), True
+        return self.gst_buffer_to_numpy_array(buf), False
+
+    @interfacedoc
+    def release(self):
+        # nothing to do for now 
+        pass
+
+    ## IDecoder methods
+
+    @interfacedoc
+    def __init__(self, uri):
+
+        # is this a file? 
+        import os.path
+        if os.path.exists(uri):
+            # get the absolute path
+            uri = os.path.abspath(uri)
+            # first run the file/uri through the discover pipeline
+            self.discover(uri)
+            # and make a uri of it
+            from urllib import quote
+            self.uri = 'file://'+quote(uri)
+
+    @interfacedoc
+    def format(self):
+        # TODO check
+        return self.mimetype
+
+    @interfacedoc
+    def encoding(self):
+        # TODO check
+        return self.mimetype.split('/')[-1]
+
+    @interfacedoc
+    def resolution(self):
+        # TODO check: width or depth?
+        return self.audiowidth 
+
+    @interfacedoc
+    def metadata(self):
+        # TODO check
+        return self.tags
+
+    ## gst.extend discoverer
+
+    def discover(self, path):
+        """ gstreamer based helper function to get file attributes """
+        from gst.extend import discoverer
+        d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME)
+        d.connect('discovered', self.discovered)
+        self.mainloop = gobject.MainLoop()
+        d.discover()
+        self.mainloop.run()
+
+    def discovered(self, d, is_media):
+        """ gstreamer based helper executed upon discover() completion """
+        if is_media and d.is_audio:
+            # copy the discoverer attributes to self
+            self.audiorate = d.audiorate
+            self.mimetype= d.mimetype
+            self.audiochannels = d.audiochannels
+            self.audiowidth = d.audiowidth
+            # conversion from time in nanoseconds to frames 
+            from math import ceil
+            duration = d.audiorate * d.audiolength * 1.e-9
+            self.audionframes = int (ceil ( duration ) )
+            self.tags = d.tags
+        elif not d.is_audio:
+            print "error, no audio found!"
+        else:
+            print "fail", path
+        self.mainloop.quit()
+
+    def gst_buffer_to_numpy_array(self, buf):
+        """ gstreamer buffer to numpy array conversion """
+        chan = self.audiochannels
+        samples = frombuffer(buf.data, dtype=float32) 
+        samples.resize([len(samples)/chan, chan])
+        return samples
+
+class WavEncoder(Processor):
+    """ gstreamer-based encoder """
+    implements(IEncoder)
+
+    def __init__(self, output):
+        self.file = None
+        if isinstance(output, basestring):
+            self.filename = output
+        else:
+            raise Exception("Streaming not supported")
+
+    @interfacedoc
+    def setup(self, channels=None, samplerate=None, nframes=None):
+        super(WavEncoder, self).setup(channels, samplerate, nframes)
+        # TODO open file for writing
+        # the output data format we want
+        pipeline = gst.parse_launch(''' appsrc name=src
+            ! audioconvert
+            ! wavenc
+            ! filesink location=%s ''' % self.filename)
+        # store a pointer to appsink in our encoder object 
+        self.src = pipeline.get_by_name('src')
+        srccaps = gst.Caps("""audio/x-raw-float,
+            endianness=(int)1234,
+            channels=(int)%s,
+            width=(int)32,
+            rate=(int)%d""" % (int(channels), int(samplerate)))
+        self.src.set_property("caps", srccaps)
+
+        # start pipeline
+        pipeline.set_state(gst.STATE_PLAYING)
+        self.pipeline = pipeline
+
+    @staticmethod
+    @interfacedoc
+    def id():
+        return "test_gstreamerenc"
+
+    @staticmethod
+    @interfacedoc
+    def description():
+        return "Gstreamer based encoder"
+
+    @staticmethod
+    @interfacedoc
+    def file_extension():
+        return "wav"
+
+    @staticmethod
+    @interfacedoc
+    def mime_type():
+        return "audio/x-wav"
+
+    @interfacedoc
+    def set_metadata(self, metadata):
+        #TODO
+        pass
+
+    @interfacedoc
+    def process(self, frames, eod=False):
+        buf = self.numpy_array_to_gst_buffer(frames)
+        self.src.emit('push-buffer', buf)
+        if eod: self.src.emit('end-of-stream')
+        return frames, eod
+
+    def numpy_array_to_gst_buffer(self, frames):
+        """ gstreamer buffer to numpy array conversion """
+        buf = gst.Buffer(getbuffer(frames))
+        return buf
diff --git a/tests/api/test_analyzer.py b/tests/api/test_analyzer.py
new file mode 100644 (file)
index 0000000..2b8198d
--- /dev/null
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+
+import timeside
+from sys import stdout
+import os.path
+import numpy
+
+
+class TestAnalyzer:
+
+    graphers = timeside.core.processors(timeside.api.IGrapher)
+    decoders = timeside.core.processors(timeside.api.IDecoder)
+    encoders= timeside.core.processors(timeside.api.IEncoder)
+    analyzers = timeside.core.processors(timeside.api.IAnalyzer)
+
+    def __init__(self, path):
+        self.source = os.path.join(os.path.dirname(__file__), path)
+        print "Processing %s" % self.source
+        self.decoder  = timeside.decoder.FileDecoder(self.source)
+        print 'format: ', self.decoder.format()
+        self.pipe = self.decoder
+        self.analyzers_sub_pipe = []
+
+    def process(self):
+        for analyzer in self.analyzers:
+            sub_pipe = analyzer()
+            self.analyzers_sub_pipe.append(sub_pipe)
+            self.pipe = self.pipe | sub_pipe
+        self.pipe.run()
+
+    def results(self):
+        analyzers = []
+        for analyzer in self.analyzers_sub_pipe:
+            value = analyzer.result()
+            analyzers.append({'name':analyzer.name(),
+                            'id':analyzer.id(),
+                            'unit':analyzer.unit(),
+                            'value':str(value)})
+        print analyzers
+
+
+test = TestAnalyzer('../samples/guitar.wav')
+#test = TestAnalyzer('/home/momo/music/wav/Cellar/Cellar-FinallyMix_01.wav')
+test.process()
+test.results()
+
diff --git a/tests/api/test_analyzer3.py b/tests/api/test_analyzer3.py
new file mode 100644 (file)
index 0000000..8114723
--- /dev/null
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+
+import timeside
+import sys
+import os.path
+import numpy
+import time
+
+
+class TestAnalyzer:
+
+    analyzer = timeside.analyzer.MaxLevel()
+
+    def __init__(self, path):
+        self.source = os.path.join(os.path.dirname(__file__), path)
+        print "Processing %s" % self.source
+        self.decoder  = timeside.decoder.FileDecoder(self.source)
+        print 'format: ', self.decoder.format()
+        self.pipe = self.decoder
+        self.sub_pipe = self.analyzer
+
+    def process(self):
+        self.pipe = self.pipe | self.sub_pipe
+        self.pipe.run()
+
+    def results(self):
+        print {'name':self.analyzer.name(),
+                            'id':self.analyzer.id(),
+                            'unit':self.analyzer.unit(),
+                            'value':str(self.analyzer.value)}
+
+test = TestAnalyzer(sys.argv[-1])
+test.process()
+test.results()
diff --git a/tests/api/test_flac.py b/tests/api/test_flac.py
new file mode 100644 (file)
index 0000000..ee590c4
--- /dev/null
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+
+from timeside.decoder import *
+from timeside.encoder import *
+import os.path
+
+source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
+dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.flac")
+
+decoder  = FileDecoder(source)
+encoder  = FlacEncoder(dest)
+
+(decoder | encoder).run()
diff --git a/tests/api/test_lolevel.py b/tests/api/test_lolevel.py
new file mode 100644 (file)
index 0000000..421ce95
--- /dev/null
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+
+from timeside.tests.api.examples import Gain
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+
+import sys
+if len(sys.argv) > 1:
+    source = sys.argv[1]
+else:
+    import os.path
+    source= os.path.join (os.path.dirname(__file__),  "../samples/guitar.wav")
+
+Decoder = FileDecoder
+print "Creating decoder with id=%s for: %s" % (Decoder.id(), source)
+decoder = Decoder(source)
+analyzer = MaxLevel()
+decoder.setup()
+nchannels  = decoder.channels()
+samplerate = decoder.samplerate()
+nframes = decoder.nframes()
+analyzer.setup(nchannels, samplerate)
+
+print "Stats: duration=%f, nframes=%d, nchannels=%d, samplerate=%d, resolution=%d" % (
+        nframes / float(samplerate), nframes, nchannels, samplerate, decoder.resolution())
+
+while True:
+    frames, eod = decoder.process()
+    analyzer.process(frames, eod)
+    if eod:
+        break
+
+max_level = analyzer.result()
+print "Max level: %f" % max_level
+
+destination = "../results/guitar_normalized.wav"
+Encoder = WavEncoder
+print "Creating encoder with id=%s for: %s" % (Encoder.id(), destination)
+encoder = Encoder(destination)
+
+gain = 1
+if max_level > 0:
+    gain = 0.9 / max_level
+
+effect = Gain(gain)
+
+decoder.setup()
+effect.setup(decoder.channels(), decoder.samplerate())
+encoder.setup(effect.channels(), effect.samplerate())
+
+print "Applying effect id=%s with gain=%f" % (effect.id(), gain)
+
+while True:
+    frames, eod = decoder.process()
+    encoder.process(*effect.process(frames, eod))
+    if eod:
+        break
+
diff --git a/tests/api/test_lolevel_streaming.py b/tests/api/test_lolevel_streaming.py
new file mode 100644 (file)
index 0000000..e394f0c
--- /dev/null
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+
+import sys
+if len(sys.argv) > 1:
+    source = sys.argv[1]
+else:
+    import os.path
+    source= os.path.join (os.path.dirname(__file__),  "../samples/sweep.flac")
+
+decoder = FileDecoder(source)
+print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
+decoder.setup()
+channels  = decoder.channels()
+print 'channels :', channels
+samplerate = decoder.samplerate()
+nframes = decoder.nframes()
+
+dest1 = "/tmp/test_filesink.mp3"
+dest2 = "/tmp/test_appsink.mp3"
+f = open(dest2,'w')
+
+streaming=True
+encoder = Mp3Encoder(dest1, streaming=True)
+encoder.setup(channels=channels, samplerate=samplerate)
+
+while True:
+    encoder.process(*decoder.process())
+    if streaming:
+        f.write(encoder.chunk)
+    if encoder.eod :
+        break
+
+f.close()
+print encoder.pipe
diff --git a/tests/api/test_lolevel_streaming_bad.py b/tests/api/test_lolevel_streaming_bad.py
new file mode 100644 (file)
index 0000000..e414ead
--- /dev/null
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+
+import sys
+if len(sys.argv) > 1:
+    source = sys.argv[1]
+else:
+    import os.path
+    source= os.path.join (os.path.dirname(__file__),  "../samples/sweep.wav")
+
+decoder = FileDecoder(source)
+print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
+decoder.setup()
+channels  = decoder.channels()
+print 'channels :', channels
+samplerate = decoder.samplerate()
+nframes = decoder.nframes()
+
+dest1 = "/tmp/test_filesink.mp3"
+dest2 = "/tmp/test_appsink.mp3"
+f = open(dest2,'w')
+
+streaming=True
+encoder = Mp3Encoder(dest1, streaming=True)
+encoder.setup(channels=channels, samplerate=samplerate)
+
+print encoder.pipe
+
+while True:
+    encoder.process(*decoder.process())
+    if streaming:
+        f.write(encoder.chunk)
+    if encoder.eod :
+        break
+
+f.close()
+print encoder.pipe
diff --git a/tests/api/test_lolevel_streaming_vorbis.py b/tests/api/test_lolevel_streaming_vorbis.py
new file mode 100644 (file)
index 0000000..ecc3d3f
--- /dev/null
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+
+import sys
+if len(sys.argv) > 1:
+    source = sys.argv[1]
+else:
+    import os.path
+    source= os.path.join (os.path.dirname(__file__),  "../samples/sweep.flac")
+
+decoder = FileDecoder(source)
+print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
+decoder.setup()
+channels  = decoder.channels()
+print 'channels :', channels
+samplerate = decoder.samplerate()
+nframes = decoder.nframes()
+
+dest1 = "/tmp/test_filesink.ogg"
+dest2 = "/tmp/test_appsink.ogg"
+f = open(dest2,'w')
+
+streaming=True
+encoder = VorbisEncoder(dest1, streaming=True)
+encoder.setup(channels=channels, samplerate=samplerate)
+
+print encoder.pipe
+
+while True:
+    encoder.process(*decoder.process())
+    if streaming:
+        f.write(encoder.chunk)
+    if encoder.eod :
+        break
+
+f.close()
+print encoder.pipe
diff --git a/tests/api/test_mp3.py b/tests/api/test_mp3.py
new file mode 100644 (file)
index 0000000..620696b
--- /dev/null
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+
+from timeside.decoder import *
+from timeside.encoder import *
+import os.path
+
+source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
+dest = os.path.join(os.path.dirname(__file__), "/tmp/sweep_wav.mp3")
+
+decoder  = FileDecoder(source)
+encoder  = Mp3Encoder(dest)
+
+(decoder | encoder).run()
+
+metadata = {'TIT2': 'title',  #title2
+             'TCOM': 'composer',  #composer
+             'TPE1': 'lead creator', #lead
+             'UFID': 'identifier',  #Unique ID...
+             'TALB': 'album',  #album
+             'TCON': 'genre',  #genre
+             'TDRC': '2011', #year
+#             'COMM': 'blabla',  #comment
+             }
+
+encoder.set_metadata(metadata)
+encoder.write_metadata(dest)
+
diff --git a/tests/api/test_pipe.py b/tests/api/test_pipe.py
new file mode 100644 (file)
index 0000000..629b3e9
--- /dev/null
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+
+from timeside.tests.api.examples import Gain
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+from sys import stdout
+import os.path
+import numpy
+
+source = os.path.join(os.path.dirname(__file__),  "../samples/guitar.wav")
+
+print "Normalizing %s" % source
+decoder  = FileDecoder(source)
+maxlevel = MaxLevel()
+duration = Duration()
+
+(decoder | maxlevel | duration).run()
+
+gain = 1
+if maxlevel.result() < 0:
+    gain = 0.9 / numpy.exp(maxlevel.result()/20)
+
+print "input maxlevel: %f" % maxlevel.result()
+print "gain: %f" % gain
+print "duration: %f %s" % (duration.result(), duration.unit())
+
+gain     = Gain(gain)
+encoder  = WavEncoder("../results/guitar_normalized.wav")
+
+subpipe  = gain | maxlevel
+
+(decoder | subpipe | encoder).run()
+
+print "output maxlevel: %f" % maxlevel.result()
+
+
diff --git a/tests/api/test_pipe_spectrogram.py b/tests/api/test_pipe_spectrogram.py
new file mode 100644 (file)
index 0000000..49eaa8c
--- /dev/null
@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+
+import os
+from timeside.core import *
+from timeside.api import *
+from timeside.decoder import *
+from timeside.grapher import *
+
+sample_dir = '../samples'
+img_dir = '../results/img'
+if not os.path.exists(img_dir):
+    os.mkdir(img_dir)
+
+test_dict = {'sweep.wav': 'spec_wav.png',
+            'sweep.flac': 'spec_flac.png',
+            'sweep.ogg': 'spec_ogg.png',
+            'sweep.mp3': 'spec_mp3.png',
+            }
+
+for source, image in test_dict.iteritems():
+    audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source)
+    image = img_dir + os.sep + image
+    print 'Test : decoder(%s) | waveform (%s)' % (source, image)
+    decoder  = FileDecoder(audio)
+    spectrogram = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    (decoder | spectrogram).run()
+    print 'frames per pixel = ', spectrogram.graph.samples_per_pixel
+    print "render spectrogram to: %s" %  image
+    spectrogram.render(image)
+
+
+
diff --git a/tests/api/test_pipe_waveform.py b/tests/api/test_pipe_waveform.py
new file mode 100644 (file)
index 0000000..bbfb54a
--- /dev/null
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+
+import os
+from timeside.core import *
+from timeside.api import *
+from timeside.decoder import *
+from timeside.grapher import *
+
+sample_dir = '../samples'
+img_dir = '../results/img'
+if not os.path.exists(img_dir):
+    os.mkdir(img_dir)
+
+test_dict = {'sweep.wav': 'waveform_wav.png',
+            'sweep.flac': 'waveform_flac.png',
+            'sweep.ogg': 'waveform_ogg.png',
+            'sweep.mp3': 'waveform_mp3.png',
+            }
+
+for source, image in test_dict.iteritems():
+    audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source)
+    image = img_dir + os.sep + image
+    print 'Test : decoder(%s) | waveform (%s)' % (source, image)
+    decoder  = FileDecoder(audio)
+    waveform = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    (decoder | waveform).run()
+    print 'frames per pixel = ', waveform.graph.samples_per_pixel
+    print "render waveform to: %s" %  image
+    waveform.render(image)
+
+
diff --git a/tests/api/test_vorbis.py b/tests/api/test_vorbis.py
new file mode 100644 (file)
index 0000000..53818cd
--- /dev/null
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+
+from timeside.decoder import *
+from timeside.encoder import *
+import os.path
+
+source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
+dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.ogg")
+
+decoder  = FileDecoder(source)
+encoder  = VorbisEncoder(dest)
+
+(decoder | encoder).run()
diff --git a/tests/api/test_wav.py b/tests/api/test_wav.py
new file mode 100644 (file)
index 0000000..077a01e
--- /dev/null
@@ -0,0 +1,15 @@
+# -*- coding: utf-8 -*-
+
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+
+import os.path
+source = os.path.join(os.path.dirname(__file__),  "../samples/sweep.wav")
+dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.wav")
+
+decoder  = FileDecoder(source)
+encoder  = WavEncoder(dest)
+
+(decoder | encoder).run()
+
diff --git a/tests/listprocessors.py b/tests/listprocessors.py
new file mode 100644 (file)
index 0000000..840c087
--- /dev/null
@@ -0,0 +1,12 @@
+import timeside
+
+def list_processors(interface, prefix=""):
+    print prefix + interface.__name__
+    subinterfaces = interface.__subclasses__()
+    for i in subinterfaces:
+        list_processors(i, prefix + "  ")
+    processors = timeside.processors(interface, False)
+    for p in processors:
+        print prefix + "  %s [%s]" % (p.__name__, p.id())
+
+list_processors(timeside.api.IProcessor)        
diff --git a/tests/samples/guitar.wav b/tests/samples/guitar.wav
new file mode 100644 (file)
index 0000000..b5a9e80
Binary files /dev/null and b/tests/samples/guitar.wav differ
diff --git a/tests/samples/sweep.flac b/tests/samples/sweep.flac
new file mode 100644 (file)
index 0000000..decee06
Binary files /dev/null and b/tests/samples/sweep.flac differ
diff --git a/tests/samples/sweep.mp3 b/tests/samples/sweep.mp3
new file mode 100644 (file)
index 0000000..dc75fe0
Binary files /dev/null and b/tests/samples/sweep.mp3 differ
diff --git a/tests/samples/sweep.ogg b/tests/samples/sweep.ogg
new file mode 100644 (file)
index 0000000..e30bf99
Binary files /dev/null and b/tests/samples/sweep.ogg differ
diff --git a/tests/samples/sweep.wav b/tests/samples/sweep.wav
new file mode 100644 (file)
index 0000000..fc28a32
Binary files /dev/null and b/tests/samples/sweep.wav differ
diff --git a/tests/samples/sweep_source.wav b/tests/samples/sweep_source.wav
new file mode 100644 (file)
index 0000000..5dcf9ba
Binary files /dev/null and b/tests/samples/sweep_source.wav differ
diff --git a/tests/testcomponent.py b/tests/testcomponent.py
new file mode 100644 (file)
index 0000000..a06952c
--- /dev/null
@@ -0,0 +1,165 @@
+
+from timeside.component import *
+from unit_timeside import TestCase, TestRunner
+
+__all__ = ['TestComponentArchitecture']
+
+class TestComponentArchitecture(TestCase):
+    "Test the component and interface system"
+   
+    def testOneInterface(self):
+        "Test a component implementing one interface"
+        self.assertSameList(implementations(I1), [C1])
+
+    def testTwoInterfaces(self):        
+        "Test a component implementing two interfaces"
+        self.assertSameList(implementations(I2), [C2])
+        self.assertSameList(implementations(I3), [C2])
+
+    def testTwoImplementations(self):
+        "Test an interface implemented by two components"
+        self.assertSameList(implementations(I4), [C3, C4])
+
+    def testInterfaceInheritance(self):
+        "Test whether a component implements an interface's parent"
+        self.assertSameList(implementations(I5), [C5])
+
+    def testImplementationInheritance(self):
+        "Test that a component doesn't implement the interface implemented by its parent" 
+        self.assertSameList(implementations(I7), [C6])
+
+    def testImplementationRedundancy(self):
+        "Test implementation redundancy across inheritance" 
+        self.assertSameList(implementations(I8), [C8, C9])
+
+    def testAbstractImplementation(self):    
+        "Test abstract implementation"
+        self.assertSameList(implementations(I11), [])
+        self.assertSameList(implementations(I11, abstract=True), [C11])
+
+    def testInterfaceDoc(self):        
+        "Test @interfacedoc decorator"
+        self.assertEquals(C10.test.__doc__, "testdoc")
+
+    def testInterfaceDocStatic(self):        
+        "Test @interfacedoc decorator on static method"
+        self.assertEquals(C10.teststatic.__doc__, "teststaticdoc")
+
+    def testIntefaceDocReversed(self):
+        "Test @interfacedoc on static method (decorators reversed)"
+
+        try:
+
+            class BogusDoc1(Component):
+                implements(I10)
+
+                @interfacedoc
+                @staticmethod        
+                def teststatic(self):
+                    pass
+
+            self.fail("No error raised with reversed decorators")
+
+        except ComponentError:
+            pass
+   
+    def testInterfaceDocBadMethod(self):
+        "Test @interfacedoc with unexistant method in interface"
+
+        try:
+            class BogusDoc2(Component):
+                implements(I10)
+
+                @interfacedoc
+                def nosuchmethod(self):
+                    pass
+
+            self.fail("No error raised when decorating an unexistant method")
+
+        except ComponentError:
+            pass
+
+class I1(Interface):
+    pass
+
+class I2(Interface):
+    pass
+
+class I3(Interface):
+    pass
+
+class I4(Interface):
+    pass
+
+class I5(Interface):
+    pass
+
+class I6(I5):
+    pass
+
+class I7(Interface):
+    pass
+
+class I8(Interface):
+    pass
+
+class I9(I8):
+    pass
+
+class I10(Interface):
+    def test(self):
+        """testdoc"""
+
+    @staticmethod        
+    def teststatic(self):
+        """teststaticdoc"""
+
+class I11(Interface):
+    pass
+
+class C1(Component):
+    implements(I1)
+
+class C2(Component):
+    implements(I2, I3)
+
+class C3(Component):
+    implements(I4)
+
+class C4(Component):
+    implements(I4)
+
+class C5(Component):
+    implements(I6)
+
+class C6(Component):
+    implements(I7)
+
+class C7(C6):
+    pass
+
+class C8(Component):
+    implements(I8)
+
+class C9(Component):
+    implements(I8, I9)
+
+class C10(Component):
+    implements(I10)
+
+    @interfacedoc
+    def test(self):
+        pass
+
+    @staticmethod        
+    @interfacedoc
+    def teststatic(self):
+        pass
+
+class C11(Component):
+    abstract()
+    implements(I11)
+
+if __name__ == '__main__':
+    unittest.main(testRunner=TestRunner())
+
diff --git a/tests/testdecoding.py b/tests/testdecoding.py
new file mode 100644 (file)
index 0000000..c847c4c
--- /dev/null
@@ -0,0 +1,52 @@
+from timeside.decoder import *
+from unit_timeside import TestCase, TestRunner
+
+import os.path
+
+__all__ = ['TestDecoding']
+
+class TestDecoding(TestCase):
+    "Test the low level streaming features"
+
+    def setUp(self):
+        pass
+   
+    def testWav(self):
+        "Test wav decoding"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
+
+    def testFlac(self):
+        "Test flac decoding"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
+
+    def testOgg(self):
+        "Test ogg decoding"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.ogg")
+
+    def testMp3(self):
+        "Test mp3 decoding"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.mp3")
+
+    def tearDown(self):
+        decoder = FileDecoder(self.source)
+        decoder.setup()
+
+        totalframes = 0.
+
+        while True:
+            frames, eod = decoder.process()
+            totalframes += frames.shape[0]
+            if eod: break
+
+        # FIXME compute actual number of frames from file
+        if os.path.splitext(self.source)[-1].lower() == '.mp3':
+            self.assertEquals(totalframes, 355969)
+        elif os.path.splitext(self.source)[-1].lower() == '.ogg':
+            self.assertEquals(totalframes, 352833)
+        else:
+            self.assertEquals(totalframes, 352801)
+
+if __name__ == '__main__':
+    unittest.main(testRunner=TestRunner())
+
+
diff --git a/tests/testgraphers.py b/tests/testgraphers.py
new file mode 100644 (file)
index 0000000..622eeda
--- /dev/null
@@ -0,0 +1,105 @@
+
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.grapher import *
+from timeside.api import *
+
+from timeside.component import *
+
+from unit_timeside import TestCase, TestRunner
+
+import os.path
+
+__all__ = ['TestGraphers']
+
+class TestGraphers(TestCase):
+    "Test all graphers with various input media formats"
+
+    def setUp(self):
+        pass
+    
+    # WAVEFORMS
+    def testWav2Waveform(self):
+        "Test WAV to Waveform"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
+        self.image = "/tmp/test_waveform_sweep_wav.png"
+        self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+    def testFlac2Waveform(self):
+        "Test FLAC to Waveform"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
+        self.image = "/tmp/test_waveform_sweep_flac.png"
+        self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    
+    def testMp32Waveform(self):
+        "Test MP3 to Waveform"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.mp3")
+        self.image = "/tmp/test_waveform_sweep_mp3.png"
+        self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    
+    def testOgg2Waveform(self):
+        "Test OGG to Waveform"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.ogg")
+        self.image = "/tmp/test_waveform_sweep_ogg.png"
+        self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    
+    # WAVEFORMS JOYDIV
+    def testWav2WaveformJoyDiv(self):
+        "Test WAV to WaveformJoyDiv"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
+        self.image = "/tmp/test_waveformjoydiv_sweep_wav.png"
+        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+    def testFlac2WaveformJoyDiv(self):
+        "Test FLAC to WaveformJoyDiv"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
+        self.image = "/tmp/test_waveformjoydiv_sweep_flac.png"
+        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    
+    def testMp32WaveformJoyDiv(self):
+        "Test MP3 to WaveformJoyDiv"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.mp3")
+        self.image = "/tmp/test_waveformjoydiv_sweep_mp3.png"
+        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    
+    def testOgg2WaveformJoyDiv(self):
+        "Test OGG to WaveformJoyDiv"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.ogg")
+        self.image = "/tmp/test_waveformjoydiv_sweep_ogg.png"
+        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+        
+    # SPECTROGRAMS
+    def testWav2Spectrogram(self):
+        "Test WAV to Spectrogram"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
+        self.image = "/tmp/test_spectrogram_sweep_wav.png"
+        self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    
+    def testMp32Spectrogram(self):
+        "Test MP3 to Spectrogram"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.mp3")
+        self.image = "/tmp/test_spectrogram_sweep_mp3.png"
+        self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    
+    def testFlac2Spectrogram(self):
+        "Test FLAC to Spectrogram"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
+        self.image = "/tmp/test_spectrogram_sweep_flac.png"
+        self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+    
+    def testOgg2Spectrogram(self):
+        "Test OGG to Spectrogram"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.ogg")
+        self.image = "/tmp/test_spectrogram_sweep_ogg.png"
+        self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+     
+    def tearDown(self):
+        decoder = FileDecoder(self.source)
+        (decoder | self.grapher).run()
+        self.grapher.render(self.image)
+        
+if __name__ == '__main__':
+    unittest.main(testRunner=TestRunner())
+
diff --git a/tests/testinputadapter.py b/tests/testinputadapter.py
new file mode 100644 (file)
index 0000000..531866a
--- /dev/null
@@ -0,0 +1,70 @@
+from timeside.core import FixedSizeInputAdapter
+from unit_timeside import TestCase, TestRunner
+import numpy
+
+class TestFixedSizeInputAdapter(TestCase):
+    "Test the fixed-sized input adapter"
+
+    def assertIOEquals(self, adapter, input, input_eod, output, output_eod=None):
+        output = output[:]
+        output.reverse()
+        _eod = None
+        for buffer, _eod in adapter.process(input, input_eod):
+            a = output.pop()
+            if not numpy.array_equiv(buffer, a):
+                self.fail("\n-- Actual --\n%s\n -- Expected -- \n%s\n" % (str(buffer), str(a)))
+
+        if _eod != output_eod:
+            self.fail("eod do not match: %s != %s", (str(_eod), str(output_eod)))
+
+        if output:
+            self.fail("trailing expected data: %s" % output)
+
+    def setUp(self):
+        self.data = numpy.arange(44).reshape(2,22).transpose()
+
+    def testTwoChannels(self):
+        "Test simple stream with two channels"
+        adapter = FixedSizeInputAdapter(4, 2)
+
+        self.assertEquals(len(self.data), adapter.nframes(len(self.data)))
+
+        self.assertIOEquals(adapter, self.data[0:1], False, [])
+        self.assertIOEquals(adapter, self.data[1:5], False, [self.data[0:4]], False)
+        self.assertIOEquals(adapter, self.data[5:12], False, [self.data[4:8], self.data[8:12]], False)
+        self.assertIOEquals(adapter, self.data[12:13], False, [])
+        self.assertIOEquals(adapter, self.data[13:14], False, [])
+        self.assertIOEquals(adapter, self.data[14:18], False, [self.data[12:16]], False)
+        self.assertIOEquals(adapter, self.data[18:20], False, [self.data[16:20]], False)
+        self.assertIOEquals(adapter, self.data[20:21], False, [])
+        self.assertIOEquals(adapter, self.data[21:22], True, [self.data[20:22]], True)
+
+    def testPadding(self):
+        "Test automatic padding support"
+        adapter = FixedSizeInputAdapter(4, 2, pad=True)
+
+        self.assertEquals(len(self.data) + 2, adapter.nframes(len(self.data)))
+
+        self.assertIOEquals(adapter, self.data[0:21], False, 
+            [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]], 
+            False)
+
+        self.assertIOEquals(adapter, self.data[21:22], True, [[
+            [20, 42],
+            [21, 43],
+            [0, 0],
+            [0, 0]
+        ]], True)
+
+    def testSizeMultiple(self):
+        "Test a stream which contain a multiple number of buffers"
+        adapter = FixedSizeInputAdapter(4, 2)
+
+        self.assertIOEquals(adapter, self.data[0:20], True, 
+            [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]], 
+            True)
+
+
+if __name__ == '__main__':
+    unittest.main(testRunner=TestRunner())
+
diff --git a/tests/testtranscoding.py b/tests/testtranscoding.py
new file mode 100644 (file)
index 0000000..617b014
--- /dev/null
@@ -0,0 +1,125 @@
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+
+from timeside.component import *
+
+from unit_timeside import TestCase, TestRunner
+
+import os.path
+
+__all__ = ['TestTranscoding']
+
+class TestTranscoding(TestCase):
+    "Test the low level streaming features"
+
+    def setUp(self):
+        pass
+   
+    def testWav2Mp3(self):
+        "Test wav to mp3 conversion"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
+
+        dest1 = "/tmp/test_wav_filesink.mp3"
+        dest2 = "/tmp/test_wav_appsink.mp3"
+        self.f = open(dest2,'w')
+
+        self.streaming=True
+
+        encoder = Mp3Encoder(dest1, streaming=True)
+        self.encoder = encoder
+
+    def testFlac2Mp3(self):
+        "Test flac to mp3 conversion"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
+
+        dest1 = "/tmp/test_flac_filesink.mp3"
+        dest2 = "/tmp/test_flac_appsink.mp3"
+        self.f = open(dest2,'w')
+
+        self.streaming=True
+
+        encoder = Mp3Encoder(dest1, streaming=True)
+        self.encoder = encoder
+
+    """
+    def testFlac2Ogg(self):
+        "Test flac to ogg conversion"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
+
+        dest1 = "/tmp/test_flac_filesink.ogg"
+        dest2 = "/tmp/test_flac_appsink.ogg"
+        self.f = open(dest2,'w')
+
+        self.streaming=True
+
+        encoder = VorbisEncoder(dest1, streaming=True)
+        self.encoder = encoder
+
+    def testWav2Ogg(self):
+        "Test wav to ogg conversion"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
+
+        dest1 = "/tmp/test_wav_filesink.ogg"
+        dest2 = "/tmp/test_wav_appsink.ogg"
+        self.f = open(dest2,'w')
+
+        self.streaming=True
+
+        encoder = VorbisEncoder(dest1, streaming=True)
+        self.encoder = encoder
+    
+    def testWav2Flac(self):
+        "Test wav to flac conversion"
+        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
+
+        dest1 = "/tmp/test_wav_filesink.flac"
+        dest2 = "/tmp/test_wav_appsink.flac"
+        self.f = open(dest2,'w')
+
+        self.streaming=True
+
+        encoder = FlacEncoder(dest1, streaming=True)
+        self.encoder = encoder
+    """    
+
+    def setUpDecoder(self):
+        self.decoder = FileDecoder(self.source)
+        self.decoder.setup()
+        self.channels  = self.decoder.channels()
+        self.samplerate = self.decoder.samplerate()
+
+    def setUpEncoder(self): 
+        self.encoder.setup(channels = self.channels, samplerate = self.samplerate)
+
+    def tearDown(self):
+        self.setUpDecoder()
+        self.setUpEncoder()
+        encoder = self.encoder
+        f = self.f
+
+        #print "decoder pipe:\n", decoder.pipe
+        #print "encoder pipe:\n", encoder.pipe
+        totalframes = 0.
+
+        while True:
+            frames, eod = self.decoder.process()
+            #print frames.shape[0]
+            totalframes += frames.shape[0]
+            encoder.process(frames, eod)
+            if self.streaming:
+                f.write(encoder.chunk)
+            if eod:
+                break
+            if encoder.eod :
+                break
+        f.close()
+
+        # FIXME compute actual number of frames from file
+        self.assertEquals(totalframes, 352801)
+
+if __name__ == '__main__':
+    unittest.main(testRunner=TestRunner())
+
diff --git a/tests/unit_timeside.py b/tests/unit_timeside.py
new file mode 100644 (file)
index 0000000..09bc2d6
--- /dev/null
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+
+import unittest
+import sys
+import time
+
+class TestCase(unittest.TestCase):
+
+    def assertSameList(self, list1, list2):
+        "Test that two lists contain the same elements, in any order"
+        if len(list1) != len(list2):
+            self.fail("Lists length differ : %d != %d" % (len(list1), len(list2)))
+
+        for item in list1:
+            if not item in list2:
+                self.fail("%s is not in list2" % str(item))
+
+        for item in list2:
+            if not item in list1:
+                self.fail("%s is not in list1" % str(item))
+
+class _TextTestResult(unittest.TestResult):
+    """A test result class that can print formatted text results to a stream.
+
+    Used by TextTestRunner.
+    """
+    separator1 = '=' * 70
+    separator2 = '-' * 70
+
+    def __init__(self, stream, descriptions, verbosity):
+        unittest.TestResult.__init__(self)
+        self.stream = stream
+        self.showAll = verbosity > 1
+        self.dots = verbosity == 1
+        self.descriptions = descriptions
+        self.currentTestCase = None
+
+    def getDescription(self, test):
+        if self.descriptions:
+            return test.shortDescription() or str(test)
+        else:
+            return str(test)
+
+    def startTest(self, test):
+        unittest.TestResult.startTest(self, test)
+        if self.showAll:
+            if self.currentTestCase != test.__class__:
+                self.currentTestCase = test.__class__
+                self.stream.writeln()
+                self.stream.writeln("[%s]" % self.currentTestCase.__name__)
+            self.stream.write("  " + self.getDescription(test))
+            self.stream.write(" ... ")
+
+    def addSuccess(self, test):
+        unittest.TestResult.addSuccess(self, test)
+        if self.showAll:
+            self.stream.writeln("ok")
+        elif self.dots:
+            self.stream.write('.')
+
+    def addError(self, test, err):
+        unittest.TestResult.addError(self, test, err)
+        if self.showAll:
+            self.stream.writeln("ERROR")
+        elif self.dots:
+            self.stream.write('E')
+
+    def addFailure(self, test, err):
+        unittest.TestResult.addFailure(self, test, err)
+        if self.showAll:
+            self.stream.writeln("FAIL")
+        elif self.dots:
+            self.stream.write('F')
+
+    def printErrors(self):
+        if self.dots or self.showAll:
+            self.stream.writeln()
+        self.printErrorList('ERROR', self.errors)
+        self.printErrorList('FAIL', self.failures)
+
+    def printErrorList(self, flavour, errors):
+        for test, err in errors:
+            self.stream.writeln(self.separator1)
+            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
+            self.stream.writeln(self.separator2)
+            self.stream.writeln("%s" % err)
+
+
+class _WritelnDecorator:
+    """Used to decorate file-like objects with a handy 'writeln' method"""
+    def __init__(self,stream):
+        self.stream = stream
+
+    def __getattr__(self, attr):
+        return getattr(self.stream,attr)
+
+    def writeln(self, arg=None):
+        if arg: self.write(arg)
+        self.write('\n') # text-mode streams translate to \r\n if needed
+
+class TestRunner:
+    """A test runner class that displays results in textual form.
+
+    It prints out the names of tests as they are run, errors as they
+    occur, and a summary of the results at the end of the test run.
+    """
+    def __init__(self, stream=sys.stderr, descriptions=1, verbosity=2):
+        self.stream = _WritelnDecorator(stream)
+        self.descriptions = descriptions
+        self.verbosity = verbosity
+
+    def _makeResult(self):
+        return _TextTestResult(self.stream, self.descriptions, self.verbosity)
+
+    def run(self, test):
+        "Run the given test case or test suite."
+        result = self._makeResult()
+        startTime = time.time()
+        test(result)
+        stopTime = time.time()
+        timeTaken = stopTime - startTime
+        result.printErrors()
+        self.stream.writeln(result.separator2)
+        run = result.testsRun
+        self.stream.writeln("Ran %d test%s in %.3fs" %
+                            (run, run != 1 and "s" or "", timeTaken))
+        self.stream.writeln()
+        if not result.wasSuccessful():
+            self.stream.write("FAILED (")
+            failed, errored = map(len, (result.failures, result.errors))
+            if failed:
+                self.stream.write("failures=%d" % failed)
+            if errored:
+                if failed: self.stream.write(", ")
+                self.stream.write("errors=%d" % errored)
+            self.stream.writeln(")")
+        else:
+            self.stream.writeln("OK")
+        return result
+
+
index 0d62329b8a4fe95112bb5cf7999c2f5ca0708745..acb52891dca2f1ffc39655cec9818262dafa5137 100644 (file)
@@ -9,7 +9,6 @@ import timeside.decoder
 import timeside.encoder
 import timeside.grapher
 import timeside.analyzer
-import timeside.tests
 from timeside.core import *
 
 __version__ = '0.3.2'
diff --git a/timeside/tests/__init__.py b/timeside/tests/__init__.py
deleted file mode 100644 (file)
index 09bc2d6..0000000
+++ /dev/null
@@ -1,141 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-import sys
-import time
-
-class TestCase(unittest.TestCase):
-
-    def assertSameList(self, list1, list2):
-        "Test that two lists contain the same elements, in any order"
-        if len(list1) != len(list2):
-            self.fail("Lists length differ : %d != %d" % (len(list1), len(list2)))
-
-        for item in list1:
-            if not item in list2:
-                self.fail("%s is not in list2" % str(item))
-
-        for item in list2:
-            if not item in list1:
-                self.fail("%s is not in list1" % str(item))
-
-class _TextTestResult(unittest.TestResult):
-    """A test result class that can print formatted text results to a stream.
-
-    Used by TextTestRunner.
-    """
-    separator1 = '=' * 70
-    separator2 = '-' * 70
-
-    def __init__(self, stream, descriptions, verbosity):
-        unittest.TestResult.__init__(self)
-        self.stream = stream
-        self.showAll = verbosity > 1
-        self.dots = verbosity == 1
-        self.descriptions = descriptions
-        self.currentTestCase = None
-
-    def getDescription(self, test):
-        if self.descriptions:
-            return test.shortDescription() or str(test)
-        else:
-            return str(test)
-
-    def startTest(self, test):
-        unittest.TestResult.startTest(self, test)
-        if self.showAll:
-            if self.currentTestCase != test.__class__:
-                self.currentTestCase = test.__class__
-                self.stream.writeln()
-                self.stream.writeln("[%s]" % self.currentTestCase.__name__)
-            self.stream.write("  " + self.getDescription(test))
-            self.stream.write(" ... ")
-
-    def addSuccess(self, test):
-        unittest.TestResult.addSuccess(self, test)
-        if self.showAll:
-            self.stream.writeln("ok")
-        elif self.dots:
-            self.stream.write('.')
-
-    def addError(self, test, err):
-        unittest.TestResult.addError(self, test, err)
-        if self.showAll:
-            self.stream.writeln("ERROR")
-        elif self.dots:
-            self.stream.write('E')
-
-    def addFailure(self, test, err):
-        unittest.TestResult.addFailure(self, test, err)
-        if self.showAll:
-            self.stream.writeln("FAIL")
-        elif self.dots:
-            self.stream.write('F')
-
-    def printErrors(self):
-        if self.dots or self.showAll:
-            self.stream.writeln()
-        self.printErrorList('ERROR', self.errors)
-        self.printErrorList('FAIL', self.failures)
-
-    def printErrorList(self, flavour, errors):
-        for test, err in errors:
-            self.stream.writeln(self.separator1)
-            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
-            self.stream.writeln(self.separator2)
-            self.stream.writeln("%s" % err)
-
-
-class _WritelnDecorator:
-    """Used to decorate file-like objects with a handy 'writeln' method"""
-    def __init__(self,stream):
-        self.stream = stream
-
-    def __getattr__(self, attr):
-        return getattr(self.stream,attr)
-
-    def writeln(self, arg=None):
-        if arg: self.write(arg)
-        self.write('\n') # text-mode streams translate to \r\n if needed
-
-class TestRunner:
-    """A test runner class that displays results in textual form.
-
-    It prints out the names of tests as they are run, errors as they
-    occur, and a summary of the results at the end of the test run.
-    """
-    def __init__(self, stream=sys.stderr, descriptions=1, verbosity=2):
-        self.stream = _WritelnDecorator(stream)
-        self.descriptions = descriptions
-        self.verbosity = verbosity
-
-    def _makeResult(self):
-        return _TextTestResult(self.stream, self.descriptions, self.verbosity)
-
-    def run(self, test):
-        "Run the given test case or test suite."
-        result = self._makeResult()
-        startTime = time.time()
-        test(result)
-        stopTime = time.time()
-        timeTaken = stopTime - startTime
-        result.printErrors()
-        self.stream.writeln(result.separator2)
-        run = result.testsRun
-        self.stream.writeln("Ran %d test%s in %.3fs" %
-                            (run, run != 1 and "s" or "", timeTaken))
-        self.stream.writeln()
-        if not result.wasSuccessful():
-            self.stream.write("FAILED (")
-            failed, errored = map(len, (result.failures, result.errors))
-            if failed:
-                self.stream.write("failures=%d" % failed)
-            if errored:
-                if failed: self.stream.write(", ")
-                self.stream.write("errors=%d" % errored)
-            self.stream.writeln(")")
-        else:
-            self.stream.writeln("OK")
-        return result
-
-
diff --git a/timeside/tests/alltests.py b/timeside/tests/alltests.py
deleted file mode 100644 (file)
index 92f6abb..0000000
+++ /dev/null
@@ -1,8 +0,0 @@
-from timeside.tests.testcomponent import *
-from timeside.tests.testinputadapter import *
-from timeside.tests.testgraphers import *
-from timeside.tests.testdecoding import *
-from timeside.tests.testtranscoding import *
-from timeside.tests import TestRunner
-
-unittest.main(testRunner=TestRunner())
diff --git a/timeside/tests/api/__init__.py b/timeside/tests/api/__init__.py
deleted file mode 100644 (file)
index 40a96af..0000000
+++ /dev/null
@@ -1 +0,0 @@
-# -*- coding: utf-8 -*-
diff --git a/timeside/tests/api/examples.py b/timeside/tests/api/examples.py
deleted file mode 100644 (file)
index 747c197..0000000
+++ /dev/null
@@ -1,59 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.api import *
-import numpy
-
-
-class Gain(Processor):
-    implements(IEffect)
-
-    @interfacedoc
-    def __init__(self, gain=1.0):
-        self.gain = gain
-
-    @staticmethod
-    @interfacedoc
-    def id():
-        return "test_gain"
-
-    @staticmethod
-    @interfacedoc
-    def name():
-        return "Gain test effect"
-
-    def process(self, frames, eod=False):
-        return numpy.multiply(frames, self.gain), eod
-
-
-class FixedInputProcessor(Processor):
-    """Processor which does absolutely nothing except illustrating the use
-    of the FixedInputSizeAdapter. It also tests things a bit."""
-
-    implements(IProcessor)
-
-    BUFFER_SIZE = 1024
-
-    @staticmethod
-    @interfacedoc
-    def id():
-        return "test_fixed"
-
-    @interfacedoc
-    def setup(self, channels, samplerate, nframes):
-        super(FixedInputProcessor, self).setup(channels, samplerate, nframes)
-        self.adapter = FixedSizeInputAdapter(self.BUFFER_SIZE, channels, pad=True)
-
-    @interfacedoc
-    def process(self, frames, eod=False):
-        for buffer, end in self.adapter.process(frames, eod):
-            # Test that the adapter is actually doing the job:
-            if len(buffer) != self.BUFFER_SIZE:
-                raise Exception("Bad buffer size from adapter")
-
-        return frames, eod
-
-
-
-
-
diff --git a/timeside/tests/api/gstreamer.py b/timeside/tests/api/gstreamer.py
deleted file mode 100644 (file)
index 897ac42..0000000
+++ /dev/null
@@ -1,238 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2007-2009 Parisson
-# Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com>
-# Copyright (c) 2007-2009 Guillaume Pellerin <pellerin@parisson.com>
-#
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide.  If not, see <http://www.gnu.org/licenses/>.
-
-# Author: Paul Brossier <piem@piem.org>
-
-from timeside.core import Processor, implements, interfacedoc
-from timeside.api import IDecoder, IEncoder
-from numpy import array, frombuffer, getbuffer, float32
-
-import pygst
-pygst.require('0.10')
-import gst
-import gobject
-gobject.threads_init ()
-
-class FileDecoder(Processor):
-    """ gstreamer-based decoder """
-    implements(IDecoder)
-
-    # duration ms, before discovery process times out
-    MAX_DISCOVERY_TIME = 3000
-
-    audioformat = None
-    audiochannels = None
-    audiorate = None
-    audionframes = None
-    mimetype = ''
-
-    # IProcessor methods
-
-    @staticmethod
-    @interfacedoc
-    def id():
-        return "test_gstreamerdec"
-
-    def setup(self, channels = None, samplerate = None, nframes = None):
-        # the output data format we want
-        caps = "audio/x-raw-float, width=32"
-        pipeline = gst.parse_launch('''uridecodebin uri=%s
-            ! audioconvert
-            ! %s
-            ! appsink name=sink sync=False ''' % (self.uri, caps))
-        # store a pointer to appsink in our decoder object 
-        self.sink = pipeline.get_by_name('sink')
-        # adjust length of emitted buffers
-        # self.sink.set_property('blocksize', 0x10000)
-        # start pipeline
-        pipeline.set_state(gst.STATE_PLAYING)
-
-    @interfacedoc
-    def channels(self):
-        return  self.audiochannels
-
-    @interfacedoc
-    def samplerate(self):
-        return self.audiorate 
-
-    @interfacedoc
-    def nframes(self):
-        return self.audionframes
-
-    @interfacedoc
-    def process(self, frames = None, eod = False):
-        try:
-            buf = self.sink.emit('pull-buffer')
-        except SystemError, e:
-            # should never happen
-            print 'SystemError', e
-            return array([0.]), True
-        if buf == None:
-            return array([0.]), True
-        return self.gst_buffer_to_numpy_array(buf), False
-
-    @interfacedoc
-    def release(self):
-        # nothing to do for now 
-        pass
-
-    ## IDecoder methods
-
-    @interfacedoc
-    def __init__(self, uri):
-
-        # is this a file? 
-        import os.path
-        if os.path.exists(uri):
-            # get the absolute path
-            uri = os.path.abspath(uri)
-            # first run the file/uri through the discover pipeline
-            self.discover(uri)
-            # and make a uri of it
-            from urllib import quote
-            self.uri = 'file://'+quote(uri)
-
-    @interfacedoc
-    def format(self):
-        # TODO check
-        return self.mimetype
-
-    @interfacedoc
-    def encoding(self):
-        # TODO check
-        return self.mimetype.split('/')[-1]
-
-    @interfacedoc
-    def resolution(self):
-        # TODO check: width or depth?
-        return self.audiowidth 
-
-    @interfacedoc
-    def metadata(self):
-        # TODO check
-        return self.tags
-
-    ## gst.extend discoverer
-
-    def discover(self, path):
-        """ gstreamer based helper function to get file attributes """
-        from gst.extend import discoverer
-        d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME)
-        d.connect('discovered', self.discovered)
-        self.mainloop = gobject.MainLoop()
-        d.discover()
-        self.mainloop.run()
-
-    def discovered(self, d, is_media):
-        """ gstreamer based helper executed upon discover() completion """
-        if is_media and d.is_audio:
-            # copy the discoverer attributes to self
-            self.audiorate = d.audiorate
-            self.mimetype= d.mimetype
-            self.audiochannels = d.audiochannels
-            self.audiowidth = d.audiowidth
-            # conversion from time in nanoseconds to frames 
-            from math import ceil
-            duration = d.audiorate * d.audiolength * 1.e-9
-            self.audionframes = int (ceil ( duration ) )
-            self.tags = d.tags
-        elif not d.is_audio:
-            print "error, no audio found!"
-        else:
-            print "fail", path
-        self.mainloop.quit()
-
-    def gst_buffer_to_numpy_array(self, buf):
-        """ gstreamer buffer to numpy array conversion """
-        chan = self.audiochannels
-        samples = frombuffer(buf.data, dtype=float32) 
-        samples.resize([len(samples)/chan, chan])
-        return samples
-
-class WavEncoder(Processor):
-    """ gstreamer-based encoder """
-    implements(IEncoder)
-
-    def __init__(self, output):
-        self.file = None
-        if isinstance(output, basestring):
-            self.filename = output
-        else:
-            raise Exception("Streaming not supported")
-
-    @interfacedoc
-    def setup(self, channels=None, samplerate=None, nframes=None):
-        super(WavEncoder, self).setup(channels, samplerate, nframes)
-        # TODO open file for writing
-        # the output data format we want
-        pipeline = gst.parse_launch(''' appsrc name=src
-            ! audioconvert
-            ! wavenc
-            ! filesink location=%s ''' % self.filename)
-        # store a pointer to appsink in our encoder object 
-        self.src = pipeline.get_by_name('src')
-        srccaps = gst.Caps("""audio/x-raw-float,
-            endianness=(int)1234,
-            channels=(int)%s,
-            width=(int)32,
-            rate=(int)%d""" % (int(channels), int(samplerate)))
-        self.src.set_property("caps", srccaps)
-
-        # start pipeline
-        pipeline.set_state(gst.STATE_PLAYING)
-        self.pipeline = pipeline
-
-    @staticmethod
-    @interfacedoc
-    def id():
-        return "test_gstreamerenc"
-
-    @staticmethod
-    @interfacedoc
-    def description():
-        return "Gstreamer based encoder"
-
-    @staticmethod
-    @interfacedoc
-    def file_extension():
-        return "wav"
-
-    @staticmethod
-    @interfacedoc
-    def mime_type():
-        return "audio/x-wav"
-
-    @interfacedoc
-    def set_metadata(self, metadata):
-        #TODO
-        pass
-
-    @interfacedoc
-    def process(self, frames, eod=False):
-        buf = self.numpy_array_to_gst_buffer(frames)
-        self.src.emit('push-buffer', buf)
-        if eod: self.src.emit('end-of-stream')
-        return frames, eod
-
-    def numpy_array_to_gst_buffer(self, frames):
-        """ gstreamer buffer to numpy array conversion """
-        buf = gst.Buffer(getbuffer(frames))
-        return buf
diff --git a/timeside/tests/api/test_analyzer.py b/timeside/tests/api/test_analyzer.py
deleted file mode 100644 (file)
index 2b8198d..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import timeside
-from sys import stdout
-import os.path
-import numpy
-
-
-class TestAnalyzer:
-
-    graphers = timeside.core.processors(timeside.api.IGrapher)
-    decoders = timeside.core.processors(timeside.api.IDecoder)
-    encoders= timeside.core.processors(timeside.api.IEncoder)
-    analyzers = timeside.core.processors(timeside.api.IAnalyzer)
-
-    def __init__(self, path):
-        self.source = os.path.join(os.path.dirname(__file__), path)
-        print "Processing %s" % self.source
-        self.decoder  = timeside.decoder.FileDecoder(self.source)
-        print 'format: ', self.decoder.format()
-        self.pipe = self.decoder
-        self.analyzers_sub_pipe = []
-
-    def process(self):
-        for analyzer in self.analyzers:
-            sub_pipe = analyzer()
-            self.analyzers_sub_pipe.append(sub_pipe)
-            self.pipe = self.pipe | sub_pipe
-        self.pipe.run()
-
-    def results(self):
-        analyzers = []
-        for analyzer in self.analyzers_sub_pipe:
-            value = analyzer.result()
-            analyzers.append({'name':analyzer.name(),
-                            'id':analyzer.id(),
-                            'unit':analyzer.unit(),
-                            'value':str(value)})
-        print analyzers
-
-
-test = TestAnalyzer('../samples/guitar.wav')
-#test = TestAnalyzer('/home/momo/music/wav/Cellar/Cellar-FinallyMix_01.wav')
-test.process()
-test.results()
-
diff --git a/timeside/tests/api/test_analyzer3.py b/timeside/tests/api/test_analyzer3.py
deleted file mode 100644 (file)
index 8114723..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import timeside
-import sys
-import os.path
-import numpy
-import time
-
-
-class TestAnalyzer:
-
-    analyzer = timeside.analyzer.MaxLevel()
-
-    def __init__(self, path):
-        self.source = os.path.join(os.path.dirname(__file__), path)
-        print "Processing %s" % self.source
-        self.decoder  = timeside.decoder.FileDecoder(self.source)
-        print 'format: ', self.decoder.format()
-        self.pipe = self.decoder
-        self.sub_pipe = self.analyzer
-
-    def process(self):
-        self.pipe = self.pipe | self.sub_pipe
-        self.pipe.run()
-
-    def results(self):
-        print {'name':self.analyzer.name(),
-                            'id':self.analyzer.id(),
-                            'unit':self.analyzer.unit(),
-                            'value':str(self.analyzer.value)}
-
-test = TestAnalyzer(sys.argv[-1])
-test.process()
-test.results()
diff --git a/timeside/tests/api/test_flac.py b/timeside/tests/api/test_flac.py
deleted file mode 100644 (file)
index ee590c4..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from timeside.decoder import *
-from timeside.encoder import *
-import os.path
-
-source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
-dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.flac")
-
-decoder  = FileDecoder(source)
-encoder  = FlacEncoder(dest)
-
-(decoder | encoder).run()
diff --git a/timeside/tests/api/test_lolevel.py b/timeside/tests/api/test_lolevel.py
deleted file mode 100644 (file)
index 421ce95..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from timeside.tests.api.examples import Gain
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-
-import sys
-if len(sys.argv) > 1:
-    source = sys.argv[1]
-else:
-    import os.path
-    source= os.path.join (os.path.dirname(__file__),  "../samples/guitar.wav")
-
-Decoder = FileDecoder
-print "Creating decoder with id=%s for: %s" % (Decoder.id(), source)
-decoder = Decoder(source)
-analyzer = MaxLevel()
-decoder.setup()
-nchannels  = decoder.channels()
-samplerate = decoder.samplerate()
-nframes = decoder.nframes()
-analyzer.setup(nchannels, samplerate)
-
-print "Stats: duration=%f, nframes=%d, nchannels=%d, samplerate=%d, resolution=%d" % (
-        nframes / float(samplerate), nframes, nchannels, samplerate, decoder.resolution())
-
-while True:
-    frames, eod = decoder.process()
-    analyzer.process(frames, eod)
-    if eod:
-        break
-
-max_level = analyzer.result()
-print "Max level: %f" % max_level
-
-destination = "../results/guitar_normalized.wav"
-Encoder = WavEncoder
-print "Creating encoder with id=%s for: %s" % (Encoder.id(), destination)
-encoder = Encoder(destination)
-
-gain = 1
-if max_level > 0:
-    gain = 0.9 / max_level
-
-effect = Gain(gain)
-
-decoder.setup()
-effect.setup(decoder.channels(), decoder.samplerate())
-encoder.setup(effect.channels(), effect.samplerate())
-
-print "Applying effect id=%s with gain=%f" % (effect.id(), gain)
-
-while True:
-    frames, eod = decoder.process()
-    encoder.process(*effect.process(frames, eod))
-    if eod:
-        break
-
diff --git a/timeside/tests/api/test_lolevel_streaming.py b/timeside/tests/api/test_lolevel_streaming.py
deleted file mode 100644 (file)
index e394f0c..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-
-import sys
-if len(sys.argv) > 1:
-    source = sys.argv[1]
-else:
-    import os.path
-    source= os.path.join (os.path.dirname(__file__),  "../samples/sweep.flac")
-
-decoder = FileDecoder(source)
-print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
-decoder.setup()
-channels  = decoder.channels()
-print 'channels :', channels
-samplerate = decoder.samplerate()
-nframes = decoder.nframes()
-
-dest1 = "/tmp/test_filesink.mp3"
-dest2 = "/tmp/test_appsink.mp3"
-f = open(dest2,'w')
-
-streaming=True
-encoder = Mp3Encoder(dest1, streaming=True)
-encoder.setup(channels=channels, samplerate=samplerate)
-
-while True:
-    encoder.process(*decoder.process())
-    if streaming:
-        f.write(encoder.chunk)
-    if encoder.eod :
-        break
-
-f.close()
-print encoder.pipe
diff --git a/timeside/tests/api/test_lolevel_streaming_bad.py b/timeside/tests/api/test_lolevel_streaming_bad.py
deleted file mode 100644 (file)
index e414ead..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-
-import sys
-if len(sys.argv) > 1:
-    source = sys.argv[1]
-else:
-    import os.path
-    source= os.path.join (os.path.dirname(__file__),  "../samples/sweep.wav")
-
-decoder = FileDecoder(source)
-print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
-decoder.setup()
-channels  = decoder.channels()
-print 'channels :', channels
-samplerate = decoder.samplerate()
-nframes = decoder.nframes()
-
-dest1 = "/tmp/test_filesink.mp3"
-dest2 = "/tmp/test_appsink.mp3"
-f = open(dest2,'w')
-
-streaming=True
-encoder = Mp3Encoder(dest1, streaming=True)
-encoder.setup(channels=channels, samplerate=samplerate)
-
-print encoder.pipe
-
-while True:
-    encoder.process(*decoder.process())
-    if streaming:
-        f.write(encoder.chunk)
-    if encoder.eod :
-        break
-
-f.close()
-print encoder.pipe
diff --git a/timeside/tests/api/test_lolevel_streaming_vorbis.py b/timeside/tests/api/test_lolevel_streaming_vorbis.py
deleted file mode 100644 (file)
index ecc3d3f..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-
-import sys
-if len(sys.argv) > 1:
-    source = sys.argv[1]
-else:
-    import os.path
-    source= os.path.join (os.path.dirname(__file__),  "../samples/sweep.flac")
-
-decoder = FileDecoder(source)
-print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
-decoder.setup()
-channels  = decoder.channels()
-print 'channels :', channels
-samplerate = decoder.samplerate()
-nframes = decoder.nframes()
-
-dest1 = "/tmp/test_filesink.ogg"
-dest2 = "/tmp/test_appsink.ogg"
-f = open(dest2,'w')
-
-streaming=True
-encoder = VorbisEncoder(dest1, streaming=True)
-encoder.setup(channels=channels, samplerate=samplerate)
-
-print encoder.pipe
-
-while True:
-    encoder.process(*decoder.process())
-    if streaming:
-        f.write(encoder.chunk)
-    if encoder.eod :
-        break
-
-f.close()
-print encoder.pipe
diff --git a/timeside/tests/api/test_mp3.py b/timeside/tests/api/test_mp3.py
deleted file mode 100644 (file)
index 620696b..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from timeside.decoder import *
-from timeside.encoder import *
-import os.path
-
-source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
-dest = os.path.join(os.path.dirname(__file__), "/tmp/sweep_wav.mp3")
-
-decoder  = FileDecoder(source)
-encoder  = Mp3Encoder(dest)
-
-(decoder | encoder).run()
-
-metadata = {'TIT2': 'title',  #title2
-             'TCOM': 'composer',  #composer
-             'TPE1': 'lead creator', #lead
-             'UFID': 'identifier',  #Unique ID...
-             'TALB': 'album',  #album
-             'TCON': 'genre',  #genre
-             'TDRC': '2011', #year
-#             'COMM': 'blabla',  #comment
-             }
-
-encoder.set_metadata(metadata)
-encoder.write_metadata(dest)
-
diff --git a/timeside/tests/api/test_pipe.py b/timeside/tests/api/test_pipe.py
deleted file mode 100644 (file)
index 629b3e9..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from timeside.tests.api.examples import Gain
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-from sys import stdout
-import os.path
-import numpy
-
-source = os.path.join(os.path.dirname(__file__),  "../samples/guitar.wav")
-
-print "Normalizing %s" % source
-decoder  = FileDecoder(source)
-maxlevel = MaxLevel()
-duration = Duration()
-
-(decoder | maxlevel | duration).run()
-
-gain = 1
-if maxlevel.result() < 0:
-    gain = 0.9 / numpy.exp(maxlevel.result()/20)
-
-print "input maxlevel: %f" % maxlevel.result()
-print "gain: %f" % gain
-print "duration: %f %s" % (duration.result(), duration.unit())
-
-gain     = Gain(gain)
-encoder  = WavEncoder("../results/guitar_normalized.wav")
-
-subpipe  = gain | maxlevel
-
-(decoder | subpipe | encoder).run()
-
-print "output maxlevel: %f" % maxlevel.result()
-
-
diff --git a/timeside/tests/api/test_pipe_spectrogram.py b/timeside/tests/api/test_pipe_spectrogram.py
deleted file mode 100644 (file)
index 49eaa8c..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import os
-from timeside.core import *
-from timeside.api import *
-from timeside.decoder import *
-from timeside.grapher import *
-
-sample_dir = '../samples'
-img_dir = '../results/img'
-if not os.path.exists(img_dir):
-    os.mkdir(img_dir)
-
-test_dict = {'sweep.wav': 'spec_wav.png',
-            'sweep.flac': 'spec_flac.png',
-            'sweep.ogg': 'spec_ogg.png',
-            'sweep.mp3': 'spec_mp3.png',
-            }
-
-for source, image in test_dict.iteritems():
-    audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source)
-    image = img_dir + os.sep + image
-    print 'Test : decoder(%s) | waveform (%s)' % (source, image)
-    decoder  = FileDecoder(audio)
-    spectrogram = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-    (decoder | spectrogram).run()
-    print 'frames per pixel = ', spectrogram.graph.samples_per_pixel
-    print "render spectrogram to: %s" %  image
-    spectrogram.render(image)
-
-
-
diff --git a/timeside/tests/api/test_pipe_waveform.py b/timeside/tests/api/test_pipe_waveform.py
deleted file mode 100644 (file)
index bbfb54a..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import os
-from timeside.core import *
-from timeside.api import *
-from timeside.decoder import *
-from timeside.grapher import *
-
-sample_dir = '../samples'
-img_dir = '../results/img'
-if not os.path.exists(img_dir):
-    os.mkdir(img_dir)
-
-test_dict = {'sweep.wav': 'waveform_wav.png',
-            'sweep.flac': 'waveform_flac.png',
-            'sweep.ogg': 'waveform_ogg.png',
-            'sweep.mp3': 'waveform_mp3.png',
-            }
-
-for source, image in test_dict.iteritems():
-    audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source)
-    image = img_dir + os.sep + image
-    print 'Test : decoder(%s) | waveform (%s)' % (source, image)
-    decoder  = FileDecoder(audio)
-    waveform = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-    (decoder | waveform).run()
-    print 'frames per pixel = ', waveform.graph.samples_per_pixel
-    print "render waveform to: %s" %  image
-    waveform.render(image)
-
-
diff --git a/timeside/tests/api/test_vorbis.py b/timeside/tests/api/test_vorbis.py
deleted file mode 100644 (file)
index 53818cd..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from timeside.decoder import *
-from timeside.encoder import *
-import os.path
-
-source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
-dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.ogg")
-
-decoder  = FileDecoder(source)
-encoder  = VorbisEncoder(dest)
-
-(decoder | encoder).run()
diff --git a/timeside/tests/api/test_wav.py b/timeside/tests/api/test_wav.py
deleted file mode 100644 (file)
index 077a01e..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-
-import os.path
-source = os.path.join(os.path.dirname(__file__),  "../samples/sweep.wav")
-dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.wav")
-
-decoder  = FileDecoder(source)
-encoder  = WavEncoder(dest)
-
-(decoder | encoder).run()
-
diff --git a/timeside/tests/listprocessors.py b/timeside/tests/listprocessors.py
deleted file mode 100644 (file)
index 840c087..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-import timeside
-
-def list_processors(interface, prefix=""):
-    print prefix + interface.__name__
-    subinterfaces = interface.__subclasses__()
-    for i in subinterfaces:
-        list_processors(i, prefix + "  ")
-    processors = timeside.processors(interface, False)
-    for p in processors:
-        print prefix + "  %s [%s]" % (p.__name__, p.id())
-
-list_processors(timeside.api.IProcessor)        
diff --git a/timeside/tests/samples/guitar.wav b/timeside/tests/samples/guitar.wav
deleted file mode 100644 (file)
index b5a9e80..0000000
Binary files a/timeside/tests/samples/guitar.wav and /dev/null differ
diff --git a/timeside/tests/samples/sweep.flac b/timeside/tests/samples/sweep.flac
deleted file mode 100644 (file)
index decee06..0000000
Binary files a/timeside/tests/samples/sweep.flac and /dev/null differ
diff --git a/timeside/tests/samples/sweep.mp3 b/timeside/tests/samples/sweep.mp3
deleted file mode 100644 (file)
index dc75fe0..0000000
Binary files a/timeside/tests/samples/sweep.mp3 and /dev/null differ
diff --git a/timeside/tests/samples/sweep.ogg b/timeside/tests/samples/sweep.ogg
deleted file mode 100644 (file)
index e30bf99..0000000
Binary files a/timeside/tests/samples/sweep.ogg and /dev/null differ
diff --git a/timeside/tests/samples/sweep.wav b/timeside/tests/samples/sweep.wav
deleted file mode 100644 (file)
index fc28a32..0000000
Binary files a/timeside/tests/samples/sweep.wav and /dev/null differ
diff --git a/timeside/tests/samples/sweep_source.wav b/timeside/tests/samples/sweep_source.wav
deleted file mode 100644 (file)
index 5dcf9ba..0000000
Binary files a/timeside/tests/samples/sweep_source.wav and /dev/null differ
diff --git a/timeside/tests/testcomponent.py b/timeside/tests/testcomponent.py
deleted file mode 100644 (file)
index 6638c30..0000000
+++ /dev/null
@@ -1,166 +0,0 @@
-
-from timeside.component import *
-from timeside.tests import TestCase, TestRunner
-import unittest
-
-__all__ = ['TestComponentArchitecture']
-
-class TestComponentArchitecture(TestCase):
-    "Test the component and interface system"
-   
-    def testOneInterface(self):
-        "Test a component implementing one interface"
-        self.assertSameList(implementations(I1), [C1])
-
-    def testTwoInterfaces(self):        
-        "Test a component implementing two interfaces"
-        self.assertSameList(implementations(I2), [C2])
-        self.assertSameList(implementations(I3), [C2])
-
-    def testTwoImplementations(self):
-        "Test an interface implemented by two components"
-        self.assertSameList(implementations(I4), [C3, C4])
-
-    def testInterfaceInheritance(self):
-        "Test whether a component implements an interface's parent"
-        self.assertSameList(implementations(I5), [C5])
-
-    def testImplementationInheritance(self):
-        "Test that a component doesn't implement the interface implemented by its parent" 
-        self.assertSameList(implementations(I7), [C6])
-
-    def testImplementationRedundancy(self):
-        "Test implementation redundancy across inheritance" 
-        self.assertSameList(implementations(I8), [C8, C9])
-
-    def testAbstractImplementation(self):    
-        "Test abstract implementation"
-        self.assertSameList(implementations(I11), [])
-        self.assertSameList(implementations(I11, abstract=True), [C11])
-
-    def testInterfaceDoc(self):        
-        "Test @interfacedoc decorator"
-        self.assertEquals(C10.test.__doc__, "testdoc")
-
-    def testInterfaceDocStatic(self):        
-        "Test @interfacedoc decorator on static method"
-        self.assertEquals(C10.teststatic.__doc__, "teststaticdoc")
-
-    def testIntefaceDocReversed(self):
-        "Test @interfacedoc on static method (decorators reversed)"
-
-        try:
-
-            class BogusDoc1(Component):
-                implements(I10)
-
-                @interfacedoc
-                @staticmethod        
-                def teststatic(self):
-                    pass
-
-            self.fail("No error raised with reversed decorators")
-
-        except ComponentError:
-            pass
-   
-    def testInterfaceDocBadMethod(self):
-        "Test @interfacedoc with unexistant method in interface"
-
-        try:
-            class BogusDoc2(Component):
-                implements(I10)
-
-                @interfacedoc
-                def nosuchmethod(self):
-                    pass
-
-            self.fail("No error raised when decorating an unexistant method")
-
-        except ComponentError:
-            pass
-
-class I1(Interface):
-    pass
-
-class I2(Interface):
-    pass
-
-class I3(Interface):
-    pass
-
-class I4(Interface):
-    pass
-
-class I5(Interface):
-    pass
-
-class I6(I5):
-    pass
-
-class I7(Interface):
-    pass
-
-class I8(Interface):
-    pass
-
-class I9(I8):
-    pass
-
-class I10(Interface):
-    def test(self):
-        """testdoc"""
-
-    @staticmethod        
-    def teststatic(self):
-        """teststaticdoc"""
-
-class I11(Interface):
-    pass
-
-class C1(Component):
-    implements(I1)
-
-class C2(Component):
-    implements(I2, I3)
-
-class C3(Component):
-    implements(I4)
-
-class C4(Component):
-    implements(I4)
-
-class C5(Component):
-    implements(I6)
-
-class C6(Component):
-    implements(I7)
-
-class C7(C6):
-    pass
-
-class C8(Component):
-    implements(I8)
-
-class C9(Component):
-    implements(I8, I9)
-
-class C10(Component):
-    implements(I10)
-
-    @interfacedoc
-    def test(self):
-        pass
-
-    @staticmethod        
-    @interfacedoc
-    def teststatic(self):
-        pass
-
-class C11(Component):
-    abstract()
-    implements(I11)
-
-if __name__ == '__main__':
-    unittest.main(testRunner=TestRunner())
-
diff --git a/timeside/tests/testdecoding.py b/timeside/tests/testdecoding.py
deleted file mode 100644 (file)
index 5e0d255..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-from timeside.decoder import *
-from timeside.tests import TestCase, TestRunner
-import unittest
-
-import os.path
-
-__all__ = ['TestDecoding']
-
-class TestDecoding(TestCase):
-    "Test the low level streaming features"
-
-    def setUp(self):
-        pass
-   
-    def testWav(self):
-        "Test wav decoding"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
-
-    def testFlac(self):
-        "Test flac decoding"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
-
-    def testOgg(self):
-        "Test ogg decoding"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.ogg")
-
-    def testMp3(self):
-        "Test mp3 decoding"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.mp3")
-
-    def tearDown(self):
-        decoder = FileDecoder(self.source)
-        decoder.setup()
-
-        totalframes = 0.
-
-        while True:
-            frames, eod = decoder.process()
-            totalframes += frames.shape[0]
-            if eod: break
-
-        # FIXME compute actual number of frames from file
-        if os.path.splitext(self.source)[-1].lower() == '.mp3':
-            self.assertEquals(totalframes, 355969)
-        elif os.path.splitext(self.source)[-1].lower() == '.ogg':
-            self.assertEquals(totalframes, 352833)
-        else:
-            self.assertEquals(totalframes, 352801)
-
-if __name__ == '__main__':
-    unittest.main(testRunner=TestRunner())
-
-
diff --git a/timeside/tests/testgraphers.py b/timeside/tests/testgraphers.py
deleted file mode 100644 (file)
index bff370c..0000000
+++ /dev/null
@@ -1,105 +0,0 @@
-
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.grapher import *
-from timeside.api import *
-
-from timeside.component import *
-from timeside.tests import TestCase, TestRunner
-import unittest
-
-import os.path
-
-__all__ = ['TestGraphers']
-
-class TestGraphers(TestCase):
-    "Test all graphers with various input media formats"
-
-    def setUp(self):
-        pass
-    
-    # WAVEFORMS
-    def testWav2Waveform(self):
-        "Test WAV to Waveform"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
-        self.image = "/tmp/test_waveform_sweep_wav.png"
-        self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
-    def testFlac2Waveform(self):
-        "Test FLAC to Waveform"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
-        self.image = "/tmp/test_waveform_sweep_flac.png"
-        self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-    
-    def testMp32Waveform(self):
-        "Test MP3 to Waveform"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.mp3")
-        self.image = "/tmp/test_waveform_sweep_mp3.png"
-        self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-    
-    def testOgg2Waveform(self):
-        "Test OGG to Waveform"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.ogg")
-        self.image = "/tmp/test_waveform_sweep_ogg.png"
-        self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-    
-    # WAVEFORMS JOYDIV
-    def testWav2WaveformJoyDiv(self):
-        "Test WAV to WaveformJoyDiv"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
-        self.image = "/tmp/test_waveformjoydiv_sweep_wav.png"
-        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
-    def testFlac2WaveformJoyDiv(self):
-        "Test FLAC to WaveformJoyDiv"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
-        self.image = "/tmp/test_waveformjoydiv_sweep_flac.png"
-        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-    
-    def testMp32WaveformJoyDiv(self):
-        "Test MP3 to WaveformJoyDiv"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.mp3")
-        self.image = "/tmp/test_waveformjoydiv_sweep_mp3.png"
-        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-    
-    def testOgg2WaveformJoyDiv(self):
-        "Test OGG to WaveformJoyDiv"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.ogg")
-        self.image = "/tmp/test_waveformjoydiv_sweep_ogg.png"
-        self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-        
-    # SPECTROGRAMS
-    def testWav2Spectrogram(self):
-        "Test WAV to Spectrogram"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
-        self.image = "/tmp/test_spectrogram_sweep_wav.png"
-        self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-    
-    def testMp32Spectrogram(self):
-        "Test MP3 to Spectrogram"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.mp3")
-        self.image = "/tmp/test_spectrogram_sweep_mp3.png"
-        self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-    
-    def testFlac2Spectrogram(self):
-        "Test FLAC to Spectrogram"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
-        self.image = "/tmp/test_spectrogram_sweep_flac.png"
-        self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-    
-    def testOgg2Spectrogram(self):
-        "Test OGG to Spectrogram"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.ogg")
-        self.image = "/tmp/test_spectrogram_sweep_ogg.png"
-        self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-     
-    def tearDown(self):
-        decoder = FileDecoder(self.source)
-        (decoder | self.grapher).run()
-        self.grapher.render(self.image)
-        
-if __name__ == '__main__':
-    unittest.main(testRunner=TestRunner())
-
diff --git a/timeside/tests/testinputadapter.py b/timeside/tests/testinputadapter.py
deleted file mode 100644 (file)
index dca0f63..0000000
+++ /dev/null
@@ -1,71 +0,0 @@
-from timeside.core import FixedSizeInputAdapter
-from timeside.tests import TestCase, TestRunner
-import numpy
-import unittest
-
-class TestFixedSizeInputAdapter(TestCase):
-    "Test the fixed-sized input adapter"
-
-    def assertIOEquals(self, adapter, input, input_eod, output, output_eod=None):
-        output = output[:]
-        output.reverse()
-        _eod = None
-        for buffer, _eod in adapter.process(input, input_eod):
-            a = output.pop()
-            if not numpy.array_equiv(buffer, a):
-                self.fail("\n-- Actual --\n%s\n -- Expected -- \n%s\n" % (str(buffer), str(a)))
-
-        if _eod != output_eod:
-            self.fail("eod do not match: %s != %s", (str(_eod), str(output_eod)))
-
-        if output:
-            self.fail("trailing expected data: %s" % output)
-
-    def setUp(self):
-        self.data = numpy.arange(44).reshape(2,22).transpose()
-
-    def testTwoChannels(self):
-        "Test simple stream with two channels"
-        adapter = FixedSizeInputAdapter(4, 2)
-
-        self.assertEquals(len(self.data), adapter.nframes(len(self.data)))
-
-        self.assertIOEquals(adapter, self.data[0:1], False, [])
-        self.assertIOEquals(adapter, self.data[1:5], False, [self.data[0:4]], False)
-        self.assertIOEquals(adapter, self.data[5:12], False, [self.data[4:8], self.data[8:12]], False)
-        self.assertIOEquals(adapter, self.data[12:13], False, [])
-        self.assertIOEquals(adapter, self.data[13:14], False, [])
-        self.assertIOEquals(adapter, self.data[14:18], False, [self.data[12:16]], False)
-        self.assertIOEquals(adapter, self.data[18:20], False, [self.data[16:20]], False)
-        self.assertIOEquals(adapter, self.data[20:21], False, [])
-        self.assertIOEquals(adapter, self.data[21:22], True, [self.data[20:22]], True)
-
-    def testPadding(self):
-        "Test automatic padding support"
-        adapter = FixedSizeInputAdapter(4, 2, pad=True)
-
-        self.assertEquals(len(self.data) + 2, adapter.nframes(len(self.data)))
-
-        self.assertIOEquals(adapter, self.data[0:21], False, 
-            [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]], 
-            False)
-
-        self.assertIOEquals(adapter, self.data[21:22], True, [[
-            [20, 42],
-            [21, 43],
-            [0, 0],
-            [0, 0]
-        ]], True)
-
-    def testSizeMultiple(self):
-        "Test a stream which contain a multiple number of buffers"
-        adapter = FixedSizeInputAdapter(4, 2)
-
-        self.assertIOEquals(adapter, self.data[0:20], True, 
-            [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]], 
-            True)
-
-
-if __name__ == '__main__':
-    unittest.main(testRunner=TestRunner())
-
diff --git a/timeside/tests/testtranscoding.py b/timeside/tests/testtranscoding.py
deleted file mode 100644 (file)
index 7528a76..0000000
+++ /dev/null
@@ -1,125 +0,0 @@
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-
-from timeside.component import *
-from timeside.tests import TestCase, TestRunner
-import unittest
-
-import os.path
-
-__all__ = ['TestTranscoding']
-
-class TestTranscoding(TestCase):
-    "Test the low level streaming features"
-
-    def setUp(self):
-        pass
-   
-    def testWav2Mp3(self):
-        "Test wav to mp3 conversion"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
-
-        dest1 = "/tmp/test_wav_filesink.mp3"
-        dest2 = "/tmp/test_wav_appsink.mp3"
-        self.f = open(dest2,'w')
-
-        self.streaming=True
-
-        encoder = Mp3Encoder(dest1, streaming=True)
-        self.encoder = encoder
-
-    def testFlac2Mp3(self):
-        "Test flac to mp3 conversion"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
-
-        dest1 = "/tmp/test_flac_filesink.mp3"
-        dest2 = "/tmp/test_flac_appsink.mp3"
-        self.f = open(dest2,'w')
-
-        self.streaming=True
-
-        encoder = Mp3Encoder(dest1, streaming=True)
-        self.encoder = encoder
-
-    """
-    def testFlac2Ogg(self):
-        "Test flac to ogg conversion"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
-
-        dest1 = "/tmp/test_flac_filesink.ogg"
-        dest2 = "/tmp/test_flac_appsink.ogg"
-        self.f = open(dest2,'w')
-
-        self.streaming=True
-
-        encoder = VorbisEncoder(dest1, streaming=True)
-        self.encoder = encoder
-
-    def testWav2Ogg(self):
-        "Test wav to ogg conversion"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
-
-        dest1 = "/tmp/test_wav_filesink.ogg"
-        dest2 = "/tmp/test_wav_appsink.ogg"
-        self.f = open(dest2,'w')
-
-        self.streaming=True
-
-        encoder = VorbisEncoder(dest1, streaming=True)
-        self.encoder = encoder
-    
-    def testWav2Flac(self):
-        "Test wav to flac conversion"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
-
-        dest1 = "/tmp/test_wav_filesink.flac"
-        dest2 = "/tmp/test_wav_appsink.flac"
-        self.f = open(dest2,'w')
-
-        self.streaming=True
-
-        encoder = FlacEncoder(dest1, streaming=True)
-        self.encoder = encoder
-    """    
-
-    def setUpDecoder(self):
-        self.decoder = FileDecoder(self.source)
-        self.decoder.setup()
-        self.channels  = self.decoder.channels()
-        self.samplerate = self.decoder.samplerate()
-
-    def setUpEncoder(self): 
-        self.encoder.setup(channels = self.channels, samplerate = self.samplerate)
-
-    def tearDown(self):
-        self.setUpDecoder()
-        self.setUpEncoder()
-        encoder = self.encoder
-        f = self.f
-
-        #print "decoder pipe:\n", decoder.pipe
-        #print "encoder pipe:\n", encoder.pipe
-        totalframes = 0.
-
-        while True:
-            frames, eod = self.decoder.process()
-            #print frames.shape[0]
-            totalframes += frames.shape[0]
-            encoder.process(frames, eod)
-            if self.streaming:
-                f.write(encoder.chunk)
-            if eod:
-                break
-            if encoder.eod :
-                break
-        f.close()
-
-        # FIXME compute actual number of frames from file
-        self.assertEquals(totalframes, 352801)
-
-if __name__ == '__main__':
-    unittest.main(testRunner=TestRunner())
-