]> git.parisson.com Git - timeside.git/commitdiff
Tests: Various enhancements and fixes in transcoding, encoding
authorThomas Fillon <thomas@parisson.com>
Wed, 27 Nov 2013 15:59:18 +0000 (16:59 +0100)
committerThomas Fillon <thomas@parisson.com>
Wed, 27 Nov 2013 16:14:53 +0000 (17:14 +0100)
- unit-timeside: Small enhancements in Error messages display
- unit-timeside: Remove 'assertSameList' function which seems equivalent to unittest assertItemsEqual. unittest.TestCase class does thus no more need to be override, propagate this change in every test_* modules.
- test_encoding and test_transcoding: refactoring + handling of expected failures (duration for loseless encoders, channels for voaacenc, ...) + run everything in pipes to handle properly the setup.

21 files changed:
tests/test_AnalyzerResult.py
tests/test_analyzer_dc.py
tests/test_analyzer_level.py
tests/test_analyzer_preprocessors.py
tests/test_array_decoding.py
tests/test_aubio_melenergy.py
tests/test_aubio_mfcc.py
tests/test_aubio_pitch.py
tests/test_aubio_specdesc.py
tests/test_aubio_temporal.py
tests/test_component.py
tests/test_decoder_utils.py
tests/test_decoding.py
tests/test_encoding.py
tests/test_graphing.py
tests/test_inputadapter.py
tests/test_list_processors.py
tests/test_transcoding.py
tests/test_yaafe.py
tests/tools.py
tests/unit_timeside.py

index 200a7d2db63e4e27106662c7bf7a4528a2d34bcb..912cd5b536b177fb344c3d4a6c13a042bd5a2d06 100755 (executable)
@@ -10,7 +10,7 @@ from math import pi
 verbose = 0
 
 
-class TestAnalyzerResult(TestCase):
+class TestAnalyzerResult(unittest.TestCase):
     """ test AnalyzerResult """
 
     def setUp(self):
index 7fe21b77001b10002fbd5576194d0aac3655474d..68890cb2c5483579b3eb405270728200942ceb2a 100755 (executable)
@@ -3,8 +3,9 @@
 from unit_timeside import *
 from timeside.decoder import FileDecoder
 from timeside.analyzer.dc import MeanDCShift
+import os
 
-class TestAnalyzerDC(TestCase):
+class TestAnalyzerDC(unittest.TestCase):
 
     def setUp(self):
         self.analyzer = MeanDCShift()
index 47fc0814b41e59ed46ef0ea16f7230b495e10959..02fb856abf45a84061c9a5b94c99eeda801521d3 100755 (executable)
@@ -4,7 +4,7 @@ from unit_timeside import *
 from timeside.decoder import *
 from timeside.analyzer.level import Level
 
-class TestAnalyzerLevel(TestCase):
+class TestAnalyzerLevel(unittest.TestCase):
 
     def setUp(self):
         self.analyzer = Level()
index 3a3762571e3f3034fbaf3cab6e756344b077cb65..60c2b3449cc86415ba10368ee696afe553136d98 100644 (file)
@@ -21,7 +21,7 @@ class FakeAnalyzer(object):
         return frames, eod
 
 
-class TestAnalyzerPreProcessors(TestCase):
+class TestAnalyzerPreProcessors(unittest.TestCase):
 
     def tearDown(self):
 
@@ -86,7 +86,7 @@ class TestDownmixToMono(TestAnalyzerPreProcessors):
         self.input_eod[-1] = True
         self.process_frames = self.input_frames.mean(axis=-1)
 
-class TestFramesAdapter(TestAnalyzerPreProcessors, TestCase):
+class TestFramesAdapter(TestAnalyzerPreProcessors, unittest.TestCase):
 
     def setUp(self):
         self.decorator = frames_adapter
index b29a14b77c8eacfc28f030bc53bdb31688073dce..1c740b9363d7607ba72ee0cb88e7fb6573ea0a5b 100644 (file)
@@ -8,7 +8,7 @@ from unit_timeside import *
 import numpy as np
 
 
-class TestDecoding(TestCase):
+class TestDecoding(unittest.TestCase):
 
     "Test decoding for ArrayDecoder"
 
index 8ee4a86d4ad5c21085b4d3200bb6c6931223e99a..274e032e238d7a191b25802b86007ce3e0a3aa8c 100755 (executable)
@@ -4,7 +4,7 @@ from unit_timeside import *
 from timeside.decoder import *
 from timeside.analyzer.aubio_melenergy import AubioMelEnergy
 
-class TestAubioMelEnergy(TestCase):
+class TestAubioMelEnergy(unittest.TestCase):
 
     def setUp(self):
         self.analyzer = AubioMelEnergy()
index bc1f26cf4e01d0553ad5eeb24ede3865837b0dc7..8bcffef92207ea18513c81d380a518c31271c572 100755 (executable)
@@ -4,7 +4,7 @@ from unit_timeside import *
 from timeside.decoder import *
 from timeside.analyzer.aubio_mfcc import AubioMfcc
 
