]> git.parisson.com Git - timeside.git/commitdiff
Fix #49, generate test data from numpy + Gstreamer
authorThomas Fillon <thomas@parisson.com>
Fri, 17 Oct 2014 20:00:10 +0000 (22:00 +0200)
committerThomas Fillon <thomas@parisson.com>
Fri, 17 Oct 2014 20:00:10 +0000 (22:00 +0200)
19 files changed:
tests/get_samples.py [deleted file]
tests/test_analyzer_dc.py
tests/test_analyzer_level.py
tests/test_analyzers_default.py
tests/test_aubio_melenergy.py
tests/test_aubio_mfcc.py
tests/test_aubio_pitch.py
tests/test_aubio_specdesc.py
tests/test_aubio_temporal.py
tests/test_decoder_utils.py
tests/test_decoding.py
tests/test_process_pipe.py
tests/test_transcoding.py
tests/test_transcoding_streaming.py
tests/tools.py
tests/unit_timeside.py
timeside/decoder/utils.py
timeside/encoder/audiosink.py
timeside/tools/data_samples.py [new file with mode: 0644]

diff --git a/tests/get_samples.py b/tests/get_samples.py
deleted file mode 100644 (file)
index a4d37a8..0000000
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from tools import check_samples
-
-check_samples()
index c76522638b5db7f5e4264240eafffbd5626ac3ca..e916e0249c8120ae139d5acf7c292ded84cfc4d9 100755 (executable)
@@ -3,7 +3,7 @@
 from unit_timeside import unittest, TestRunner
 from timeside.decoder.file import FileDecoder
 from timeside.analyzer.dc import MeanDCShift
-import os
+from timeside.tools.data_samples import samples as ts_samples
 
 
 class TestAnalyzerDC(unittest.TestCase):
@@ -13,16 +13,14 @@ class TestAnalyzerDC(unittest.TestCase):
 
     def testOnSweep(self):
         "runs on sweep"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "sweep.wav")
+        self.source = ts_samples["sweep.wav"]
 
-        self.expected = {'mean_dc_shift': -0.000}
+        self.expected = {'mean_dc_shift': 0.004}
 
-    def testOnGuitar(self):
-        "runs on guitar"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "guitar.wav")
-        self.expected = {'mean_dc_shift': 0.054}
+    def testOnScale(self):
+        "runs on C4 Scale"
+        self.source = ts_samples["C4_scale.wav"]
+        self.expected = {'mean_dc_shift': 0.034}
 
     def tearDown(self):
         decoder = FileDecoder(self.source)
index 510ef6c6143f852df8cf954cb7ada8bf8f3184c0..29ffef66b217b8c1be42e738c5d3fbade8f4d7ac 100755 (executable)
@@ -3,7 +3,7 @@
 from unit_timeside import unittest, TestRunner
 from timeside.decoder.file import FileDecoder
 from timeside.analyzer.level import Level
-import os
+from timeside.tools.data_samples import samples as ts_samples
 
 
 class TestAnalyzerLevel(unittest.TestCase):
@@ -13,22 +13,20 @@ class TestAnalyzerLevel(unittest.TestCase):
 
     def testOnSweep(self):
         "runs on sweep"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "sweep.wav")
+        self.source = ts_samples["sweep.wav"]
 
-        max_level_value = -6.021
-        rms_level_value = -9.856
+        max_level_value = 0
+        rms_level_value = -2.995
 
         self.expected = {'level.max': max_level_value,
                          'level.rms': rms_level_value}
 
-    def testOnGuitar(self):
-        "runs on guitar"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "guitar.wav")
+    def testOnC4_Scale(self):
+        "runs on C4 scale"
+        self.source = ts_samples["C4_scale.wav"]
 
-        max_level_value = -4.054
-        rms_level_value = -21.945
+        max_level_value = 0
+        rms_level_value = -3.705
 
         self.expected = {'level.max': max_level_value,
                          'level.rms': rms_level_value}
index 12434c6214851175d4e56c58828cf2515da63b0f..a2c50ea722b6898740e9d6b163a8d0c51d4e4e33 100755 (executable)
@@ -7,15 +7,14 @@ from unit_timeside import unittest, TestRunner
 import timeside
 from timeside.decoder.file import FileDecoder
 import numpy as np
-import os
+from timeside.tools.data_samples import samples as ts_samples
 
 
 class TestAnalyzers_with_default(unittest.TestCase):
     """Test analyzer with default parameters"""
 
     def setUp(self):
-        source = os.path.join(os.path.dirname(__file__),
-                              "samples", "guitar.wav")
+        source = ts_samples["C4_scale.wav"]
 
         self.decoder = FileDecoder(source)
 
index abdc7300b4dab1393c2da0163779c2b33c55626e..612da646cf6606f353e607b816803288841a49b0 100755 (executable)
@@ -1,10 +1,10 @@
 #! /usr/bin/env python
 
 from unit_timeside import unittest, TestRunner
-import os
 from timeside.decoder.file import FileDecoder
 from timeside.core import get_processor
 from timeside import _WITH_AUBIO
+from timeside.tools.data_samples import samples as ts_samples
 
 
 @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
