--- /dev/null
+ # -*- coding: utf-8 -*-
+ #
+ # Copyright (c) 2013 Paul Brossier <piem@piem.org>
+
+ # This file is part of TimeSide.
+
+ # TimeSide is free software: you can redistribute it and/or modify
+ # it under the terms of the GNU General Public License as published by
+ # the Free Software Foundation, either version 2 of the License, or
+ # (at your option) any later version.
+
+ # TimeSide is distributed in the hope that it will be useful,
+ # but WITHOUT ANY WARRANTY; without even the implied warranty of
+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ # GNU General Public License for more details.
+
+ # You should have received a copy of the GNU General Public License
+ # along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+
+ # Author: Paul Brossier <piem@piem.org>
+ from __future__ import absolute_import
+
+ from ...core import implements, interfacedoc
+ from ..core import Analyzer
+ from ...api import IAnalyzer
+ from ..preprocessors import downmix_to_mono, frames_adapter
+ from aubio import filterbank, pvoc
+
+ class AubioMelEnergy(Analyzer):
+
+ """Aubio Mel Energy analyzer"""
+ implements(IAnalyzer)
+
+ def __init__(self):
+ super(AubioMelEnergy, self).__init__()
+ self.input_blocksize = 1024
+ self.input_stepsize = self.input_blocksize / 4
+
+ @interfacedoc
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
+ super(AubioMelEnergy, self).setup(
+ channels, samplerate, blocksize, totalframes)
+ self.n_filters = 40
+ self.n_coeffs = 13
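+ # phase vocoder computing the spectrum of each analysis frame, and a mel
+ # filterbank with Slaney-style coefficients yielding n_filters band energies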
+ self.pvoc = pvoc(self.input_blocksize, self.input_stepsize)
+ self.melenergy = filterbank(self.n_filters, self.input_blocksize)
+ self.melenergy.set_mel_coeffs_slaney(samplerate)
+ self.block_read = 0
+ self.melenergy_results = []
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "aubio_melenergy"
+
+ @staticmethod
+ @interfacedoc
+ def name():
+ return "Mel Energy (aubio)"
+
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return ""
+
+ @downmix_to_mono
+ @frames_adapter
+ def process(self, frames, eod=False):
+
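+ # spectrum of the current frame, then its mel band energies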
+ fftgrain = self.pvoc(frames)
+ self.melenergy_results.append(self.melenergy(fftgrain))
+ self.block_read += 1
+ return frames, eod
+
+ def post_process(self):
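+ # store the per-frame mel band energies as a framewise value result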
+ melenergy = self.new_result(data_mode='value', time_mode='framewise')
+ melenergy.parameters = dict(n_filters=self.n_filters,
+ n_coeffs=self.n_coeffs)
+ melenergy.data_object.value = self.melenergy_results
+ self.add_result(melenergy)
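+
+ # Example usage (illustrative sketch only; assumes TimeSide's FileDecoder and
+ # pipe API, and 'audio.wav' is a placeholder path):
+ #
+ #     import timeside
+ #     decoder = timeside.decoder.FileDecoder('audio.wav')
+ #     melenergy = AubioMelEnergy()
+ #     (decoder | melenergy).run()
+ #     results = melenergy.results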
import pickle
import os.path
+ # Require Yaafe
+ if not timeside._WITH_YAAFE:
+ raise ImportError('yaafelib is required by the LimsiSad analyzer')
+
class GMM:
++
"""
Gaussian Mixture Model
"""
++
def __init__(self, weights, means, vars):
self.weights = weights
self.means = means
- 2 * np.dot(x, (self.means / self.vars).T)
+ np.dot(x ** 2, (1.0 / self.vars).T))
+ np.log(self.weights)
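# llh holds, per frame, one Gaussian log density per mixture component plus
# the log of that component's weight; the log-sum-exp below reduces it to the
# mixture log likelihood while avoiding numerical underflow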
-- m = np.amax(llh,1)
++ m = np.amax(llh, 1)
dif = llh - np.atleast_2d(m).T
-- return m + np.log(np.sum(np.exp(dif),1))
++ return m + np.log(np.sum(np.exp(dif), 1))
def slidewinmap(lin, winsize, func):
winsize: size of the sliding windows in samples (int)
func: function to be mapped on sliding windows
"""
-- tmpin = ([lin[0]] * (winsize/2)) + list(lin) + ([lin[-1]] * (winsize -1 - winsize/2))
++ tmpin = ([lin[0]] * (winsize / 2)) + list(lin) + \
++ ([lin[-1]] * (winsize - 1 - winsize / 2))
lout = []
for i in xrange(len(lin)):
-- lout.append(func(tmpin[i:(i+winsize)]))
++ lout.append(func(tmpin[i:(i + winsize)]))
assert(len(lin) == len(lout))
return lout
++
def dilatation(lin, winsize):
"""
morphological dilation
"""
return slidewinmap(lin, winsize, max)
++
def erosion(lin, winsize):
"""
morphological erosion
class LimsiSad(Analyzer):
++
"""
Limsi Speech Activity Detection Systems
LimsiSad performs frame-level speech activity detection based on trained GMMs
super(LimsiSad, self).__init__()
# feature extraction definition
-- feature_plan = ['mfcc: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256',
-- 'mfccd1: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256 > Derivate DOrder=1',
-- 'mfccd2: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256 > Derivate DOrder=2',
-- 'zcr: ZCR blockSize=1024 stepSize=256']
++ feature_plan = [
++ 'mfcc: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256',
++ 'mfccd1: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256 > Derivate DOrder=1',
++ 'mfccd2: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256 > Derivate DOrder=2',
++ 'zcr: ZCR blockSize=1024 stepSize=256']
yaafe_analyzer = get_processor('yaafe')
self.parents['yaafe'] = yaafe_analyzer(feature_plan=feature_plan,
input_samplerate=16000)
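# the plan computes MFCCs, their first and second order time derivatives and
# the zero crossing rate on 1024-sample frames with a 256-sample hop, on
# audio at 16 kHz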
features = np.concatenate((mfcc, mfccd1, mfccd2, zcr), axis=1)
# compute log likelihood difference
-- res = 0.5 + 0.5 * (self.gmms[0].llh(features) - self.gmms[1].llh(features))
++ res = 0.5 + 0.5 * \
++ (self.gmms[0].llh(features) - self.gmms[1].llh(features))
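# res rescales the log likelihood difference so that values above 0.5
# correspond to frames where the first GMM (speech) is more likely than the
# second (non speech)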
# bounds log likelihood difference
if self.dllh_bounds is not None:
res = np.minimum(np.maximum(res, mindiff), maxdiff)
# performs dilation, erosion, erosion, dilatation
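# (dilation followed by erosion is a morphological closing that removes short
# dips in the score; the subsequent erosion and dilation form an opening that
# removes short peaks)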
-- ws = int(self.dews * float(self.input_samplerate ) / self.input_stepsize)
-- deed_llh = dilatation(erosion(erosion(dilatation(res, ws), ws), ws), ws)
++ ws = int(
++ self.dews * float(self.input_samplerate) / self.input_stepsize)
++ deed_llh = dilatation(
++ erosion(erosion(dilatation(res, ws), ws), ws), ws)
# infer speech and non speech segments from dilated
# and eroded likelihood difference estimate
labels = []
times = []
durations = []
-- for i, val in enumerate([1 if e > self.speech_threshold else 0 for e in deed_llh]):
++ for i, val in enumerate([1 if e > self.speech_threshold else 0
++ for e in deed_llh]):
if val != last:
labels.append(val)
durations.append(1)
else:
durations[-1] += 1
last = val
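# the loop run length encodes the frame level decision; segment start frames
# and frame counts are converted to seconds below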
-- times = [(float(e) * self.input_stepsize) / self.input_samplerate for e in times]
-- durations = [(float(e) * self.input_stepsize) / self.input_samplerate for e in durations]
--
++ times = [(float(e) * self.input_stepsize)
++ / self.input_samplerate for e in times]
++ durations = [(float(e) * self.input_stepsize)
++ / self.input_samplerate for e in durations]
-- # outputs the raw frame level speech/non speech log likelihood difference
++ # outputs the raw frame level speech/non speech log likelihood
++ # difference
sad_result = self.new_result(data_mode='value', time_mode='framewise')
sad_result.id_metadata.id += '.' + 'sad_lhh_diff'
-- sad_result.id_metadata.name += ' ' + 'Speech Activity Detection Log Likelihood Difference'
++ sad_result.id_metadata.name += ' ' + \
++ 'Speech Activity Detection Log Likelihood Difference'
sad_result.data_object.value = res
self.add_result(sad_result)
# outputs frame level speech/non speech log likelihood difference
# altered with erosion and dilatation procedures
-- sad_de_result = self.new_result(data_mode='value', time_mode='framewise')
++ sad_de_result = self.new_result(
++ data_mode='value', time_mode='framewise')
sad_de_result.id_metadata.id += '.' + 'sad_de_lhh_diff'
-- sad_de_result.id_metadata.name += ' ' + 'Speech Activity Detection Log Likelihood Difference | dilat | erode'
++ sad_de_result.id_metadata.name += ' ' + \
++ 'Speech Activity Detection Log Likelihood Difference | dilat | erode'
sad_de_result.data_object.value = deed_llh
self.add_result(sad_de_result)
# outputs speech/non speech segments
-- sad_seg_result = self.new_result(data_mode='label', time_mode='segment')
++ sad_seg_result = self.new_result(
++ data_mode='label', time_mode='segment')
sad_seg_result.id_metadata.id += '.' + 'sad_segments'
-- sad_seg_result.id_metadata.name += ' ' + 'Speech Activity Detection Segments'
++ sad_seg_result.id_metadata.name += ' ' + \
++ 'Speech Activity Detection Segments'
sad_seg_result.data_object.label = labels
sad_seg_result.data_object.time = times
sad_seg_result.data_object.duration = durations
-- sad_seg_result.data_object.label_metadata.label = {0: 'Not Speech', 1: 'Speech'}
++ sad_seg_result.data_object.label_metadata.label = {
++ 0: 'Not Speech', 1: 'Speech'}
self.add_result(sad_seg_result)