-class TestAubioMfcc(TestCase):
+class TestAubioMfcc(unittest.TestCase):
 
     def setUp(self):
         self.analyzer = AubioMfcc()
index 660d7d0b8728042381aa350010bf779eb2f60a46..557e7978c10241d25aa49806909abefd5ece298c 100755 (executable)
@@ -4,7 +4,7 @@ from unit_timeside import *
 from timeside.decoder import *
 from timeside.analyzer.aubio_pitch import AubioPitch
 
-class TestAubioPitch(TestCase):
+class TestAubioPitch(unittest.TestCase):
 
     def setUp(self):
         self.analyzer = AubioPitch()
index 5fb0fcf5555d6aa9232b5351e840b0dbc0b9fbf8..f4341ec6bc393f51b620cec92e351af4538407e6 100755 (executable)
@@ -4,7 +4,7 @@ from unit_timeside import *
 from timeside.decoder import *
 from timeside.analyzer.aubio_specdesc import AubioSpecdesc
 
-class TestAubioSpecdesc(TestCase):
+class TestAubioSpecdesc(unittest.TestCase):
 
     def setUp(self):
         self.analyzer = AubioSpecdesc()
index 6189efbde1b863e6e66c26340421a0e450ff2ecd..9ffba1bc844cbf2b38be6eaf2adce350eee8dbc5 100755 (executable)
@@ -4,7 +4,7 @@ from unit_timeside import *
 from timeside.decoder import *
 from timeside.analyzer.aubio_temporal import AubioTemporal
 
-class TestAubioTemporal(TestCase):
+class TestAubioTemporal(unittest.TestCase):
 
     def setUp(self):
         self.analyzer = AubioTemporal()
index 411a80819e64de5bb98d632c7bca3d8c64ea8c1b..40330bcd641bbfb8699d54e14044c0646fd2e283 100755 (executable)
@@ -5,44 +5,46 @@ from unit_timeside import *
 
 __all__ = ['TestComponentArchitecture']
 
-class TestComponentArchitecture(TestCase):
+
+class TestComponentArchitecture(unittest.TestCase):
+
     "Test the component and interface system"
-   
+
     def testOneInterface(self):
         "Test a component implementing one interface"
-        self.assertSameList(implementations(I1), [C1])
+        self.assertItemsEqual(implementations(I1), [C1])
 
-    def testTwoInterfaces(self):        
+    def testTwoInterfaces(self):
         "Test a component implementing two interfaces"
-        self.assertSameList(implementations(I2), [C2])
-        self.assertSameList(implementations(I3), [C2])
+        self.assertItemsEqual(implementations(I2), [C2])
+        self.assertItemsEqual(implementations(I3), [C2])
 
     def testTwoImplementations(self):
         "Test an interface implemented by two components"
-        self.assertSameList(implementations(I4), [C3, C4])
+        self.assertItemsEqual(implementations(I4), [C3, C4])
 
     def testInterfaceInheritance(self):
         "Test whether a component implements an interface's parent"
-        self.assertSameList(implementations(I5), [C5])
+        self.assertItemsEqual(implementations(I5), [C5])
 
     def testImplementationInheritance(self):
-        "Test that a component doesn't implement the interface implemented by its parent" 
-        self.assertSameList(implementations(I7), [C6])
+        "Test that a component doesn't implement the interface implemented by its parent"
+        self.assertItemsEqual(implementations(I7), [C6])
 
     def testImplementationRedundancy(self):
-        "Test implementation redundancy across inheritance" 
-        self.assertSameList(implementations(I8), [C8, C9])
+        "Test implementation redundancy across inheritance"
+        self.assertItemsEqual(implementations(I8), [C8, C9])
 
-    def testAbstractImplementation(self):    
+    def testAbstractImplementation(self):
         "Test abstract implementation"
-        self.assertSameList(implementations(I11), [])
-        self.assertSameList(implementations(I11, abstract=True), [C11])
+        self.assertItemsEqual(implementations(I11), [])
+        self.assertItemsEqual(implementations(I11, abstract=True), [C11])
 
-    def testInterfaceDoc(self):        
+    def testInterfaceDoc(self):
         "Test @interfacedoc decorator"
         self.assertEquals(C10.test.__doc__, "testdoc")
 
-    def testInterfaceDocStatic(self):        
+    def testInterfaceDocStatic(self):
         "Test @interfacedoc decorator on static method"
         self.assertEquals(C10.teststatic.__doc__, "teststaticdoc")
 
@@ -55,7 +57,7 @@ class TestComponentArchitecture(TestCase):
                 implements(I10)
 
                 @interfacedoc
-                @staticmethod        
+                @staticmethod
                 def teststatic(self):
                     pass
 
@@ -63,7 +65,7 @@ class TestComponentArchitecture(TestCase):
 
         except ComponentError:
             pass
-   
+
     def testInterfaceDocBadMethod(self):
         "Test @interfacedoc with unexistant method in interface"
 
