def setUp(self):
self.result = AnalyzerResult(data_mode='value',
- time_mode='framewise')
+ time_mode='framewise')
from datetime import datetime
res_date = datetime.now().replace(microsecond=0).isoformat(' ')
id="foo_bar",
name="Foo bar",
unit="foo")
+
self.result.audio_metadata = dict(uri='Foo.wav',
start=0, duration=20,
channels=2)
self.assertEqual(d_numpy, results)
-class TestAnalyzerResultHdf5(TestAnalyzerResultGoodType):
+class LevelAnalyzer(object):
+ def testOnLevelAnaylyzer(self):
+ from timeside.core import get_processor
+ from timeside.core.tools.test_samples import samples
+
+ wav_file = samples['C4_scale.wav']
+ decoder = get_processor('file_decoder')(uri=wav_file)
+ analyzer = get_processor('level')()
+ pipe = (decoder | analyzer)
+ pipe.run()
+ self.result = analyzer.results
+
+
+class TestAnalyzerResultHdf5(TestAnalyzerResultGoodType, LevelAnalyzer):
""" test AnalyzerResult hdf5 serialize """
def tearDown(self):
- results = AnalyzerResultContainer([self.result])
- results.to_hdf5('/tmp/t.h5')
- res_hdf5 = results.from_hdf5('/tmp/t.h5')
+ if isinstance(self.result, AnalyzerResult):
+ results = AnalyzerResultContainer([self.result])
+ elif isinstance(self.result, AnalyzerResultContainer) :
+ results = self.result
+ else:
+ raise(TypeError, "Wrong type for self.result")
+
+ import tempfile
+ h5_file = tempfile.NamedTemporaryFile(suffix='.h5', delete=True)
+ results.to_hdf5(h5_file.name)
+
+ from_results = AnalyzerResultContainer()
+ from_results.from_hdf5(h5_file.name)
+
if verbose:
print '%15s' % 'from hdf5:',
- print res_hdf5
- self.assertEqual(results, res_hdf5)
-
+ print from_results
+ self.assertEqual(results, from_results)
+ h5_file.close()
class TestAnalyzerResultYaml(TestAnalyzerResultGoodType):
""" test AnalyzerResult yaml serialize """
if verbose:
print 'to yaml:'
print r_yaml
- d_yaml = results.from_yaml(r_yaml)
+ from_results = AnalyzerResultContainer()
+ from_results.from_yaml(r_yaml)
if verbose:
print '%15s' % 'from yaml:',
- print d_yaml
- #for i in range(len(d_yaml)):
- for res in results:
- for key in res:
- self.assertEqual(results[res][key],
- d_yaml[res][key])
-
+ print from_results
+ self.assertEqual(type(self.result.data_object.frame_metadata),
+ type(from_results['foo_bar'].data_object.frame_metadata))
class TestAnalyzerResultXml(TestAnalyzerResultGoodType):
""" test AnalyzerResult xml serialize """
print 'to xml:'
print r_xml
- d_xml = results.from_xml(r_xml)
+ from_results = AnalyzerResultContainer()
+ from_results.from_xml(r_xml)
if verbose:
print '%15s' % 'from xml:',
- print d_xml
+ print from_results
#for i in range(len(d_xml)):
- self.assertEqual(d_xml, results)
+ self.assertEqual(results, from_results)
class TestAnalyzerResultJson(TestAnalyzerResultGoodType):
print 'to json:'
print r_json
- d_json = results.from_json(r_json)
+ from_results = AnalyzerResultContainer()
+ from_results.from_json(r_json)
if verbose:
- print d_json
+ print from_results
print '%15s' % 'from json:',
- #for i in range(len(d_json)):
- for res in results:
- for key in res:
- self.assertEqual(results[res][key],
- d_json[res][key])
+ self.assertEqual(results, from_results)
class TestAnalyzerResultAsDict(TestAnalyzerResultGoodType):
from .tools import hdf5
import timeside # import __version__
-from timeside.core import implements, interfacedoc, abstract
+from timeside.core import implements, abstract
from timeside.core.api import IAnalyzer
import numpy as np
#numpy_data_types += [np.ndarray]
-class MetadataObject(object):
+class Parameters(dict):
+
+ def as_dict(self):
+ return self
+
+ def to_xml(self):
+ import xml.etree.ElementTree as ET
+ root = ET.Element('Metadata')
+
+ for key in self.keys():
+ child = ET.SubElement(root, key)
+ child.text = repr(self[key])
+
+ return ET.tostring(root, encoding="utf-8", method="xml")
+
+
+ def from_xml(self, xml_string):
+ import xml.etree.ElementTree as ET
+ import ast
+ root = ET.fromstring(xml_string)
+ for child in root:
+ key = child.tag
+ if child.text:
+ self[key] = ast.literal_eval(child.text)
+
+
+ def to_hdf5(self, h5group):
+ hdf5.dict_to_hdf5(self, h5group)
+
+ def from_hdf5(self, h5group):
+ hdf5.dict_from_hdf5(self, h5group)
+
+ def from_dict(self, dict_obj):
+ for key, value in dict_obj.items():
+ try:
+ self[key].from_dict(value)
+ except AttributeError:
+ self[key] = value
+
+
+class MetadataObject(Parameters):
"""
Object that contains a metadata structure
return not(isinstance(other, self.__class__)
or self.as_dict() != other.as_dict())
- def to_xml(self):
- import xml.etree.ElementTree as ET
- root = ET.Element('Metadata')
-
- for key in self.keys():
- child = ET.SubElement(root, key)
- child.text = repr(getattr(self, key))
-
- return ET.tostring(root, encoding="utf-8", method="xml")
-
- def from_xml(self, xml_string):
- import xml.etree.ElementTree as ET
- import ast
- root = ET.fromstring(xml_string)
- for child in root:
- key = child.tag
- if child.text:
- self[key] = ast.literal_eval(child.text)
-
- def to_hdf5(self, h5group):
- hdf5.dict_to_hdf5(self, h5group)
-
- def from_hdf5(self, h5group):
- hdf5.dict_from_hdf5(self, h5group)
-
class IdMetadata(MetadataObject):
version : str
author : str
proc_uuid : str
- res_uuid : str
'''
# Define default values
('version', None),
('author', None),
('proc_uuid', None),
- ('res_uuid', None)])
+ ])
def __setattr__(self, name, value):
if value is None:
self.__setattr__(key, [])
-class AnalyzerParameters(dict):
-
- def as_dict(self):
- return self
-
- def to_xml(self):
- import xml.etree.ElementTree as ET
- root = ET.Element('Metadata')
-
- for key, value in self.items():
- child = ET.SubElement(root, key)
- child.text = repr(self.get(key))
-
- return ET.tostring(root, encoding="utf-8", method="xml")
-
- def from_xml(self, xml_string):
- import xml.etree.ElementTree as ET
- import ast
- root = ET.fromstring(xml_string)
- for child in root.iter():
- if child.text:
- self.set(child.tag, ast.literal_eval(child.text))
-
- def to_hdf5(self, subgroup):
- hdf5.dict_to_hdf5(self, subgroup)
-
- def from_hdf5(self, h5group):
- hdf5.dict_from_hdf5(self, h5group)
def data_objet_class(data_mode='value', time_mode='framewise'):
- audio_metadata : :class:`AudioMetadata`
- frame_metadata : :class:`FrameMetadata`
- label_metadata : :class:`LabelMetadata`
- - parameters : :class:`AnalyzerParameters` Object
+ - parameters : :class:`Parameters` Object
"""
def __init__(self, data_mode='value', time_mode='framewise'):
super(AnalyzerResult, self).__init__()
-
self._data_mode = data_mode
self._time_mode = time_mode
self.id_metadata = IdMetadata()
self.audio_metadata = AudioMetadata()
- self.parameters = AnalyzerParameters()
+ self.parameters = Parameters()
self.data_object = data_objet_class(data_mode, time_mode)()
# self.label_metadata = LabelMetadata()
def as_dict(self):
return dict([(key, self[key].as_dict())
- for key in self.keys() if hasattr(self[key], 'as_dict')] +
+ for key in self.keys() if hasattr(self[key], 'as_dict')] +
[('data_mode', self.data_mode), ('time_mode', self.time_mode)])
# TODO : check if it can be simplified now
data_mode_child = root.find('data_mode')
time_mode_child = root.find('time_mode')
result = AnalyzerResult(data_mode=data_mode_child.text,
- time_mode=time_mode_child.text)
+ time_mode=time_mode_child.text)
for child in root:
key = child.tag
if key not in ['data_mode', 'time_mode']:
def to_hdf5(self, h5_file):
# Save results in HDF5 Dataset
- group = h5_file.create_group(self.id_metadata.res_uuid)
+ group = h5_file.create_group(self.id)
group.attrs['data_mode'] = self.__getattribute__('data_mode')
group.attrs['time_mode'] = self.__getattribute__('time_mode')
for key in self.keys():
def from_hdf5(h5group):
# Read Sub-Group
result = AnalyzerResult(data_mode=h5group.attrs['data_mode'],
- time_mode=h5group.attrs['time_mode'])
+ time_mode=h5group.attrs['time_mode'])
for subgroup_name, h5subgroup in h5group.items():
result[subgroup_name].from_hdf5(h5subgroup)
return result
>>> a = Analyzer()
>>> (d|a).run()
>>> a.new_result() #doctest: +ELLIPSIS
- AnalyzerResult(id_metadata=IdMetadata(id='analyzer', name='Generic analyzer', unit='', description='...', date='...', version='...', author='TimeSide', proc_uuid='...', res_uuid='...'), data_object=FrameValueObject(value=array([], dtype=float64), y_value=array([], dtype=float64), frame_metadata=FrameMetadata(samplerate=44100, blocksize=8192, stepsize=8192)), audio_metadata=AudioMetadata(uri='.../sweep.mp3', start=0.0, duration=8.0..., is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={})
+ AnalyzerResult(id_metadata=IdMetadata(id='analyzer', name='Generic analyzer', unit='', description='...', date='...', version='...', author='TimeSide', proc_uuid='...'), data_object=FrameValueObject(value=array([], dtype=float64), y_value=array([], dtype=float64), frame_metadata=FrameMetadata(samplerate=44100, blocksize=8192, stepsize=8192)), audio_metadata=AudioMetadata(uri='.../sweep.mp3', start=0.0, duration=8.0..., is_segment=False, sha1='...', channels=2, channelsManagement=''), parameters={})
>>> resContainer = timeside.core.analyzer.AnalyzerResultContainer()
'''
if analyzer_results is not None:
self.add(analyzer_results)
- def add(self, analyzer_result):
+ def add(self, analyzer_result, overwrite=False):
+
if isinstance(analyzer_result, list):
for res in analyzer_result:
self.add(res)
return
# Check result
if not isinstance(analyzer_result, AnalyzerResult):
- raise TypeError('only AnalyzerResult can be added')
+ raise TypeError('Only AnalyzerResult can be added')
+
+ if (not analyzer_result.id in self) or overwrite:
+ self[analyzer_result.id] = analyzer_result
+ else:
+ raise ValueError(('Duplicated id in AnalyzerResultContainer: %s '
+ 'Please supply a unique id')
+ % analyzer_result.id)
- # Update result uuid by adding a suffix uuid
- # It enable to deal with multiple results for the same processor uuid
- uuid = analyzer_result.id_metadata.proc_uuid
- count = 0
- for res_uuid in self.keys():
- count += res_uuid.startswith(uuid)
- res_uuid = '-'.join([uuid, format(count, '02x')])
- analyzer_result.id_metadata.res_uuid = res_uuid
- self.__setitem__(res_uuid, analyzer_result)
def get_result_by_id(self, result_id):
if self.list_id().count(result_id) > 1:
else:
return xml_str
- @staticmethod
- def from_xml(xml_string):
+ def from_xml(self, xml_string):
import xml.etree.ElementTree as ET
- results = AnalyzerResultContainer()
# TODO : from file
#tree = ET.parse(xml_file)
#root = tree.getroot()
root = ET.fromstring(xml_string)
for child in root.iter('result'):
- results.add(AnalyzerResult.from_xml(ET.tostring(child)))
-
- return results
+ self.add(AnalyzerResult.from_xml(ET.tostring(child)),
+ overwrite=True)
def to_json(self, output_file=None):
#if data_list == None: data_list = self.results
else:
return json_str
- @staticmethod
- def from_json(json_str):
+ def from_json(self, json_str):
import simplejson as json
# Define Specialize JSON decoder for numpy array
return obj
results_json = json.loads(json_str, object_hook=NumpyArrayDecoder)
- results = AnalyzerResultContainer()
for res_json in results_json:
-
res = AnalyzerResult(data_mode=res_json['data_mode'],
- time_mode=res_json['time_mode'])
+ time_mode=res_json['time_mode'])
+
for key in res_json.keys():
if key not in ['data_mode', 'time_mode']:
- res[key] = res_json[key]
-
- results.add(res)
- return results
+ res[key].from_dict(res_json[key])
+ self.add(res, overwrite=True)
def to_yaml(self, output_file=None):
#if data_list == None: data_list = self.results
else:
return yaml_str
- @staticmethod
- def from_yaml(yaml_str):
+ def from_yaml(self, yaml_str):
import yaml
# Define Specialize Yaml encoder for numpy array
yaml.add_constructor(u'!numpyArray', numpyArray_constructor)
results_yaml = yaml.load(yaml_str)
- results = AnalyzerResultContainer()
for res_yaml in results_yaml:
res = AnalyzerResult(data_mode=res_yaml['data_mode'],
- time_mode=res_yaml['time_mode'])
+ time_mode=res_yaml['time_mode'])
for key in res_yaml.keys():
if key not in ['data_mode', 'time_mode']:
- res[key] = res_yaml[key]
- results.add(res)
- return results
+ res[key].from_dict(res_yaml[key])
+
+ self.add(res, overwrite=True)
def to_numpy(self, output_file=None):
if output_file:
else:
return self
- @staticmethod
- def from_numpy(input_file):
+ def from_numpy(self, input_file):
return np.load(input_file)
def to_hdf5(self, output_file):
for res in self.values():
res.to_hdf5(h5_file)
- @staticmethod
- def from_hdf5(input_file):
+ def from_hdf5(self, input_file):
import h5py
# TODO : enable import for yaafe hdf5 format
# Open HDF5 file for reading and get results
h5_file = h5py.File(input_file, 'r')
- results = AnalyzerResultContainer()
try:
for group in h5_file.values():
result = AnalyzerResult.from_hdf5(group)
- results.add(result)
+ self.add(result, overwrite=True)
except TypeError:
print('TypeError for HDF5 serialization')
finally:
h5_file.close() # Close the HDF5 file
- return results
-
class Analyzer(Processor):
def add_result(self, result):
if not self.uuid() in self.process_pipe.results:
self.process_pipe.results[self.uuid()] = AnalyzerResultContainer()
- self.process_pipe.results[self.uuid()][result.id] = result
+ self.process_pipe.results[self.uuid()].add(result)
@property
def results(self):