class ModelLongTerm(object):
+
'''
+Long-term model for divergence-based segmentation: an adaptive lattice
+predictor of order `ordre`, seeded with the first sample and updated one
+sample at a time. It tracks the forward/backward prediction errors and
+their variances and exposes the residual error used by the divergence test.
'''
- def __init__(self,ordre,echantillon):
- self.ordre = ordre
- self.ft = [0]*(ordre+2)
- self.ftm1 = [0]*(ordre+2)
- self.variance_f = [1]*(ordre+2)
- self.variance_b = [1]*(ordre+2)
- self.et = [0]*(ordre+2)
- self.cor = [0]*(ordre+2)
- self.length = 1
- self.erreur_residuelle = 0
+ def __init__(self, ordre, echantillon):
+ self.ordre = ordre
+ self.ft = [0] * (ordre + 2)
+ self.ftm1 = [0] * (ordre + 2)
+ self.variance_f = [1] * (ordre + 2)
+ self.variance_b = [1] * (ordre + 2)
+ self.et = [0] * (ordre + 2)
+ self.cor = [0] * (ordre + 2)
+ self.length = 1
+ self.erreur_residuelle = 0
self.variance_erreur_residuelle = 0
- oubli = 1.0/float(self.length)
+ oubli = 1.0 / float(self.length)
- self.variance_f[0] = self.variance_f[0]+oubli*(echantillon**2-self.variance_f[0])
+ self.variance_f[0] += oubli * (echantillon ** 2 - self.variance_f[0])
self.variance_b[0] = self.variance_f[0]
- self.et[0] = echantillon
- self.ft[0] = echantillon
+ self.et[0] = echantillon
+ self.ft[0] = echantillon
- ik = min([ordre,self.length-1])
+ ik = min([ordre, self.length - 1])
self.erreur_residuelle = self.et[ik]
- self.variance_erreur_residuelle =self.variance_f[ik]
+ self.variance_erreur_residuelle = self.variance_f[ik]
- def update(self,echantillon):
+ def update(self, echantillon):
'''
+Update the lattice predictor with the new sample `echantillon`.
'''
- self.length+=1
+ self.length += 1
self.ftm1 = self.ft[:]
self.et[0] = echantillon
- oubli = 1.0/float(self.length)
- self.variance_f[0] = self.variance_f[0]+oubli*(echantillon**2-self.variance_f[0])
+ oubli = 1.0 / float(self.length)
+ self.variance_f[0] += oubli * (echantillon ** 2 - self.variance_f[0])
self.variance_b[0] = self.variance_f[0]
- ik = min([self.ordre,self.length-1])
+ ik = min([self.ordre, self.length - 1])
- for n in xrange(ik+1) :
- oubli =1.0/float(self.length-n)
+ for n in xrange(ik + 1):
+ oubli = 1.0 / float(self.length - n)
- self.cor[n] = self.cor[n] + oubli*(self.ftm1[n]*self.et[n]-self.cor[n])
+ self.cor[n] += oubli * (self.ftm1[n] * self.et[n] - self.cor[n])
- knplus1 = 2*self.cor[n]/(self.variance_f[n] + self.variance_b[n])
- self.et[n+1] = self.et[n]-knplus1*self.ftm1[n]
- self.ft[n+1] = self.ftm1[n]-knplus1*self.et[n]
+ knplus1 = 2 * self.cor[n] / (self.variance_f[n] + self.variance_b[n])
+ self.et[n + 1] = self.et[n] - knplus1 * self.ftm1[n]
+ self.ft[n + 1] = self.ftm1[n] - knplus1 * self.et[n]
- self.variance_f[n+1] = self.variance_f[n+1]+oubli*(self.et[n+1]**2-self.variance_f[n+1])
- self.variance_b[n+1] = self.variance_b[n+1]+oubli*(self.ft[n+1]**2-self.variance_b[n+1])
+ self.variance_f[n + 1] += oubli * (self.et[n + 1] ** 2 - self.variance_f[n + 1])
+ self.variance_b[n + 1] += oubli * (self.ft[n + 1] ** 2 - self.variance_b[n + 1])
- self.ft[0] = echantillon
- self.erreur_residuelle = self.et[ik+1]
- self.variance_erreur_residuelle =self.variance_f[ik+1]
+ self.ft[0] = echantillon
+ self.erreur_residuelle = self.et[ik + 1]
+ self.variance_erreur_residuelle = self.variance_f[ik + 1]
def __str__(self):
- s = 'Model Long Terme\n'
- s += '\tOrdre\t\t%d\n'%self.ordre
- s += '\tLongueur\t%d\n'%self.length
+ s = 'Model Long Terme\n'
+ s += '\tOrdre\t\t%d\n' % self.ordre
+ s += '\tLongueur\t%d\n' % self.length
s += '\tet\t\t['
- for e in self.et :
- s += '%f '%e
+ for e in self.et:
+ s += '%f ' % e
s += ']\n'
s += '\tft\t\t['
- for e in self.ft :
- s += '%f '%e
+ for e in self.ft:
+ s += '%f ' % e
s += ']\n'
s += '\tft-1\t\t['
- for e in self.ftm1 :
- s += '%f '%e
+ for e in self.ftm1:
+ s += '%f ' % e
s += ']\n'
s += '\tVarb\t\t['
- for e in self.variance_b :
- s += '%f '%e
+ for e in self.variance_b:
+ s += '%f ' % e
s += ']\n'
s += '\tVarf\t\t['
- for e in self.variance_f :
- s += '%f '%e
+ for e in self.variance_f:
+ s += '%f ' % e
s += ']\n'
- s += '\tErreur\t\t%f\n'%self.erreur_residuelle
- s += '\tVar(err)\t%f\n'%self.variance_erreur_residuelle
+ s += '\tErreur\t\t%f\n' % self.erreur_residuelle
+ s += '\tVar(err)\t%f\n' % self.variance_erreur_residuelle
return s
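+# Usage sketch (assumes a hypothetical `samples` list of floats; the model
+# is seeded with the first sample and then updated one sample at a time):
+#
+#     lt = ModelLongTerm(2, samples[0])
+#     for x in samples[1:]:
+#         lt.update(x)
+#     print lt.erreur_residuelle, lt.variance_erreur_residuelle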
class ModelCourtTrerm(object):
+
'''
+Short-term model: an autoregressive model of order `ordre` estimated on
+the sliding buffer `buff` (a collections.deque) via a Levinson-Durbin
+recursion over running autocorrelation coefficients.
'''
- def __init__(self,ordre,buff):
+ def __init__(self, ordre, buff):
self.N = len(buff)
self.ordre = ordre
self.erreur_residuelle = 0
self.variance_erreur_residuelle = 0
- self.coef_autocorr = [0]*(self.ordre+2)
- self.AI = [0]*(self.ordre+2)
+ self.coef_autocorr = [0] * (self.ordre + 2)
+ self.AI = [0] * (self.ordre + 2)
self.dernier_echantillon = 0
self.buff = buff
- for tau in xrange(self.ordre+1) :
- for i in xrange(self.N-tau):
- self.coef_autocorr[tau] = self.coef_autocorr[tau]+buff[i]*buff[i+tau-1]
+ for tau in xrange(self.ordre + 1):
+ for i in xrange(self.N - tau):
+ # lag-tau product, consistent with the running update in update()
+ self.coef_autocorr[tau] += buff[i] * buff[i + tau]
self.estimModel()
def estimModel(self):
- coef_reflexion = [0]*self.ordre
+ coef_reflexion = [0] * self.ordre
- if self.coef_autocorr[0] <= 0 :
+ if self.coef_autocorr[0] <= 0:
self.coef_autocorr[0] = 1.0
- coef_reflexion[0] = -self.coef_autocorr[1]/self.coef_autocorr[0]
+ coef_reflexion[0] = -self.coef_autocorr[1] / self.coef_autocorr[0]
self.AI[0] = 1
self.AI[1] = coef_reflexion[0]
- self.variance_erreur_residuelle = self.coef_autocorr[0]+self.coef_autocorr[1]*coef_reflexion[0]
+ self.variance_erreur_residuelle = (
+ self.coef_autocorr[0] + self.coef_autocorr[1] * coef_reflexion[0])
- if self.ordre > 1 :
+ if self.ordre > 1:
i_ordre = 1
- while i_ordre<self.ordre and self.variance_erreur_residuelle > 0 :
+ while i_ordre < self.ordre and self.variance_erreur_residuelle > 0:
- if self.variance_erreur_residuelle > 0 :
+ if self.variance_erreur_residuelle > 0:
S = 0
- for i in xrange(i_ordre) :
- S = S+self.AI[i]*self.coef_autocorr[i_ordre-i+1]
+ for i in xrange(i_ordre):
+ S += self.AI[i] * self.coef_autocorr[i_ordre - i + 1]
# reflection coefficient
- coef_reflexion[i_ordre] = -S/self.variance_erreur_residuelle
+ coef_reflexion[i_ordre] = -S / self.variance_erreur_residuelle
- MH = i_ordre/2+1
- for i in xrange(1,MH) :
+ MH = i_ordre / 2 + 1
+ for i in xrange(1, MH):
- IB = i_ordre-i+2
- tmp = self.AI[i]+coef_reflexion[i_ordre]*self.AI[IB]
- self.AI[IB] = self.AI[IB]+coef_reflexion[i_ordre]*self.AI[i]
+ IB = i_ordre - i + 2
+ tmp = self.AI[i] + coef_reflexion[i_ordre] * self.AI[IB]
+ self.AI[IB] += coef_reflexion[i_ordre] * self.AI[i]
self.AI[i] = tmp
- self.AI[i_ordre+1] = coef_reflexion[i_ordre]
- self.variance_erreur_residuelle = self.variance_erreur_residuelle+coef_reflexion[i_ordre]*S
+ self.AI[i_ordre + 1] = coef_reflexion[i_ordre]
+ self.variance_erreur_residuelle += coef_reflexion[i_ordre] * S
- i_ordre+=1
+ i_ordre += 1
- if self.variance_erreur_residuelle > 0 :
- self.variance_erreur_residuelle = self.variance_erreur_residuelle/float(self.N-1)
+ if self.variance_erreur_residuelle > 0:
+ self.variance_erreur_residuelle /= float(self.N - 1)
self.erreur_residuelle = 0
- for i in range(self.ordre+1) :
- self.erreur_residuelle = self.erreur_residuelle +self.AI[i]*self.buff[self.N-i-1]
+ for i in range(self.ordre + 1):
+ self.erreur_residuelle += self.AI[i] * self.buff[self.N - i - 1]
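+ # The loop above is a Levinson-Durbin recursion: each order derives a
+ # reflection coefficient from the autocorrelation, updates the AR
+ # coefficients AI in place and deflates the residual variance; the
+ # residual error is then the AR filter applied to the newest samples.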
- def update(self,echantillon):
+ def update(self, echantillon):
self.dernier_echantillon = self.buff.popleft()
self.buff.append(echantillon)
- for tau in xrange(1,self.ordre+1):
- self.coef_autocorr[tau] = self.coef_autocorr[tau]-self.dernier_echantillon*self.buff[tau-1]+self.buff[self.N-tau-1]*self.buff[self.N-1]
- self.coef_autocorr[0] = self.coef_autocorr[0] -self.dernier_echantillon**2+self.buff[self.N-1]**2
+ for tau in xrange(1, self.ordre + 1):
+ self.coef_autocorr[tau] += (self.buff[self.N - tau - 1] * self.buff[self.N - 1]
+ - self.dernier_echantillon * self.buff[tau - 1])
+ self.coef_autocorr[0] += self.buff[self.N - 1] ** 2 - self.dernier_echantillon ** 2
self.estimModel()
def __str__(self):
'''
+Human-readable dump of the short-term model state.
'''
- s = 'Model Court Terme\n'
- s += '\tOrdre\t%d\n'%self.ordre
+ s = 'Model Court Terme\n'
+ s += '\tOrdre\t%d\n' % self.ordre
s += '\tAI\t['
- for e in self.AI :
- s += '%f '%e
+ for e in self.AI:
+ s += '%f ' % e
s += ']\n'
- s += '\tErreur\t%d\n'%self.erreur_residuelle
- s += '\tVar(err)\t%d\n'%self.variance_erreur_residuelle
+ s += '\tErreur\t%f\n' % self.erreur_residuelle
+ s += '\tVar(err)\t%f\n' % self.variance_erreur_residuelle
s += '\tAutocor\t ['
- for e in self.coef_autocorr :
- s += '%f '%e
+ for e in self.coef_autocorr:
+ s += '%f ' % e
s += ']\n'
return s
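+# Usage sketch (assumes a hypothetical `samples` float list; the buffer
+# must be a collections.deque, since update() pops from the left; 320
+# samples is the default Lmin at 16 kHz):
+#
+#     from collections import deque
+#     ct = ModelCourtTrerm(2, deque(samples[:320], 320))
+#     ct.update(samples[320])   # slide the analysis window by one sample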
-def calculDistance(modeleLong,modeleCourt):
+def calculDistance(modeleLong, modeleCourt):
'''
Compute the distance between the long-term and the short-term model.
modeleCourt (ModelCourtTrerm): model learned on the last Lmin samples
+modeleLong (ModelLongTerm): model learned since the last rupture
'''
- if modeleCourt.variance_erreur_residuelle == 0 :
- # epsilon pour le type de donnés correspondant à modeleLong.variance_erreur_residuelle
+ if modeleCourt.variance_erreur_residuelle == 0:
+ # epsilon for the floating-point type of modeleCourt.variance_erreur_residuelle
numerateur = spacing(modeleCourt.variance_erreur_residuelle)
- else :
+ else:
numerateur = modeleCourt.variance_erreur_residuelle
- QV= numerateur/modeleLong.variance_erreur_residuelle
- return (2*modeleCourt.erreur_residuelle*modeleLong.erreur_residuelle/modeleLong.variance_erreur_residuelle-(1.0+QV)*modeleLong.erreur_residuelle**2/modeleLong.variance_erreur_residuelle+QV-1.0)/(2.0*QV)
+ QV = numerateur / modeleLong.variance_erreur_residuelle
+ return ((2 * modeleCourt.erreur_residuelle * modeleLong.erreur_residuelle
+ / modeleLong.variance_erreur_residuelle
+ - (1.0 + QV) * modeleLong.erreur_residuelle ** 2
+ / modeleLong.variance_erreur_residuelle
+ + QV - 1.0) / (2.0 * QV))
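+# Sanity-check sketch (illustrative values only, `_Stub` is hypothetical):
+# two identical models give QV == 1 and a distance of exactly 0:
+#
+#     class _Stub(object):
+#         erreur_residuelle = 0.5
+#         variance_erreur_residuelle = 1.0
+#     calculDistance(_Stub(), _Stub())   # -> 0.0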
-def segment(data,fe,ordre=2,Lmin=0.02,lamb=40.0,biais=-0.2,with_backward=True):
+def segment(data, fe, ordre=2, Lmin=0.02,
+ lamb=40.0, biais=-0.2, with_backward=True):
'''
Main segmentation function.
rupt_last = t
long_signal = len(data)
# minimum segment length in samples
- Lmin = int(Lmin*fe)
+ Lmin = int(Lmin * fe)
- while t < long_signal-1 :
+ while t < long_signal - 1:
# New rupture
# Stopping criterion: a rupture has been found
Wn = 0.
# Value and location of the running maximum
- maxi = (0,-1)
+ maxi = (0, -1)
- audio_buffer = deque([],Lmin)
+ audio_buffer = deque([], Lmin)
# Initialize the long-term model
- echantillon= data[t]
- longTerme = ModelLongTerm(ordre,echantillon)
+ echantillon = data[t]
+ longTerme = ModelLongTerm(ordre, echantillon)
- while (not rupture) and t < long_signal-1 :
- t+=1
+ while (not rupture) and t < long_signal - 1:
+ t += 1
# Update the long-term model
echantillon = data[t]
# If enough samples have passed since the last rupture
# to use the short-term model
- if t-rupt_last >= Lmin :
+ if t - rupt_last >= Lmin:
# Initialize the short-term model
- if t-rupt_last == Lmin :
+ if t - rupt_last == Lmin:
courtTerme = ModelCourtTrerm(ordre, audio_buffer)
# Update the short-term model
- if t-rupt_last > Lmin :
+ if t - rupt_last > Lmin:
courtTerme.update(echantillon)
# Update the criterion
- Wn = Wn+calculDistance(longTerme,courtTerme)-biais
+ Wn = Wn + calculDistance(longTerme, courtTerme) - biais
# Look for a new maximum
- if Wn > maxi[0] :
- maxi = (Wn,t)
+ if Wn > maxi[0]:
+ maxi = (Wn, t)
# A drop of more than lambda below the maximum signals a rupture
- if (maxi[0] - Wn) > lamb :
+ if (maxi[0] - Wn) > lamb:
rupture = True
- else :
+ else:
# Otherwise, keep buffering for the short-term initialization
audio_buffer.append(echantillon)
t_rupt = maxi[1]
# If a rupture was detected while the model was stable (Wn grew)
- if t_rupt > -1 :
+ if t_rupt > -1:
m = 1
- if with_backward :
+ if with_backward:
bdata = data[t_rupt:rupt_last:-1]
- if len(bdata) > 0 :
+ if len(bdata) > 0:
- front = segment(bdata,fe,ordre,float(Lmin)/fe,lamb,biais,with_backward=False)
- t_bs = [ t_rupt-tr for tr,_ in front]
+ front = segment(bdata, fe, ordre, float(Lmin) / fe,
+ lamb, biais, with_backward=False)
+ t_bs = [t_rupt - tr for tr, _ in front]
- if len(t_bs) > 0 :
+ if len(t_bs) > 0:
t_rupt = t_bs[-1]
- m =-1
+ m = -1
# Otherwise, create a segment of minimal length
- else :
- t_rupt = rupt_last+Lmin
+ else:
+ t_rupt = rupt_last + Lmin
m = 0
# Update the boundaries
t = t_rupt
rupt_last = t_rupt
- if rupture :
- frontieres.append((t_rupt,m))
+ if rupture:
+ frontieres.append((t_rupt, m))
return frontieres
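+# Usage sketch (assumptions: `data` is a mono float sequence and `fe` the
+# sample rate in Hz; the 440 Hz tone is purely illustrative):
+#
+#     import numpy
+#     fe = 16000.0
+#     tone = list(numpy.sin(2 * numpy.pi * 440 * numpy.arange(fe) / fe))
+#     for t, m in segment(tone, fe):
+#         print float(t) / fe, m   # boundary time (s) and model flag -1/0/1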
-
class IRITDiverg(Analyzer):
implements(IAnalyzer)
'''
+Audio segmentation based on a divergence test between long-term and
+short-term autoregressive signal models.
'''
- def __init__(self, blocksize=1024, stepsize=None) :
- super(IRITDiverg, self).__init__();
+ def __init__(self, blocksize=1024, stepsize=None):
+ super(IRITDiverg, self).__init__()
self.parents.append(Waveform())
self.ordre = 2
@interfacedoc
- def setup(self, channels=None, samplerate=None,blocksize=None, totalframes=None):
- super(IRITDiverg, self).setup(channels,samplerate,blocksize,totalframes)
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
+ super(IRITDiverg, self).setup(
+ channels, samplerate, blocksize, totalframes)
@staticmethod
@interfacedoc
data = list(audio_data.mean(axis=1))
else:
data = list(audio_data)
- frontieres = segment(data,self.samplerate(),self.ordre)
+ frontieres = segment(data, self.samplerate(), self.ordre)
segs = self.new_result(data_mode='label', time_mode='event')
segs.id_metadata.id += '.' + 'segments'
label = {0: 'Instable', 1: 'Forward', -1: 'Backward'}
segs.label_metadata.label = label
- segs.data_object.label = [s[1] for s in frontieres]
- segs.data_object.time = [(float(s[0]) / self.samplerate()) for s in frontieres]
+ segs.data_object.label = [s[1] for s in frontieres]
+ segs.data_object.time = [(float(s[0]) / self.samplerate())
+ for s in frontieres]
self.process_pipe.results.add(segs)
return
from __future__ import absolute_import
from timeside.analyzer.utils import segmentFromValues
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
from aubio import pitch
def setup(self, channels=None, samplerate=None,
blocksize=None, totalframes=None):
super(IRITMonopoly, self).setup(channels,
- samplerate,
- blocksize,
- totalframes)
- self.aubio_pitch = pitch("default", self.input_blocksize, self.input_stepsize,
- samplerate)
+ samplerate,
+ blocksize,
+ totalframes)
+ self.aubio_pitch = pitch(
+ "default", self.input_blocksize, self.input_stepsize,
+ samplerate)
self.aubio_pitch.set_unit("freq")
self.block_read = 0
self.pitches = []
return "Labeled Monophonic/Polyphonic segments"
def process(self, frames, eod=False):
- self.decisionLen = 1.0;# in seconds
- pf = self.aubio_pitch(frames.T[0])
+ self.decisionLen = 1.0  # in seconds
+ pf = self.aubio_pitch(frames.T[0])
self.pitches += [pf[0]]
self.pitch_confidences += [self.aubio_pitch.get_confidence()]
self.block_read += 1
'''
+Windowed monophony/polyphony decision from the aubio pitch confidences.
'''
- frameLenModulation = int(self.decisionLen * self.samplerate() / self.blocksize())
+ frameLenModulation = int(
+ self.decisionLen * self.samplerate() / self.blocksize())
epsilon = numpy.spacing(self.pitch_confidences[0])
- w = int(self.decisionLen * self.samplerate() /(self.blocksize()*2))
+ w = int(self.decisionLen * self.samplerate() / (self.blocksize() * 2))
is_mono = []
- for i in range(w,len(self.pitch_confidences)-w,frameLenModulation):
- d = self.pitch_confidences[i-w:i+w]
- conf_mean= numpy.mean(d)
- conf_var = numpy.var(d+epsilon)
- if self.monoLikelihood(conf_mean,conf_var) > self.polyLikelihood(conf_mean,conf_var) :
+ for i in range(w, len(self.pitch_confidences) - w, frameLenModulation):
+ d = self.pitch_confidences[i - w:i + w]
+ conf_mean = numpy.mean(d)
+ conf_var = numpy.var(d + epsilon)
+ if self.monoLikelihood(conf_mean, conf_var) > self.polyLikelihood(conf_mean, conf_var):
is_mono += [True]
- else :
- is_mono += [False]
+ else:
+ is_mono += [False]
conf = self.new_result(data_mode='value', time_mode='framewise')
segs.label_metadata.label = label
-
segs.data_object.label = [convert[s[2]] for s in segList]
segs.data_object.time = [(float(s[0]) * self.blocksize() /
self.samplerate())
- for s in segList]
- segs.data_object.duration = [(float(s[1]-s[0]) * self.blocksize() /
- self.samplerate())
- for s in segList]
+ for s in segList]
+ segs.data_object.duration = [(float(s[1] - s[0]) * self.blocksize() /
+ self.samplerate())
+ for s in segList]
self.process_pipe.results.add(segs)
- def monoLikelihood(self,m,v):
-
- theta1=0.1007;
- theta2=0.0029;
- beta1=0.5955;
- beta2=0.2821;
- delta=0.848;
- return self.weibullLikelihood(m,v,theta1,theta2,beta1,beta2,delta)
+ def monoLikelihood(self, m, v):
+ theta1 = 0.1007
+ theta2 = 0.0029
+ beta1 = 0.5955
+ beta2 = 0.2821
+ delta = 0.848
+ return self.weibullLikelihood(m, v, theta1, theta2, beta1, beta2, delta)
- def polyLikelihood(self,m,v):
- theta1=0.3224;
- theta2=0.0121;
- beta1=1.889;
- beta2=0.8705;
- delta=0.644;
- return self.weibullLikelihood(m,v,theta1,theta2,beta1,beta2,delta)
+ def polyLikelihood(self, m, v):
+ theta1 = 0.3224
+ theta2 = 0.0121
+ beta1 = 1.889
+ beta2 = 0.8705
+ delta = 0.644
+ return self.weibullLikelihood(m, v, theta1, theta2, beta1, beta2, delta)
- def weibullLikelihood(self,m,v,theta1,theta2,beta1,beta2,delta):
+ def weibullLikelihood(self, m, v, theta1, theta2, beta1, beta2, delta):
m = numpy.array(m)
- v= numpy.array(v)
-
- c0=numpy.log(beta1*beta2/(theta1*theta2));
- a1=m/theta1;
- b1=a1**(beta1/delta);
- c1=numpy.log(a1);
- a2=v/theta2;
- b2=a2**(beta2/delta);
- c2=numpy.log(a2);
- somme1=(b1+b2)**delta;
- Pxy=c0+(beta1/delta-1)*c1+(beta2/delta-1)*c2+(delta-2)*numpy.log(b1+b2)+numpy.log(somme1+1/delta-1)-somme1;
+ v = numpy.array(v)
+
+ c0 = numpy.log(beta1 * beta2 / (theta1 * theta2))
+ a1 = m / theta1
+ b1 = a1 ** (beta1 / delta)
+ c1 = numpy.log(a1)
+ a2 = v / theta2
+ b2 = a2 ** (beta2 / delta)
+ c2 = numpy.log(a2)
+ somme1 = (b1 + b2) ** delta
+ Pxy = (c0 + (beta1 / delta - 1) * c1 + (beta2 / delta - 1) * c2
+ + (delta - 2) * numpy.log(b1 + b2)
+ + numpy.log(somme1 + 1 / delta - 1) - somme1)
return numpy.mean(Pxy)
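+ # Pxy above appears to be the log of a bivariate Weibull-type density with
+ # scales (theta1, theta2), shapes (beta1, beta2) and dependence delta:
+ #
+ #     log p(m, v) = log(beta1*beta2 / (theta1*theta2))
+ #                   + (beta1/delta - 1)*log(m/theta1)
+ #                   + (beta2/delta - 1)*log(v/theta2)
+ #                   + (delta - 2)*log(b1 + b2)
+ #                   + log(S + 1/delta - 1) - S
+ #
+ # with b_i = (x_i/theta_i)**(beta_i/delta) and S = (b1 + b2)**delta.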
from timeside.analyzer.utils import segmentFromValues
from timeside.analyzer.irit_diverg import IRITDiverg
from timeside.api import IAnalyzer
-from numpy import logical_and,array, hamming, dot, mean, float, arange, nonzero
+from numpy import logical_and, array, hamming, dot, mean, float, arange, nonzero
from numpy.fft import rfft
from scipy.signal import firwin, lfilter
-from pylab import plot,show
class IRITMusicLDN(Analyzer):
implements(IAnalyzer)
- def __init__(self, blocksize=1024, stepsize=None) :
- super(IRITMusicLDN, self).__init__();
+ def __init__(self, blocksize=1024, stepsize=None):
+ super(IRITMusicLDN, self).__init__()
self.parents.append(IRITDiverg())
- self.wLen = 1.0
- self.wStep = 0.1
+ self.wLen = 1.0
+ self.wStep = 0.1
self.threshold = 20
@staticmethod
return "Music confidence indexes"
def process(self, frames, eod=False):
- return frames,eod
+ return frames, eod
def post_process(self):
'''
+Music confidence index from the local density of divergence boundaries.
'''
segList = self.process_pipe.results['irit_diverg.segments'].time
- w = self.wLen/ 2;
+ w = self.wLen / 2
end = segList[-1]
- tLine = arange(0,end,self.wStep)
+ tLine = arange(0, end, self.wStep)
- segLen = array([0]*len(tLine))
+ segLen = array([0] * len(tLine))
- for i,t in enumerate(tLine):
- idx = nonzero(logical_and(segList>(t-w) ,segList<(t+w)))[0]
- segLen[i]= len(idx)
+ for i, t in enumerate(tLine):
+ idx = nonzero(logical_and(segList > (t - w), segList < (t + w)))[0]
+ segLen[i] = len(idx)
-
- plot(tLine,segLen)
+ # debug display, left disabled:
+ # plot(tLine, segLen)
- show()
+ # show()
- # Confidence Index
+ # Confidence Index
conf = array(segLen - self.threshold) / self.threshold
conf[conf > 1] = 1
segs.data_object.label = [convert[s[2]] for s in segList]
segs.data_object.time = [tLine[s[0]] for s in segList]
- segs.data_object.duration = [tLine[s[1]]-tLine[s[0]] for s in segList]
+ segs.data_object.duration = [tLine[s[1]] - tLine[s[0]]
+ for s in segList]
self.process_pipe.results.add(segs)
return
class IRITMusicSLN(Analyzer):
implements(IAnalyzer)
- def __init__(self, blocksize=None, stepsize=None) :
- super(IRITMusicSLN, self).__init__();
+ def __init__(self, blocksize=None, stepsize=None):
+ super(IRITMusicSLN, self).__init__()
self.parents.append(IRITDiverg())
- self.wLen = 1.0
- self.wStep = 0.1
+ self.wLen = 1.0
+ self.wStep = 0.1
self.threshold = 0.05
- self.input_blocksize = 0;
- self.input_stepsize = 0;
+ self.input_blocksize = 0
+ self.input_stepsize = 0
self.maxSegForLength = 7
+
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None,
totalframes=None):
@frames_adapter
def process(self, frames, eod=False):
- return frames,eod
+ return frames, eod
def post_process(self):
'''
segList = self.process_pipe.results['irit_diverg.segments'].time
- w = self.wLen/ 2
+ w = self.wLen / 2
end = segList[-1]
- tLine = arange(w,end-w,self.wStep)
+ tLine = arange(w, end - w, self.wStep)
# The smallest ones! (differs from the article)
- segLen = [mean(diff(getBoundariesInInterval(t-w, t+w, segList))) for t in tLine]
+ segLen = [mean(diff(getBoundariesInInterval(t - w, t + w, segList)))
+ for t in tLine]
- # Confidence Index
- conf = [(s - self.threshold) / self.threshold if s < 2*self.threshold else 1 for s in segLen]
+ # Confidence Index
+ conf = [(s - self.threshold) / self.threshold
+ if s < 2 * self.threshold else 1 for s in segLen]
segLenRes = self.new_result(data_mode='value', time_mode='framewise')
segLenRes.id_metadata.id += '.' + 'energy_confidence'
segs.data_object.label = [convert[s[2]] for s in segList]
segs.data_object.time = [tLine[s[0]] for s in segList]
- segs.data_object.duration = [tLine[s[1]]-tLine[s[0]] for s in segList]
+ segs.data_object.duration = [tLine[s[1]] - tLine[s[0]]
+ for s in segList]
self.process_pipe.results.add(segs)
return
-def getBoundariesInInterval(start,stop,boundaries) :
- return [t for t in boundaries if t >= start and t<= stop]
-
-
+def getBoundariesInInterval(start, stop, boundaries):
+ return [t for t in boundaries if t >= start and t <= stop]
implements(IAnalyzer)
- def __init__(self, blocksize=1024, stepsize=None, samplerate=None) :
- super(IRITMusicSNB, self).__init__();
+ def __init__(self, blocksize=1024, stepsize=None, samplerate=None):
+ super(IRITMusicSNB, self).__init__()
self.parents.append(IRITDiverg())
- self.wLen = 1.0
- self.wStep = 0.1
- self.input_blocksize = 0;
- self.input_stepsize = 0;
+ self.wLen = 1.0
+ self.wStep = 0.1
+ self.input_blocksize = 0
+ self.input_stepsize = 0
self.threshold = 20
@interfacedoc
@frames_adapter
def process(self, frames, eod=False):
- return frames,eod
+ return frames, eod
def post_process(self):
'''
+Music confidence index from the number of divergence boundaries per
+window.
'''
segList = self.process_pipe.results['irit_diverg.segments'].time
- w = self.wLen/ 2
+ w = self.wLen / 2
end = segList[-1]
tLine = arange(0, end, self.wStep)
- segNB = [ len(getBoundariesInInterval(t-w,t+w,segList)) for t in tLine ]
+ segNB = [len(getBoundariesInInterval(t - w, t + w, segList))
+ for t in tLine]
- # Confidence Index
- conf = [float(v - self.threshold) / float(self.threshold) if v < 2*self.threshold else 1.0 for v in segNB]
+ # Confidence Index
+ conf = [float(v - self.threshold) / float(self.threshold)
+ if v < 2 * self.threshold else 1.0 for v in segNB]
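+ # Maps the boundary count onto [-1, 1]: -1 for no boundary, 0 at the
+ # threshold, saturating at 1 from twice the threshold upwards.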
segLenRes = self.new_result(data_mode='value', time_mode='framewise')
segLenRes.id_metadata.id += '.' + 'energy_confidence'
segLenRes.id_metadata.name += ' ' + 'Energy Confidence'
segs.data_object.label = [convert[s[2]] for s in segList]
segs.data_object.time = [tLine[s[0]] for s in segList]
- segs.data_object.duration = [tLine[s[1]]-tLine[s[0]] for s in segList]
+ segs.data_object.duration = [tLine[s[1]] - tLine[s[0]]
+ for s in segList]
self.process_pipe.results.add(segs)
return
-def getBoundariesInInterval(start, stop, boundaries) :
- return [t for t in boundaries if t >= start and t<= stop]
+def getBoundariesInInterval(start, stop, boundaries):
+ return [t for t in boundaries if t >= start and t <= stop]
from timeside.api import IAnalyzer
from aubio import pitch
import numpy
-from scipy.signal import firwin,lfilter
-from scipy.ndimage.morphology import binary_opening,binary_closing
+from scipy.signal import firwin, lfilter
+from scipy.ndimage.morphology import binary_opening, binary_closing
import pylab
-
-
class IRITStartSeg(Analyzer):
implements(IAnalyzer)
'''
blocksize=None, totalframes=None):
super(IRITStartSeg, self).setup(channels,
- samplerate,
- blocksize,
- totalframes)
+ samplerate,
+ blocksize,
+ totalframes)
lowFreq = 100.0
self.input_blocksize = int(0.02 * samplerate)
self.input_stepsize = int(0.008 * samplerate)
-
sr = float(samplerate)
- highFreq = sr/2
- f1= lowFreq/sr
- f2= highFreq/sr
- self.filtre = firwin(10, [f1,f2], pass_zero=False)
+ highFreq = sr / 2
+ f1 = lowFreq / sr
+ f2 = highFreq / sr
+ self.filtre = firwin(10, [f1, f2], pass_zero=False)
self.energy = []
self.maxenergy = 0.002
self.min_overlap = 20
self.threshold = 0.1
+
@staticmethod
@interfacedoc
def id():
'''
- self.energy += [numpy.sqrt(numpy.mean(lfilter(self.filtre,1.0,frames.T[0])**2))]
+ self.energy += [
+ numpy.sqrt(numpy.mean(lfilter(self.filtre, 1.0, frames.T[0]) ** 2))]
return frames, eod
def post_process(self):
'''
+Detect start segments by matching normalized low-energy profiles
+against stored energy prototypes.
'''
- self.energy = numpy.array(self.energy)/max(self.energy)
- silences = numpy.zeros((1,len(self.energy)))[0]
- silences[self.energy<self.maxenergy] = 1
+ self.energy = numpy.array(self.energy) / max(self.energy)
+ silences = numpy.zeros((1, len(self.energy)))[0]
+ silences[self.energy < self.maxenergy] = 1
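+ # Binary silence mask over the normalized energy curve: frames whose
+ # energy falls below `maxenergy` are flagged as 1.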
step = float(self.input_stepsize) / float(self.samplerate())
- tL = numpy.arange(len(silences))*step
+ tL = numpy.arange(len(silences)) * step
prototype = numpy.load('timeside/analyzer/protoStart2.dat')
prototype2 = numpy.load('timeside/analyzer/protoStart3.dat')
# Smoothing to remove small segments in either direction
- struct= [1]*len(prototype)
+ struct = [1] * len(prototype)
silences = binary_closing(silences, struct)
silences = binary_opening(silences, struct)
- seg = [0,-1,silences[0]]
+ seg = [0, -1, silences[0]]
silencesList = []
- for i,v in enumerate(silences) :
- if not (v == seg[2]) :
+ for i, v in enumerate(silences):
+ if not (v == seg[2]):
seg[1] = i
silencesList.append(tuple(seg))
- seg = [i,-1,v]
+ seg = [i, -1, v]
seg[1] = i
silencesList.append(tuple(seg))
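+ # silencesList now holds (start, stop, value) runs: a run-length
+ # encoding of the smoothed silence mask.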
segsList = []
candidates = []
l = len(prototype)
#import pylab
- for s in silencesList :
- if s[2] == 1 :
- shape = numpy.array(self.energy[s[0]:s[1]])
- #shape = shape/numpy.max(shape)
-
- d1,_ = computeDist2(prototype,shape)
- d2,_ = computeDist2(prototype2,shape)
- dist = min([d1,d2])
-
- candidates.append((s[0],s[1],dist))
- #pylab.plot(shape)
- #pylab.plot(range(decal,decal+l),prototype)
- #pylab.show()
- if dist < self.threshold :
- segsList.append(s)
-
- label = {0: 'Start',1:'Session'}
- with open('out.lab','w') as f :
- for s in segsList :
- f.write('%.2f\t%.2f\t%s\n'%(s[0]*step,s[1]*step,label[s[2]]))
-
- with open('cand.lab','w') as f :
- for s in candidates :
- f.write('%.2f\t%.2f\t%f\n'%(s[0]*step,s[1]*step,s[2]))
+ for s in silencesList:
+ if s[2] == 1:
+ shape = numpy.array(self.energy[s[0]:s[1]])
+ #shape = shape/numpy.max(shape)
+
+ d1, _ = computeDist2(prototype, shape)
+ d2, _ = computeDist2(prototype2, shape)
+ dist = min([d1, d2])
+
+ candidates.append((s[0], s[1], dist))
+ # pylab.plot(shape)
+ # pylab.plot(range(decal,decal+l),prototype)
+ # pylab.show()
+ if dist < self.threshold:
+ segsList.append(s)
+
+ label = {0: 'Start', 1: 'Session'}
+ with open('out.lab', 'w') as f:
+ for s in segsList:
+ f.write('%.2f\t%.2f\t%s\n' %
+ (s[0] * step, s[1] * step, label[s[2]]))
+
+ with open('cand.lab', 'w') as f:
+ for s in candidates:
+ f.write('%.2f\t%.2f\t%f\n' % (s[0] * step, s[1] * step, s[2]))
segs = self.new_result(data_mode='label', time_mode='segment')
segs.id_metadata.id += '.' + 'segments'
segs.id_metadata.name += ' ' + 'Segments'
segs.label_metadata.label = label
segs.data_object.label = [s[2] for s in segsList]
- segs.data_object.time = [(float(s[0])*step)
- for s in segsList]
- segs.data_object.duration = [(float(s[1]-s[0])*step)
- for s in segsList]
+ segs.data_object.time = [(float(s[0]) * step)
+ for s in segsList]
+ segs.data_object.duration = [(float(s[1] - s[0]) * step)
+ for s in segsList]
self.process_pipe.results.add(segs)
-def computeDist2(proto,serie) :
- l = len(proto)
- r= range(len(serie))
- serie = numpy.array(list(serie)+[0]*(l-1))
- v = [numpy.mean(numpy.abs((serie[i:i+l]/numpy.max(serie[i:i+l]))-proto))for i in r]
- return numpy.min(v),numpy.argmin(v)
-
-def computeDist(v1,v2,min_overlap):
- '''
+def computeDist2(proto, serie):
+ l = len(proto)
+ r = range(len(serie))
+ serie = numpy.array(list(serie) + [0] * (l - 1))
+ v = [numpy.mean(numpy.abs((serie[i:i + l] / numpy.max(serie[i:i + l])) - proto))
+ for i in r]
+ return numpy.min(v), numpy.argmin(v)
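+# Matching sketch (hypothetical names): computeDist2 slides `proto` over the
+# zero-padded `serie`, normalizes each window by its own maximum, and
+# returns the smallest mean absolute deviation and the offset where it occurs:
+#
+#     dist, offset = computeDist2(prototype, energy_profile)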
- '''
- m1 = numpy.argmax(v1)
- m2 = numpy.argmax(v2)
- l1 = len(v1)
- l2 = len(v2)
- decal = numpy.abs(m1-m2)
- if m1 >= m2 :
- fin = numpy.min([l1-decal,l2])
- if fin-decal > min_overlap:
-
- v1_out = numpy.array(v1[decal:decal+fin])
- v2_out = numpy.array(v2[:fin])
- d = numpy.mean(numpy.abs(v1_out-v2_out))
- else :
- v1_out = [0]
- v2_out = [1]
- d = 1
- else :
- return computeDist(v2, v1,min_overlap)
-
-
- return d,v1_out,v2_out
+def computeDist(v1, v2, min_overlap):
+ '''
+ Mean absolute distance between two profiles aligned on their maxima;
+ returns the distance and the two aligned overlapping vectors.
+ '''
+ m1 = numpy.argmax(v1)
+ m2 = numpy.argmax(v2)
+ l1 = len(v1)
+ l2 = len(v2)
+ decal = numpy.abs(m1 - m2)
+
+ if m1 >= m2:
+ fin = numpy.min([l1 - decal, l2])
+ if fin - decal > min_overlap:
+
+ v1_out = numpy.array(v1[decal:decal + fin])
+ v2_out = numpy.array(v2[:fin])
+ d = numpy.mean(numpy.abs(v1_out - v2_out))
+ else:
+ v1_out = [0]
+ v2_out = [1]
+ d = 1
+ else:
+ return computeDist(v2, v1, min_overlap)
+
+ return d, v1_out, v2_out
super(IRITSpeechEntropy, self).setup(
channels, samplerate, blocksize, totalframes)
self.entropyValue = []
- self.threshold = 0.4
- self.smoothLen = 5
- self.modulLen = 2
- self.wLen = 0.016
- self.wStep = 0.008
+ self.threshold = 0.4
+ self.smoothLen = 5
+ self.modulLen = 2
+ self.wLen = 0.016
+ self.wStep = 0.008
self.input_blocksize = int(self.wLen * samplerate)
- self.input_stepsize = int(self.wStep * samplerate)
+ self.input_stepsize = int(self.wStep * samplerate)
@staticmethod
@interfacedoc
def __str__(self):
return "Speech confidences indexes"
-
+
@frames_adapter
def process(self, frames, eod=False):
self.entropyValue.append(entropy(frames))
def post_process(self):
entropyValue = array(self.entropyValue)
- import pylab
- pylab.plot(entropyValue)
- pylab.show()
- w = self.modulLen/self.wStep
+ # debug display, left disabled:
+ # import pylab
+ # pylab.plot(entropyValue)
+ # pylab.show()
+ w = self.modulLen / self.wStep
modulentropy = computeModulation(entropyValue, w, False)
-
-
confEntropy = array(modulentropy - self.threshold) / self.threshold
confEntropy[confEntropy > 1] = 1