@@ -80,71 +82,93 @@ class TestComponentArchitecture(TestCase):
         except ComponentError:
             pass
 
+
 class I1(Interface):
     pass
 
+
 class I2(Interface):
     pass
 
+
 class I3(Interface):
     pass
 
+
 class I4(Interface):
     pass
 
+
 class I5(Interface):
     pass
 
+
 class I6(I5):
     pass
 
+
 class I7(Interface):
     pass
 
+
 class I8(Interface):
     pass
 
+
 class I9(I8):
     pass
 
+
 class I10(Interface):
+
     def test(self):
         """testdoc"""
 
-    @staticmethod        
+    @staticmethod
     def teststatic(self):
         """teststaticdoc"""
 
+
 class I11(Interface):
     pass
 
+
 class C1(Component):
     implements(I1)
 
+
 class C2(Component):
     implements(I2, I3)
 
+
 class C3(Component):
     implements(I4)
 
+
 class C4(Component):
     implements(I4)
 
+
 class C5(Component):
     implements(I6)
 
+
 class C6(Component):
     implements(I7)
 
+
 class C7(C6):
     pass
 
+
 class C8(Component):
     implements(I8)
 
+
 class C9(Component):
     implements(I8, I9)
 
+
 class C10(Component):
     implements(I10)
 
@@ -152,15 +176,15 @@ class C10(Component):
     def test(self):
         pass
 
-    @staticmethod        
+    @staticmethod
     @interfacedoc
     def teststatic(self):
         pass
 
+
 class C11(Component):
     abstract()
     implements(I11)
 
 if __name__ == '__main__':
     unittest.main(testRunner=TestRunner())
-
index 44a03d752ac174c217a16425cef42515b25a857a..e0b6f1efeb7eeec71a27a1ee6068f45413a40a5e 100644 (file)
@@ -10,7 +10,7 @@ from timeside.decoder.utils import get_uri, get_media_uri_info, path2uri
 import os.path
 
 
-class TestGetUri(TestCase):
+class TestGetUri(unittest.TestCase):
     "Test get_uri function"
     def testFileName(self):
         "Retrieve the uri from a filename"
@@ -28,7 +28,7 @@ class TestGetUri(TestCase):
         self.assertEqual(self.uri, get_uri(self.source))
 
 
-class TestGetUriWrongUri(TestCase):
+class TestGetUriWrongUri(unittest.TestCase):
     def testMissingFile(self):
         "Missing file raise IOerror"
         self.source = os.path.join(os.path.dirname(__file__),
@@ -45,12 +45,13 @@ class TestGetUriWrongUri(TestCase):
         self.assertRaises(IOError, get_uri, self.source)
 
 
-class TestGetMediaInfo(TestCase):
+class TestGetMediaInfo(unittest.TestCase):
     "Test get_media_uri_info function on an uri"
 
     def setUp(self):
         self.test_exact_duration = True
         self.source_duration = 8
+        self.test_exact_duration = True
         self.expected_channels = 2
         self.expected_samplerate = 44100
         self.expected_depth = 16
@@ -90,11 +91,18 @@ class TestGetMediaInfo(TestCase):
         self.source = os.path.join(os.path.dirname(__file__),
                                    "samples/sweep.mp3")
         self.expected_depth = 32
+        self.test_exact_duration = False
+
 
     def tearDown(self):
         uri = get_uri(self.source)
         uri_info = get_media_uri_info(uri)
-        self.assertEqual(self.source_duration, uri_info['duration'])
+        if self.test_exact_duration:
+            self.assertEqual(self.source_duration, uri_info['duration'])
+        else:
+            self.assertAlmostEqual(self.source_duration,
+                                   uri_info['duration'],
+                                   places=1)
         self.assertEqual(self.expected_channels, uri_info['streams'][0]['channels'])
         self.assertEqual(self.expected_samplerate, uri_info['streams'][0]['samplerate'])
         self.assertEqual(self.expected_depth, uri_info['streams'][0]['depth'])
index a8b8fc76d2c19f39e36e6cf81e9fa0778cc540b1..72986f79c6304c3861579fb6a61b90532c2898f4 100755 (executable)
@@ -10,7 +10,9 @@ import os.path
 #from glib import GError as GST_IOError
 # HINT : to use later with Gnonlin only
 
-class TestDecoding(TestCase):
+
+class TestDecoding(unittest.TestCase):
+
     "Test decoding features"
 
     def setUp(self):
@@ -22,12 +24,12 @@ class TestDecoding(TestCase):
         self.expected_channels = 2
         self.expected_totalframes = 352800
         self.test_exact_duration = True
+        self.source_duration = 8
 
     def testWav(self):
         "Test wav decoding"
         self.source = os.path.join(os.path.dirname(__file__),
-                                    "samples/sweep.wav")
-
+                                   "samples/sweep.wav")
 
     def testWavMono(self):
         "Test mono wav decoding"
@@ -42,7 +44,7 @@ class TestDecoding(TestCase):
                                    "samples/sweep_32000.wav")
 
         expected_samplerate = 32000
