From: Thomas Fillon Date: Wed, 22 Oct 2014 13:55:28 +0000 (+0200) Subject: Update id for all encoders and decoders. Update documentation X-Git-Tag: 0.6~1^2~6^2~21 X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=df0a604a4cea5cbcbd7479782e9141ce4bdf4816;p=timeside.git Update id for all encoders and decoders. Update documentation --- diff --git a/README.rst b/README.rst index ddcc64c..5570066 100644 --- a/README.rst +++ b/README.rst @@ -54,9 +54,9 @@ Processors IDecoder --------- - * FileDecoder [gst_dec] - * ArrayDecoder [array_dec] - * LiveDecoder [gst_live_dec] + * FileDecoder [file_decoder] + * ArrayDecoder [array_decoder] + * LiveDecoder [live_decoder] IAnalyzer --------- @@ -72,7 +72,7 @@ IAnalyzer * VampSimpleHost [vamp_simple_host] * IRITSpeechEntropy [irit_speech_entropy] * IRITSpeech4Hz [irit_speech_4hz] - * OnsetDetectionFunction [odf] + * OnsetDetectionFunction [onset_detection_function] * LimsiSad [limsi_sad] IValueAnalyzer @@ -92,21 +92,21 @@ IGrapher * SpectrogramLog [spectrogram_log] * SpectrogramLinear [spectrogram_lin] * Display.aubio_pitch.pitch [grapher_aubio_pitch] - * Display.odf [grapher_odf] + * Display.onset_detection_function [grapher_odf] * Display.waveform_analyzer [grapher_waveform] * Display.irit_speech_4hz.segments [grapher_irit_speech_4hz_segments] IEncoder --------- - * VorbisEncoder [gst_vorbis_enc] - * WavEncoder [gst_wav_enc] - * Mp3Encoder [gst_mp3_enc] - * FlacEncoder [gst_flac_enc] - * AacEncoder [gst_aac_enc] - * WebMEncoder [gst_webm_enc] - * OpusEncoder [gst_opus_enc] - * AudioSink [gst_audio_sink_enc] + * VorbisEncoder [vorbis_encoder] + * WavEncoder [wav_encoder] + * Mp3Encoder [mp3_encoder] + * FlacEncoder [flac_encoder] + * AacEncoder [aac_encoder] + * WebMEncoder [webm_encoder] + * OpusEncoder [opus_encoder] + * AudioSink [live_encoder] News ===== @@ -136,10 +136,10 @@ To list all available plugins:: Define some processors:: from timeside.core import get_processor - decoder = get_processor('gst_dec')('sweep.wav') + decoder = get_processor('file_decoder')('sweep.wav') grapher = get_processor('waveform_simple') analyzer = get_processor('level') - encoder = get_processor('gst_vorbis_enc')('sweep.ogg') + encoder = get_processor('vorbis_encoder')('sweep.ogg') Then run the *magic* pipeline:: diff --git a/doc/source/api/decoder/index.rst b/doc/source/api/decoder/index.rst index 5197bbe..8283527 100644 --- a/doc/source/api/decoder/index.rst +++ b/doc/source/api/decoder/index.rst @@ -10,7 +10,7 @@ Decoder package File Decoder -=========== +============ .. autoclass:: timeside.decoder.file.FileDecoder :members: @@ -26,7 +26,7 @@ Array Decoder :show-inheritance: Live Decoder -============= +============ .. autoclass:: timeside.decoder.live.LiveDecoder :members: diff --git a/doc/source/api/encoder/index.rst b/doc/source/api/encoder/index.rst index fbe4af3..d3909ae 100644 --- a/doc/source/api/encoder/index.rst +++ b/doc/source/api/encoder/index.rst @@ -56,7 +56,7 @@ WebM encoder AudioSink encoder ------------- +----------------- .. automodule:: timeside.encoder.audiosink :members: diff --git a/doc/source/dive_in.rst b/doc/source/dive_in.rst index 2fa010c..e913dd9 100644 --- a/doc/source/dive_in.rst +++ b/doc/source/dive_in.rst @@ -9,10 +9,10 @@ To list all available plugins:: Define some processors:: from timeside.core import get_processor - decoder = get_processor('gst_dec')('sweep.wav') + decoder = get_processor('file_decoder')('sweep.wav') grapher = get_processor('waveform_simple') analyzer = get_processor('level') - encoder = get_processor('gst_vorbis_enc')('sweep.ogg') + encoder = get_processor('vorbis_encoder')('sweep.ogg') Then run the *magic* pipeline:: diff --git a/doc/source/intro.rst b/doc/source/intro.rst index f5dabd4..98ff4b1 100644 --- a/doc/source/intro.rst +++ b/doc/source/intro.rst @@ -54,9 +54,9 @@ Processors IDecoder --------- - * FileDecoder [gst_dec] - * ArrayDecoder [array_dec] - * LiveDecoder [gst_live_dec] + * FileDecoder [file_decoder] + * ArrayDecoder [array_decoder] + * LiveDecoder [live_decoder] IAnalyzer --------- @@ -72,7 +72,7 @@ IAnalyzer * VampSimpleHost [vamp_simple_host] * IRITSpeechEntropy [irit_speech_entropy] * IRITSpeech4Hz [irit_speech_4hz] - * OnsetDetectionFunction [odf] + * OnsetDetectionFunction [onset_detection_function] * LimsiSad [limsi_sad] IValueAnalyzer @@ -92,19 +92,19 @@ IGrapher * SpectrogramLog [spectrogram_log] * SpectrogramLinear [spectrogram_lin] * Display.aubio_pitch.pitch [grapher_aubio_pitch] - * Display.odf [grapher_odf] + * Display.onset_detection_function [grapher_odf] * Display.waveform_analyzer [grapher_waveform] * Display.irit_speech_4hz.segments [grapher_irit_speech_4hz_segments] IEncoder --------- - * VorbisEncoder [gst_vorbis_enc] - * WavEncoder [gst_wav_enc] - * Mp3Encoder [gst_mp3_enc] - * FlacEncoder [gst_flac_enc] - * AacEncoder [gst_aac_enc] - * WebMEncoder [gst_webm_enc] - * OpusEncoder [gst_opus_enc] - * AudioSink [gst_audio_sink_enc] + * VorbisEncoder [vorbis_encoder] + * WavEncoder [wav_encoder] + * Mp3Encoder [mp3_encoder] + * FlacEncoder [flac_encoder] + * AacEncoder [aac_encoder] + * WebMEncoder [webm_encoder] + * OpusEncoder [opus_encoder] + * AudioSink [live_encoder] diff --git a/doc/source/tutorial/frames_stack.rst b/doc/source/tutorial/frames_stack.rst index 4d50a0d..c8272eb 100644 --- a/doc/source/tutorial/frames_stack.rst +++ b/doc/source/tutorial/frames_stack.rst @@ -11,8 +11,9 @@ First, let's import everything and define the audio file source : >>> import timeside >>> from timeside.core import get_processor +>>> from timeside.tools.test_samples import samples >>> import numpy as np ->>> audio_file = 'http://github.com/yomguy/timeside-samples/raw/master/samples/sweep.mp3' +>>> audio_file = samples['sweep.mp3'] Then let's setup a :class:`FileDecoder ` with argument `stack=True` (default argument is `stack=False`) : @@ -23,7 +24,7 @@ Setup an arbitrary analyzer to check that decoding process from file and from st >>> pitch = get_processor('aubio_pitch')() >>> pipe = (decoder | pitch) >>> print pipe.processors #doctest: +ELLIPSIS -[gst_dec-{}, aubio_pitch-{"blocksize_s": 0.0, "stepsize_s": 0.0}] +[file_decoder-{}, aubio_pitch-{"blocksize_s": 0.0, "stepsize_s": 0.0}] Run the pipe: diff --git a/doc/source/tutorial/quick_start.rst b/doc/source/tutorial/quick_start.rst index bbbb3ac..2ed3bea 100644 --- a/doc/source/tutorial/quick_start.rst +++ b/doc/source/tutorial/quick_start.rst @@ -29,9 +29,9 @@ A most basic operation, transcoding, is easily performed with two processors: .. doctest:: test_1 >>> import timeside - >>> from timeside.core import get_processor - >>> decoder = get_processor('gst_dec')('sweep.wav')# doctest: +SKIP - >>> encoder = get_processor('gst_vorbis_enc')('sweep.ogg') + >>> from timeside.core import get_processor + >>> decoder = get_processor('file_decoder')('sweep.wav')# doctest: +SKIP + >>> encoder = get_processor('vorbis_encoder')('sweep.ogg') >>> pipe = decoder | encoder >>> pipe.run() @@ -59,7 +59,7 @@ analysis and encoding: >>> from timeside.core import get_processor >>> decoder = get_processor('gst-dec')('sweep.wav') # doctest: +SKIP >>> levels = get_processor('level')() - >>> encoders = get_processor('gst_mp3_enc')('sweep.mp3') | get_processor('gst_flac_enc')('sweep.flac') + >>> encoders = get_processor('mp3_encoder')('sweep.mp3') | get_processor('flac_encoder')('sweep.flac') >>> (decoder | levels | encoders).run() >>> print levels.results - {'...': AnalyzerResult(id_metadata=IdMetadata(id='level.max', name='Level Max', unit='dBFS', description='', date='...', version='...', author='TimeSide', proc_uuid='...', res_uuid='...'), data_object=GlobalValueObject(value=array([ 0.]), y_value=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='.../sweep.wav', start=0.0, duration=8.0, is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={}), '...': AnalyzerResult(id_metadata=IdMetadata(id='level.rms', name='Level RMS', unit='dBFS', description='', date='...', version='...', author='TimeSide', proc_uuid='...', res_uuid='...'), data_object=GlobalValueObject(value=array([-2.995]), y_value=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='.../sweep.wav', start=0.0, duration=8.0, is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={})} + {'...': AnalyzerResult(id_metadata=IdMetadata(id='level.max', name='Level Analyzer Max', unit='dBFS', description='', date='...', version='...', author='TimeSide', proc_uuid='...', res_uuid='...'), data_object=GlobalValueObject(value=array([ 0.]), y_value=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='.../sweep.wav', start=0.0, duration=8.0, is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={}), '...': AnalyzerResult(id_metadata=IdMetadata(id='level.rms', name='Level Analyzer RMS', unit='dBFS', description='', date='...', version='...', author='TimeSide', proc_uuid='...', res_uuid='...'), data_object=GlobalValueObject(value=array([-2.995]), y_value=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='.../sweep.wav', start=0.0, duration=8.0, is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={})} diff --git a/tests/test_analyzer_dc.py b/tests/test_analyzer_dc.py index e916e02..7b84cf2 100755 --- a/tests/test_analyzer_dc.py +++ b/tests/test_analyzer_dc.py @@ -3,7 +3,7 @@ from unit_timeside import unittest, TestRunner from timeside.decoder.file import FileDecoder from timeside.analyzer.dc import MeanDCShift -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples class TestAnalyzerDC(unittest.TestCase): @@ -13,13 +13,13 @@ class TestAnalyzerDC(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] self.expected = {'mean_dc_shift': 0.004} def testOnScale(self): "runs on C4 Scale" - self.source = ts_samples["C4_scale.wav"] + self.source = samples["C4_scale.wav"] self.expected = {'mean_dc_shift': 0.034} def tearDown(self): diff --git a/tests/test_analyzer_level.py b/tests/test_analyzer_level.py index 29ffef6..bd6efa1 100755 --- a/tests/test_analyzer_level.py +++ b/tests/test_analyzer_level.py @@ -3,7 +3,7 @@ from unit_timeside import unittest, TestRunner from timeside.decoder.file import FileDecoder from timeside.analyzer.level import Level -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples class TestAnalyzerLevel(unittest.TestCase): @@ -13,7 +13,7 @@ class TestAnalyzerLevel(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] max_level_value = 0 rms_level_value = -2.995 @@ -23,7 +23,7 @@ class TestAnalyzerLevel(unittest.TestCase): def testOnC4_Scale(self): "runs on C4 scale" - self.source = ts_samples["C4_scale.wav"] + self.source = samples["C4_scale.wav"] max_level_value = 0 rms_level_value = -3.705 diff --git a/tests/test_analyzers_default.py b/tests/test_analyzers_default.py index a2c50ea..ae30461 100755 --- a/tests/test_analyzers_default.py +++ b/tests/test_analyzers_default.py @@ -7,14 +7,14 @@ from unit_timeside import unittest, TestRunner import timeside from timeside.decoder.file import FileDecoder import numpy as np -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples class TestAnalyzers_with_default(unittest.TestCase): """Test analyzer with default parameters""" def setUp(self): - source = ts_samples["C4_scale.wav"] + source = samples["C4_scale.wav"] self.decoder = FileDecoder(source) diff --git a/tests/test_analyzers_stress.py b/tests/test_analyzers_stress.py index 9669643..1482832 100755 --- a/tests/test_analyzers_stress.py +++ b/tests/test_analyzers_stress.py @@ -16,7 +16,7 @@ class TestAnalyzers_with_zeros(unittest.TestCase): samplerate = 16000 # LimsiSad require Fs = 16000 Hz duration = 10 samples = np.zeros((duration * samplerate, 1)) - decoder_cls = timeside.core.get_processor('array_dec') + decoder_cls = timeside.core.get_processor('array_decoder') self.decoder = decoder_cls(samples, samplerate=samplerate) def _perform_test(self, analyzer_cls): @@ -44,7 +44,7 @@ class TestAnalyzers_withDC(TestAnalyzers_with_zeros): samplerate = 16000 # LimsiSad require Fs = 16000 Hz duration = 10 samples = -1000*np.ones((duration * samplerate, 1)) - decoder_cls = timeside.core.get_processor('array_dec') + decoder_cls = timeside.core.get_processor('array_decoder') self.decoder = decoder_cls(samples, samplerate=samplerate) diff --git a/tests/test_aubio_melenergy.py b/tests/test_aubio_melenergy.py index 730e26f..33f60c0 100755 --- a/tests/test_aubio_melenergy.py +++ b/tests/test_aubio_melenergy.py @@ -4,7 +4,7 @@ from unit_timeside import unittest, TestRunner from timeside.decoder.file import FileDecoder from timeside.core import get_processor from timeside import _WITH_AUBIO -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available') @@ -15,11 +15,11 @@ class TestAubioMelEnergy(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] def testOnGuitar(self): "runs on C4_scale" - self.source = ts_samples["C4_scale.wav"] + self.source = samples["C4_scale.wav"] def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_aubio_mfcc.py b/tests/test_aubio_mfcc.py index f225f10..88e3733 100755 --- a/tests/test_aubio_mfcc.py +++ b/tests/test_aubio_mfcc.py @@ -4,7 +4,7 @@ from unit_timeside import unittest, TestRunner from timeside.decoder.file import FileDecoder from timeside.core import get_processor from timeside import _WITH_AUBIO -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available') @@ -15,11 +15,11 @@ class TestAubioMfcc(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] def testOnScale(self): "runs on C4 scale" - self.source = ts_samples["C4_scale.wav"] + self.source = samples["C4_scale.wav"] def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_aubio_pitch.py b/tests/test_aubio_pitch.py index 2eee6a4..9caeda7 100755 --- a/tests/test_aubio_pitch.py +++ b/tests/test_aubio_pitch.py @@ -4,7 +4,7 @@ from unit_timeside import unittest, TestRunner from timeside.decoder.file import FileDecoder from timeside.core import get_processor from timeside import _WITH_AUBIO -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available') @@ -15,11 +15,11 @@ class TestAubioPitch(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] def testOnC4Scale(self): "runs on C4 scale" - self.source = ts_samples["C4_scale.wav"] + self.source = samples["C4_scale.wav"] def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_aubio_specdesc.py b/tests/test_aubio_specdesc.py index c92e685..986eea7 100755 --- a/tests/test_aubio_specdesc.py +++ b/tests/test_aubio_specdesc.py @@ -4,7 +4,7 @@ from unit_timeside import unittest, TestRunner from timeside.decoder.file import FileDecoder from timeside.core import get_processor from timeside import _WITH_AUBIO -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available') @@ -15,11 +15,11 @@ class TestAubioSpecdesc(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] def testOnC4Scale(self): "runs on C4 scale" - self.source = ts_samples["C4_scale.wav"] + self.source = samples["C4_scale.wav"] def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_aubio_temporal.py b/tests/test_aubio_temporal.py index 209808e..101c269 100755 --- a/tests/test_aubio_temporal.py +++ b/tests/test_aubio_temporal.py @@ -4,7 +4,7 @@ from unit_timeside import unittest, TestRunner from timeside.decoder.file import FileDecoder from timeside.core import get_processor from timeside import _WITH_AUBIO -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples @unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available') @@ -15,11 +15,11 @@ class TestAubioTemporal(unittest.TestCase): def testOnSweep(self): "runs on sweep" - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] def testOnC4Scale(self): "runs on C4 scale" - self.source = ts_samples["C4_scale.wav"] + self.source = samples["C4_scale.wav"] def tearDown(self): decoder = FileDecoder(self.source) diff --git a/tests/test_decoder_utils.py b/tests/test_decoder_utils.py index 3d6de02..7a05fcd 100755 --- a/tests/test_decoder_utils.py +++ b/tests/test_decoder_utils.py @@ -6,7 +6,7 @@ from __future__ import division from unit_timeside import unittest, TestRunner from timeside.decoder.utils import get_uri, get_media_uri_info, path2uri -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples import os.path @@ -59,33 +59,33 @@ class TestGetMediaInfo(unittest.TestCase): def testWav(self): "Test wav decoding" - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] def testWavMono(self): "Test mono wav decoding" - self.source = ts_samples["sweep_mono.wav"] + self.source = samples["sweep_mono.wav"] self.expected_channels = 1 def testWav32k(self): "Test 32kHz wav decoding" - self.source = ts_samples["sweep_32000.wav"] + self.source = samples["sweep_32000.wav"] self.expected_samplerate = 32000 def testFlac(self): "Test flac decoding" - self.source = ts_samples["sweep.flac"] + self.source = samples["sweep.flac"] self.expected_depth = 24 def testOgg(self): "Test ogg decoding" - self.source = ts_samples["sweep.ogg"] + self.source = samples["sweep.ogg"] self.test_exact_duration = False self.expected_depth = 0 # ? def testMp3(self): "Test mp3 decoding" - self.source = ts_samples["sweep.mp3"] + self.source = samples["sweep.mp3"] self.expected_depth = 32 self.test_exact_duration = False diff --git a/tests/test_decoding.py b/tests/test_decoding.py index fce5429..a0f97ff 100755 --- a/tests/test_decoding.py +++ b/tests/test_decoding.py @@ -6,7 +6,7 @@ from timeside.decoder.file import FileDecoder from timeside.core import ProcessPipe from unit_timeside import unittest, TestRunner -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples import os.path @@ -31,35 +31,35 @@ class TestDecoding(unittest.TestCase): def testWav(self): "Test wav decoding" - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] def testWavMono(self): "Test mono wav decoding" - self.source = ts_samples["sweep_mono.wav"] + self.source = samples["sweep_mono.wav"] self.expected_channels = 1 def testWav32k(self): "Test 32kHz wav decoding" - self.source = ts_samples["sweep_32000.wav"] + self.source = samples["sweep_32000.wav"] self.expected_samplerate = 32000 def testFlac(self): "Test flac decoding" - self.source = ts_samples["sweep.flac"] + self.source = samples["sweep.flac"] self.expected_mime_type = 'audio/x-flac' def testOgg(self): "Test ogg decoding" - self.source = ts_samples["sweep.ogg"] + self.source = samples["sweep.ogg"] self.expected_mime_type = 'application/ogg' self.test_exact_duration = False def testMp3(self): "Test mp3 decoding" - self.source = ts_samples["sweep.mp3"] + self.source = samples["sweep.mp3"] self.expected_mime_type = 'audio/mpeg' self.test_exact_duration = False diff --git a/tests/test_graphers_render_analyzers.py b/tests/test_graphers_render_analyzers.py index 1819f64..d047a84 100755 --- a/tests/test_graphers_render_analyzers.py +++ b/tests/test_graphers_render_analyzers.py @@ -20,7 +20,7 @@ class Test_graphers_analyzers(unittest.TestCase): t = np.arange(0, duration, 1/samplerate) samples = chirp(t=t, f0=20, t1=t[-1], f1=samplerate/2, method='logarithmic') - decoder_cls = timeside.core.get_processor('array_dec') + decoder_cls = timeside.core.get_processor('array_decoder') self.decoder = decoder_cls(samples, samplerate=samplerate) def _perform_test(self, grapher_cls): diff --git a/tests/test_process_pipe.py b/tests/test_process_pipe.py old mode 100644 new mode 100755 index cdaa182..b503efb --- a/tests/test_process_pipe.py +++ b/tests/test_process_pipe.py @@ -6,7 +6,7 @@ from unit_timeside import unittest, TestRunner import timeside from timeside.decoder.file import FileDecoder -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples class TestProcessPipe(unittest.TestCase): @@ -16,8 +16,7 @@ class TestProcessPipe(unittest.TestCase): """Test process pipe (Quick and dirty)""" # TODO clean up and complete - source = ts_samples["sweep.wav"] - + source = samples["sweep.wav"] pipe = timeside.core.ProcessPipe() dec = FileDecoder(source) @@ -31,7 +30,11 @@ class TestProcessPipe(unittest.TestCase): a2 = timeside.analyzer.spectrogram.Spectrogram() pipe2 = (dec | a | a2 | abis) + self.assertEqual(len(pipe2.processors), 4) + # Release temporary buffers in Spectrogram + for proc in pipe2.processors: + proc.release() #pipe2.draw_graph() if __name__ == '__main__': diff --git a/tests/test_transcoding.py b/tests/test_transcoding.py index c3b1aa4..75d9b37 100755 --- a/tests/test_transcoding.py +++ b/tests/test_transcoding.py @@ -8,38 +8,38 @@ from timeside.decoder.file import FileDecoder import os from unit_timeside import unittest, TestRunner from tools import tmp_file_sink -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples class TestTranscodingFromWav(unittest.TestCase): "Test transcoding from wav" def setUp(self): - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] self.test_duration = True self.test_channels = True def testWav(self): "Test conversion to wav" - self.encoder_id = 'gst_wav_enc' + self.encoder_id = 'wav_encoder' def testMp3(self): "Test conversion to mp3" - self.encoder_id = 'gst_mp3_enc' + self.encoder_id = 'mp3_encoder' def testOgg(self): "Test conversion to ogg" - self.encoder_id = 'gst_vorbis_enc' + self.encoder_id = 'vorbis_encoder' def testWebM(self): "Test conversion to webm" - self.encoder_id = 'gst_webm_enc' + self.encoder_id = 'webm_encoder' self.test_duration = False # webmmux encoder with streamable=true # does not return a valid duration def testM4a(self): "Test conversion to m4a" - self.encoder_id = 'gst_aac_enc' + self.encoder_id = 'aac_encoder' def tearDown(self): decoder = FileDecoder(self.source) @@ -82,7 +82,7 @@ class TestTranscodingFromMonoWav(TestTranscodingFromWav): def setUp(self): super(TestTranscodingFromMonoWav, self).setUp() - self.source = ts_samples["sweep_mono.wav"] + self.source = samples["sweep_mono.wav"] def testM4a(self): "Test conversion to m4a" @@ -95,7 +95,7 @@ class TestTranscodingFromAnotherWav(TestTranscodingFromMonoWav): def setUp(self): super(TestTranscodingFromAnotherWav, self).setUp() - self.source = ts_samples["C4_scale.wav"] # Mono + self.source = samples["C4_scale.wav"] # Mono class TestTranscodingFromMp3(TestTranscodingFromWav): @@ -103,7 +103,7 @@ class TestTranscodingFromMp3(TestTranscodingFromWav): def setUp(self): super(TestTranscodingFromMp3, self).setUp() - self.source = ts_samples["sweep.mp3"] + self.source = samples["sweep.mp3"] class TestTranscodingFromFlac(TestTranscodingFromWav): @@ -111,7 +111,7 @@ class TestTranscodingFromFlac(TestTranscodingFromWav): def setUp(self): super(TestTranscodingFromFlac, self).setUp() - self.source = ts_samples["sweep.flac"] + self.source = samples["sweep.flac"] class TestTranscodingFromOgg(TestTranscodingFromWav): @@ -119,7 +119,7 @@ class TestTranscodingFromOgg(TestTranscodingFromWav): def setUp(self): super(TestTranscodingFromOgg, self).setUp() - self.source = ts_samples["sweep.ogg"] + self.source = samples["sweep.ogg"] class TestTranscodingFrom32kHzWav(TestTranscodingFromWav): @@ -127,7 +127,7 @@ class TestTranscodingFrom32kHzWav(TestTranscodingFromWav): def setUp(self): super(TestTranscodingFrom32kHzWav, self).setUp() - self.source = ts_samples["sweep_32000.wav"] + self.source = samples["sweep_32000.wav"] class TestTranscodingFromMissingFile(TestTranscodingFromWav): diff --git a/tests/test_transcoding_streaming.py b/tests/test_transcoding_streaming.py index b540444..10a2eed 100755 --- a/tests/test_transcoding_streaming.py +++ b/tests/test_transcoding_streaming.py @@ -10,14 +10,14 @@ from timeside.decoder.file import FileDecoder from unit_timeside import unittest, TestRunner from tools import tmp_file_sink -from timeside.tools.data_samples import samples as ts_samples +from timeside.tools.test_samples import samples class TestTranscodingStreaming(unittest.TestCase): "Test transcoding and streaming" def setUp(self): - self.source = ts_samples["sweep.wav"] + self.source = samples["sweep.wav"] self.test_duration = True self.test_channels = True self.filesize_delta = None @@ -25,21 +25,21 @@ class TestTranscodingStreaming(unittest.TestCase): def testMp3(self): "Test conversion to mp3" - self.encoder_id = 'gst_mp3_enc' + self.encoder_id = 'mp3_encoder' self.filesize_delta = 156 def testOgg(self): "Test conversion to ogg" - self.encoder_id = 'gst_vorbis_enc' + self.encoder_id = 'vorbis_encoder' def testOpus(self): "Test conversion to opus" - self.encoder_id = 'gst_opus_enc' + self.encoder_id = 'opus_encoder' self.expected_sample_rate = 48000 def testWebM(self): "Test conversion to webm" - self.encoder_id = 'gst_webm_enc' + self.encoder_id = 'webm_encoder' self.test_duration = False # webmmux encoder with streamable=true # does not return a valid duration diff --git a/timeside/analyzer/dc.py b/timeside/analyzer/dc.py index 1c52391..a1dfb45 100644 --- a/timeside/analyzer/dc.py +++ b/timeside/analyzer/dc.py @@ -47,7 +47,7 @@ class MeanDCShift(Analyzer): @staticmethod @interfacedoc def name(): - return "Mean DC shift" + return "Mean DC shift Analyzer" @staticmethod @interfacedoc diff --git a/timeside/analyzer/externals/yaafe.py b/timeside/analyzer/externals/yaafe.py index 8708470..26d399f 100644 --- a/timeside/analyzer/externals/yaafe.py +++ b/timeside/analyzer/externals/yaafe.py @@ -46,10 +46,10 @@ class Yaafe(Analyzer): Examples -------- >>> import timeside - >>> from timeside.tools.data_samples import samples as ts_samples + >>> from timeside.tools.test_samples import samples >>> from timeside.core import get_processor - >>> source = ts_samples['C4_scale.wav'] - >>> FileDecoder = get_processor('gst_dec') + >>> source = samples['C4_scale.wav'] + >>> FileDecoder = get_processor('file_decoder') >>> YaafeAnalyzer = get_processor('yaafe') >>> # feature extraction defition >>> feature_plan = ['mfcc: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256', diff --git a/timeside/analyzer/level.py b/timeside/analyzer/level.py index edf4775..4efbb0a 100644 --- a/timeside/analyzer/level.py +++ b/timeside/analyzer/level.py @@ -29,7 +29,26 @@ from timeside.analyzer.utils import MACHINE_EPSILON class Level(Analyzer): - """RMS level analyzer + """Audio level analyzer + + Examples + -------- + + >>> import timeside + >>> from timeside.core import get_processor + >>> from timeside.tools.test_samples import samples + >>> source = samples['sweep.mp3'] + >>> decoder = get_processor('file_decoder')(uri=source) + >>> level = get_processor('level')() + >>> (decoder | level).run() + >>> level.results.keys() + ['level.max', 'level.rms'] + >>> max = level.results['level.max'] + >>> print max.data + [ 0.] + >>> rms = level.results['level.rms'] + >>> print rms.data + [-3.263] """ implements(IValueAnalyzer) @@ -50,7 +69,7 @@ class Level(Analyzer): @staticmethod @interfacedoc def name(): - return "Level" + return "Level Analyzer" @staticmethod @interfacedoc @@ -97,4 +116,5 @@ class Level(Analyzer): if __name__ == "__main__": import doctest - print doctest.testmod(verbose=True) + import timeside + doctest.testmod(timeside.analyzer.level) diff --git a/timeside/analyzer/odf.py b/timeside/analyzer/odf.py index bbfc931..e825423 100644 --- a/timeside/analyzer/odf.py +++ b/timeside/analyzer/odf.py @@ -61,7 +61,7 @@ class OnsetDetectionFunction(Analyzer): @staticmethod @interfacedoc def id(): - return "odf" + return "onset_detection_function" @staticmethod @interfacedoc diff --git a/timeside/analyzer/spectrogram.py b/timeside/analyzer/spectrogram.py index ce7e8fe..34c9c75 100644 --- a/timeside/analyzer/spectrogram.py +++ b/timeside/analyzer/spectrogram.py @@ -48,9 +48,9 @@ class Spectrogram(Analyzer): -------- >>> import timeside >>> from timeside.core import get_processor - >>> from timeside.tools.data_samples import samples as ts_samples - >>> audio_source = ts_samples['sweep.wav'] - >>> decoder = get_processor('gst_dec')(uri=audio_source) + >>> from timeside.tools.test_samples import samples + >>> audio_source = samples['sweep.wav'] + >>> decoder = get_processor('file_decoder')(uri=audio_source) >>> spectrogram = get_processor('spectrogram_analyzer')(input_blocksize=2048, input_stepsize=1024) >>> pipe = (decoder | spectrogram) >>> pipe.run() @@ -64,9 +64,9 @@ class Spectrogram(Analyzer): import timeside from timeside.core import get_processor - from timeside.tools.data_samples import samples as ts_samples - audio_source = ts_samples['sweep.wav'] - decoder = get_processor('gst_dec')(uri=audio_source) + from timeside.tools.test_samples import samples + audio_source = samples['sweep.wav'] + decoder = get_processor('file_decoder')(uri=audio_source) spectrogram = get_processor('spectrogram_analyzer')(input_blocksize=2048, input_stepsize=1024) pipe = (decoder | spectrogram) diff --git a/timeside/core.py b/timeside/core.py index b68146d..b981f67 100644 --- a/timeside/core.py +++ b/timeside/core.py @@ -445,7 +445,7 @@ class ProcessPipe(object): eod = False - if source.id() == 'gst_live_dec': + if source.id() == 'live_decoder': # Set handler for Interruption signal import signal @@ -459,7 +459,7 @@ class ProcessPipe(object): for item in items: frames, eod = item.process(frames, eod) - if source.id() == 'gst_live_dec': + if source.id() == 'live_decoder': # Restore default handler for Interruption signal signal.signal(signal.SIGINT, signal.SIG_DFL) diff --git a/timeside/decoder/array.py b/timeside/decoder/array.py index cbe0d8f..9e54420 100644 --- a/timeside/decoder/array.py +++ b/timeside/decoder/array.py @@ -43,7 +43,7 @@ class ArrayDecoder(Decoder): @staticmethod @interfacedoc def id(): - return "array_dec" + return "array_decoder" def __init__(self, samples, samplerate=44100, start=0, duration=None): ''' diff --git a/timeside/decoder/file.py b/timeside/decoder/file.py index 9802a50..4063bea 100644 --- a/timeside/decoder/file.py +++ b/timeside/decoder/file.py @@ -1,10 +1,12 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -# Copyright (c) 2007-2013 Parisson +# Copyright (c) 2007-2014 Parisson # Copyright (c) 2007 Olivier Guilyardi # Copyright (c) 2007-2013 Guillaume Pellerin # Copyright (c) 2010-2013 Paul Brossier +# Copyright (c) 2013-2014 Thomas fillon + # # This file is part of TimeSide. @@ -45,8 +47,35 @@ import numpy as np class FileDecoder(Decoder): - - """ gstreamer-based decoder """ + """ File Decoder based on Gstreamer + + Parameters + ---------- + uri : str + uri of the media + start : float, optional + start time of the segment in seconds + duration : float, optional + duration of the segment in seconds + stack : boolean, optional + keep decoded data in the stack + sha1 : boolean, optional + compute the sha1 hash of the data + + + Examples + -------- + >>> import timeside + >>> from timeside.core import get_processor + >>> from timeside.tools.test_samples import samples + >>> audio_source = samples['sweep.wav'] + >>> FileDecoder = get_processor('file_decoder') # Get the decoder class + >>> # Use decoder with default parameters + >>> decoder = FileDecoder(uri=audio_source) + >>> analyzer = get_processor('level')() # Pick a arbitrary analyzer + >>> pipe =(decoder | analyzer) + >>> pipe.run() # Run the pipe for the given audio source +""" implements(IDecoder) output_blocksize = 8 * 1024 @@ -59,26 +88,9 @@ class FileDecoder(Decoder): @staticmethod @interfacedoc def id(): - return "gst_dec" + return "file_decoder" def __init__(self, uri, start=0, duration=None, stack=False, sha1=None): - """ - Construct a new FileDecoder - - Parameters - ---------- - uri : str - uri of the media - start : float - start time of the segment in seconds - duration : float - duration of the segment in seconds - stack : boolean - keep decoded data in the stack - sha1 : boolean - compute the sha1 hash of the data - - """ super(FileDecoder, self).__init__(start=start, duration=duration) @@ -361,4 +373,5 @@ class FileDecoder(Decoder): if __name__ == "__main__": import doctest - doctest.testmod() + import timeside + doctest.testmod(timeside.decoder.file) diff --git a/timeside/decoder/live.py b/timeside/decoder/live.py index 6f422dd..0f3558b 100644 --- a/timeside/decoder/live.py +++ b/timeside/decoder/live.py @@ -30,7 +30,7 @@ from __future__ import division from timeside.decoder.core import Decoder, IDecoder, interfacedoc, implements from timeside.tools.gstutils import MainloopThread, gobject -from . file import FileDecoder +from timeside.decoder.file import FileDecoder import Queue import threading @@ -41,7 +41,42 @@ QUEUE_SIZE = 10 class LiveDecoder(FileDecoder): - """ gstreamer-based decoder from live source""" + """Live source Decoder based on Gstreamer + capturing audio from alsasrc + + Construct a new LiveDecoder capturing audio from alsasrc + + Parameters + ---------- + num_buffers : int, optional + Number of buffers to output before sending End Of Stream signal + (-1 = unlimited). + (Allowed values: >= -1, Default value: -1) + + input_src : str, optional + Gstreamer source element + default to 'alsasrc' + possible values : 'autoaudiosrc', 'alsasrc', 'osssrc' + + Examples + -------- + + >>> import timeside + + >>> from timeside.core import get_processor + >>> live_decoder = get_processor('live_decoder')(num_buffers=5) + >>> waveform = get_processor('waveform_analyzer')() + >>> mp3_encoder = timeside.encoder.mp3.Mp3Encoder('/tmp/test_live.mp3', + ... overwrite=True) + >>> pipe = (live_decoder | waveform | mp3_encoder) + >>> pipe.run() + >>> # Show the audio as captured by the decoder + >>> import matplotlib.pyplot as plt # doctest: +SKIP + >>> plt.plot(a.results['waveform_analyzer'].time, # doctest: +SKIP + a.results['waveform_analyzer'].data) # doctest: +SKIP + >>> plt.show() # doctest: +SKIP + + """ implements(IDecoder) # IProcessor methods @@ -49,39 +84,9 @@ class LiveDecoder(FileDecoder): @staticmethod @interfacedoc def id(): - return "gst_live_dec" - - def __init__(self, num_buffers=-1, input_src='autoaudiosrc'): - """ - Construct a new LiveDecoder capturing audio from alsasrc - - Parameters - ---------- - num_buffers : - Number of buffers to output before sending EOS (-1 = unlimited). - (Allowed values: >= -1, Default value: -1) - + return "live_decoder" - Examples - -------- - - >>> import timeside - - >>> from timeside.core import get_processor - >>> live = timeside.decoder.live.LiveDecoder(num_buffers=5) - >>> a = get_processor('waveform_analyzer')() - >>> e = timeside.encoder.mp3.Mp3Encoder('/tmp/test_live.mp3', - ... overwrite=True) - >>> pipe = (live | a | e) - >>> pipe.run() # doctest: +SKIP - >>> pipe.run() # doctest: +SKIP - - >>> import matplotlib.pyplot as plt # doctest: +SKIP - >>> plt.plot(a.results['waveform_analyzer'].time, # doctest: +SKIP - a.results['waveform_analyzer'].data) # doctest: +SKIP - >>> plt.show() # doctest: +SKIP - - """ + def __init__(self, num_buffers=-1, input_src='alsasrc'): super(Decoder, self).__init__() self.num_buffers = num_buffers @@ -187,3 +192,8 @@ class LiveDecoder(FileDecoder): pass # IDecoder methods + +if __name__ == "__main__": + import doctest + import timeside + doctest.testmod(timeside.decoder.live, verbose=True) diff --git a/timeside/decoder/utils.py b/timeside/decoder/utils.py index 1cc4b10..b021f13 100644 --- a/timeside/decoder/utils.py +++ b/timeside/decoder/utils.py @@ -181,8 +181,8 @@ def sha1sum_file(filename): ''' Return the secure hash digest with sha1 algorithm for a given file - >>> from timeside.tools.data_samples import samples as ts_samples - >>> wav_file = ts_samples["C4_scale.wav"] + >>> from timeside.tools.test_samples import samples + >>> wav_file = samples["C4_scale.wav"] >>> print sha1sum_file(wav_file) a598e78d0b5c90da54a77e34c083abdcd38d42ba ''' diff --git a/timeside/encoder/audiosink.py b/timeside/encoder/audiosink.py index b8fa80f..f8ddcae 100644 --- a/timeside/encoder/audiosink.py +++ b/timeside/encoder/audiosink.py @@ -31,13 +31,20 @@ class AudioSink(GstEncoder): This encoder plays the decoded audio stream to the sound card + Parameters + ---------- + output_sink : str, optional + Gstreamer sink element + Default to 'autoaudiosink' + Possible values : 'autoaudiosink', 'alsasink', 'osssink' >>> import timeside - >>> from timeside.tools.data_samples import samples as ts_samples - >>> wav_file = ts_samples['C4_scale.wav'] - >>> d = timeside.decoder.file.FileDecoder(wav_file) - >>> e = timeside.encoder.audiosink.AudioSink() - >>> (d|e).run() # doctest: +SKIP + >>> from timeside.core import get_processor + >>> from timeside.tools.test_samples import samples + >>> wav_file = samples['sine440Hz_mono_1s.wav'] + >>> d = get_processor('file_decoder')(wav_file) + >>> e = get_processor('live_encoder')() + >>> (d|e).run() """ implements(IEncoder) @@ -70,7 +77,7 @@ class AudioSink(GstEncoder): @staticmethod @interfacedoc def id(): - return "gst_audio_sink_enc" + return "live_encoder" @staticmethod @interfacedoc diff --git a/timeside/encoder/flac.py b/timeside/encoder/flac.py index ac51143..f414a8b 100644 --- a/timeside/encoder/flac.py +++ b/timeside/encoder/flac.py @@ -25,12 +25,12 @@ from timeside.api import IEncoder class FlacEncoder(GstEncoder): - - """ gstreamer-based FLAC encoder """ + """FLAC encoder based on Gstreamer""" implements(IEncoder) @interfacedoc - def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): + def setup(self, channels=None, samplerate=None, blocksize=None, + totalframes=None): super(FlacEncoder, self).setup( channels, samplerate, blocksize, totalframes) @@ -53,7 +53,7 @@ class FlacEncoder(GstEncoder): @staticmethod @interfacedoc def id(): - return "gst_flac_enc" + return "flac_encoder" @staticmethod @interfacedoc diff --git a/timeside/encoder/m4a.py b/timeside/encoder/m4a.py index 66c1ac9..6e6af9a 100644 --- a/timeside/encoder/m4a.py +++ b/timeside/encoder/m4a.py @@ -62,7 +62,7 @@ class AacEncoder(GstEncoder): @staticmethod @interfacedoc def id(): - return "gst_aac_enc" + return "aac_encoder" @staticmethod @interfacedoc diff --git a/timeside/encoder/mp3.py b/timeside/encoder/mp3.py index 5ff28f6..4897148 100644 --- a/timeside/encoder/mp3.py +++ b/timeside/encoder/mp3.py @@ -64,7 +64,7 @@ class Mp3Encoder(GstEncoder): @staticmethod @interfacedoc def id(): - return "gst_mp3_enc" + return "mp3_encoder" @staticmethod @interfacedoc diff --git a/timeside/encoder/ogg.py b/timeside/encoder/ogg.py index da0aef0..230eb9d 100644 --- a/timeside/encoder/ogg.py +++ b/timeside/encoder/ogg.py @@ -25,12 +25,12 @@ from timeside.api import IEncoder class VorbisEncoder(GstEncoder): - - """ gstreamer-based OGG Vorbis encoder """ + """OGG Vorbis encoder based on Gstreamer""" implements(IEncoder) @interfacedoc - def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): + def setup(self, channels=None, samplerate=None, blocksize=None, + totalframes=None): super(VorbisEncoder, self).setup( channels, samplerate, blocksize, totalframes) self.pipe = ''' appsrc name=src @@ -55,7 +55,7 @@ class VorbisEncoder(GstEncoder): @staticmethod @interfacedoc def id(): - return "gst_vorbis_enc" + return "vorbis_encoder" @staticmethod @interfacedoc diff --git a/timeside/encoder/opus.py b/timeside/encoder/opus.py index 301e0b2..1650f78 100644 --- a/timeside/encoder/opus.py +++ b/timeside/encoder/opus.py @@ -63,7 +63,7 @@ class OpusEncoder(GstEncoder): @staticmethod @interfacedoc def id(): - return "gst_opus_enc" + return "opus_encoder" @staticmethod @interfacedoc diff --git a/timeside/encoder/wav.py b/timeside/encoder/wav.py index 03b77c5..833876f 100644 --- a/timeside/encoder/wav.py +++ b/timeside/encoder/wav.py @@ -55,7 +55,7 @@ class WavEncoder(GstEncoder): @staticmethod @interfacedoc def id(): - return "gst_wav_enc" + return "wav_encoder" @staticmethod @interfacedoc diff --git a/timeside/encoder/webm.py b/timeside/encoder/webm.py index 1952324..5b316cd 100644 --- a/timeside/encoder/webm.py +++ b/timeside/encoder/webm.py @@ -25,8 +25,7 @@ from timeside.api import IEncoder class WebMEncoder(GstEncoder): - - """ gstreamer-based WebM encoder """ + """WebM encoder based on Gstreamer `webmmux` muxer""" implements(IEncoder) def __init__(self, output, streaming=False, overwrite=False, video=False): @@ -66,7 +65,7 @@ class WebMEncoder(GstEncoder): @staticmethod @interfacedoc def id(): - return "gst_webm_enc" + return "webm_encoder" @staticmethod @interfacedoc diff --git a/timeside/grapher/render_analyzers.py b/timeside/grapher/render_analyzers.py index 9adb1e4..19fa7ab 100644 --- a/timeside/grapher/render_analyzers.py +++ b/timeside/grapher/render_analyzers.py @@ -144,11 +144,11 @@ except PIDError: pass # Onset Detection Function -odf = get_processor('odf') +odf = get_processor('onset_detection_function') DisplayOnsetDetectionFunction = DisplayAnalyzer.create( analyzer=odf, - result_id='odf', - grapher_id='grapher_odf', + result_id='onset_detection_function', + grapher_id='grapher_onset_detection_function', grapher_name='Onset detection function') # Waveform diff --git a/timeside/tools/data_samples.py b/timeside/tools/data_samples.py deleted file mode 100644 index e709373..0000000 --- a/timeside/tools/data_samples.py +++ /dev/null @@ -1,441 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -Created on Tue Oct 7 09:19:37 2014 - -@author: thomas -""" - -from __future__ import division - -import pygst -pygst.require("0.10") -import gobject -import gst -import numpy -import scipy.signal.waveforms -import os.path - -#import timeside - - -class NumpySrc: - def __init__(self, array, samplerate): - self.appsrc = gst.element_factory_make("appsrc") - self.pos = 0 - self.samplerate = samplerate - if array.ndim == 1: - array.resize((array.shape[0], 1)) - self.length, self.channels = array.shape - self.array = array.astype("float32") - self.per_sample = gst.SECOND // samplerate - self.fac = self.channels * array.dtype.itemsize - #self.appsrc.set_property("size", (self.length * self.channels * - # array.dtype.itemsize)) - self.appsrc.set_property("format", gst.FORMAT_TIME) - capstr = """audio/x-raw-float, - width=%d, - depth=%d, - rate=%d, - channels=%d, - endianness=(int)1234, - signed=true""" % (self.array.dtype.itemsize*8, - self.array.dtype.itemsize*8, - self.samplerate, - self.channels) - self.appsrc.set_property("caps", gst.caps_from_string(capstr)) - self.appsrc.set_property("stream-type", 0) # Seekable - self.appsrc.set_property('block', True) - - self.appsrc.connect("need-data", self.need_data) - self.appsrc.connect("seek-data", self.seek_data) - self.appsrc.connect("enough-data", self.enough_data) - - def need_data(self, element, length): - #length = length // 64 - if self.pos >= self.array.shape[0]: - element.emit("end-of-stream") - else: - avalaible_sample = self.length - self.pos - if avalaible_sample < length: - length = avalaible_sample - array = self.array[self.pos:self.pos+length] - buf = gst.Buffer(numpy.getbuffer(array.flatten())) - - buf.timestamp = self.pos * self.per_sample - buf.duration = int(length*self.per_sample) - element.emit("push-buffer", buf) - self.pos += length - - def seek_data(self, element, npos): - print 'seek %d' % npos - self.pos = npos // self.per_sample - return True - - def enough_data(self, element): - print "----------- enough data ---------------" - - -class SampleArray(object): - """Base Class for generating a data sample array""" - - def __init__(self, duration=10, samplerate=44100): - self.samplerate = int(samplerate) - self.num_samples = int(numpy.ceil(duration * self.samplerate)) - self.array = NotImplemented - - @property - def time_samples(self): - return numpy.arange(0, self.num_samples) - - @property - def duration(self): - return self.num_samples / self.samplerate - - def __add__(self, other): - if not self.samplerate == other.samplerate: - raise ValueError("Samplerates mismatch") - - sum_ = SampleArray(samplerate=self.samplerate) - sum_.num_samples = self.num_samples + other.num_samples - sum_.array = numpy.vstack([self.array, other.array]) - return sum_ - - def __and__(self, other): - if not self.samplerate == other.samplerate: - raise ValueError("Samplerates mismatch") - if not self.num_samples == other.num_samples: - raise ValueError("Number of samples mismatch") - - and_ = SampleArray(samplerate=self.samplerate) - and_.num_samples = self.num_samples - and_.array = numpy.hstack([self.array, other.array]) - return and_ - - -class SineArray(SampleArray): - """Class for generating a Sine array""" - def __init__(self, frequency=440, duration=10, samplerate=44100, - channels=1): - super(SineArray, self).__init__(duration=duration, - samplerate=samplerate) - self.frequency = frequency - self.array = numpy.sin((2 * numpy.pi * self.frequency * - self.time_samples / self.samplerate)) - self.array.resize(self.num_samples, 1) - - -class SweepArray(SampleArray): - """Class for generating a Sweep array""" - def __init__(self, f0=20, f1=None, duration=10, samplerate=44100, - method='logarithmic'): - super(SweepArray, self).__init__(duration=duration, - samplerate=samplerate) - - self.f0 = f0 / samplerate - if f1 is None: - self.f1 = 0.5 - else: - self.f1 = f1 / samplerate - self.method = method - self.array = scipy.signal.waveforms.chirp(t=self.time_samples, - f0=self.f0, - t1=self.time_samples[-1], - f1=self.f1, - method=self.method) - self.array.resize(self.num_samples, 1) - - -class WhiteNoiseArray(SampleArray): - """Class for generating a white noise array""" - def __init__(self, duration=10, samplerate=44100): - super(WhiteNoiseArray, self).__init__(duration=duration, - samplerate=samplerate) - array = numpy.random.randn(self.num_samples, 1) - self.array = array / abs(array).max() - - -class SilenceArray(SampleArray): - """Class for generating a silence""" - def __init__(self, duration=10, samplerate=44100): - super(SilenceArray, self).__init__(duration=duration, - samplerate=samplerate) - - self.array = numpy.zeros((self.num_samples, 1)) - - -class gst_BuildSample(object): - def __init__(self, sample_array, output_file, gst_audio_encoder): - if not isinstance(sample_array, SampleArray): - raise ValueError("array must be a SampleArray subclass") - self.sample_array = sample_array - self.samplerate = self.sample_array.samplerate - self.output_file = output_file - if not isinstance(gst_audio_encoder, list): - gst_audio_encoder = [gst_audio_encoder] - self.gst_audio_encoder = gst_audio_encoder - - def run(self): - pipeline = gst.Pipeline("pipeline") - gobject.threads_init() - mainloop = gobject.MainLoop() - - numpy_src = NumpySrc(array=self.sample_array.array, - samplerate=self.samplerate) - - converter = gst.element_factory_make('audioconvert', 'converter') - - encoder_muxer = [] - for enc in self.gst_audio_encoder: - encoder_muxer.append(gst.element_factory_make(enc)) - - filesink = gst.element_factory_make('filesink', 'sink') - filesink.set_property('location', self.output_file) - - pipe_elements = [numpy_src.appsrc, converter] - pipe_elements.extend(encoder_muxer) - pipe_elements.append(filesink) - - pipeline.add(*pipe_elements) - gst.element_link_many(*pipe_elements) - - def _on_new_pad(self, source, pad, target_pad): - print 'on_new_pad' - if not pad.is_linked(): - if target_pad.is_linked(): - target_pad.get_peer().unlink(target_pad) - pad.link(target_pad) - - def on_eos(bus, msg): - pipeline.set_state(gst.STATE_NULL) - mainloop.quit() - - def on_error(bus, msg): - err, debug_info = msg.parse_error() - print ("Error received from element %s: %s" % (msg.src.get_name(), - err)) - print ("Debugging information: %s" % debug_info) - mainloop.quit() - - pipeline.set_state(gst.STATE_PLAYING) - bus = pipeline.get_bus() - bus.add_signal_watch() - bus.connect('message::eos', on_eos) - bus.connect("message::error", on_error) - - mainloop.run() - - -def generate_sample_file(filename, samples_dir, gst_audio_encoder, - sample_array, overwrite=False): - sample_file = os.path.join(samples_dir, filename) - - if overwrite or not os.path.exists(sample_file): - gst_builder = gst_BuildSample(sample_array=sample_array, - output_file=sample_file, - gst_audio_encoder=gst_audio_encoder) - gst_builder.run() - return sample_file - - -def generateSamples(overwrite=False): - from timeside import __file__ as ts_file - ts_path = os.path.split(os.path.abspath(ts_file))[0] - tests_dir = os.path.abspath(os.path.join(ts_path, '../tests')) - if os.path.isdir(tests_dir): - samples_dir = os.path.abspath(os.path.join(tests_dir, 'samples')) - if not os.path.isdir(samples_dir): - os.makedirs(samples_dir) - else: - import tempfile - samples_dir = tempfile.mkdtemp(suffix="ts_samples") - - samples = dict() - - # --------- Sweeps --------- - # sweep 44100 mono wav - filename = 'sweep_mono.wav' - samplerate = 44100 - gst_audio_encoder = 'wavenc' - sweep_mono = SweepArray(duration=8, samplerate=samplerate) - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=sweep_mono, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # sweep 44100 stereo wav - sweep_stereo = sweep_mono & sweep_mono - filename = 'sweep.wav' - gst_audio_encoder = 'wavenc' - sweep_mono = SweepArray(duration=8, samplerate=samplerate) - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=sweep_stereo, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # sweep 44100 stereo mp3 - filename = 'sweep.mp3' - gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux'] - sweep_mono = SweepArray(duration=8, samplerate=samplerate) - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=sweep_stereo, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # sweep 44100 stereo flac - filename = 'sweep.flac' - gst_audio_encoder = 'flacenc' - sweep_mono = SweepArray(duration=8, samplerate=samplerate) - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=sweep_stereo, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # sweep 44100 stereo ogg - filename = 'sweep.ogg' - gst_audio_encoder = ['vorbisenc', 'oggmux'] - sweep_mono = SweepArray(duration=8, samplerate=samplerate) - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=sweep_stereo, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # sweep 32000 stereo wav - samplerate = 32000 - sweep_mono = SweepArray(duration=8, samplerate=samplerate) - sweep_stereo = sweep_mono & sweep_mono - - filename = 'sweep_32000.wav' - gst_audio_encoder = 'wavenc' - sweep_mono = SweepArray(duration=8, samplerate=samplerate) - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=sweep_stereo, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # --------- Sines --------- - # sine at 440Hz, 44100 mono wav - filename = 'sine440Hz_mono.wav' - samplerate = 44100 - gst_audio_encoder = 'wavenc' - sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=sweep_mono, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # sine at 440Hz, 44100 stereo wav - filename = 'sine440Hz.wav' - sweep_stereo = sweep_mono & sweep_mono - gst_audio_encoder = 'wavenc' - sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=sweep_stereo, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # sine at 440Hz, 44100 stereo mp3 - filename = 'sine440Hz.mp3' - gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux'] - sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=sweep_stereo, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # sine at 440Hz, 44100 stereo ogg - filename = 'sine440Hz.ogg' - gst_audio_encoder = ['vorbisenc', 'oggmux'] - sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=sweep_stereo, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # --------- Equal tempered scale --------- - filename = 'C4_scale.wav' - samplerate = 44100 - f_C4 = 261.63 - f_D4 = 293.66 - f_E4 = 329.63 - f_F4 = 349.23 - f_G4 = 392.00 - f_A4 = 440.00 - f_B4 = 493.88 - f_C5 = 523.25 - sineC4 = SineArray(duration=1, samplerate=samplerate, frequency=f_C4) - sineD4 = SineArray(duration=1, samplerate=samplerate, frequency=f_D4) - sineE4 = SineArray(duration=1, samplerate=samplerate, frequency=f_E4) - sineF4 = SineArray(duration=1, samplerate=samplerate, frequency=f_F4) - sineG4 = SineArray(duration=1, samplerate=samplerate, frequency=f_G4) - sineA4 = SineArray(duration=1, samplerate=samplerate, frequency=f_A4) - sineB4 = SineArray(duration=1, samplerate=samplerate, frequency=f_B4) - sineC5 = SineArray(duration=1, samplerate=samplerate, frequency=f_C5) - - silence = SilenceArray(duration=0.2, samplerate=samplerate) - - scale = (sineC4 + silence + sineD4 + silence + sineE4 + silence + - sineF4 + silence + sineG4 + silence + sineA4 + silence + - sineB4 + silence + sineC5) - - gst_audio_encoder = 'wavenc' - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=scale, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # --------- White noise --------- - # white noise - 44100Hz mono - filename = 'white_noise_mono.wav' - samplerate = 44100 - noise = WhiteNoiseArray(duration=8, samplerate=samplerate) - gst_audio_encoder = 'wavenc' - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=noise, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # white noise - 44100Hz stereo - filename = 'white_noise.wav' - samplerate = 44100 - noise = WhiteNoiseArray(duration=8, samplerate=samplerate) - noise_stereo = noise & noise - gst_audio_encoder = 'wavenc' - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=noise_stereo, - overwrite=overwrite) - samples.update({filename: sample_file}) - - # white noise - 32000Hz stereo - filename = 'white_noise_32000.wav' - samplerate = 32000 - noise = WhiteNoiseArray(duration=8, samplerate=samplerate) - noise_stereo = noise & noise - gst_audio_encoder = 'wavenc' - sample_file = generate_sample_file(filename, samples_dir, - gst_audio_encoder, - sample_array=noise_stereo, - overwrite=overwrite) - samples.update({filename: sample_file}) - - return samples - - -samples = generateSamples() - - -if __name__ == '__main__': - generateSamples() diff --git a/timeside/tools/test_samples.py b/timeside/tools/test_samples.py new file mode 100644 index 0000000..ca202b1 --- /dev/null +++ b/timeside/tools/test_samples.py @@ -0,0 +1,452 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Created on Tue Oct 7 09:19:37 2014 + +@author: thomas +""" + +from __future__ import division + +import pygst +pygst.require("0.10") +import gobject +import gst +import numpy +import scipy.signal.waveforms +import os.path + +#import timeside + + +class NumpySrc: + def __init__(self, array, samplerate): + self.appsrc = gst.element_factory_make("appsrc") + self.pos = 0 + self.samplerate = samplerate + if array.ndim == 1: + array.resize((array.shape[0], 1)) + self.length, self.channels = array.shape + self.array = array.astype("float32") + self.per_sample = gst.SECOND // samplerate + self.fac = self.channels * array.dtype.itemsize + #self.appsrc.set_property("size", (self.length * self.channels * + # array.dtype.itemsize)) + self.appsrc.set_property("format", gst.FORMAT_TIME) + capstr = """audio/x-raw-float, + width=%d, + depth=%d, + rate=%d, + channels=%d, + endianness=(int)1234, + signed=true""" % (self.array.dtype.itemsize*8, + self.array.dtype.itemsize*8, + self.samplerate, + self.channels) + self.appsrc.set_property("caps", gst.caps_from_string(capstr)) + self.appsrc.set_property("stream-type", 0) # Seekable + self.appsrc.set_property('block', True) + + self.appsrc.connect("need-data", self.need_data) + self.appsrc.connect("seek-data", self.seek_data) + self.appsrc.connect("enough-data", self.enough_data) + + def need_data(self, element, length): + #length = length // 64 + if self.pos >= self.array.shape[0]: + element.emit("end-of-stream") + else: + avalaible_sample = self.length - self.pos + if avalaible_sample < length: + length = avalaible_sample + array = self.array[self.pos:self.pos+length] + buf = gst.Buffer(numpy.getbuffer(array.flatten())) + + buf.timestamp = self.pos * self.per_sample + buf.duration = int(length*self.per_sample) + element.emit("push-buffer", buf) + self.pos += length + + def seek_data(self, element, npos): + print 'seek %d' % npos + self.pos = npos // self.per_sample + return True + + def enough_data(self, element): + print "----------- enough data ---------------" + + +class SampleArray(object): + """Base Class for generating a data sample array""" + + def __init__(self, duration=10, samplerate=44100): + self.samplerate = int(samplerate) + self.num_samples = int(numpy.ceil(duration * self.samplerate)) + self.array = NotImplemented + + @property + def time_samples(self): + return numpy.arange(0, self.num_samples) + + @property + def duration(self): + return self.num_samples / self.samplerate + + def __add__(self, other): + if not self.samplerate == other.samplerate: + raise ValueError("Samplerates mismatch") + + sum_ = SampleArray(samplerate=self.samplerate) + sum_.num_samples = self.num_samples + other.num_samples + sum_.array = numpy.vstack([self.array, other.array]) + return sum_ + + def __and__(self, other): + if not self.samplerate == other.samplerate: + raise ValueError("Samplerates mismatch") + if not self.num_samples == other.num_samples: + raise ValueError("Number of samples mismatch") + + and_ = SampleArray(samplerate=self.samplerate) + and_.num_samples = self.num_samples + and_.array = numpy.hstack([self.array, other.array]) + return and_ + + +class SineArray(SampleArray): + """Class for generating a Sine array""" + def __init__(self, frequency=440, duration=10, samplerate=44100, + channels=1): + super(SineArray, self).__init__(duration=duration, + samplerate=samplerate) + self.frequency = frequency + self.array = numpy.sin((2 * numpy.pi * self.frequency * + self.time_samples / self.samplerate)) + self.array.resize(self.num_samples, 1) + + +class SweepArray(SampleArray): + """Class for generating a Sweep array""" + def __init__(self, f0=20, f1=None, duration=10, samplerate=44100, + method='logarithmic'): + super(SweepArray, self).__init__(duration=duration, + samplerate=samplerate) + + self.f0 = f0 / samplerate + if f1 is None: + self.f1 = 0.5 + else: + self.f1 = f1 / samplerate + self.method = method + self.array = scipy.signal.waveforms.chirp(t=self.time_samples, + f0=self.f0, + t1=self.time_samples[-1], + f1=self.f1, + method=self.method) + self.array.resize(self.num_samples, 1) + + +class WhiteNoiseArray(SampleArray): + """Class for generating a white noise array""" + def __init__(self, duration=10, samplerate=44100): + super(WhiteNoiseArray, self).__init__(duration=duration, + samplerate=samplerate) + array = numpy.random.randn(self.num_samples, 1) + self.array = array / abs(array).max() + + +class SilenceArray(SampleArray): + """Class for generating a silence""" + def __init__(self, duration=10, samplerate=44100): + super(SilenceArray, self).__init__(duration=duration, + samplerate=samplerate) + + self.array = numpy.zeros((self.num_samples, 1)) + + +class gst_BuildSample(object): + def __init__(self, sample_array, output_file, gst_audio_encoder): + if not isinstance(sample_array, SampleArray): + raise ValueError("array must be a SampleArray subclass") + self.sample_array = sample_array + self.samplerate = self.sample_array.samplerate + self.output_file = output_file + if not isinstance(gst_audio_encoder, list): + gst_audio_encoder = [gst_audio_encoder] + self.gst_audio_encoder = gst_audio_encoder + + def run(self): + pipeline = gst.Pipeline("pipeline") + gobject.threads_init() + mainloop = gobject.MainLoop() + + numpy_src = NumpySrc(array=self.sample_array.array, + samplerate=self.samplerate) + + converter = gst.element_factory_make('audioconvert', 'converter') + + encoder_muxer = [] + for enc in self.gst_audio_encoder: + encoder_muxer.append(gst.element_factory_make(enc)) + + filesink = gst.element_factory_make('filesink', 'sink') + filesink.set_property('location', self.output_file) + + pipe_elements = [numpy_src.appsrc, converter] + pipe_elements.extend(encoder_muxer) + pipe_elements.append(filesink) + + pipeline.add(*pipe_elements) + gst.element_link_many(*pipe_elements) + + def _on_new_pad(self, source, pad, target_pad): + print 'on_new_pad' + if not pad.is_linked(): + if target_pad.is_linked(): + target_pad.get_peer().unlink(target_pad) + pad.link(target_pad) + + def on_eos(bus, msg): + pipeline.set_state(gst.STATE_NULL) + mainloop.quit() + + def on_error(bus, msg): + err, debug_info = msg.parse_error() + print ("Error received from element %s: %s" % (msg.src.get_name(), + err)) + print ("Debugging information: %s" % debug_info) + mainloop.quit() + + pipeline.set_state(gst.STATE_PLAYING) + bus = pipeline.get_bus() + bus.add_signal_watch() + bus.connect('message::eos', on_eos) + bus.connect("message::error", on_error) + + mainloop.run() + + +def generate_sample_file(filename, samples_dir, gst_audio_encoder, + sample_array, overwrite=False): + sample_file = os.path.join(samples_dir, filename) + + if overwrite or not os.path.exists(sample_file): + gst_builder = gst_BuildSample(sample_array=sample_array, + output_file=sample_file, + gst_audio_encoder=gst_audio_encoder) + gst_builder.run() + return sample_file + + +def generateSamples(overwrite=False): + from timeside import __file__ as ts_file + ts_path = os.path.split(os.path.abspath(ts_file))[0] + tests_dir = os.path.abspath(os.path.join(ts_path, '../tests')) + if os.path.isdir(tests_dir): + samples_dir = os.path.abspath(os.path.join(tests_dir, 'samples')) + if not os.path.isdir(samples_dir): + os.makedirs(samples_dir) + else: + import tempfile + samples_dir = tempfile.mkdtemp(suffix="ts_samples") + + samples = dict() + + # --------- Sweeps --------- + # sweep 44100 mono wav + filename = 'sweep_mono.wav' + samplerate = 44100 + gst_audio_encoder = 'wavenc' + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_mono, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sweep 44100 stereo wav + sweep_stereo = sweep_mono & sweep_mono + filename = 'sweep.wav' + gst_audio_encoder = 'wavenc' + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sweep 44100 stereo mp3 + filename = 'sweep.mp3' + gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux'] + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sweep 44100 stereo flac + filename = 'sweep.flac' + gst_audio_encoder = 'flacenc' + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sweep 44100 stereo ogg + filename = 'sweep.ogg' + gst_audio_encoder = ['vorbisenc', 'oggmux'] + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sweep 32000 stereo wav + samplerate = 32000 + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sweep_stereo = sweep_mono & sweep_mono + + filename = 'sweep_32000.wav' + gst_audio_encoder = 'wavenc' + sweep_mono = SweepArray(duration=8, samplerate=samplerate) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # --------- Sines --------- + # sine at 440Hz, 44100 mono wav + filename = 'sine440Hz_mono.wav' + samplerate = 44100 + gst_audio_encoder = 'wavenc' + sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_mono, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # Short 1s sine at 440Hz, 44100 mono wav + filename = 'sine440Hz_mono_1s.wav' + samplerate = 44100 + gst_audio_encoder = 'wavenc' + sweep_mono = SineArray(duration=1, samplerate=samplerate, frequency=440) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_mono, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sine at 440Hz, 44100 stereo wav + filename = 'sine440Hz.wav' + sweep_stereo = sweep_mono & sweep_mono + gst_audio_encoder = 'wavenc' + sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sine at 440Hz, 44100 stereo mp3 + filename = 'sine440Hz.mp3' + gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux'] + sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # sine at 440Hz, 44100 stereo ogg + filename = 'sine440Hz.ogg' + gst_audio_encoder = ['vorbisenc', 'oggmux'] + sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440) + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=sweep_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # --------- Equal tempered scale --------- + filename = 'C4_scale.wav' + samplerate = 44100 + f_C4 = 261.63 + f_D4 = 293.66 + f_E4 = 329.63 + f_F4 = 349.23 + f_G4 = 392.00 + f_A4 = 440.00 + f_B4 = 493.88 + f_C5 = 523.25 + sineC4 = SineArray(duration=1, samplerate=samplerate, frequency=f_C4) + sineD4 = SineArray(duration=1, samplerate=samplerate, frequency=f_D4) + sineE4 = SineArray(duration=1, samplerate=samplerate, frequency=f_E4) + sineF4 = SineArray(duration=1, samplerate=samplerate, frequency=f_F4) + sineG4 = SineArray(duration=1, samplerate=samplerate, frequency=f_G4) + sineA4 = SineArray(duration=1, samplerate=samplerate, frequency=f_A4) + sineB4 = SineArray(duration=1, samplerate=samplerate, frequency=f_B4) + sineC5 = SineArray(duration=1, samplerate=samplerate, frequency=f_C5) + + silence = SilenceArray(duration=0.2, samplerate=samplerate) + + scale = (sineC4 + silence + sineD4 + silence + sineE4 + silence + + sineF4 + silence + sineG4 + silence + sineA4 + silence + + sineB4 + silence + sineC5) + + gst_audio_encoder = 'wavenc' + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=scale, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # --------- White noise --------- + # white noise - 44100Hz mono + filename = 'white_noise_mono.wav' + samplerate = 44100 + noise = WhiteNoiseArray(duration=8, samplerate=samplerate) + gst_audio_encoder = 'wavenc' + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=noise, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # white noise - 44100Hz stereo + filename = 'white_noise.wav' + samplerate = 44100 + noise = WhiteNoiseArray(duration=8, samplerate=samplerate) + noise_stereo = noise & noise + gst_audio_encoder = 'wavenc' + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=noise_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + # white noise - 32000Hz stereo + filename = 'white_noise_32000.wav' + samplerate = 32000 + noise = WhiteNoiseArray(duration=8, samplerate=samplerate) + noise_stereo = noise & noise + gst_audio_encoder = 'wavenc' + sample_file = generate_sample_file(filename, samples_dir, + gst_audio_encoder, + sample_array=noise_stereo, + overwrite=overwrite) + samples.update({filename: sample_file}) + + return samples + + +samples = generateSamples() + + +if __name__ == '__main__': + generateSamples()