@@ -15,13 +15,11 @@ class TestAubioMelEnergy(unittest.TestCase):
 
     def testOnSweep(self):
         "runs on sweep"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "sweep.wav")
+        self.source = ts_samples["sweep.wav"]
 
     def testOnGuitar(self):
         "runs on guitar"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "guitar.wav")
+        self.source = ts_samples["C4_scale.wav"]
 
     def tearDown(self):
         decoder = FileDecoder(self.source)
index 1071b5e7266e54c6dcd7e3085797e29320a33897..f225f107179ee2e775ca9e752bfe1ce15e58199b 100755 (executable)
@@ -1,10 +1,10 @@
 #! /usr/bin/env python
 
 from unit_timeside import unittest, TestRunner
-import os
 from timeside.decoder.file import FileDecoder
 from timeside.core import get_processor
 from timeside import _WITH_AUBIO
+from timeside.tools.data_samples import samples as ts_samples
 
 
 @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
@@ -15,11 +15,11 @@ class TestAubioMfcc(unittest.TestCase):
 
     def testOnSweep(self):
         "runs on sweep"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples", "sweep.wav")
+        self.source = ts_samples["sweep.wav"]
 
-    def testOnGuitar(self):
-        "runs on guitar"
-        self.source = os.path.join (os.path.dirname(__file__),  "samples", "guitar.wav")
+    def testOnScale(self):
+        "runs on C4 scale"
+        self.source = ts_samples["C4_scale.wav"]
 
     def tearDown(self):
         decoder = FileDecoder(self.source)
index bf2a43bb5842e0c52284b2bc688e2d59c5e8d417..2eee6a47f7016273bcfc2bc4a77fe5c18f77a593 100755 (executable)
@@ -1,10 +1,10 @@
 #! /usr/bin/env python
 
 from unit_timeside import unittest, TestRunner
-import os
 from timeside.decoder.file import FileDecoder
 from timeside.core import get_processor
 from timeside import _WITH_AUBIO
+from timeside.tools.data_samples import samples as ts_samples
 
 
 @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
@@ -15,13 +15,11 @@ class TestAubioPitch(unittest.TestCase):
 
     def testOnSweep(self):
         "runs on sweep"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "sweep.wav")
+        self.source = ts_samples["sweep.wav"]
 
-    def testOnGuitar(self):
-        "runs on guitar"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "guitar.wav")
+    def testOnC4Scale(self):
+        "runs on C4 scale"
+        self.source = ts_samples["C4_scale.wav"]
 
     def tearDown(self):
         decoder = FileDecoder(self.source)
index 2f34a1e9db1f32f8a27bc87649baca15c7e1cc55..c92e6850d3a47a63a555cc6c9d294f62f7b892a9 100755 (executable)
@@ -1,10 +1,10 @@
 #! /usr/bin/env python
 
 from unit_timeside import unittest, TestRunner
-import os
 from timeside.decoder.file import FileDecoder
 from timeside.core import get_processor
 from timeside import _WITH_AUBIO
+from timeside.tools.data_samples import samples as ts_samples
 
 
 @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
@@ -15,13 +15,11 @@ class TestAubioSpecdesc(unittest.TestCase):
 
     def testOnSweep(self):
         "runs on sweep"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "sweep.wav")
+        self.source = ts_samples["sweep.wav"]
 
-    def testOnGuitar(self):
-        "runs on guitar"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "guitar.wav")
+    def testOnC4Scale(self):
+        "runs on C4 scale"
+        self.source = ts_samples["C4_scale.wav"]
 
     def tearDown(self):
         decoder = FileDecoder(self.source)
index b0850fa4488ae445b595b4b6d094c47d8aa42af9..209808e677bed7ad50e2783d9e16cdff9d7d9d85 100755 (executable)
@@ -1,10 +1,10 @@
 #! /usr/bin/env python
 
 from unit_timeside import unittest, TestRunner
-import os
 from timeside.decoder.file import FileDecoder
 from timeside.core import get_processor
 from timeside import _WITH_AUBIO
+from timeside.tools.data_samples import samples as ts_samples
 
 
 @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
@@ -15,13 +15,11 @@ class TestAubioTemporal(unittest.TestCase):
 
     def testOnSweep(self):
         "runs on sweep"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "sweep.wav")
+        self.source = ts_samples["sweep.wav"]
 
-    def testOnGuitar(self):
-        "runs on guitar"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples", "guitar.wav")
+    def testOnC4Scale(self):
+        "runs on C4 scale"
+        self.source = ts_samples["C4_scale.wav"]
 
     def tearDown(self):
         decoder = FileDecoder(self.source)
index e0b6f1efeb7eeec71a27a1ee6068f45413a40a5e..3d6de022e158ed66d34057a38c9c89c03f9779b7 100755 (executable)
@@ -4,9 +4,9 @@
 
 from __future__ import division
 
-from numpy import arange, sin
-from unit_timeside import *
+from unit_timeside import unittest, TestRunner
 from timeside.decoder.utils import get_uri, get_media_uri_info, path2uri
+from timeside.tools.data_samples import samples as ts_samples
 import os.path
 
 
