Example of use of the new analyzerResult structure
-Usage : AnalyzerResult(data_mode=None, time_mode=None)
+Usage : AnalyzerResultFactory(data_mode=None, time_mode=None)
-See : :class:`timeside.analyzer.core.AnalyzerResult`
+See : :function:`timeside.analyzer.core.AnalyzerResultFactory`, :class:`timeside.analyzer.core.AnalyzerResult`
Default
=======
Create a new analyzer result without arguments
- >>> from timeside.analyzer.core import AnalyzerResult
- >>> res = AnalyzerResult()
+ >>> from timeside.analyzer.core import AnalyzerResultFactory
+ >>> res = AnalyzerResultFactory()
This default result has all the metadata and dataObject attribute
>>> res.keys()
- ['data_mode', 'time_mode', 'id_metadata', 'data', 'audio_metadata', 'frame_metadata', 'label_metadata', 'parameters']
+ ['id_metadata', 'data_object', 'audio_metadata', 'frame_metadata', 'parameters']
>>> for key,value in res.items():
... print '%s : %s' % (key, value)
...
- data_mode : None
- time_mode : None
id_metadata : {'description': '', 'author': '', 'version': '', 'date': '', 'id': '', 'unit': '', 'name': ''}
- dataObject : {'duration': array([], dtype=float64), 'time': array([], dtype=float64), 'value': None, 'label': array([], dtype=int64)}
- audio_metadata : {'duration': None, 'start': 0, 'channelsManagement': '', 'uri': '', 'channels': None}
+ data_object : {'value': array([], dtype=float64)}
+ audio_metadata : {'is_segment': None, 'uri': '', 'channels': None, 'start': 0, 'channelsManagement': '', 'duration': None}
frame_metadata : {'blocksize': None, 'samplerate': None, 'stepsize': None}
- label_metadata : {'label_type': 'mono', 'description': None, 'label': None}
parameters : {}
Framewise
---------
->>> res = AnalyzerResult(time_mode='framewise')
+>>> res = AnalyzerResultFactory(time_mode='framewise')
>>> res.keys()
-['data_mode', 'time_mode', 'id_metadata', 'data', 'audio_metadata', 'frame_metadata', 'label_metadata', 'parameters']
+['id_metadata', 'data_object', 'audio_metadata', 'frame_metadata', 'parameters']
Global
------
No frame metadata information is needed for these modes.
The 'frame_metadata' key/attribute is deleted.
->>> res = AnalyzerResult(time_mode='global')
+>>> res = AnalyzerResultFactory(time_mode='global')
>>> res.keys()
-['data_mode', 'time_mode', 'id_metadata', 'data', 'audio_metadata', 'label_metadata', 'parameters']
+['id_metadata', 'data_object', 'audio_metadata', 'parameters']
>>> res.data_object
-DataObject(value=None, label=array([], dtype=int64))
+DataObject(value=array([], dtype=float64))
Segment
-------
->>> res = AnalyzerResult(time_mode='segment')
+>>> res = AnalyzerResultFactory(time_mode='segment')
>>> res.keys()
-['data_mode', 'time_mode', 'id_metadata', 'data', 'audio_metadata', 'label_metadata', 'parameters']
->>> res.data
-DataObject(value=None, label=array([], dtype=int64), time=array([], dtype=float64), duration=array([], dtype=float64))
+['id_metadata', 'data_object', 'audio_metadata', 'parameters']
+>>> res.data_object
+DataObject(value=array([], dtype=float64), time=array([], dtype=float64), duration=array([], dtype=float64))
Event
-----
->>> res = AnalyzerResult(time_mode='event')
+>>> res = AnalyzerResultFactory(time_mode='event')
>>> res.keys()
-['data_mode', 'time_mode', 'id_metadata', 'data', 'audio_metadata', 'label_metadata', 'parameters']
->>> res.data
-DataObject(value=None, label=array([], dtype=int64), time=array([], dtype=float64))
+['id_metadata', 'data_object', 'audio_metadata', 'parameters']
+>>> res.data_object
+DataObject(value=array([], dtype=float64), time=array([], dtype=float64))
Specification of data_mode
=========================
-----
The label_metadata key is deleted.
->>> res = AnalyzerResult(data_mode='value')
+>>> res = AnalyzerResultFactory(data_mode='value')
>>> res.keys()
-['data_mode', 'time_mode', 'id_metadata', 'data', 'audio_metadata', 'frame_metadata', 'parameters']
+['id_metadata', 'data_object', 'audio_metadata', 'frame_metadata', 'parameters']
In the dataObject key, the 'value' key is kept and the 'label' key is deleted.
->>> res.data
-DataObject(value=None, time=array([], dtype=float64), duration=array([], dtype=float64))
+>>> res.data_object
+DataObject(value=array([], dtype=float64))
Label
-----
->>> res = AnalyzerResult(data_mode='label')
+>>> res = AnalyzerResultFactory(data_mode='label')
>>> res.keys()
-['data_mode', 'time_mode', 'id_metadata', 'data', 'audio_metadata', 'frame_metadata', 'label_metadata', 'parameters']
+['id_metadata', 'data_object', 'audio_metadata', 'frame_metadata', 'label_metadata', 'parameters']
In the dataObject key, the 'label' key is kept and the 'value' key is deleted.
->>> res.data
-DataObject(label=array([], dtype=int64), time=array([], dtype=float64), duration=array([], dtype=float64))
+>>> res.data_object
+DataObject(label=array([], dtype=int64))
# Thomas Fillon <thomas at parisson.com>
from __future__ import division
-from timeside.core import Processor, implements, interfacedoc
-from timeside.api import IAnalyzer
+from timeside.core import Processor
from timeside.__init__ import __version__
import numpy
from collections import OrderedDict
+from copy import deepcopy
numpy_data_types = [
#'float128',
def as_dict(self):
return dict((att, getattr(self, att))
- for att in self._default_value.keys())
+ for att in self.keys())
def keys(self):
- return [attr for attr in self._default_value.keys()
- if hasattr(self, attr)]
+ return [attr for attr in self._default_value.keys()]
def values(self):
- return [self[attr] for attr in self._default_value.keys()
- if hasattr(self, attr)]
+ return [self[attr] for attr in self.keys()]
def items(self):
- return [(attr, self[attr]) for attr in self._default_value.keys()
- if hasattr(self, attr)]
+ return [(attr, self[attr]) for attr in self.keys()]
def __getitem__(self, key, default=None):
try:
self.__class__.__name__,
', '.join('{}={}'.format(
att, repr(getattr(self, att)))
- for att in self._default_value.keys()))
+ for att in self.keys()))
def __str__(self):
return self.as_dict().__str__()
'''
# Define default values
- _default_value = OrderedDict([('label', None),
- ('description', None),
+ _default_value = OrderedDict([('label', {}),
+ ('description', {}),
('label_type', 'mono')])
class DataObject(MetadataObject):
'''
- Metadata object to handle Frame related Metadata
+ Metadata object to handle data related Metadata
Attributes
----------
'''
# Define default values
- _default_value = OrderedDict([('value', None),
+ _default_value = OrderedDict([('value', []),
('label', []),
('time', []),
('duration', [])])
return self
+
class AnalyzerResult(MetadataObject):
"""
Returns
-------
A new MetadataObject with the following attributes :
- - data_mode
- - time_mode
- - data : :class:`DataObject`
+ - data_object : :class:`DataObject`
- id_metadata : :class:`IdMetadata`
- audio_metadata : :class:`AudioMetadata`
- frame_metadata : :class:`FrameMetadata`
"""
# Define default values
- _default_value = OrderedDict([('data_mode', None),
- ('time_mode', None),
- ('id_metadata', None),
- ('data_object', None),
- ('audio_metadata', None),
- ('frame_metadata', None),
- ('label_metadata', None),
- ('parameters', None)
+ _default_value = OrderedDict([('id_metadata', IdMetadata()),
+ ('data_object', DataObject()),
+ ('audio_metadata', AudioMetadata()),
+ ('frame_metadata', FrameMetadata()),
+ ('label_metadata', LabelMetadata()),
+ ('parameters', AnalyzerParameters())
])
- _valid_data_mode = ['value', 'label', None]
- _valid_time_mode = ['framewise', 'global', 'segment', 'event', None]
-
def __init__(self, data_mode=None,
time_mode=None):
super(AnalyzerResult, self).__init__()
- self.data_mode = data_mode
- self.time_mode = time_mode
+ self._data_mode = data_mode
+ self._time_mode = time_mode
def __setattr__(self, name, value):
- setFuncDict = {'id_metadata': IdMetadata,
- 'data_object': DataObject,
- 'audio_metadata': AudioMetadata,
- 'frame_metadata': FrameMetadata,
- 'label_metadata': LabelMetadata,
- 'parameters': AnalyzerParameters}
-
- if name in setFuncDict.keys():
- setFunc = setFuncDict[name]
- if isinstance(value, setFunc):
- super(AnalyzerResult, self).__setattr__(name, value)
- return
- elif isinstance(value, dict):
+ if name in ['_data_mode', '_time_mode']:
+ super(MetadataObject, self).__setattr__(name, value)
+ return
+
+ elif name in self.keys():
+ if isinstance(value, dict) and value :
for (sub_name, sub_value) in value.items():
self[name][sub_name] = sub_value
return
- elif value is None:
- super(AnalyzerResult, self).__setattr__(name, setFunc())
- return
- else:
- raise TypeError('Wrong argument')
- elif name == 'data_mode':
- if self[name] is not None:
- raise AttributeError("The value of attribute ''data_mode'' \\\
- can not change after setup")
- if value == 'value':
- del self.label_metadata
- del self.data_object.label
- elif value == 'label':
- del self.data_object.value
- elif value is None:
- pass
- else:
- raise ValueError('Argument ''data_mode''=%s should be in %s'
- % (value, self._valid_data_mode))
- elif name == 'time_mode':
- if self[name] is not None:
- raise AttributeError("The value of attribute ''time_mode'' \\\
- can not change after setup")
-
- if value == 'framewise':
- del self.data_object.time
- del self.data_object.duration
- pass
- elif value == 'global':
- del self.data_object.time
- del self.data_object.duration
- del self.frame_metadata
-
- pass
- elif value == 'segment':
- del self.frame_metadata
- elif value == 'event':
- del self.frame_metadata
- del self.data_object.duration
-
- pass
- elif value is None:
- pass
- else:
- raise ValueError('Argument ''time_mode''=%s should be in %s'
- % (value, self._valid_time_mode))
+
super(AnalyzerResult, self).__setattr__(name, value)
def __len__(self):
root.metadata = {'name': self.id_metadata.name,
'id': self.id_metadata.id}
+ for name in ['data_mode', 'time_mode']:
+ child = ET.SubElement(root, name)
+ child.text = str(self.__getattribute__(name))
+ child.tag = name
+ root.append(child)
+
for key in self.keys():
- if key in ['data_mode', 'time_mode']:
- child = ET.SubElement(root, key)
- child.text = str(self[key])
- else:
- child = ET.fromstring(self[key].to_xml())
+ child = ET.fromstring(self[key].to_xml())
child.tag = key
root.append(child)
return ET.tostring(root, encoding="utf-8", method="xml")
- def from_xml(self, xml_string):
+ @staticmethod
+ def from_xml(xml_string):
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_string)
data_mode_child = root.find('data_mode')
time_mode_child = root.find('time_mode')
- result = AnalyzerResult(data_mode=data_mode_child.text,
+ result = AnalyzerResultFactory(data_mode=data_mode_child.text,
time_mode=time_mode_child.text)
for child in root:
key = child.tag
return result
+ @property
+ def data_mode(self):
+ return self._data_mode
+
+ @property
+ def time_mode(self):
+ return self._time_mode
+
@property
def data(self):
- if self.data_mode is None:
- return (
- {key: self.data_object[key]
- for key in ['value', 'label'] if key in self.data_object.keys()}
- )
- elif self.data_mode is 'value':
- return self.data_object.value
- elif self.data_mode is 'label':
- return self.data_object.label
+ raise NotImplementedError
@property
def time(self):
- if self.time_mode == 'global':
- return self.audio_metadata.start
- elif self.time_mode == 'framewise':
- return (self.audio_metadata.start +
- self.frame_metadata.stepsize /
- self.frame_metadata.samplerate *
- numpy.arange(0, len(self)))
- else:
- return self.audio_metadata.start + self.data_object.time
- pass
+ raise NotImplementedError
@property
def duration(self):
- if self.time_mode == 'global':
- return self.audio_metadata.duration
- elif self.time_mode == 'framewise':
- return (self.frame_metadata.blocksize /
- self.frame_metadata.samplerate
- * numpy.ones(len(self)))
- elif self.time_mode == 'event':
- return numpy.zeros(len(self))
- elif self.time_mode == 'segment':
- return self.data_object.duration
+ raise NotImplementedError
@property
def id(self):
-# @property
-# def properties(self):
-# prop = dict(mean=numpy.mean(self.data, axis=0),
-# std=numpy.std(self.data, axis=0, ddof=1),
-# median=numpy.median(self.data, axis=0),
-# max=numpy.max(self.data, axis=0),
-# min=numpy.min(self.data, axis=0)
-# )
-# ajouter size
-# return(prop)
+class ValueObject(AnalyzerResult):
+
+ def __init__(self):
+ super(ValueObject, self).__init__()
+ del self.data_object.label
+ del self.label_metadata
+
+ @property
+ def data(self):
+ return self.data_object.value
+
+ @property
+ def properties(self):
+ return dict(mean=numpy.mean(self.data, axis=0),
+ std=numpy.std(self.data, axis=0, ddof=1),
+ median=numpy.median(self.data, axis=0),
+ max=numpy.max(self.data, axis=0),
+ min=numpy.min(self.data, axis=0),
+ shape=self.data.shape,
+ )
+
+
+class LabelObject(AnalyzerResult):
+
+ def __init__(self):
+ super(LabelObject, self).__init__()
+ del self.data_object.value
+
+ @property
+ def data(self):
+ return self.data_object.label
+
+
+class GlobalObject(AnalyzerResult):
+
+ def __init__(self):
+ super(GlobalObject, self).__init__()
+ del self.frame_metadata
+ del self.data_object.time
+ del self.data_object.duration
+
+ @property
+ def time(self):
+ return self.audio_metadata.start
+
+ @property
+ def duration(self):
+ return self.audio_metadata.duration
+
+
+class FramewiseObject(AnalyzerResult):
+
+ def __init__(self):
+ super(FramewiseObject, self).__init__()
+ del self.data_object.time
+ del self.data_object.duration
+
+ @property
+ def time(self):
+ return (self.audio_metadata.start +
+ self.frame_metadata.stepsize /
+ self.frame_metadata.samplerate *
+ numpy.arange(0, len(self)))
+
+ @property
+ def duration(self):
+ return (self.frame_metadata.blocksize / self.frame_metadata.samplerate
+ * numpy.ones(len(self)))
+
+
+class EventObject(AnalyzerResult):
+
+ def __init__(self):
+ super(EventObject, self).__init__()
+ del self.frame_metadata
+ del self.data_object.duration
+
+ @property
+ def time(self):
+ return self.audio_metadata.start + self.data_object.time
+
+ @property
+ def duration(self):
+ return numpy.zeros(len(self))
+
+
+class SegmentObject(EventObject):
+
+ def __init__(self):
+ super(EventObject, self).__init__()
+ del self.frame_metadata
+
+ @property
+ def duration(self):
+ return self.data_object.duration
+
+
+class GlobalValueResult(ValueObject, GlobalObject):
+ _default_value = deepcopy(LabelObject._default_value)
+
+
+class GlobalLabelResult(LabelObject, GlobalObject):
+ _default_value = deepcopy(LabelObject._default_value)
+
+
+class FrameValueResult(ValueObject, FramewiseObject):
+ _default_value = deepcopy(LabelObject._default_value)
+
+
+class FrameLabelResult(LabelObject, FramewiseObject):
+ _default_value = deepcopy(LabelObject._default_value)
+
+
+class EventValueResult(ValueObject, EventObject):
+ _default_value = deepcopy(LabelObject._default_value)
+
+
+class EventLabelResult(LabelObject, EventObject):
+ _default_value = deepcopy(LabelObject._default_value)
+
+
+class SegmentValueResult(ValueObject, SegmentObject):
+ _default_value = deepcopy(LabelObject._default_value)
+
+
+class SegmentLabelResult(LabelObject, SegmentObject):
+ _default_value = deepcopy(LabelObject._default_value)
+
+
+def AnalyzerResultFactory(data_mode='value', time_mode='framewise'):
+ '''
+ Analyzer result Factory function
+ '''
+ if (data_mode, time_mode) == ('value', 'framewise'):
+ result = FrameValueResult()
+
+ elif (data_mode, time_mode) == ('label', 'framewise'):
+ result = FrameLabelResult()
+
+ elif (data_mode, time_mode) == ('value', 'global'):
+ result = GlobalValueResult()
+
+ elif (data_mode, time_mode) == ('label', 'global'):
+ result = GlobalLabelResult()
+
+ elif (data_mode, time_mode) == ('value', 'event'):
+ result = EventValueResult()
+
+ elif (data_mode, time_mode) == ('label', 'event'):
+ result = EventLabelResult()
+
+ elif (data_mode, time_mode) == ('value', 'segment'):
+ result = SegmentValueResult()
+
+ elif (data_mode, time_mode) == ('label', 'segment'):
+ result = SegmentLabelResult()
+
+ else:
+ raise ValueError('Wrong arguments')
+
+ result._time_mode = time_mode
+ result._data_mode = data_mode
+
+ return result
class AnalyzerResultContainer(dict):
'''
>>> import timeside
>>> import os
- >>> ModulePath = os.path.dirname(os.path.realpath(coreA.__file__))
+ >>> ModulePath = os.path.dirname(os.path.realpath(timeside.analyzer.core.__file__))
>>> wavFile = os.path.join(ModulePath , '../../tests/samples/sweep.wav')
>>> d = timeside.decoder.FileDecoder(wavFile, start=1)
>>> (d|a).run() #doctest: +ELLIPSIS
<timeside.core.ProcessPipe object at 0x...>
>>> a.new_result() #doctest: +ELLIPSIS
- AnalyzerResult(data_mode=None, time_mode=None, id_metadata=id_metadata(id='', name='', unit='', description='', date='...', version='...', author='TimeSide'), data=DataObject(value=None, label=array([], dtype=int64), time=array([], dtype=float64), duration=array([], dtype=float64)), audio_metadata=audio_metadata(uri='file:///.../tests/samples/sweep.wav', start=1.0, duration=7.0, channels=None, channelsManagement=''), frame_metadata=FrameMetadata(samplerate=None, blocksize=None, stepsize=None), label_metadata=LabelMetadata(label=None, description=None, label_type='mono'), parameters={})
- >>> resContainer = timeside.analyzer.AnalyzerResultContainer()
+ FrameValueResult(id_metadata=IdMetadata(id='analyzer', name='Generic analyzer', unit='', description='', date='...', version='0.5.1', author='TimeSide'), data_object=DataObject(value=None, label=array([], dtype=int64), time=array([], dtype=float64), duration=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='file:///home/thomas/code/timeside/TimeSide/tests/samples/sweep.wav', start=1.0, duration=7.0, is_segment=True, channels=None, channelsManagement=''), frame_metadata=FrameMetadata(samplerate=44100, blocksize=8192, stepsize=8192), label_metadata=None, parameters={})
+ >>> resContainer = timeside.analyzer.core.AnalyzerResultContainer()
'''
#root = tree.getroot()
root = ET.fromstring(xml_string)
for child in root.iter('result'):
- result = AnalyzerResult()
- results.add(result.from_xml(ET.tostring(child)))
+ results.add(AnalyzerResult.from_xml(ET.tostring(child)))
return results
results = AnalyzerResultContainer()
for res_json in results_json:
- res = AnalyzerResult(data_mode=res_json['data_mode'],
+ res = AnalyzerResultFactory(data_mode=res_json['data_mode'],
time_mode=res_json['time_mode'])
for key in res_json.keys():
if key not in ['data_mode', 'time_mode']:
results_yaml = yaml.load(yaml_str)
results = AnalyzerResultContainer()
for res_yaml in results_yaml:
- res = AnalyzerResult()
+ res = AnalyzerResultFactory(data_mode=res_yaml['data_mode'], time_mode=res_yaml['time_mode'])
for key in res_yaml.keys():
- res[key] = res_yaml[key]
+ if key not in ['data_mode', 'time_mode']:
+ res[key] = res_yaml[key]
results.add(res)
return results
try:
for (group_name, group) in h5_file.items():
- result = AnalyzerResult(data_mode=group.attrs['data_mode'],
+ result = AnalyzerResultFactory(data_mode=group.attrs['data_mode'],
time_mode=group.attrs['time_mode'])
# Read Sub-Group
for subgroup_name, subgroup in group.items():
def unit():
return ""
- def new_result(self, data_mode=AnalyzerResult._default_value['data_mode'],
- time_mode=AnalyzerResult._default_value['time_mode']):
+ def new_result(self, data_mode='value', time_mode='framewise'):
'''
Create a new result
from datetime import datetime
- result = AnalyzerResult(data_mode=data_mode, time_mode=time_mode)
+ result = AnalyzerResultFactory(data_mode=data_mode, time_mode=time_mode)
# Automatically write known metadata
result.id_metadata.date = datetime.now().replace(