From c48ac9f7aa953a38e7b0f4dc1bf40b564a3aee62 Mon Sep 17 00:00:00 2001 From: Thomas Fillon Date: Fri, 17 Oct 2014 22:00:10 +0200 Subject: [PATCH] Fix #49, generate test data from numpy + Gstreamer --- tests/get_samples.py | 6 - tests/test_analyzer_dc.py | 16 +- tests/test_analyzer_level.py | 20 +- tests/test_analyzers_default.py | 5 +- tests/test_aubio_melenergy.py | 8 +- tests/test_aubio_mfcc.py | 10 +- tests/test_aubio_pitch.py | 12 +- tests/test_aubio_specdesc.py | 12 +- tests/test_aubio_temporal.py | 12 +- tests/test_decoder_utils.py | 36 ++- tests/test_decoding.py | 36 +-- tests/test_process_pipe.py | 6 +- tests/test_transcoding.py | 36 +-- tests/test_transcoding_streaming.py | 7 +- tests/tools.py | 27 +- tests/unit_timeside.py | 4 - timeside/decoder/utils.py | 13 +- timeside/encoder/audiosink.py | 9 +- timeside/tools/data_samples.py | 441 ++++++++++++++++++++++++++++ 19 files changed, 545 insertions(+), 171 deletions(-) delete mode 100644 tests/get_samples.py create mode 100644 timeside/tools/data_samples.py diff --git a/tests/get_samples.py b/tests/get_samples.py deleted file mode 100644 index a4d37a8..0000000 --- a/tests/get_samples.py +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from tools import check_samples - -check_samples() diff --git a/tests/test_analyzer_dc.py b/tests/test_analyzer_dc.py index c765226..e916e02 100755 --- a/tests/test_analyzer_dc.py +++ b/tests/test_analyzer_dc.py @@ -3,7 +3,7 @@ from unit_timeside import unittest, TestRunner from timeside.decoder.file import FileDecoder from timeside.analyzer.dc import MeanDCShift -import os +from timeside.tools.data_samples import samples as ts_samples class TestAnalyzerDC(unittest.TestCase): @@ -13,16 +13,14 @@ class TestAnalyzerDC(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "sweep.wav") + self.source = ts_samples["sweep.wav"] - self.expected = {'mean_dc_shift': -0.000} + self.expected = {'mean_dc_shift': 0.004} - def testOnGuitar(self): - "runs on guitar" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "guitar.wav") - self.expected = {'mean_dc_shift': 0.054} + def testOnScale(self): + "runs on C4 Scale" + self.source = ts_samples["C4_scale.wav"] + self.expected = {'mean_dc_shift': 0.034} def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_analyzer_level.py b/tests/test_analyzer_level.py index 510ef6c..29ffef6 100755 --- a/tests/test_analyzer_level.py +++ b/tests/test_analyzer_level.py @@ -3,7 +3,7 @@ from unit_timeside import unittest, TestRunner from timeside.decoder.file import FileDecoder from timeside.analyzer.level import Level -import os +from timeside.tools.data_samples import samples as ts_samples class TestAnalyzerLevel(unittest.TestCase): @@ -13,22 +13,20 @@ class TestAnalyzerLevel(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "sweep.wav") + self.source = ts_samples["sweep.wav"] - max_level_value = -6.021 - rms_level_value = -9.856 + max_level_value = 0 + rms_level_value = -2.995 self.expected = {'level.max': max_level_value, 'level.rms': rms_level_value} - def testOnGuitar(self): - "runs on guitar" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "guitar.wav") + def testOnC4_Scale(self): + "runs on C4 scale" + self.source = ts_samples["C4_scale.wav"] - max_level_value = -4.054 - rms_level_value = -21.945 + max_level_value = 0 + rms_level_value = -3.705 self.expected = {'level.max': max_level_value, 'level.rms': rms_level_value} diff --git a/tests/test_analyzers_default.py b/tests/test_analyzers_default.py index 12434c6..a2c50ea 100755 --- a/tests/test_analyzers_default.py +++ b/tests/test_analyzers_default.py @@ -7,15 +7,14 @@ from unit_timeside import unittest, TestRunner import timeside from timeside.decoder.file import FileDecoder import numpy as np -import os +from timeside.tools.data_samples import samples as ts_samples class TestAnalyzers_with_default(unittest.TestCase): """Test analyzer with default parameters""" def setUp(self): - source = os.path.join(os.path.dirname(__file__), - "samples", "guitar.wav") + source = ts_samples["C4_scale.wav"] self.decoder = FileDecoder(source) diff --git a/tests/test_aubio_melenergy.py b/tests/test_aubio_melenergy.py index abdc730..612da64 100755 --- a/tests/test_aubio_melenergy.py +++ b/tests/test_aubio_melenergy.py @@ -1,10 +1,10 @@ #! /usr/bin/env python from unit_timeside import unittest, TestRunner -import os from timeside.decoder.file import FileDecoder from timeside.core import get_processor from timeside import _WITH_AUBIO +from timeside.tools.data_samples import samples as ts_samples @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available') @@ -15,13 +15,11 @@ class TestAubioMelEnergy(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "sweep.wav") + self.source = ts_samples["sweep.wav"] def testOnGuitar(self): "runs on guitar" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "guitar.wav") + self.source = ts_samples["C4_scale.wav"] def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_aubio_mfcc.py b/tests/test_aubio_mfcc.py index 1071b5e..f225f10 100755 --- a/tests/test_aubio_mfcc.py +++ b/tests/test_aubio_mfcc.py @@ -1,10 +1,10 @@ #! /usr/bin/env python from unit_timeside import unittest, TestRunner -import os from timeside.decoder.file import FileDecoder from timeside.core import get_processor from timeside import _WITH_AUBIO +from timeside.tools.data_samples import samples as ts_samples @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available') @@ -15,11 +15,11 @@ class TestAubioMfcc(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = os.path.join (os.path.dirname(__file__), "samples", "sweep.wav") + self.source = ts_samples["sweep.wav"] - def testOnGuitar(self): - "runs on guitar" - self.source = os.path.join (os.path.dirname(__file__), "samples", "guitar.wav") + def testOnScale(self): + "runs on C4 scale" + self.source = ts_samples["C4_scale.wav"] def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_aubio_pitch.py b/tests/test_aubio_pitch.py index bf2a43b..2eee6a4 100755 --- a/tests/test_aubio_pitch.py +++ b/tests/test_aubio_pitch.py @@ -1,10 +1,10 @@ #! /usr/bin/env python from unit_timeside import unittest, TestRunner -import os from timeside.decoder.file import FileDecoder from timeside.core import get_processor from timeside import _WITH_AUBIO +from timeside.tools.data_samples import samples as ts_samples @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available') @@ -15,13 +15,11 @@ class TestAubioPitch(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "sweep.wav") + self.source = ts_samples["sweep.wav"] - def testOnGuitar(self): - "runs on guitar" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "guitar.wav") + def testOnC4Scale(self): + "runs on C4 scale" + self.source = ts_samples["C4_scale.wav"] def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_aubio_specdesc.py b/tests/test_aubio_specdesc.py index 2f34a1e..c92e685 100755 --- a/tests/test_aubio_specdesc.py +++ b/tests/test_aubio_specdesc.py @@ -1,10 +1,10 @@ #! /usr/bin/env python from unit_timeside import unittest, TestRunner -import os from timeside.decoder.file import FileDecoder from timeside.core import get_processor from timeside import _WITH_AUBIO +from timeside.tools.data_samples import samples as ts_samples @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available') @@ -15,13 +15,11 @@ class TestAubioSpecdesc(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "sweep.wav") + self.source = ts_samples["sweep.wav"] - def testOnGuitar(self): - "runs on guitar" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "guitar.wav") + def testOnC4Scale(self): + "runs on C4 scale" + self.source = ts_samples["C4_scale.wav"] def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_aubio_temporal.py b/tests/test_aubio_temporal.py index b0850fa..209808e 100755 --- a/tests/test_aubio_temporal.py +++ b/tests/test_aubio_temporal.py @@ -1,10 +1,10 @@ #! /usr/bin/env python from unit_timeside import unittest, TestRunner -import os from timeside.decoder.file import FileDecoder from timeside.core import get_processor from timeside import _WITH_AUBIO +from timeside.tools.data_samples import samples as ts_samples @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available') @@ -15,13 +15,11 @@ class TestAubioTemporal(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "sweep.wav") + self.source = ts_samples["sweep.wav"] - def testOnGuitar(self): - "runs on guitar" - self.source = os.path.join(os.path.dirname(__file__), - "samples", "guitar.wav") + def testOnC4Scale(self): + "runs on C4 scale" + self.source = ts_samples["C4_scale.wav"] def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_decoder_utils.py b/tests/test_decoder_utils.py index e0b6f1e..3d6de02 100755 --- a/tests/test_decoder_utils.py +++ b/tests/test_decoder_utils.py @@ -4,9 +4,9 @@ from __future__ import division -from numpy import arange, sin -from unit_timeside import * +from unit_timeside import unittest, TestRunner from timeside.decoder.utils import get_uri, get_media_uri_info, path2uri +from timeside.tools.data_samples import samples as ts_samples import os.path @@ -33,6 +33,7 @@ class TestGetUriWrongUri(unittest.TestCase): "Missing file raise IOerror" self.source = os.path.join(os.path.dirname(__file__), "a_missing_file_blahblah.wav") + def testNotValidUri(self): "Not valid uri raise IOerror" self.source = os.path.join("://not/a/valid/uri/parisson.com") @@ -54,46 +55,40 @@ class TestGetMediaInfo(unittest.TestCase): self.test_exact_duration = True self.expected_channels = 2 self.expected_samplerate = 44100 - self.expected_depth = 16 + self.expected_depth = 0 # ? def testWav(self): "Test wav decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.wav") - + self.source = ts_samples["sweep.wav"] def testWavMono(self): "Test mono wav decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep_mono.wav") + self.source = ts_samples["sweep_mono.wav"] self.expected_channels = 1 def testWav32k(self): "Test 32kHz wav decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep_32000.wav") + self.source = ts_samples["sweep_32000.wav"] self.expected_samplerate = 32000 def testFlac(self): "Test flac decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.flac") + self.source = ts_samples["sweep.flac"] + self.expected_depth = 24 def testOgg(self): "Test ogg decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.ogg") + self.source = ts_samples["sweep.ogg"] + self.test_exact_duration = False self.expected_depth = 0 # ? def testMp3(self): "Test mp3 decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.mp3") + self.source = ts_samples["sweep.mp3"] self.expected_depth = 32 self.test_exact_duration = False - def tearDown(self): uri = get_uri(self.source) uri_info = get_media_uri_info(uri) @@ -103,9 +98,12 @@ class TestGetMediaInfo(unittest.TestCase): self.assertAlmostEqual(self.source_duration, uri_info['duration'], places=1) - self.assertEqual(self.expected_channels, uri_info['streams'][0]['channels']) - self.assertEqual(self.expected_samplerate, uri_info['streams'][0]['samplerate']) + self.assertEqual(self.expected_channels, + uri_info['streams'][0]['channels']) + self.assertEqual(self.expected_samplerate, + uri_info['streams'][0]['samplerate']) self.assertEqual(self.expected_depth, uri_info['streams'][0]['depth']) + if __name__ == '__main__': unittest.main(testRunner=TestRunner()) diff --git a/tests/test_decoding.py b/tests/test_decoding.py index 5ce3f82..5bd0091 100755 --- a/tests/test_decoding.py +++ b/tests/test_decoding.py @@ -6,6 +6,7 @@ from timeside.decoder.file import FileDecoder from timeside.core import ProcessPipe from unit_timeside import * +from timeside.tools.data_samples import samples as ts_samples import os.path @@ -31,20 +32,17 @@ class TestDecoding(unittest.TestCase): def testWav(self): "Test wav decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.wav") + self.source = ts_samples["sweep.wav"] def testWavMono(self): "Test mono wav decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep_mono.wav") + self.source = ts_samples["sweep_mono.wav"] self.expected_channels = 1 def testWav32k(self): "Test 32kHz wav decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep_32000.wav") + self.source = ts_samples["sweep_32000.wav"] expected_samplerate = 32000 ratio = expected_samplerate / self.expected_samplerate @@ -54,23 +52,20 @@ class TestDecoding(unittest.TestCase): def testFlac(self): "Test flac decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.flac") + self.source = ts_samples["sweep.flac"] self.expected_mime_type = 'audio/x-flac' def testOgg(self): "Test ogg decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.ogg") + self.source = ts_samples["sweep.ogg"] - self.expected_totalframes = 352832 + self.expected_totalframes = 352960 self.expected_mime_type = 'application/ogg' self.test_exact_duration = False def testMp3(self): "Test mp3 decoding" - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.mp3") + self.source = ts_samples["sweep.mp3"] self.expected_totalframes = 353664 self.expected_mime_type = 'audio/mpeg' @@ -125,8 +120,9 @@ class TestDecoding(unittest.TestCase): self.assertEqual(decoder.mime_type(), self.expected_mime_type) - expected_totalframes = [self.expected_totalframes, self.expected_totalframes +32] # +32 to handle some issue with ogg - self.assertIn(totalframes, expected_totalframes) + #expected_totalframes = [self.expected_totalframes, self.expected_totalframes +32] # +32 to handle some issue with ogg + + self.assertEqual(totalframes, self.expected_totalframes) input_duration = decoder.input_totalframes / decoder.input_samplerate output_duration = decoder.totalframes() / decoder.output_samplerate @@ -224,7 +220,7 @@ class TestDecodingSegmentDefaultDuration(TestDecodingSegment): def testMp3(self): "Test mp3 decoding" super(TestDecodingSegment, self).testMp3() - self.expected_totalframes = 310715 # was 308701 ? + self.expected_totalframes = 309565 # was 308701 ? class TestDecodingSegmentBadParameters(unittest.TestCase): @@ -285,7 +281,7 @@ class TestDecodingMonoUpsampling(TestDecoding): def testOgg(self): "Test ogg decoding" super(TestDecodingMonoUpsampling, self).testOgg() - self.expected_totalframes = 384000 + self.expected_totalframes = 384140 class TestDecodingMonoDownsampling(TestDecoding): @@ -304,7 +300,7 @@ class TestDecodingMonoDownsampling(TestDecoding): def testOgg(self): "Test ogg decoding" super(TestDecodingMonoDownsampling, self).testOgg() - self.expected_totalframes = 127980 + self.expected_totalframes = 128027 def testMp3(self): "Test mp3 decoding" @@ -328,7 +324,7 @@ class TestDecodingStereoDownsampling(TestDecoding): def testOgg(self): "Test ogg decoding" super(TestDecodingStereoDownsampling, self).testOgg() - self.expected_totalframes = 255992 + self.expected_totalframes = 256085 def testMp3(self): "Test mp3 decoding" @@ -352,7 +348,7 @@ class TestDecodingStereoUpsampling(TestDecoding): def testOgg(self): "Test ogg decoding" super(TestDecodingStereoUpsampling, self).testOgg() - self.expected_totalframes = 768000 + self.expected_totalframes = 768279 def testMp3(self): "Test mp3 decoding" diff --git a/tests/test_process_pipe.py b/tests/test_process_pipe.py index 468e689..cdaa182 100644 --- a/tests/test_process_pipe.py +++ b/tests/test_process_pipe.py @@ -6,7 +6,7 @@ from unit_timeside import unittest, TestRunner import timeside from timeside.decoder.file import FileDecoder -import os +from timeside.tools.data_samples import samples as ts_samples class TestProcessPipe(unittest.TestCase): @@ -16,8 +16,8 @@ class TestProcessPipe(unittest.TestCase): """Test process pipe (Quick and dirty)""" # TODO clean up and complete - source = os.path.join(os.path.dirname(__file__), - "samples", "guitar.wav") + source = ts_samples["sweep.wav"] + pipe = timeside.core.ProcessPipe() dec = FileDecoder(source) diff --git a/tests/test_transcoding.py b/tests/test_transcoding.py index 7602ba2..c3b1aa4 100755 --- a/tests/test_transcoding.py +++ b/tests/test_transcoding.py @@ -4,19 +4,18 @@ from __future__ import division from timeside.core import get_processor, ProcessPipe from timeside.decoder.file import FileDecoder -from timeside.component import * - -from unit_timeside import * +#from timeside.component import * +import os +from unit_timeside import unittest, TestRunner from tools import tmp_file_sink -import os.path +from timeside.tools.data_samples import samples as ts_samples class TestTranscodingFromWav(unittest.TestCase): "Test transcoding from wav" def setUp(self): - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.wav") + self.source = ts_samples["sweep.wav"] self.test_duration = True self.test_channels = True @@ -50,7 +49,7 @@ class TestTranscodingFromWav(unittest.TestCase): file_extension = '.' + encoder_cls.file_extension() self.target = tmp_file_sink(prefix=self.__class__.__name__, - suffix=file_extension) + suffix=file_extension) encoder = encoder_cls(self.target) (decoder | encoder).run() @@ -59,7 +58,6 @@ class TestTranscodingFromWav(unittest.TestCase): pipe = ProcessPipe(decoder_encoded) pipe.run() - import os os.unlink(self.target) #print decoder.channels(), decoder.samplerate(), written_frames @@ -78,13 +76,13 @@ class TestTranscodingFromWav(unittest.TestCase): decoder_encoded.input_duration, delta=0.2) + class TestTranscodingFromMonoWav(TestTranscodingFromWav): "Test transcoding from a mono wav" def setUp(self): super(TestTranscodingFromMonoWav, self).setUp() - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep_mono.wav") + self.source = ts_samples["sweep_mono.wav"] def testM4a(self): "Test conversion to m4a" @@ -97,8 +95,7 @@ class TestTranscodingFromAnotherWav(TestTranscodingFromMonoWav): def setUp(self): super(TestTranscodingFromAnotherWav, self).setUp() - self.source = os.path.join(os.path.dirname(__file__), - "samples/guitar.wav") # Mono + self.source = ts_samples["C4_scale.wav"] # Mono class TestTranscodingFromMp3(TestTranscodingFromWav): @@ -106,8 +103,7 @@ class TestTranscodingFromMp3(TestTranscodingFromWav): def setUp(self): super(TestTranscodingFromMp3, self).setUp() - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.mp3") + self.source = ts_samples["sweep.mp3"] class TestTranscodingFromFlac(TestTranscodingFromWav): @@ -115,8 +111,7 @@ class TestTranscodingFromFlac(TestTranscodingFromWav): def setUp(self): super(TestTranscodingFromFlac, self).setUp() - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.flac") + self.source = ts_samples["sweep.flac"] class TestTranscodingFromOgg(TestTranscodingFromWav): @@ -124,11 +119,7 @@ class TestTranscodingFromOgg(TestTranscodingFromWav): def setUp(self): super(TestTranscodingFromOgg, self).setUp() - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.ogg") - - - + self.source = ts_samples["sweep.ogg"] class TestTranscodingFrom32kHzWav(TestTranscodingFromWav): @@ -136,8 +127,7 @@ class TestTranscodingFrom32kHzWav(TestTranscodingFromWav): def setUp(self): super(TestTranscodingFrom32kHzWav, self).setUp() - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep_32000.wav") + self.source = ts_samples["sweep_32000.wav"] class TestTranscodingFromMissingFile(TestTranscodingFromWav): diff --git a/tests/test_transcoding_streaming.py b/tests/test_transcoding_streaming.py index 3aa38fe..b540444 100755 --- a/tests/test_transcoding_streaming.py +++ b/tests/test_transcoding_streaming.py @@ -8,17 +8,16 @@ from timeside.decoder.file import FileDecoder #from timeside.encoder import * #from timeside.component import * -from unit_timeside import * +from unit_timeside import unittest, TestRunner from tools import tmp_file_sink -import os.path +from timeside.tools.data_samples import samples as ts_samples class TestTranscodingStreaming(unittest.TestCase): "Test transcoding and streaming" def setUp(self): - self.source = os.path.join(os.path.dirname(__file__), - "samples/sweep.wav") + self.source = ts_samples["sweep.wav"] self.test_duration = True self.test_channels = True self.filesize_delta = None diff --git a/tests/tools.py b/tests/tools.py index 60cfb2d..b9522f6 100644 --- a/tests/tools.py +++ b/tests/tools.py @@ -1,30 +1,9 @@ -import os -import urllib +import tempfile -def check_samples(): - url = 'http://github.com/yomguy/timeside-samples/raw/master/samples/' - samples = ['guitar.wav', 'sweep.wav', 'sweep_mono.wav', 'sweep_32000.wav', 'sweep.flac', 'sweep.ogg', 'sweep.mp3', 'sweep_source.wav'] - path = os.path.normpath(os.path.dirname(__file__)) - dir = path + os.sep + 'samples' - - if not os.path.exists(dir): - os.makedirs(dir) - - for sample in samples: - path = dir + os.sep + sample - if not os.path.exists(path): - print 'downloading: ' + sample - f = open(path, 'w') - u = urllib.urlopen(url+sample) - f.write(u.read()) - f.close() - - -def tmp_file_sink(prefix=None, suffix = None): - import tempfile +def tmp_file_sink(prefix=None, suffix=None): tmpfile = tempfile.NamedTemporaryFile(delete=True, prefix=prefix, suffix=suffix) tmpfile.close() - return tmpfile.name \ No newline at end of file + return tmpfile.name diff --git a/tests/unit_timeside.py b/tests/unit_timeside.py index 0e5789d..2a985c5 100644 --- a/tests/unit_timeside.py +++ b/tests/unit_timeside.py @@ -4,9 +4,6 @@ import unittest import doctest import sys import time -from tools import check_samples - -check_samples() class _TextTestResult(unittest.TestResult): @@ -111,7 +108,6 @@ class TestRunner: self.stream = _WritelnDecorator(stream) self.descriptions = descriptions self.verbosity = verbosity - check_samples() def _makeResult(self): return _TextTestResult(self.stream, self.descriptions, self.verbosity) diff --git a/timeside/decoder/utils.py b/timeside/decoder/utils.py index 484b371..1cc4b10 100644 --- a/timeside/decoder/utils.py +++ b/timeside/decoder/utils.py @@ -181,9 +181,10 @@ def sha1sum_file(filename): ''' Return the secure hash digest with sha1 algorithm for a given file - >>> wav_file = 'tests/samples/guitar.wav' # doctest: +SKIP - >>> print sha1sum_file(wav_file) # doctest: +SKIP - #08301c3f9a8d60926f31e253825cc74263e52ad1 + >>> from timeside.tools.data_samples import samples as ts_samples + >>> wav_file = ts_samples["C4_scale.wav"] + >>> print sha1sum_file(wav_file) + a598e78d0b5c90da54a77e34c083abdcd38d42ba ''' import hashlib import io @@ -200,13 +201,9 @@ def sha1sum_file(filename): def sha1sum_url(url): '''Return the secure hash digest with sha1 algorithm for a given url - >>> url = 'https://github.com/yomguy/timeside-samples/raw/master/samples/guitar.wav' + >>> url = "https://github.com/yomguy/timeside-samples/raw/master/samples/guitar.wav" >>> print sha1sum_url(url) 08301c3f9a8d60926f31e253825cc74263e52ad1 - >>> wav_file = 'tests/samples/guitar.wav' # doctest: +SKIP - >>> uri = get_uri(wav_file) - >>> print sha1sum_url(uri) - 08301c3f9a8d60926f31e253825cc74263e52ad1 ''' import hashlib diff --git a/timeside/encoder/audiosink.py b/timeside/encoder/audiosink.py index 9f01d3e..8dd6d78 100644 --- a/timeside/encoder/audiosink.py +++ b/timeside/encoder/audiosink.py @@ -33,7 +33,8 @@ class AudioSink(GstEncoder): >>> import timeside - >>> wav_file = 'tests/samples/guitar.wav' # doctest: +SKIP + >>> from timeside.tools.data_samples import samples as ts_samples + >>> wav_file = ts_samples['C4_scale.wav'] >>> d = timeside.decoder.file.FileDecoder(wav_file) >>> e = timeside.encoder.audiosink.AudioSink() >>> (d|e).run() # doctest: +SKIP @@ -96,11 +97,7 @@ class AudioSink(GstEncoder): self.metadata = metadata -# Define global variables for use with doctest -DOCTEST_ALIAS = {'wav_file': - 'https://github.com/yomguy/timeside-samples/raw/master/samples/guitar.wav'} - if __name__ == "__main__": import doctest - doctest.testmod(extraglobs=DOCTEST_ALIAS) + doctest.testmod() diff --git a/timeside/tools/data_samples.py b/timeside/tools/data_samples.py new file mode 100644 index 0000000..4c4c5cd --- /dev/null +++ b/timeside/tools/data_samples.py @@ -0,0 +1,441 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Created on Tue Oct 7 09:19:37 2014 + +@author: thomas +""" + +from __future__ import division + +import pygst +pygst.require("0.10") +import gobject +import gst +import numpy +import scipy.signal.waveforms +import os.path + +#import timeside + + +class NumpySrc: + def __init__(self, array, samplerate): + self.appsrc = gst.element_factory_make("appsrc") + self.pos = 0 + self.samplerate = samplerate + if array.ndim == 1: + array.resize((array.shape[0], 1)) + self.length, self.channels = array.shape + self.array = array.astype("float32") + self.per_sample = gst.SECOND // samplerate + self.fac = self.channels * array.dtype.itemsize + #self.appsrc.set_property("size", (self.length * self.channels * + # array.dtype.itemsize)) + self.appsrc.set_property("format", gst.FORMAT_TIME) + capstr = """audio/x-raw-float, + width=%d, + depth=%d, + rate=%d, + channels=%d, + endianness=(int)1234, + signed=true""" % (32,#self.array.dtype.itemsize*8, + 32,#self.array.dtype.itemsize*8, + self.samplerate, + self.channels) + print array.dtype.itemsize*8 + self.appsrc.set_property("caps", gst.caps_from_string(capstr)) + self.appsrc.set_property("stream-type", 0) # Seekable + self.appsrc.set_property('block', True) + + self.appsrc.connect("need-data", self.need_data) + self.appsrc.connect("seek-data", self.seek_data) + self.appsrc.connect("enough-data", self.enough_data) + + def need_data(self, element, length): + #length = length // 64 + if self.pos >= self.array.shape[0]: + element.emit("end-of-stream") + else: + avalaible_sample = self.length - self.pos + if avalaible_sample < length: + length = avalaible_sample + buf = gst.Buffer(self.array[self.pos:self.pos+length, :].data) + + buf.timestamp = self.pos * self.per_sample + buf.duration = int(length*self.per_sample) + element.emit("push-buffer", buf) + self.pos += length + + def seek_data(self, element, npos): + print 'seek %d' % npos + self.pos = npos // self.per_sample + return True + + def enough_data(self, element): + print "----------- enough data ---------------" + + +class SampleArray(object): + """Base Class for generating a data sample array""" + + def __init__(self, duration=10, samplerate=44100): + self.samplerate = int(samplerate) + self.num_samples = numpy.ceil(duration * self.samplerate) + self.array = NotImplemented + + @property + def time_samples(self): + return numpy.arange(0, self.num_samples) + + @property + def duration(self): + return self.num_samples / self.samplerate + + def __add__(self, other): + if not self.samplerate == other.samplerate: + raise ValueError("Samplerates mismatch") + + sum_ = SampleArray(samplerate=self.samplerate) + sum_.num_samples = self.num_samples + other.num_samples + sum_.array = numpy.vstack([self.array, other.array]) + return sum_ + + def __and__(self, other): + if not self.samplerate == other.samplerate: + raise ValueError("Samplerates mismatch") + if not self.num_samples == other.num_samples: + raise ValueError("Number of samples mismatch") + + and_ = SampleArray(samplerate=self.samplerate) + and_.num_samples = self.num_samples + and_.array = numpy.hstack([self.array, other.array]) + return and_ + + +class SineArray(SampleArray): + """Class for generating a Sine array""" + def __init__(self, frequency=440, duration=10, samplerate=44100, + channels=1): + super(SineArray, self).__init__(duration=duration, + samplerate=samplerate) + self.frequency = frequency + self.array = numpy.sin((2 * numpy.pi * self.frequency * + self.time_samples / self.samplerate)) + self.array.resize(self.num_samples, 1) + + +class SweepArray(SampleArray): + """Class for generating a Sweep array""" + def __init__(self, f0=20, f1=None, duration=10, samplerate=44100, + method='logarithmic'): + super(SweepArray, self).__init__(duration=duration, + samplerate=samplerate) + + self.f0 = f0 / samplerate + if f1 is None: + self.f1 = 0.5 + else: + self.f1 = f1 / samplerate + self.method = method + self.array = scipy.signal.waveforms.chirp(t=self.time_samples, + f0=self.f0, + t1=self.time_samples[-1], + f1=self.f1, + method=self.method) + self.array.resize(self.num_samples, 1) + + +class WhiteNoiseArray(SampleArray): + """Class for generating a white noise array""" + def __init__(self, duration=10, samplerate=44100): + super(WhiteNoiseArray, self).__init__(duration=duration, + samplerate=samplerate) + array = numpy.random.randn(self.num_samples, 1) + self.array = array / abs(array).max() + + +class SilenceArray(SampleArray): + """Class for generating a silence""" + def __init__(self, duration=10, samplerate=44100): + super(SilenceArray, self).__init__(duration=duration, + samplerate=samplerate) + + self.array = numpy.zeros((self.num_samples, 1)) + + +class gst_BuildSample(object): + def __init__(self, sample_array, output_file, gst_audio_encoder): + if not isinstance(sample_array, SampleArray): + raise ValueError("array must be a SampleArray subclass") + self.sample_array = sample_array + self.samplerate = self.sample_array.samplerate + self.output_file = output_file + if not isinstance(gst_audio_encoder, list): + gst_audio_encoder = [gst_audio_encoder] + self.gst_audio_encoder = gst_audio_encoder + + def run(self): + pipeline = gst.Pipeline("pipeline") + gobject.threads_init() + mainloop = gobject.MainLoop() + + numpy_src = NumpySrc(array=self.sample_array.array, + samplerate=self.samplerate) + + converter = gst.element_factory_make('audioconvert', 'converter') + + encoder_muxer = [] + for enc in self.gst_audio_encoder: + encoder_muxer.append(gst.element_factory_make(enc)) + + filesink = gst.element_factory_make('filesink', 'sink') + filesink.set_property('location', self.output_file) + + pipe_elements = [numpy_src.appsrc, converter] + pipe_elements.extend(encoder_muxer) + pipe_elements.append(filesink) + + pipeline.add(*pipe_elements) + gst.element_link_many(*pipe_elements) + + def _on_new_pad(self, source, pad, target_pad): + print 'on_new_pad' + if not pad.is_linked(): + if target_pad.is_linked(): + target_pad.get_peer().unlink(target_pad) + pad.link(target_pad) + + def on_eos(bus, msg): + pipeline.set_state(gst.STATE_NULL) + mainloop.quit() + + def on_error(bus, msg): + err, debug_info = msg.parse_error() + print ("Error received from element %s: %s" % (msg.src.get_name(), + err)) + print ("Debugging information: %s" % debug_info) + mainloop.quit() + + pipeline.set_state(gst.STATE_PLAYING) + bus = pipeline.get_bus() + bus.add_signal_watch() + bus.connect('message::eos', on_eos) + bus.connect("message::error", on_error) + + mainloop.run() + + +def generate_sample_file(filename, samples_dir, gst_audio_encoder, + sample_array, overwrite=False): + sample_file = os.path.join(samples_dir, filename) + + if overwrite or not os.path.exists(sample_file): + gst_builder = gst_BuildSample(sample_array=sample_array, + output_file=sample_file, + gst_audio_encoder=gst_audio_encoder) + gst_builder.run() + return sample_file + + +def generateSamples(overwrite=False): + from timeside import __file__ as ts_file + ts_path = os.path.split(os.path.abspath(ts_file))[0] + tests_dir = os.path.abspath(os.path.join(ts_path, '../tests')) + if os.path.isdir(tests_dir): + samples_dir = os.path.abspath(os.path.join(tests_dir, 'samples')) + if not os.path.isdir(samples_dir): + os.makedirs(samples_dir) + else: + import tempfile + samples_dir = tempfile.mkdtemp(suffix="ts_samples") + + samples = dict() + + # --------- Sweeps --------- + # sweep 44100 mono wav + filename = 'sweep_mono.wav' + samplerate = 44100 + gst_audio_encoder = 'wavenc' + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_mono, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sweep 44100 stereo wav + sweep_stereo = sweep_mono & sweep_mono + filename = 'sweep.wav' + gst_audio_encoder = 'wavenc' + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sweep 44100 stereo mp3 + filename = 'sweep.mp3' + gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux'] + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sweep 44100 stereo flac + filename = 'sweep.flac' + gst_audio_encoder = 'flacenc' + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sweep 44100 stereo ogg + filename = 'sweep.ogg' + gst_audio_encoder = ['vorbisenc', 'oggmux'] + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sweep 32000 stereo wav + samplerate = 32000 + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sweep_stereo = sweep_mono & sweep_mono + + filename = 'sweep_32000.wav' + gst_audio_encoder = 'wavenc' + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # --------- Sines --------- + # sine at 440Hz, 44100 mono wav + filename = 'sine440Hz_mono.wav' + samplerate = 44100 + gst_audio_encoder = 'wavenc' + sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_mono, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sine at 440Hz, 44100 stereo wav + filename = 'sine440Hz.wav' + sweep_stereo = sweep_mono & sweep_mono + gst_audio_encoder = 'wavenc' + sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sine at 440Hz, 44100 stereo mp3 + filename = 'sine440Hz.mp3' + gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux'] + sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sine at 440Hz, 44100 stereo ogg + filename = 'sine440Hz.ogg' + gst_audio_encoder = ['vorbisenc', 'oggmux'] + sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # --------- Equal tempered scale --------- + filename = 'C4_scale.wav' + samplerate = 44100 + f_C4 = 261.63 + f_D4 = 293.66 + f_E4 = 329.63 + f_F4 = 349.23 + f_G4 = 392.00 + f_A4 = 440.00 + f_B4 = 493.88 + f_C5 = 523.25 + sineC4 = SineArray(duration=1, samplerate=samplerate, frequency=f_C4) + sineD4 = SineArray(duration=1, samplerate=samplerate, frequency=f_D4) + sineE4 = SineArray(duration=1, samplerate=samplerate, frequency=f_E4) + sineF4 = SineArray(duration=1, samplerate=samplerate, frequency=f_F4) + sineG4 = SineArray(duration=1, samplerate=samplerate, frequency=f_G4) + sineA4 = SineArray(duration=1, samplerate=samplerate, frequency=f_A4) + sineB4 = SineArray(duration=1, samplerate=samplerate, frequency=f_B4) + sineC5 = SineArray(duration=1, samplerate=samplerate, frequency=f_C5) + + silence = SilenceArray(duration=0.2, samplerate=samplerate) + + scale = (sineC4 + silence + sineD4 + silence + sineE4 + silence + + sineF4 + silence + sineG4 + silence + sineA4 + silence + + sineB4 + silence + sineC5) + + gst_audio_encoder = 'wavenc' + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=scale, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # --------- White noise --------- + # white noise - 44100Hz mono + filename = 'white_noise_mono.wav' + samplerate = 44100 + noise = WhiteNoiseArray(duration=8, samplerate=samplerate) + gst_audio_encoder = 'wavenc' + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=noise, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # white noise - 44100Hz stereo + filename = 'white_noise.wav' + samplerate = 44100 + noise = WhiteNoiseArray(duration=8, samplerate=samplerate) + noise_stereo = noise & noise + gst_audio_encoder = 'wavenc' + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=noise_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # white noise - 32000Hz stereo + filename = 'white_noise_32000.wav' + samplerate = 32000 + noise = WhiteNoiseArray(duration=8, samplerate=samplerate) + noise_stereo = noise & noise + gst_audio_encoder = 'wavenc' + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=noise_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + return samples + + +samples = generateSamples() + + +if __name__ == '__main__': + generateSamples() -- 2.39.5