@@ -33,6 +33,7 @@ class TestGetUriWrongUri(unittest.TestCase):
         "Missing file raise IOerror"
         self.source = os.path.join(os.path.dirname(__file__),
                                    "a_missing_file_blahblah.wav")
+
     def testNotValidUri(self):
         "Not valid uri raise IOerror"
         self.source = os.path.join("://not/a/valid/uri/parisson.com")
@@ -54,46 +55,40 @@ class TestGetMediaInfo(unittest.TestCase):
         self.test_exact_duration = True
         self.expected_channels = 2
         self.expected_samplerate = 44100
-        self.expected_depth = 16
+        self.expected_depth = 0  # ?
 
     def testWav(self):
         "Test wav decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.wav")
-
+        self.source = ts_samples["sweep.wav"]
 
     def testWavMono(self):
         "Test mono wav decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep_mono.wav")
+        self.source = ts_samples["sweep_mono.wav"]
 
         self.expected_channels = 1
 
     def testWav32k(self):
         "Test 32kHz wav decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep_32000.wav")
+        self.source = ts_samples["sweep_32000.wav"]
         self.expected_samplerate = 32000
 
     def testFlac(self):
         "Test flac decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.flac")
+        self.source = ts_samples["sweep.flac"]
+        self.expected_depth = 24
 
     def testOgg(self):
         "Test ogg decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.ogg")
+        self.source = ts_samples["sweep.ogg"]
+        self.test_exact_duration = False
         self.expected_depth = 0  # ?
 
     def testMp3(self):
         "Test mp3 decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.mp3")
+        self.source = ts_samples["sweep.mp3"]
         self.expected_depth = 32
         self.test_exact_duration = False
 
-
     def tearDown(self):
         uri = get_uri(self.source)
         uri_info = get_media_uri_info(uri)
@@ -103,9 +98,12 @@ class TestGetMediaInfo(unittest.TestCase):
             self.assertAlmostEqual(self.source_duration,
                                    uri_info['duration'],
                                    places=1)
-        self.assertEqual(self.expected_channels, uri_info['streams'][0]['channels'])
-        self.assertEqual(self.expected_samplerate, uri_info['streams'][0]['samplerate'])
+        self.assertEqual(self.expected_channels,
+                         uri_info['streams'][0]['channels'])
+        self.assertEqual(self.expected_samplerate,
+                         uri_info['streams'][0]['samplerate'])
         self.assertEqual(self.expected_depth, uri_info['streams'][0]['depth'])
 
+
 if __name__ == '__main__':
     unittest.main(testRunner=TestRunner())
index 5ce3f82b6676e73648702db77d8d5a32ad8c7239..5bd0091a75d76887a542ead5694bbfb6b2a04766 100755 (executable)
@@ -6,6 +6,7 @@ from timeside.decoder.file import FileDecoder
 from timeside.core import ProcessPipe
 
 from unit_timeside import *
+from timeside.tools.data_samples import samples as ts_samples
 
 import os.path
 
@@ -31,20 +32,17 @@ class TestDecoding(unittest.TestCase):
 
     def testWav(self):
         "Test wav decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.wav")
+        self.source = ts_samples["sweep.wav"]
 
     def testWavMono(self):
         "Test mono wav decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep_mono.wav")
+        self.source = ts_samples["sweep_mono.wav"]
 
         self.expected_channels = 1
 
     def testWav32k(self):
         "Test 32kHz wav decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep_32000.wav")
+        self.source = ts_samples["sweep_32000.wav"]
 
         expected_samplerate = 32000
         ratio = expected_samplerate / self.expected_samplerate
@@ -54,23 +52,20 @@ class TestDecoding(unittest.TestCase):
 
     def testFlac(self):
         "Test flac decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.flac")
+        self.source = ts_samples["sweep.flac"]
         self.expected_mime_type = 'audio/x-flac'
 
     def testOgg(self):
         "Test ogg decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.ogg")
+        self.source = ts_samples["sweep.ogg"]
 
-        self.expected_totalframes = 352832
+        self.expected_totalframes = 352960
         self.expected_mime_type = 'application/ogg'
         self.test_exact_duration = False
 
     def testMp3(self):
         "Test mp3 decoding"
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.mp3")
+        self.source = ts_samples["sweep.mp3"]
 
         self.expected_totalframes = 353664
         self.expected_mime_type = 'audio/mpeg'
@@ -125,8 +120,9 @@ class TestDecoding(unittest.TestCase):
 
         self.assertEqual(decoder.mime_type(), self.expected_mime_type)
 
-        expected_totalframes = [self.expected_totalframes, self.expected_totalframes +32]  # +32 to handle some issue with ogg
-        self.assertIn(totalframes, expected_totalframes)
+        #expected_totalframes = [self.expected_totalframes, self.expected_totalframes +32]  # +32 to handle some issue with ogg
+
+        self.assertEqual(totalframes, self.expected_totalframes)
 
         input_duration = decoder.input_totalframes / decoder.input_samplerate
         output_duration = decoder.totalframes() / decoder.output_samplerate
@@ -224,7 +220,7 @@ class TestDecodingSegmentDefaultDuration(TestDecodingSegment):
     def testMp3(self):
         "Test mp3 decoding"
         super(TestDecodingSegment, self).testMp3()
