from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
from timeside.analyzer.preprocessors import downmix_to_mono, frames_adapter
-from ..tools.parameters import Int, HasTraits
+from timeside.tools.parameters import Int, HasTraits
+from timeside.tools.buffering import BufferTable
import numpy as np
class Spectrogram(Analyzer):
- """Spectrogram analyzer"""
+ """Spectrogram analyzer
+
+ >>> import timeside
+ >>> from timeside.tools.data_samples import samples as ts_samples
+ >>> audio_source = ts_samples['sweep.wav']
+ >>> decoder = timeside.decoder.file.FileDecoder(uri=audio_source)
+ >>> spectrogram = Spectrogram(input_blocksize=2048, input_stepsize=1024)
+ >>> pipe = (decoder | spectrogram)
+ >>> pipe.run()
+
+ """
implements(IAnalyzer)
# Define Parameters
else:
self.fft_size = fft_size
- self.values = []
+ self.values = BufferTable()
@interfacedoc
def setup(self, channels=None, samplerate=None,
@downmix_to_mono
@frames_adapter
def process(self, frames, eod=False):
- self.values.append(np.abs(np.fft.rfft(frames, self.fft_size)))
+ stft = np.fft.rfft(frames, self.fft_size)
+ self.values.append('stft', stft)
return frames, eod
def post_process(self):
spectrogram = self.new_result(data_mode='value', time_mode='framewise')
spectrogram.parameters = {'fft_size': self.fft_size}
- spectrogram.data_object.value = self.values
+ spectrogram.data_object.value = np.abs(self.values['stft'])
nb_freq = spectrogram.data_object.value.shape[1]
spectrogram.data_object.y_value = (np.arange(0, nb_freq) *
self.samplerate() / self.fft_size)
self.add_result(spectrogram)
+
+if __name__ == "__main__":
+ import doctest
+ doctest.testmod()