self.stream.writeln("OK")
return result
-
+def runTestModule(module):
+ suite = unittest.loader.TestLoader().loadTestsFromModule(module)
+ TestRunner().run(suite)
from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
-from utils import downsample_blocking
+from preprocessors import downmix_to_mono, frames_adapter
import numpy
from aubio import filterbank, pvoc
self.melenergy = filterbank(self.n_filters, self.input_blocksize)
self.melenergy.set_mel_coeffs_slaney(samplerate)
self.block_read = 0
- self.melenergy_results = numpy.zeros([self.n_filters, ])
+ self.melenergy_results = []
@staticmethod
@interfacedoc
def unit():
return ""
+ @downmix_to_mono
+ @frames_adapter
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.input_stepsize):
- # TODO : check pourquoi on utilise pas le blocksize ?
- fftgrain = self.pvoc(samples)
- self.melenergy_results = numpy.vstack(
- [self.melenergy_results, self.melenergy(fftgrain)])
- self.block_read += 1
+
+ fftgrain = self.pvoc(frames)
+ self.melenergy_results.append(self.melenergy(fftgrain))
+ self.block_read += 1
return frames, eod
def post_process(self):
from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
-from utils import downsample_blocking
+from preprocessors import downmix_to_mono, frames_adapter
import numpy
from aubio import mfcc, pvoc
def unit():
return ""
+ @downmix_to_mono
+ @frames_adapter
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.input_stepsize):
- fftgrain = self.pvoc(samples)
- coeffs = self.mfcc(fftgrain)
- self.mfcc_results = numpy.vstack((self.mfcc_results, coeffs))
- self.block_read += 1
+ fftgrain = self.pvoc(frames)
+ coeffs = self.mfcc(fftgrain)
+ self.mfcc_results = numpy.vstack((self.mfcc_results, coeffs))
+ self.block_read += 1
return frames, eod
def post_process(self):
from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
-from utils import downsample_blocking
+from preprocessors import downmix_to_mono, frames_adapter
from aubio import pitch
def __str__(self):
return "pitch values"
+ @downmix_to_mono
+ @frames_adapter
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.input_stepsize):
- #time = self.block_read * self.input_stepsize * 1. / self.samplerate()
- self.pitches += [self.p(samples)[0]]
- self.block_read += 1
+ #time = self.block_read * self.input_stepsize * 1. / self.samplerate()
+ self.pitches += [self.p(frames)[0]]
+ self.block_read += 1
return frames, eod
def post_process(self):
from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
-from utils import downsample_blocking
+from preprocessors import downmix_to_mono, frames_adapter
from aubio import specdesc, pvoc
def unit():
return ""
+ @downmix_to_mono
+ @frames_adapter
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.input_stepsize):
- fftgrain = self.pvoc(samples)
- for method in self.methods:
- self.specdesc_results[method] += [
- self.specdesc[method](fftgrain)[0]]
+ fftgrain = self.pvoc(frames)
+ for method in self.methods:
+ self.specdesc_results[method] += [
+ self.specdesc[method](fftgrain)[0]]
return frames, eod
def post_process(self):
from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
-from utils import downsample_blocking
+from preprocessors import downmix_to_mono, frames_adapter
from aubio import onset, tempo
import numpy
def __str__(self):
return "%s %s" % (str(self.value), self.unit())
+
+ @downmix_to_mono
+ @frames_adapter
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.input_stepsize):
- if self.o(samples):
- self.onsets += [self.o.get_last_s()]
- if self.t(samples):
- self.beats += [self.t.get_last_s()]
- self.block_read += 1
+ if self.o(frames):
+ self.onsets += [self.o.get_last_s()]
+ if self.t(frames):
+ self.beats += [self.t.get_last_s()]
+ self.block_read += 1
return frames, eod
def post_process(self):
# You should have received a copy of the GNU General Public License
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-# Author: Paul Brossier <piem@piem.org>
+# Author: Thomas Fillon <thomas@parisson.com>
from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
#spectrogram = self.pipe._results[self.parents()[0].id]
# Low-pass filtering of the spectrogram amplitude along the time axis
- S = signal.lfilter(signal.hann(15), 1, abs(spectrogram), axis=0)
+ S = signal.lfilter(signal.hann(15)[8:], 1, abs(spectrogram), axis=0)
+
+
+ import matplotlib.pyplot as plt
+# plt.figure()
+# plt.imshow(np.log10(abs(spectrogram)), origin='lower', aspect='auto', interpolation='nearest')
+
+
# Clip small value to a minimal threshold
np.maximum(S, 1e-9, out=S)
S = np.log10(S)
+
+# plt.figure()
+# plt.imshow(S,origin='lower', aspect='auto', interpolation='nearest')
+# plt.show()
+
# S[S<1e-3]=0
np.maximum(S, 1e-3, out=S)
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
#
# Author : Thomas fillon <thomas@parisson.fr>
+'''
+ Collections of preprocessors to use as decorators for the analyzers process
+ Preprocessors process the (frame, eod) arguments in order to handle various
+ preprocessing such as :
+ - Downmixing to mono
+ - Adapt the frames to match the input_blocksize and input_stepsize
+ of the analyzer
+'''
def downmix_to_mono(process_func):
'''
Pre-processing decorator that downmixes frames from multi-channel to mono
- Downmix by averaging all channels
+
+ Downmix is achieved by averaging all channels
+
>>> @downmix_to_mono
... def process(analyzer,frames,eod):
... print 'Frames, eod inside process :'
def frames_adapter(process_func):
'''
- Pre-processing decorator that adapt frames to match blocksize and stepsize
+ Pre-processing decorator that adapt frames to match input_blocksize and
+ input_stepsize of the decorated analyzer
+
>>> @frames_adapter
... def process(analyzer,frames,eod):
... analyzer.frames.append(frames)
if __name__ == "__main__":
+ # Run doctest
import doctest
doctest.testmod()
+
+ # Run unittest from test_analyzer_preprocessors
+ from tests import test_analyzer_preprocessors
+ from tests.unit_timeside import runTestModule
+ runTestModule(test_analyzer_preprocessors)
from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
-from utils import downsample_blocking
+from preprocessors import downmix_to_mono, frames_adapter
import numpy as np
def unit():
return ""
+ @downmix_to_mono
+ @frames_adapter
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.input_stepsize):
- #time = self.block_read * self.input_stepsize * 1. / self.samplerate()
- self.values.append(np.abs(np.fft.rfft(samples, self.FFT_SIZE)))
- return frames, eod
+ self.values.append(np.abs(np.fft.rfft(frames, self.FFT_SIZE)))
+ return frames, eod
def post_process(self):
spectrogram = self.new_result(data_mode='value', time_mode='framewise')
from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
-from utils import downsample_blocking
import numpy as np
+from preprocessors import downmix_to_mono, frames_adapter
class Waveform(Analyzer):
implements(IAnalyzer) # TODO check if needed with inheritance
def __init__(self):
+ super(Waveform, self).__init__()
self.input_blocksize = 2048
self.input_stepsize = self.input_blocksize / 2
def unit():
return ""
+ @downmix_to_mono
+ @frames_adapter
def process(self, frames, eod=False):
for samples in downsample_blocking(frames, self.input_blocksize):
self.values.append(samples)