-        self.expected_totalframes = 310715  # was  308701 ?
+        self.expected_totalframes = 309565  # was  308701 ?
 
 
 class TestDecodingSegmentBadParameters(unittest.TestCase):
@@ -285,7 +281,7 @@ class TestDecodingMonoUpsampling(TestDecoding):
     def testOgg(self):
         "Test ogg decoding"
         super(TestDecodingMonoUpsampling, self).testOgg()
-        self.expected_totalframes = 384000
+        self.expected_totalframes = 384140
 
 
 class TestDecodingMonoDownsampling(TestDecoding):
@@ -304,7 +300,7 @@ class TestDecodingMonoDownsampling(TestDecoding):
     def testOgg(self):
         "Test ogg decoding"
         super(TestDecodingMonoDownsampling, self).testOgg()
-        self.expected_totalframes = 127980
+        self.expected_totalframes = 128027
 
     def testMp3(self):
         "Test mp3 decoding"
@@ -328,7 +324,7 @@ class TestDecodingStereoDownsampling(TestDecoding):
     def testOgg(self):
         "Test ogg decoding"
         super(TestDecodingStereoDownsampling, self).testOgg()
-        self.expected_totalframes = 255992
+        self.expected_totalframes = 256085
 
     def testMp3(self):
         "Test mp3 decoding"
@@ -352,7 +348,7 @@ class TestDecodingStereoUpsampling(TestDecoding):
     def testOgg(self):
         "Test ogg decoding"
         super(TestDecodingStereoUpsampling, self).testOgg()
-        self.expected_totalframes = 768000
+        self.expected_totalframes = 768279
 
     def testMp3(self):
         "Test mp3 decoding"
index 468e689fa4e91f3bd2a9b3975385cb84f5a813be..cdaa182f8ec3a375c679bd9d8e83a6910d210888 100644 (file)
@@ -6,7 +6,7 @@
 from unit_timeside import unittest, TestRunner
 import timeside
 from timeside.decoder.file import FileDecoder
-import os
+from timeside.tools.data_samples import samples as ts_samples
 
 
 class TestProcessPipe(unittest.TestCase):
@@ -16,8 +16,8 @@ class TestProcessPipe(unittest.TestCase):
         """Test process pipe (Quick and dirty)"""
         # TODO clean up and complete
 
-        source = os.path.join(os.path.dirname(__file__),
-                              "samples", "guitar.wav")
+        source = ts_samples["sweep.wav"]
+
 
         pipe = timeside.core.ProcessPipe()
         dec = FileDecoder(source)
index 7602ba226eb272b6f84a820ea4d3311e32d5f2d3..c3b1aa4cda92000c61f3fe262af8f2980808d0b8 100755 (executable)
@@ -4,19 +4,18 @@ from __future__ import division
 
 from timeside.core import get_processor, ProcessPipe
 from timeside.decoder.file import FileDecoder
-from timeside.component import *
-
-from unit_timeside import *
+#from timeside.component import *
+import os
+from unit_timeside import unittest, TestRunner
 from tools import tmp_file_sink
-import os.path
+from timeside.tools.data_samples import samples as ts_samples
 
 
 class TestTranscodingFromWav(unittest.TestCase):
     "Test transcoding from wav"
 
     def setUp(self):
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.wav")
+        self.source = ts_samples["sweep.wav"]
         self.test_duration = True
         self.test_channels = True
 
@@ -50,7 +49,7 @@ class TestTranscodingFromWav(unittest.TestCase):
         file_extension = '.' + encoder_cls.file_extension()
 
         self.target = tmp_file_sink(prefix=self.__class__.__name__,
-                                  suffix=file_extension)
+                                    suffix=file_extension)
         encoder = encoder_cls(self.target)
         (decoder | encoder).run()
 
@@ -59,7 +58,6 @@ class TestTranscodingFromWav(unittest.TestCase):
         pipe = ProcessPipe(decoder_encoded)
         pipe.run()
 
-        import os
         os.unlink(self.target)
 
         #print decoder.channels(), decoder.samplerate(), written_frames
@@ -78,13 +76,13 @@ class TestTranscodingFromWav(unittest.TestCase):
                                    decoder_encoded.input_duration,
                                    delta=0.2)
 
+
 class TestTranscodingFromMonoWav(TestTranscodingFromWav):
     "Test transcoding from a mono wav"
 
     def setUp(self):
         super(TestTranscodingFromMonoWav, self).setUp()
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep_mono.wav")
+        self.source = ts_samples["sweep_mono.wav"]
 
     def testM4a(self):
         "Test conversion to m4a"
@@ -97,8 +95,7 @@ class TestTranscodingFromAnotherWav(TestTranscodingFromMonoWav):
 
     def setUp(self):
         super(TestTranscodingFromAnotherWav, self).setUp()
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/guitar.wav")  # Mono
+        self.source = ts_samples["C4_scale.wav"]  # Mono
 
 
 class TestTranscodingFromMp3(TestTranscodingFromWav):
@@ -106,8 +103,7 @@ class TestTranscodingFromMp3(TestTranscodingFromWav):
 
     def setUp(self):
         super(TestTranscodingFromMp3, self).setUp()
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.mp3")
+        self.source = ts_samples["sweep.mp3"]
 
 
 class TestTranscodingFromFlac(TestTranscodingFromWav):
