From: Paul Brossier Date: Tue, 22 Nov 2011 03:30:55 +0000 (-0800) Subject: moved tests outside timeside module X-Git-Tag: 0.3.2~8 X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=994b745dd577b2a0474492c56349e1edefaa3aa5;p=timeside.git moved tests outside timeside module --- diff --git a/tests/alltests.py b/tests/alltests.py new file mode 100644 index 0000000..4543878 --- /dev/null +++ b/tests/alltests.py @@ -0,0 +1,8 @@ +from testcomponent import * +from testinputadapter import * +from testgraphers import * +from testdecoding import * +from testtranscoding import * +from unit_timeside import * + +unittest.main(testRunner=TestRunner()) diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/tests/api/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/tests/api/examples.py b/tests/api/examples.py new file mode 100644 index 0000000..747c197 --- /dev/null +++ b/tests/api/examples.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- + +from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter +from timeside.api import * +import numpy + + +class Gain(Processor): + implements(IEffect) + + @interfacedoc + def __init__(self, gain=1.0): + self.gain = gain + + @staticmethod + @interfacedoc + def id(): + return "test_gain" + + @staticmethod + @interfacedoc + def name(): + return "Gain test effect" + + def process(self, frames, eod=False): + return numpy.multiply(frames, self.gain), eod + + +class FixedInputProcessor(Processor): + """Processor which does absolutely nothing except illustrating the use + of the FixedInputSizeAdapter. It also tests things a bit.""" + + implements(IProcessor) + + BUFFER_SIZE = 1024 + + @staticmethod + @interfacedoc + def id(): + return "test_fixed" + + @interfacedoc + def setup(self, channels, samplerate, nframes): + super(FixedInputProcessor, self).setup(channels, samplerate, nframes) + self.adapter = FixedSizeInputAdapter(self.BUFFER_SIZE, channels, pad=True) + + @interfacedoc + def process(self, frames, eod=False): + for buffer, end in self.adapter.process(frames, eod): + # Test that the adapter is actually doing the job: + if len(buffer) != self.BUFFER_SIZE: + raise Exception("Bad buffer size from adapter") + + return frames, eod + + + + + diff --git a/tests/api/gstreamer.py b/tests/api/gstreamer.py new file mode 100644 index 0000000..897ac42 --- /dev/null +++ b/tests/api/gstreamer.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2007-2009 Parisson +# Copyright (c) 2007 Olivier Guilyardi +# Copyright (c) 2007-2009 Guillaume Pellerin +# +# This file is part of TimeSide. + +# TimeSide is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. + +# TimeSide is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with TimeSide. If not, see . + +# Author: Paul Brossier + +from timeside.core import Processor, implements, interfacedoc +from timeside.api import IDecoder, IEncoder +from numpy import array, frombuffer, getbuffer, float32 + +import pygst +pygst.require('0.10') +import gst +import gobject +gobject.threads_init () + +class FileDecoder(Processor): + """ gstreamer-based decoder """ + implements(IDecoder) + + # duration ms, before discovery process times out + MAX_DISCOVERY_TIME = 3000 + + audioformat = None + audiochannels = None + audiorate = None + audionframes = None + mimetype = '' + + # IProcessor methods + + @staticmethod + @interfacedoc + def id(): + return "test_gstreamerdec" + + def setup(self, channels = None, samplerate = None, nframes = None): + # the output data format we want + caps = "audio/x-raw-float, width=32" + pipeline = gst.parse_launch('''uridecodebin uri=%s + ! audioconvert + ! %s + ! appsink name=sink sync=False ''' % (self.uri, caps)) + # store a pointer to appsink in our decoder object + self.sink = pipeline.get_by_name('sink') + # adjust length of emitted buffers + # self.sink.set_property('blocksize', 0x10000) + # start pipeline + pipeline.set_state(gst.STATE_PLAYING) + + @interfacedoc + def channels(self): + return self.audiochannels + + @interfacedoc + def samplerate(self): + return self.audiorate + + @interfacedoc + def nframes(self): + return self.audionframes + + @interfacedoc + def process(self, frames = None, eod = False): + try: + buf = self.sink.emit('pull-buffer') + except SystemError, e: + # should never happen + print 'SystemError', e + return array([0.]), True + if buf == None: + return array([0.]), True + return self.gst_buffer_to_numpy_array(buf), False + + @interfacedoc + def release(self): + # nothing to do for now + pass + + ## IDecoder methods + + @interfacedoc + def __init__(self, uri): + + # is this a file? + import os.path + if os.path.exists(uri): + # get the absolute path + uri = os.path.abspath(uri) + # first run the file/uri through the discover pipeline + self.discover(uri) + # and make a uri of it + from urllib import quote + self.uri = 'file://'+quote(uri) + + @interfacedoc + def format(self): + # TODO check + return self.mimetype + + @interfacedoc + def encoding(self): + # TODO check + return self.mimetype.split('/')[-1] + + @interfacedoc + def resolution(self): + # TODO check: width or depth? + return self.audiowidth + + @interfacedoc + def metadata(self): + # TODO check + return self.tags + + ## gst.extend discoverer + + def discover(self, path): + """ gstreamer based helper function to get file attributes """ + from gst.extend import discoverer + d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME) + d.connect('discovered', self.discovered) + self.mainloop = gobject.MainLoop() + d.discover() + self.mainloop.run() + + def discovered(self, d, is_media): + """ gstreamer based helper executed upon discover() completion """ + if is_media and d.is_audio: + # copy the discoverer attributes to self + self.audiorate = d.audiorate + self.mimetype= d.mimetype + self.audiochannels = d.audiochannels + self.audiowidth = d.audiowidth + # conversion from time in nanoseconds to frames + from math import ceil + duration = d.audiorate * d.audiolength * 1.e-9 + self.audionframes = int (ceil ( duration ) ) + self.tags = d.tags + elif not d.is_audio: + print "error, no audio found!" + else: + print "fail", path + self.mainloop.quit() + + def gst_buffer_to_numpy_array(self, buf): + """ gstreamer buffer to numpy array conversion """ + chan = self.audiochannels + samples = frombuffer(buf.data, dtype=float32) + samples.resize([len(samples)/chan, chan]) + return samples + +class WavEncoder(Processor): + """ gstreamer-based encoder """ + implements(IEncoder) + + def __init__(self, output): + self.file = None + if isinstance(output, basestring): + self.filename = output + else: + raise Exception("Streaming not supported") + + @interfacedoc + def setup(self, channels=None, samplerate=None, nframes=None): + super(WavEncoder, self).setup(channels, samplerate, nframes) + # TODO open file for writing + # the output data format we want + pipeline = gst.parse_launch(''' appsrc name=src + ! audioconvert + ! wavenc + ! filesink location=%s ''' % self.filename) + # store a pointer to appsink in our encoder object + self.src = pipeline.get_by_name('src') + srccaps = gst.Caps("""audio/x-raw-float, + endianness=(int)1234, + channels=(int)%s, + width=(int)32, + rate=(int)%d""" % (int(channels), int(samplerate))) + self.src.set_property("caps", srccaps) + + # start pipeline + pipeline.set_state(gst.STATE_PLAYING) + self.pipeline = pipeline + + @staticmethod + @interfacedoc + def id(): + return "test_gstreamerenc" + + @staticmethod + @interfacedoc + def description(): + return "Gstreamer based encoder" + + @staticmethod + @interfacedoc + def file_extension(): + return "wav" + + @staticmethod + @interfacedoc + def mime_type(): + return "audio/x-wav" + + @interfacedoc + def set_metadata(self, metadata): + #TODO + pass + + @interfacedoc + def process(self, frames, eod=False): + buf = self.numpy_array_to_gst_buffer(frames) + self.src.emit('push-buffer', buf) + if eod: self.src.emit('end-of-stream') + return frames, eod + + def numpy_array_to_gst_buffer(self, frames): + """ gstreamer buffer to numpy array conversion """ + buf = gst.Buffer(getbuffer(frames)) + return buf diff --git a/tests/api/test_analyzer.py b/tests/api/test_analyzer.py new file mode 100644 index 0000000..2b8198d --- /dev/null +++ b/tests/api/test_analyzer.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- + +import timeside +from sys import stdout +import os.path +import numpy + + +class TestAnalyzer: + + graphers = timeside.core.processors(timeside.api.IGrapher) + decoders = timeside.core.processors(timeside.api.IDecoder) + encoders= timeside.core.processors(timeside.api.IEncoder) + analyzers = timeside.core.processors(timeside.api.IAnalyzer) + + def __init__(self, path): + self.source = os.path.join(os.path.dirname(__file__), path) + print "Processing %s" % self.source + self.decoder = timeside.decoder.FileDecoder(self.source) + print 'format: ', self.decoder.format() + self.pipe = self.decoder + self.analyzers_sub_pipe = [] + + def process(self): + for analyzer in self.analyzers: + sub_pipe = analyzer() + self.analyzers_sub_pipe.append(sub_pipe) + self.pipe = self.pipe | sub_pipe + self.pipe.run() + + def results(self): + analyzers = [] + for analyzer in self.analyzers_sub_pipe: + value = analyzer.result() + analyzers.append({'name':analyzer.name(), + 'id':analyzer.id(), + 'unit':analyzer.unit(), + 'value':str(value)}) + print analyzers + + +test = TestAnalyzer('../samples/guitar.wav') +#test = TestAnalyzer('/home/momo/music/wav/Cellar/Cellar-FinallyMix_01.wav') +test.process() +test.results() + diff --git a/tests/api/test_analyzer3.py b/tests/api/test_analyzer3.py new file mode 100644 index 0000000..8114723 --- /dev/null +++ b/tests/api/test_analyzer3.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- + +import timeside +import sys +import os.path +import numpy +import time + + +class TestAnalyzer: + + analyzer = timeside.analyzer.MaxLevel() + + def __init__(self, path): + self.source = os.path.join(os.path.dirname(__file__), path) + print "Processing %s" % self.source + self.decoder = timeside.decoder.FileDecoder(self.source) + print 'format: ', self.decoder.format() + self.pipe = self.decoder + self.sub_pipe = self.analyzer + + def process(self): + self.pipe = self.pipe | self.sub_pipe + self.pipe.run() + + def results(self): + print {'name':self.analyzer.name(), + 'id':self.analyzer.id(), + 'unit':self.analyzer.unit(), + 'value':str(self.analyzer.value)} + +test = TestAnalyzer(sys.argv[-1]) +test.process() +test.results() diff --git a/tests/api/test_flac.py b/tests/api/test_flac.py new file mode 100644 index 0000000..ee590c4 --- /dev/null +++ b/tests/api/test_flac.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- + +from timeside.decoder import * +from timeside.encoder import * +import os.path + +source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav") +dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.flac") + +decoder = FileDecoder(source) +encoder = FlacEncoder(dest) + +(decoder | encoder).run() diff --git a/tests/api/test_lolevel.py b/tests/api/test_lolevel.py new file mode 100644 index 0000000..421ce95 --- /dev/null +++ b/tests/api/test_lolevel.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- + +from timeside.tests.api.examples import Gain +from timeside.core import * +from timeside.decoder import * +from timeside.analyzer import * +from timeside.encoder import * +from timeside.api import * + +import sys +if len(sys.argv) > 1: + source = sys.argv[1] +else: + import os.path + source= os.path.join (os.path.dirname(__file__), "../samples/guitar.wav") + +Decoder = FileDecoder +print "Creating decoder with id=%s for: %s" % (Decoder.id(), source) +decoder = Decoder(source) +analyzer = MaxLevel() +decoder.setup() +nchannels = decoder.channels() +samplerate = decoder.samplerate() +nframes = decoder.nframes() +analyzer.setup(nchannels, samplerate) + +print "Stats: duration=%f, nframes=%d, nchannels=%d, samplerate=%d, resolution=%d" % ( + nframes / float(samplerate), nframes, nchannels, samplerate, decoder.resolution()) + +while True: + frames, eod = decoder.process() + analyzer.process(frames, eod) + if eod: + break + +max_level = analyzer.result() +print "Max level: %f" % max_level + +destination = "../results/guitar_normalized.wav" +Encoder = WavEncoder +print "Creating encoder with id=%s for: %s" % (Encoder.id(), destination) +encoder = Encoder(destination) + +gain = 1 +if max_level > 0: + gain = 0.9 / max_level + +effect = Gain(gain) + +decoder.setup() +effect.setup(decoder.channels(), decoder.samplerate()) +encoder.setup(effect.channels(), effect.samplerate()) + +print "Applying effect id=%s with gain=%f" % (effect.id(), gain) + +while True: + frames, eod = decoder.process() + encoder.process(*effect.process(frames, eod)) + if eod: + break + diff --git a/tests/api/test_lolevel_streaming.py b/tests/api/test_lolevel_streaming.py new file mode 100644 index 0000000..e394f0c --- /dev/null +++ b/tests/api/test_lolevel_streaming.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- + +from timeside.core import * +from timeside.decoder import * +from timeside.analyzer import * +from timeside.encoder import * +from timeside.api import * + +import sys +if len(sys.argv) > 1: + source = sys.argv[1] +else: + import os.path + source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac") + +decoder = FileDecoder(source) +print "Creating decoder with id=%s for: %s" % (decoder.id(), source) +decoder.setup() +channels = decoder.channels() +print 'channels :', channels +samplerate = decoder.samplerate() +nframes = decoder.nframes() + +dest1 = "/tmp/test_filesink.mp3" +dest2 = "/tmp/test_appsink.mp3" +f = open(dest2,'w') + +streaming=True +encoder = Mp3Encoder(dest1, streaming=True) +encoder.setup(channels=channels, samplerate=samplerate) + +while True: + encoder.process(*decoder.process()) + if streaming: + f.write(encoder.chunk) + if encoder.eod : + break + +f.close() +print encoder.pipe diff --git a/tests/api/test_lolevel_streaming_bad.py b/tests/api/test_lolevel_streaming_bad.py new file mode 100644 index 0000000..e414ead --- /dev/null +++ b/tests/api/test_lolevel_streaming_bad.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +from timeside.core import * +from timeside.decoder import * +from timeside.analyzer import * +from timeside.encoder import * +from timeside.api import * + +import sys +if len(sys.argv) > 1: + source = sys.argv[1] +else: + import os.path + source= os.path.join (os.path.dirname(__file__), "../samples/sweep.wav") + +decoder = FileDecoder(source) +print "Creating decoder with id=%s for: %s" % (decoder.id(), source) +decoder.setup() +channels = decoder.channels() +print 'channels :', channels +samplerate = decoder.samplerate() +nframes = decoder.nframes() + +dest1 = "/tmp/test_filesink.mp3" +dest2 = "/tmp/test_appsink.mp3" +f = open(dest2,'w') + +streaming=True +encoder = Mp3Encoder(dest1, streaming=True) +encoder.setup(channels=channels, samplerate=samplerate) + +print encoder.pipe + +while True: + encoder.process(*decoder.process()) + if streaming: + f.write(encoder.chunk) + if encoder.eod : + break + +f.close() +print encoder.pipe diff --git a/tests/api/test_lolevel_streaming_vorbis.py b/tests/api/test_lolevel_streaming_vorbis.py new file mode 100644 index 0000000..ecc3d3f --- /dev/null +++ b/tests/api/test_lolevel_streaming_vorbis.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +from timeside.core import * +from timeside.decoder import * +from timeside.analyzer import * +from timeside.encoder import * +from timeside.api import * + +import sys +if len(sys.argv) > 1: + source = sys.argv[1] +else: + import os.path + source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac") + +decoder = FileDecoder(source) +print "Creating decoder with id=%s for: %s" % (decoder.id(), source) +decoder.setup() +channels = decoder.channels() +print 'channels :', channels +samplerate = decoder.samplerate() +nframes = decoder.nframes() + +dest1 = "/tmp/test_filesink.ogg" +dest2 = "/tmp/test_appsink.ogg" +f = open(dest2,'w') + +streaming=True +encoder = VorbisEncoder(dest1, streaming=True) +encoder.setup(channels=channels, samplerate=samplerate) + +print encoder.pipe + +while True: + encoder.process(*decoder.process()) + if streaming: + f.write(encoder.chunk) + if encoder.eod : + break + +f.close() +print encoder.pipe diff --git a/tests/api/test_mp3.py b/tests/api/test_mp3.py new file mode 100644 index 0000000..620696b --- /dev/null +++ b/tests/api/test_mp3.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- + +from timeside.decoder import * +from timeside.encoder import * +import os.path + +source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav") +dest = os.path.join(os.path.dirname(__file__), "/tmp/sweep_wav.mp3") + +decoder = FileDecoder(source) +encoder = Mp3Encoder(dest) + +(decoder | encoder).run() + +metadata = {'TIT2': 'title', #title2 + 'TCOM': 'composer', #composer + 'TPE1': 'lead creator', #lead + 'UFID': 'identifier', #Unique ID... + 'TALB': 'album', #album + 'TCON': 'genre', #genre + 'TDRC': '2011', #year +# 'COMM': 'blabla', #comment + } + +encoder.set_metadata(metadata) +encoder.write_metadata(dest) + diff --git a/tests/api/test_pipe.py b/tests/api/test_pipe.py new file mode 100644 index 0000000..629b3e9 --- /dev/null +++ b/tests/api/test_pipe.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- + +from timeside.tests.api.examples import Gain +from timeside.core import * +from timeside.decoder import * +from timeside.analyzer import * +from timeside.encoder import * +from timeside.api import * +from sys import stdout +import os.path +import numpy + +source = os.path.join(os.path.dirname(__file__), "../samples/guitar.wav") + +print "Normalizing %s" % source +decoder = FileDecoder(source) +maxlevel = MaxLevel() +duration = Duration() + +(decoder | maxlevel | duration).run() + +gain = 1 +if maxlevel.result() < 0: + gain = 0.9 / numpy.exp(maxlevel.result()/20) + +print "input maxlevel: %f" % maxlevel.result() +print "gain: %f" % gain +print "duration: %f %s" % (duration.result(), duration.unit()) + +gain = Gain(gain) +encoder = WavEncoder("../results/guitar_normalized.wav") + +subpipe = gain | maxlevel + +(decoder | subpipe | encoder).run() + +print "output maxlevel: %f" % maxlevel.result() + + diff --git a/tests/api/test_pipe_spectrogram.py b/tests/api/test_pipe_spectrogram.py new file mode 100644 index 0000000..49eaa8c --- /dev/null +++ b/tests/api/test_pipe_spectrogram.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- + +import os +from timeside.core import * +from timeside.api import * +from timeside.decoder import * +from timeside.grapher import * + +sample_dir = '../samples' +img_dir = '../results/img' +if not os.path.exists(img_dir): + os.mkdir(img_dir) + +test_dict = {'sweep.wav': 'spec_wav.png', + 'sweep.flac': 'spec_flac.png', + 'sweep.ogg': 'spec_ogg.png', + 'sweep.mp3': 'spec_mp3.png', + } + +for source, image in test_dict.iteritems(): + audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source) + image = img_dir + os.sep + image + print 'Test : decoder(%s) | waveform (%s)' % (source, image) + decoder = FileDecoder(audio) + spectrogram = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + (decoder | spectrogram).run() + print 'frames per pixel = ', spectrogram.graph.samples_per_pixel + print "render spectrogram to: %s" % image + spectrogram.render(image) + + + diff --git a/tests/api/test_pipe_waveform.py b/tests/api/test_pipe_waveform.py new file mode 100644 index 0000000..bbfb54a --- /dev/null +++ b/tests/api/test_pipe_waveform.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- + +import os +from timeside.core import * +from timeside.api import * +from timeside.decoder import * +from timeside.grapher import * + +sample_dir = '../samples' +img_dir = '../results/img' +if not os.path.exists(img_dir): + os.mkdir(img_dir) + +test_dict = {'sweep.wav': 'waveform_wav.png', + 'sweep.flac': 'waveform_flac.png', + 'sweep.ogg': 'waveform_ogg.png', + 'sweep.mp3': 'waveform_mp3.png', + } + +for source, image in test_dict.iteritems(): + audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source) + image = img_dir + os.sep + image + print 'Test : decoder(%s) | waveform (%s)' % (source, image) + decoder = FileDecoder(audio) + waveform = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + (decoder | waveform).run() + print 'frames per pixel = ', waveform.graph.samples_per_pixel + print "render waveform to: %s" % image + waveform.render(image) + + diff --git a/tests/api/test_vorbis.py b/tests/api/test_vorbis.py new file mode 100644 index 0000000..53818cd --- /dev/null +++ b/tests/api/test_vorbis.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- + +from timeside.decoder import * +from timeside.encoder import * +import os.path + +source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav") +dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.ogg") + +decoder = FileDecoder(source) +encoder = VorbisEncoder(dest) + +(decoder | encoder).run() diff --git a/tests/api/test_wav.py b/tests/api/test_wav.py new file mode 100644 index 0000000..077a01e --- /dev/null +++ b/tests/api/test_wav.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- + +from timeside.decoder import * +from timeside.analyzer import * +from timeside.encoder import * + +import os.path +source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav") +dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.wav") + +decoder = FileDecoder(source) +encoder = WavEncoder(dest) + +(decoder | encoder).run() + diff --git a/tests/listprocessors.py b/tests/listprocessors.py new file mode 100644 index 0000000..840c087 --- /dev/null +++ b/tests/listprocessors.py @@ -0,0 +1,12 @@ +import timeside + +def list_processors(interface, prefix=""): + print prefix + interface.__name__ + subinterfaces = interface.__subclasses__() + for i in subinterfaces: + list_processors(i, prefix + " ") + processors = timeside.processors(interface, False) + for p in processors: + print prefix + " %s [%s]" % (p.__name__, p.id()) + +list_processors(timeside.api.IProcessor) diff --git a/tests/samples/guitar.wav b/tests/samples/guitar.wav new file mode 100644 index 0000000..b5a9e80 Binary files /dev/null and b/tests/samples/guitar.wav differ diff --git a/tests/samples/sweep.flac b/tests/samples/sweep.flac new file mode 100644 index 0000000..decee06 Binary files /dev/null and b/tests/samples/sweep.flac differ diff --git a/tests/samples/sweep.mp3 b/tests/samples/sweep.mp3 new file mode 100644 index 0000000..dc75fe0 Binary files /dev/null and b/tests/samples/sweep.mp3 differ diff --git a/tests/samples/sweep.ogg b/tests/samples/sweep.ogg new file mode 100644 index 0000000..e30bf99 Binary files /dev/null and b/tests/samples/sweep.ogg differ diff --git a/tests/samples/sweep.wav b/tests/samples/sweep.wav new file mode 100644 index 0000000..fc28a32 Binary files /dev/null and b/tests/samples/sweep.wav differ diff --git a/tests/samples/sweep_source.wav b/tests/samples/sweep_source.wav new file mode 100644 index 0000000..5dcf9ba Binary files /dev/null and b/tests/samples/sweep_source.wav differ diff --git a/tests/testcomponent.py b/tests/testcomponent.py new file mode 100644 index 0000000..a06952c --- /dev/null +++ b/tests/testcomponent.py @@ -0,0 +1,165 @@ + +from timeside.component import * +from unit_timeside import TestCase, TestRunner + +__all__ = ['TestComponentArchitecture'] + +class TestComponentArchitecture(TestCase): + "Test the component and interface system" + + def testOneInterface(self): + "Test a component implementing one interface" + self.assertSameList(implementations(I1), [C1]) + + def testTwoInterfaces(self): + "Test a component implementing two interfaces" + self.assertSameList(implementations(I2), [C2]) + self.assertSameList(implementations(I3), [C2]) + + def testTwoImplementations(self): + "Test an interface implemented by two components" + self.assertSameList(implementations(I4), [C3, C4]) + + def testInterfaceInheritance(self): + "Test whether a component implements an interface's parent" + self.assertSameList(implementations(I5), [C5]) + + def testImplementationInheritance(self): + "Test that a component doesn't implement the interface implemented by its parent" + self.assertSameList(implementations(I7), [C6]) + + def testImplementationRedundancy(self): + "Test implementation redundancy across inheritance" + self.assertSameList(implementations(I8), [C8, C9]) + + def testAbstractImplementation(self): + "Test abstract implementation" + self.assertSameList(implementations(I11), []) + self.assertSameList(implementations(I11, abstract=True), [C11]) + + def testInterfaceDoc(self): + "Test @interfacedoc decorator" + self.assertEquals(C10.test.__doc__, "testdoc") + + def testInterfaceDocStatic(self): + "Test @interfacedoc decorator on static method" + self.assertEquals(C10.teststatic.__doc__, "teststaticdoc") + + def testIntefaceDocReversed(self): + "Test @interfacedoc on static method (decorators reversed)" + + try: + + class BogusDoc1(Component): + implements(I10) + + @interfacedoc + @staticmethod + def teststatic(self): + pass + + self.fail("No error raised with reversed decorators") + + except ComponentError: + pass + + def testInterfaceDocBadMethod(self): + "Test @interfacedoc with unexistant method in interface" + + try: + class BogusDoc2(Component): + implements(I10) + + @interfacedoc + def nosuchmethod(self): + pass + + self.fail("No error raised when decorating an unexistant method") + + except ComponentError: + pass + +class I1(Interface): + pass + +class I2(Interface): + pass + +class I3(Interface): + pass + +class I4(Interface): + pass + +class I5(Interface): + pass + +class I6(I5): + pass + +class I7(Interface): + pass + +class I8(Interface): + pass + +class I9(I8): + pass + +class I10(Interface): + def test(self): + """testdoc""" + + @staticmethod + def teststatic(self): + """teststaticdoc""" + +class I11(Interface): + pass + +class C1(Component): + implements(I1) + +class C2(Component): + implements(I2, I3) + +class C3(Component): + implements(I4) + +class C4(Component): + implements(I4) + +class C5(Component): + implements(I6) + +class C6(Component): + implements(I7) + +class C7(C6): + pass + +class C8(Component): + implements(I8) + +class C9(Component): + implements(I8, I9) + +class C10(Component): + implements(I10) + + @interfacedoc + def test(self): + pass + + @staticmethod + @interfacedoc + def teststatic(self): + pass + +class C11(Component): + abstract() + implements(I11) + +if __name__ == '__main__': + unittest.main(testRunner=TestRunner()) + diff --git a/tests/testdecoding.py b/tests/testdecoding.py new file mode 100644 index 0000000..c847c4c --- /dev/null +++ b/tests/testdecoding.py @@ -0,0 +1,52 @@ +from timeside.decoder import * +from unit_timeside import TestCase, TestRunner + +import os.path + +__all__ = ['TestDecoding'] + +class TestDecoding(TestCase): + "Test the low level streaming features" + + def setUp(self): + pass + + def testWav(self): + "Test wav decoding" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") + + def testFlac(self): + "Test flac decoding" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") + + def testOgg(self): + "Test ogg decoding" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg") + + def testMp3(self): + "Test mp3 decoding" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3") + + def tearDown(self): + decoder = FileDecoder(self.source) + decoder.setup() + + totalframes = 0. + + while True: + frames, eod = decoder.process() + totalframes += frames.shape[0] + if eod: break + + # FIXME compute actual number of frames from file + if os.path.splitext(self.source)[-1].lower() == '.mp3': + self.assertEquals(totalframes, 355969) + elif os.path.splitext(self.source)[-1].lower() == '.ogg': + self.assertEquals(totalframes, 352833) + else: + self.assertEquals(totalframes, 352801) + +if __name__ == '__main__': + unittest.main(testRunner=TestRunner()) + + diff --git a/tests/testgraphers.py b/tests/testgraphers.py new file mode 100644 index 0000000..622eeda --- /dev/null +++ b/tests/testgraphers.py @@ -0,0 +1,105 @@ + +from timeside.core import * +from timeside.decoder import * +from timeside.analyzer import * +from timeside.encoder import * +from timeside.grapher import * +from timeside.api import * + +from timeside.component import * + +from unit_timeside import TestCase, TestRunner + +import os.path + +__all__ = ['TestGraphers'] + +class TestGraphers(TestCase): + "Test all graphers with various input media formats" + + def setUp(self): + pass + + # WAVEFORMS + def testWav2Waveform(self): + "Test WAV to Waveform" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") + self.image = "/tmp/test_waveform_sweep_wav.png" + self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + def testFlac2Waveform(self): + "Test FLAC to Waveform" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") + self.image = "/tmp/test_waveform_sweep_flac.png" + self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + def testMp32Waveform(self): + "Test MP3 to Waveform" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3") + self.image = "/tmp/test_waveform_sweep_mp3.png" + self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + def testOgg2Waveform(self): + "Test OGG to Waveform" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg") + self.image = "/tmp/test_waveform_sweep_ogg.png" + self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + # WAVEFORMS JOYDIV + def testWav2WaveformJoyDiv(self): + "Test WAV to WaveformJoyDiv" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") + self.image = "/tmp/test_waveformjoydiv_sweep_wav.png" + self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + def testFlac2WaveformJoyDiv(self): + "Test FLAC to WaveformJoyDiv" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") + self.image = "/tmp/test_waveformjoydiv_sweep_flac.png" + self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + def testMp32WaveformJoyDiv(self): + "Test MP3 to WaveformJoyDiv" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3") + self.image = "/tmp/test_waveformjoydiv_sweep_mp3.png" + self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + def testOgg2WaveformJoyDiv(self): + "Test OGG to WaveformJoyDiv" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg") + self.image = "/tmp/test_waveformjoydiv_sweep_ogg.png" + self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + # SPECTROGRAMS + def testWav2Spectrogram(self): + "Test WAV to Spectrogram" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") + self.image = "/tmp/test_spectrogram_sweep_wav.png" + self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + def testMp32Spectrogram(self): + "Test MP3 to Spectrogram" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3") + self.image = "/tmp/test_spectrogram_sweep_mp3.png" + self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + def testFlac2Spectrogram(self): + "Test FLAC to Spectrogram" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") + self.image = "/tmp/test_spectrogram_sweep_flac.png" + self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + def testOgg2Spectrogram(self): + "Test OGG to Spectrogram" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg") + self.image = "/tmp/test_spectrogram_sweep_ogg.png" + self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + + def tearDown(self): + decoder = FileDecoder(self.source) + (decoder | self.grapher).run() + self.grapher.render(self.image) + +if __name__ == '__main__': + unittest.main(testRunner=TestRunner()) + diff --git a/tests/testinputadapter.py b/tests/testinputadapter.py new file mode 100644 index 0000000..531866a --- /dev/null +++ b/tests/testinputadapter.py @@ -0,0 +1,70 @@ +from timeside.core import FixedSizeInputAdapter +from unit_timeside import TestCase, TestRunner +import numpy + +class TestFixedSizeInputAdapter(TestCase): + "Test the fixed-sized input adapter" + + def assertIOEquals(self, adapter, input, input_eod, output, output_eod=None): + output = output[:] + output.reverse() + _eod = None + for buffer, _eod in adapter.process(input, input_eod): + a = output.pop() + if not numpy.array_equiv(buffer, a): + self.fail("\n-- Actual --\n%s\n -- Expected -- \n%s\n" % (str(buffer), str(a))) + + if _eod != output_eod: + self.fail("eod do not match: %s != %s", (str(_eod), str(output_eod))) + + if output: + self.fail("trailing expected data: %s" % output) + + def setUp(self): + self.data = numpy.arange(44).reshape(2,22).transpose() + + def testTwoChannels(self): + "Test simple stream with two channels" + adapter = FixedSizeInputAdapter(4, 2) + + self.assertEquals(len(self.data), adapter.nframes(len(self.data))) + + self.assertIOEquals(adapter, self.data[0:1], False, []) + self.assertIOEquals(adapter, self.data[1:5], False, [self.data[0:4]], False) + self.assertIOEquals(adapter, self.data[5:12], False, [self.data[4:8], self.data[8:12]], False) + self.assertIOEquals(adapter, self.data[12:13], False, []) + self.assertIOEquals(adapter, self.data[13:14], False, []) + self.assertIOEquals(adapter, self.data[14:18], False, [self.data[12:16]], False) + self.assertIOEquals(adapter, self.data[18:20], False, [self.data[16:20]], False) + self.assertIOEquals(adapter, self.data[20:21], False, []) + self.assertIOEquals(adapter, self.data[21:22], True, [self.data[20:22]], True) + + def testPadding(self): + "Test automatic padding support" + adapter = FixedSizeInputAdapter(4, 2, pad=True) + + self.assertEquals(len(self.data) + 2, adapter.nframes(len(self.data))) + + self.assertIOEquals(adapter, self.data[0:21], False, + [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]], + False) + + self.assertIOEquals(adapter, self.data[21:22], True, [[ + [20, 42], + [21, 43], + [0, 0], + [0, 0] + ]], True) + + def testSizeMultiple(self): + "Test a stream which contain a multiple number of buffers" + adapter = FixedSizeInputAdapter(4, 2) + + self.assertIOEquals(adapter, self.data[0:20], True, + [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]], + True) + + +if __name__ == '__main__': + unittest.main(testRunner=TestRunner()) + diff --git a/tests/testtranscoding.py b/tests/testtranscoding.py new file mode 100644 index 0000000..617b014 --- /dev/null +++ b/tests/testtranscoding.py @@ -0,0 +1,125 @@ +from timeside.core import * +from timeside.decoder import * +from timeside.analyzer import * +from timeside.encoder import * +from timeside.api import * + +from timeside.component import * + +from unit_timeside import TestCase, TestRunner + +import os.path + +__all__ = ['TestTranscoding'] + +class TestTranscoding(TestCase): + "Test the low level streaming features" + + def setUp(self): + pass + + def testWav2Mp3(self): + "Test wav to mp3 conversion" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") + + dest1 = "/tmp/test_wav_filesink.mp3" + dest2 = "/tmp/test_wav_appsink.mp3" + self.f = open(dest2,'w') + + self.streaming=True + + encoder = Mp3Encoder(dest1, streaming=True) + self.encoder = encoder + + def testFlac2Mp3(self): + "Test flac to mp3 conversion" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") + + dest1 = "/tmp/test_flac_filesink.mp3" + dest2 = "/tmp/test_flac_appsink.mp3" + self.f = open(dest2,'w') + + self.streaming=True + + encoder = Mp3Encoder(dest1, streaming=True) + self.encoder = encoder + + """ + def testFlac2Ogg(self): + "Test flac to ogg conversion" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") + + dest1 = "/tmp/test_flac_filesink.ogg" + dest2 = "/tmp/test_flac_appsink.ogg" + self.f = open(dest2,'w') + + self.streaming=True + + encoder = VorbisEncoder(dest1, streaming=True) + self.encoder = encoder + + def testWav2Ogg(self): + "Test wav to ogg conversion" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") + + dest1 = "/tmp/test_wav_filesink.ogg" + dest2 = "/tmp/test_wav_appsink.ogg" + self.f = open(dest2,'w') + + self.streaming=True + + encoder = VorbisEncoder(dest1, streaming=True) + self.encoder = encoder + + def testWav2Flac(self): + "Test wav to flac conversion" + self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") + + dest1 = "/tmp/test_wav_filesink.flac" + dest2 = "/tmp/test_wav_appsink.flac" + self.f = open(dest2,'w') + + self.streaming=True + + encoder = FlacEncoder(dest1, streaming=True) + self.encoder = encoder + """ + + def setUpDecoder(self): + self.decoder = FileDecoder(self.source) + self.decoder.setup() + self.channels = self.decoder.channels() + self.samplerate = self.decoder.samplerate() + + def setUpEncoder(self): + self.encoder.setup(channels = self.channels, samplerate = self.samplerate) + + def tearDown(self): + self.setUpDecoder() + self.setUpEncoder() + encoder = self.encoder + f = self.f + + #print "decoder pipe:\n", decoder.pipe + #print "encoder pipe:\n", encoder.pipe + totalframes = 0. + + while True: + frames, eod = self.decoder.process() + #print frames.shape[0] + totalframes += frames.shape[0] + encoder.process(frames, eod) + if self.streaming: + f.write(encoder.chunk) + if eod: + break + if encoder.eod : + break + f.close() + + # FIXME compute actual number of frames from file + self.assertEquals(totalframes, 352801) + +if __name__ == '__main__': + unittest.main(testRunner=TestRunner()) + diff --git a/tests/unit_timeside.py b/tests/unit_timeside.py new file mode 100644 index 0000000..09bc2d6 --- /dev/null +++ b/tests/unit_timeside.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- + +import unittest +import sys +import time + +class TestCase(unittest.TestCase): + + def assertSameList(self, list1, list2): + "Test that two lists contain the same elements, in any order" + if len(list1) != len(list2): + self.fail("Lists length differ : %d != %d" % (len(list1), len(list2))) + + for item in list1: + if not item in list2: + self.fail("%s is not in list2" % str(item)) + + for item in list2: + if not item in list1: + self.fail("%s is not in list1" % str(item)) + +class _TextTestResult(unittest.TestResult): + """A test result class that can print formatted text results to a stream. + + Used by TextTestRunner. + """ + separator1 = '=' * 70 + separator2 = '-' * 70 + + def __init__(self, stream, descriptions, verbosity): + unittest.TestResult.__init__(self) + self.stream = stream + self.showAll = verbosity > 1 + self.dots = verbosity == 1 + self.descriptions = descriptions + self.currentTestCase = None + + def getDescription(self, test): + if self.descriptions: + return test.shortDescription() or str(test) + else: + return str(test) + + def startTest(self, test): + unittest.TestResult.startTest(self, test) + if self.showAll: + if self.currentTestCase != test.__class__: + self.currentTestCase = test.__class__ + self.stream.writeln() + self.stream.writeln("[%s]" % self.currentTestCase.__name__) + self.stream.write(" " + self.getDescription(test)) + self.stream.write(" ... ") + + def addSuccess(self, test): + unittest.TestResult.addSuccess(self, test) + if self.showAll: + self.stream.writeln("ok") + elif self.dots: + self.stream.write('.') + + def addError(self, test, err): + unittest.TestResult.addError(self, test, err) + if self.showAll: + self.stream.writeln("ERROR") + elif self.dots: + self.stream.write('E') + + def addFailure(self, test, err): + unittest.TestResult.addFailure(self, test, err) + if self.showAll: + self.stream.writeln("FAIL") + elif self.dots: + self.stream.write('F') + + def printErrors(self): + if self.dots or self.showAll: + self.stream.writeln() + self.printErrorList('ERROR', self.errors) + self.printErrorList('FAIL', self.failures) + + def printErrorList(self, flavour, errors): + for test, err in errors: + self.stream.writeln(self.separator1) + self.stream.writeln("%s: %s" % (flavour,self.getDescription(test))) + self.stream.writeln(self.separator2) + self.stream.writeln("%s" % err) + + +class _WritelnDecorator: + """Used to decorate file-like objects with a handy 'writeln' method""" + def __init__(self,stream): + self.stream = stream + + def __getattr__(self, attr): + return getattr(self.stream,attr) + + def writeln(self, arg=None): + if arg: self.write(arg) + self.write('\n') # text-mode streams translate to \r\n if needed + +class TestRunner: + """A test runner class that displays results in textual form. + + It prints out the names of tests as they are run, errors as they + occur, and a summary of the results at the end of the test run. + """ + def __init__(self, stream=sys.stderr, descriptions=1, verbosity=2): + self.stream = _WritelnDecorator(stream) + self.descriptions = descriptions + self.verbosity = verbosity + + def _makeResult(self): + return _TextTestResult(self.stream, self.descriptions, self.verbosity) + + def run(self, test): + "Run the given test case or test suite." + result = self._makeResult() + startTime = time.time() + test(result) + stopTime = time.time() + timeTaken = stopTime - startTime + result.printErrors() + self.stream.writeln(result.separator2) + run = result.testsRun + self.stream.writeln("Ran %d test%s in %.3fs" % + (run, run != 1 and "s" or "", timeTaken)) + self.stream.writeln() + if not result.wasSuccessful(): + self.stream.write("FAILED (") + failed, errored = map(len, (result.failures, result.errors)) + if failed: + self.stream.write("failures=%d" % failed) + if errored: + if failed: self.stream.write(", ") + self.stream.write("errors=%d" % errored) + self.stream.writeln(")") + else: + self.stream.writeln("OK") + return result + + diff --git a/timeside/__init__.py b/timeside/__init__.py index 0d62329..acb5289 100644 --- a/timeside/__init__.py +++ b/timeside/__init__.py @@ -9,7 +9,6 @@ import timeside.decoder import timeside.encoder import timeside.grapher import timeside.analyzer -import timeside.tests from timeside.core import * __version__ = '0.3.2' diff --git a/timeside/tests/__init__.py b/timeside/tests/__init__.py deleted file mode 100644 index 09bc2d6..0000000 --- a/timeside/tests/__init__.py +++ /dev/null @@ -1,141 +0,0 @@ -# -*- coding: utf-8 -*- - -import unittest -import sys -import time - -class TestCase(unittest.TestCase): - - def assertSameList(self, list1, list2): - "Test that two lists contain the same elements, in any order" - if len(list1) != len(list2): - self.fail("Lists length differ : %d != %d" % (len(list1), len(list2))) - - for item in list1: - if not item in list2: - self.fail("%s is not in list2" % str(item)) - - for item in list2: - if not item in list1: - self.fail("%s is not in list1" % str(item)) - -class _TextTestResult(unittest.TestResult): - """A test result class that can print formatted text results to a stream. - - Used by TextTestRunner. - """ - separator1 = '=' * 70 - separator2 = '-' * 70 - - def __init__(self, stream, descriptions, verbosity): - unittest.TestResult.__init__(self) - self.stream = stream - self.showAll = verbosity > 1 - self.dots = verbosity == 1 - self.descriptions = descriptions - self.currentTestCase = None - - def getDescription(self, test): - if self.descriptions: - return test.shortDescription() or str(test) - else: - return str(test) - - def startTest(self, test): - unittest.TestResult.startTest(self, test) - if self.showAll: - if self.currentTestCase != test.__class__: - self.currentTestCase = test.__class__ - self.stream.writeln() - self.stream.writeln("[%s]" % self.currentTestCase.__name__) - self.stream.write(" " + self.getDescription(test)) - self.stream.write(" ... ") - - def addSuccess(self, test): - unittest.TestResult.addSuccess(self, test) - if self.showAll: - self.stream.writeln("ok") - elif self.dots: - self.stream.write('.') - - def addError(self, test, err): - unittest.TestResult.addError(self, test, err) - if self.showAll: - self.stream.writeln("ERROR") - elif self.dots: - self.stream.write('E') - - def addFailure(self, test, err): - unittest.TestResult.addFailure(self, test, err) - if self.showAll: - self.stream.writeln("FAIL") - elif self.dots: - self.stream.write('F') - - def printErrors(self): - if self.dots or self.showAll: - self.stream.writeln() - self.printErrorList('ERROR', self.errors) - self.printErrorList('FAIL', self.failures) - - def printErrorList(self, flavour, errors): - for test, err in errors: - self.stream.writeln(self.separator1) - self.stream.writeln("%s: %s" % (flavour,self.getDescription(test))) - self.stream.writeln(self.separator2) - self.stream.writeln("%s" % err) - - -class _WritelnDecorator: - """Used to decorate file-like objects with a handy 'writeln' method""" - def __init__(self,stream): - self.stream = stream - - def __getattr__(self, attr): - return getattr(self.stream,attr) - - def writeln(self, arg=None): - if arg: self.write(arg) - self.write('\n') # text-mode streams translate to \r\n if needed - -class TestRunner: - """A test runner class that displays results in textual form. - - It prints out the names of tests as they are run, errors as they - occur, and a summary of the results at the end of the test run. - """ - def __init__(self, stream=sys.stderr, descriptions=1, verbosity=2): - self.stream = _WritelnDecorator(stream) - self.descriptions = descriptions - self.verbosity = verbosity - - def _makeResult(self): - return _TextTestResult(self.stream, self.descriptions, self.verbosity) - - def run(self, test): - "Run the given test case or test suite." - result = self._makeResult() - startTime = time.time() - test(result) - stopTime = time.time() - timeTaken = stopTime - startTime - result.printErrors() - self.stream.writeln(result.separator2) - run = result.testsRun - self.stream.writeln("Ran %d test%s in %.3fs" % - (run, run != 1 and "s" or "", timeTaken)) - self.stream.writeln() - if not result.wasSuccessful(): - self.stream.write("FAILED (") - failed, errored = map(len, (result.failures, result.errors)) - if failed: - self.stream.write("failures=%d" % failed) - if errored: - if failed: self.stream.write(", ") - self.stream.write("errors=%d" % errored) - self.stream.writeln(")") - else: - self.stream.writeln("OK") - return result - - diff --git a/timeside/tests/alltests.py b/timeside/tests/alltests.py deleted file mode 100644 index 92f6abb..0000000 --- a/timeside/tests/alltests.py +++ /dev/null @@ -1,8 +0,0 @@ -from timeside.tests.testcomponent import * -from timeside.tests.testinputadapter import * -from timeside.tests.testgraphers import * -from timeside.tests.testdecoding import * -from timeside.tests.testtranscoding import * -from timeside.tests import TestRunner - -unittest.main(testRunner=TestRunner()) diff --git a/timeside/tests/api/__init__.py b/timeside/tests/api/__init__.py deleted file mode 100644 index 40a96af..0000000 --- a/timeside/tests/api/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# -*- coding: utf-8 -*- diff --git a/timeside/tests/api/examples.py b/timeside/tests/api/examples.py deleted file mode 100644 index 747c197..0000000 --- a/timeside/tests/api/examples.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- coding: utf-8 -*- - -from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter -from timeside.api import * -import numpy - - -class Gain(Processor): - implements(IEffect) - - @interfacedoc - def __init__(self, gain=1.0): - self.gain = gain - - @staticmethod - @interfacedoc - def id(): - return "test_gain" - - @staticmethod - @interfacedoc - def name(): - return "Gain test effect" - - def process(self, frames, eod=False): - return numpy.multiply(frames, self.gain), eod - - -class FixedInputProcessor(Processor): - """Processor which does absolutely nothing except illustrating the use - of the FixedInputSizeAdapter. It also tests things a bit.""" - - implements(IProcessor) - - BUFFER_SIZE = 1024 - - @staticmethod - @interfacedoc - def id(): - return "test_fixed" - - @interfacedoc - def setup(self, channels, samplerate, nframes): - super(FixedInputProcessor, self).setup(channels, samplerate, nframes) - self.adapter = FixedSizeInputAdapter(self.BUFFER_SIZE, channels, pad=True) - - @interfacedoc - def process(self, frames, eod=False): - for buffer, end in self.adapter.process(frames, eod): - # Test that the adapter is actually doing the job: - if len(buffer) != self.BUFFER_SIZE: - raise Exception("Bad buffer size from adapter") - - return frames, eod - - - - - diff --git a/timeside/tests/api/gstreamer.py b/timeside/tests/api/gstreamer.py deleted file mode 100644 index 897ac42..0000000 --- a/timeside/tests/api/gstreamer.py +++ /dev/null @@ -1,238 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2007-2009 Parisson -# Copyright (c) 2007 Olivier Guilyardi -# Copyright (c) 2007-2009 Guillaume Pellerin -# -# This file is part of TimeSide. - -# TimeSide is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 2 of the License, or -# (at your option) any later version. - -# TimeSide is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with TimeSide. If not, see . - -# Author: Paul Brossier - -from timeside.core import Processor, implements, interfacedoc -from timeside.api import IDecoder, IEncoder -from numpy import array, frombuffer, getbuffer, float32 - -import pygst -pygst.require('0.10') -import gst -import gobject -gobject.threads_init () - -class FileDecoder(Processor): - """ gstreamer-based decoder """ - implements(IDecoder) - - # duration ms, before discovery process times out - MAX_DISCOVERY_TIME = 3000 - - audioformat = None - audiochannels = None - audiorate = None - audionframes = None - mimetype = '' - - # IProcessor methods - - @staticmethod - @interfacedoc - def id(): - return "test_gstreamerdec" - - def setup(self, channels = None, samplerate = None, nframes = None): - # the output data format we want - caps = "audio/x-raw-float, width=32" - pipeline = gst.parse_launch('''uridecodebin uri=%s - ! audioconvert - ! %s - ! appsink name=sink sync=False ''' % (self.uri, caps)) - # store a pointer to appsink in our decoder object - self.sink = pipeline.get_by_name('sink') - # adjust length of emitted buffers - # self.sink.set_property('blocksize', 0x10000) - # start pipeline - pipeline.set_state(gst.STATE_PLAYING) - - @interfacedoc - def channels(self): - return self.audiochannels - - @interfacedoc - def samplerate(self): - return self.audiorate - - @interfacedoc - def nframes(self): - return self.audionframes - - @interfacedoc - def process(self, frames = None, eod = False): - try: - buf = self.sink.emit('pull-buffer') - except SystemError, e: - # should never happen - print 'SystemError', e - return array([0.]), True - if buf == None: - return array([0.]), True - return self.gst_buffer_to_numpy_array(buf), False - - @interfacedoc - def release(self): - # nothing to do for now - pass - - ## IDecoder methods - - @interfacedoc - def __init__(self, uri): - - # is this a file? - import os.path - if os.path.exists(uri): - # get the absolute path - uri = os.path.abspath(uri) - # first run the file/uri through the discover pipeline - self.discover(uri) - # and make a uri of it - from urllib import quote - self.uri = 'file://'+quote(uri) - - @interfacedoc - def format(self): - # TODO check - return self.mimetype - - @interfacedoc - def encoding(self): - # TODO check - return self.mimetype.split('/')[-1] - - @interfacedoc - def resolution(self): - # TODO check: width or depth? - return self.audiowidth - - @interfacedoc - def metadata(self): - # TODO check - return self.tags - - ## gst.extend discoverer - - def discover(self, path): - """ gstreamer based helper function to get file attributes """ - from gst.extend import discoverer - d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME) - d.connect('discovered', self.discovered) - self.mainloop = gobject.MainLoop() - d.discover() - self.mainloop.run() - - def discovered(self, d, is_media): - """ gstreamer based helper executed upon discover() completion """ - if is_media and d.is_audio: - # copy the discoverer attributes to self - self.audiorate = d.audiorate - self.mimetype= d.mimetype - self.audiochannels = d.audiochannels - self.audiowidth = d.audiowidth - # conversion from time in nanoseconds to frames - from math import ceil - duration = d.audiorate * d.audiolength * 1.e-9 - self.audionframes = int (ceil ( duration ) ) - self.tags = d.tags - elif not d.is_audio: - print "error, no audio found!" - else: - print "fail", path - self.mainloop.quit() - - def gst_buffer_to_numpy_array(self, buf): - """ gstreamer buffer to numpy array conversion """ - chan = self.audiochannels - samples = frombuffer(buf.data, dtype=float32) - samples.resize([len(samples)/chan, chan]) - return samples - -class WavEncoder(Processor): - """ gstreamer-based encoder """ - implements(IEncoder) - - def __init__(self, output): - self.file = None - if isinstance(output, basestring): - self.filename = output - else: - raise Exception("Streaming not supported") - - @interfacedoc - def setup(self, channels=None, samplerate=None, nframes=None): - super(WavEncoder, self).setup(channels, samplerate, nframes) - # TODO open file for writing - # the output data format we want - pipeline = gst.parse_launch(''' appsrc name=src - ! audioconvert - ! wavenc - ! filesink location=%s ''' % self.filename) - # store a pointer to appsink in our encoder object - self.src = pipeline.get_by_name('src') - srccaps = gst.Caps("""audio/x-raw-float, - endianness=(int)1234, - channels=(int)%s, - width=(int)32, - rate=(int)%d""" % (int(channels), int(samplerate))) - self.src.set_property("caps", srccaps) - - # start pipeline - pipeline.set_state(gst.STATE_PLAYING) - self.pipeline = pipeline - - @staticmethod - @interfacedoc - def id(): - return "test_gstreamerenc" - - @staticmethod - @interfacedoc - def description(): - return "Gstreamer based encoder" - - @staticmethod - @interfacedoc - def file_extension(): - return "wav" - - @staticmethod - @interfacedoc - def mime_type(): - return "audio/x-wav" - - @interfacedoc - def set_metadata(self, metadata): - #TODO - pass - - @interfacedoc - def process(self, frames, eod=False): - buf = self.numpy_array_to_gst_buffer(frames) - self.src.emit('push-buffer', buf) - if eod: self.src.emit('end-of-stream') - return frames, eod - - def numpy_array_to_gst_buffer(self, frames): - """ gstreamer buffer to numpy array conversion """ - buf = gst.Buffer(getbuffer(frames)) - return buf diff --git a/timeside/tests/api/test_analyzer.py b/timeside/tests/api/test_analyzer.py deleted file mode 100644 index 2b8198d..0000000 --- a/timeside/tests/api/test_analyzer.py +++ /dev/null @@ -1,46 +0,0 @@ -# -*- coding: utf-8 -*- - -import timeside -from sys import stdout -import os.path -import numpy - - -class TestAnalyzer: - - graphers = timeside.core.processors(timeside.api.IGrapher) - decoders = timeside.core.processors(timeside.api.IDecoder) - encoders= timeside.core.processors(timeside.api.IEncoder) - analyzers = timeside.core.processors(timeside.api.IAnalyzer) - - def __init__(self, path): - self.source = os.path.join(os.path.dirname(__file__), path) - print "Processing %s" % self.source - self.decoder = timeside.decoder.FileDecoder(self.source) - print 'format: ', self.decoder.format() - self.pipe = self.decoder - self.analyzers_sub_pipe = [] - - def process(self): - for analyzer in self.analyzers: - sub_pipe = analyzer() - self.analyzers_sub_pipe.append(sub_pipe) - self.pipe = self.pipe | sub_pipe - self.pipe.run() - - def results(self): - analyzers = [] - for analyzer in self.analyzers_sub_pipe: - value = analyzer.result() - analyzers.append({'name':analyzer.name(), - 'id':analyzer.id(), - 'unit':analyzer.unit(), - 'value':str(value)}) - print analyzers - - -test = TestAnalyzer('../samples/guitar.wav') -#test = TestAnalyzer('/home/momo/music/wav/Cellar/Cellar-FinallyMix_01.wav') -test.process() -test.results() - diff --git a/timeside/tests/api/test_analyzer3.py b/timeside/tests/api/test_analyzer3.py deleted file mode 100644 index 8114723..0000000 --- a/timeside/tests/api/test_analyzer3.py +++ /dev/null @@ -1,34 +0,0 @@ -# -*- coding: utf-8 -*- - -import timeside -import sys -import os.path -import numpy -import time - - -class TestAnalyzer: - - analyzer = timeside.analyzer.MaxLevel() - - def __init__(self, path): - self.source = os.path.join(os.path.dirname(__file__), path) - print "Processing %s" % self.source - self.decoder = timeside.decoder.FileDecoder(self.source) - print 'format: ', self.decoder.format() - self.pipe = self.decoder - self.sub_pipe = self.analyzer - - def process(self): - self.pipe = self.pipe | self.sub_pipe - self.pipe.run() - - def results(self): - print {'name':self.analyzer.name(), - 'id':self.analyzer.id(), - 'unit':self.analyzer.unit(), - 'value':str(self.analyzer.value)} - -test = TestAnalyzer(sys.argv[-1]) -test.process() -test.results() diff --git a/timeside/tests/api/test_flac.py b/timeside/tests/api/test_flac.py deleted file mode 100644 index ee590c4..0000000 --- a/timeside/tests/api/test_flac.py +++ /dev/null @@ -1,13 +0,0 @@ -# -*- coding: utf-8 -*- - -from timeside.decoder import * -from timeside.encoder import * -import os.path - -source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav") -dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.flac") - -decoder = FileDecoder(source) -encoder = FlacEncoder(dest) - -(decoder | encoder).run() diff --git a/timeside/tests/api/test_lolevel.py b/timeside/tests/api/test_lolevel.py deleted file mode 100644 index 421ce95..0000000 --- a/timeside/tests/api/test_lolevel.py +++ /dev/null @@ -1,61 +0,0 @@ -# -*- coding: utf-8 -*- - -from timeside.tests.api.examples import Gain -from timeside.core import * -from timeside.decoder import * -from timeside.analyzer import * -from timeside.encoder import * -from timeside.api import * - -import sys -if len(sys.argv) > 1: - source = sys.argv[1] -else: - import os.path - source= os.path.join (os.path.dirname(__file__), "../samples/guitar.wav") - -Decoder = FileDecoder -print "Creating decoder with id=%s for: %s" % (Decoder.id(), source) -decoder = Decoder(source) -analyzer = MaxLevel() -decoder.setup() -nchannels = decoder.channels() -samplerate = decoder.samplerate() -nframes = decoder.nframes() -analyzer.setup(nchannels, samplerate) - -print "Stats: duration=%f, nframes=%d, nchannels=%d, samplerate=%d, resolution=%d" % ( - nframes / float(samplerate), nframes, nchannels, samplerate, decoder.resolution()) - -while True: - frames, eod = decoder.process() - analyzer.process(frames, eod) - if eod: - break - -max_level = analyzer.result() -print "Max level: %f" % max_level - -destination = "../results/guitar_normalized.wav" -Encoder = WavEncoder -print "Creating encoder with id=%s for: %s" % (Encoder.id(), destination) -encoder = Encoder(destination) - -gain = 1 -if max_level > 0: - gain = 0.9 / max_level - -effect = Gain(gain) - -decoder.setup() -effect.setup(decoder.channels(), decoder.samplerate()) -encoder.setup(effect.channels(), effect.samplerate()) - -print "Applying effect id=%s with gain=%f" % (effect.id(), gain) - -while True: - frames, eod = decoder.process() - encoder.process(*effect.process(frames, eod)) - if eod: - break - diff --git a/timeside/tests/api/test_lolevel_streaming.py b/timeside/tests/api/test_lolevel_streaming.py deleted file mode 100644 index e394f0c..0000000 --- a/timeside/tests/api/test_lolevel_streaming.py +++ /dev/null @@ -1,40 +0,0 @@ -# -*- coding: utf-8 -*- - -from timeside.core import * -from timeside.decoder import * -from timeside.analyzer import * -from timeside.encoder import * -from timeside.api import * - -import sys -if len(sys.argv) > 1: - source = sys.argv[1] -else: - import os.path - source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac") - -decoder = FileDecoder(source) -print "Creating decoder with id=%s for: %s" % (decoder.id(), source) -decoder.setup() -channels = decoder.channels() -print 'channels :', channels -samplerate = decoder.samplerate() -nframes = decoder.nframes() - -dest1 = "/tmp/test_filesink.mp3" -dest2 = "/tmp/test_appsink.mp3" -f = open(dest2,'w') - -streaming=True -encoder = Mp3Encoder(dest1, streaming=True) -encoder.setup(channels=channels, samplerate=samplerate) - -while True: - encoder.process(*decoder.process()) - if streaming: - f.write(encoder.chunk) - if encoder.eod : - break - -f.close() -print encoder.pipe diff --git a/timeside/tests/api/test_lolevel_streaming_bad.py b/timeside/tests/api/test_lolevel_streaming_bad.py deleted file mode 100644 index e414ead..0000000 --- a/timeside/tests/api/test_lolevel_streaming_bad.py +++ /dev/null @@ -1,42 +0,0 @@ -# -*- coding: utf-8 -*- - -from timeside.core import * -from timeside.decoder import * -from timeside.analyzer import * -from timeside.encoder import * -from timeside.api import * - -import sys -if len(sys.argv) > 1: - source = sys.argv[1] -else: - import os.path - source= os.path.join (os.path.dirname(__file__), "../samples/sweep.wav") - -decoder = FileDecoder(source) -print "Creating decoder with id=%s for: %s" % (decoder.id(), source) -decoder.setup() -channels = decoder.channels() -print 'channels :', channels -samplerate = decoder.samplerate() -nframes = decoder.nframes() - -dest1 = "/tmp/test_filesink.mp3" -dest2 = "/tmp/test_appsink.mp3" -f = open(dest2,'w') - -streaming=True -encoder = Mp3Encoder(dest1, streaming=True) -encoder.setup(channels=channels, samplerate=samplerate) - -print encoder.pipe - -while True: - encoder.process(*decoder.process()) - if streaming: - f.write(encoder.chunk) - if encoder.eod : - break - -f.close() -print encoder.pipe diff --git a/timeside/tests/api/test_lolevel_streaming_vorbis.py b/timeside/tests/api/test_lolevel_streaming_vorbis.py deleted file mode 100644 index ecc3d3f..0000000 --- a/timeside/tests/api/test_lolevel_streaming_vorbis.py +++ /dev/null @@ -1,42 +0,0 @@ -# -*- coding: utf-8 -*- - -from timeside.core import * -from timeside.decoder import * -from timeside.analyzer import * -from timeside.encoder import * -from timeside.api import * - -import sys -if len(sys.argv) > 1: - source = sys.argv[1] -else: - import os.path - source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac") - -decoder = FileDecoder(source) -print "Creating decoder with id=%s for: %s" % (decoder.id(), source) -decoder.setup() -channels = decoder.channels() -print 'channels :', channels -samplerate = decoder.samplerate() -nframes = decoder.nframes() - -dest1 = "/tmp/test_filesink.ogg" -dest2 = "/tmp/test_appsink.ogg" -f = open(dest2,'w') - -streaming=True -encoder = VorbisEncoder(dest1, streaming=True) -encoder.setup(channels=channels, samplerate=samplerate) - -print encoder.pipe - -while True: - encoder.process(*decoder.process()) - if streaming: - f.write(encoder.chunk) - if encoder.eod : - break - -f.close() -print encoder.pipe diff --git a/timeside/tests/api/test_mp3.py b/timeside/tests/api/test_mp3.py deleted file mode 100644 index 620696b..0000000 --- a/timeside/tests/api/test_mp3.py +++ /dev/null @@ -1,27 +0,0 @@ -# -*- coding: utf-8 -*- - -from timeside.decoder import * -from timeside.encoder import * -import os.path - -source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav") -dest = os.path.join(os.path.dirname(__file__), "/tmp/sweep_wav.mp3") - -decoder = FileDecoder(source) -encoder = Mp3Encoder(dest) - -(decoder | encoder).run() - -metadata = {'TIT2': 'title', #title2 - 'TCOM': 'composer', #composer - 'TPE1': 'lead creator', #lead - 'UFID': 'identifier', #Unique ID... - 'TALB': 'album', #album - 'TCON': 'genre', #genre - 'TDRC': '2011', #year -# 'COMM': 'blabla', #comment - } - -encoder.set_metadata(metadata) -encoder.write_metadata(dest) - diff --git a/timeside/tests/api/test_pipe.py b/timeside/tests/api/test_pipe.py deleted file mode 100644 index 629b3e9..0000000 --- a/timeside/tests/api/test_pipe.py +++ /dev/null @@ -1,39 +0,0 @@ -# -*- coding: utf-8 -*- - -from timeside.tests.api.examples import Gain -from timeside.core import * -from timeside.decoder import * -from timeside.analyzer import * -from timeside.encoder import * -from timeside.api import * -from sys import stdout -import os.path -import numpy - -source = os.path.join(os.path.dirname(__file__), "../samples/guitar.wav") - -print "Normalizing %s" % source -decoder = FileDecoder(source) -maxlevel = MaxLevel() -duration = Duration() - -(decoder | maxlevel | duration).run() - -gain = 1 -if maxlevel.result() < 0: - gain = 0.9 / numpy.exp(maxlevel.result()/20) - -print "input maxlevel: %f" % maxlevel.result() -print "gain: %f" % gain -print "duration: %f %s" % (duration.result(), duration.unit()) - -gain = Gain(gain) -encoder = WavEncoder("../results/guitar_normalized.wav") - -subpipe = gain | maxlevel - -(decoder | subpipe | encoder).run() - -print "output maxlevel: %f" % maxlevel.result() - - diff --git a/timeside/tests/api/test_pipe_spectrogram.py b/timeside/tests/api/test_pipe_spectrogram.py deleted file mode 100644 index 49eaa8c..0000000 --- a/timeside/tests/api/test_pipe_spectrogram.py +++ /dev/null @@ -1,32 +0,0 @@ -# -*- coding: utf-8 -*- - -import os -from timeside.core import * -from timeside.api import * -from timeside.decoder import * -from timeside.grapher import * - -sample_dir = '../samples' -img_dir = '../results/img' -if not os.path.exists(img_dir): - os.mkdir(img_dir) - -test_dict = {'sweep.wav': 'spec_wav.png', - 'sweep.flac': 'spec_flac.png', - 'sweep.ogg': 'spec_ogg.png', - 'sweep.mp3': 'spec_mp3.png', - } - -for source, image in test_dict.iteritems(): - audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source) - image = img_dir + os.sep + image - print 'Test : decoder(%s) | waveform (%s)' % (source, image) - decoder = FileDecoder(audio) - spectrogram = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - (decoder | spectrogram).run() - print 'frames per pixel = ', spectrogram.graph.samples_per_pixel - print "render spectrogram to: %s" % image - spectrogram.render(image) - - - diff --git a/timeside/tests/api/test_pipe_waveform.py b/timeside/tests/api/test_pipe_waveform.py deleted file mode 100644 index bbfb54a..0000000 --- a/timeside/tests/api/test_pipe_waveform.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- - -import os -from timeside.core import * -from timeside.api import * -from timeside.decoder import * -from timeside.grapher import * - -sample_dir = '../samples' -img_dir = '../results/img' -if not os.path.exists(img_dir): - os.mkdir(img_dir) - -test_dict = {'sweep.wav': 'waveform_wav.png', - 'sweep.flac': 'waveform_flac.png', - 'sweep.ogg': 'waveform_ogg.png', - 'sweep.mp3': 'waveform_mp3.png', - } - -for source, image in test_dict.iteritems(): - audio = os.path.join(os.path.dirname(__file__), sample_dir + os.sep + source) - image = img_dir + os.sep + image - print 'Test : decoder(%s) | waveform (%s)' % (source, image) - decoder = FileDecoder(audio) - waveform = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - (decoder | waveform).run() - print 'frames per pixel = ', waveform.graph.samples_per_pixel - print "render waveform to: %s" % image - waveform.render(image) - - diff --git a/timeside/tests/api/test_vorbis.py b/timeside/tests/api/test_vorbis.py deleted file mode 100644 index 53818cd..0000000 --- a/timeside/tests/api/test_vorbis.py +++ /dev/null @@ -1,13 +0,0 @@ -# -*- coding: utf-8 -*- - -from timeside.decoder import * -from timeside.encoder import * -import os.path - -source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav") -dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.ogg") - -decoder = FileDecoder(source) -encoder = VorbisEncoder(dest) - -(decoder | encoder).run() diff --git a/timeside/tests/api/test_wav.py b/timeside/tests/api/test_wav.py deleted file mode 100644 index 077a01e..0000000 --- a/timeside/tests/api/test_wav.py +++ /dev/null @@ -1,15 +0,0 @@ -# -*- coding: utf-8 -*- - -from timeside.decoder import * -from timeside.analyzer import * -from timeside.encoder import * - -import os.path -source = os.path.join(os.path.dirname(__file__), "../samples/sweep.wav") -dest = os.path.join(os.path.dirname(__file__), "../results/sweep_wav.wav") - -decoder = FileDecoder(source) -encoder = WavEncoder(dest) - -(decoder | encoder).run() - diff --git a/timeside/tests/listprocessors.py b/timeside/tests/listprocessors.py deleted file mode 100644 index 840c087..0000000 --- a/timeside/tests/listprocessors.py +++ /dev/null @@ -1,12 +0,0 @@ -import timeside - -def list_processors(interface, prefix=""): - print prefix + interface.__name__ - subinterfaces = interface.__subclasses__() - for i in subinterfaces: - list_processors(i, prefix + " ") - processors = timeside.processors(interface, False) - for p in processors: - print prefix + " %s [%s]" % (p.__name__, p.id()) - -list_processors(timeside.api.IProcessor) diff --git a/timeside/tests/samples/guitar.wav b/timeside/tests/samples/guitar.wav deleted file mode 100644 index b5a9e80..0000000 Binary files a/timeside/tests/samples/guitar.wav and /dev/null differ diff --git a/timeside/tests/samples/sweep.flac b/timeside/tests/samples/sweep.flac deleted file mode 100644 index decee06..0000000 Binary files a/timeside/tests/samples/sweep.flac and /dev/null differ diff --git a/timeside/tests/samples/sweep.mp3 b/timeside/tests/samples/sweep.mp3 deleted file mode 100644 index dc75fe0..0000000 Binary files a/timeside/tests/samples/sweep.mp3 and /dev/null differ diff --git a/timeside/tests/samples/sweep.ogg b/timeside/tests/samples/sweep.ogg deleted file mode 100644 index e30bf99..0000000 Binary files a/timeside/tests/samples/sweep.ogg and /dev/null differ diff --git a/timeside/tests/samples/sweep.wav b/timeside/tests/samples/sweep.wav deleted file mode 100644 index fc28a32..0000000 Binary files a/timeside/tests/samples/sweep.wav and /dev/null differ diff --git a/timeside/tests/samples/sweep_source.wav b/timeside/tests/samples/sweep_source.wav deleted file mode 100644 index 5dcf9ba..0000000 Binary files a/timeside/tests/samples/sweep_source.wav and /dev/null differ diff --git a/timeside/tests/testcomponent.py b/timeside/tests/testcomponent.py deleted file mode 100644 index 6638c30..0000000 --- a/timeside/tests/testcomponent.py +++ /dev/null @@ -1,166 +0,0 @@ - -from timeside.component import * -from timeside.tests import TestCase, TestRunner -import unittest - -__all__ = ['TestComponentArchitecture'] - -class TestComponentArchitecture(TestCase): - "Test the component and interface system" - - def testOneInterface(self): - "Test a component implementing one interface" - self.assertSameList(implementations(I1), [C1]) - - def testTwoInterfaces(self): - "Test a component implementing two interfaces" - self.assertSameList(implementations(I2), [C2]) - self.assertSameList(implementations(I3), [C2]) - - def testTwoImplementations(self): - "Test an interface implemented by two components" - self.assertSameList(implementations(I4), [C3, C4]) - - def testInterfaceInheritance(self): - "Test whether a component implements an interface's parent" - self.assertSameList(implementations(I5), [C5]) - - def testImplementationInheritance(self): - "Test that a component doesn't implement the interface implemented by its parent" - self.assertSameList(implementations(I7), [C6]) - - def testImplementationRedundancy(self): - "Test implementation redundancy across inheritance" - self.assertSameList(implementations(I8), [C8, C9]) - - def testAbstractImplementation(self): - "Test abstract implementation" - self.assertSameList(implementations(I11), []) - self.assertSameList(implementations(I11, abstract=True), [C11]) - - def testInterfaceDoc(self): - "Test @interfacedoc decorator" - self.assertEquals(C10.test.__doc__, "testdoc") - - def testInterfaceDocStatic(self): - "Test @interfacedoc decorator on static method" - self.assertEquals(C10.teststatic.__doc__, "teststaticdoc") - - def testIntefaceDocReversed(self): - "Test @interfacedoc on static method (decorators reversed)" - - try: - - class BogusDoc1(Component): - implements(I10) - - @interfacedoc - @staticmethod - def teststatic(self): - pass - - self.fail("No error raised with reversed decorators") - - except ComponentError: - pass - - def testInterfaceDocBadMethod(self): - "Test @interfacedoc with unexistant method in interface" - - try: - class BogusDoc2(Component): - implements(I10) - - @interfacedoc - def nosuchmethod(self): - pass - - self.fail("No error raised when decorating an unexistant method") - - except ComponentError: - pass - -class I1(Interface): - pass - -class I2(Interface): - pass - -class I3(Interface): - pass - -class I4(Interface): - pass - -class I5(Interface): - pass - -class I6(I5): - pass - -class I7(Interface): - pass - -class I8(Interface): - pass - -class I9(I8): - pass - -class I10(Interface): - def test(self): - """testdoc""" - - @staticmethod - def teststatic(self): - """teststaticdoc""" - -class I11(Interface): - pass - -class C1(Component): - implements(I1) - -class C2(Component): - implements(I2, I3) - -class C3(Component): - implements(I4) - -class C4(Component): - implements(I4) - -class C5(Component): - implements(I6) - -class C6(Component): - implements(I7) - -class C7(C6): - pass - -class C8(Component): - implements(I8) - -class C9(Component): - implements(I8, I9) - -class C10(Component): - implements(I10) - - @interfacedoc - def test(self): - pass - - @staticmethod - @interfacedoc - def teststatic(self): - pass - -class C11(Component): - abstract() - implements(I11) - -if __name__ == '__main__': - unittest.main(testRunner=TestRunner()) - diff --git a/timeside/tests/testdecoding.py b/timeside/tests/testdecoding.py deleted file mode 100644 index 5e0d255..0000000 --- a/timeside/tests/testdecoding.py +++ /dev/null @@ -1,53 +0,0 @@ -from timeside.decoder import * -from timeside.tests import TestCase, TestRunner -import unittest - -import os.path - -__all__ = ['TestDecoding'] - -class TestDecoding(TestCase): - "Test the low level streaming features" - - def setUp(self): - pass - - def testWav(self): - "Test wav decoding" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") - - def testFlac(self): - "Test flac decoding" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") - - def testOgg(self): - "Test ogg decoding" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg") - - def testMp3(self): - "Test mp3 decoding" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3") - - def tearDown(self): - decoder = FileDecoder(self.source) - decoder.setup() - - totalframes = 0. - - while True: - frames, eod = decoder.process() - totalframes += frames.shape[0] - if eod: break - - # FIXME compute actual number of frames from file - if os.path.splitext(self.source)[-1].lower() == '.mp3': - self.assertEquals(totalframes, 355969) - elif os.path.splitext(self.source)[-1].lower() == '.ogg': - self.assertEquals(totalframes, 352833) - else: - self.assertEquals(totalframes, 352801) - -if __name__ == '__main__': - unittest.main(testRunner=TestRunner()) - - diff --git a/timeside/tests/testgraphers.py b/timeside/tests/testgraphers.py deleted file mode 100644 index bff370c..0000000 --- a/timeside/tests/testgraphers.py +++ /dev/null @@ -1,105 +0,0 @@ - -from timeside.core import * -from timeside.decoder import * -from timeside.analyzer import * -from timeside.encoder import * -from timeside.grapher import * -from timeside.api import * - -from timeside.component import * -from timeside.tests import TestCase, TestRunner -import unittest - -import os.path - -__all__ = ['TestGraphers'] - -class TestGraphers(TestCase): - "Test all graphers with various input media formats" - - def setUp(self): - pass - - # WAVEFORMS - def testWav2Waveform(self): - "Test WAV to Waveform" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") - self.image = "/tmp/test_waveform_sweep_wav.png" - self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - def testFlac2Waveform(self): - "Test FLAC to Waveform" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") - self.image = "/tmp/test_waveform_sweep_flac.png" - self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - def testMp32Waveform(self): - "Test MP3 to Waveform" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3") - self.image = "/tmp/test_waveform_sweep_mp3.png" - self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - def testOgg2Waveform(self): - "Test OGG to Waveform" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg") - self.image = "/tmp/test_waveform_sweep_ogg.png" - self.grapher = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - # WAVEFORMS JOYDIV - def testWav2WaveformJoyDiv(self): - "Test WAV to WaveformJoyDiv" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") - self.image = "/tmp/test_waveformjoydiv_sweep_wav.png" - self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - def testFlac2WaveformJoyDiv(self): - "Test FLAC to WaveformJoyDiv" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") - self.image = "/tmp/test_waveformjoydiv_sweep_flac.png" - self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - def testMp32WaveformJoyDiv(self): - "Test MP3 to WaveformJoyDiv" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3") - self.image = "/tmp/test_waveformjoydiv_sweep_mp3.png" - self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - def testOgg2WaveformJoyDiv(self): - "Test OGG to WaveformJoyDiv" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg") - self.image = "/tmp/test_waveformjoydiv_sweep_ogg.png" - self.grapher = WaveformJoyDiv(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - # SPECTROGRAMS - def testWav2Spectrogram(self): - "Test WAV to Spectrogram" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") - self.image = "/tmp/test_spectrogram_sweep_wav.png" - self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - def testMp32Spectrogram(self): - "Test MP3 to Spectrogram" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3") - self.image = "/tmp/test_spectrogram_sweep_mp3.png" - self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - def testFlac2Spectrogram(self): - "Test FLAC to Spectrogram" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") - self.image = "/tmp/test_spectrogram_sweep_flac.png" - self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - def testOgg2Spectrogram(self): - "Test OGG to Spectrogram" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg") - self.image = "/tmp/test_spectrogram_sweep_ogg.png" - self.grapher = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') - - def tearDown(self): - decoder = FileDecoder(self.source) - (decoder | self.grapher).run() - self.grapher.render(self.image) - -if __name__ == '__main__': - unittest.main(testRunner=TestRunner()) - diff --git a/timeside/tests/testinputadapter.py b/timeside/tests/testinputadapter.py deleted file mode 100644 index dca0f63..0000000 --- a/timeside/tests/testinputadapter.py +++ /dev/null @@ -1,71 +0,0 @@ -from timeside.core import FixedSizeInputAdapter -from timeside.tests import TestCase, TestRunner -import numpy -import unittest - -class TestFixedSizeInputAdapter(TestCase): - "Test the fixed-sized input adapter" - - def assertIOEquals(self, adapter, input, input_eod, output, output_eod=None): - output = output[:] - output.reverse() - _eod = None - for buffer, _eod in adapter.process(input, input_eod): - a = output.pop() - if not numpy.array_equiv(buffer, a): - self.fail("\n-- Actual --\n%s\n -- Expected -- \n%s\n" % (str(buffer), str(a))) - - if _eod != output_eod: - self.fail("eod do not match: %s != %s", (str(_eod), str(output_eod))) - - if output: - self.fail("trailing expected data: %s" % output) - - def setUp(self): - self.data = numpy.arange(44).reshape(2,22).transpose() - - def testTwoChannels(self): - "Test simple stream with two channels" - adapter = FixedSizeInputAdapter(4, 2) - - self.assertEquals(len(self.data), adapter.nframes(len(self.data))) - - self.assertIOEquals(adapter, self.data[0:1], False, []) - self.assertIOEquals(adapter, self.data[1:5], False, [self.data[0:4]], False) - self.assertIOEquals(adapter, self.data[5:12], False, [self.data[4:8], self.data[8:12]], False) - self.assertIOEquals(adapter, self.data[12:13], False, []) - self.assertIOEquals(adapter, self.data[13:14], False, []) - self.assertIOEquals(adapter, self.data[14:18], False, [self.data[12:16]], False) - self.assertIOEquals(adapter, self.data[18:20], False, [self.data[16:20]], False) - self.assertIOEquals(adapter, self.data[20:21], False, []) - self.assertIOEquals(adapter, self.data[21:22], True, [self.data[20:22]], True) - - def testPadding(self): - "Test automatic padding support" - adapter = FixedSizeInputAdapter(4, 2, pad=True) - - self.assertEquals(len(self.data) + 2, adapter.nframes(len(self.data))) - - self.assertIOEquals(adapter, self.data[0:21], False, - [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]], - False) - - self.assertIOEquals(adapter, self.data[21:22], True, [[ - [20, 42], - [21, 43], - [0, 0], - [0, 0] - ]], True) - - def testSizeMultiple(self): - "Test a stream which contain a multiple number of buffers" - adapter = FixedSizeInputAdapter(4, 2) - - self.assertIOEquals(adapter, self.data[0:20], True, - [self.data[0:4], self.data[4:8], self.data[8:12], self.data[12:16], self.data[16:20]], - True) - - -if __name__ == '__main__': - unittest.main(testRunner=TestRunner()) - diff --git a/timeside/tests/testtranscoding.py b/timeside/tests/testtranscoding.py deleted file mode 100644 index 7528a76..0000000 --- a/timeside/tests/testtranscoding.py +++ /dev/null @@ -1,125 +0,0 @@ -from timeside.core import * -from timeside.decoder import * -from timeside.analyzer import * -from timeside.encoder import * -from timeside.api import * - -from timeside.component import * -from timeside.tests import TestCase, TestRunner -import unittest - -import os.path - -__all__ = ['TestTranscoding'] - -class TestTranscoding(TestCase): - "Test the low level streaming features" - - def setUp(self): - pass - - def testWav2Mp3(self): - "Test wav to mp3 conversion" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") - - dest1 = "/tmp/test_wav_filesink.mp3" - dest2 = "/tmp/test_wav_appsink.mp3" - self.f = open(dest2,'w') - - self.streaming=True - - encoder = Mp3Encoder(dest1, streaming=True) - self.encoder = encoder - - def testFlac2Mp3(self): - "Test flac to mp3 conversion" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") - - dest1 = "/tmp/test_flac_filesink.mp3" - dest2 = "/tmp/test_flac_appsink.mp3" - self.f = open(dest2,'w') - - self.streaming=True - - encoder = Mp3Encoder(dest1, streaming=True) - self.encoder = encoder - - """ - def testFlac2Ogg(self): - "Test flac to ogg conversion" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") - - dest1 = "/tmp/test_flac_filesink.ogg" - dest2 = "/tmp/test_flac_appsink.ogg" - self.f = open(dest2,'w') - - self.streaming=True - - encoder = VorbisEncoder(dest1, streaming=True) - self.encoder = encoder - - def testWav2Ogg(self): - "Test wav to ogg conversion" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") - - dest1 = "/tmp/test_wav_filesink.ogg" - dest2 = "/tmp/test_wav_appsink.ogg" - self.f = open(dest2,'w') - - self.streaming=True - - encoder = VorbisEncoder(dest1, streaming=True) - self.encoder = encoder - - def testWav2Flac(self): - "Test wav to flac conversion" - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") - - dest1 = "/tmp/test_wav_filesink.flac" - dest2 = "/tmp/test_wav_appsink.flac" - self.f = open(dest2,'w') - - self.streaming=True - - encoder = FlacEncoder(dest1, streaming=True) - self.encoder = encoder - """ - - def setUpDecoder(self): - self.decoder = FileDecoder(self.source) - self.decoder.setup() - self.channels = self.decoder.channels() - self.samplerate = self.decoder.samplerate() - - def setUpEncoder(self): - self.encoder.setup(channels = self.channels, samplerate = self.samplerate) - - def tearDown(self): - self.setUpDecoder() - self.setUpEncoder() - encoder = self.encoder - f = self.f - - #print "decoder pipe:\n", decoder.pipe - #print "encoder pipe:\n", encoder.pipe - totalframes = 0. - - while True: - frames, eod = self.decoder.process() - #print frames.shape[0] - totalframes += frames.shape[0] - encoder.process(frames, eod) - if self.streaming: - f.write(encoder.chunk) - if eod: - break - if encoder.eod : - break - f.close() - - # FIXME compute actual number of frames from file - self.assertEquals(totalframes, 352801) - -if __name__ == '__main__': - unittest.main(testRunner=TestRunner()) -