From 789af6d8432d730747c5d304b067f0a8a18197d6 Mon Sep 17 00:00:00 2001 From: Thomas Fillon Date: Wed, 27 Nov 2013 16:59:18 +0100 Subject: [PATCH] Tests: Various enhancements and fixes in transcoding, encoding - unit-timeside: Small enhancements in Error messages display - unit-timeside: Remove 'assertSameList' function which seems equivalent to unittest assertItemsEqual. unittest.TestCase class does thus no more need to be override, propagate this change in every test_* modules. - test_encoding and test_transcoding: refactoring + handling of expected failures (duration for loseless encoders, channels for voaacenc, ...) + run everything in pipes to handle properly the setup. --- tests/test_AnalyzerResult.py | 2 +- tests/test_analyzer_dc.py | 3 +- tests/test_analyzer_level.py | 2 +- tests/test_analyzer_preprocessors.py | 4 +- tests/test_array_decoding.py | 2 +- tests/test_aubio_melenergy.py | 2 +- tests/test_aubio_mfcc.py | 2 +- tests/test_aubio_pitch.py | 2 +- tests/test_aubio_specdesc.py | 2 +- tests/test_aubio_temporal.py | 2 +- tests/test_component.py | 68 ++++++++---- tests/test_decoder_utils.py | 16 ++- tests/test_decoding.py | 66 ++++++++---- tests/test_encoding.py | 123 +++++++++++++-------- tests/test_graphing.py | 2 +- tests/test_inputadapter.py | 2 +- tests/test_list_processors.py | 2 +- tests/test_transcoding.py | 153 +++++++++++++++------------ tests/test_yaafe.py | 2 +- tests/tools.py | 9 ++ tests/unit_timeside.py | 35 +++--- 21 files changed, 305 insertions(+), 196 deletions(-) diff --git a/tests/test_AnalyzerResult.py b/tests/test_AnalyzerResult.py index 200a7d2..912cd5b 100755 --- a/tests/test_AnalyzerResult.py +++ b/tests/test_AnalyzerResult.py @@ -10,7 +10,7 @@ from math import pi verbose = 0 -class TestAnalyzerResult(TestCase): +class TestAnalyzerResult(unittest.TestCase): """ test AnalyzerResult """ def setUp(self): diff --git a/tests/test_analyzer_dc.py b/tests/test_analyzer_dc.py index 7fe21b7..68890cb 100755 --- a/tests/test_analyzer_dc.py +++ b/tests/test_analyzer_dc.py @@ -3,8 +3,9 @@ from unit_timeside import * from timeside.decoder import FileDecoder from timeside.analyzer.dc import MeanDCShift +import os -class TestAnalyzerDC(TestCase): +class TestAnalyzerDC(unittest.TestCase): def setUp(self): self.analyzer = MeanDCShift() diff --git a/tests/test_analyzer_level.py b/tests/test_analyzer_level.py index 47fc081..02fb856 100755 --- a/tests/test_analyzer_level.py +++ b/tests/test_analyzer_level.py @@ -4,7 +4,7 @@ from unit_timeside import * from timeside.decoder import * from timeside.analyzer.level import Level -class TestAnalyzerLevel(TestCase): +class TestAnalyzerLevel(unittest.TestCase): def setUp(self): self.analyzer = Level() diff --git a/tests/test_analyzer_preprocessors.py b/tests/test_analyzer_preprocessors.py index 3a37625..60c2b34 100644 --- a/tests/test_analyzer_preprocessors.py +++ b/tests/test_analyzer_preprocessors.py @@ -21,7 +21,7 @@ class FakeAnalyzer(object): return frames, eod -class TestAnalyzerPreProcessors(TestCase): +class TestAnalyzerPreProcessors(unittest.TestCase): def tearDown(self): @@ -86,7 +86,7 @@ class TestDownmixToMono(TestAnalyzerPreProcessors): self.input_eod[-1] = True self.process_frames = self.input_frames.mean(axis=-1) -class TestFramesAdapter(TestAnalyzerPreProcessors, TestCase): +class TestFramesAdapter(TestAnalyzerPreProcessors, unittest.TestCase): def setUp(self): self.decorator = frames_adapter diff --git a/tests/test_array_decoding.py b/tests/test_array_decoding.py index b29a14b..1c740b9 100644 --- a/tests/test_array_decoding.py +++ b/tests/test_array_decoding.py @@ -8,7 +8,7 @@ from unit_timeside import * import numpy as np -class TestDecoding(TestCase): +class TestDecoding(unittest.TestCase): "Test decoding for ArrayDecoder" diff --git a/tests/test_aubio_melenergy.py b/tests/test_aubio_melenergy.py index 8ee4a86..274e032 100755 --- a/tests/test_aubio_melenergy.py +++ b/tests/test_aubio_melenergy.py @@ -4,7 +4,7 @@ from unit_timeside import * from timeside.decoder import * from timeside.analyzer.aubio_melenergy import AubioMelEnergy -class TestAubioMelEnergy(TestCase): +class TestAubioMelEnergy(unittest.TestCase): def setUp(self): self.analyzer = AubioMelEnergy() diff --git a/tests/test_aubio_mfcc.py b/tests/test_aubio_mfcc.py index bc1f26c..8bcffef 100755 --- a/tests/test_aubio_mfcc.py +++ b/tests/test_aubio_mfcc.py @@ -4,7 +4,7 @@ from unit_timeside import * from timeside.decoder import * from timeside.analyzer.aubio_mfcc import AubioMfcc -class TestAubioMfcc(TestCase): +class TestAubioMfcc(unittest.TestCase): def setUp(self): self.analyzer = AubioMfcc() diff --git a/tests/test_aubio_pitch.py b/tests/test_aubio_pitch.py index 660d7d0..557e797 100755 --- a/tests/test_aubio_pitch.py +++ b/tests/test_aubio_pitch.py @@ -4,7 +4,7 @@ from unit_timeside import * from timeside.decoder import * from timeside.analyzer.aubio_pitch import AubioPitch -class TestAubioPitch(TestCase): +class TestAubioPitch(unittest.TestCase): def setUp(self): self.analyzer = AubioPitch() diff --git a/tests/test_aubio_specdesc.py b/tests/test_aubio_specdesc.py index 5fb0fcf..f4341ec 100755 --- a/tests/test_aubio_specdesc.py +++ b/tests/test_aubio_specdesc.py @@ -4,7 +4,7 @@ from unit_timeside import * from timeside.decoder import * from timeside.analyzer.aubio_specdesc import AubioSpecdesc -class TestAubioSpecdesc(TestCase): +class TestAubioSpecdesc(unittest.TestCase): def setUp(self): self.analyzer = AubioSpecdesc() diff --git a/tests/test_aubio_temporal.py b/tests/test_aubio_temporal.py index 6189efb..9ffba1b 100755 --- a/tests/test_aubio_temporal.py +++ b/tests/test_aubio_temporal.py @@ -4,7 +4,7 @@ from unit_timeside import * from timeside.decoder import * from timeside.analyzer.aubio_temporal import AubioTemporal -class TestAubioTemporal(TestCase): +class TestAubioTemporal(unittest.TestCase): def setUp(self): self.analyzer = AubioTemporal() diff --git a/tests/test_component.py b/tests/test_component.py index 411a808..40330bc 100755 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -5,44 +5,46 @@ from unit_timeside import * __all__ = ['TestComponentArchitecture'] -class TestComponentArchitecture(TestCase): + +class TestComponentArchitecture(unittest.TestCase): + "Test the component and interface system" - + def testOneInterface(self): "Test a component implementing one interface" - self.assertSameList(implementations(I1), [C1]) + self.assertItemsEqual(implementations(I1), [C1]) - def testTwoInterfaces(self): + def testTwoInterfaces(self): "Test a component implementing two interfaces" - self.assertSameList(implementations(I2), [C2]) - self.assertSameList(implementations(I3), [C2]) + self.assertItemsEqual(implementations(I2), [C2]) + self.assertItemsEqual(implementations(I3), [C2]) def testTwoImplementations(self): "Test an interface implemented by two components" - self.assertSameList(implementations(I4), [C3, C4]) + self.assertItemsEqual(implementations(I4), [C3, C4]) def testInterfaceInheritance(self): "Test whether a component implements an interface's parent" - self.assertSameList(implementations(I5), [C5]) + self.assertItemsEqual(implementations(I5), [C5]) def testImplementationInheritance(self): - "Test that a component doesn't implement the interface implemented by its parent" - self.assertSameList(implementations(I7), [C6]) + "Test that a component doesn't implement the interface implemented by its parent" + self.assertItemsEqual(implementations(I7), [C6]) def testImplementationRedundancy(self): - "Test implementation redundancy across inheritance" - self.assertSameList(implementations(I8), [C8, C9]) + "Test implementation redundancy across inheritance" + self.assertItemsEqual(implementations(I8), [C8, C9]) - def testAbstractImplementation(self): + def testAbstractImplementation(self): "Test abstract implementation" - self.assertSameList(implementations(I11), []) - self.assertSameList(implementations(I11, abstract=True), [C11]) + self.assertItemsEqual(implementations(I11), []) + self.assertItemsEqual(implementations(I11, abstract=True), [C11]) - def testInterfaceDoc(self): + def testInterfaceDoc(self): "Test @interfacedoc decorator" self.assertEquals(C10.test.__doc__, "testdoc") - def testInterfaceDocStatic(self): + def testInterfaceDocStatic(self): "Test @interfacedoc decorator on static method" self.assertEquals(C10.teststatic.__doc__, "teststaticdoc") @@ -55,7 +57,7 @@ class TestComponentArchitecture(TestCase): implements(I10) @interfacedoc - @staticmethod + @staticmethod def teststatic(self): pass @@ -63,7 +65,7 @@ class TestComponentArchitecture(TestCase): except ComponentError: pass - + def testInterfaceDocBadMethod(self): "Test @interfacedoc with unexistant method in interface" @@ -80,71 +82,93 @@ class TestComponentArchitecture(TestCase): except ComponentError: pass + class I1(Interface): pass + class I2(Interface): pass + class I3(Interface): pass + class I4(Interface): pass + class I5(Interface): pass + class I6(I5): pass + class I7(Interface): pass + class I8(Interface): pass + class I9(I8): pass + class I10(Interface): + def test(self): """testdoc""" - @staticmethod + @staticmethod def teststatic(self): """teststaticdoc""" + class I11(Interface): pass + class C1(Component): implements(I1) + class C2(Component): implements(I2, I3) + class C3(Component): implements(I4) + class C4(Component): implements(I4) + class C5(Component): implements(I6) + class C6(Component): implements(I7) + class C7(C6): pass + class C8(Component): implements(I8) + class C9(Component): implements(I8, I9) + class C10(Component): implements(I10) @@ -152,15 +176,15 @@ class C10(Component): def test(self): pass - @staticmethod + @staticmethod @interfacedoc def teststatic(self): pass + class C11(Component): abstract() implements(I11) if __name__ == '__main__': unittest.main(testRunner=TestRunner()) - diff --git a/tests/test_decoder_utils.py b/tests/test_decoder_utils.py index 44a03d7..e0b6f1e 100644 --- a/tests/test_decoder_utils.py +++ b/tests/test_decoder_utils.py @@ -10,7 +10,7 @@ from timeside.decoder.utils import get_uri, get_media_uri_info, path2uri import os.path -class TestGetUri(TestCase): +class TestGetUri(unittest.TestCase): "Test get_uri function" def testFileName(self): "Retrieve the uri from a filename" @@ -28,7 +28,7 @@ class TestGetUri(TestCase): self.assertEqual(self.uri, get_uri(self.source)) -class TestGetUriWrongUri(TestCase): +class TestGetUriWrongUri(unittest.TestCase): def testMissingFile(self): "Missing file raise IOerror" self.source = os.path.join(os.path.dirname(__file__), @@ -45,12 +45,13 @@ class TestGetUriWrongUri(TestCase): self.assertRaises(IOError, get_uri, self.source) -class TestGetMediaInfo(TestCase): +class TestGetMediaInfo(unittest.TestCase): "Test get_media_uri_info function on an uri" def setUp(self): self.test_exact_duration = True self.source_duration = 8 + self.test_exact_duration = True self.expected_channels = 2 self.expected_samplerate = 44100 self.expected_depth = 16 @@ -90,11 +91,18 @@ class TestGetMediaInfo(TestCase): self.source = os.path.join(os.path.dirname(__file__), "samples/sweep.mp3") self.expected_depth = 32 + self.test_exact_duration = False + def tearDown(self): uri = get_uri(self.source) uri_info = get_media_uri_info(uri) - self.assertEqual(self.source_duration, uri_info['duration']) + if self.test_exact_duration: + self.assertEqual(self.source_duration, uri_info['duration']) + else: + self.assertAlmostEqual(self.source_duration, + uri_info['duration'], + places=1) self.assertEqual(self.expected_channels, uri_info['streams'][0]['channels']) self.assertEqual(self.expected_samplerate, uri_info['streams'][0]['samplerate']) self.assertEqual(self.expected_depth, uri_info['streams'][0]['depth']) diff --git a/tests/test_decoding.py b/tests/test_decoding.py index a8b8fc7..72986f7 100755 --- a/tests/test_decoding.py +++ b/tests/test_decoding.py @@ -10,7 +10,9 @@ import os.path #from glib import GError as GST_IOError # HINT : to use later with Gnonlin only -class TestDecoding(TestCase): + +class TestDecoding(unittest.TestCase): + "Test decoding features" def setUp(self): @@ -22,12 +24,12 @@ class TestDecoding(TestCase): self.expected_channels = 2 self.expected_totalframes = 352800 self.test_exact_duration = True + self.source_duration = 8 def testWav(self): "Test wav decoding" self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.wav") - + "samples/sweep.wav") def testWavMono(self): "Test mono wav decoding" @@ -42,7 +44,7 @@ class TestDecoding(TestCase): "samples/sweep_32000.wav") expected_samplerate = 32000 - ratio = expected_samplerate/self.expected_samplerate + ratio = expected_samplerate / self.expected_samplerate self.expected_totalframes = int(self.expected_totalframes * ratio) self.expected_samplerate = expected_samplerate @@ -60,7 +62,6 @@ class TestDecoding(TestCase): self.expected_totalframes = 352832 self.test_exact_duration = False - def testMp3(self): "Test mp3 decoding" self.source = os.path.join(os.path.dirname(__file__), @@ -69,7 +70,6 @@ class TestDecoding(TestCase): self.expected_totalframes = 353664 self.test_exact_duration = False - def tearDown(self): decoder = FileDecoder(uri=self.source, start=self.start, @@ -104,23 +104,36 @@ class TestDecoding(TestCase): self.assertEqual(decoder.input_channels, decoder.output_channels) # and if we know the expected channels, check the output match if self.expected_channels: - self.assertEqual(self.expected_channels, decoder.output_channels) + self.assertEqual( + self.expected_channels, decoder.output_channels) # do the same with the sampling rate if self.samplerate: self.assertEqual(self.samplerate, decoder.output_samplerate) else: - self.assertEqual(decoder.input_samplerate, decoder.output_samplerate) + self.assertEqual( + decoder.input_samplerate, decoder.output_samplerate) if self.expected_samplerate: - self.assertEqual(self.expected_samplerate, decoder.output_samplerate) + self.assertEqual( + self.expected_samplerate, decoder.output_samplerate) self.assertEqual(totalframes, self.expected_totalframes) + input_duration = decoder.input_totalframes / decoder.input_samplerate + output_duration = decoder.totalframes() / decoder.output_samplerate if self.test_exact_duration: - self.assertEqual(totalframes/decoder.output_samplerate, - decoder.totalframes()/decoder.output_samplerate) + self.assertEqual(input_duration, output_duration) + self.assertEqual(input_duration, + decoder.uri_duration) + self.assertEqual(self.source_duration, + decoder.uri_duration) else: - self.assertAlmostEqual(totalframes/decoder.output_samplerate, - decoder.totalframes()/decoder.output_samplerate, - places=1) + self.assertAlmostEqual(input_duration, output_duration, + places=1) + self.assertAlmostEqual(input_duration, + decoder.uri_duration, + places=1) + self.assertAlmostEqual(self.source_duration, + decoder.uri_duration, + places=1) class TestDecodingSegment(TestDecoding): @@ -129,14 +142,15 @@ class TestDecodingSegment(TestDecoding): super(TestDecodingSegment, self).setUp() self.start = 1 self.duration = 3 + self.source_duration = self.duration self.expected_totalframes = self.duration * self.expected_samplerate - def testMp3(self): "Test mp3 decoding" super(TestDecodingSegment, self).testMp3() - self.expected_totalframes = self.duration * self.expected_samplerate + 1 + self.expected_totalframes = self.duration * \ + self.expected_samplerate + 1 def testWav(self): "Test wav decoding" @@ -164,12 +178,14 @@ class TestDecodingSegmentDefaultStart(TestDecodingSegment): def setUp(self): super(TestDecodingSegmentDefaultStart, self).setUp() self.duration = 1 + self.source_duration = self.duration self.expected_totalframes = self.duration * self.expected_samplerate def testMp3(self): "Test mp3 decoding" super(TestDecodingSegmentDefaultStart, self).testMp3() - self.expected_totalframes = self.duration * self.expected_samplerate + 1 + self.expected_totalframes = self.duration * \ + self.expected_samplerate + 1 class TestDecodingSegmentDefaultDuration(TestDecodingSegment): @@ -177,7 +193,10 @@ class TestDecodingSegmentDefaultDuration(TestDecodingSegment): def setUp(self): super(TestDecodingSegment, self).setUp() self.start = 1 - self.expected_totalframes = self.expected_totalframes - self.start * self.expected_samplerate + self.source_duration -= self.start + + self.expected_totalframes = (self.expected_totalframes + - self.start * self.expected_samplerate) def testWav(self): "Test wav decoding" @@ -194,7 +213,7 @@ class TestDecodingSegmentDefaultDuration(TestDecodingSegment): def testMp3(self): "Test mp3 decoding" super(TestDecodingSegment, self).testMp3() - self.expected_totalframes = 310715#308701 + self.expected_totalframes = 310715 # was 308701 ? class TestDecodingStereo(TestDecoding): @@ -262,6 +281,7 @@ class TestDecodingMonoDownsampling(TestDecoding): super(TestDecodingMonoDownsampling, self).testMp3() self.expected_totalframes = 128314 + class TestDecodingStereoDownsampling(TestDecoding): def setUp(self): @@ -321,16 +341,18 @@ class TestDecodingLongBlock(TestDecoding): def setUp(self): super(TestDecodingLongBlock, self).setUp() - self.samplerate, self.channels, self.blocksize = None, None, 1024*8*2 + self.samplerate, self.channels, self.blocksize = None, None, 1024 * \ + 8 * 2 + +class TestDecodingWrongFiles(unittest.TestCase): -class TestDecodingWrongFiles(TestCase): "Test decoding features" def testMissingFile(self): "Test decoding missing file" self.source = os.path.join(os.path.dirname(__file__), - "a_missing_file_blahblah.wav") + "a_missing_file_blahblah.wav") self.assertRaises(IOError, FileDecoder, self.source) diff --git a/tests/test_encoding.py b/tests/test_encoding.py index 235e6a1..53dd48a 100755 --- a/tests/test_encoding.py +++ b/tests/test_encoding.py @@ -1,91 +1,127 @@ #! /usr/bin/env python +from __future__ import division + from math import pi -from numpy import arange, sin +import numpy as np from unit_timeside import * +from timeside.decoder.utils import get_uri, get_media_uri_info +from timeside.decoder import ArrayDecoder +import os +from tools import tmp_file_sink + +class TestEncoding(unittest.TestCase): + "Test encoding features" -from os import unlink + def generate_source(self): + self.expected_total_frames = self.source_duration * self.samplerate + f0 = 440. + f = f0 * np.logspace(0, 4/12*(self.channels-1), self.channels, base=2) + omega = 2. * pi * f / self.samplerate + samples = np.empty((self.expected_total_frames, self.channels)) + for n in xrange(self.channels): + samples[:,n] = .75 * np.sin(omega[n]*np.arange(self.expected_total_frames)) + return samples -class TestEncoding(TestCase): - "Test encoding features" def setUp(self): self.samplerate, self.channels, self.blocksize = 44100, 1, 1024 - import tempfile - self.tmpfile = tempfile.NamedTemporaryFile(delete=True) - self.sink = self.tmpfile.name - self.tmpfile.close() self.overwrite = False + self.encode_to_file = True + self.test_duration = True + self.test_channels = True + + # Source + self.source_duration = 3. + def testWav(self): "Test wav encoding" from timeside.encoder.wav import WavEncoder self.encoder_function = WavEncoder + self.delta = 0 def testVorbis(self): "Test vorbis encoding" from timeside.encoder.ogg import VorbisEncoder self.encoder_function = VorbisEncoder + self.delta = 0.2 def testMp3(self): "Test mp3 encoding" from timeside.encoder.mp3 import Mp3Encoder self.encoder_function = Mp3Encoder + self.delta = 0.2 + def testAac(self): "Test aac encoding" from timeside.encoder.m4a import AacEncoder self.encoder_function = AacEncoder + self.test_channels = False + self.delta = 0.06 + def testFlac(self): "Test flac encoding" from timeside.encoder.flac import FlacEncoder self.encoder_function = FlacEncoder + self.delta = 0 def testWebm(self): "Test webm encoding" from timeside.encoder.webm import WebMEncoder self.encoder_function = WebMEncoder + self.test_duration = False # webmmux encoder with streamable=true + # does not return a valid duration def tearDown(self): + + # Source through ArrayDecoder + + decoder = ArrayDecoder(self.generate_source(), + samplerate=self.samplerate) + # Encoder + file_extension = '.' + self.encoder_function.file_extension() + if not hasattr(self, 'sink'): + self.sink = tmp_file_sink(prefix=self.__class__.__name__, + suffix=file_extension) self.encoder = self.encoder_function(self.sink, overwrite=self.overwrite) - self.encoder.setup(channels=self.channels, - samplerate=self.samplerate) - - written_frames, eod = 0, False - total_frames = 3. * self.samplerate - block_size = self.blocksize - f0 = 800. - omega = 2. * pi * f0 / float(self.samplerate) - - while True: - remaining = total_frames - written_frames - if remaining >= block_size: - write_length = block_size + + # Run Pipe + (decoder | self.encoder).run() + + if self.encode_to_file: + media_info = get_media_uri_info(get_uri(self.sink)) + media_duration = media_info['duration'] + media_channels = media_info['streams'][0]['channels'] + media_samplerate = media_info['streams'][0]['samplerate'] + + #os.unlink(self.sink) + + if self.test_duration: + self.assertAlmostEqual(self.source_duration, + media_duration, + delta=self.delta) + if self.test_channels: + self.assertEqual(self.channels, media_channels) else: - write_length = remaining - eod = True - # build a sinusoid - frames = .75 * sin(omega * (arange(write_length) + written_frames)) - # process encoder, writing to file - self.encoder.process(frames, eod) - written_frames += frames.shape[0] - if eod: - self.assertEquals(self.encoder.eod, True) - break - - self.encoder.release() + self.assertEqual(2, media_channels) # voaacenc bug ? + self.assertEqual(media_samplerate, self.samplerate) if 0: import commands print commands.getoutput('sndfile-info ' + self.sink) - self.assertEquals(written_frames, total_frames) + self.assertEqual(self.expected_total_frames, self.encoder.num_samples) + self.assertEqual(self.channels, self.encoder.channels()) + self.assertEqual(self.samplerate, self.encoder.samplerate()) + self.assertEqual(self.source_duration, + self.encoder.num_samples/self.encoder.samplerate()) + - if hasattr(self, 'tmpfile'): - unlink(self.sink) class TestEncodingLongBlock(TestEncoding): @@ -147,9 +183,10 @@ class TestEncodingToDevNull(TestEncoding): "Test encoding features with /dev/null" def setUp(self): - self.samplerate, self.channels, self.blocksize = 44100, 1, 1024 + super(TestEncodingToDevNull, self).setUp() self.sink = '/dev/null' - self.overwrite = False + self.encode_to_file = False + class TestEncodingToDirectory(TestEncoding): @@ -167,22 +204,18 @@ class TestEncodingToDirectory(TestEncoding): rmdir(self.sink) -class TestEncodingOverwriteFails(TestCase): +class TestEncodingOverwriteFails(unittest.TestCase): "Test encoding features" def setUp(self): self.samplerate, self.channels, self.blocksize = 44100, 1, 1024 - import tempfile - self.tmpfile = tempfile.NamedTemporaryFile(delete=True) - self.sink = self.tmpfile.name self.overwrite = False def tearDown(self): self.assertRaises(IOError, self.encoder_function, self.sink) - self.tmpfile.close() -class TestEncodingOverwriteForced(TestCase): +class TestEncodingOverwriteForced(unittest.TestCase): "Test encoding features" def setUp(self): @@ -195,7 +228,7 @@ class TestEncodingOverwriteForced(TestCase): def tearDown(self): super(TestEncodingOverwriteForced, self).tearDown() - self.tmpfile.close() + if __name__ == '__main__': unittest.main(testRunner=TestRunner()) diff --git a/tests/test_graphing.py b/tests/test_graphing.py index 9999b07..cfb4a17 100755 --- a/tests/test_graphing.py +++ b/tests/test_graphing.py @@ -7,7 +7,7 @@ import os.path __all__ = ['TestGraphing'] -class TestGraphing(TestCase): +class TestGraphing(unittest.TestCase): "Test all graphers with various input media formats" def setUp(self): diff --git a/tests/test_inputadapter.py b/tests/test_inputadapter.py index 1bbcc26..f2453b3 100755 --- a/tests/test_inputadapter.py +++ b/tests/test_inputadapter.py @@ -4,7 +4,7 @@ from timeside.core import FixedSizeInputAdapter from unit_timeside import * import numpy -class TestFixedSizeInputAdapter(TestCase): +class TestFixedSizeInputAdapter(unittest.TestCase): "Test the fixed-sized input adapter" def assertIOEquals(self, adapter, input, input_eod, output, output_eod=None): diff --git a/tests/test_list_processors.py b/tests/test_list_processors.py index b18c5d0..6b33ab4 100755 --- a/tests/test_list_processors.py +++ b/tests/test_list_processors.py @@ -4,7 +4,7 @@ from unit_timeside import * import timeside verbose = 0 -class TestListCoreProcessors(TestCase): +class TestListCoreProcessors(unittest.TestCase): """ test get list of processors """ def testHasSomeDecoders(self): diff --git a/tests/test_transcoding.py b/tests/test_transcoding.py index 0fedff2..cc11432 100755 --- a/tests/test_transcoding.py +++ b/tests/test_transcoding.py @@ -1,137 +1,158 @@ +#! /usr/bin/env python + +from __future__ import division + from timeside.core import * from timeside.decoder import * from timeside.analyzer import * from timeside.encoder import * -from timeside.api import * from timeside.component import * from unit_timeside import * - +from tools import tmp_file_sink import os.path -class TestTranscodingFromWav(TestCase): - "Test transcoding from wav" - def tmpTarget(self): - import tempfile - self.tmpfile = tempfile.NamedTemporaryFile(delete=True) - self.target = self.tmpfile.name - self.tmpfile.close() +class TestTranscodingFromWav(unittest.TestCase): + "Test transcoding from wav" def setUp(self): - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav") + self.source = os.path.join(os.path.dirname(__file__), + "samples/sweep.wav") + self.test_duration = True + self.test_channels = True def testToWav(self): "Test conversion to wav" - self.tmpTarget() - self.encoder = WavEncoder(self.target) + self.encoder_function = WavEncoder def testToMp3(self): "Test conversion to mp3" - self.tmpTarget() - self.encoder = Mp3Encoder(self.target) + self.encoder_function = Mp3Encoder def testToOgg(self): "Test conversion to ogg" - self.tmpTarget() - self.encoder = VorbisEncoder(self.target) + self.encoder_function = VorbisEncoder - # def testToWebM(self): - # "Test conversion to webm" - # self.tmpTarget() - # self.encoder = WebMEncoder(self.target) + def testToWebM(self): + "Test conversion to webm" + self.encoder_function = WebMEncoder + self.test_duration = False # webmmux encoder with streamable=true + # does not return a valid duration - # def testToM4a(self): - # "Test conversion to m4a" - # self.tmpTarget() - # self.encoder = AacEncoder(self.target) + def testToM4a(self): + "Test conversion to m4a" + self.encoder_function = AacEncoder - def setUpDecoder(self): - self.decoder = FileDecoder(self.source) - self.decoder.setup() - self.channels = self.decoder.channels() - self.samplerate = self.decoder.samplerate() + def tearDown(self): + decoder = FileDecoder(self.source) - def setUpEncoder(self): - self.encoder.setup(channels = self.channels, samplerate = self.samplerate) - def tearDown(self): - self.setUpDecoder() - self.setUpEncoder() - totalframes = 0 - while True: - frames, eod = self.decoder.process() - self.encoder.process(frames, eod) - totalframes += frames.shape[0] - if eod or self.encoder.eod: break + file_extension = '.' + self.encoder_function.file_extension() + self.target = tmp_file_sink(prefix=self.__class__.__name__, + suffix=file_extension) + encoder = self.encoder_function(self.target) + (decoder | encoder).run() - #print self.channels, self.samplerate, totalframes + decoder_encoded = FileDecoder(self.target) - self.encoder.release() + from timeside.analyzer import Waveform + a = Waveform() # Arbitrary analyzer for running the next pipe + (decoder_encoded | a).run() - decoder = FileDecoder(self.target) - decoder.setup() - written_frames = 0 - while True: - frames, eod = decoder.process() - written_frames += frames.shape[0] - if eod: break + import os + os.unlink(self.target) #print decoder.channels(), decoder.samplerate(), written_frames + #print media_channels - self.assertEquals(self.channels, decoder.channels()) - self.assertEquals(self.samplerate, decoder.samplerate()) - self.assertTrue(written_frames - totalframes >= 0) + if self.test_channels: + self.assertEqual(decoder.channels(), decoder_encoded.channels()) + else: + self.assertEqual(2, decoder_encoded.channels()) # voaacenc bug ? - import os - os.unlink(self.target) + self.assertEqual(decoder.samplerate(), + decoder_encoded.samplerate()) + + if self.test_duration: + self.assertAlmostEqual(decoder.input_duration, + decoder_encoded.input_duration, + delta=0.2) + +class TestTranscodingFromMonoWav(TestTranscodingFromWav): + "Test transcoding from a mono wav" + + def setUp(self): + super(TestTranscodingFromMonoWav, self).setUp() + self.source = os.path.join(os.path.dirname(__file__), + "samples/sweep_mono.wav") + + def testToM4a(self): + "Test conversion to m4a" + super(TestTranscodingFromMonoWav, self).testToM4a() + self.test_channels = False # voaacenc bug ? : always encode stereo -class TestTranscodingFromAnotherWav(TestTranscodingFromWav): + +class TestTranscodingFromAnotherWav(TestTranscodingFromMonoWav): "Test transcoding from another wav" def setUp(self): - self.source = os.path.join (os.path.dirname(__file__), "samples/guitar.wav") + super(TestTranscodingFromAnotherWav, self).setUp() + self.source = os.path.join(os.path.dirname(__file__), + "samples/guitar.wav") # Mono + class TestTranscodingFromMp3(TestTranscodingFromWav): "Test transcoding from mp3" def setUp(self): - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3") + super(TestTranscodingFromMp3, self).setUp() + self.source = os.path.join(os.path.dirname(__file__), + "samples/sweep.mp3") + class TestTranscodingFromFlac(TestTranscodingFromWav): "Test transcoding from flac" def setUp(self): - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac") + super(TestTranscodingFromFlac, self).setUp() + self.source = os.path.join(os.path.dirname(__file__), + "samples/sweep.flac") + class TestTranscodingFromOgg(TestTranscodingFromWav): "Test transcoding from ogg" def setUp(self): - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg") + super(TestTranscodingFromOgg, self).setUp() + self.source = os.path.join(os.path.dirname(__file__), + "samples/sweep.ogg") + + -class TestTranscodingFromMonoWav(TestTranscodingFromWav): - "Test transcoding from a mono wav" - def setUp(self): - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep_mono.wav") class TestTranscodingFrom32kHzWav(TestTranscodingFromWav): "Test transcoding from a 32kHz wav" def setUp(self): - self.source = os.path.join (os.path.dirname(__file__), "samples/sweep_32000.wav") + super(TestTranscodingFrom32kHzWav, self).setUp() + self.source = os.path.join(os.path.dirname(__file__), + "samples/sweep_32000.wav") + class TestTranscodingFromMissingFile(TestTranscodingFromWav): "Test transcoding from a missing file" def setUp(self): - self.source = os.path.join (os.path.dirname(__file__), "samples/unexisting.wav") + self.source = os.path.join(os.path.dirname(__file__), + "samples/unexisting.wav") def tearDown(self): - self.assertRaises(IOError, self.setUpDecoder) + decoder = FileDecoder + self.assertRaises(IOError, decoder, self.source) if __name__ == '__main__': unittest.main(testRunner=TestRunner()) diff --git a/tests/test_yaafe.py b/tests/test_yaafe.py index 086bdee..d0c7142 100755 --- a/tests/test_yaafe.py +++ b/tests/test_yaafe.py @@ -5,7 +5,7 @@ from timeside.decoder import * from timeside.analyzer import Yaafe from yaafelib import DataFlow,FeaturePlan -class TestYaafe(TestCase): +class TestYaafe(unittest.TestCase): def setUp(self): self.sample_rate = 16000 diff --git a/tests/tools.py b/tests/tools.py index f17477d..60cfb2d 100644 --- a/tests/tools.py +++ b/tests/tools.py @@ -19,3 +19,12 @@ def check_samples(): u = urllib.urlopen(url+sample) f.write(u.read()) f.close() + + +def tmp_file_sink(prefix=None, suffix = None): + import tempfile + tmpfile = tempfile.NamedTemporaryFile(delete=True, + prefix=prefix, + suffix=suffix) + tmpfile.close() + return tmpfile.name \ No newline at end of file diff --git a/tests/unit_timeside.py b/tests/unit_timeside.py index 46bc353..2ec084c 100644 --- a/tests/unit_timeside.py +++ b/tests/unit_timeside.py @@ -2,24 +2,10 @@ import unittest import doctest -import os, sys +import sys import time -from tools import * +from tools import check_samples -class TestCase(unittest.TestCase): - - def assertSameList(self, list1, list2): - "Test that two lists contain the same elements, in any order" - if len(list1) != len(list2): - self.fail("Lists length differ : %d != %d" % (len(list1), len(list2))) - - for item in list1: - if not item in list2: - self.fail("%s is not in list2" % str(item)) - - for item in list2: - if not item in list1: - self.fail("%s is not in list1" % str(item)) class _TextTestResult(unittest.TestResult): """A test result class that can print formatted text results to a stream. @@ -90,22 +76,26 @@ class _TextTestResult(unittest.TestResult): def printErrorList(self, flavour, errors): for test, err in errors: self.stream.writeln(self.separator1) - self.stream.writeln("%s: %s" % (flavour,self.getDescription(test))) + self.stream.writeln("%s: [%s] --> %s " + % (flavour, + test.__class__.__name__, + self.getDescription(test))) self.stream.writeln(self.separator2) self.stream.writeln("%s" % err) class _WritelnDecorator: """Used to decorate file-like objects with a handy 'writeln' method""" - def __init__(self,stream): + def __init__(self, stream): self.stream = stream def __getattr__(self, attr): - return getattr(self.stream,attr) + return getattr(self.stream, attr) def writeln(self, arg=None): - if arg: self.write(arg) - self.write('\n') # text-mode streams translate to \r\n if needed + if arg: + self.write(arg) + self.write('\n') # text-mode streams translate to \r\n if needed class TestRunner: @@ -143,7 +133,8 @@ class TestRunner: if failed: self.stream.write("failures=%d" % failed) if errored: - if failed: self.stream.write(", ") + if failed: + self.stream.write(", ") self.stream.write("errors=%d" % errored) self.stream.writeln(")") else: -- 2.39.5