@@ -115,8 +111,7 @@ class TestTranscodingFromFlac(TestTranscodingFromWav):
 
     def setUp(self):
         super(TestTranscodingFromFlac, self).setUp()
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.flac")
+        self.source = ts_samples["sweep.flac"]
 
 
 class TestTranscodingFromOgg(TestTranscodingFromWav):
@@ -124,11 +119,7 @@ class TestTranscodingFromOgg(TestTranscodingFromWav):
 
     def setUp(self):
         super(TestTranscodingFromOgg, self).setUp()
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.ogg")
-
-
-
+        self.source = ts_samples["sweep.ogg"]
 
 
 class TestTranscodingFrom32kHzWav(TestTranscodingFromWav):
@@ -136,8 +127,7 @@ class TestTranscodingFrom32kHzWav(TestTranscodingFromWav):
 
     def setUp(self):
         super(TestTranscodingFrom32kHzWav, self).setUp()
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep_32000.wav")
+        self.source = ts_samples["sweep_32000.wav"]
 
 
 class TestTranscodingFromMissingFile(TestTranscodingFromWav):
index 3aa38fe3c4d53fc07a5c2a413cb64deabf7ddaf3..b5404448f85ef30695709bb29c634b5eeef6cfc4 100755 (executable)
@@ -8,17 +8,16 @@ from timeside.decoder.file import FileDecoder
 #from timeside.encoder import *
 #from timeside.component import *
 
-from unit_timeside import *
+from unit_timeside import unittest, TestRunner
 from tools import tmp_file_sink
-import os.path
+from timeside.tools.data_samples import samples as ts_samples
 
 
 class TestTranscodingStreaming(unittest.TestCase):
     "Test transcoding and streaming"
 
     def setUp(self):
-        self.source = os.path.join(os.path.dirname(__file__),
-                                   "samples/sweep.wav")
+        self.source = ts_samples["sweep.wav"]
         self.test_duration = True
         self.test_channels = True
         self.filesize_delta = None
index 60cfb2d4705b8e178cf12cb3e6a58cb1f07104b1..b9522f663de33005c7377ab3c8cdb5a4d0f291c2 100644 (file)
@@ -1,30 +1,9 @@
-import os
-import urllib
+import tempfile
 
 
-def check_samples():
-    url = 'http://github.com/yomguy/timeside-samples/raw/master/samples/'
-    samples = ['guitar.wav', 'sweep.wav', 'sweep_mono.wav', 'sweep_32000.wav', 'sweep.flac', 'sweep.ogg', 'sweep.mp3', 'sweep_source.wav']
-    path = os.path.normpath(os.path.dirname(__file__))
-    dir = path + os.sep + 'samples'
-
-    if not os.path.exists(dir):
-        os.makedirs(dir)
-
-    for sample in samples:
-        path = dir + os.sep + sample
-        if not os.path.exists(path):
-            print 'downloading: ' + sample
-            f = open(path, 'w')
-            u = urllib.urlopen(url+sample)
-            f.write(u.read())
-            f.close()
-
-
-def tmp_file_sink(prefix=None, suffix = None):
-    import tempfile
+def tmp_file_sink(prefix=None, suffix=None):
     tmpfile = tempfile.NamedTemporaryFile(delete=True,
                                           prefix=prefix,
                                           suffix=suffix)
     tmpfile.close()
-    return tmpfile.name
\ No newline at end of file
+    return tmpfile.name
index 0e5789d3ee7b4a536387e9ff799b438b51b3da6d..2a985c570b274f8f2045a460cdfa7da0e0870f02 100644 (file)
@@ -4,9 +4,6 @@ import unittest
 import doctest
 import sys
 import time
-from tools import check_samples
-
-check_samples()
 
 
 class _TextTestResult(unittest.TestResult):
@@ -111,7 +108,6 @@ class TestRunner:
         self.stream = _WritelnDecorator(stream)
         self.descriptions = descriptions
         self.verbosity = verbosity
-        check_samples()
 
     def _makeResult(self):
         return _TextTestResult(self.stream, self.descriptions, self.verbosity)
index 484b371d51819712aa2641c9d35282e26337f81a..1cc4b105c90e6cf6358b8c02db85ea9dc577b066 100644 (file)
@@ -181,9 +181,10 @@ def sha1sum_file(filename):
     '''
     Return the secure hash digest with sha1 algorithm for a given file
 
-    >>> wav_file = 'tests/samples/guitar.wav' # doctest: +SKIP
-    >>> print sha1sum_file(wav_file) # doctest: +SKIP
-    #08301c3f9a8d60926f31e253825cc74263e52ad1
+    >>> from timeside.tools.data_samples import samples as ts_samples
+    >>> wav_file = ts_samples["C4_scale.wav"]
+    >>> print sha1sum_file(wav_file)
+    a598e78d0b5c90da54a77e34c083abdcd38d42ba
     '''
     import hashlib
     import io
