""" test AnalyzerResult """
def setUp(self):
- self.result = AnalyzerResult()
- self.result.metadata=dict(id="foo_bar", name="Foo bar", unit="foo")
+ self.result = newAnalyzerResult()
+ from datetime import datetime
+ self.result.idMetadata = dict(date=datetime.now().replace(microsecond=0).isoformat(' '),
+ version=TimeSideVersion,
+ author='TimeSide',
+ id="foo_bar",
+ name="Foo bar",
+ unit="foo")
+ self.result.audioMetadata = dict(uri='Foo.wav',
+ start=0, duration = 20,
+ channels=2)
+
+ self.result.data = dict(dataMode='value')
+
def testOnFloat(self):
"float result"
- self.result.data = 1.2
+ self.result.data.data = 1.2
def testOnInt(self):
"integer result"
- self.result.data = 1
+ self.result.data.data = 1
def testOnList(self):
"list result"
- self.result.data = [1., 2.]
+ self.result.data.data = [1., 2.]
def testOnString(self):
"string result"
- self.result.data = "hello"
+ self.result.data.data = "hello"
def testOnListOfString(self):
"list of strings result"
- self.result.data = ["hello", "hola"]
+ self.result.data.data = ["hello", "hola"]
def testOnListOfList(self):
"list of lists result"
- self.result.data = [[0, 1], [0, 1, 2]]
+ self.result.data.data = [[0, 1, 3], [0, 1, 2]]
def testOnNumpyVectorOfFloat(self):
"numpy vector of float"
- self.result.data = ones(2, dtype='float') * pi
+ self.result.data.data = ones(2, dtype='float') * pi
def testOnNumpy2DArrayOfFloat64(self):
"numpy 2d array of float64"
- self.result.data = ones([2, 3], dtype='float64') * pi
+ self.result.data.data = ones([2, 3], dtype='float64') * pi
def testOnNumpy3DArrayOfInt32(self):
"numpy 3d array of int32"
- self.result.data = ones([2, 3, 2], dtype='int32') * pi
+ self.result.data.data = ones([2, 3, 2], dtype='int32') * pi
def testOnNumpyArrayOfStrings(self):
"numpy array of strings"
- self.result.data = array(['hello', 'hola'])
+ self.result.data.data = array(['hello', 'hola'])
def testOnEmptyList(self):
"empty list"
- self.result.data = []
+ self.result.data.data = []
def testOnNone(self):
"None"
- self.result.data = None
+ self.result.data.data = None
def testOnUnicode(self):
"None"
- self.result.data = None
+ self.result.data.data = None
def tearDown(self):
pass
def method(self):
"numpy %s" % numpy_data_type
import numpy
- self.result.data = getattr(numpy, numpy_data_type)(pi)
+ self.result.data.data = getattr(numpy, numpy_data_type)(pi)
return method
data = getattr(numpy, numpy_data_type)(pi)
except ValueError:
data = getattr(numpy, numpy_data_type)()
- self.assertRaises(TypeError, self.result.__setattr__, 'data', data)
+ self.assertRaises(TypeError, self.result.data.__setattr__, 'data', data)
return method
for numpy_data_type in good_numpy_data_types:
return dict((att, getattr(self, att))
for att in self._default_value.keys())
+ def keys(self):
+ return self.as_dict().keys()
+
+ def values(self):
+ return self.as_dict().values()
+
+ def items(self):
+ return self.as_dict().items()
+
def __repr__(self):
return '{}({})'.format(
self.__class__.__name__,
return (isinstance(other, self.__class__)
and self.as_dict() == other.as_dict())
+ def __ne__(self, other):
+ return not(isinstance(other, self.__class__)
+ or self.as_dict() != other.as_dict())
+
+ def to_xml(self):
+ import xml.etree.ElementTree as ET
+ root = ET.Element('Metadata')
+
+ for key in self.keys():
+ child = ET.SubElement(root, key)
+ child.text = repr(getattr(self, key))
+
+ return ET.tostring(root, encoding="utf-8", method="xml")
+
+ def from_xml(self, xml_string):
+ import xml.etree.ElementTree as ET
+ import ast
+ root = ET.fromstring(xml_string)
+ for key in self.keys():
+ child = root.find(key)
+ if child.text:
+ self.__setattr__(key, ast.literal_eval(child.text))
+
class IdMetadata(MetadataObject):
'''
from collections import OrderedDict
# Define default values
_default_value = OrderedDict([('uri', ''),
- ('start', None),
+ ('start', 0),
('duration', None),
('channels', None),
('channelsManagement', '')])
('blocksize', None),
('stepsize', None)])
-class AnalyserData(MetadataObject):
+
+class AnalyzerData(MetadataObject):
'''
Metadata object to handle Frame related Metadata
type(value))
# TODO : guess dataType from value and set datType with:
- #super(AnalyserData, self).__setattr__('dataType', dataType)
+ #super(AnalyzerData, self).__setattr__('dataType', dataType)
+
+ super(AnalyzerData, self).__setattr__(name, value)
+
+
+class AnalyzerParameters(dict):
- super(AnalyserData, self).__setattr__(name, value)
+ def to_xml(self):
+ import xml.etree.ElementTree as ET
+ root = ET.Element('Metadata')
+
+ for key, value in self.items():
+ child = ET.SubElement(root, key)
+ child.text = repr(self.get(key))
+
+ return ET.tostring(root, encoding="utf-8", method="xml")
+
+ def from_xml(self, xml_string):
+ import xml.etree.ElementTree as ET
+ import ast
+ root = ET.fromstring(xml_string)
+ for child in root.iter():
+ if child.text:
+ self.set(child.tag, ast.literal_eval(child.text))
class newAnalyzerResult(MetadataObject):
('audioMetadata', None),
('frameMetadata', None),
('labelMetadata', None),
- ('parameters', {})
+ ('parameters', None)
])
+ def __setattr__(self, name, value):
+ setFuncDict = {'idMetadata': IdMetadata,
+ 'data': AnalyzerData,
+ 'audioMetadata': AudioMetadata,
+ 'frameMetadata': FrameMetadata,
+ 'labelMetadata': LabelMetadata,
+ 'parameters': AnalyzerParameters}
+
+ if name in setFuncDict.keys():
+ setFunc = setFuncDict[name]
+ if isinstance(value, setFunc):
+ super(newAnalyzerResult, self).__setattr__(name, value)
+ elif isinstance(value, dict):
+ super(newAnalyzerResult, self).__setattr__(name, setFunc(**value))
+ elif value in [[], None, '']:
+ super(newAnalyzerResult, self).__setattr__(name, setFunc())
+ else:
+ raise TypeError('Wrong argument')
+ elif name == 'parameters':
+ if value:
+ super(newAnalyzerResult, self).__setattr__(name, value)
+ else:
+ super(newAnalyzerResult, self).__setattr__(name, {})
+
+ def as_dict(self):
+
+ def makeDict(val):
+ if isinstance(val, MetadataObject):
+ return val.as_dict()
+ elif isinstance(val, dict) or val in [None, []]:
+ return val
+ else:
+ print val
+ raise TypeError('Argument must be a dict or a MetadataObject')
+
+ return dict((att, makeDict(getattr(self, att)))
+ for att in self._default_value.keys())
+
+ def to_xml(self):
+ import xml.etree.ElementTree as ET
+ root = ET.Element('result')
+ root.metadata = {'name': self.idMetadata.name,
+ 'id': self.idMetadata.id}
+
+ for key in self._default_value:
+ child = ET.fromstring(getattr(self, key).to_xml())
+ child.tag = key
+ root.append(child)
+
+ return ET.tostring(root, encoding="utf-8", method="xml")
+
+ def from_xml(self, xml_string):
+ import xml.etree.ElementTree as ET
+ root = ET.fromstring(xml_string)
+
+ result = newAnalyzerResult()
+ for key in result.keys():
+ child = root.find(key)
+ child_string = ET.tostring(child)
+ result.__getattribute__(key).from_xml(child_string)
+
+ return result
+
class AnalyzerMetadata(MetadataObject):
"""
def __repr__(self):
return self.to_json()
- def __eq__(self,other):
+ def __eq__(self, other):
return (isinstance(other, self.__class__)
and self.as_dict() == other.as_dict())
def __ne__(self, other):
return not self.__eq__(other)
+
class AnalyzerResultContainer(object):
def __init__(self, analyzer_results=None):
def __eq__(self, other):
if hasattr(other, 'results'):
other = other.results
- for a, b in zip(self.results, other):
- if a != b:
- return False
- return True
+
+ return self.results == other
def __ne__(self, other):
return not self.__eq__(other)
root = ET.Element('timeside')
for result in data_list:
- res_node = ET.SubElement(root, 'result')
- res_node.metadata = {'name': result.metadata.name,
- 'id': result.metadata.id}
- # Serialize Data
- data_node = ET.SubElement(res_node, 'data')
- if type(result.data) in [str, unicode]:
- data_node.text = result.data
- else:
- data_node.text = repr(result.data)
- # Serialize Metadata
- metadata_node = ET.SubElement(res_node, 'metadata')
- for (name, val) in result.metadata.as_dict().items():
- # TODO reorder keys
- child = ET.SubElement(metadata_node, name)
- if name == 'parameters':
- for (par_key, par_val) in val.items():
- par_child = ET.SubElement(child, par_key)
- par_child.text = repr(par_val)
- else:
- child.text = repr(val)
-
- #tree = ET.ElementTree(root)
- return ET.tostring(root, encoding="utf-8", method="xml")
+ if result:
+ root.append(ET.fromstring(result.to_xml()))
+ return ET.tostring(root, encoding="utf-8", method="xml")
def from_xml(self, xml_string):
import xml.etree.ElementTree as ET
#tree = ET.parse(xml_file)
#root = tree.getroot()
root = ET.fromstring(xml_string)
- for result_child in root.iter('result'):
- result = AnalyzerResult()
- # Get data
- try:
- result.data = ast.literal_eval(result_child.find('data').text)
- except:
- result.data = result_child.find('data').text
-
- # Get metadata
- for attr_child in result_child.find('metadata'):
- name = attr_child.tag
- if name == 'parameters':
- parameters = dict()
- for param_child in attr_child:
- par_key = param_child.tag
- par_val = param_child.text
- parameters[par_key] = ast.literal_eval(par_val)
- value = parameters
- else:
- value = ast.literal_eval(attr_child.text)
- result.metadata.__setattr__(name, value)
- results.add_result(result)
+ for child in root.iter('result'):
+ result = newAnalyzerResult()
+ results.add_result(result.from_xml(ET.tostring(child)))
return results
results_json = json.loads(json_str)
results = AnalyzerResultContainer()
for res_json in results_json:
- res = AnalyzerResult(data=res_json['data'],
- metadata=res_json['metadata'])
+ res = newAnalyzerResult()
+ res.idMetadata = res_json['idMetadata']
+ res.data = res_json['data']
+ res.audioMetadata = res_json['audioMetadata']
+ res.frameMetadata = res_json['frameMetadata']
+ res.labelMetadata = res_json['labelMetadata']
+ res.parameters = res_json['parameters']
+
results.add_result(res)
return results
results_yaml = yaml.load(yaml_str)
results = AnalyzerResultContainer()
for res_yaml in results_yaml:
- res = AnalyzerResult(data=res_yaml['data'],
- metadata=res_yaml['metadata'])
+ res = newAnalyzerResult()
+ for key in res.keys():
+ res.__setattr__(key, res_yaml[key])
results.add_result(res)
return results
import h5py
# Open HDF5 file and save dataset
- # TODO : Check self.results format
- # as it asumes 'id', 'name', 'value' and 'units' keys
h5_file = h5py.File(output_file, 'w') # overwrite any existing file
try:
- for data in data_list:
+ for res in data_list:
# Save results in HDF5 Dataset
- dset = h5_file.create_dataset(data['id'], data=data['value'])
- # Save associated metadata
- dset.attrs["unit"] = data['unit']
- dset.attrs["name"] = data['name']
+ group = h5_file.create_group(res.idMetadata.id)
+ for key in res.keys():
+ if key == 'data':
+ dset = group.create_dataset(key,
+ data=res.data.data)
+ # Save associated metadata
+ attrs = res.data.keys()
+ attrs.remove('data')
+ for name in attrs:
+ dset.attrs[name] = res.data.__getattribute__(name)
+ else:
+ subgroup = group.create_group(key)
+ attrs = res.__getattribute__(key).keys()
+ for name in attrs:
+ value = res.__getattribute__(key).__getattribute__(name)
+ if value:
+ subgroup.attrs[name] = res.__getattribute__(key).__getattribute__(name)
+ #dset.attrs["name"] = data['name']
except TypeError:
- pass
+ raise
finally:
h5_file.close() # Close the HDF5 file
def from_hdf5(self, input_file):
import h5py
+ # TODO : enable import for yaafe hdf5 format
# Open HDF5 file for reading and get results
h5_file = h5py.File(input_file, 'r')
data_list = AnalyzerResultContainer()
try:
- for name in h5_file.keys():
- dset = h5_file.get(name) # Read Dataset
- id = name
- # Read metadata
- unit = dset.attrs['unit']
- name = dset.attrs['name']
- # Create new AnalyzerResult
- data = AnalyzerResult(id=id, name=name, unit=unit)
-
- # Load value from the hdf5 dataset and store in data
- # FIXME : the following conditional statement is to prevent
- # reading an empty dataset.
- # see : https://github.com/h5py/h5py/issues/281
- # It should be fixed by the next h5py version
- if dset.shape != (0,):
- data.value = dset[...]
- else:
- data.value = []
-
- # TODO : enable import from yaafe hdf5 format
- #for attr_name in dset.attrs.keys():
- # data[attr_name] = dset.attrs[attr_name]
-
- data_list.add_result(data)
+ for (group_name, group) in h5_file.items():
+ result = newAnalyzerResult()
+ # Read Sub-Group
+ for subgroup_name, subgroup in group.items():
+ if subgroup_name == 'data':
+ dset = subgroup
+ # Load value from the hdf5 dataset and store in data
+ # FIXME : the following conditional statement is to prevent
+ # reading an empty dataset.
+ # see : https://github.com/h5py/h5py/issues/281
+ # It should be fixed by the next h5py version
+ if dset.shape != (0,):
+ result.data.data = dset[...]
+ else:
+ result.data.data = []
+ # Load Audio metadata
+ for name, value in dset.attrs.items():
+ result.data.__setattr__(name, value)
+ else:
+ # Load Audio metadata
+ for name, value in subgroup.attrs.items():
+ result.__getattribute__(subgroup_name).__setattr__(name, value)
+
+ data_list.add_result(result)
except TypeError:
print('TypeError for HDF5 serialization')
finally:
implements(IAnalyzer)
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
- super(Analyzer, self).setup(channels, samplerate, blocksize, totalframes)
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
+ super(Analyzer, self).setup(channels, samplerate,
+ blocksize, totalframes)
# Set default values for output_* attributes
# may be overwritten by the analyzer
- self.output_channels = self.input_channels
- self.output_samplerate = self.input_samplerate
- self.output_blocksize = self.input_blocksize
- self.output_stepsize = self.input_blocksize
+ self.output_channels = self.input_channels
+ self.output_samplerate = self.input_samplerate
+ self.output_blocksize = self.input_blocksize
+ self.output_stepsize = self.input_blocksize
def results(self):
container = AnalyzerResultContainer()
author='TimeSide')
result.audioMetadata = AudioMetadata(uri=self.mediainfo()['uri'])
- result.data = AnalyserData(dataMode=dataMode)
+ result.data = AnalyzerData(dataMode=dataMode)
if dataMode == 'value':
pass