--- /dev/null
+from testcomponent import *
+from testinputadapter import *
+from testgraphers import *
+from testdecoding import *
+from testtranscoding import *
+from unit_timeside import *
+
+unittest.main(testRunner=TestRunner())
--- /dev/null
+# -*- coding: utf-8 -*-
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.api import *
+import numpy
+
+
+class Gain(Processor):
+ implements(IEffect)
+
+ @interfacedoc
+ def __init__(self, gain=1.0):
+ self.gain = gain
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "test_gain"
+
+ @staticmethod
+ @interfacedoc
+ def name():
+ return "Gain test effect"
+
+ def process(self, frames, eod=False):
+ return numpy.multiply(frames, self.gain), eod
+
+
+class FixedInputProcessor(Processor):
+ """Processor which does absolutely nothing except illustrating the use
+ of the FixedInputSizeAdapter. It also tests things a bit."""
+
+ implements(IProcessor)
+
+ BUFFER_SIZE = 1024
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "test_fixed"
+
+ @interfacedoc
+ def setup(self, channels, samplerate, nframes):
+ super(FixedInputProcessor, self).setup(channels, samplerate, nframes)
+ self.adapter = FixedSizeInputAdapter(self.BUFFER_SIZE, channels, pad=True)
+
+ @interfacedoc
+ def process(self, frames, eod=False):
+ for buffer, end in self.adapter.process(frames, eod):
+ # Test that the adapter is actually doing the job:
+ if len(buffer) != self.BUFFER_SIZE:
+ raise Exception("Bad buffer size from adapter")
+
+ return frames, eod
+
+
+
+
+
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2007-2009 Parisson
+# Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com>
+# Copyright (c) 2007-2009 Guillaume Pellerin <pellerin@parisson.com>
+#
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+
+# Author: Paul Brossier <piem@piem.org>
+
+from timeside.core import Processor, implements, interfacedoc
+from timeside.api import IDecoder, IEncoder
+from numpy import array, frombuffer, getbuffer, float32
+
+import pygst
+pygst.require('0.10')
+import gst
+import gobject
+gobject.threads_init ()
+
+class FileDecoder(Processor):
+ """ gstreamer-based decoder """
+ implements(IDecoder)
+
+ # duration ms, before discovery process times out
+ MAX_DISCOVERY_TIME = 3000
+
+ audioformat = None
+ audiochannels = None
+ audiorate = None
+ audionframes = None
+ mimetype = ''
+
+ # IProcessor methods
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "test_gstreamerdec"
+
+ def setup(self, channels = None, samplerate = None, nframes = None):
+ # the output data format we want
+ caps = "audio/x-raw-float, width=32"
+ pipeline = gst.parse_launch('''uridecodebin uri=%s
+ ! audioconvert
+ ! %s
+ ! appsink name=sink sync=False ''' % (self.uri, caps))
+ # store a pointer to appsink in our decoder object
+ self.sink = pipeline.get_by_name('sink')
+ # adjust length of emitted buffers
+ # self.sink.set_property('blocksize', 0x10000)
+ # start pipeline
+ pipeline.set_state(gst.STATE_PLAYING)
+
+ @interfacedoc
+ def channels(self):
+ return self.audiochannels
+
+ @interfacedoc
+ def samplerate(self):
+ return self.audiorate
+
+ @interfacedoc
+ def nframes(self):
+ return self.audionframes
+
+ @interfacedoc
+ def process(self, frames = None, eod = False):
+ try:
+ buf = self.sink.emit('pull-buffer')
+ except SystemError, e:
+ # should never happen
+ print 'SystemError', e
+ return array([0.]), True
+ if buf == None:
+ return array([0.]), True
+ return self.gst_buffer_to_numpy_array(buf), False
+
+ @interfacedoc
+ def release(self):
+ # nothing to do for now
+ pass
+
+ ## IDecoder methods
+
+ @interfacedoc
+ def __init__(self, uri):
+
+ # is this a file?
+ import os.path
+ if os.path.exists(uri):
+ # get the absolute path
+ uri = os.path.abspath(uri)
+ # first run the file/uri through the discover pipeline
+ self.discover(uri)
+ # and make a uri of it
+ from urllib import quote
+ self.uri = 'file://'+quote(uri)
+
+ @interfacedoc
+ def format(self):
+ # TODO check
+ return self.mimetype
+
+ @interfacedoc
+ def encoding(self):
+ # TODO check
+ return self.mimetype.split('/')[-1]
+
+ @interfacedoc
+ def resolution(self):
+ # TODO check: width or depth?
+ return self.audiowidth
+
+ @interfacedoc
+ def metadata(self):
+ # TODO check
+ return self.tags
+
+ ## gst.extend discoverer
+
+ def discover(self, path):
+ """ gstreamer based helper function to get file attributes """
+ from gst.extend import discoverer
+ d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME)
+ d.connect('discovered', self.discovered)
+ self.mainloop = gobject.MainLoop()
+ d.discover()
+ self.mainloop.run()
+
+ def discovered(self, d, is_media):
+ """ gstreamer based helper executed upon discover() completion """
+ if is_media and d.is_audio:
+ # copy the discoverer attributes to self
+ self.audiorate = d.audiorate
+ self.mimetype= d.mimetype
+ self.audiochannels = d.audiochannels
+ self.audiowidth = d.audiowidth
+ # conversion from time in nanoseconds to frames
+ from math import ceil
+ duration = d.audiorate * d.audiolength * 1.e-9
+ self.audionframes = int (ceil ( duration ) )
+ self.tags = d.tags
+ elif not d.is_audio:
+ print "error, no audio found!"
+ else:
+ print "fail", path
+ self.mainloop.quit()
+
+ def gst_buffer_to_numpy_array(self, buf):
+ """ gstreamer buffer to numpy array conversion """
+ chan = self.audiochannels
+ samples = frombuffer(buf.data, dtype=float32)
+ samples.resize([len(samples)/chan, chan])
+ return samples
+
+class WavEncoder(Processor):
+ """ gstreamer-based encoder """
+ implements(IEncoder)
+
+ def __init__(self, output):
+ self.file = None
+ if isinstance(output, basestring):
+ self.filename = output
+ else:
+ raise Exception("Streaming not supported")
+
+ @interfacedoc
+ def setup(self, channels=None, samplerate=None, nframes=None):
+ super(WavEncoder, self).setup(channels, samplerate, nframes)
+ # TODO open file for writing
+ # the output data format we want
+ pipeline = gst.parse_launch(''' appsrc name=src
+ ! audioconvert
+ ! wavenc
+ ! filesink location=%s ''' % self.filename)
+ # store a pointer to appsink in our encoder object
+ self.src = pipeline.get_by_name('src')
+ srccaps = gst.Caps("""audio/x-raw-float,
+ endianness=(int)1234,
+ channels=(int)%s,
+ width=(int)32,
+ rate=(int)%d""" % (int(channels), int(samplerate)))
+ self.src.set_property("caps", srccaps)
+
+ # start pipeline
+ pipeline.set_state(gst.STATE_PLAYING)
+ self.pipeline = pipeline
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "test_gstreamerenc"
+
+ @staticmethod
+ @interfacedoc
+ def description():
+ return "Gstreamer based encoder"
+
+ @staticmethod
+ @interfacedoc
+ def file_extension():
+ return "wav"
+
+ @staticmethod
+ @interfacedoc
+ def mime_type():
+ return "audio/x-wav"
+
+ @interfacedoc
+ def set_metadata(self, metadata):
+ #TODO
+ pass
+
+ @interfacedoc
+ def process(self, frames, eod=False):
+ buf = self.numpy_array_to_gst_buffer(frames)
+ self.src.emit('push-buffer', buf)
+ if eod: self.src.emit('end-of-stream')
+ return frames, eod
+
+ def numpy_array_to_gst_buffer(self, frames):
+ """ gstreamer buffer to numpy array conversion """
+ buf = gst.Buffer(getbuffer(frames))
+ return buf
--- /dev/null
+# -*- coding: utf-8 -*-
+
+import timeside
+from sys import stdout
+import os.path
+import numpy
+
+
+class TestAnalyzer:
+
+ graphers = timeside.core.processors(timeside.api.IGrapher)
+ decoders = timeside.core.processors(timeside.api.IDecoder)
+ encoders= timeside.core.processors(timeside.api.IEncoder)
+ analyzers = timeside.core.processors(timeside.api.IAnalyzer)
+
+ def __init__(self, path):
+ self.source = os.path.join(os.path.dirname(__file__), path)
+ print "Processing %s" % self.source
+ self.decoder = timeside.decoder.FileDecoder(self.source)
+ print 'format: ', self.decoder.format()
+ self.pipe = self.decoder
+ self.analyzers_sub_pipe = []
+
+ def process(self):
+ for analyzer in self.analyzers:
+ sub_pipe = analyzer()
+ self.analyzers_sub_pipe.append(sub_pipe)
+ self.pipe = self.pipe | sub_pipe
+ self.pipe.run()
+
+ def results(self):
+ analyzers = []
+ for analyzer in self.analyzers_sub_pipe:
+ value = analyzer.result()
+ analyzers.append({'name':analyzer.name(),
+ 'id':analyzer.id(),
+ 'unit':analyzer.unit(),
+ 'value':str(value)})
+ print analyzers
+
+
+test = TestAnalyzer('../samples/guitar.wav')
+#test = TestAnalyzer('/home/momo/music/wav/Cellar/Cellar-FinallyMix_01.wav')
+test.process()
+test.results()
+
--- /dev/null
+# -*- coding: utf-8 -*-
+
+import timeside
+import sys
+import os.path
+import numpy
+import time
+
+
+class TestAnalyzer:
+
+ analyzer = timeside.analyzer.MaxLevel()
+
+ def __init__(self, path):
+ self.source = os.path.join(os.path.dirname(__file__), path)
+ print "Processing %s" % self.source
+ self.decoder = timeside.decoder.FileDecoder(self.source)
+ print 'format: ', self.decoder.format()
+ self.pipe = self.decoder
+ self.sub_pipe = self.analyzer
+
+ def process(self):
+ self.pipe = self.pipe | self.sub_pipe
+ self.pipe.run()
+
+ def results(self):
+ print {'name':self.analyzer.name(),
+ 'id':self.analyzer.id(),
+ 'unit':self.analyzer.unit(),
+ 'value':str(self.analyzer.value)}
+
+test = TestAnalyzer(sys.argv[-1])
+test.process()
+test.results()
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.decoder import *
+from timeside.encoder import *
+import os.path
+
+source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
+dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.flac")
+
+decoder = FileDecoder(source)
+encoder = FlacEncoder(dest)
+
+(decoder | encoder).run()
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.tests.api.examples import Gain
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+
+import sys
+if len(sys.argv) > 1:
+ source = sys.argv[1]
+else:
+ import os.path
+ source= os.path.join (os.path.dirname(__file__), "../samples/guitar.wav")
+
+Decoder = FileDecoder
+print "Creating decoder with id=%s for: %s" % (Decoder.id(), source)
+decoder = Decoder(source)
+analyzer = MaxLevel()
+decoder.setup()
+nchannels = decoder.channels()
+samplerate = decoder.samplerate()
+nframes = decoder.nframes()
+analyzer.setup(nchannels, samplerate)
+
+print "Stats: duration=%f, nframes=%d, nchannels=%d, samplerate=%d, resolution=%d" % (
+ nframes / float(samplerate), nframes, nchannels, samplerate, decoder.resolution())
+
+while True:
+ frames, eod = decoder.process()
+ analyzer.process(frames, eod)
+ if eod:
+ break
+
+max_level = analyzer.result()
+print "Max level: %f" % max_level
+
+destination = "../results/guitar_normalized.wav"
+Encoder = WavEncoder
+print "Creating encoder with id=%s for: %s" % (Encoder.id(), destination)
+encoder = Encoder(destination)
+
+gain = 1
+if max_level > 0:
+ gain = 0.9 / max_level
+
+effect = Gain(gain)
+
+decoder.setup()
+effect.setup(decoder.channels(), decoder.samplerate())
+encoder.setup(effect.channels(), effect.samplerate())
+
+print "Applying effect id=%s with gain=%f" % (effect.id(), gain)
+
+while True:
+ frames, eod = decoder.process()
+ encoder.process(*effect.process(frames, eod))
+ if eod:
+ break
+
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+
+import sys
+if len(sys.argv) > 1:
+ source = sys.argv[1]
+else:
+ import os.path
+ source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac")
+
+decoder = FileDecoder(source)
+print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
+decoder.setup()
+channels = decoder.channels()
+print 'channels :', channels
+samplerate = decoder.samplerate()
+nframes = decoder.nframes()
+
+dest1 = "/tmp/test_filesink.mp3"
+dest2 = "/tmp/test_appsink.mp3"
+f = open(dest2,'w')
+
+streaming=True
+encoder = Mp3Encoder(dest1, streaming=True)
+encoder.setup(channels=channels, samplerate=samplerate)
+
+while True:
+ encoder.process(*decoder.process())
+ if streaming:
+ f.write(encoder.chunk)
+ if encoder.eod :
+ break
+
+f.close()
+print encoder.pipe
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+
+import sys
+if len(sys.argv) > 1:
+ source = sys.argv[1]
+else:
+ import os.path
+ source= os.path.join (os.path.dirname(__file__), "../samples/sweep.wav")
+
+decoder = FileDecoder(source)
+print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
+decoder.setup()
+channels = decoder.channels()
+print 'channels :', channels
+samplerate = decoder.samplerate()
+nframes = decoder.nframes()
+
+dest1 = "/tmp/test_filesink.mp3"
+dest2 = "/tmp/test_appsink.mp3"
+f = open(dest2,'w')
+
+streaming=True
+encoder = Mp3Encoder(dest1, streaming=True)
+encoder.setup(channels=channels, samplerate=samplerate)
+
+print encoder.pipe
+
+while True:
+ encoder.process(*decoder.process())
+ if streaming:
+ f.write(encoder.chunk)
+ if encoder.eod :
+ break
+
+f.close()
+print encoder.pipe
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+
+import sys
+if len(sys.argv) > 1:
+ source = sys.argv[1]
+else:
+ import os.path
+ source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac")
+
+decoder = FileDecoder(source)
+print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
+decoder.setup()
+channels = decoder.channels()
+print 'channels :', channels
+samplerate = decoder.samplerate()
+nframes = decoder.nframes()
+
+dest1 = "/tmp/test_filesink.ogg"
+dest2 = "/tmp/test_appsink.ogg"
+f = open(dest2,'w')
+
+streaming=True
+encoder = VorbisEncoder(dest1, streaming=True)
+encoder.setup(channels=channels, samplerate=samplerate)
+
+print encoder.pipe
+
+while True:
+ encoder.process(*decoder.process())
+ if streaming:
+ f.write(encoder.chunk)
+ if encoder.eod :
+ break
+
+f.close()
+print encoder.pipe
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.decoder import *
+from timeside.encoder import *
+import os.path
+
+source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
+dest = os.path.join(os.path.dirname(__file__), "/tmp/sweep_wav.mp3")
+
+decoder = FileDecoder(source)
+encoder = Mp3Encoder(dest)
+
+(decoder | encoder).run()
+
+metadata = {'TIT2': 'title', #title2
+ 'TCOM': 'composer', #composer
+ 'TPE1': 'lead creator', #lead
+ 'UFID': 'identifier', #Unique ID...
+ 'TALB': 'album', #album
+ 'TCON': 'genre', #genre
+ 'TDRC': '2011', #year
+# 'COMM': 'blabla', #comment
+ }
+
+encoder.set_metadata(metadata)
+encoder.write_metadata(dest)
+
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.tests.api.examples import Gain
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+from sys import stdout
+import os.path
+import numpy
+
+source = os.path.join(os.path.dirname(__file__), "../samples/guitar.wav")
+
+print "Normalizing %s" % source
+decoder = FileDecoder(source)
+maxlevel = MaxLevel()
+duration = Duration()
+
+(decoder | maxlevel | duration).run()
+
+gain = 1
+if maxlevel.result() < 0:
+ gain = 0.9 / numpy.exp(maxlevel.result()/20)
+
+print "input maxlevel: %f" % maxlevel.result()
+print "gain: %f" % gain
+print "duration: %f %s" % (duration.result(), duration.unit())
+
+gain = Gain(gain)
+encoder = WavEncoder("../results/guitar_normalized.wav")
+
+subpipe = gain | maxlevel
+
+(decoder | subpipe | encoder).run()
+
+print "output maxlevel: %f" % maxlevel.result()
+
+
--- /dev/null
+# -*- coding: utf-8 -*-
+
+import os
+from timeside.core import *
+from timeside.api import *
+from timeside.decoder import *
+from timeside.grapher import *
+
+sample_dir = '../samples'
+img_dir = '../results/img'
+if not os.path.exists(img_dir):
+ os.mkdir(img_dir)
+
+test_dict = {'sweep.wav': 'spec_wav.png',
+ 'sweep.flac': 'spec_flac.png',
+ 'sweep.ogg': 'spec_ogg.png',
+ 'sweep.mp3': 'spec_mp3.png',
+ }
+
+for source, image in test_dict.iteritems():
+ audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source)
+ image = img_dir + os.sep + image
+ print 'Test : decoder(%s) | waveform (%s)' % (source, image)
+ decoder = FileDecoder(audio)
+ spectrogram = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+ (decoder | spectrogram).run()
+ print 'frames per pixel = ', spectrogram.graph.samples_per_pixel
+ print "render spectrogram to: %s" % image
+ spectrogram.render(image)
+
+
+
--- /dev/null
+# -*- coding: utf-8 -*-
+
+import os
+from timeside.core import *
+from timeside.api import *
+from timeside.decoder import *
+from timeside.grapher import *
+
+sample_dir = '../samples'
+img_dir = '../results/img'
+if not os.path.exists(img_dir):
+ os.mkdir(img_dir)
+
+test_dict = {'sweep.wav': 'waveform_wav.png',
+ 'sweep.flac': 'waveform_flac.png',
+ 'sweep.ogg': 'waveform_ogg.png',
+ 'sweep.mp3': 'waveform_mp3.png',
+ }
+
+for source, image in test_dict.iteritems():
+ audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source)
+ image = img_dir + os.sep + image
+ print 'Test : decoder(%s) | waveform (%s)' % (source, image)
+ decoder = FileDecoder(audio)
+ waveform = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+ (decoder | waveform).run()
+ print 'frames per pixel = ', waveform.graph.samples_per_pixel
+ print "render waveform to: %s" % image
+ waveform.render(image)
+
+
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.decoder import *
+from timeside.encoder import *
+import os.path
+
+source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
+dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.ogg")
+
+decoder = FileDecoder(source)
+encoder = VorbisEncoder(dest)
+
+(decoder | encoder).run()
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+
+import os.path
+source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
+dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.wav")
+
+decoder = FileDecoder(source)
+encoder = WavEncoder(dest)
+
+(decoder | encoder).run()
+
--- /dev/null
+import timeside
+
+def list_processors(interface, prefix=""):
+ print prefix + interface.__name__
+ subinterfaces = interface.__subclasses__()
+ for i in subinterfaces:
+ list_processors(i, prefix + " ")
+ processors = timeside.processors(interface, False)
+ for p in processors:
+ print prefix + " %s [%s]" % (p.__name__, p.id())
+
+list_processors(timeside.api.IProcessor)
--- /dev/null
+
+from timeside.component import *
+from unit_timeside import TestCase, TestRunner
+
+__all__ = ['TestComponentArchitecture']
+
+class TestComponentArchitecture(TestCase):
+ "Test the component and interface system"
+
+ def testOneInterface(self):
+ "Test a component implementing one interface"
+ self.assertSameList(implementations(I1), [C1])
+
+ def testTwoInterfaces(self):
+ "Test a component implementing two interfaces"
+ self.assertSameList(implementations(I2), [C2])
+ self.assertSameList(implementations(I3), [C2])
+
+ def testTwoImplementations(self):
+ "Test an interface implemented by two components"
+ self.assertSameList(implementations(I4), [C3, C4])
+
+ def testInterfaceInheritance(self):
+ "Test whether a component implements an interface's parent"
+ self.assertSameList(implementations(I5), [C5])
+
+ def testImplementationInheritance(self):
+ "Test that a component doesn't implement the interface implemented by its parent"
+ self.assertSameList(implementations(I7), [C6])
+
+ def testImplementationRedundancy(self):
+ "Test implementation redundancy across inheritance"
+ self.assertSameList(implementations(I8), [C8, C9])
+
+ def testAbstractImplementation(self):
+ "Test abstract implementation"
+ self.assertSameList(implementations(I11), [])
+ self.assertSameList(implementations(I11, abstract=True), [C11])
+
+ def testInterfaceDoc(self):
+ "Test @interfacedoc decorator"
+ self.assertEquals(C10.test.__doc__, "testdoc")
+
+ def testInterfaceDocStatic(self):
+ "Test @interfacedoc decorator on static method"
+ self.assertEquals(C10.teststatic.__doc__, "teststaticdoc")
+
+ def testIntefaceDocReversed(self):
+ "Test @interfacedoc on static method (decorators reversed)"
+
+ try:
+
+ class BogusDoc1(Component):
+ implements(I10)
+
+ @interfacedoc
+ @staticmethod
+ def teststatic(self):
+ pass
+
+ self.fail("No error raised with reversed decorators")
+
+ except ComponentError:
+ pass
+
+ def testInterfaceDocBadMethod(self):
+ "Test @interfacedoc with unexistant method in interface"
+
+ try:
+ class BogusDoc2(Component):
+ implements(I10)
+
+ @interfacedoc
+ def nosuchmethod(self):
+ pass
+
+ self.fail("No error raised when decorating an unexistant method")
+
+ except ComponentError:
+ pass
+
+class I1(Interface):
+ pass
+
+class I2(Interface):
+ pass
+
+class I3(Interface):
+ pass
+
+class I4(Interface):
+ pass
+
+class I5(Interface):
+ pass
+
+class I6(I5):
+ pass
+
+class I7(Interface):
+ pass
+
+class I8(Interface):
+ pass
+
+class I9(I8):
+ pass
+
+class I10(Interface):
+ def test(self):
+ """testdoc"""
+
+ @staticmethod
+ def teststatic(self):
+ """teststaticdoc"""
+
+class I11(Interface):
+ pass
+
+class C1(Component):
+ implements(I1)
+
+class C2(Component):
+ implements(I2, I3)
+
+class C3(Component):
+ implements(I4)
+
+class C4(Component):
+ implements(I4)
+
+class C5(Component):
+ implements(I6)
+
+class C6(Component):
+ implements(I7)
+
+class C7(C6):
+ pass
+
+class C8(Component):
+ implements(I8)
+
+class C9(Component):
+ implements(I8, I9)
+
+class C10(Component):
+ implements(I10)
+
+ @interfacedoc
+ def test(self):
+ pass
+
+ @staticmethod
+ @interfacedoc
+ def teststatic(self):
+ pass
+
+class C11(Component):
+ abstract()
+ implements(I11)
+
+if __name__ == '__main__':
+ unittest.main(testRunner=TestRunner())
+
--- /dev/null
+from timeside.decoder import *
+from unit_timeside import TestCase, TestRunner
+
+import os.path
+
+__all__ = ['TestDecoding']
+
+class TestDecoding(TestCase):
+ "Test the low level streaming features"
+
+ def setUp(self):
+ pass
+
+ def testWav(self):
+ "Test wav decoding"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
+
+ def testFlac(self):
+ "Test flac decoding"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
+
+ def testOgg(self):
+ "Test ogg decoding"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg")
+
+ def testMp3(self):
+ "Test mp3 decoding"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3")
+
+ def tearDown(self):
+ decoder = FileDecoder(self.source)
+ decoder.setup()
+
+ totalframes = 0.
+
+ while True:
+ frames, eod = decoder.process()
+ totalframes += frames.shape[0]
+ if eod: break
+
+ # FIXME compute actual number of frames from file
+ if os.path.splitext(self.source)[-1].lower() == '.mp3':
+ self.assertEquals(totalframes, 355969)
+ elif os.path.splitext(self.source)[-1].lower() == '.ogg':
+ self.assertEquals(totalframes, 352833)
+ else:
+ self.assertEquals(totalframes, 352801)
+
+if __name__ == '__main__':
+ unittest.main(testRunner=TestRunner())
+
+
--- /dev/null
+
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.grapher import *
+from timeside.api import *
+
+from timeside.component import *
+
+from unit_timeside import TestCase, TestRunner
+
+import os.path
+
+__all__ = ['TestGraphers']
+
+class TestGraphers(TestCase):
+ "Test all graphers with various input media formats"
+
+ def setUp(self):
+ pass
+
+ # WAVEFORMS
+ def testWav2Waveform(self):
+ "Test WAV to Waveform"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
+ self.image = "/tmp/test_waveform_sweep_wav.png"
+ self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ def testFlac2Waveform(self):
+ "Test FLAC to Waveform"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
+ self.image = "/tmp/test_waveform_sweep_flac.png"
+ self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ def testMp32Waveform(self):
+ "Test MP3 to Waveform"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3")
+ self.image = "/tmp/test_waveform_sweep_mp3.png"
+ self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ def testOgg2Waveform(self):
+ "Test OGG to Waveform"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg")
+ self.image = "/tmp/test_waveform_sweep_ogg.png"
+ self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ # WAVEFORMS JOYDIV
+ def testWav2WaveformJoyDiv(self):
+ "Test WAV to WaveformJoyDiv"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
+ self.image = "/tmp/test_waveformjoydiv_sweep_wav.png"
+ self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ def testFlac2WaveformJoyDiv(self):
+ "Test FLAC to WaveformJoyDiv"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
+ self.image = "/tmp/test_waveformjoydiv_sweep_flac.png"
+ self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ def testMp32WaveformJoyDiv(self):
+ "Test MP3 to WaveformJoyDiv"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3")
+ self.image = "/tmp/test_waveformjoydiv_sweep_mp3.png"
+ self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ def testOgg2WaveformJoyDiv(self):
+ "Test OGG to WaveformJoyDiv"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg")
+ self.image = "/tmp/test_waveformjoydiv_sweep_ogg.png"
+ self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ # SPECTROGRAMS
+ def testWav2Spectrogram(self):
+ "Test WAV to Spectrogram"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
+ self.image = "/tmp/test_spectrogram_sweep_wav.png"
+ self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ def testMp32Spectrogram(self):
+ "Test MP3 to Spectrogram"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3")
+ self.image = "/tmp/test_spectrogram_sweep_mp3.png"
+ self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ def testFlac2Spectrogram(self):
+ "Test FLAC to Spectrogram"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
+ self.image = "/tmp/test_spectrogram_sweep_flac.png"
+ self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ def testOgg2Spectrogram(self):
+ "Test OGG to Spectrogram"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg")
+ self.image = "/tmp/test_spectrogram_sweep_ogg.png"
+ self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+
+ def tearDown(self):
+ decoder = FileDecoder(self.source)
+ (decoder | self.grapher).run()
+ self.grapher.render(self.image)
+
+if __name__ == '__main__':
+ unittest.main(testRunner=TestRunner())
+
--- /dev/null
+from timeside.core import FixedSizeInputAdapter
+from unit_timeside import TestCase, TestRunner
+import numpy
+
+class TestFixedSizeInputAdapter(TestCase):
+ "Test the fixed-sized input adapter"
+
+ def assertIOEquals(self, adapter, input, input_eod, output, output_eod=None):
+ output = output[:]
+ output.reverse()
+ _eod = None
+ for buffer, _eod in adapter.process(input, input_eod):
+ a = output.pop()
+ if not numpy.array_equiv(buffer, a):
+ self.fail("\n-- Actual --\n%s\n -- Expected -- \n%s\n" % (str(buffer), str(a)))
+
+ if _eod != output_eod:
+ self.fail("eod do not match: %s != %s", (str(_eod), str(output_eod)))
+
+ if output:
+ self.fail("trailing expected data: %s" % output)
+
+ def setUp(self):
+ self.data = numpy.arange(44).reshape(2,22).transpose()
+
+ def testTwoChannels(self):
+ "Test simple stream with two channels"
+ adapter = FixedSizeInputAdapter(4, 2)
+
+ self.assertEquals(len(self.data), adapter.nframes(len(self.data)))
+
+ self.assertIOEquals(adapter, self.data[0:1], False, [])
+ self.assertIOEquals(adapter, self.data[1:5], False, [self.data[0:4]], False)
+ self.assertIOEquals(adapter, self.data[5:12], False, [self.data[4:8], self.data[8:12]], False)
+ self.assertIOEquals(adapter, self.data[12:13], False, [])
+ self.assertIOEquals(adapter, self.data[13:14], False, [])
+ self.assertIOEquals(adapter, self.data[14:18], False, [self.data[12:16]], False)
+ self.assertIOEquals(adapter, self.data[18:20], False, [self.data[16:20]], False)
+ self.assertIOEquals(adapter, self.data[20:21], False, [])
+ self.assertIOEquals(adapter, self.data[21:22], True, [self.data[20:22]], True)
+
+ def testPadding(self):
+ "Test automatic padding support"
+ adapter = FixedSizeInputAdapter(4, 2, pad=True)
+
+ self.assertEquals(len(self.data) + 2, adapter.nframes(len(self.data)))
+
+ self.assertIOEquals(adapter, self.data[0:21], False,
+ [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]],
+ False)
+
+ self.assertIOEquals(adapter, self.data[21:22], True, [[
+ [20, 42],
+ [21, 43],
+ [0, 0],
+ [0, 0]
+ ]], True)
+
+ def testSizeMultiple(self):
+ "Test a stream which contain a multiple number of buffers"
+ adapter = FixedSizeInputAdapter(4, 2)
+
+ self.assertIOEquals(adapter, self.data[0:20], True,
+ [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]],
+ True)
+
+
+if __name__ == '__main__':
+ unittest.main(testRunner=TestRunner())
+
--- /dev/null
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.api import *
+
+from timeside.component import *
+
+from unit_timeside import TestCase, TestRunner
+
+import os.path
+
+__all__ = ['TestTranscoding']
+
+class TestTranscoding(TestCase):
+ "Test the low level streaming features"
+
+ def setUp(self):
+ pass
+
+ def testWav2Mp3(self):
+ "Test wav to mp3 conversion"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
+
+ dest1 = "/tmp/test_wav_filesink.mp3"
+ dest2 = "/tmp/test_wav_appsink.mp3"
+ self.f = open(dest2,'w')
+
+ self.streaming=True
+
+ encoder = Mp3Encoder(dest1, streaming=True)
+ self.encoder = encoder
+
+ def testFlac2Mp3(self):
+ "Test flac to mp3 conversion"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
+
+ dest1 = "/tmp/test_flac_filesink.mp3"
+ dest2 = "/tmp/test_flac_appsink.mp3"
+ self.f = open(dest2,'w')
+
+ self.streaming=True
+
+ encoder = Mp3Encoder(dest1, streaming=True)
+ self.encoder = encoder
+
+ """
+ def testFlac2Ogg(self):
+ "Test flac to ogg conversion"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
+
+ dest1 = "/tmp/test_flac_filesink.ogg"
+ dest2 = "/tmp/test_flac_appsink.ogg"
+ self.f = open(dest2,'w')
+
+ self.streaming=True
+
+ encoder = VorbisEncoder(dest1, streaming=True)
+ self.encoder = encoder
+
+ def testWav2Ogg(self):
+ "Test wav to ogg conversion"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
+
+ dest1 = "/tmp/test_wav_filesink.ogg"
+ dest2 = "/tmp/test_wav_appsink.ogg"
+ self.f = open(dest2,'w')
+
+ self.streaming=True
+
+ encoder = VorbisEncoder(dest1, streaming=True)
+ self.encoder = encoder
+
+ def testWav2Flac(self):
+ "Test wav to flac conversion"
+ self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
+
+ dest1 = "/tmp/test_wav_filesink.flac"
+ dest2 = "/tmp/test_wav_appsink.flac"
+ self.f = open(dest2,'w')
+
+ self.streaming=True
+
+ encoder = FlacEncoder(dest1, streaming=True)
+ self.encoder = encoder
+ """
+
+ def setUpDecoder(self):
+ self.decoder = FileDecoder(self.source)
+ self.decoder.setup()
+ self.channels = self.decoder.channels()
+ self.samplerate = self.decoder.samplerate()
+
+ def setUpEncoder(self):
+ self.encoder.setup(channels = self.channels, samplerate = self.samplerate)
+
+ def tearDown(self):
+ self.setUpDecoder()
+ self.setUpEncoder()
+ encoder = self.encoder
+ f = self.f
+
+ #print "decoder pipe:\n", decoder.pipe
+ #print "encoder pipe:\n", encoder.pipe
+ totalframes = 0.
+
+ while True:
+ frames, eod = self.decoder.process()
+ #print frames.shape[0]
+ totalframes += frames.shape[0]
+ encoder.process(frames, eod)
+ if self.streaming:
+ f.write(encoder.chunk)
+ if eod:
+ break
+ if encoder.eod :
+ break
+ f.close()
+
+ # FIXME compute actual number of frames from file
+ self.assertEquals(totalframes, 352801)
+
+if __name__ == '__main__':
+ unittest.main(testRunner=TestRunner())
+
--- /dev/null
+# -*- coding: utf-8 -*-
+
+import unittest
+import sys
+import time
+
+class TestCase(unittest.TestCase):
+
+ def assertSameList(self, list1, list2):
+ "Test that two lists contain the same elements, in any order"
+ if len(list1) != len(list2):
+ self.fail("Lists length differ : %d != %d" % (len(list1), len(list2)))
+
+ for item in list1:
+ if not item in list2:
+ self.fail("%s is not in list2" % str(item))
+
+ for item in list2:
+ if not item in list1:
+ self.fail("%s is not in list1" % str(item))
+
+class _TextTestResult(unittest.TestResult):
+ """A test result class that can print formatted text results to a stream.
+
+ Used by TextTestRunner.
+ """
+ separator1 = '=' * 70
+ separator2 = '-' * 70
+
+ def __init__(self, stream, descriptions, verbosity):
+ unittest.TestResult.__init__(self)
+ self.stream = stream
+ self.showAll = verbosity > 1
+ self.dots = verbosity == 1
+ self.descriptions = descriptions
+ self.currentTestCase = None
+
+ def getDescription(self, test):
+ if self.descriptions:
+ return test.shortDescription() or str(test)
+ else:
+ return str(test)
+
+ def startTest(self, test):
+ unittest.TestResult.startTest(self, test)
+ if self.showAll:
+ if self.currentTestCase != test.__class__:
+ self.currentTestCase = test.__class__
+ self.stream.writeln()
+ self.stream.writeln("[%s]" % self.currentTestCase.__name__)
+ self.stream.write(" " + self.getDescription(test))
+ self.stream.write(" ... ")
+
+ def addSuccess(self, test):
+ unittest.TestResult.addSuccess(self, test)
+ if self.showAll:
+ self.stream.writeln("ok")
+ elif self.dots:
+ self.stream.write('.')
+
+ def addError(self, test, err):
+ unittest.TestResult.addError(self, test, err)
+ if self.showAll:
+ self.stream.writeln("ERROR")
+ elif self.dots:
+ self.stream.write('E')
+
+ def addFailure(self, test, err):
+ unittest.TestResult.addFailure(self, test, err)
+ if self.showAll:
+ self.stream.writeln("FAIL")
+ elif self.dots:
+ self.stream.write('F')
+
+ def printErrors(self):
+ if self.dots or self.showAll:
+ self.stream.writeln()
+ self.printErrorList('ERROR', self.errors)
+ self.printErrorList('FAIL', self.failures)
+
+ def printErrorList(self, flavour, errors):
+ for test, err in errors:
+ self.stream.writeln(self.separator1)
+ self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
+ self.stream.writeln(self.separator2)
+ self.stream.writeln("%s" % err)
+
+
+class _WritelnDecorator:
+ """Used to decorate file-like objects with a handy 'writeln' method"""
+ def __init__(self,stream):
+ self.stream = stream
+
+ def __getattr__(self, attr):
+ return getattr(self.stream,attr)
+
+ def writeln(self, arg=None):
+ if arg: self.write(arg)
+ self.write('\n') # text-mode streams translate to \r\n if needed
+
+class TestRunner:
+ """A test runner class that displays results in textual form.
+
+ It prints out the names of tests as they are run, errors as they
+ occur, and a summary of the results at the end of the test run.
+ """
+ def __init__(self, stream=sys.stderr, descriptions=1, verbosity=2):
+ self.stream = _WritelnDecorator(stream)
+ self.descriptions = descriptions
+ self.verbosity = verbosity
+
+ def _makeResult(self):
+ return _TextTestResult(self.stream, self.descriptions, self.verbosity)
+
+ def run(self, test):
+ "Run the given test case or test suite."
+ result = self._makeResult()
+ startTime = time.time()
+ test(result)
+ stopTime = time.time()
+ timeTaken = stopTime - startTime
+ result.printErrors()
+ self.stream.writeln(result.separator2)
+ run = result.testsRun
+ self.stream.writeln("Ran %d test%s in %.3fs" %
+ (run, run != 1 and "s" or "", timeTaken))
+ self.stream.writeln()
+ if not result.wasSuccessful():
+ self.stream.write("FAILED (")
+ failed, errored = map(len, (result.failures, result.errors))
+ if failed:
+ self.stream.write("failures=%d" % failed)
+ if errored:
+ if failed: self.stream.write(", ")
+ self.stream.write("errors=%d" % errored)
+ self.stream.writeln(")")
+ else:
+ self.stream.writeln("OK")
+ return result
+
+
import timeside.encoder
import timeside.grapher
import timeside.analyzer
-import timeside.tests
from timeside.core import *
__version__ = '0.3.2'
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-import unittest
-import sys
-import time
-
-class TestCase(unittest.TestCase):
-
- def assertSameList(self, list1, list2):
- "Test that two lists contain the same elements, in any order"
- if len(list1) != len(list2):
- self.fail("Lists length differ : %d != %d" % (len(list1), len(list2)))
-
- for item in list1:
- if not item in list2:
- self.fail("%s is not in list2" % str(item))
-
- for item in list2:
- if not item in list1:
- self.fail("%s is not in list1" % str(item))
-
-class _TextTestResult(unittest.TestResult):
- """A test result class that can print formatted text results to a stream.
-
- Used by TextTestRunner.
- """
- separator1 = '=' * 70
- separator2 = '-' * 70
-
- def __init__(self, stream, descriptions, verbosity):
- unittest.TestResult.__init__(self)
- self.stream = stream
- self.showAll = verbosity > 1
- self.dots = verbosity == 1
- self.descriptions = descriptions
- self.currentTestCase = None
-
- def getDescription(self, test):
- if self.descriptions:
- return test.shortDescription() or str(test)
- else:
- return str(test)
-
- def startTest(self, test):
- unittest.TestResult.startTest(self, test)
- if self.showAll:
- if self.currentTestCase != test.__class__:
- self.currentTestCase = test.__class__
- self.stream.writeln()
- self.stream.writeln("[%s]" % self.currentTestCase.__name__)
- self.stream.write(" " + self.getDescription(test))
- self.stream.write(" ... ")
-
- def addSuccess(self, test):
- unittest.TestResult.addSuccess(self, test)
- if self.showAll:
- self.stream.writeln("ok")
- elif self.dots:
- self.stream.write('.')
-
- def addError(self, test, err):
- unittest.TestResult.addError(self, test, err)
- if self.showAll:
- self.stream.writeln("ERROR")
- elif self.dots:
- self.stream.write('E')
-
- def addFailure(self, test, err):
- unittest.TestResult.addFailure(self, test, err)
- if self.showAll:
- self.stream.writeln("FAIL")
- elif self.dots:
- self.stream.write('F')
-
- def printErrors(self):
- if self.dots or self.showAll:
- self.stream.writeln()
- self.printErrorList('ERROR', self.errors)
- self.printErrorList('FAIL', self.failures)
-
- def printErrorList(self, flavour, errors):
- for test, err in errors:
- self.stream.writeln(self.separator1)
- self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
- self.stream.writeln(self.separator2)
- self.stream.writeln("%s" % err)
-
-
-class _WritelnDecorator:
- """Used to decorate file-like objects with a handy 'writeln' method"""
- def __init__(self,stream):
- self.stream = stream
-
- def __getattr__(self, attr):
- return getattr(self.stream,attr)
-
- def writeln(self, arg=None):
- if arg: self.write(arg)
- self.write('\n') # text-mode streams translate to \r\n if needed
-
-class TestRunner:
- """A test runner class that displays results in textual form.
-
- It prints out the names of tests as they are run, errors as they
- occur, and a summary of the results at the end of the test run.
- """
- def __init__(self, stream=sys.stderr, descriptions=1, verbosity=2):
- self.stream = _WritelnDecorator(stream)
- self.descriptions = descriptions
- self.verbosity = verbosity
-
- def _makeResult(self):
- return _TextTestResult(self.stream, self.descriptions, self.verbosity)
-
- def run(self, test):
- "Run the given test case or test suite."
- result = self._makeResult()
- startTime = time.time()
- test(result)
- stopTime = time.time()
- timeTaken = stopTime - startTime
- result.printErrors()
- self.stream.writeln(result.separator2)
- run = result.testsRun
- self.stream.writeln("Ran %d test%s in %.3fs" %
- (run, run != 1 and "s" or "", timeTaken))
- self.stream.writeln()
- if not result.wasSuccessful():
- self.stream.write("FAILED (")
- failed, errored = map(len, (result.failures, result.errors))
- if failed:
- self.stream.write("failures=%d" % failed)
- if errored:
- if failed: self.stream.write(", ")
- self.stream.write("errors=%d" % errored)
- self.stream.writeln(")")
- else:
- self.stream.writeln("OK")
- return result
-
-
+++ /dev/null
-from timeside.tests.testcomponent import *
-from timeside.tests.testinputadapter import *
-from timeside.tests.testgraphers import *
-from timeside.tests.testdecoding import *
-from timeside.tests.testtranscoding import *
-from timeside.tests import TestRunner
-
-unittest.main(testRunner=TestRunner())
+++ /dev/null
-# -*- coding: utf-8 -*-
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.api import *
-import numpy
-
-
-class Gain(Processor):
- implements(IEffect)
-
- @interfacedoc
- def __init__(self, gain=1.0):
- self.gain = gain
-
- @staticmethod
- @interfacedoc
- def id():
- return "test_gain"
-
- @staticmethod
- @interfacedoc
- def name():
- return "Gain test effect"
-
- def process(self, frames, eod=False):
- return numpy.multiply(frames, self.gain), eod
-
-
-class FixedInputProcessor(Processor):
- """Processor which does absolutely nothing except illustrating the use
- of the FixedInputSizeAdapter. It also tests things a bit."""
-
- implements(IProcessor)
-
- BUFFER_SIZE = 1024
-
- @staticmethod
- @interfacedoc
- def id():
- return "test_fixed"
-
- @interfacedoc
- def setup(self, channels, samplerate, nframes):
- super(FixedInputProcessor, self).setup(channels, samplerate, nframes)
- self.adapter = FixedSizeInputAdapter(self.BUFFER_SIZE, channels, pad=True)
-
- @interfacedoc
- def process(self, frames, eod=False):
- for buffer, end in self.adapter.process(frames, eod):
- # Test that the adapter is actually doing the job:
- if len(buffer) != self.BUFFER_SIZE:
- raise Exception("Bad buffer size from adapter")
-
- return frames, eod
-
-
-
-
-
+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2007-2009 Parisson
-# Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com>
-# Copyright (c) 2007-2009 Guillaume Pellerin <pellerin@parisson.com>
-#
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-
-# Author: Paul Brossier <piem@piem.org>
-
-from timeside.core import Processor, implements, interfacedoc
-from timeside.api import IDecoder, IEncoder
-from numpy import array, frombuffer, getbuffer, float32
-
-import pygst
-pygst.require('0.10')
-import gst
-import gobject
-gobject.threads_init ()
-
-class FileDecoder(Processor):
- """ gstreamer-based decoder """
- implements(IDecoder)
-
- # duration ms, before discovery process times out
- MAX_DISCOVERY_TIME = 3000
-
- audioformat = None
- audiochannels = None
- audiorate = None
- audionframes = None
- mimetype = ''
-
- # IProcessor methods
-
- @staticmethod
- @interfacedoc
- def id():
- return "test_gstreamerdec"
-
- def setup(self, channels = None, samplerate = None, nframes = None):
- # the output data format we want
- caps = "audio/x-raw-float, width=32"
- pipeline = gst.parse_launch('''uridecodebin uri=%s
- ! audioconvert
- ! %s
- ! appsink name=sink sync=False ''' % (self.uri, caps))
- # store a pointer to appsink in our decoder object
- self.sink = pipeline.get_by_name('sink')
- # adjust length of emitted buffers
- # self.sink.set_property('blocksize', 0x10000)
- # start pipeline
- pipeline.set_state(gst.STATE_PLAYING)
-
- @interfacedoc
- def channels(self):
- return self.audiochannels
-
- @interfacedoc
- def samplerate(self):
- return self.audiorate
-
- @interfacedoc
- def nframes(self):
- return self.audionframes
-
- @interfacedoc
- def process(self, frames = None, eod = False):
- try:
- buf = self.sink.emit('pull-buffer')
- except SystemError, e:
- # should never happen
- print 'SystemError', e
- return array([0.]), True
- if buf == None:
- return array([0.]), True
- return self.gst_buffer_to_numpy_array(buf), False
-
- @interfacedoc
- def release(self):
- # nothing to do for now
- pass
-
- ## IDecoder methods
-
- @interfacedoc
- def __init__(self, uri):
-
- # is this a file?
- import os.path
- if os.path.exists(uri):
- # get the absolute path
- uri = os.path.abspath(uri)
- # first run the file/uri through the discover pipeline
- self.discover(uri)
- # and make a uri of it
- from urllib import quote
- self.uri = 'file://'+quote(uri)
-
- @interfacedoc
- def format(self):
- # TODO check
- return self.mimetype
-
- @interfacedoc
- def encoding(self):
- # TODO check
- return self.mimetype.split('/')[-1]
-
- @interfacedoc
- def resolution(self):
- # TODO check: width or depth?
- return self.audiowidth
-
- @interfacedoc
- def metadata(self):
- # TODO check
- return self.tags
-
- ## gst.extend discoverer
-
- def discover(self, path):
- """ gstreamer based helper function to get file attributes """
- from gst.extend import discoverer
- d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME)
- d.connect('discovered', self.discovered)
- self.mainloop = gobject.MainLoop()
- d.discover()
- self.mainloop.run()
-
- def discovered(self, d, is_media):
- """ gstreamer based helper executed upon discover() completion """
- if is_media and d.is_audio:
- # copy the discoverer attributes to self
- self.audiorate = d.audiorate
- self.mimetype= d.mimetype
- self.audiochannels = d.audiochannels
- self.audiowidth = d.audiowidth
- # conversion from time in nanoseconds to frames
- from math import ceil
- duration = d.audiorate * d.audiolength * 1.e-9
- self.audionframes = int (ceil ( duration ) )
- self.tags = d.tags
- elif not d.is_audio:
- print "error, no audio found!"
- else:
- print "fail", path
- self.mainloop.quit()
-
- def gst_buffer_to_numpy_array(self, buf):
- """ gstreamer buffer to numpy array conversion """
- chan = self.audiochannels
- samples = frombuffer(buf.data, dtype=float32)
- samples.resize([len(samples)/chan, chan])
- return samples
-
-class WavEncoder(Processor):
- """ gstreamer-based encoder """
- implements(IEncoder)
-
- def __init__(self, output):
- self.file = None
- if isinstance(output, basestring):
- self.filename = output
- else:
- raise Exception("Streaming not supported")
-
- @interfacedoc
- def setup(self, channels=None, samplerate=None, nframes=None):
- super(WavEncoder, self).setup(channels, samplerate, nframes)
- # TODO open file for writing
- # the output data format we want
- pipeline = gst.parse_launch(''' appsrc name=src
- ! audioconvert
- ! wavenc
- ! filesink location=%s ''' % self.filename)
- # store a pointer to appsink in our encoder object
- self.src = pipeline.get_by_name('src')
- srccaps = gst.Caps("""audio/x-raw-float,
- endianness=(int)1234,
- channels=(int)%s,
- width=(int)32,
- rate=(int)%d""" % (int(channels), int(samplerate)))
- self.src.set_property("caps", srccaps)
-
- # start pipeline
- pipeline.set_state(gst.STATE_PLAYING)
- self.pipeline = pipeline
-
- @staticmethod
- @interfacedoc
- def id():
- return "test_gstreamerenc"
-
- @staticmethod
- @interfacedoc
- def description():
- return "Gstreamer based encoder"
-
- @staticmethod
- @interfacedoc
- def file_extension():
- return "wav"
-
- @staticmethod
- @interfacedoc
- def mime_type():
- return "audio/x-wav"
-
- @interfacedoc
- def set_metadata(self, metadata):
- #TODO
- pass
-
- @interfacedoc
- def process(self, frames, eod=False):
- buf = self.numpy_array_to_gst_buffer(frames)
- self.src.emit('push-buffer', buf)
- if eod: self.src.emit('end-of-stream')
- return frames, eod
-
- def numpy_array_to_gst_buffer(self, frames):
- """ gstreamer buffer to numpy array conversion """
- buf = gst.Buffer(getbuffer(frames))
- return buf
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-import timeside
-from sys import stdout
-import os.path
-import numpy
-
-
-class TestAnalyzer:
-
- graphers = timeside.core.processors(timeside.api.IGrapher)
- decoders = timeside.core.processors(timeside.api.IDecoder)
- encoders= timeside.core.processors(timeside.api.IEncoder)
- analyzers = timeside.core.processors(timeside.api.IAnalyzer)
-
- def __init__(self, path):
- self.source = os.path.join(os.path.dirname(__file__), path)
- print "Processing %s" % self.source
- self.decoder = timeside.decoder.FileDecoder(self.source)
- print 'format: ', self.decoder.format()
- self.pipe = self.decoder
- self.analyzers_sub_pipe = []
-
- def process(self):
- for analyzer in self.analyzers:
- sub_pipe = analyzer()
- self.analyzers_sub_pipe.append(sub_pipe)
- self.pipe = self.pipe | sub_pipe
- self.pipe.run()
-
- def results(self):
- analyzers = []
- for analyzer in self.analyzers_sub_pipe:
- value = analyzer.result()
- analyzers.append({'name':analyzer.name(),
- 'id':analyzer.id(),
- 'unit':analyzer.unit(),
- 'value':str(value)})
- print analyzers
-
-
-test = TestAnalyzer('../samples/guitar.wav')
-#test = TestAnalyzer('/home/momo/music/wav/Cellar/Cellar-FinallyMix_01.wav')
-test.process()
-test.results()
-
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-import timeside
-import sys
-import os.path
-import numpy
-import time
-
-
-class TestAnalyzer:
-
- analyzer = timeside.analyzer.MaxLevel()
-
- def __init__(self, path):
- self.source = os.path.join(os.path.dirname(__file__), path)
- print "Processing %s" % self.source
- self.decoder = timeside.decoder.FileDecoder(self.source)
- print 'format: ', self.decoder.format()
- self.pipe = self.decoder
- self.sub_pipe = self.analyzer
-
- def process(self):
- self.pipe = self.pipe | self.sub_pipe
- self.pipe.run()
-
- def results(self):
- print {'name':self.analyzer.name(),
- 'id':self.analyzer.id(),
- 'unit':self.analyzer.unit(),
- 'value':str(self.analyzer.value)}
-
-test = TestAnalyzer(sys.argv[-1])
-test.process()
-test.results()
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from timeside.decoder import *
-from timeside.encoder import *
-import os.path
-
-source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
-dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.flac")
-
-decoder = FileDecoder(source)
-encoder = FlacEncoder(dest)
-
-(decoder | encoder).run()
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from timeside.tests.api.examples import Gain
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-
-import sys
-if len(sys.argv) > 1:
- source = sys.argv[1]
-else:
- import os.path
- source= os.path.join (os.path.dirname(__file__), "../samples/guitar.wav")
-
-Decoder = FileDecoder
-print "Creating decoder with id=%s for: %s" % (Decoder.id(), source)
-decoder = Decoder(source)
-analyzer = MaxLevel()
-decoder.setup()
-nchannels = decoder.channels()
-samplerate = decoder.samplerate()
-nframes = decoder.nframes()
-analyzer.setup(nchannels, samplerate)
-
-print "Stats: duration=%f, nframes=%d, nchannels=%d, samplerate=%d, resolution=%d" % (
- nframes / float(samplerate), nframes, nchannels, samplerate, decoder.resolution())
-
-while True:
- frames, eod = decoder.process()
- analyzer.process(frames, eod)
- if eod:
- break
-
-max_level = analyzer.result()
-print "Max level: %f" % max_level
-
-destination = "../results/guitar_normalized.wav"
-Encoder = WavEncoder
-print "Creating encoder with id=%s for: %s" % (Encoder.id(), destination)
-encoder = Encoder(destination)
-
-gain = 1
-if max_level > 0:
- gain = 0.9 / max_level
-
-effect = Gain(gain)
-
-decoder.setup()
-effect.setup(decoder.channels(), decoder.samplerate())
-encoder.setup(effect.channels(), effect.samplerate())
-
-print "Applying effect id=%s with gain=%f" % (effect.id(), gain)
-
-while True:
- frames, eod = decoder.process()
- encoder.process(*effect.process(frames, eod))
- if eod:
- break
-
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-
-import sys
-if len(sys.argv) > 1:
- source = sys.argv[1]
-else:
- import os.path
- source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac")
-
-decoder = FileDecoder(source)
-print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
-decoder.setup()
-channels = decoder.channels()
-print 'channels :', channels
-samplerate = decoder.samplerate()
-nframes = decoder.nframes()
-
-dest1 = "/tmp/test_filesink.mp3"
-dest2 = "/tmp/test_appsink.mp3"
-f = open(dest2,'w')
-
-streaming=True
-encoder = Mp3Encoder(dest1, streaming=True)
-encoder.setup(channels=channels, samplerate=samplerate)
-
-while True:
- encoder.process(*decoder.process())
- if streaming:
- f.write(encoder.chunk)
- if encoder.eod :
- break
-
-f.close()
-print encoder.pipe
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-
-import sys
-if len(sys.argv) > 1:
- source = sys.argv[1]
-else:
- import os.path
- source= os.path.join (os.path.dirname(__file__), "../samples/sweep.wav")
-
-decoder = FileDecoder(source)
-print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
-decoder.setup()
-channels = decoder.channels()
-print 'channels :', channels
-samplerate = decoder.samplerate()
-nframes = decoder.nframes()
-
-dest1 = "/tmp/test_filesink.mp3"
-dest2 = "/tmp/test_appsink.mp3"
-f = open(dest2,'w')
-
-streaming=True
-encoder = Mp3Encoder(dest1, streaming=True)
-encoder.setup(channels=channels, samplerate=samplerate)
-
-print encoder.pipe
-
-while True:
- encoder.process(*decoder.process())
- if streaming:
- f.write(encoder.chunk)
- if encoder.eod :
- break
-
-f.close()
-print encoder.pipe
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-
-import sys
-if len(sys.argv) > 1:
- source = sys.argv[1]
-else:
- import os.path
- source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac")
-
-decoder = FileDecoder(source)
-print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
-decoder.setup()
-channels = decoder.channels()
-print 'channels :', channels
-samplerate = decoder.samplerate()
-nframes = decoder.nframes()
-
-dest1 = "/tmp/test_filesink.ogg"
-dest2 = "/tmp/test_appsink.ogg"
-f = open(dest2,'w')
-
-streaming=True
-encoder = VorbisEncoder(dest1, streaming=True)
-encoder.setup(channels=channels, samplerate=samplerate)
-
-print encoder.pipe
-
-while True:
- encoder.process(*decoder.process())
- if streaming:
- f.write(encoder.chunk)
- if encoder.eod :
- break
-
-f.close()
-print encoder.pipe
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from timeside.decoder import *
-from timeside.encoder import *
-import os.path
-
-source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
-dest = os.path.join(os.path.dirname(__file__), "/tmp/sweep_wav.mp3")
-
-decoder = FileDecoder(source)
-encoder = Mp3Encoder(dest)
-
-(decoder | encoder).run()
-
-metadata = {'TIT2': 'title', #title2
- 'TCOM': 'composer', #composer
- 'TPE1': 'lead creator', #lead
- 'UFID': 'identifier', #Unique ID...
- 'TALB': 'album', #album
- 'TCON': 'genre', #genre
- 'TDRC': '2011', #year
-# 'COMM': 'blabla', #comment
- }
-
-encoder.set_metadata(metadata)
-encoder.write_metadata(dest)
-
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from timeside.tests.api.examples import Gain
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-from sys import stdout
-import os.path
-import numpy
-
-source = os.path.join(os.path.dirname(__file__), "../samples/guitar.wav")
-
-print "Normalizing %s" % source
-decoder = FileDecoder(source)
-maxlevel = MaxLevel()
-duration = Duration()
-
-(decoder | maxlevel | duration).run()
-
-gain = 1
-if maxlevel.result() < 0:
- gain = 0.9 / numpy.exp(maxlevel.result()/20)
-
-print "input maxlevel: %f" % maxlevel.result()
-print "gain: %f" % gain
-print "duration: %f %s" % (duration.result(), duration.unit())
-
-gain = Gain(gain)
-encoder = WavEncoder("../results/guitar_normalized.wav")
-
-subpipe = gain | maxlevel
-
-(decoder | subpipe | encoder).run()
-
-print "output maxlevel: %f" % maxlevel.result()
-
-
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-import os
-from timeside.core import *
-from timeside.api import *
-from timeside.decoder import *
-from timeside.grapher import *
-
-sample_dir = '../samples'
-img_dir = '../results/img'
-if not os.path.exists(img_dir):
- os.mkdir(img_dir)
-
-test_dict = {'sweep.wav': 'spec_wav.png',
- 'sweep.flac': 'spec_flac.png',
- 'sweep.ogg': 'spec_ogg.png',
- 'sweep.mp3': 'spec_mp3.png',
- }
-
-for source, image in test_dict.iteritems():
- audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source)
- image = img_dir + os.sep + image
- print 'Test : decoder(%s) | waveform (%s)' % (source, image)
- decoder = FileDecoder(audio)
- spectrogram = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
- (decoder | spectrogram).run()
- print 'frames per pixel = ', spectrogram.graph.samples_per_pixel
- print "render spectrogram to: %s" % image
- spectrogram.render(image)
-
-
-
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-import os
-from timeside.core import *
-from timeside.api import *
-from timeside.decoder import *
-from timeside.grapher import *
-
-sample_dir = '../samples'
-img_dir = '../results/img'
-if not os.path.exists(img_dir):
- os.mkdir(img_dir)
-
-test_dict = {'sweep.wav': 'waveform_wav.png',
- 'sweep.flac': 'waveform_flac.png',
- 'sweep.ogg': 'waveform_ogg.png',
- 'sweep.mp3': 'waveform_mp3.png',
- }
-
-for source, image in test_dict.iteritems():
- audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source)
- image = img_dir + os.sep + image
- print 'Test : decoder(%s) | waveform (%s)' % (source, image)
- decoder = FileDecoder(audio)
- waveform = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
- (decoder | waveform).run()
- print 'frames per pixel = ', waveform.graph.samples_per_pixel
- print "render waveform to: %s" % image
- waveform.render(image)
-
-
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from timeside.decoder import *
-from timeside.encoder import *
-import os.path
-
-source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
-dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.ogg")
-
-decoder = FileDecoder(source)
-encoder = VorbisEncoder(dest)
-
-(decoder | encoder).run()
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-
-import os.path
-source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav")
-dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.wav")
-
-decoder = FileDecoder(source)
-encoder = WavEncoder(dest)
-
-(decoder | encoder).run()
-
+++ /dev/null
-import timeside
-
-def list_processors(interface, prefix=""):
- print prefix + interface.__name__
- subinterfaces = interface.__subclasses__()
- for i in subinterfaces:
- list_processors(i, prefix + " ")
- processors = timeside.processors(interface, False)
- for p in processors:
- print prefix + " %s [%s]" % (p.__name__, p.id())
-
-list_processors(timeside.api.IProcessor)
+++ /dev/null
-
-from timeside.component import *
-from timeside.tests import TestCase, TestRunner
-import unittest
-
-__all__ = ['TestComponentArchitecture']
-
-class TestComponentArchitecture(TestCase):
- "Test the component and interface system"
-
- def testOneInterface(self):
- "Test a component implementing one interface"
- self.assertSameList(implementations(I1), [C1])
-
- def testTwoInterfaces(self):
- "Test a component implementing two interfaces"
- self.assertSameList(implementations(I2), [C2])
- self.assertSameList(implementations(I3), [C2])
-
- def testTwoImplementations(self):
- "Test an interface implemented by two components"
- self.assertSameList(implementations(I4), [C3, C4])
-
- def testInterfaceInheritance(self):
- "Test whether a component implements an interface's parent"
- self.assertSameList(implementations(I5), [C5])
-
- def testImplementationInheritance(self):
- "Test that a component doesn't implement the interface implemented by its parent"
- self.assertSameList(implementations(I7), [C6])
-
- def testImplementationRedundancy(self):
- "Test implementation redundancy across inheritance"
- self.assertSameList(implementations(I8), [C8, C9])
-
- def testAbstractImplementation(self):
- "Test abstract implementation"
- self.assertSameList(implementations(I11), [])
- self.assertSameList(implementations(I11, abstract=True), [C11])
-
- def testInterfaceDoc(self):
- "Test @interfacedoc decorator"
- self.assertEquals(C10.test.__doc__, "testdoc")
-
- def testInterfaceDocStatic(self):
- "Test @interfacedoc decorator on static method"
- self.assertEquals(C10.teststatic.__doc__, "teststaticdoc")
-
- def testIntefaceDocReversed(self):
- "Test @interfacedoc on static method (decorators reversed)"
-
- try:
-
- class BogusDoc1(Component):
- implements(I10)
-
- @interfacedoc
- @staticmethod
- def teststatic(self):
- pass
-
- self.fail("No error raised with reversed decorators")
-
- except ComponentError:
- pass
-
- def testInterfaceDocBadMethod(self):
- "Test @interfacedoc with unexistant method in interface"
-
- try:
- class BogusDoc2(Component):
- implements(I10)
-
- @interfacedoc
- def nosuchmethod(self):
- pass
-
- self.fail("No error raised when decorating an unexistant method")
-
- except ComponentError:
- pass
-
-class I1(Interface):
- pass
-
-class I2(Interface):
- pass
-
-class I3(Interface):
- pass
-
-class I4(Interface):
- pass
-
-class I5(Interface):
- pass
-
-class I6(I5):
- pass
-
-class I7(Interface):
- pass
-
-class I8(Interface):
- pass
-
-class I9(I8):
- pass
-
-class I10(Interface):
- def test(self):
- """testdoc"""
-
- @staticmethod
- def teststatic(self):
- """teststaticdoc"""
-
-class I11(Interface):
- pass
-
-class C1(Component):
- implements(I1)
-
-class C2(Component):
- implements(I2, I3)
-
-class C3(Component):
- implements(I4)
-
-class C4(Component):
- implements(I4)
-
-class C5(Component):
- implements(I6)
-
-class C6(Component):
- implements(I7)
-
-class C7(C6):
- pass
-
-class C8(Component):
- implements(I8)
-
-class C9(Component):
- implements(I8, I9)
-
-class C10(Component):
- implements(I10)
-
- @interfacedoc
- def test(self):
- pass
-
- @staticmethod
- @interfacedoc
- def teststatic(self):
- pass
-
-class C11(Component):
- abstract()
- implements(I11)
-
-if __name__ == '__main__':
- unittest.main(testRunner=TestRunner())
-
+++ /dev/null
-from timeside.decoder import *
-from timeside.tests import TestCase, TestRunner
-import unittest
-
-import os.path
-
-__all__ = ['TestDecoding']
-
-class TestDecoding(TestCase):
- "Test the low level streaming features"
-
- def setUp(self):
- pass
-
- def testWav(self):
- "Test wav decoding"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
-
- def testFlac(self):
- "Test flac decoding"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
-
- def testOgg(self):
- "Test ogg decoding"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg")
-
- def testMp3(self):
- "Test mp3 decoding"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3")
-
- def tearDown(self):
- decoder = FileDecoder(self.source)
- decoder.setup()
-
- totalframes = 0.
-
- while True:
- frames, eod = decoder.process()
- totalframes += frames.shape[0]
- if eod: break
-
- # FIXME compute actual number of frames from file
- if os.path.splitext(self.source)[-1].lower() == '.mp3':
- self.assertEquals(totalframes, 355969)
- elif os.path.splitext(self.source)[-1].lower() == '.ogg':
- self.assertEquals(totalframes, 352833)
- else:
- self.assertEquals(totalframes, 352801)
-
-if __name__ == '__main__':
- unittest.main(testRunner=TestRunner())
-
-
+++ /dev/null
-
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.grapher import *
-from timeside.api import *
-
-from timeside.component import *
-from timeside.tests import TestCase, TestRunner
-import unittest
-
-import os.path
-
-__all__ = ['TestGraphers']
-
-class TestGraphers(TestCase):
- "Test all graphers with various input media formats"
-
- def setUp(self):
- pass
-
- # WAVEFORMS
- def testWav2Waveform(self):
- "Test WAV to Waveform"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
- self.image = "/tmp/test_waveform_sweep_wav.png"
- self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- def testFlac2Waveform(self):
- "Test FLAC to Waveform"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
- self.image = "/tmp/test_waveform_sweep_flac.png"
- self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- def testMp32Waveform(self):
- "Test MP3 to Waveform"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3")
- self.image = "/tmp/test_waveform_sweep_mp3.png"
- self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- def testOgg2Waveform(self):
- "Test OGG to Waveform"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg")
- self.image = "/tmp/test_waveform_sweep_ogg.png"
- self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- # WAVEFORMS JOYDIV
- def testWav2WaveformJoyDiv(self):
- "Test WAV to WaveformJoyDiv"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
- self.image = "/tmp/test_waveformjoydiv_sweep_wav.png"
- self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- def testFlac2WaveformJoyDiv(self):
- "Test FLAC to WaveformJoyDiv"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
- self.image = "/tmp/test_waveformjoydiv_sweep_flac.png"
- self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- def testMp32WaveformJoyDiv(self):
- "Test MP3 to WaveformJoyDiv"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3")
- self.image = "/tmp/test_waveformjoydiv_sweep_mp3.png"
- self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- def testOgg2WaveformJoyDiv(self):
- "Test OGG to WaveformJoyDiv"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg")
- self.image = "/tmp/test_waveformjoydiv_sweep_ogg.png"
- self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- # SPECTROGRAMS
- def testWav2Spectrogram(self):
- "Test WAV to Spectrogram"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
- self.image = "/tmp/test_spectrogram_sweep_wav.png"
- self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- def testMp32Spectrogram(self):
- "Test MP3 to Spectrogram"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3")
- self.image = "/tmp/test_spectrogram_sweep_mp3.png"
- self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- def testFlac2Spectrogram(self):
- "Test FLAC to Spectrogram"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
- self.image = "/tmp/test_spectrogram_sweep_flac.png"
- self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- def testOgg2Spectrogram(self):
- "Test OGG to Spectrogram"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg")
- self.image = "/tmp/test_spectrogram_sweep_ogg.png"
- self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
-
- def tearDown(self):
- decoder = FileDecoder(self.source)
- (decoder | self.grapher).run()
- self.grapher.render(self.image)
-
-if __name__ == '__main__':
- unittest.main(testRunner=TestRunner())
-
+++ /dev/null
-from timeside.core import FixedSizeInputAdapter
-from timeside.tests import TestCase, TestRunner
-import numpy
-import unittest
-
-class TestFixedSizeInputAdapter(TestCase):
- "Test the fixed-sized input adapter"
-
- def assertIOEquals(self, adapter, input, input_eod, output, output_eod=None):
- output = output[:]
- output.reverse()
- _eod = None
- for buffer, _eod in adapter.process(input, input_eod):
- a = output.pop()
- if not numpy.array_equiv(buffer, a):
- self.fail("\n-- Actual --\n%s\n -- Expected -- \n%s\n" % (str(buffer), str(a)))
-
- if _eod != output_eod:
- self.fail("eod do not match: %s != %s", (str(_eod), str(output_eod)))
-
- if output:
- self.fail("trailing expected data: %s" % output)
-
- def setUp(self):
- self.data = numpy.arange(44).reshape(2,22).transpose()
-
- def testTwoChannels(self):
- "Test simple stream with two channels"
- adapter = FixedSizeInputAdapter(4, 2)
-
- self.assertEquals(len(self.data), adapter.nframes(len(self.data)))
-
- self.assertIOEquals(adapter, self.data[0:1], False, [])
- self.assertIOEquals(adapter, self.data[1:5], False, [self.data[0:4]], False)
- self.assertIOEquals(adapter, self.data[5:12], False, [self.data[4:8], self.data[8:12]], False)
- self.assertIOEquals(adapter, self.data[12:13], False, [])
- self.assertIOEquals(adapter, self.data[13:14], False, [])
- self.assertIOEquals(adapter, self.data[14:18], False, [self.data[12:16]], False)
- self.assertIOEquals(adapter, self.data[18:20], False, [self.data[16:20]], False)
- self.assertIOEquals(adapter, self.data[20:21], False, [])
- self.assertIOEquals(adapter, self.data[21:22], True, [self.data[20:22]], True)
-
- def testPadding(self):
- "Test automatic padding support"
- adapter = FixedSizeInputAdapter(4, 2, pad=True)
-
- self.assertEquals(len(self.data) + 2, adapter.nframes(len(self.data)))
-
- self.assertIOEquals(adapter, self.data[0:21], False,
- [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]],
- False)
-
- self.assertIOEquals(adapter, self.data[21:22], True, [[
- [20, 42],
- [21, 43],
- [0, 0],
- [0, 0]
- ]], True)
-
- def testSizeMultiple(self):
- "Test a stream which contain a multiple number of buffers"
- adapter = FixedSizeInputAdapter(4, 2)
-
- self.assertIOEquals(adapter, self.data[0:20], True,
- [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]],
- True)
-
-
-if __name__ == '__main__':
- unittest.main(testRunner=TestRunner())
-
+++ /dev/null
-from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
-
-from timeside.component import *
-from timeside.tests import TestCase, TestRunner
-import unittest
-
-import os.path
-
-__all__ = ['TestTranscoding']
-
-class TestTranscoding(TestCase):
- "Test the low level streaming features"
-
- def setUp(self):
- pass
-
- def testWav2Mp3(self):
- "Test wav to mp3 conversion"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
-
- dest1 = "/tmp/test_wav_filesink.mp3"
- dest2 = "/tmp/test_wav_appsink.mp3"
- self.f = open(dest2,'w')
-
- self.streaming=True
-
- encoder = Mp3Encoder(dest1, streaming=True)
- self.encoder = encoder
-
- def testFlac2Mp3(self):
- "Test flac to mp3 conversion"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
-
- dest1 = "/tmp/test_flac_filesink.mp3"
- dest2 = "/tmp/test_flac_appsink.mp3"
- self.f = open(dest2,'w')
-
- self.streaming=True
-
- encoder = Mp3Encoder(dest1, streaming=True)
- self.encoder = encoder
-
- """
- def testFlac2Ogg(self):
- "Test flac to ogg conversion"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
-
- dest1 = "/tmp/test_flac_filesink.ogg"
- dest2 = "/tmp/test_flac_appsink.ogg"
- self.f = open(dest2,'w')
-
- self.streaming=True
-
- encoder = VorbisEncoder(dest1, streaming=True)
- self.encoder = encoder
-
- def testWav2Ogg(self):
- "Test wav to ogg conversion"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
-
- dest1 = "/tmp/test_wav_filesink.ogg"
- dest2 = "/tmp/test_wav_appsink.ogg"
- self.f = open(dest2,'w')
-
- self.streaming=True
-
- encoder = VorbisEncoder(dest1, streaming=True)
- self.encoder = encoder
-
- def testWav2Flac(self):
- "Test wav to flac conversion"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
-
- dest1 = "/tmp/test_wav_filesink.flac"
- dest2 = "/tmp/test_wav_appsink.flac"
- self.f = open(dest2,'w')
-
- self.streaming=True
-
- encoder = FlacEncoder(dest1, streaming=True)
- self.encoder = encoder
- """
-
- def setUpDecoder(self):
- self.decoder = FileDecoder(self.source)
- self.decoder.setup()
- self.channels = self.decoder.channels()
- self.samplerate = self.decoder.samplerate()
-
- def setUpEncoder(self):
- self.encoder.setup(channels = self.channels, samplerate = self.samplerate)
-
- def tearDown(self):
- self.setUpDecoder()
- self.setUpEncoder()
- encoder = self.encoder
- f = self.f
-
- #print "decoder pipe:\n", decoder.pipe
- #print "encoder pipe:\n", encoder.pipe
- totalframes = 0.
-
- while True:
- frames, eod = self.decoder.process()
- #print frames.shape[0]
- totalframes += frames.shape[0]
- encoder.process(frames, eod)
- if self.streaming:
- f.write(encoder.chunk)
- if eod:
- break
- if encoder.eod :
- break
- f.close()
-
- # FIXME compute actual number of frames from file
- self.assertEquals(totalframes, 352801)
-
-if __name__ == '__main__':
- unittest.main(testRunner=TestRunner())
-