-        ratio = expected_samplerate/self.expected_samplerate
+        ratio = expected_samplerate / self.expected_samplerate
 
         self.expected_totalframes = int(self.expected_totalframes * ratio)
         self.expected_samplerate = expected_samplerate
@@ -60,7 +62,6 @@ class TestDecoding(TestCase):
         self.expected_totalframes = 352832
         self.test_exact_duration = False
 
-
     def testMp3(self):
         "Test mp3 decoding"
         self.source = os.path.join(os.path.dirname(__file__),
@@ -69,7 +70,6 @@ class TestDecoding(TestCase):
         self.expected_totalframes = 353664
         self.test_exact_duration = False
 
-
     def tearDown(self):
         decoder = FileDecoder(uri=self.source,
                               start=self.start,
@@ -104,23 +104,36 @@ class TestDecoding(TestCase):
             self.assertEqual(decoder.input_channels, decoder.output_channels)
             # and if we know the expected channels, check the output match
             if self.expected_channels:
-                self.assertEqual(self.expected_channels, decoder.output_channels)
+                self.assertEqual(
+                    self.expected_channels, decoder.output_channels)
         # do the same with the sampling rate
         if self.samplerate:
             self.assertEqual(self.samplerate, decoder.output_samplerate)
         else:
-            self.assertEqual(decoder.input_samplerate, decoder.output_samplerate)
+            self.assertEqual(
+                decoder.input_samplerate, decoder.output_samplerate)
             if self.expected_samplerate:
-                self.assertEqual(self.expected_samplerate, decoder.output_samplerate)
+                self.assertEqual(
+                    self.expected_samplerate, decoder.output_samplerate)
 
         self.assertEqual(totalframes, self.expected_totalframes)
+        input_duration = decoder.input_totalframes / decoder.input_samplerate
+        output_duration = decoder.totalframes() / decoder.output_samplerate
         if self.test_exact_duration:
-            self.assertEqual(totalframes/decoder.output_samplerate,
-                             decoder.totalframes()/decoder.output_samplerate)
+            self.assertEqual(input_duration, output_duration)
+            self.assertEqual(input_duration,
+                             decoder.uri_duration)
+            self.assertEqual(self.source_duration,
+                             decoder.uri_duration)
         else:
-            self.assertAlmostEqual(totalframes/decoder.output_samplerate,
-                               decoder.totalframes()/decoder.output_samplerate,
-                               places=1)
+            self.assertAlmostEqual(input_duration, output_duration,
+                                   places=1)
+            self.assertAlmostEqual(input_duration,
+                                   decoder.uri_duration,
+                                   places=1)
+            self.assertAlmostEqual(self.source_duration,
+                                   decoder.uri_duration,
+                                   places=1)
 
 
 class TestDecodingSegment(TestDecoding):
@@ -129,14 +142,15 @@ class TestDecodingSegment(TestDecoding):
         super(TestDecodingSegment, self).setUp()
         self.start = 1
         self.duration = 3
+        self.source_duration = self.duration
 
         self.expected_totalframes = self.duration * self.expected_samplerate
 
-
     def testMp3(self):
         "Test mp3 decoding"
         super(TestDecodingSegment, self).testMp3()
-        self.expected_totalframes = self.duration * self.expected_samplerate + 1
+        self.expected_totalframes = self.duration * \
+            self.expected_samplerate + 1
 
     def testWav(self):
         "Test wav decoding"
@@ -164,12 +178,14 @@ class TestDecodingSegmentDefaultStart(TestDecodingSegment):
     def setUp(self):
         super(TestDecodingSegmentDefaultStart, self).setUp()
         self.duration = 1
+        self.source_duration = self.duration
         self.expected_totalframes = self.duration * self.expected_samplerate
 
     def testMp3(self):
         "Test mp3 decoding"
         super(TestDecodingSegmentDefaultStart, self).testMp3()
-        self.expected_totalframes = self.duration * self.expected_samplerate + 1
+        self.expected_totalframes = self.duration * \
+            self.expected_samplerate + 1
 
 
 class TestDecodingSegmentDefaultDuration(TestDecodingSegment):
@@ -177,7 +193,10 @@ class TestDecodingSegmentDefaultDuration(TestDecodingSegment):
     def setUp(self):
         super(TestDecodingSegment, self).setUp()
         self.start = 1
-        self.expected_totalframes = self.expected_totalframes - self.start * self.expected_samplerate
+        self.source_duration -= self.start
+
+        self.expected_totalframes = (self.expected_totalframes
+                                     - self.start * self.expected_samplerate)
 
     def testWav(self):
         "Test wav decoding"
@@ -194,7 +213,7 @@ class TestDecodingSegmentDefaultDuration(TestDecodingSegment):
     def testMp3(self):
         "Test mp3 decoding"
         super(TestDecodingSegment, self).testMp3()
-        self.expected_totalframes = 310715#308701
+        self.expected_totalframes = 310715  # was  308701 ?
 
 
 class TestDecodingStereo(TestDecoding):
@@ -262,6 +281,7 @@ class TestDecodingMonoDownsampling(TestDecoding):
         super(TestDecodingMonoDownsampling, self).testMp3()
         self.expected_totalframes = 128314
 
+
 class TestDecodingStereoDownsampling(TestDecoding):
 
     def setUp(self):
@@ -321,16 +341,18 @@ class TestDecodingLongBlock(TestDecoding):
 
     def setUp(self):
         super(TestDecodingLongBlock, self).setUp()
-        self.samplerate, self.channels, self.blocksize = None, None, 1024*8*2
+        self.samplerate, self.channels, self.blocksize = None, None, 1024 * \
+            8 * 2
+
 
+class TestDecodingWrongFiles(unittest.TestCase):
 
-class TestDecodingWrongFiles(TestCase):
     "Test decoding features"
 
     def testMissingFile(self):
         "Test decoding missing file"
         self.source = os.path.join(os.path.dirname(__file__),
-                                    "a_missing_file_blahblah.wav")
+                                   "a_missing_file_blahblah.wav")
 
         self.assertRaises(IOError, FileDecoder, self.source)
 
index 235e6a152c6791833c38572c1a01751ad067d099..53dd48a18798676e7a86cd1ee6c4b2e0ec19887c 100755 (executable)
 #! /usr/bin/env python
 
+from __future__ import division
+
 from math import pi
-from numpy import arange, sin
+import numpy as np
 from unit_timeside import *
+from timeside.decoder.utils import get_uri, get_media_uri_info
+from timeside.decoder import ArrayDecoder
+import os
+from tools import tmp_file_sink
+
+class TestEncoding(unittest.TestCase):
+    "Test encoding features"
 
-from os import unlink
+    def generate_source(self):
+        self.expected_total_frames = self.source_duration * self.samplerate
 
+        f0 = 440.
+        f = f0 * np.logspace(0, 4/12*(self.channels-1), self.channels, base=2)
+        omega = 2. * pi * f / self.samplerate
+        samples = np.empty((self.expected_total_frames, self.channels))
+        for n in xrange(self.channels):
+            samples[:,n] = .75 * np.sin(omega[n]*np.arange(self.expected_total_frames))
+        return samples
 
-class TestEncoding(TestCase):
-    "Test encoding features"
 
     def setUp(self):
         self.samplerate, self.channels, self.blocksize = 44100, 1, 1024
-        import tempfile
-        self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
-        self.sink = self.tmpfile.name
-        self.tmpfile.close()
         self.overwrite = False
+        self.encode_to_file = True
+        self.test_duration = True
+        self.test_channels = True
+
+        # Source
+        self.source_duration = 3.
+
 
     def testWav(self):
         "Test wav encoding"
         from timeside.encoder.wav import WavEncoder
         self.encoder_function = WavEncoder
+        self.delta = 0
 
     def testVorbis(self):
         "Test vorbis encoding"
         from timeside.encoder.ogg import VorbisEncoder
         self.encoder_function = VorbisEncoder
+        self.delta = 0.2
 
     def testMp3(self):
         "Test mp3 encoding"
         from timeside.encoder.mp3 import Mp3Encoder
         self.encoder_function = Mp3Encoder
+        self.delta = 0.2
+
 
     def testAac(self):
         "Test aac encoding"
         from timeside.encoder.m4a import AacEncoder
         self.encoder_function = AacEncoder
+        self.test_channels = False
+        self.delta = 0.06
+
 
     def testFlac(self):
         "Test flac encoding"
         from timeside.encoder.flac import FlacEncoder
         self.encoder_function = FlacEncoder
+        self.delta = 0
 
     def testWebm(self):
         "Test webm encoding"
         from timeside.encoder.webm import WebMEncoder
         self.encoder_function = WebMEncoder
+        self.test_duration = False  # webmmux encoder with streamable=true
+                                    # does not return a valid duration
 
     def tearDown(self):
+
+        # Source through ArrayDecoder
+
+        decoder = ArrayDecoder(self.generate_source(),
+                               samplerate=self.samplerate)
+        # Encoder
+        file_extension = '.' + self.encoder_function.file_extension()
+        if not hasattr(self, 'sink'):
+            self.sink = tmp_file_sink(prefix=self.__class__.__name__,
+                                  suffix=file_extension)
         self.encoder = self.encoder_function(self.sink,
                                              overwrite=self.overwrite)
-        self.encoder.setup(channels=self.channels,
-                           samplerate=self.samplerate)
-
-        written_frames, eod = 0, False
-        total_frames = 3. * self.samplerate
-        block_size = self.blocksize
-        f0 = 800.
-        omega = 2. * pi * f0 / float(self.samplerate)
-
-        while True:
-            remaining = total_frames - written_frames
-            if remaining >= block_size:
-                write_length = block_size
+
+        # Run Pipe
+        (decoder | self.encoder).run()
+
+        if self.encode_to_file:
+            media_info = get_media_uri_info(get_uri(self.sink))
+            media_duration = media_info['duration']
+            media_channels = media_info['streams'][0]['channels']
+            media_samplerate = media_info['streams'][0]['samplerate']
+
+            #os.unlink(self.sink)
+
+            if self.test_duration:
+                self.assertAlmostEqual(self.source_duration,
+                                       media_duration,
+                                       delta=self.delta)
+            if self.test_channels:
+                self.assertEqual(self.channels, media_channels)
             else:
-                write_length = remaining
-                eod = True
-            # build a sinusoid
-            frames = .75 * sin(omega * (arange(write_length) + written_frames))
-            # process encoder, writing to file
-            self.encoder.process(frames, eod)
-            written_frames += frames.shape[0]
-            if eod:
-                self.assertEquals(self.encoder.eod, True)
-                break
-
-        self.encoder.release()
+                self.assertEqual(2, media_channels)   # voaacenc bug ?
+            self.assertEqual(media_samplerate, self.samplerate)
 
         if 0:
             import commands
             print commands.getoutput('sndfile-info ' + self.sink)
 
-        self.assertEquals(written_frames, total_frames)
+        self.assertEqual(self.expected_total_frames, self.encoder.num_samples)
+        self.assertEqual(self.channels, self.encoder.channels())
+        self.assertEqual(self.samplerate, self.encoder.samplerate())
+        self.assertEqual(self.source_duration,
+                         self.encoder.num_samples/self.encoder.samplerate())
+
 
-        if hasattr(self, 'tmpfile'):
-            unlink(self.sink)
 
 
 class TestEncodingLongBlock(TestEncoding):
@@ -147,9 +183,10 @@ class TestEncodingToDevNull(TestEncoding):
     "Test encoding features with /dev/null"
 
     def setUp(self):
-        self.samplerate, self.channels, self.blocksize = 44100, 1, 1024
+        super(TestEncodingToDevNull, self).setUp()
         self.sink = '/dev/null'
-        self.overwrite = False
+        self.encode_to_file = False
+
 
 
 class TestEncodingToDirectory(TestEncoding):
@@ -167,22 +204,18 @@ class TestEncodingToDirectory(TestEncoding):
         rmdir(self.sink)
 
 
-class TestEncodingOverwriteFails(TestCase):
+class TestEncodingOverwriteFails(unittest.TestCase):
     "Test encoding features"
 
     def setUp(self):
         self.samplerate, self.channels, self.blocksize = 44100, 1, 1024
-        import tempfile
-        self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
-        self.sink = self.tmpfile.name
         self.overwrite = False
 
     def tearDown(self):
         self.assertRaises(IOError, self.encoder_function, self.sink)
-        self.tmpfile.close()
 
 
-class TestEncodingOverwriteForced(TestCase):
+class TestEncodingOverwriteForced(unittest.TestCase):
     "Test encoding features"
 
     def setUp(self):
@@ -195,7 +228,7 @@ class TestEncodingOverwriteForced(TestCase):
 
     def tearDown(self):
         super(TestEncodingOverwriteForced, self).tearDown()
-        self.tmpfile.close()
+
 
 if __name__ == '__main__':
     unittest.main(testRunner=TestRunner())
index 9999b07e5e6f1c4b214ce158b3aae437761406c9..cfb4a1779db201d4435e21a96354ae9fad836048 100755 (executable)
@@ -7,7 +7,7 @@ import os.path
 
 __all__ = ['TestGraphing']
 
-class TestGraphing(TestCase):
+class TestGraphing(unittest.TestCase):
     "Test all graphers with various input media formats"
 
     def setUp(self):
index 1bbcc26d7b8d6adf15ed480c1ff096cef04e5e3c..f2453b39589bc441540b69ed0ed52427d9f0d9e9 100755 (executable)
@@ -4,7 +4,7 @@ from timeside.core import FixedSizeInputAdapter
 from unit_timeside import *
 import numpy
 
-class TestFixedSizeInputAdapter(TestCase):
+class TestFixedSizeInputAdapter(unittest.TestCase):
     "Test the fixed-sized input adapter"
 
     def assertIOEquals(self, adapter, input, input_eod, output, output_eod=None):
index b18c5d0af6e3668c4c76ee2f9d834ddd8132cab5..6b33ab4595593162c78eb4bfe48b0879cb2add50 100755 (executable)
@@ -4,7 +4,7 @@ from unit_timeside import *
 import timeside
 verbose = 0
 
-class TestListCoreProcessors(TestCase):
+class TestListCoreProcessors(unittest.TestCase):
     """ test get list of processors """
 
     def testHasSomeDecoders(self):
index 0fedff2f8dcdd2b10f78168f375e0866c3d094bd..cc11432572c8256df9d7553057114a837011f0ff 100755 (executable)
+#! /usr/bin/env python
+
+from __future__ import division
+
 from timeside.core import *
 from timeside.decoder import *
 from timeside.analyzer import *
 from timeside.encoder import *
-from timeside.api import *
 from timeside.component import *
 
 from unit_timeside import *
-
+from tools import tmp_file_sink
 import os.path
 
-class TestTranscodingFromWav(TestCase):
-    "Test transcoding from wav"
 
-    def tmpTarget(self):
-        import tempfile
-        self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
-        self.target = self.tmpfile.name
-        self.tmpfile.close()
+class TestTranscodingFromWav(unittest.TestCase):
+    "Test transcoding from wav"
 
     def setUp(self):
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.wav")
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples/sweep.wav")
+        self.test_duration = True
+        self.test_channels = True
 
     def testToWav(self):
         "Test conversion to wav"
-        self.tmpTarget()
-        self.encoder = WavEncoder(self.target)
+        self.encoder_function = WavEncoder
 
     def testToMp3(self):
         "Test conversion to mp3"
-        self.tmpTarget()
-        self.encoder = Mp3Encoder(self.target)
+        self.encoder_function = Mp3Encoder
 
     def testToOgg(self):
         "Test conversion to ogg"
-        self.tmpTarget()
-        self.encoder = VorbisEncoder(self.target)
+        self.encoder_function = VorbisEncoder
 
-    # def testToWebM(self):
-    #     "Test conversion to webm"
-    #     self.tmpTarget()
-    #     self.encoder = WebMEncoder(self.target)
+    def testToWebM(self):
+        "Test conversion to webm"
+        self.encoder_function = WebMEncoder
+        self.test_duration = False  # webmmux encoder with streamable=true
+                                    # does not return a valid duration
 
-    # def testToM4a(self):
-    #     "Test conversion to m4a"
-    #     self.tmpTarget()
-    #     self.encoder = AacEncoder(self.target)
+    def testToM4a(self):
+        "Test conversion to m4a"
+        self.encoder_function = AacEncoder
 
-    def setUpDecoder(self):
-        self.decoder = FileDecoder(self.source)
-        self.decoder.setup()
-        self.channels  = self.decoder.channels()
-        self.samplerate = self.decoder.samplerate()
+    def tearDown(self):
+        decoder = FileDecoder(self.source)
 
-    def setUpEncoder(self):
-        self.encoder.setup(channels = self.channels, samplerate = self.samplerate)
 
-    def tearDown(self):
-        self.setUpDecoder()
-        self.setUpEncoder()
 
-        totalframes = 0
-        while True:
-            frames, eod = self.decoder.process()
-            self.encoder.process(frames, eod)
-            totalframes += frames.shape[0]
-            if eod or self.encoder.eod: break
+        file_extension = '.' + self.encoder_function.file_extension()
 
+        self.target = tmp_file_sink(prefix=self.__class__.__name__,
+                                  suffix=file_extension)
+        encoder = self.encoder_function(self.target)
+        (decoder | encoder).run()
 
-        #print self.channels, self.samplerate, totalframes
+        decoder_encoded = FileDecoder(self.target)
 
-        self.encoder.release()
+        from timeside.analyzer import Waveform
+        a = Waveform()  # Arbitrary analyzer for running the next pipe
+        (decoder_encoded | a).run()
 
-        decoder = FileDecoder(self.target)
-        decoder.setup()
-        written_frames = 0
-        while True:
-            frames, eod = decoder.process()
-            written_frames += frames.shape[0]
-            if eod: break
+        import os
+        os.unlink(self.target)
 
         #print decoder.channels(), decoder.samplerate(), written_frames
+        #print media_channels
 
-        self.assertEquals(self.channels, decoder.channels())
-        self.assertEquals(self.samplerate, decoder.samplerate())
-        self.assertTrue(written_frames - totalframes >= 0)
+        if self.test_channels:
+            self.assertEqual(decoder.channels(), decoder_encoded.channels())
+        else:
+            self.assertEqual(2, decoder_encoded.channels())  # voaacenc bug ?
 
-        import os
-        os.unlink(self.target)
+        self.assertEqual(decoder.samplerate(),
+                         decoder_encoded.samplerate())
+
+        if self.test_duration:
+            self.assertAlmostEqual(decoder.input_duration,
+                                   decoder_encoded.input_duration,
+                                   delta=0.2)
+
+class TestTranscodingFromMonoWav(TestTranscodingFromWav):
+    "Test transcoding from a mono wav"
+
+    def setUp(self):
+        super(TestTranscodingFromMonoWav, self).setUp()
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples/sweep_mono.wav")
+
+    def testToM4a(self):
+        "Test conversion to m4a"
+        super(TestTranscodingFromMonoWav, self).testToM4a()
+        self.test_channels = False  # voaacenc bug ? : always encode stereo
 
-class TestTranscodingFromAnotherWav(TestTranscodingFromWav):
+
+class TestTranscodingFromAnotherWav(TestTranscodingFromMonoWav):
     "Test transcoding from another wav"
 
     def setUp(self):
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/guitar.wav")
+        super(TestTranscodingFromAnotherWav, self).setUp()
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples/guitar.wav")  # Mono
+
 
 class TestTranscodingFromMp3(TestTranscodingFromWav):
     "Test transcoding from mp3"
 
     def setUp(self):
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.mp3")
+        super(TestTranscodingFromMp3, self).setUp()
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples/sweep.mp3")
+
 
 class TestTranscodingFromFlac(TestTranscodingFromWav):
     "Test transcoding from flac"
 
     def setUp(self):
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.flac")
+        super(TestTranscodingFromFlac, self).setUp()
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples/sweep.flac")
+
 
 class TestTranscodingFromOgg(TestTranscodingFromWav):
     "Test transcoding from ogg"
 
     def setUp(self):
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep.ogg")
+        super(TestTranscodingFromOgg, self).setUp()
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples/sweep.ogg")
+
+
 
-class TestTranscodingFromMonoWav(TestTranscodingFromWav):
-    "Test transcoding from a mono wav"
 
-    def setUp(self):
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep_mono.wav")
 
 class TestTranscodingFrom32kHzWav(TestTranscodingFromWav):
     "Test transcoding from a 32kHz wav"
 
     def setUp(self):
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/sweep_32000.wav")
+        super(TestTranscodingFrom32kHzWav, self).setUp()
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples/sweep_32000.wav")
+
 
 class TestTranscodingFromMissingFile(TestTranscodingFromWav):
     "Test transcoding from a missing file"
 
     def setUp(self):
-        self.source = os.path.join (os.path.dirname(__file__),  "samples/unexisting.wav")
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples/unexisting.wav")
 
     def tearDown(self):
-        self.assertRaises(IOError, self.setUpDecoder)
+        decoder = FileDecoder
+        self.assertRaises(IOError, decoder, self.source)
 
 if __name__ == '__main__':
     unittest.main(testRunner=TestRunner())
index 086bdeebe6748fbd167bf932c208b03bc49d4f2c..d0c7142810f62f64fbcf69712691cc9cc5d41df5 100755 (executable)
@@ -5,7 +5,7 @@ from timeside.decoder import *
 from timeside.analyzer import Yaafe
 from yaafelib import DataFlow,FeaturePlan
 
-class TestYaafe(TestCase):
+class TestYaafe(unittest.TestCase):
 
     def setUp(self):
         self.sample_rate = 16000
index f17477d57a2061a2174d3b26ce64f77de344456c..60cfb2d4705b8e178cf12cb3e6a58cb1f07104b1 100644 (file)
@@ -19,3 +19,12 @@ def check_samples():
             u = urllib.urlopen(url+sample)
             f.write(u.read())
             f.close()
+
+
+def tmp_file_sink(prefix=None, suffix = None):
+    import tempfile
+    tmpfile = tempfile.NamedTemporaryFile(delete=True,
+                                          prefix=prefix,
+                                          suffix=suffix)
+    tmpfile.close()
+    return tmpfile.name
\ No newline at end of file
index 46bc35354ad45fce9b736533340f6ef7947d141d..2ec084c26ba14cd4701384de40839fc5c99b6c3e 100644 (file)
@@ -2,24 +2,10 @@
 
 import unittest
 import doctest
-import os, sys
+import sys
 import time
-from tools import *
+from tools import check_samples
 
-class TestCase(unittest.TestCase):
-
-    def assertSameList(self, list1, list2):
-        "Test that two lists contain the same elements, in any order"
-        if len(list1) != len(list2):
-            self.fail("Lists length differ : %d != %d" % (len(list1), len(list2)))
-
-        for item in list1:
-            if not item in list2:
-                self.fail("%s is not in list2" % str(item))
-
-        for item in list2:
-            if not item in list1:
-                self.fail("%s is not in list1" % str(item))
 
 class _TextTestResult(unittest.TestResult):
     """A test result class that can print formatted text results to a stream.
@@ -90,22 +76,26 @@ class _TextTestResult(unittest.TestResult):
     def printErrorList(self, flavour, errors):
         for test, err in errors:
             self.stream.writeln(self.separator1)
-            self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
+            self.stream.writeln("%s: [%s] --> %s "
+                                % (flavour,
+                                   test.__class__.__name__,
+                                   self.getDescription(test)))
             self.stream.writeln(self.separator2)
             self.stream.writeln("%s" % err)
 
 
 class _WritelnDecorator:
     """Used to decorate file-like objects with a handy 'writeln' method"""
-    def __init__(self,stream):
+    def __init__(self, stream):
         self.stream = stream
 
     def __getattr__(self, attr):
-        return getattr(self.stream,attr)
+        return getattr(self.stream, attr)
 
     def writeln(self, arg=None):
-        if arg: self.write(arg)
-        self.write('\n') # text-mode streams translate to \r\n if needed
+        if arg:
+            self.write(arg)
+        self.write('\n')  # text-mode streams translate to \r\n if needed
 
 
 class TestRunner:
@@ -143,7 +133,8 @@ class TestRunner:
             if failed:
                 self.stream.write("failures=%d" % failed)
             if errored:
-                if failed: self.stream.write(", ")
+                if failed:
+                    self.stream.write(", ")
                 self.stream.write("errors=%d" % errored)
             self.stream.writeln(")")
         else: