def testOnSweep(self):
"runs on sweep"
self.source = os.path.join(os.path.dirname(__file__), "samples", "sweep.wav")
- metadata=AnalyzerMetadata(name="Mean DC shift",
- unit="%",
- id="mean_dc_shift",
- samplerate=44100,
- blocksize=None,
- stepsize=None)
- self.expected = AnalyzerResult(data=-0.000, metadata=metadata)
+ self.expected = {'mean_dc_shift': -0.000}
def testOnGuitar(self):
"runs on guitar"
self.source = os.path.join(os.path.dirname(__file__), "samples", "guitar.wav")
- metadata=AnalyzerMetadata(name="Mean DC shift",
- unit="%",
- id="mean_dc_shift",
- samplerate=44100,
- blocksize=None,
- stepsize=None)
- self.expected = AnalyzerResult(data=0.054, metadata=metadata)
+ self.expected = {'mean_dc_shift': 0.054}
def tearDown(self):
decoder = FileDecoder(self.source)
(decoder | self.analyzer).run()
results = self.analyzer.results()
- self.assertEquals(results[0], self.expected)
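+ # results() now returns a dict-like container keyed by result id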
+ for key, expected_value in self.expected.items():
+     self.assertEqual(results[key].data.value, expected_value)
if __name__ == '__main__':
unittest.main(testRunner=TestRunner())
from unit_timeside import *
from timeside.decoder import *
from timeside.analyzer.level import Level
-from timeside.analyzer import AnalyzerResult, AnalyzerResultContainer
+from timeside.analyzer import newAnalyzerResult, AnalyzerResultContainer
from timeside.analyzer import AnalyzerMetadata
class TestAnalyzerLevel(TestCase):
"runs on sweep"
self.source = os.path.join(os.path.dirname(__file__), "samples", "sweep.wav")
- # Max level
- metadata = AnalyzerMetadata(id="max_level",
- name="Max level",
- unit = "dBFS",
- samplerate=44100)
- max_level = AnalyzerResult(-6.021, metadata)
-
- # RMS level
- metadata = AnalyzerMetadata(id="rms_level",
- name="RMS level",
- unit="dBFS",
- samplerate=44100)
- rms_level = AnalyzerResult(-9.856, metadata)
- self.expected = AnalyzerResultContainer([max_level,rms_level])
+ max_level_value = -6.021
+ rms_level_value = -9.856
+
+ self.expected = {'max_level': max_level_value, 'rms_level': rms_level_value}
def testOnGuitar(self):
"runs on guitar"
self.source = os.path.join(os.path.dirname(__file__), "samples", "guitar.wav")
- # Max level
- metadata = AnalyzerMetadata(id="max_level",
- name="Max level",
- unit = "dBFS",
- samplerate=44100)
- max_level = AnalyzerResult(-4.258, metadata)
-
- # RMS level
- metadata = AnalyzerMetadata(id="rms_level",
- name="RMS level",
- unit="dBFS",
- samplerate=44100)
- rms_level = AnalyzerResult(-21.945, metadata)
- self.expected = AnalyzerResultContainer([max_level,rms_level])
+ max_level_value = -4.258
+ rms_level_value = -21.945
+
+ self.expected = {'max_level': max_level_value, 'rms_level': rms_level_value}
+
def tearDown(self):
decoder = FileDecoder(self.source)
(decoder | self.analyzer).run()
results = self.analyzer.results()
- self.assertEquals(results, self.expected)
+ for key, expected_value in self.expected.items():
+     self.assertEqual(results[key].data.value, expected_value)
#print results
#print results.to_yaml()
#print results.to_json()
# Author: Paul Brossier <piem@piem.org>
from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.analyzer.core import *
-from timeside.api import IValueAnalyzer
+from timeside.analyzer.core import Analyzer
+from timeside.api import IAnalyzer
+from utils import downsample_blocking
import numpy
from aubio import filterbank, pvoc
-class AubioMelEnergy(Processor):
- implements(IValueAnalyzer)
+class AubioMelEnergy(Analyzer):
+ implements(IAnalyzer)
+
+ def __init__(self):
+     super(AubioMelEnergy, self).__init__()
+     self.input_blocksize = 1024
+     self.input_stepsize = self.input_blocksize / 4
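+     # quarter-window hop is the usual setting for aubio's phase vocoder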
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
super(AubioMelEnergy, self).setup(channels, samplerate, blocksize, totalframes)
- self.win_s = 1024
- self.hop_s = self.win_s / 4
self.n_filters = 40
self.n_coeffs = 13
- self.pvoc = pvoc(self.win_s, self.hop_s)
- self.melenergy = filterbank(self.n_filters, self.win_s)
+ self.pvoc = pvoc(self.input_blocksize, self.input_stepsize)
+ self.melenergy = filterbank(self.n_filters, self.input_blocksize)
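+ # mel filterbank coefficients follow Slaney's Auditory Toolbox layout
+ # (aubio's set_mel_coeffs_slaney)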
self.melenergy.set_mel_coeffs_slaney(samplerate)
self.block_read = 0
self.melenergy_results = numpy.zeros([self.n_filters, ])
return "Mel Energy analysis (aubio)"
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.hop_s):
+ for samples in downsample_blocking(frames, self.input_stepsize):
+ # TODO: check why the blocksize is not used here
fftgrain = self.pvoc(samples)
self.melenergy_results = numpy.vstack( [ self.melenergy_results, self.melenergy(fftgrain) ])
self.block_read += 1
return frames, eod
- def results(self):
+ def release(self):
- container = AnalyzerResultContainer()
- melenergy = AnalyzerResult()
+ melenergy = self.new_result(dataMode='value', timeMode='framewise')
- # Get metadata
- samplerate = self.samplerate()
- blocksize = self.win_s
- stepsize = self.hop_s
- parameters = dict(n_filters= self.n_filters,
+ # Metadata
+ melenergy.parameters = dict(n_filters=self.n_filters,
n_coeffs= self.n_coeffs)
# Set metadata
- melenergy.metadata = AnalyzerMetadata(id="aubio_melenergy",
- name="melenergy (aubio)",
- unit='',
- samplerate = samplerate,
- blocksize = blocksize,
- stepsize = stepsize,
- parameters = parameters)
+ melenergy.idMetadata.id = "aubio_melenergy"
+ melenergy.idMetadata.name = "melenergy (aubio)"
+ melenergy.idMetadata.unit = ''
+
# Set Data
- melenergy.data = self.melenergy_results
- container.add_result(melenergy)
- return container
+ melenergy.data.value = self.melenergy_results
+
+ self.resultContainer.add_result(melenergy)
# Author: Paul Brossier <piem@piem.org>
from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.analyzer.core import *
-from timeside.api import IValueAnalyzer
+from timeside.analyzer.core import Analyzer
+from timeside.api import IAnalyzer
+from utils import downsample_blocking
import numpy
from aubio import mfcc, pvoc
from math import isnan
-class AubioMfcc(Processor):
- implements(IValueAnalyzer)
+class AubioMfcc(Analyzer):
+ implements(IAnalyzer)
+
+ def __init__(self):
+     super(AubioMfcc, self).__init__()
+     self.input_blocksize = 1024
+     self.input_stepsize = self.input_blocksize / 4
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
super(AubioMfcc, self).setup(channels, samplerate, blocksize, totalframes)
- self.win_s = 1024
- self.hop_s = self.win_s / 4
self.n_filters = 40
self.n_coeffs = 13
- self.pvoc = pvoc(self.win_s, self.hop_s)
- self.mfcc = mfcc(self.win_s, self.n_filters, self.n_coeffs, samplerate)
+ self.pvoc = pvoc(self.input_blocksize, self.input_stepsize)
+ self.mfcc = mfcc(self.input_blocksize, self.n_filters, self.n_coeffs, samplerate)
self.block_read = 0
self.mfcc_results = numpy.zeros([self.n_coeffs, ])
return "MFCC analysis (aubio)"
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.hop_s):
- #time = self.block_read * self.hop_s * 1. / self.samplerate()
+ for samples in downsample_blocking(frames, self.input_stepsize):
+ #time = self.block_read * self.input_stepsize * 1. / self.samplerate()
fftgrain = self.pvoc(samples)
coeffs = self.mfcc(fftgrain)
self.mfcc_results = numpy.vstack((self.mfcc_results, coeffs))
self.block_read += 1
return frames, eod
- def results(self):
+ def release(self):
# MFCC
- mfcc = AnalyzerResult()
- samplerate = self.samplerate()
- blocksize = self.win_s
- stepsize = self.hop_s
+ mfcc = self.new_result(dataMode='value', timeMode='framewise')
parameters = dict(n_filters= self.n_filters,
n_coeffs= self.n_coeffs)
- mfcc.metadata = AnalyzerMetadata(id = "aubio_mfcc",
- name = "mfcc (aubio)",
- unit = "",
- samplerate = samplerate,
- blocksize = blocksize,
- stepsize = stepsize,
- parameters = parameters)
- mfcc.data = [list(line) for line in self.mfcc_results] # TODO : type ? list list ?
-
- return AnalyzerResultContainer(mfcc)
+
+ mfcc.idMetadata.id = "aubio_mfcc"
+ mfcc.idMetadata.name = "mfcc (aubio)"
+ mfcc.idMetadata.unit = ""
+ mfcc.parameters = parameters
+
+ mfcc.data.value = self.mfcc_results
+ self.resultContainer.add_result(mfcc)
@staticmethod
@interfacedoc
def id():
- return "aubio_pitch_analyzer"
+ return "aubio_pitch"
@staticmethod
@interfacedoc
self.block_read += 1
return frames, eod
- def results(self):
-
- container = super(AubioPitch, self).results()
-
+ def release(self):
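+ # release() runs once at the end of the pipe; results accumulate
+ # in the container shared by every processor in the pipe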
+ # set Result
pitch = self.new_result(dataMode='value', timeMode='framewise')
pitch.idMetadata.id = "aubio_pitch"
# Set Data
pitch.data.value = numpy.array(self.pitches)
- container.add_result(pitch)
- return container
+ self.resultContainer.add_result(pitch)
+
# Author: Paul Brossier <piem@piem.org>
from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.analyzer.core import *
-from timeside.api import IValueAnalyzer
+from timeside.analyzer.core import Analyzer
+from timeside.api import IAnalyzer
+from utils import downsample_blocking
import numpy
from aubio import specdesc, pvoc
-class AubioSpecdesc(Processor):
- implements(IValueAnalyzer)
+class AubioSpecdesc(Analyzer):
+ implements(IAnalyzer)
+
+ def __init__(self):
+     super(AubioSpecdesc, self).__init__()
+     self.input_blocksize = 1024
+     self.input_stepsize = self.input_blocksize / 4
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
super(AubioSpecdesc, self).setup(channels, samplerate, blocksize, totalframes)
self.block_read = 0
- self.win_s = 1024
- self.hop_s = self.win_s / 4
- self.pvoc = pvoc(self.win_s, self.hop_s)
+ self.pvoc = pvoc(self.input_blocksize, self.input_stepsize)
self.methods = ['default', 'energy', 'hfc', 'complex', 'phase', 'specdiff', 'kl',
'mkl', 'specflux', 'centroid', 'slope', 'rolloff', 'spread', 'skewness',
'kurtosis', 'decrease']
self.specdesc = {}
self.specdesc_results = {}
for method in self.methods:
- self.specdesc[method] = specdesc(method, self.win_s)
+ self.specdesc[method] = specdesc(method, self.input_blocksize)
self.specdesc_results[method] = []
@staticmethod
return "Spectral Descriptor (aubio)"
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.hop_s):
+ for samples in downsample_blocking(frames, self.input_stepsize):
fftgrain = self.pvoc(samples)
for method in self.methods:
self.specdesc_results[method] += [self.specdesc[method](fftgrain)[0]]
return frames, eod
- def results(self):
+ def release(self):
- container = AnalyzerResultContainer()
- # Get common metadata
- samplerate = self.samplerate()
- blocksize = self.win_s
- stepsize = self.hop_s
unit = ""
+
# For each method store results in container
for method in self.methods:
- specdesc = AnalyzerResult()
+ res_specdesc = self.new_result(dataMode='value',
+ timeMode='framewise')
# Set metadata
- id = '_'.join(["aubio_specdesc", method])
- name = ' '.join(["spectral descriptor", method, "(aubio)"])
+ res_specdesc.idMetadata.id = '_'.join(["aubio_specdesc", method])
+ res_specdesc.idMetadata.name = ' '.join(["spectral descriptor", method, "(aubio)"])
+ res_specdesc.idMetadata.unit = unit
- specdesc.metadata = AnalyzerMetadata(id = id,
- name = name,
- unit = unit,
- samplerate = samplerate,
- blocksize = blocksize,
- stepsize = stepsize)
- # Set Data
- specdesc.data = numpy.array(self.specdesc_results[method])
+ res_specdesc.data.value = self.specdesc_results[method]
- container.add_result(specdesc)
+ self.resultContainer.add_result(res_specdesc)
- return container
# Author: Paul Brossier <piem@piem.org>
from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.analyzer.core import *
+from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
+from utils import downsample_blocking
from aubio import onset, tempo
+import numpy
class AubioTemporal(Analyzer):
implements(IAnalyzer)
self.block_read += 1
return frames, eod
- def results(self):
-
- container = super(AubioTemporal, self).results()
+ def release(self):
#---------------------------------
# Onsets
onsets.labelMetadata.label = {1: 'Onset'}
- container.add_result(onsets)
+ self.resultContainer.add_result(onsets)
#---------------------------------
# Onset Rate
else:
onsetrate.data.value = []
- container.add_result(onsetrate)
+ self.resultContainer.add_result(onsetrate)
#---------------------------------
# Beats
beats.labelMetadata.label = {1: 'Beat'}
- container.add_result(beats)
+ self.resultContainer.add_result(beats)
#---------------------------------
# BPM
# Set Data, dataMode='value', timeMode='segment'
if len(self.beats) > 1:
periods = 60. / numpy.diff(self.beats)
- periods = numpy.append(periods,periods[-1])
+ periods = numpy.append(periods, periods[-1])
bpm.data.time = self.beats
bpm.data.duration = duration
else:
bpm.data.value = []
- container.add_result(bpm)
-
- return container
+ self.resultContainer.add_result(bpm)
#'datetime64',
#'complex128',
#'complex64',
- ]
+]
numpy_data_types = map(lambda x: getattr(numpy, x), numpy_data_types)
#numpy_data_types += [numpy.ndarray]
class MetadataObject(object):
+
"""
Object that contains a metadata structure
structure inspired by [1]
setattr(self, key, value)
# Set metadata passed in as arguments
- #for k, v in zip(self._default_value.keys(), args):
+ # for k, v in zip(self._default_value.keys(), args):
# setattr(self, k, v)
# print 'args'
for key, value in kwargs.items():
def __setattr__(self, name, value):
if name not in self._default_value.keys():
raise AttributeError("%s is not a valid attribute in %s" %
- (name, self.__class__.__name__))
+ (name, self.__class__.__name__))
super(MetadataObject, self).__setattr__(name, value)
def __delattr__(self, name):
def as_dict(self):
return dict((att, getattr(self, att))
- for att in self._default_value.keys())
+ for att in self._default_value.keys())
def keys(self):
return [attr for attr in self._default_value.keys()
- if hasattr(self, attr)]
+ if hasattr(self, attr)]
def values(self):
return [self[attr] for attr in self._default_value.keys()
- if hasattr(self, attr)]
+ if hasattr(self, attr)]
def items(self):
return [(attr, self[attr]) for attr in self._default_value.keys()
- if hasattr(self, attr)]
+ if hasattr(self, attr)]
def __getitem__(self, key, default=None):
try:
return '{}({})'.format(
self.__class__.__name__,
', '.join('{}={}'.format(
- att, repr(getattr(self, att)))
- for att in self._default_value.keys()))
+ att, repr(getattr(self, att)))
+ for att in self._default_value.keys()))
def __str__(self):
return self.as_dict().__str__()
def __eq__(self, other):
return (isinstance(other, self.__class__)
- and all([self[key] == other[key] for key in self.keys()]))
+ and all([self[key] == other[key] for key in self.keys()]))
def __ne__(self, other):
return not(isinstance(other, self.__class__)
- or self.as_dict() != other.as_dict())
+ or self.as_dict() != other.as_dict())
def to_xml(self):
import xml.etree.ElementTree as ET
class IdMetadata(MetadataObject):
+
'''
Metadata object to handle Identification Metadata
# TODO :
# - (long) description --> to be moved into the Processor API
-
# Define default values
_default_value = OrderedDict([('id', ''),
('name', ''),
class AudioMetadata(MetadataObject):
+
'''
Metadata object to handle Audio related Metadata
class LabelMetadata(MetadataObject):
+
'''
Metadata object to handle Label Metadata
'''
-
# Define default values
_default_value = OrderedDict([('label', None),
('description', None),
class FrameMetadata(MetadataObject):
+
'''
Metadata object to handle Frame related Metadata
'''
# TODO : check if samplerate can support float
-
# Define default values
_default_value = OrderedDict([('samplerate', None),
('blocksize', None),
class AnalyzerData(MetadataObject):
+
'''
Metadata object to handle analyzer result data
def __eq__(self, other):
try:
return (isinstance(other, self.__class__) and
- all([numpy.array_equal (self[key], other[key])
- for key in self.keys()]))
+ all([numpy.array_equal(self[key], other[key])
+ for key in self.keys()]))
except AttributeError:
- #print self
- #print [self[key] == other[key] for key in self.keys()]
+ # print self
+ # print [self[key] == other[key] for key in self.keys()]
return (isinstance(other, self.__class__) and
- all([bool(numpy.logical_and.reduce((self[key] == other[key]).ravel()))
+ all([bool(numpy.logical_and.reduce((self[key] == other[key]).ravel()))
for key in self.keys()]))
def __ne__(self, other):
return not(isinstance(other, self.__class__) or
- any([numpy.array_equal (self[key], other[key])
+ any([numpy.array_equal(self[key], other[key])
for key in self.keys()]))
def to_xml(self):
value = getattr(self, key)
if value not in [None, []]:
child.text = repr(value.tolist())
- child.set('dtype',value.dtype.__str__())
+ child.set('dtype', value.dtype.__str__())
return ET.tostring(root, encoding="utf-8", method="xml")
class newAnalyzerResult(MetadataObject):
+
"""
Object that contains the metadata and parameters of an analyzer process
_validTimeMode = ['framewise', 'global', 'segment', 'event', None]
def __init__(self, dataMode=None,
- timeMode=None):
+ timeMode=None):
super(newAnalyzerResult, self).__init__()
self.dataMode = dataMode
self.timeMode = timeMode
pass
else:
raise ValueError('Argument ''dataMode''=%s should be in %s'
- % (value, self._validDataMode))
+ % (value, self._validDataMode))
elif name == 'timeMode':
if self[name] is not None:
raise AttributeError("The value of attribute ''timeMode'' \\\
pass
else:
raise ValueError('Argument ''timeMode''=%s should be in %s'
- % (value, self._validTimeMode))
+ % (value, self._validTimeMode))
super(newAnalyzerResult, self).__setattr__(name, value)
def as_dict(self):
return dict([(key, self[key].as_dict())
- for key in self.keys() if hasattr(self[key],'as_dict')]+
- [('dataMode', self.dataMode), ('timeMode', self.timeMode)])
+ for key in self.keys() if hasattr(self[key], 'as_dict')] +
+ [('dataMode', self.dataMode), ('timeMode', self.timeMode)])
def to_xml(self):
import xml.etree.ElementTree as ET
root = ET.Element('result')
root.metadata = {'name': self.idMetadata.name,
- 'id': self.idMetadata.id}
+ 'id': self.idMetadata.id}
for key in self.keys():
if key in ['dataMode', 'timeMode']:
class AnalyzerMetadata(MetadataObject):
+
"""
Object that contains the metadata and parameters of an analyzer process
class AnalyzerResult(object):
+
"""
Object that contains results returned by an analyzer process
metadata :
if value is None:
value = []
# make a numpy.array out of list
- if type(value) is list:
+ if isinstance(value, list):
value = numpy.array(value)
# serialize using numpy
- if type(value) in numpy_data_types+[numpy.ndarray]:
+ if type(value) in numpy_data_types + [numpy.ndarray]:
value = value.tolist()
if type(value) not in [list, str, int, long, float, complex, type(None)] + numpy_data_types:
raise TypeError('AnalyzerResult can not accept type %s' %
- type(value))
+ type(value))
elif name == 'metadata':
if not isinstance(value, AnalyzerMetadata):
value = AnalyzerMetadata(**value)
else:
raise AttributeError("%s is not a valid attribute in %s" %
- (name, self.__class__.__name__))
+ (name, self.__class__.__name__))
return super(AnalyzerResult, self).__setattr__(name, value)
@property
def properties(self):
prop = dict(mean=numpy.mean(self.data, axis=0),
- std=numpy.std(self.data, axis=0, ddof=1),
- median=numpy.median(self.data, axis=0),
- max=numpy.max(self.data, axis=0),
- min=numpy.min(self.data, axis=0)
- )
+ std=numpy.std(self.data, axis=0, ddof=1),
+ median=numpy.median(self.data, axis=0),
+ max=numpy.max(self.data, axis=0),
+ min=numpy.min(self.data, axis=0)
+ )
# TODO: add size
return(prop)
def __eq__(self, other):
return (isinstance(other, self.__class__)
- and self.as_dict() == other.as_dict())
+ and self.as_dict() == other.as_dict())
def __ne__(self, other):
return not self.__eq__(other)
-class AnalyzerResultContainer(object):
+class AnalyzerResultContainer(dict):
+
'''
>>> from timeside.decoder import FileDecoder
>>> import timeside.analyzer.core as coreA
>>> resContainer = coreA.AnalyzerResultContainer()
'''
+
def __init__(self, analyzer_results=None):
- self.results = []
+ super(AnalyzerResultContainer, self).__init__()
if analyzer_results is not None:
self.add_result(analyzer_results)
- def __getitem__(self, i):
- return self.results[i]
- def __len__(self):
- return len(self.results)
- def __repr__(self):
- return [res.as_dict() for res in self.results].__repr__()
- def __eq__(self, other):
- if hasattr(other, 'results'):
- other = other.results
- return self.results == other
- def __ne__(self, other):
- return not self.__eq__(other)
def add_result(self, analyzer_result):
- if type(analyzer_result) == list:
+ if isinstance(analyzer_result, list):
for res in analyzer_result:
self.add_result(res)
return
or isinstance(analyzer_result, newAnalyzerResult)):
raise TypeError('only AnalyzerResult can be added')
- self.results += [analyzer_result]
+ self[analyzer_result.idMetadata.id] = analyzer_result
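+ # results are keyed by their unique id, e.g. container['aubio_mfcc']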
+
- def to_xml(self, data_list=None):
-     if data_list is None:
-         data_list = self.results
+ def to_xml(self):
import xml.etree.ElementTree as ET
# TODO : cf. telemeta util
root = ET.Element('timeside')
- for result in data_list:
+ for result in self.values():
if result:
root.append(ET.fromstring(result.to_xml()))
'dtype': obj.dtype.__str__()}
raise TypeError(repr(obj) + " is not JSON serializable")
- return json.dumps([res.as_dict() for res in self],
- default=NumpyArrayEncoder)
+ return json.dumps([res.as_dict() for res in self.values()],
+ default=NumpyArrayEncoder)
def from_json(self, json_str):
import simplejson as json
yaml.add_representer(numpy.ndarray, numpyArray_representer)
- return yaml.dump([res.as_dict() for res in self])
+ return yaml.dump([res.as_dict() for res in self.values()])
def from_yaml(self, yaml_str):
import yaml
results.add_result(res)
return results
- def to_numpy(self, output_file, data_list=None):
- if data_list is None:
- data_list = self.results
- numpy.save(output_file, data_list)
+ def to_numpy(self, output_file):
+ numpy.save(output_file, self)
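+ # note: numpy.save pickles the dict inside a 0-d object array, so
+ # from_numpy callers may need numpy.load(...).item() to unwrap it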
def from_numpy(self, input_file):
return numpy.load(input_file)
- def to_hdf5(self, output_file, data_list=None):
- if data_list is None:
- data_list = self.results
+ def to_hdf5(self, output_file):
import h5py
# Open HDF5 file and save dataset (overwrite any existing file)
with h5py.File(output_file, 'w') as h5_file:
- for res in data_list:
+ for res in self.values():
# Save results in HDF5 Dataset
group = h5_file.create_group(res.idMetadata.id)
group.attrs['dataMode'] = res['dataMode']
if res[key][dsetName].dtype == 'object':
# Handle numpy type = object as vlen string
subgroup.create_dataset(dsetName,
- data=res[key][dsetName].tolist().__repr__(),
- dtype=h5py.special_dtype(vlen=str))
+     data=repr(res[key][dsetName].tolist()),
+     dtype=h5py.special_dtype(vlen=str))
else:
subgroup.create_dataset(dsetName,
- data=res[key][dsetName])
-
-
+ data=res[key][dsetName])
def from_hdf5(self, input_file):
import h5py
# It should be fixed by the next h5py version
if dset.shape != (0,):
if h5py.check_dtype(vlen=dset.dtype):
# to deal with VLEN data used for list of list
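+ # note: eval() only reconstructs the list repr written by to_hdf5
+ # above, so it should never see data from other sources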
result[subgroup_name][dsetName] = eval(
- dset[...].tolist())
+ dset[...].tolist())
else:
result[subgroup_name][dsetName] = dset[...]
else:
class Analyzer(Processor):
+
'''
Generic class for the analyzers
'''
self.result_stepsize = self.input_stepsize
def results(self):
- container = AnalyzerResultContainer()
- return container
+ return self.resultContainer
@staticmethod
@interfacedoc
return ""
def new_result(self, dataMode=newAnalyzerResult._default_value['dataMode'],
- timeMode=newAnalyzerResult._default_value['timeMode']):
+ timeMode=newAnalyzerResult._default_value['timeMode']):
'''
Create a new result
result = newAnalyzerResult(dataMode=dataMode, timeMode=timeMode)
# Automatically write known metadata
- result.idMetadata = IdMetadata(date=datetime.now().replace(microsecond=0).isoformat(' '),
- version=TimeSideVersion,
- author='TimeSide')
+ result.idMetadata = IdMetadata(
+ date=datetime.now().replace(microsecond=0).isoformat(' '),
+ version=TimeSideVersion,
+ author='TimeSide')
result.audioMetadata = AudioMetadata(uri=self.mediainfo()['uri'],
start=self.mediainfo()['start'],
duration=self.mediainfo()['duration'])
if timeMode == 'framewise':
result.frameMetadata = FrameMetadata(
- samplerate=self.result_samplerate,
- blocksize=self.result_blocksize,
- stepsize=self.result_stepsize)
+ samplerate=self.result_samplerate,
+ blocksize=self.result_blocksize,
+ stepsize=self.result_stepsize)
elif timeMode == 'global':
# None : handle by data
pass
# Author: Guillaume Pellerin <yomguy@parisson.com>
from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.analyzer.core import *
+from timeside.analyzer.core import Analyzer
from timeside.api import IValueAnalyzer
import numpy
-class MeanDCShift(Processor):
+class MeanDCShift(Analyzer):
implements(IValueAnalyzer)
@interfacedoc
self.values = numpy.append(self.values, numpy.mean(frames))
return frames, eod
- def results(self):
- result = AnalyzerResult()
+ def release(self):
+ dc_result = self.new_result(dataMode='value', timeMode='global')
# Set metadata
- # FIXME : blocksize and stepsize are not appropriate here
- result.metadata = AnalyzerMetadata(id="mean_dc_shift",
- name = "Mean DC shift",
- unit = "%",
- samplerate=self.samplerate(),
- blocksize=None,
- stepsize=None)
-
+ dc_result.idMetadata.id = "mean_dc_shift"
+ dc_result.idMetadata.name = "Mean DC shift"
+ dc_result.idMetadata.unit = "%"
# Set Data
- result.data = numpy.round(numpy.mean(100*self.values),3)
- return AnalyzerResultContainer(result)
+ dc_result.data.value = numpy.round(numpy.mean(100 * self.values), 3)
+ self.resultContainer.add_result(dc_result)
\ No newline at end of file
from timeside.core import Processor, implements, interfacedoc, \
FixedSizeInputAdapter
-from timeside.analyzer.core import AnalyzerMetadata, \
- AnalyzerResultContainer, \
- AnalyzerResult
+from timeside.analyzer.core import Analyzer
from timeside.api import IValueAnalyzer
import numpy
-class Level(Processor):
+class Level(Analyzer):
implements(IValueAnalyzer)
@interfacedoc
numpy.mean(numpy.square(frames)))
return frames, eod
- def results(self):
+ def release(self):
# Max level
- # FIXME : blocksize and stepsize are not appropriate here
- metadata = AnalyzerMetadata(id="max_level",
- name="Max level",
- unit = "dBFS",
- samplerate=self.samplerate())
- data = numpy.round(20*numpy.log10(self.max_value), 3)
- max_level = AnalyzerResult(data, metadata)
+ max_level = self.new_result(dataMode='value', timeMode='global')
+
+ max_level.idMetadata.id = "max_level"
+ max_level.idMetadata.name = "Max level"
+ max_level.idMetadata.unit = "dBFS"
+
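+ # linear peak amplitude converted to dBFS (20 * log10)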
+ max_level.data.value = numpy.round(20 * numpy.log10(self.max_value), 3)
+ self.resultContainer.add_result(max_level)
# RMS level
- # FIXME : blocksize and stepsize are not appropriate here
- metadata = AnalyzerMetadata(id="rms_level",
- name="RMS level",
- unit="dBFS",
- samplerate=self.samplerate())
- data = numpy.round(20*numpy.log10(
+ rms_level = self.new_result(dataMode='value', timeMode='global')
+ rms_level.idMetadata.id = "rms_level"
+ rms_level.idMetadata.name = "RMS level"
+ rms_level.idMetadata.unit = "dBFS"
+
+ rms_level.data.value = numpy.round(20 * numpy.log10(
numpy.sqrt(numpy.mean(self.mean_values))), 3)
- rms_level = AnalyzerResult(data, metadata)
+ self.resultContainer.add_result(rms_level)
- return AnalyzerResultContainer([max_level, rms_level])
@author: Thomas Fillon
"""
from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.analyzer.core import *
-from timeside.api import IValueAnalyzer
+from timeside.analyzer.core import Analyzer
+from timeside.api import IAnalyzer
from yaafelib import *
import numpy
-class Yaafe(Processor):
- implements(IValueAnalyzer)
+class Yaafe(Analyzer):
+ implements(IAnalyzer)
def __init__(self, yaafeSpecification):
# Check arguments
return frames, eod
- def results(self):
- # Get back current container
- container = AnalyzerResultContainer()
+ def release(self):
# Get feature extraction results from yaafe
featNames = self.yaafe_engine.getOutputs().keys()
if len(featNames) == 0:
# Define ID fields
id = 'yaafe_' + featName
name = 'Yaafe ' + featName
- unit = ''
# Get results from Yaafe engine
- result = AnalyzerResult()
- result.metadata = AnalyzerMetadata(id=id,
- name=name,
- unit=unit,
- samplerate=self.samplerate,
- blocksize=self.blocksize,
- stepsize=None)
+ result = self.new_result(dataMode='value', timeMode='framewise')
+ result.idMetadata.id = id
+ result.idMetadata.name = name
+ result.idMetadata.unit = ''
# Read Yaafe Results
- result.data = self.yaafe_engine.readOutput(featName)
+ result.data.value = self.yaafe_engine.readOutput(featName)
# Store results in Container
- if len(result.data):
- container.add_result(result)
+ if len(result.data.value):
+ self.resultContainer.add_result(result)
- return container
last = source
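+ # one shared result container per pipe run, handed to each processor
+ # in the setup loop below; the local import presumably avoids a
+ # circular dependency with timeside.analyzer.core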
+ from timeside.analyzer.core import AnalyzerResultContainer
+ self.resultContainer = AnalyzerResultContainer()
+
# setup/reset processors and configure properties throughout the pipe
for item in items:
item.setup(channels = last.channels(),
blocksize = last.blocksize(),
totalframes = last.totalframes())
item.source_mediainfo = source.mediainfo()
+ item.resultContainer = self.resultContainer
last = item
# now stream audio data along the pipe
[<matplotlib.lines.Line2D object at 0x...>]
>>> plt.legend(['Source signal', 'Smoothed signal']) # doctest: +ELLIPSIS
<matplotlib.legend.Legend object at 0x...>
- >>> #plt.show()
+ >>> plt.show() # doctest: +SKIP
"""
# TODO: the window parameter could be the window itself if passed as an array instead of a string