attributes=AnalyzerAttributes(name="Mean DC shift",
unit="%",
id="mean_dc_shift",
- sampleRate=44100,
- blockSize=None,
- stepSize=None)
-
+ samplerate=44100,
+ blocksize=None,
+ stepsize=None)
+
self.expected = AnalyzerResult(data=-0.000, attributes=attributes)
def testOnGuitar(self):
attributes=AnalyzerAttributes(name="Mean DC shift",
unit="%",
id="mean_dc_shift",
- sampleRate=44100,
- blockSize=None,
- stepSize=None)
- self.expected = AnalyzerResult(data=0.054, attributes=attributes)
+ samplerate=44100,
+ blocksize=None,
+ stepsize=None)
+ self.expected = AnalyzerResult(data=0.054, attributes=attributes)
def tearDown(self):
decoder = FileDecoder(self.source)
def testOnSweep(self):
"runs on sweep"
self.source = os.path.join (os.path.dirname(__file__), "samples", "sweep.wav")
-
+
# Max level
attr = AnalyzerAttributes(id="max_level",
name="Max level",
unit = "dBFS",
- sampleRate=44100)
+ samplerate=44100)
max_level = AnalyzerResult(-6.021, attr)
-
+
# RMS level
attr = AnalyzerAttributes(id="rms_level",
name="RMS level",
unit="dBFS",
- sampleRate=44100)
- rms_level = AnalyzerResult(-9.856, attr)
+ samplerate=44100)
+ rms_level = AnalyzerResult(-9.856, attr)
self.expected = AnalyzerResultContainer([max_level,rms_level])
-
+
def testOnGuitar(self):
"runs on guitar"
self.source = os.path.join (os.path.dirname(__file__), "samples", "guitar.wav")
-
+
# Max level
attr = AnalyzerAttributes(id="max_level",
name="Max level",
unit = "dBFS",
- sampleRate=44100)
+ samplerate=44100)
max_level = AnalyzerResult(-4.258, attr)
-
+
# RMS level
attr = AnalyzerAttributes(id="rms_level",
name="RMS level",
unit="dBFS",
- sampleRate=44100)
- rms_level = AnalyzerResult(-21.945, attr)
+ samplerate=44100)
+ rms_level = AnalyzerResult(-21.945, attr)
self.expected = AnalyzerResultContainer([max_level,rms_level])
def tearDown(self):
def setUp(self):
self.sample_rate = 16000
-
+
def testOnSweepWithFeaturePlan(self):
- "runs on sweep and define feature plan manualy"
+ "runs on sweep and define feature plan manually"
self.source = os.path.join (os.path.dirname(__file__), "samples", "sweep.wav")
# Setup Yaafe Analyzer
fp.addFeature('mfcc: MFCC blockSize=512 stepSize=256')
fp.addFeature('mfcc_d1: MFCC blockSize=512 stepSize=256 > Derivate DOrder=1')
fp.addFeature('mfcc_d2: MFCC blockSize=512 stepSize=256 > Derivate DOrder=2')
-
- # Setup a new Yaafe TimeSide analyzer
+
+ # Setup a new Yaafe TimeSide analyzer
# from FeaturePlan
self.analyzer = Yaafe(fp)
# Load Yaafe Feature Plan
fp = FeaturePlan(sample_rate=self.sample_rate)
fp_file = os.path.join (os.path.dirname(__file__), "yaafe_config", "yaafeFeaturePlan")
-
+
fp.loadFeaturePlan(fp_file)
- # Setup a new Yaafe TimeSide analyzer
+ # Setup a new Yaafe TimeSide analyzer
# from FeaturePlan
self.analyzer = Yaafe(fp)
-
+
def testOnGuitarWithDataFlow(self):
"runs on guitar and load Yaafe dataflow from file"
self.source = os.path.join (os.path.dirname(__file__), "samples", "guitar.wav")
# Setup Yaafe Analyzer
# Load DataFlow from file
- df = DataFlow()
+ df = DataFlow()
df_file = os.path.join (os.path.dirname(__file__), "yaafe_config", "yaafeDataFlow")
df.load(df_file)
-
- # Setup a new Yaafe TimeSide analyzer
+
+ # Setup a new Yaafe TimeSide analyzer
# from DataFlow
self.analyzer = Yaafe(df)
container = AnalyzerResultContainer()
melenergy = AnalyzerResult()
-
+
# Get attributes
- sampleRate = self.samplerate()
- blockSize = self.win_s
- stepSize = self.hop_s
+ samplerate = self.samplerate()
+ blocksize = self.win_s
+ stepsize = self.hop_s
parameters = dict(n_filters= self.n_filters,
n_coeffs= self.n_coeffs)
# Set attributes
melenergy.attributes = AnalyzerAttributes(id="aubio_melenergy",
name="melenergy (aubio)",
unit='',
- sampleRate = sampleRate,
- blockSize = blockSize,
- stepSize = stepSize,
- parameters = parameters)
+ samplerate = samplerate,
+ blocksize = blocksize,
+ stepsize = stepsize,
+ parameters = parameters)
# Set Data
melenergy.data = self.melenergy_results
container.add_result(melenergy)
def results(self):
# MFCC
mfcc = AnalyzerResult()
- sampleRate = self.samplerate()
- blockSize = self.win_s
- stepSize = self.hop_s
+ samplerate = self.samplerate()
+ blocksize = self.win_s
+ stepsize = self.hop_s
parameters = dict(n_filters= self.n_filters,
n_coeffs= self.n_coeffs)
- mfcc.attributes = AnalyzerAttributes(id = "aubio_mfcc",
+ mfcc.attributes = AnalyzerAttributes(id = "aubio_mfcc",
name = "mfcc (aubio)",
unit = "",
- sampleRate = sampleRate,
- blockSize = blockSize,
- stepSize = stepSize,
+ samplerate = samplerate,
+ blocksize = blocksize,
+ stepsize = stepsize,
parameters = parameters)
mfcc.data = [list(line) for line in self.mfcc_results] # TODO : type ? list list ?
-
+
return AnalyzerResultContainer(mfcc)
container = AnalyzerResultContainer()
pitch = AnalyzerResult()
-
+
# Get attributes
- sampleRate = self.samplerate()
- blockSize = self.win_s
- stepSize = self.hop_s
+ samplerate = self.samplerate()
+ blocksize = self.win_s
+ stepsize = self.hop_s
# parameters : None # TODO check with Piem "default" and "freq" in setup
-
+
# Set attributes
pitch.attributes = AnalyzerAttributes(id="aubio_pitch",
name="f0 (aubio)",
unit='Hz',
- sampleRate = sampleRate,
- blockSize = blockSize,
- stepSize = stepSize)
+ samplerate = samplerate,
+ blocksize = blocksize,
+ stepsize = stepsize)
# Set Data
self.pitches = numpy.array(self.pitches)
pitch.data = self.pitches
container.add_result(pitch)
-
+
return container
container = AnalyzerResultContainer()
# Get common attributes
- sampleRate = self.samplerate()
- blockSize = self.win_s
- stepSize = self.hop_s
+ samplerate = self.samplerate()
+ blocksize = self.win_s
+ stepsize = self.hop_s
unit = ""
# For each method store results in container
for method in self.methods:
# Set attributes
id = '_'.join(["aubio_specdesc", method])
name = ' '.join(["spectral descriptor", method, "(aubio)"])
-
+
specdesc.attributes = AnalyzerAttributes(id = id,
name = name,
unit = unit,
- sampleRate = sampleRate,
- blockSize = blockSize,
- stepSize = stepSize)
-
- # Set Data
+ samplerate = samplerate,
+ blocksize = blocksize,
+ stepsize = stepsize)
+
+ # Set Data
specdesc.data = numpy.array(self.specdesc_results[method])
container.add_result(specdesc)
def results(self):
# Get common attributes
- commonAttr = dict(sampleRate=self.samplerate(),
- blockSize=self.win_s,
- stepSize=self.hop_s)
+ commonAttr = dict(samplerate=self.samplerate(),
+ blocksize=self.win_s,
+ stepsize=self.hop_s)
# FIXME : Onsets, beat and onset rate are not frame based Results
- # sampleRate, blockSize, etc. are not appropriate here
+ # samplerate, blocksize, etc. are not appropriate here
# Those might be some kind of "AnalyzerSegmentResults"
#---------------------------------
#---------------------------------
# Onset Rate
#---------------------------------
- onsetRate = AnalyzerResult()
+ onsetrate = AnalyzerResult()
# Set attributes
- onsetRateAttr = dict(id="aubio_onset_rate",
+ onsetrateAttr = dict(id="aubio_onset_rate",
name="onset rate (aubio)",
unit="bpm")
- onsetRate.attributes = dict(onsetRateAttr.items() + commonAttr.items())
+ onsetrate.attributes = dict(onsetrateAttr.items() + commonAttr.items())
# Set Data
if len(self.onsets) > 1:
#periods = [60./(b - a) for a,b in zip(self.onsets[:-1],self.onsets[1:])]
periods = 60. / numpy.diff(self.onsets)
- onsetRate.data = periods
+ onsetrate.data = periods
else:
- onsetRate.data = []
+ onsetrate.data = []
#---------------------------------
# Beats
else:
bpm.data = []
- return AnalyzerResultContainer([onsets, onsetRate, beats, bpm])
\ No newline at end of file
+ return AnalyzerResultContainer([onsets, onsetrate, beats, bpm])
\ No newline at end of file
id : string
name : string
unit : string
- sampleRate : int or float
- blockSize : int
- stepSize : int
+ samplerate : int or float
+ blocksize : int
+ stepsize : int
parameters : dict
Methods
-------
- asdict()
+ as_dict()
Return a dictionnary representation of the AnalyzerAttributes
"""
from collections import OrderedDict
_default_value = OrderedDict([('id', ''),
('name', ''),
('unit', ''),
- ('sampleRate', None),
- ('blockSize', None),
- ('stepSize', None),
+ ('samplerate', None),
+ ('blocksize', None),
+ ('stepsize', None),
('parameters', {})
])
# TODO : rajouter
id : string
name : string
unit : string
- sampleRate : int or float
- blockSize : int
- stepSize : int
+ samplerate : int or float
+ blocksize : int
+ stepsize : int
parameters : dict
Returns
(name, self.__class__.__name__))
super(AnalyzerAttributes, self).__setattr__(name, value)
- def asdict(self):
+ def as_dict(self):
return dict((att, getattr(self, att))
for att in self._default_value.keys())
', '.join('{}={}'.format(
att, repr(getattr(self, att)))
for att in self._default_value.keys()))
-
+
def __eq__(self,other):
return (isinstance(other, self.__class__)
- and self.asdict() == other.asdict())
+ and self.as_dict() == other.as_dict())
class AnalyzerResult(object):
# return self[name]
# return super(AnalyzerResult, self).__getattr__(name)
- def asdict(self):
- return(dict(data=self.data, attributes=self.attributes.asdict()))
+ def as_dict(self):
+ return(dict(data=self.data, attributes=self.attributes.as_dict()))
def to_json(self):
import simplejson as json
- return json.dumps(self.asdict())
+ return json.dumps(self.as_dict())
def __repr__(self):
return self.to_json()
-
+
def __eq__(self,other):
return (isinstance(other, self.__class__)
- and self.asdict() == other.asdict())
+ and self.as_dict() == other.as_dict())
def __ne__(self, other):
return not self.__eq__(other)
return len(self.results)
def __repr__(self):
- return [res.asdict() for res in self.results]
+ return [res.as_dict() for res in self.results]
def __eq__(self, other):
if hasattr(other, 'results'):
if a != b:
return False
return True
-
+
def __ne__(self, other):
- return not self.__eq__(other)
+ return not self.__eq__(other)
def add_result(self, analyzer_result):
if type(analyzer_result) == list:
data_node.text = repr(result.data)
# Serialize Attributes
attr_node = ET.SubElement(res_node, 'attributes')
- for (name, val) in result.attributes.asdict().items():
+ for (name, val) in result.attributes.as_dict().items():
# TODO reorder keys
child = ET.SubElement(attr_node, name)
if name == 'parameters':
result.data = ast.literal_eval(result_child.find('data').text)
except:
result.data = result_child.find('data').text
-
+
# Get attributes
for attr_child in result_child.find('attributes'):
name = attr_child.tag
def to_json(self):
#if data_list == None: data_list = self.results
import simplejson as json
- return json.dumps([res.asdict() for res in self])
+ return json.dumps([res.as_dict() for res in self])
def from_json(self, json_str):
import simplejson as json
def to_yaml(self):
#if data_list == None: data_list = self.results
import yaml
- return yaml.dump([res.asdict() for res in self])
+ return yaml.dump([res.as_dict() for res in self])
def from_yaml(self, yaml_str):
import yaml
def results(self):
result = AnalyzerResult()
# Set attributes
- # FIXME : blockSize and stepSize are not appropriate here
+ # FIXME : blocksize and stepsize are not appropriate here
result.attributes = AnalyzerAttributes(id="mean_dc_shift",
name = "Mean DC shift",
unit = "%",
- sampleRate=self.samplerate(),
- blockSize=None,
- stepSize=None)
-
+ samplerate=self.samplerate(),
+ blocksize=None,
+ stepsize=None)
+
# Set Data
result.data = numpy.round(numpy.mean(100*self.values),3)
return AnalyzerResultContainer(result)
def results(self):
# Max level
- # FIXME : blockSize and stepSize are not appropriate here
+ # FIXME : blocksize and stepsize are not appropriate here
attr = AnalyzerAttributes(id="max_level",
name="Max level",
unit = "dBFS",
- sampleRate=self.samplerate())
+ samplerate=self.samplerate())
data = numpy.round(20*numpy.log10(self.max_value), 3)
max_level = AnalyzerResult(data, attr)
-
+
# RMS level
- # FIXME : blockSize and stepSize are not appropriate here
+ # FIXME : blocksize and stepsize are not appropriate here
attr = AnalyzerAttributes(id="rms_level",
name="RMS level",
unit="dBFS",
- sampleRate=self.samplerate())
+ samplerate=self.samplerate())
data = numpy.round(20*numpy.log10(numpy.sqrt(numpy.mean(self.mean_values))), 3)
rms_level = AnalyzerResult(data, attr)
-
+
return AnalyzerResultContainer([max_level, rms_level])
# Author : Thomas Fillon <thomas@parisson.com>
"""
-Module Yaafe Analyzer
+Module Yaafe Analyzer
Created on Thu Jun 13 16:05:02 2013
@author: Thomas Fillon
from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
from timeside.analyzer.core import *
from timeside.api import IValueAnalyzer
-#
from yaafelib import *
-#
import numpy
+
class Yaafe(Processor):
implements(IValueAnalyzer)
+
def __init__(self, yaafeSpecification):
# Check arguments
if isinstance(yaafeSpecification,DataFlow):
self.dataFlow = self.featurePlan.getDataFlow()
else:
raise TypeError("'%s' Type must be either '%s' or '%s'" % (str(yaafeSpecification),str(DataFlow),str(FeaturePlan)))
-
+
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
super(Yaafe, self).setup(channels, samplerate, blocksize, totalframes)
self.yaafe_engine = Engine()
self.yaafe_engine.load(self.dataFlow)
self.yaafe_engine.reset()
+ self.samplerate = samplerate
+ self.blocksize = blocksize
@staticmethod
@interfacedoc
def process(self, frames, eod=False):
# do process things...
- # Downmixing to mono and convert to float64 for compatibility with Yaafe
+ # Downmixing to mono and convert to float64 for compatibility with Yaafe
yaafe_frames = frames.sum(axis=-1,dtype=numpy.float64) / frames.shape[-1]
- # Reshape for compatibility with Yaafe input format
- yaafe_frames.shape = (1,yaafe_frames.shape[0])
+ # Reshape for compatibility with Yaafe input format
+ yaafe_frames.shape = (1,yaafe_frames.shape[0])
# write audio array on 'audio' input
- self.yaafe_engine.writeInput('audio',yaafe_frames)
- # process available data
- self.yaafe_engine.process()
+ self.yaafe_engine.writeInput('audio',yaafe_frames)
+ # process available data
+ self.yaafe_engine.process()
if eod:
# flush yaafe engine to process remaining data
- self.yaafe_engine.flush()
-
+ self.yaafe_engine.flush()
+
return frames, eod
def results(self):
# Get back current container
container = AnalyzerResultContainer()
# Get feature extraction results from yaafe
- map_keys = {'sampleRate': 'sampleRate',
- 'frameLength': 'blockSize',
- 'sampleStep': 'stepSize',
- 'parameters': 'parameters',
- }
featNames = self.yaafe_engine.getOutputs().keys()
for featName in featNames:
- # Map Yaafe attributes into AnalyzerResults dict
- res_dict = {map_keys[name]: self.yaafe_engine.getOutputs()['mfcc'][name] for name in map_keys.keys()}
- # Define ID fields
- res_dict['id'] = 'yaafe_' + featName
- res_dict['name'] = 'Yaafe ' + featName
- res_dict['unit'] = ''
- # create AnalyzerResult and set its attributes
- result = AnalyzerResult(attributes=res_dict)
+ # Define ID fields
+ id = 'yaafe_' + featName
+ name = 'Yaafe ' + featName
+ unit = ''
+
# Get results from Yaafe engine
- result.data = self.yaafe_engine.readOutput(featName) # Read Yaafe Results
+ result = AnalyzerResult()
+ result.attributes = AnalyzerAttributes(id = id,
+ name = name,
+ unit = unit,
+ samplerate = self.samplerate,
+ blocksize = self.blocksize,
+ stepsize = None)
+
+ result.data = self.yaafe_engine.readOutput(featName) # Read Yaafe Results
# Store results in Container
if len(result.data):
container.add_result(result)
-
- return container
-
-
+ return container