class AubioMelEnergy(Analyzer):
+
"""Aubio Mel Energy analyzer"""
implements(IAnalyzer)
class AubioMfcc(Analyzer):
+
"""Aubio MFCC analyzer"""
implements(IAnalyzer)
from aubio import pitch
import numpy as np
+
class AubioPitch(Analyzer):
+
"""Aubio Pitch estimation analyzer"""
implements(IAnalyzer) # TODO check if needed with inheritance
samplerate,
blocksize,
totalframes)
- self.aubio_pitch = pitch("default", self.input_blocksize, self.input_stepsize,
- samplerate)
+ self.aubio_pitch = pitch(
+ "default", self.input_blocksize, self.input_stepsize,
+ samplerate)
self.aubio_pitch.set_unit("freq")
self.block_read = 0
self.pitches = []
def process(self, frames, eod=False):
#time = self.block_read * self.input_stepsize * 1. / self.samplerate()
self.pitches += [self.aubio_pitch(frames)[0]]
- self.pitch_confidences += [np.nan_to_num(self.aubio_pitch.get_confidence())]
+ self.pitch_confidences += [
+ np.nan_to_num(self.aubio_pitch.get_confidence())]
self.block_read += 1
return frames, eod
pitch.data_object.value = self.pitches
self.process_pipe.results.add(pitch)
- pitch_confidence = self.new_result(data_mode='value', time_mode='framewise')
+ pitch_confidence = self.new_result(
+ data_mode='value', time_mode='framewise')
pitch_confidence.id_metadata.id += '.' + "pitch_confidence"
pitch_confidence.id_metadata.name += ' ' + "pitch confidence"
pitch_confidence.id_metadata.unit = None
class AubioSpecdesc(Analyzer):
+
"""Aubio Spectral Descriptors collection analyzer"""
implements(IAnalyzer)
class AubioTemporal(Analyzer):
+
"""Aubio Temporal analyzer"""
implements(IAnalyzer)
def __str__(self):
return "%s %s" % (str(self.value), self.unit())
-
@downmix_to_mono
@frames_adapter
def process(self, frames, eod=False):
#---------------------------------
# Beat confidences: Event (time, value)
#---------------------------------
- beat_confidences = self.new_result(data_mode='value', time_mode='event')
+ beat_confidences = self.new_result(
+ data_mode='value', time_mode='event')
beat_confidences.id_metadata.id += '.' + "beat_confidence"
beat_confidences.id_metadata.name += " " + "Beat confidences"
beat_confidences.id_metadata.unit = None
from __future__ import division
from timeside.core import Processor
-import timeside #import __version__
+import timeside # import __version__
import numpy
from collections import OrderedDict
import h5py
maxshape = None
else:
maxshape = (None,)
- h5group.create_dataset(key, data=self.__getattribute__(key), maxshape = maxshape)
+ h5group.create_dataset(
+ key, data=self.__getattribute__(key), maxshape=maxshape)
def from_hdf5(self, h5group):
for key, dataset in h5group.items():
class FrameValueResult(ValueObject, FramewiseObject, AnalyzerResult):
+
def _render_plot(self, ax):
ax.plot(self.time, self.data)
class FrameLabelResult(LabelObject, FramewiseObject, AnalyzerResult):
+
def _render_plot(self, ax):
pass
class SegmentValueResult(ValueObject, SegmentObject, AnalyzerResult):
+
def _render_plot(self, ax):
for time, value in (self.time, self.data):
ax.axvline(time, ymin=0, ymax=value, color='r')
class SegmentLabelResult(LabelObject, SegmentObject, AnalyzerResult):
+
def _render_plot(self, ax):
import itertools
colors = itertools.cycle(['b', 'g', 'r', 'c', 'm', 'y', 'k'])
for key in self.label_metadata.label.keys():
ax_color[key] = colors.next()
for time, duration, label in zip(self.time, self.duration, self.data):
- ax.axvspan(time, time+duration, color=ax_color[label], alpha=0.3)
+ ax.axvspan(time, time + duration, color=ax_color[label], alpha=0.3)
class AnalyzerResultContainer(dict):
class MeanDCShift(Analyzer):
+
"""Mean DC shift analyzer"""
implements(IValueAnalyzer)
class IRITSpeech4Hz(Analyzer):
+
'''Speech Segmentor based on the 4Hz energy modulation analysis.
Properties:
segs.data_object.time = [(float(s[0]) * self.blocksize() /
self.samplerate())
for s in segList]
- segs.data_object.duration = [(float(s[1]-s[0]+1) * self.blocksize() /
+ segs.data_object.duration = [(float(s[1] - s[0] + 1) * self.blocksize() /
self.samplerate())
for s in segList]
class IRITSpeechEntropy(Analyzer):
+
"""Speech Segmentor based on Entropy analysis."""
implements(IAnalyzer)
segs.data_object.time = [(float(s[0]) * self.blocksize() /
self.samplerate())
for s in segList]
- segs.data_object.duration = [(float(s[1]-s[0]+1) * self.blocksize() /
+ segs.data_object.duration = [(float(s[1] - s[0] + 1) * self.blocksize() /
self.samplerate())
for s in segList]
import numpy as np
from .utils import MACHINE_EPSILON
+
class Level(Analyzer):
+
"""RMS level analyzer"""
implements(IValueAnalyzer)
if self.max_value == 0: # Prevent np.log10(0) = Inf
self.max_value = MACHINE_EPSILON
- max_level.data_object.value = np.round(20*np.log10(self.max_value), 3)
+ max_level.data_object.value = np.round(
+ 20 * np.log10(self.max_value), 3)
self.process_pipe.results.add(max_level)
# RMS level
if rms_val == 0:
rms_val = MACHINE_EPSILON
- rms_level.data_object.value = np.round(20*np.log10(rms_val), 3)
+ rms_level.data_object.value = np.round(20 * np.log10(rms_val), 3)
self.process_pipe.results.add(rms_level)
- 2 * N.dot(x, (self.means / self.vars).T)
+ N.dot(x ** 2, (1.0 / self.vars).T))
+ N.log(self.weights)
- m = N.amax(llh,1)
+ m = N.amax(llh, 1)
dif = llh - N.atleast_2d(m).T
- return m + N.log(N.sum(N.exp(dif),1))
+ return m + N.log(N.sum(N.exp(dif), 1))
class LimsiSad(Analyzer):
+
"""
Limsi Speech Activity Detection Systems
LimsiSad performs frame level speech activity detection based on GMM models
"""
implements(IAnalyzer)
-
def __init__(self, sad_model='etape'):
"""
Parameters:
# feature extraction definition
spec = yaafelib.FeaturePlan(sample_rate=16000)
- spec.addFeature('mfcc: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256')
- spec.addFeature('mfccd1: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256 > Derivate DOrder=1')
- spec.addFeature('mfccd2: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256 > Derivate DOrder=2')
+ spec.addFeature(
+ 'mfcc: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256')
+ spec.addFeature(
+ 'mfccd1: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256 > Derivate DOrder=1')
+ spec.addFeature(
+ 'mfccd2: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256 > Derivate DOrder=2')
spec.addFeature('zcr: ZCR blockSize=1024 stepSize=256')
parent_analyzer = Yaafe(spec)
self.parents.append(parent_analyzer)
# load gmm model
if sad_model not in ['etape', 'maya']:
- raise ValueError("argument sad_model %s not supported. Supported values are 'etape' or 'maya'" % sad_model)
- picfname = os.path.join(timeside.__path__[0], 'trained_models', 'limsi_sad_%s.pkl' % sad_model)
+ raise ValueError(
+ "argument sad_model %s not supported. Supported values are 'etape' or 'maya'" % sad_model)
+ picfname = os.path.join(
+ timeside.__path__[0], 'trained_models', 'limsi_sad_%s.pkl' % sad_model)
self.gmms = pickle.load(open(picfname, 'rb'))
-
@staticmethod
@interfacedoc
def id():
def process(self, frames, eod=False):
if self.input_samplerate != 16000:
- raise Exception('%s requires 16000 input sample rate: %d provided' % (self.__class__.__name__, self.input_samplerate))
+ raise Exception(
+ '%s requires 16000 input sample rate: %d provided' %
+ (self.__class__.__name__, self.input_samplerate))
return frames, eod
def post_process(self):
mfcc = self.process_pipe.results['yaafe.mfcc']['data_object']['value']
- mfccd1 = self.process_pipe.results['yaafe.mfccd1']['data_object']['value']
- mfccd2 = self.process_pipe.results['yaafe.mfccd2']['data_object']['value']
+ mfccd1 = self.process_pipe.results[
+ 'yaafe.mfccd1']['data_object']['value']
+ mfccd2 = self.process_pipe.results[
+ 'yaafe.mfccd2']['data_object']['value']
zcr = self.process_pipe.results['yaafe.zcr']['data_object']['value']
features = N.concatenate((mfcc, mfccd1, mfccd2, zcr), axis=1)
- res = 0.5 + 0.5 * (self.gmms[0].llh(features) - self.gmms[1].llh(features))
+ res = 0.5 + 0.5 * \
+ (self.gmms[0].llh(features) - self.gmms[1].llh(features))
sad_result = self.new_result(data_mode='value', time_mode='framewise')
sad_result.id_metadata.id += '.' + 'sad_lhh_diff'
- sad_result.id_metadata.name += ' ' + 'Speech Activity Detection Log Likelihood Difference'
+ sad_result.id_metadata.name += ' ' + \
+ 'Speech Activity Detection Log Likelihood Difference'
sad_result.data_object.value = res
self.process_pipe.results.add(sad_result)
-
class OnsetDetectionFunction(Analyzer):
+
"""Onset Detection Function analyzer"""
implements(IAnalyzer)
# Low-pass filtering of the spectrogram amplitude along the time axis
S = signal.lfilter(signal.hann(15)[8:], 1, abs(spectrogram), axis=0)
-
import matplotlib.pyplot as plt
# plt.figure()
# plt.imshow(np.log10(abs(spectrogram)), origin='lower', aspect='auto', interpolation='nearest')
-
# Clip small value to a minimal threshold
np.maximum(S, 1e-9, out=S)
of the analyzer
'''
+
def downmix_to_mono(process_func):
'''
Pre-processing decorator that downmixes frames from multi-channel to mono
if eod and len(eod_list):
eod_list[-1] = eod
- for index, eod in zip(xrange(0, nb_frames*self.stepsize, self.stepsize), eod_list):
- yield (stack[index:index + self.blocksize],eod)
+ for index, eod in zip(xrange(0, nb_frames * self.stepsize, self.stepsize), eod_list):
+ yield (stack[index:index + self.blocksize], eod)
@functools.wraps(process_func)
def wrapper(analyzer, frames, eod):
class Spectrogram(Analyzer):
+
"""Spectrogram analyzer"""
implements(IAnalyzer)
def setup(self, channels=None, samplerate=None,
blocksize=None, totalframes=None):
super(Spectrogram, self).setup(channels, samplerate,
- blocksize, totalframes)
+ blocksize, totalframes)
@staticmethod
@interfacedoc
MACHINE_EPSILON = np.finfo(np.float32).eps
+
def downsample_blocking(frames, hop_s, dtype='float32'):
# downmixing to one channel
if len(frames.shape) != 1:
- downsampled = frames.sum(axis = -1) / frames.shape[-1]
+ downsampled = frames.sum(axis=-1) / frames.shape[-1]
else:
downsampled = frames
# zero padding to have a multiple of hop_s
if downsampled.shape[0] % hop_s != 0:
- pad_length = hop_s + downsampled.shape[0] / hop_s * hop_s - downsampled.shape[0]
- downsampled = np.hstack([downsampled, np.zeros(pad_length, dtype = dtype)])
+ pad_length = hop_s + \
+ downsampled.shape[0] / hop_s * hop_s - downsampled.shape[0]
+ downsampled = np.hstack(
+ [downsampled, np.zeros(pad_length, dtype=dtype)])
# blocking
return downsampled.reshape(downsampled.shape[0] / hop_s, hop_s)
'''
sLen = len(serie)
modul = np.zeros((sLen,))
- w = int(wLen/2)
+ w = int(wLen / 2)
- for i in range(w, sLen-w):
+ for i in range(w, sLen - w):
- d = serie[i-w:i+w]
+ d = serie[i - w:i + w]
if withLog:
if not (d > 0).all():
d[d <= 0] = MACHINE_EPSILON # prevent log(0)=inf
modul[i] = np.var(d)
modul[:w] = modul[w]
- modul[-w:] = modul[-w-1]
+ modul[-w:] = modul[-w - 1]
return modul
'''
- seg = [offset,-1,values[0]]
+ seg = [offset, -1, values[0]]
segList = []
- for i,v in enumerate(values) :
+ for i, v in enumerate(values):
- if not (v == seg[2]) :
- seg[1] = i+offset-1
+ if not (v == seg[2]):
+ seg[1] = i + offset - 1
segList.append(tuple(seg))
- seg = [i+offset,-1,v]
+ seg = [i + offset, -1, v]
- seg[1] = i+offset
+ seg[1] = i + offset
segList.append(tuple(seg))
return segList
# Duplicates the MFCC computation already provided by aubio; consider merging the two.
# Maxime
-def melFilterBank(nbFilters,fftLen,sr) :
+def melFilterBank(nbFilters, fftLen, sr):
'''
Generate a Mel Filter-Bank
(Use numpy *dot* function).
'''
- fh = float(sr)/2.0
- mh = 2595*np.log10(1+fh/700)
+ fh = float(sr) / 2.0
+ mh = 2595 * np.log10(1 + fh / 700)
- step = mh/nbFilters;
+ step = mh / nbFilters
- mcenter = np.arange(step,mh,step)
+ mcenter = np.arange(step, mh, step)
- fcenter = 700*(10**(mcenter/2595)-1)
+ fcenter = 700 * (10 ** (mcenter / 2595) - 1)
- filterbank = np.zeros((fftLen,nbFilters));
+ filterbank = np.zeros((fftLen, nbFilters))
- for i,_ in enumerate(fcenter) :
+ for i, _ in enumerate(fcenter):
- if i == 0 :
+ if i == 0:
fmin = 0.0
- else :
- fmin = fcenter[i-1]
+ else:
+ fmin = fcenter[i - 1]
- if i == len(fcenter)-1 :
+ if i == len(fcenter) - 1:
fmax = fh
- else :
- fmax = fcenter[i+1]
+ else:
+ fmax = fcenter[i + 1]
- imin = np.ceil(fmin/fh*fftLen)
- imax = np.ceil(fmax/fh*fftLen)
+ imin = np.ceil(fmin / fh * fftLen)
+ imax = np.ceil(fmax / fh * fftLen)
- filterbank[imin:imax,i] = triangle(imax-imin)
+ filterbank[imin:imax, i] = triangle(imax - imin)
return filterbank
- triangle : triangle filter.
'''
- triangle = np.zeros((1,length))[0]
- climax= np.ceil(length/2)
+ triangle = np.zeros((1, length))[0]
+ climax = np.ceil(length / 2)
- triangle[0:climax] = np.linspace(0,1,climax)
- triangle[climax:length] = np.linspace(1,0,length-climax)
+ triangle[0:climax] = np.linspace(0, 1, climax)
+ triangle[climax:length] = np.linspace(1, 0, length - climax)
return triangle
-def entropy(serie,nbins=10,base=np.exp(1),approach='unbiased'):
+def entropy(serie, nbins=10, base=np.exp(1), approach='unbiased'):
'''
Compute entropy of a serie using the histogram method.
estimate = 0
sigma = 0
- bins,edges = np.histogram(serie,nbins);
+ bins, edges = np.histogram(serie, nbins)
ncell = len(bins)
- norm = (np.max(edges)-np.min(edges))/len(bins)
-
+ norm = (np.max(edges) - np.min(edges)) / len(bins)
- for b in bins :
- if b == 0 :
+ for b in bins:
+ if b == 0:
logf = 0
- else :
+ else:
logf = np.log(b)
- estimate = estimate - b*logf
- sigma = sigma + b * logf**2
+ estimate = estimate - b * logf
+ sigma = sigma + b * logf ** 2
count = np.sum(bins)
- estimate=estimate/count;
- sigma=np.sqrt( (sigma/count-estimate**2)/float(count-1) );
- estimate=estimate+np.log(count)+np.log(norm);
- nbias=-(ncell-1)/(2*count);
-
- if approach =='unbiased' :
- estimate=estimate-nbias;
- nbias=0;
-
- elif approach =='mmse' :
- estimate=estimate-nbias;
- nbias=0;
- lambda_value=estimate^2/(estimate^2+sigma^2);
- nbias =(1-lambda_value)*estimate;
- estimate=lambda_value*estimate;
- sigma =lambda_value*sigma;
- else :
+ estimate = estimate / count
+ sigma = np.sqrt((sigma / count - estimate ** 2) / float(count - 1))
+ estimate = estimate + np.log(count) + np.log(norm)
+ nbias = -(ncell - 1) / (2 * count)
+
+ if approach == 'unbiased':
+ estimate = estimate - nbias
+ nbias = 0
+
+ elif approach == 'mmse':
+ estimate = estimate - nbias
+ nbias = 0
+ lambda_value = estimate ^ 2 / (estimate ^ 2 + sigma ^ 2)
+ nbias = (1 - lambda_value) * estimate
+ estimate = lambda_value * estimate
+ sigma = lambda_value * sigma
+ else:
return 0
- estimate=estimate/np.log(base);
- nbias =nbias /np.log(base);
- sigma =sigma /np.log(base);
+ estimate = estimate / np.log(base)
+ nbias = nbias / np.log(base)
+ sigma = sigma / np.log(base)
return estimate
-
class VampSimpleHost(Analyzer):
+
"""Vamp plugins library interface analyzer"""
implements(IAnalyzer)
return
if duration is not None:
- plugin_res = self.new_result(data_mode='value', time_mode='segment')
+ plugin_res = self.new_result(
+ data_mode='value', time_mode='segment')
plugin_res.data_object.duration = duration
else:
- plugin_res = self.new_result(data_mode='value', time_mode='event')
+ plugin_res = self.new_result(
+ data_mode='value', time_mode='event')
plugin_res.data_object.time = time
plugin_res.data_object.value = value
-
-# # Fix strat, duration issues if audio is a segment
+# Fix strat, duration issues if audio is a segment
# if self.mediainfo()['is_segment']:
# start_index = np.floor(self.mediainfo()['start'] *
# self.result_samplerate /
# plugin_res.audio_metadata.duration = fixed_duration
#
# value = value[start_index:stop_index + 1]
-
plugin_res.id_metadata.id += '.' + '.'.join(plugin_line[1:])
plugin_res.id_metadata.name += ' ' + \
' '.join(plugin_line[1:])
stepsize = int(m.groups()[1])
# Get the results
- value = np.asfarray([line.split(': ')[1].split(' ') for line in res if (len(line.split(': ')) > 1)])
+ value = np.asfarray([line.split(': ')[1].split(' ')
+ for line in res if (len(line.split(': ')) > 1)])
time = np.asfarray([r.split(':')[0].split(',')[0] for r in res])
time_len = len(res[0].split(':')[0].split(','))
duration = None
elif time_len == 2:
# segment
- duration = np.asfarray([r.split(':')[0].split(',')[1] for r in res])
+ duration = np.asfarray(
+ [r.split(':')[0].split(',')[1] for r in res])
return (time, duration, value)
from preprocessors import downmix_to_mono, frames_adapter
+
class Waveform(Analyzer):
+
"""Waveform analyzer"""
implements(IAnalyzer) # TODO check if needed with inheritance
def setup(self, channels=None, samplerate=None,
blocksize=None, totalframes=None):
super(Waveform, self).setup(channels, samplerate,
- blocksize, totalframes)
+ blocksize, totalframes)
self.values = []
self.result_blocksize = 1
self.result_stepsize = 1
class Yaafe(Analyzer):
+
"""Yaafe feature extraction library interface analyzer"""
implements(IAnalyzer)
def __init__(self, yaafeSpecification=None):
- super(Yaafe,self).__init__()
+ super(Yaafe, self).__init__()
# Check arguments
if yaafeSpecification is None:
yaafeSpecification = FeaturePlan(sample_rate=32000)
# add feature definitions manually
- yaafeSpecification.addFeature('mfcc: MFCC blockSize=512 stepSize=256')
+ yaafeSpecification.addFeature(
+ 'mfcc: MFCC blockSize=512 stepSize=256')
if isinstance(yaafeSpecification, DataFlow):
self.dataFlow = yaafeSpecification
str(FeaturePlan)))
self.yaafe_engine = None
-
@interfacedoc
def setup(self, channels=None, samplerate=None,
blocksize=None, totalframes=None):
# do process things...
# Convert to float64 and reshape
# for compatibility with Yaafe engine
- yaafe_frames = frames.astype(numpy.float64).reshape(1,-1)
+ yaafe_frames = frames.astype(numpy.float64).reshape(1, -1)
# write audio array on 'audio' input
self.yaafe_engine.writeInput('audio', yaafe_frames)