@@ -200,13 +201,9 @@ def sha1sum_file(filename):
 def sha1sum_url(url):
     '''Return the secure hash digest with sha1 algorithm for a given url
 
-    >>> url = 'https://github.com/yomguy/timeside-samples/raw/master/samples/guitar.wav'
+    >>> url = "https://github.com/yomguy/timeside-samples/raw/master/samples/guitar.wav"
     >>> print sha1sum_url(url)
     08301c3f9a8d60926f31e253825cc74263e52ad1
-    >>> wav_file = 'tests/samples/guitar.wav' # doctest: +SKIP
-    >>> uri = get_uri(wav_file)
-    >>> print sha1sum_url(uri)
-    08301c3f9a8d60926f31e253825cc74263e52ad1
 
     '''
     import hashlib
index 9f01d3e71af5e44e612aa353c7b866349b6a8f3d..8dd6d780420e6a11e348729056a280590b591880 100644 (file)
@@ -33,7 +33,8 @@ class AudioSink(GstEncoder):
 
 
     >>> import timeside
-    >>> wav_file = 'tests/samples/guitar.wav' # doctest: +SKIP
+    >>> from timeside.tools.data_samples import samples as ts_samples
+    >>> wav_file = ts_samples['C4_scale.wav']
     >>> d = timeside.decoder.file.FileDecoder(wav_file)
     >>> e = timeside.encoder.audiosink.AudioSink()
     >>> (d|e).run() # doctest: +SKIP
@@ -96,11 +97,7 @@ class AudioSink(GstEncoder):
         self.metadata = metadata
 
 
-# Define global variables for use with doctest
-DOCTEST_ALIAS = {'wav_file':
-                 'https://github.com/yomguy/timeside-samples/raw/master/samples/guitar.wav'}
-
 if __name__ == "__main__":
     import doctest
 
