IDecoder
---------
- * FileDecoder [gst_dec]
- * ArrayDecoder [array_dec]
- * LiveDecoder [gst_live_dec]
+ * FileDecoder [file_decoder]
+ * ArrayDecoder [array_decoder]
+ * LiveDecoder [live_decoder]
IAnalyzer
---------
* VampSimpleHost [vamp_simple_host]
* IRITSpeechEntropy [irit_speech_entropy]
* IRITSpeech4Hz [irit_speech_4hz]
- * OnsetDetectionFunction [odf]
+ * OnsetDetectionFunction [onset_detection_function]
* LimsiSad [limsi_sad]
IValueAnalyzer
* SpectrogramLog [spectrogram_log]
* SpectrogramLinear [spectrogram_lin]
* Display.aubio_pitch.pitch [grapher_aubio_pitch]
- * Display.odf [grapher_odf]
+ * Display.onset_detection_function [grapher_odf]
* Display.waveform_analyzer [grapher_waveform]
* Display.irit_speech_4hz.segments [grapher_irit_speech_4hz_segments]
IEncoder
---------
- * VorbisEncoder [gst_vorbis_enc]
- * WavEncoder [gst_wav_enc]
- * Mp3Encoder [gst_mp3_enc]
- * FlacEncoder [gst_flac_enc]
- * AacEncoder [gst_aac_enc]
- * WebMEncoder [gst_webm_enc]
- * OpusEncoder [gst_opus_enc]
- * AudioSink [gst_audio_sink_enc]
+ * VorbisEncoder [vorbis_encoder]
+ * WavEncoder [wav_encoder]
+ * Mp3Encoder [mp3_encoder]
+ * FlacEncoder [flac_encoder]
+ * AacEncoder [aac_encoder]
+ * WebMEncoder [webm_encoder]
+ * OpusEncoder [opus_encoder]
+ * AudioSink [live_encoder]
News
=====
Define some processors::
from timeside.core import get_processor
- decoder = get_processor('gst_dec')('sweep.wav')
+ decoder = get_processor('file_decoder')('sweep.wav')
grapher = get_processor('waveform_simple')
analyzer = get_processor('level')
- encoder = get_processor('gst_vorbis_enc')('sweep.ogg')
+ encoder = get_processor('vorbis_encoder')('sweep.ogg')
Then run the *magic* pipeline::
File Decoder
-===========
+============
.. autoclass:: timeside.decoder.file.FileDecoder
:members:
:show-inheritance:
Live Decoder
-=============
+============
.. autoclass:: timeside.decoder.live.LiveDecoder
:members:
AudioSink encoder
-------------
+-----------------
.. automodule:: timeside.encoder.audiosink
:members:
Define some processors::
from timeside.core import get_processor
- decoder = get_processor('gst_dec')('sweep.wav')
+ decoder = get_processor('file_decoder')('sweep.wav')
grapher = get_processor('waveform_simple')
analyzer = get_processor('level')
- encoder = get_processor('gst_vorbis_enc')('sweep.ogg')
+ encoder = get_processor('vorbis_encoder')('sweep.ogg')
Then run the *magic* pipeline::
IDecoder
---------
- * FileDecoder [gst_dec]
- * ArrayDecoder [array_dec]
- * LiveDecoder [gst_live_dec]
+ * FileDecoder [file_decoder]
+ * ArrayDecoder [array_decoder]
+ * LiveDecoder [live_decoder]
IAnalyzer
---------
* VampSimpleHost [vamp_simple_host]
* IRITSpeechEntropy [irit_speech_entropy]
* IRITSpeech4Hz [irit_speech_4hz]
- * OnsetDetectionFunction [odf]
+ * OnsetDetectionFunction [onset_detection_function]
* LimsiSad [limsi_sad]
IValueAnalyzer
* SpectrogramLog [spectrogram_log]
* SpectrogramLinear [spectrogram_lin]
* Display.aubio_pitch.pitch [grapher_aubio_pitch]
- * Display.odf [grapher_odf]
+ * Display.onset_detection_function [grapher_odf]
* Display.waveform_analyzer [grapher_waveform]
* Display.irit_speech_4hz.segments [grapher_irit_speech_4hz_segments]
IEncoder
---------
- * VorbisEncoder [gst_vorbis_enc]
- * WavEncoder [gst_wav_enc]
- * Mp3Encoder [gst_mp3_enc]
- * FlacEncoder [gst_flac_enc]
- * AacEncoder [gst_aac_enc]
- * WebMEncoder [gst_webm_enc]
- * OpusEncoder [gst_opus_enc]
- * AudioSink [gst_audio_sink_enc]
+ * VorbisEncoder [vorbis_encoder]
+ * WavEncoder [wav_encoder]
+ * Mp3Encoder [mp3_encoder]
+ * FlacEncoder [flac_encoder]
+ * AacEncoder [aac_encoder]
+ * WebMEncoder [webm_encoder]
+ * OpusEncoder [opus_encoder]
+ * AudioSink [live_encoder]
>>> import timeside
>>> from timeside.core import get_processor
+>>> from timeside.tools.test_samples import samples
>>> import numpy as np
->>> audio_file = 'http://github.com/yomguy/timeside-samples/raw/master/samples/sweep.mp3'
+>>> audio_file = samples['sweep.mp3']
Then let's setup a :class:`FileDecoder <timeside.decoder.file.FileDecoder>` with argument `stack=True` (default argument is `stack=False`) :
>>> pitch = get_processor('aubio_pitch')()
>>> pipe = (decoder | pitch)
>>> print pipe.processors #doctest: +ELLIPSIS
-[gst_dec-{}, aubio_pitch-{"blocksize_s": 0.0, "stepsize_s": 0.0}]
+[file_decoder-{}, aubio_pitch-{"blocksize_s": 0.0, "stepsize_s": 0.0}]
Run the pipe:
.. doctest:: test_1
>>> import timeside
- >>> from timeside.core import get_processor
- >>> decoder = get_processor('gst_dec')('sweep.wav')# doctest: +SKIP
- >>> encoder = get_processor('gst_vorbis_enc')('sweep.ogg')
+ >>> from timeside.core import get_processor
+ >>> decoder = get_processor('file_decoder')('sweep.wav')# doctest: +SKIP
+ >>> encoder = get_processor('vorbis_encoder')('sweep.ogg')
>>> pipe = decoder | encoder
>>> pipe.run()
>>> from timeside.core import get_processor
>>> decoder = get_processor('gst-dec')('sweep.wav') # doctest: +SKIP
>>> levels = get_processor('level')()
- >>> encoders = get_processor('gst_mp3_enc')('sweep.mp3') | get_processor('gst_flac_enc')('sweep.flac')
+ >>> encoders = get_processor('mp3_encoder')('sweep.mp3') | get_processor('flac_encoder')('sweep.flac')
>>> (decoder | levels | encoders).run()
>>> print levels.results
- {'...': AnalyzerResult(id_metadata=IdMetadata(id='level.max', name='Level Max', unit='dBFS', description='', date='...', version='...', author='TimeSide', proc_uuid='...', res_uuid='...'), data_object=GlobalValueObject(value=array([ 0.]), y_value=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='.../sweep.wav', start=0.0, duration=8.0, is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={}), '...': AnalyzerResult(id_metadata=IdMetadata(id='level.rms', name='Level RMS', unit='dBFS', description='', date='...', version='...', author='TimeSide', proc_uuid='...', res_uuid='...'), data_object=GlobalValueObject(value=array([-2.995]), y_value=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='.../sweep.wav', start=0.0, duration=8.0, is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={})}
+ {'...': AnalyzerResult(id_metadata=IdMetadata(id='level.max', name='Level Analyzer Max', unit='dBFS', description='', date='...', version='...', author='TimeSide', proc_uuid='...', res_uuid='...'), data_object=GlobalValueObject(value=array([ 0.]), y_value=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='.../sweep.wav', start=0.0, duration=8.0, is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={}), '...': AnalyzerResult(id_metadata=IdMetadata(id='level.rms', name='Level Analyzer RMS', unit='dBFS', description='', date='...', version='...', author='TimeSide', proc_uuid='...', res_uuid='...'), data_object=GlobalValueObject(value=array([-2.995]), y_value=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='.../sweep.wav', start=0.0, duration=8.0, is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={})}
from unit_timeside import unittest, TestRunner
from timeside.decoder.file import FileDecoder
from timeside.analyzer.dc import MeanDCShift
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
class TestAnalyzerDC(unittest.TestCase):
def testOnSweep(self):
"runs on sweep"
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
self.expected = {'mean_dc_shift': 0.004}
def testOnScale(self):
"runs on C4 Scale"
- self.source = ts_samples["C4_scale.wav"]
+ self.source = samples["C4_scale.wav"]
self.expected = {'mean_dc_shift': 0.034}
def tearDown(self):
from unit_timeside import unittest, TestRunner
from timeside.decoder.file import FileDecoder
from timeside.analyzer.level import Level
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
class TestAnalyzerLevel(unittest.TestCase):
def testOnSweep(self):
"runs on sweep"
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
max_level_value = 0
rms_level_value = -2.995
def testOnC4_Scale(self):
"runs on C4 scale"
- self.source = ts_samples["C4_scale.wav"]
+ self.source = samples["C4_scale.wav"]
max_level_value = 0
rms_level_value = -3.705
import timeside
from timeside.decoder.file import FileDecoder
import numpy as np
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
class TestAnalyzers_with_default(unittest.TestCase):
"""Test analyzer with default parameters"""
def setUp(self):
- source = ts_samples["C4_scale.wav"]
+ source = samples["C4_scale.wav"]
self.decoder = FileDecoder(source)
samplerate = 16000 # LimsiSad require Fs = 16000 Hz
duration = 10
samples = np.zeros((duration * samplerate, 1))
- decoder_cls = timeside.core.get_processor('array_dec')
+ decoder_cls = timeside.core.get_processor('array_decoder')
self.decoder = decoder_cls(samples, samplerate=samplerate)
def _perform_test(self, analyzer_cls):
samplerate = 16000 # LimsiSad require Fs = 16000 Hz
duration = 10
samples = -1000*np.ones((duration * samplerate, 1))
- decoder_cls = timeside.core.get_processor('array_dec')
+ decoder_cls = timeside.core.get_processor('array_decoder')
self.decoder = decoder_cls(samples, samplerate=samplerate)
from timeside.decoder.file import FileDecoder
from timeside.core import get_processor
from timeside import _WITH_AUBIO
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
@unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
def testOnSweep(self):
"runs on sweep"
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
def testOnGuitar(self):
"runs on C4_scale"
- self.source = ts_samples["C4_scale.wav"]
+ self.source = samples["C4_scale.wav"]
def tearDown(self):
decoder = FileDecoder(self.source)
from timeside.decoder.file import FileDecoder
from timeside.core import get_processor
from timeside import _WITH_AUBIO
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
@unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
def testOnSweep(self):
"runs on sweep"
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
def testOnScale(self):
"runs on C4 scale"
- self.source = ts_samples["C4_scale.wav"]
+ self.source = samples["C4_scale.wav"]
def tearDown(self):
decoder = FileDecoder(self.source)
from timeside.decoder.file import FileDecoder
from timeside.core import get_processor
from timeside import _WITH_AUBIO
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
@unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
def testOnSweep(self):
"runs on sweep"
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
def testOnC4Scale(self):
"runs on C4 scale"
- self.source = ts_samples["C4_scale.wav"]
+ self.source = samples["C4_scale.wav"]
def tearDown(self):
decoder = FileDecoder(self.source)
from timeside.decoder.file import FileDecoder
from timeside.core import get_processor
from timeside import _WITH_AUBIO
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
@unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
def testOnSweep(self):
"runs on sweep"
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
def testOnC4Scale(self):
"runs on C4 scale"
- self.source = ts_samples["C4_scale.wav"]
+ self.source = samples["C4_scale.wav"]
def tearDown(self):
decoder = FileDecoder(self.source)
from timeside.decoder.file import FileDecoder
from timeside.core import get_processor
from timeside import _WITH_AUBIO
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
@unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
def testOnSweep(self):
"runs on sweep"
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
def testOnC4Scale(self):
"runs on C4 scale"
- self.source = ts_samples["C4_scale.wav"]
+ self.source = samples["C4_scale.wav"]
def tearDown(self):
decoder = FileDecoder(self.source)
from unit_timeside import unittest, TestRunner
from timeside.decoder.utils import get_uri, get_media_uri_info, path2uri
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
import os.path
def testWav(self):
"Test wav decoding"
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
def testWavMono(self):
"Test mono wav decoding"
- self.source = ts_samples["sweep_mono.wav"]
+ self.source = samples["sweep_mono.wav"]
self.expected_channels = 1
def testWav32k(self):
"Test 32kHz wav decoding"
- self.source = ts_samples["sweep_32000.wav"]
+ self.source = samples["sweep_32000.wav"]
self.expected_samplerate = 32000
def testFlac(self):
"Test flac decoding"
- self.source = ts_samples["sweep.flac"]
+ self.source = samples["sweep.flac"]
self.expected_depth = 24
def testOgg(self):
"Test ogg decoding"
- self.source = ts_samples["sweep.ogg"]
+ self.source = samples["sweep.ogg"]
self.test_exact_duration = False
self.expected_depth = 0 # ?
def testMp3(self):
"Test mp3 decoding"
- self.source = ts_samples["sweep.mp3"]
+ self.source = samples["sweep.mp3"]
self.expected_depth = 32
self.test_exact_duration = False
from timeside.core import ProcessPipe
from unit_timeside import unittest, TestRunner
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
import os.path
def testWav(self):
"Test wav decoding"
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
def testWavMono(self):
"Test mono wav decoding"
- self.source = ts_samples["sweep_mono.wav"]
+ self.source = samples["sweep_mono.wav"]
self.expected_channels = 1
def testWav32k(self):
"Test 32kHz wav decoding"
- self.source = ts_samples["sweep_32000.wav"]
+ self.source = samples["sweep_32000.wav"]
self.expected_samplerate = 32000
def testFlac(self):
"Test flac decoding"
- self.source = ts_samples["sweep.flac"]
+ self.source = samples["sweep.flac"]
self.expected_mime_type = 'audio/x-flac'
def testOgg(self):
"Test ogg decoding"
- self.source = ts_samples["sweep.ogg"]
+ self.source = samples["sweep.ogg"]
self.expected_mime_type = 'application/ogg'
self.test_exact_duration = False
def testMp3(self):
"Test mp3 decoding"
- self.source = ts_samples["sweep.mp3"]
+ self.source = samples["sweep.mp3"]
self.expected_mime_type = 'audio/mpeg'
self.test_exact_duration = False
t = np.arange(0, duration, 1/samplerate)
samples = chirp(t=t, f0=20, t1=t[-1], f1=samplerate/2,
method='logarithmic')
- decoder_cls = timeside.core.get_processor('array_dec')
+ decoder_cls = timeside.core.get_processor('array_decoder')
self.decoder = decoder_cls(samples, samplerate=samplerate)
def _perform_test(self, grapher_cls):
from unit_timeside import unittest, TestRunner
import timeside
from timeside.decoder.file import FileDecoder
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
class TestProcessPipe(unittest.TestCase):
"""Test process pipe (Quick and dirty)"""
# TODO clean up and complete
- source = ts_samples["sweep.wav"]
-
+ source = samples["sweep.wav"]
pipe = timeside.core.ProcessPipe()
dec = FileDecoder(source)
a2 = timeside.analyzer.spectrogram.Spectrogram()
pipe2 = (dec | a | a2 | abis)
+
self.assertEqual(len(pipe2.processors), 4)
+ # Release temporary buffers in Spectrogram
+ for proc in pipe2.processors:
+ proc.release()
#pipe2.draw_graph()
if __name__ == '__main__':
import os
from unit_timeside import unittest, TestRunner
from tools import tmp_file_sink
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
class TestTranscodingFromWav(unittest.TestCase):
"Test transcoding from wav"
def setUp(self):
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
self.test_duration = True
self.test_channels = True
def testWav(self):
"Test conversion to wav"
- self.encoder_id = 'gst_wav_enc'
+ self.encoder_id = 'wav_encoder'
def testMp3(self):
"Test conversion to mp3"
- self.encoder_id = 'gst_mp3_enc'
+ self.encoder_id = 'mp3_encoder'
def testOgg(self):
"Test conversion to ogg"
- self.encoder_id = 'gst_vorbis_enc'
+ self.encoder_id = 'vorbis_encoder'
def testWebM(self):
"Test conversion to webm"
- self.encoder_id = 'gst_webm_enc'
+ self.encoder_id = 'webm_encoder'
self.test_duration = False # webmmux encoder with streamable=true
# does not return a valid duration
def testM4a(self):
"Test conversion to m4a"
- self.encoder_id = 'gst_aac_enc'
+ self.encoder_id = 'aac_encoder'
def tearDown(self):
decoder = FileDecoder(self.source)
def setUp(self):
super(TestTranscodingFromMonoWav, self).setUp()
- self.source = ts_samples["sweep_mono.wav"]
+ self.source = samples["sweep_mono.wav"]
def testM4a(self):
"Test conversion to m4a"
def setUp(self):
super(TestTranscodingFromAnotherWav, self).setUp()
- self.source = ts_samples["C4_scale.wav"] # Mono
+ self.source = samples["C4_scale.wav"] # Mono
class TestTranscodingFromMp3(TestTranscodingFromWav):
def setUp(self):
super(TestTranscodingFromMp3, self).setUp()
- self.source = ts_samples["sweep.mp3"]
+ self.source = samples["sweep.mp3"]
class TestTranscodingFromFlac(TestTranscodingFromWav):
def setUp(self):
super(TestTranscodingFromFlac, self).setUp()
- self.source = ts_samples["sweep.flac"]
+ self.source = samples["sweep.flac"]
class TestTranscodingFromOgg(TestTranscodingFromWav):
def setUp(self):
super(TestTranscodingFromOgg, self).setUp()
- self.source = ts_samples["sweep.ogg"]
+ self.source = samples["sweep.ogg"]
class TestTranscodingFrom32kHzWav(TestTranscodingFromWav):
def setUp(self):
super(TestTranscodingFrom32kHzWav, self).setUp()
- self.source = ts_samples["sweep_32000.wav"]
+ self.source = samples["sweep_32000.wav"]
class TestTranscodingFromMissingFile(TestTranscodingFromWav):
from unit_timeside import unittest, TestRunner
from tools import tmp_file_sink
-from timeside.tools.data_samples import samples as ts_samples
+from timeside.tools.test_samples import samples
class TestTranscodingStreaming(unittest.TestCase):
"Test transcoding and streaming"
def setUp(self):
- self.source = ts_samples["sweep.wav"]
+ self.source = samples["sweep.wav"]
self.test_duration = True
self.test_channels = True
self.filesize_delta = None
def testMp3(self):
"Test conversion to mp3"
- self.encoder_id = 'gst_mp3_enc'
+ self.encoder_id = 'mp3_encoder'
self.filesize_delta = 156
def testOgg(self):
"Test conversion to ogg"
- self.encoder_id = 'gst_vorbis_enc'
+ self.encoder_id = 'vorbis_encoder'
def testOpus(self):
"Test conversion to opus"
- self.encoder_id = 'gst_opus_enc'
+ self.encoder_id = 'opus_encoder'
self.expected_sample_rate = 48000
def testWebM(self):
"Test conversion to webm"
- self.encoder_id = 'gst_webm_enc'
+ self.encoder_id = 'webm_encoder'
self.test_duration = False # webmmux encoder with streamable=true
# does not return a valid duration
@staticmethod
@interfacedoc
def name():
- return "Mean DC shift"
+ return "Mean DC shift Analyzer"
@staticmethod
@interfacedoc
Examples
--------
>>> import timeside
- >>> from timeside.tools.data_samples import samples as ts_samples
+ >>> from timeside.tools.test_samples import samples
>>> from timeside.core import get_processor
- >>> source = ts_samples['C4_scale.wav']
- >>> FileDecoder = get_processor('gst_dec')
+ >>> source = samples['C4_scale.wav']
+ >>> FileDecoder = get_processor('file_decoder')
>>> YaafeAnalyzer = get_processor('yaafe')
>>> # feature extraction defition
>>> feature_plan = ['mfcc: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256',
class Level(Analyzer):
- """RMS level analyzer
+ """Audio level analyzer
+
+ Examples
+ --------
+
+ >>> import timeside
+ >>> from timeside.core import get_processor
+ >>> from timeside.tools.test_samples import samples
+ >>> source = samples['sweep.mp3']
+ >>> decoder = get_processor('file_decoder')(uri=source)
+ >>> level = get_processor('level')()
+ >>> (decoder | level).run()
+ >>> level.results.keys()
+ ['level.max', 'level.rms']
+ >>> max = level.results['level.max']
+ >>> print max.data
+ [ 0.]
+ >>> rms = level.results['level.rms']
+ >>> print rms.data
+ [-3.263]
"""
implements(IValueAnalyzer)
@staticmethod
@interfacedoc
def name():
- return "Level"
+ return "Level Analyzer"
@staticmethod
@interfacedoc
if __name__ == "__main__":
import doctest
- print doctest.testmod(verbose=True)
+ import timeside
+ doctest.testmod(timeside.analyzer.level)
@staticmethod
@interfacedoc
def id():
- return "odf"
+ return "onset_detection_function"
@staticmethod
@interfacedoc
--------
>>> import timeside
>>> from timeside.core import get_processor
- >>> from timeside.tools.data_samples import samples as ts_samples
- >>> audio_source = ts_samples['sweep.wav']
- >>> decoder = get_processor('gst_dec')(uri=audio_source)
+ >>> from timeside.tools.test_samples import samples
+ >>> audio_source = samples['sweep.wav']
+ >>> decoder = get_processor('file_decoder')(uri=audio_source)
>>> spectrogram = get_processor('spectrogram_analyzer')(input_blocksize=2048, input_stepsize=1024)
>>> pipe = (decoder | spectrogram)
>>> pipe.run()
import timeside
from timeside.core import get_processor
- from timeside.tools.data_samples import samples as ts_samples
- audio_source = ts_samples['sweep.wav']
- decoder = get_processor('gst_dec')(uri=audio_source)
+ from timeside.tools.test_samples import samples
+ audio_source = samples['sweep.wav']
+ decoder = get_processor('file_decoder')(uri=audio_source)
spectrogram = get_processor('spectrogram_analyzer')(input_blocksize=2048,
input_stepsize=1024)
pipe = (decoder | spectrogram)
eod = False
- if source.id() == 'gst_live_dec':
+ if source.id() == 'live_decoder':
# Set handler for Interruption signal
import signal
for item in items:
frames, eod = item.process(frames, eod)
- if source.id() == 'gst_live_dec':
+ if source.id() == 'live_decoder':
# Restore default handler for Interruption signal
signal.signal(signal.SIGINT, signal.SIG_DFL)
@staticmethod
@interfacedoc
def id():
- return "array_dec"
+ return "array_decoder"
def __init__(self, samples, samplerate=44100, start=0, duration=None):
'''
#!/usr/bin/python
# -*- coding: utf-8 -*-
-# Copyright (c) 2007-2013 Parisson
+# Copyright (c) 2007-2014 Parisson
# Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com>
# Copyright (c) 2007-2013 Guillaume Pellerin <pellerin@parisson.com>
# Copyright (c) 2010-2013 Paul Brossier <piem@piem.org>
+# Copyright (c) 2013-2014 Thomas fillon <thomas@parisson.com>
+
#
# This file is part of TimeSide.
class FileDecoder(Decoder):
-
- """ gstreamer-based decoder """
+ """ File Decoder based on Gstreamer
+
+ Parameters
+ ----------
+ uri : str
+ uri of the media
+ start : float, optional
+ start time of the segment in seconds
+ duration : float, optional
+ duration of the segment in seconds
+ stack : boolean, optional
+ keep decoded data in the stack
+ sha1 : boolean, optional
+ compute the sha1 hash of the data
+
+
+ Examples
+ --------
+ >>> import timeside
+ >>> from timeside.core import get_processor
+ >>> from timeside.tools.test_samples import samples
+ >>> audio_source = samples['sweep.wav']
+ >>> FileDecoder = get_processor('file_decoder') # Get the decoder class
+ >>> # Use decoder with default parameters
+ >>> decoder = FileDecoder(uri=audio_source)
+ >>> analyzer = get_processor('level')() # Pick a arbitrary analyzer
+ >>> pipe =(decoder | analyzer)
+ >>> pipe.run() # Run the pipe for the given audio source
+"""
implements(IDecoder)
output_blocksize = 8 * 1024
@staticmethod
@interfacedoc
def id():
- return "gst_dec"
+ return "file_decoder"
def __init__(self, uri, start=0, duration=None, stack=False, sha1=None):
- """
- Construct a new FileDecoder
-
- Parameters
- ----------
- uri : str
- uri of the media
- start : float
- start time of the segment in seconds
- duration : float
- duration of the segment in seconds
- stack : boolean
- keep decoded data in the stack
- sha1 : boolean
- compute the sha1 hash of the data
-
- """
super(FileDecoder, self).__init__(start=start, duration=duration)
if __name__ == "__main__":
import doctest
- doctest.testmod()
+ import timeside
+ doctest.testmod(timeside.decoder.file)
from timeside.decoder.core import Decoder, IDecoder, interfacedoc, implements
from timeside.tools.gstutils import MainloopThread, gobject
-from . file import FileDecoder
+from timeside.decoder.file import FileDecoder
import Queue
import threading
class LiveDecoder(FileDecoder):
- """ gstreamer-based decoder from live source"""
+ """Live source Decoder based on Gstreamer
+ capturing audio from alsasrc
+
+ Construct a new LiveDecoder capturing audio from alsasrc
+
+ Parameters
+ ----------
+ num_buffers : int, optional
+ Number of buffers to output before sending End Of Stream signal
+ (-1 = unlimited).
+ (Allowed values: >= -1, Default value: -1)
+
+ input_src : str, optional
+ Gstreamer source element
+ default to 'alsasrc'
+ possible values : 'autoaudiosrc', 'alsasrc', 'osssrc'
+
+ Examples
+ --------
+
+ >>> import timeside
+
+ >>> from timeside.core import get_processor
+ >>> live_decoder = get_processor('live_decoder')(num_buffers=5)
+ >>> waveform = get_processor('waveform_analyzer')()
+ >>> mp3_encoder = timeside.encoder.mp3.Mp3Encoder('/tmp/test_live.mp3',
+ ... overwrite=True)
+ >>> pipe = (live_decoder | waveform | mp3_encoder)
+ >>> pipe.run()
+ >>> # Show the audio as captured by the decoder
+ >>> import matplotlib.pyplot as plt # doctest: +SKIP
+ >>> plt.plot(a.results['waveform_analyzer'].time, # doctest: +SKIP
+ a.results['waveform_analyzer'].data) # doctest: +SKIP
+ >>> plt.show() # doctest: +SKIP
+
+ """
implements(IDecoder)
# IProcessor methods
@staticmethod
@interfacedoc
def id():
- return "gst_live_dec"
-
- def __init__(self, num_buffers=-1, input_src='autoaudiosrc'):
- """
- Construct a new LiveDecoder capturing audio from alsasrc
-
- Parameters
- ----------
- num_buffers :
- Number of buffers to output before sending EOS (-1 = unlimited).
- (Allowed values: >= -1, Default value: -1)
-
+ return "live_decoder"
- Examples
- --------
-
- >>> import timeside
-
- >>> from timeside.core import get_processor
- >>> live = timeside.decoder.live.LiveDecoder(num_buffers=5)
- >>> a = get_processor('waveform_analyzer')()
- >>> e = timeside.encoder.mp3.Mp3Encoder('/tmp/test_live.mp3',
- ... overwrite=True)
- >>> pipe = (live | a | e)
- >>> pipe.run() # doctest: +SKIP
- >>> pipe.run() # doctest: +SKIP
-
- >>> import matplotlib.pyplot as plt # doctest: +SKIP
- >>> plt.plot(a.results['waveform_analyzer'].time, # doctest: +SKIP
- a.results['waveform_analyzer'].data) # doctest: +SKIP
- >>> plt.show() # doctest: +SKIP
-
- """
+ def __init__(self, num_buffers=-1, input_src='alsasrc'):
super(Decoder, self).__init__()
self.num_buffers = num_buffers
pass
# IDecoder methods
+
+if __name__ == "__main__":
+ import doctest
+ import timeside
+ doctest.testmod(timeside.decoder.live, verbose=True)
'''
Return the secure hash digest with sha1 algorithm for a given file
- >>> from timeside.tools.data_samples import samples as ts_samples
- >>> wav_file = ts_samples["C4_scale.wav"]
+ >>> from timeside.tools.test_samples import samples
+ >>> wav_file = samples["C4_scale.wav"]
>>> print sha1sum_file(wav_file)
a598e78d0b5c90da54a77e34c083abdcd38d42ba
'''
This encoder plays the decoded audio stream to the sound card
+ Parameters
+ ----------
+ output_sink : str, optional
+ Gstreamer sink element
+ Default to 'autoaudiosink'
+ Possible values : 'autoaudiosink', 'alsasink', 'osssink'
>>> import timeside
- >>> from timeside.tools.data_samples import samples as ts_samples
- >>> wav_file = ts_samples['C4_scale.wav']
- >>> d = timeside.decoder.file.FileDecoder(wav_file)
- >>> e = timeside.encoder.audiosink.AudioSink()
- >>> (d|e).run() # doctest: +SKIP
+ >>> from timeside.core import get_processor
+ >>> from timeside.tools.test_samples import samples
+ >>> wav_file = samples['sine440Hz_mono_1s.wav']
+ >>> d = get_processor('file_decoder')(wav_file)
+ >>> e = get_processor('live_encoder')()
+ >>> (d|e).run()
"""
implements(IEncoder)
@staticmethod
@interfacedoc
def id():
- return "gst_audio_sink_enc"
+ return "live_encoder"
@staticmethod
@interfacedoc
class FlacEncoder(GstEncoder):
-
- """ gstreamer-based FLAC encoder """
+ """FLAC encoder based on Gstreamer"""
implements(IEncoder)
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
+ def setup(self, channels=None, samplerate=None, blocksize=None,
+ totalframes=None):
super(FlacEncoder, self).setup(
channels, samplerate, blocksize, totalframes)
@staticmethod
@interfacedoc
def id():
- return "gst_flac_enc"
+ return "flac_encoder"
@staticmethod
@interfacedoc
@staticmethod
@interfacedoc
def id():
- return "gst_aac_enc"
+ return "aac_encoder"
@staticmethod
@interfacedoc
@staticmethod
@interfacedoc
def id():
- return "gst_mp3_enc"
+ return "mp3_encoder"
@staticmethod
@interfacedoc
class VorbisEncoder(GstEncoder):
-
- """ gstreamer-based OGG Vorbis encoder """
+ """OGG Vorbis encoder based on Gstreamer"""
implements(IEncoder)
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
+ def setup(self, channels=None, samplerate=None, blocksize=None,
+ totalframes=None):
super(VorbisEncoder, self).setup(
channels, samplerate, blocksize, totalframes)
self.pipe = ''' appsrc name=src
@staticmethod
@interfacedoc
def id():
- return "gst_vorbis_enc"
+ return "vorbis_encoder"
@staticmethod
@interfacedoc
@staticmethod
@interfacedoc
def id():
- return "gst_opus_enc"
+ return "opus_encoder"
@staticmethod
@interfacedoc
@staticmethod
@interfacedoc
def id():
- return "gst_wav_enc"
+ return "wav_encoder"
@staticmethod
@interfacedoc
class WebMEncoder(GstEncoder):
-
- """ gstreamer-based WebM encoder """
+ """WebM encoder based on Gstreamer `webmmux` muxer"""
implements(IEncoder)
def __init__(self, output, streaming=False, overwrite=False, video=False):
@staticmethod
@interfacedoc
def id():
- return "gst_webm_enc"
+ return "webm_encoder"
@staticmethod
@interfacedoc
pass
# Onset Detection Function
-odf = get_processor('odf')
+odf = get_processor('onset_detection_function')
DisplayOnsetDetectionFunction = DisplayAnalyzer.create(
analyzer=odf,
- result_id='odf',
- grapher_id='grapher_odf',
+ result_id='onset_detection_function',
+ grapher_id='grapher_onset_detection_function',
grapher_name='Onset detection function')
# Waveform
+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-Created on Tue Oct 7 09:19:37 2014
-
-@author: thomas
-"""
-
-from __future__ import division
-
-import pygst
-pygst.require("0.10")
-import gobject
-import gst
-import numpy
-import scipy.signal.waveforms
-import os.path
-
-#import timeside
-
-
-class NumpySrc:
- def __init__(self, array, samplerate):
- self.appsrc = gst.element_factory_make("appsrc")
- self.pos = 0
- self.samplerate = samplerate
- if array.ndim == 1:
- array.resize((array.shape[0], 1))
- self.length, self.channels = array.shape
- self.array = array.astype("float32")
- self.per_sample = gst.SECOND // samplerate
- self.fac = self.channels * array.dtype.itemsize
- #self.appsrc.set_property("size", (self.length * self.channels *
- # array.dtype.itemsize))
- self.appsrc.set_property("format", gst.FORMAT_TIME)
- capstr = """audio/x-raw-float,
- width=%d,
- depth=%d,
- rate=%d,
- channels=%d,
- endianness=(int)1234,
- signed=true""" % (self.array.dtype.itemsize*8,
- self.array.dtype.itemsize*8,
- self.samplerate,
- self.channels)
- self.appsrc.set_property("caps", gst.caps_from_string(capstr))
- self.appsrc.set_property("stream-type", 0) # Seekable
- self.appsrc.set_property('block', True)
-
- self.appsrc.connect("need-data", self.need_data)
- self.appsrc.connect("seek-data", self.seek_data)
- self.appsrc.connect("enough-data", self.enough_data)
-
- def need_data(self, element, length):
- #length = length // 64
- if self.pos >= self.array.shape[0]:
- element.emit("end-of-stream")
- else:
- avalaible_sample = self.length - self.pos
- if avalaible_sample < length:
- length = avalaible_sample
- array = self.array[self.pos:self.pos+length]
- buf = gst.Buffer(numpy.getbuffer(array.flatten()))
-
- buf.timestamp = self.pos * self.per_sample
- buf.duration = int(length*self.per_sample)
- element.emit("push-buffer", buf)
- self.pos += length
-
- def seek_data(self, element, npos):
- print 'seek %d' % npos
- self.pos = npos // self.per_sample
- return True
-
- def enough_data(self, element):
- print "----------- enough data ---------------"
-
-
-class SampleArray(object):
- """Base Class for generating a data sample array"""
-
- def __init__(self, duration=10, samplerate=44100):
- self.samplerate = int(samplerate)
- self.num_samples = int(numpy.ceil(duration * self.samplerate))
- self.array = NotImplemented
-
- @property
- def time_samples(self):
- return numpy.arange(0, self.num_samples)
-
- @property
- def duration(self):
- return self.num_samples / self.samplerate
-
- def __add__(self, other):
- if not self.samplerate == other.samplerate:
- raise ValueError("Samplerates mismatch")
-
- sum_ = SampleArray(samplerate=self.samplerate)
- sum_.num_samples = self.num_samples + other.num_samples
- sum_.array = numpy.vstack([self.array, other.array])
- return sum_
-
- def __and__(self, other):
- if not self.samplerate == other.samplerate:
- raise ValueError("Samplerates mismatch")
- if not self.num_samples == other.num_samples:
- raise ValueError("Number of samples mismatch")
-
- and_ = SampleArray(samplerate=self.samplerate)
- and_.num_samples = self.num_samples
- and_.array = numpy.hstack([self.array, other.array])
- return and_
-
-
-class SineArray(SampleArray):
- """Class for generating a Sine array"""
- def __init__(self, frequency=440, duration=10, samplerate=44100,
- channels=1):
- super(SineArray, self).__init__(duration=duration,
- samplerate=samplerate)
- self.frequency = frequency
- self.array = numpy.sin((2 * numpy.pi * self.frequency *
- self.time_samples / self.samplerate))
- self.array.resize(self.num_samples, 1)
-
-
-class SweepArray(SampleArray):
- """Class for generating a Sweep array"""
- def __init__(self, f0=20, f1=None, duration=10, samplerate=44100,
- method='logarithmic'):
- super(SweepArray, self).__init__(duration=duration,
- samplerate=samplerate)
-
- self.f0 = f0 / samplerate
- if f1 is None:
- self.f1 = 0.5
- else:
- self.f1 = f1 / samplerate
- self.method = method
- self.array = scipy.signal.waveforms.chirp(t=self.time_samples,
- f0=self.f0,
- t1=self.time_samples[-1],
- f1=self.f1,
- method=self.method)
- self.array.resize(self.num_samples, 1)
-
-
-class WhiteNoiseArray(SampleArray):
- """Class for generating a white noise array"""
- def __init__(self, duration=10, samplerate=44100):
- super(WhiteNoiseArray, self).__init__(duration=duration,
- samplerate=samplerate)
- array = numpy.random.randn(self.num_samples, 1)
- self.array = array / abs(array).max()
-
-
-class SilenceArray(SampleArray):
- """Class for generating a silence"""
- def __init__(self, duration=10, samplerate=44100):
- super(SilenceArray, self).__init__(duration=duration,
- samplerate=samplerate)
-
- self.array = numpy.zeros((self.num_samples, 1))
-
-
-class gst_BuildSample(object):
- def __init__(self, sample_array, output_file, gst_audio_encoder):
- if not isinstance(sample_array, SampleArray):
- raise ValueError("array must be a SampleArray subclass")
- self.sample_array = sample_array
- self.samplerate = self.sample_array.samplerate
- self.output_file = output_file
- if not isinstance(gst_audio_encoder, list):
- gst_audio_encoder = [gst_audio_encoder]
- self.gst_audio_encoder = gst_audio_encoder
-
- def run(self):
- pipeline = gst.Pipeline("pipeline")
- gobject.threads_init()
- mainloop = gobject.MainLoop()
-
- numpy_src = NumpySrc(array=self.sample_array.array,
- samplerate=self.samplerate)
-
- converter = gst.element_factory_make('audioconvert', 'converter')
-
- encoder_muxer = []
- for enc in self.gst_audio_encoder:
- encoder_muxer.append(gst.element_factory_make(enc))
-
- filesink = gst.element_factory_make('filesink', 'sink')
- filesink.set_property('location', self.output_file)
-
- pipe_elements = [numpy_src.appsrc, converter]
- pipe_elements.extend(encoder_muxer)
- pipe_elements.append(filesink)
-
- pipeline.add(*pipe_elements)
- gst.element_link_many(*pipe_elements)
-
- def _on_new_pad(self, source, pad, target_pad):
- print 'on_new_pad'
- if not pad.is_linked():
- if target_pad.is_linked():
- target_pad.get_peer().unlink(target_pad)
- pad.link(target_pad)
-
- def on_eos(bus, msg):
- pipeline.set_state(gst.STATE_NULL)
- mainloop.quit()
-
- def on_error(bus, msg):
- err, debug_info = msg.parse_error()
- print ("Error received from element %s: %s" % (msg.src.get_name(),
- err))
- print ("Debugging information: %s" % debug_info)
- mainloop.quit()
-
- pipeline.set_state(gst.STATE_PLAYING)
- bus = pipeline.get_bus()
- bus.add_signal_watch()
- bus.connect('message::eos', on_eos)
- bus.connect("message::error", on_error)
-
- mainloop.run()
-
-
-def generate_sample_file(filename, samples_dir, gst_audio_encoder,
- sample_array, overwrite=False):
- sample_file = os.path.join(samples_dir, filename)
-
- if overwrite or not os.path.exists(sample_file):
- gst_builder = gst_BuildSample(sample_array=sample_array,
- output_file=sample_file,
- gst_audio_encoder=gst_audio_encoder)
- gst_builder.run()
- return sample_file
-
-
-def generateSamples(overwrite=False):
- from timeside import __file__ as ts_file
- ts_path = os.path.split(os.path.abspath(ts_file))[0]
- tests_dir = os.path.abspath(os.path.join(ts_path, '../tests'))
- if os.path.isdir(tests_dir):
- samples_dir = os.path.abspath(os.path.join(tests_dir, 'samples'))
- if not os.path.isdir(samples_dir):
- os.makedirs(samples_dir)
- else:
- import tempfile
- samples_dir = tempfile.mkdtemp(suffix="ts_samples")
-
- samples = dict()
-
- # --------- Sweeps ---------
- # sweep 44100 mono wav
- filename = 'sweep_mono.wav'
- samplerate = 44100
- gst_audio_encoder = 'wavenc'
- sweep_mono = SweepArray(duration=8, samplerate=samplerate)
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=sweep_mono,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # sweep 44100 stereo wav
- sweep_stereo = sweep_mono & sweep_mono
- filename = 'sweep.wav'
- gst_audio_encoder = 'wavenc'
- sweep_mono = SweepArray(duration=8, samplerate=samplerate)
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=sweep_stereo,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # sweep 44100 stereo mp3
- filename = 'sweep.mp3'
- gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux']
- sweep_mono = SweepArray(duration=8, samplerate=samplerate)
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=sweep_stereo,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # sweep 44100 stereo flac
- filename = 'sweep.flac'
- gst_audio_encoder = 'flacenc'
- sweep_mono = SweepArray(duration=8, samplerate=samplerate)
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=sweep_stereo,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # sweep 44100 stereo ogg
- filename = 'sweep.ogg'
- gst_audio_encoder = ['vorbisenc', 'oggmux']
- sweep_mono = SweepArray(duration=8, samplerate=samplerate)
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=sweep_stereo,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # sweep 32000 stereo wav
- samplerate = 32000
- sweep_mono = SweepArray(duration=8, samplerate=samplerate)
- sweep_stereo = sweep_mono & sweep_mono
-
- filename = 'sweep_32000.wav'
- gst_audio_encoder = 'wavenc'
- sweep_mono = SweepArray(duration=8, samplerate=samplerate)
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=sweep_stereo,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # --------- Sines ---------
- # sine at 440Hz, 44100 mono wav
- filename = 'sine440Hz_mono.wav'
- samplerate = 44100
- gst_audio_encoder = 'wavenc'
- sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=sweep_mono,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # sine at 440Hz, 44100 stereo wav
- filename = 'sine440Hz.wav'
- sweep_stereo = sweep_mono & sweep_mono
- gst_audio_encoder = 'wavenc'
- sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=sweep_stereo,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # sine at 440Hz, 44100 stereo mp3
- filename = 'sine440Hz.mp3'
- gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux']
- sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=sweep_stereo,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # sine at 440Hz, 44100 stereo ogg
- filename = 'sine440Hz.ogg'
- gst_audio_encoder = ['vorbisenc', 'oggmux']
- sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=sweep_stereo,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # --------- Equal tempered scale ---------
- filename = 'C4_scale.wav'
- samplerate = 44100
- f_C4 = 261.63
- f_D4 = 293.66
- f_E4 = 329.63
- f_F4 = 349.23
- f_G4 = 392.00
- f_A4 = 440.00
- f_B4 = 493.88
- f_C5 = 523.25
- sineC4 = SineArray(duration=1, samplerate=samplerate, frequency=f_C4)
- sineD4 = SineArray(duration=1, samplerate=samplerate, frequency=f_D4)
- sineE4 = SineArray(duration=1, samplerate=samplerate, frequency=f_E4)
- sineF4 = SineArray(duration=1, samplerate=samplerate, frequency=f_F4)
- sineG4 = SineArray(duration=1, samplerate=samplerate, frequency=f_G4)
- sineA4 = SineArray(duration=1, samplerate=samplerate, frequency=f_A4)
- sineB4 = SineArray(duration=1, samplerate=samplerate, frequency=f_B4)
- sineC5 = SineArray(duration=1, samplerate=samplerate, frequency=f_C5)
-
- silence = SilenceArray(duration=0.2, samplerate=samplerate)
-
- scale = (sineC4 + silence + sineD4 + silence + sineE4 + silence +
- sineF4 + silence + sineG4 + silence + sineA4 + silence +
- sineB4 + silence + sineC5)
-
- gst_audio_encoder = 'wavenc'
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=scale,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # --------- White noise ---------
- # white noise - 44100Hz mono
- filename = 'white_noise_mono.wav'
- samplerate = 44100
- noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
- gst_audio_encoder = 'wavenc'
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=noise,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # white noise - 44100Hz stereo
- filename = 'white_noise.wav'
- samplerate = 44100
- noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
- noise_stereo = noise & noise
- gst_audio_encoder = 'wavenc'
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=noise_stereo,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- # white noise - 32000Hz stereo
- filename = 'white_noise_32000.wav'
- samplerate = 32000
- noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
- noise_stereo = noise & noise
- gst_audio_encoder = 'wavenc'
- sample_file = generate_sample_file(filename, samples_dir,
- gst_audio_encoder,
- sample_array=noise_stereo,
- overwrite=overwrite)
- samples.update({filename: sample_file})
-
- return samples
-
-
-samples = generateSamples()
-
-
-if __name__ == '__main__':
- generateSamples()
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Created on Tue Oct 7 09:19:37 2014
+
+@author: thomas
+"""
+
+from __future__ import division
+
+import pygst
+pygst.require("0.10")
+import gobject
+import gst
+import numpy
+import scipy.signal.waveforms
+import os.path
+
+#import timeside
+
+
+class NumpySrc:
+ def __init__(self, array, samplerate):
+ self.appsrc = gst.element_factory_make("appsrc")
+ self.pos = 0
+ self.samplerate = samplerate
+ if array.ndim == 1:
+ array.resize((array.shape[0], 1))
+ self.length, self.channels = array.shape
+ self.array = array.astype("float32")
+ self.per_sample = gst.SECOND // samplerate
+ self.fac = self.channels * array.dtype.itemsize
+ #self.appsrc.set_property("size", (self.length * self.channels *
+ # array.dtype.itemsize))
+ self.appsrc.set_property("format", gst.FORMAT_TIME)
+ capstr = """audio/x-raw-float,
+ width=%d,
+ depth=%d,
+ rate=%d,
+ channels=%d,
+ endianness=(int)1234,
+ signed=true""" % (self.array.dtype.itemsize*8,
+ self.array.dtype.itemsize*8,
+ self.samplerate,
+ self.channels)
+ self.appsrc.set_property("caps", gst.caps_from_string(capstr))
+ self.appsrc.set_property("stream-type", 0) # Seekable
+ self.appsrc.set_property('block', True)
+
+ self.appsrc.connect("need-data", self.need_data)
+ self.appsrc.connect("seek-data", self.seek_data)
+ self.appsrc.connect("enough-data", self.enough_data)
+
+ def need_data(self, element, length):
+ #length = length // 64
+ if self.pos >= self.array.shape[0]:
+ element.emit("end-of-stream")
+ else:
+ avalaible_sample = self.length - self.pos
+ if avalaible_sample < length:
+ length = avalaible_sample
+ array = self.array[self.pos:self.pos+length]
+ buf = gst.Buffer(numpy.getbuffer(array.flatten()))
+
+ buf.timestamp = self.pos * self.per_sample
+ buf.duration = int(length*self.per_sample)
+ element.emit("push-buffer", buf)
+ self.pos += length
+
+ def seek_data(self, element, npos):
+ print 'seek %d' % npos
+ self.pos = npos // self.per_sample
+ return True
+
+ def enough_data(self, element):
+ print "----------- enough data ---------------"
+
+
+class SampleArray(object):
+ """Base Class for generating a data sample array"""
+
+ def __init__(self, duration=10, samplerate=44100):
+ self.samplerate = int(samplerate)
+ self.num_samples = int(numpy.ceil(duration * self.samplerate))
+ self.array = NotImplemented
+
+ @property
+ def time_samples(self):
+ return numpy.arange(0, self.num_samples)
+
+ @property
+ def duration(self):
+ return self.num_samples / self.samplerate
+
+ def __add__(self, other):
+ if not self.samplerate == other.samplerate:
+ raise ValueError("Samplerates mismatch")
+
+ sum_ = SampleArray(samplerate=self.samplerate)
+ sum_.num_samples = self.num_samples + other.num_samples
+ sum_.array = numpy.vstack([self.array, other.array])
+ return sum_
+
+ def __and__(self, other):
+ if not self.samplerate == other.samplerate:
+ raise ValueError("Samplerates mismatch")
+ if not self.num_samples == other.num_samples:
+ raise ValueError("Number of samples mismatch")
+
+ and_ = SampleArray(samplerate=self.samplerate)
+ and_.num_samples = self.num_samples
+ and_.array = numpy.hstack([self.array, other.array])
+ return and_
+
+
+class SineArray(SampleArray):
+ """Class for generating a Sine array"""
+ def __init__(self, frequency=440, duration=10, samplerate=44100,
+ channels=1):
+ super(SineArray, self).__init__(duration=duration,
+ samplerate=samplerate)
+ self.frequency = frequency
+ self.array = numpy.sin((2 * numpy.pi * self.frequency *
+ self.time_samples / self.samplerate))
+ self.array.resize(self.num_samples, 1)
+
+
+class SweepArray(SampleArray):
+ """Class for generating a Sweep array"""
+ def __init__(self, f0=20, f1=None, duration=10, samplerate=44100,
+ method='logarithmic'):
+ super(SweepArray, self).__init__(duration=duration,
+ samplerate=samplerate)
+
+ self.f0 = f0 / samplerate
+ if f1 is None:
+ self.f1 = 0.5
+ else:
+ self.f1 = f1 / samplerate
+ self.method = method
+ self.array = scipy.signal.waveforms.chirp(t=self.time_samples,
+ f0=self.f0,
+ t1=self.time_samples[-1],
+ f1=self.f1,
+ method=self.method)
+ self.array.resize(self.num_samples, 1)
+
+
+class WhiteNoiseArray(SampleArray):
+ """Class for generating a white noise array"""
+ def __init__(self, duration=10, samplerate=44100):
+ super(WhiteNoiseArray, self).__init__(duration=duration,
+ samplerate=samplerate)
+ array = numpy.random.randn(self.num_samples, 1)
+ self.array = array / abs(array).max()
+
+
+class SilenceArray(SampleArray):
+ """Class for generating a silence"""
+ def __init__(self, duration=10, samplerate=44100):
+ super(SilenceArray, self).__init__(duration=duration,
+ samplerate=samplerate)
+
+ self.array = numpy.zeros((self.num_samples, 1))
+
+
+class gst_BuildSample(object):
+ def __init__(self, sample_array, output_file, gst_audio_encoder):
+ if not isinstance(sample_array, SampleArray):
+ raise ValueError("array must be a SampleArray subclass")
+ self.sample_array = sample_array
+ self.samplerate = self.sample_array.samplerate
+ self.output_file = output_file
+ if not isinstance(gst_audio_encoder, list):
+ gst_audio_encoder = [gst_audio_encoder]
+ self.gst_audio_encoder = gst_audio_encoder
+
+ def run(self):
+ pipeline = gst.Pipeline("pipeline")
+ gobject.threads_init()
+ mainloop = gobject.MainLoop()
+
+ numpy_src = NumpySrc(array=self.sample_array.array,
+ samplerate=self.samplerate)
+
+ converter = gst.element_factory_make('audioconvert', 'converter')
+
+ encoder_muxer = []
+ for enc in self.gst_audio_encoder:
+ encoder_muxer.append(gst.element_factory_make(enc))
+
+ filesink = gst.element_factory_make('filesink', 'sink')
+ filesink.set_property('location', self.output_file)
+
+ pipe_elements = [numpy_src.appsrc, converter]
+ pipe_elements.extend(encoder_muxer)
+ pipe_elements.append(filesink)
+
+ pipeline.add(*pipe_elements)
+ gst.element_link_many(*pipe_elements)
+
+ def _on_new_pad(self, source, pad, target_pad):
+ print 'on_new_pad'
+ if not pad.is_linked():
+ if target_pad.is_linked():
+ target_pad.get_peer().unlink(target_pad)
+ pad.link(target_pad)
+
+ def on_eos(bus, msg):
+ pipeline.set_state(gst.STATE_NULL)
+ mainloop.quit()
+
+ def on_error(bus, msg):
+ err, debug_info = msg.parse_error()
+ print ("Error received from element %s: %s" % (msg.src.get_name(),
+ err))
+ print ("Debugging information: %s" % debug_info)
+ mainloop.quit()
+
+ pipeline.set_state(gst.STATE_PLAYING)
+ bus = pipeline.get_bus()
+ bus.add_signal_watch()
+ bus.connect('message::eos', on_eos)
+ bus.connect("message::error", on_error)
+
+ mainloop.run()
+
+
+def generate_sample_file(filename, samples_dir, gst_audio_encoder,
+ sample_array, overwrite=False):
+ sample_file = os.path.join(samples_dir, filename)
+
+ if overwrite or not os.path.exists(sample_file):
+ gst_builder = gst_BuildSample(sample_array=sample_array,
+ output_file=sample_file,
+ gst_audio_encoder=gst_audio_encoder)
+ gst_builder.run()
+ return sample_file
+
+
+def generateSamples(overwrite=False):
+ from timeside import __file__ as ts_file
+ ts_path = os.path.split(os.path.abspath(ts_file))[0]
+ tests_dir = os.path.abspath(os.path.join(ts_path, '../tests'))
+ if os.path.isdir(tests_dir):
+ samples_dir = os.path.abspath(os.path.join(tests_dir, 'samples'))
+ if not os.path.isdir(samples_dir):
+ os.makedirs(samples_dir)
+ else:
+ import tempfile
+ samples_dir = tempfile.mkdtemp(suffix="ts_samples")
+
+ samples = dict()
+
+ # --------- Sweeps ---------
+ # sweep 44100 mono wav
+ filename = 'sweep_mono.wav'
+ samplerate = 44100
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_mono,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sweep 44100 stereo wav
+ sweep_stereo = sweep_mono & sweep_mono
+ filename = 'sweep.wav'
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sweep 44100 stereo mp3
+ filename = 'sweep.mp3'
+ gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux']
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sweep 44100 stereo flac
+ filename = 'sweep.flac'
+ gst_audio_encoder = 'flacenc'
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sweep 44100 stereo ogg
+ filename = 'sweep.ogg'
+ gst_audio_encoder = ['vorbisenc', 'oggmux']
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sweep 32000 stereo wav
+ samplerate = 32000
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sweep_stereo = sweep_mono & sweep_mono
+
+ filename = 'sweep_32000.wav'
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # --------- Sines ---------
+ # sine at 440Hz, 44100 mono wav
+ filename = 'sine440Hz_mono.wav'
+ samplerate = 44100
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_mono,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # Short 1s sine at 440Hz, 44100 mono wav
+ filename = 'sine440Hz_mono_1s.wav'
+ samplerate = 44100
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SineArray(duration=1, samplerate=samplerate, frequency=440)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_mono,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sine at 440Hz, 44100 stereo wav
+ filename = 'sine440Hz.wav'
+ sweep_stereo = sweep_mono & sweep_mono
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sine at 440Hz, 44100 stereo mp3
+ filename = 'sine440Hz.mp3'
+ gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux']
+ sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sine at 440Hz, 44100 stereo ogg
+ filename = 'sine440Hz.ogg'
+ gst_audio_encoder = ['vorbisenc', 'oggmux']
+ sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # --------- Equal tempered scale ---------
+ filename = 'C4_scale.wav'
+ samplerate = 44100
+ f_C4 = 261.63
+ f_D4 = 293.66
+ f_E4 = 329.63
+ f_F4 = 349.23
+ f_G4 = 392.00
+ f_A4 = 440.00
+ f_B4 = 493.88
+ f_C5 = 523.25
+ sineC4 = SineArray(duration=1, samplerate=samplerate, frequency=f_C4)
+ sineD4 = SineArray(duration=1, samplerate=samplerate, frequency=f_D4)
+ sineE4 = SineArray(duration=1, samplerate=samplerate, frequency=f_E4)
+ sineF4 = SineArray(duration=1, samplerate=samplerate, frequency=f_F4)
+ sineG4 = SineArray(duration=1, samplerate=samplerate, frequency=f_G4)
+ sineA4 = SineArray(duration=1, samplerate=samplerate, frequency=f_A4)
+ sineB4 = SineArray(duration=1, samplerate=samplerate, frequency=f_B4)
+ sineC5 = SineArray(duration=1, samplerate=samplerate, frequency=f_C5)
+
+ silence = SilenceArray(duration=0.2, samplerate=samplerate)
+
+ scale = (sineC4 + silence + sineD4 + silence + sineE4 + silence +
+ sineF4 + silence + sineG4 + silence + sineA4 + silence +
+ sineB4 + silence + sineC5)
+
+ gst_audio_encoder = 'wavenc'
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=scale,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # --------- White noise ---------
+ # white noise - 44100Hz mono
+ filename = 'white_noise_mono.wav'
+ samplerate = 44100
+ noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
+ gst_audio_encoder = 'wavenc'
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=noise,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # white noise - 44100Hz stereo
+ filename = 'white_noise.wav'
+ samplerate = 44100
+ noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
+ noise_stereo = noise & noise
+ gst_audio_encoder = 'wavenc'
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=noise_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # white noise - 32000Hz stereo
+ filename = 'white_noise_32000.wav'
+ samplerate = 32000
+ noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
+ noise_stereo = noise & noise
+ gst_audio_encoder = 'wavenc'
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=noise_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ return samples
+
+
+samples = generateSamples()
+
+
+if __name__ == '__main__':
+ generateSamples()