-    doctest.testmod(extraglobs=DOCTEST_ALIAS)
+    doctest.testmod()
diff --git a/timeside/tools/data_samples.py b/timeside/tools/data_samples.py
new file mode 100644 (file)
index 0000000..4c4c5cd
--- /dev/null
@@ -0,0 +1,441 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Created on Tue Oct  7 09:19:37 2014
+
+@author: thomas
+"""
+
+from __future__ import division
+
+import pygst
+pygst.require("0.10")
+import gobject
+import gst
+import numpy
+import scipy.signal.waveforms
+import os.path
+
+#import timeside
+
+
+class NumpySrc:
+    def __init__(self, array, samplerate):
+        self.appsrc = gst.element_factory_make("appsrc")
+        self.pos = 0
+        self.samplerate = samplerate
+        if array.ndim == 1:
+            array.resize((array.shape[0], 1))
+        self.length, self.channels = array.shape
+        self.array = array.astype("float32")
+        self.per_sample = gst.SECOND // samplerate
+        self.fac = self.channels * array.dtype.itemsize
+        #self.appsrc.set_property("size", (self.length * self.channels *
+        #                                  array.dtype.itemsize))
+        self.appsrc.set_property("format", gst.FORMAT_TIME)
+        capstr = """audio/x-raw-float,
+                    width=%d,
+                    depth=%d,
+                    rate=%d,
+                    channels=%d,
+                    endianness=(int)1234,
+                    signed=true""" % (32,#self.array.dtype.itemsize*8,
+                                      32,#self.array.dtype.itemsize*8,
+                                      self.samplerate,
+                                      self.channels)
+        print array.dtype.itemsize*8
+        self.appsrc.set_property("caps", gst.caps_from_string(capstr))
+        self.appsrc.set_property("stream-type", 0)  # Seekable
+        self.appsrc.set_property('block', True)
+
+        self.appsrc.connect("need-data", self.need_data)
+        self.appsrc.connect("seek-data", self.seek_data)
+        self.appsrc.connect("enough-data", self.enough_data)
+
+    def need_data(self, element, length):
+        #length = length // 64
+        if self.pos >= self.array.shape[0]:
+            element.emit("end-of-stream")
+        else:
+            avalaible_sample = self.length - self.pos
+            if avalaible_sample < length:
+                length = avalaible_sample
+            buf = gst.Buffer(self.array[self.pos:self.pos+length, :].data)
+
+            buf.timestamp = self.pos * self.per_sample
+            buf.duration = int(length*self.per_sample)
+            element.emit("push-buffer", buf)
+            self.pos += length
+
+    def seek_data(self, element, npos):
+        print 'seek %d' % npos
+        self.pos = npos // self.per_sample
+        return True
+
+    def enough_data(self, element):
+        print "----------- enough data ---------------"
+
+
+class SampleArray(object):
+    """Base Class for generating a data sample array"""
+
+    def __init__(self, duration=10, samplerate=44100):
+        self.samplerate = int(samplerate)
+        self.num_samples = numpy.ceil(duration * self.samplerate)
+        self.array = NotImplemented
+
+    @property
+    def time_samples(self):
+        return numpy.arange(0, self.num_samples)
+
+    @property
+    def duration(self):
+        return self.num_samples / self.samplerate
+
+    def __add__(self, other):
+        if not self.samplerate == other.samplerate:
+            raise ValueError("Samplerates mismatch")
+
+        sum_ = SampleArray(samplerate=self.samplerate)
+        sum_.num_samples = self.num_samples + other.num_samples
+        sum_.array = numpy.vstack([self.array, other.array])
+        return sum_
+
+    def __and__(self, other):
+        if not self.samplerate == other.samplerate:
+            raise ValueError("Samplerates mismatch")
+        if not self.num_samples == other.num_samples:
+            raise ValueError("Number of samples mismatch")
+
+        and_ = SampleArray(samplerate=self.samplerate)
+        and_.num_samples = self.num_samples
+        and_.array = numpy.hstack([self.array, other.array])
+        return and_
+
+
+class SineArray(SampleArray):
+    """Class for generating a Sine array"""
+    def __init__(self, frequency=440, duration=10, samplerate=44100,
+                 channels=1):
+        super(SineArray, self).__init__(duration=duration,
+                                        samplerate=samplerate)
+        self.frequency = frequency
+        self.array = numpy.sin((2 * numpy.pi * self.frequency *
+                               self.time_samples / self.samplerate))
+        self.array.resize(self.num_samples, 1)
+
+
+class SweepArray(SampleArray):
+    """Class for generating a Sweep array"""
+    def __init__(self, f0=20, f1=None, duration=10, samplerate=44100,
+                 method='logarithmic'):
+        super(SweepArray, self).__init__(duration=duration,
+                                         samplerate=samplerate)
+
+        self.f0 = f0 / samplerate
+        if f1 is None:
+            self.f1 = 0.5
+        else:
+            self.f1 = f1 / samplerate
+        self.method = method
+        self.array = scipy.signal.waveforms.chirp(t=self.time_samples,
+                                                  f0=self.f0,
+                                                  t1=self.time_samples[-1],
+                                                  f1=self.f1,
+                                                  method=self.method)
+        self.array.resize(self.num_samples, 1)
+
+
+class WhiteNoiseArray(SampleArray):
+    """Class for generating a white noise array"""
+    def __init__(self, duration=10, samplerate=44100):
+        super(WhiteNoiseArray, self).__init__(duration=duration,
+                                              samplerate=samplerate)
+        array = numpy.random.randn(self.num_samples, 1)
+        self.array = array / abs(array).max()
+
+
+class SilenceArray(SampleArray):
+    """Class for generating a silence"""
+    def __init__(self, duration=10, samplerate=44100):
+        super(SilenceArray, self).__init__(duration=duration,
+                                           samplerate=samplerate)
+
+        self.array = numpy.zeros((self.num_samples, 1))
+
+
+class gst_BuildSample(object):
+    def __init__(self, sample_array, output_file, gst_audio_encoder):
+        if not isinstance(sample_array, SampleArray):
+            raise ValueError("array must be a SampleArray subclass")
+        self.sample_array = sample_array
+        self.samplerate = self.sample_array.samplerate
+        self.output_file = output_file
+        if not isinstance(gst_audio_encoder, list):
+            gst_audio_encoder = [gst_audio_encoder]
+        self.gst_audio_encoder = gst_audio_encoder
+
+    def run(self):
+        pipeline = gst.Pipeline("pipeline")
+        gobject.threads_init()
+        mainloop = gobject.MainLoop()
+
+        numpy_src = NumpySrc(array=self.sample_array.array,
+                             samplerate=self.samplerate)
+
+        converter = gst.element_factory_make('audioconvert', 'converter')
+
+        encoder_muxer = []
+        for enc in self.gst_audio_encoder:
+            encoder_muxer.append(gst.element_factory_make(enc))
+
+        filesink = gst.element_factory_make('filesink', 'sink')
+        filesink.set_property('location', self.output_file)
+
+        pipe_elements = [numpy_src.appsrc, converter]
+        pipe_elements.extend(encoder_muxer)
+        pipe_elements.append(filesink)
+
+        pipeline.add(*pipe_elements)
+        gst.element_link_many(*pipe_elements)
+
+        def _on_new_pad(self, source, pad, target_pad):
+            print 'on_new_pad'
+            if not pad.is_linked():
+                if target_pad.is_linked():
+                    target_pad.get_peer().unlink(target_pad)
+                pad.link(target_pad)
+
+        def on_eos(bus, msg):
+            pipeline.set_state(gst.STATE_NULL)
+            mainloop.quit()
+
+        def on_error(bus, msg):
+            err, debug_info = msg.parse_error()
+            print ("Error received from element %s: %s" % (msg.src.get_name(),
+                                                           err))
+            print ("Debugging information: %s" % debug_info)
+            mainloop.quit()
+
+        pipeline.set_state(gst.STATE_PLAYING)
+        bus = pipeline.get_bus()
+        bus.add_signal_watch()
+        bus.connect('message::eos', on_eos)
+        bus.connect("message::error", on_error)
+
+        mainloop.run()
+
+
+def generate_sample_file(filename, samples_dir, gst_audio_encoder,
+                         sample_array, overwrite=False):
+    sample_file = os.path.join(samples_dir, filename)
+
+    if overwrite or not os.path.exists(sample_file):
+            gst_builder = gst_BuildSample(sample_array=sample_array,
+                                          output_file=sample_file,
+                                          gst_audio_encoder=gst_audio_encoder)
+            gst_builder.run()
+    return sample_file
+
+
+def generateSamples(overwrite=False):
+    from timeside import __file__ as ts_file
+    ts_path = os.path.split(os.path.abspath(ts_file))[0]
+    tests_dir = os.path.abspath(os.path.join(ts_path, '../tests'))
+    if os.path.isdir(tests_dir):
+        samples_dir = os.path.abspath(os.path.join(tests_dir, 'samples'))
+        if not os.path.isdir(samples_dir):
+            os.makedirs(samples_dir)
+    else:
+        import tempfile
+        samples_dir = tempfile.mkdtemp(suffix="ts_samples")
+
+    samples = dict()
+
+    # --------- Sweeps ---------
+    # sweep 44100 mono wav
+    filename = 'sweep_mono.wav'
+    samplerate = 44100
+    gst_audio_encoder = 'wavenc'
+    sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=sweep_mono,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    # sweep 44100 stereo wav
+    sweep_stereo = sweep_mono & sweep_mono
+    filename = 'sweep.wav'
+    gst_audio_encoder = 'wavenc'
+    sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=sweep_stereo,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+   # sweep 44100 stereo mp3
+    filename = 'sweep.mp3'
+    gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux']
+    sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=sweep_stereo,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+   # sweep 44100 stereo flac
+    filename = 'sweep.flac'
+    gst_audio_encoder = 'flacenc'
+    sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=sweep_stereo,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+   # sweep 44100 stereo ogg
+    filename = 'sweep.ogg'
+    gst_audio_encoder = ['vorbisenc', 'oggmux']
+    sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=sweep_stereo,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    # sweep 32000 stereo wav
+    samplerate = 32000
+    sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+    sweep_stereo = sweep_mono & sweep_mono
+
+    filename = 'sweep_32000.wav'
+    gst_audio_encoder = 'wavenc'
+    sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=sweep_stereo,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    # --------- Sines ---------
+    # sine at 440Hz,  44100 mono wav
+    filename = 'sine440Hz_mono.wav'
+    samplerate = 44100
+    gst_audio_encoder = 'wavenc'
+    sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=sweep_mono,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    # sine at 440Hz,  44100 stereo wav
+    filename = 'sine440Hz.wav'
+    sweep_stereo = sweep_mono & sweep_mono
+    gst_audio_encoder = 'wavenc'
+    sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=sweep_stereo,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    # sine at 440Hz,  44100 stereo mp3
+    filename = 'sine440Hz.mp3'
+    gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux']
+    sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=sweep_stereo,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    # sine at 440Hz,  44100 stereo ogg
+    filename = 'sine440Hz.ogg'
+    gst_audio_encoder = ['vorbisenc', 'oggmux']
+    sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=sweep_stereo,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    # --------- Equal tempered scale ---------
+    filename = 'C4_scale.wav'
+    samplerate = 44100
+    f_C4 = 261.63
+    f_D4 = 293.66
+    f_E4 = 329.63
+    f_F4 = 349.23
+    f_G4 = 392.00
+    f_A4 = 440.00
+    f_B4 = 493.88
+    f_C5 = 523.25
+    sineC4 = SineArray(duration=1, samplerate=samplerate, frequency=f_C4)
+    sineD4 = SineArray(duration=1, samplerate=samplerate, frequency=f_D4)
+    sineE4 = SineArray(duration=1, samplerate=samplerate, frequency=f_E4)
+    sineF4 = SineArray(duration=1, samplerate=samplerate, frequency=f_F4)
+    sineG4 = SineArray(duration=1, samplerate=samplerate, frequency=f_G4)
+    sineA4 = SineArray(duration=1, samplerate=samplerate, frequency=f_A4)
+    sineB4 = SineArray(duration=1, samplerate=samplerate, frequency=f_B4)
+    sineC5 = SineArray(duration=1, samplerate=samplerate, frequency=f_C5)
+
+    silence = SilenceArray(duration=0.2, samplerate=samplerate)
+
+    scale = (sineC4 + silence + sineD4 + silence + sineE4 + silence +
+             sineF4 + silence + sineG4 + silence + sineA4 + silence +
+             sineB4 + silence + sineC5)
+
+    gst_audio_encoder = 'wavenc'
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=scale,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    # --------- White noise ---------
+    # white noise - 44100Hz mono
+    filename = 'white_noise_mono.wav'
+    samplerate = 44100
+    noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
+    gst_audio_encoder = 'wavenc'
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=noise,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    # white noise - 44100Hz stereo
+    filename = 'white_noise.wav'
+    samplerate = 44100
+    noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
+    noise_stereo = noise & noise
+    gst_audio_encoder = 'wavenc'
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=noise_stereo,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    # white noise - 32000Hz stereo
+    filename = 'white_noise_32000.wav'
+    samplerate = 32000
+    noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
+    noise_stereo = noise & noise
+    gst_audio_encoder = 'wavenc'
+    sample_file = generate_sample_file(filename, samples_dir,
+                                       gst_audio_encoder,
+                                       sample_array=noise_stereo,
+                                       overwrite=overwrite)
+    samples.update({filename: sample_file})
+
+    return samples
+
+
+samples = generateSamples()
+
+
+if __name__ == '__main__':
+    generateSamples()