#'float128',
'float64',
'float32',
- 'float16',
+ #'float16', Not supported by h5py for version < 2.2
'int64',
'int16',
'int32',
'uint32',
'uint16',
'uint8',
+ 'object_',
+ 'string_',
+ 'longlong',
#'timedelta64',
#'datetime64',
- #'complex128',',
+ #'complex128',
#'complex64',
]
numpy_data_types = map(lambda x: getattr(numpy, x), numpy_data_types)
-numpy_data_types += [numpy.ndarray]
+#numpy_data_types += [numpy.ndarray]
class MetadataObject(object):
(name, self.__class__.__name__))
super(MetadataObject, self).__setattr__(name, value)
# Diff hunk ('+' = added lines). Deletes attribute `name` from a
# MetadataObject: if `name` is a known default key, it is also removed
# from a COPY of _default_value (copied so the class-level mapping shared
# by other instances is not mutated), then the attribute itself is
# deleted via object.__delattr__.
# NOTE(review): after this, keys()/values()/items() stop reporting `name`.
+ def __delattr__(self, name):
+ if name in self._default_value.keys():
+ new_default_value = self._default_value.copy()
+ del new_default_value[name]
+ super(MetadataObject, self).__setattr__('_default_value',
+ new_default_value)
+ super(MetadataObject, self).__delattr__(name)
+
def as_dict(self):
return dict((att, getattr(self, att))
- for att in self._default_value.keys())
+ for att in self._default_value.keys())
# Dict-like views over the metadata attributes. The '+' replacement
# versions filter on hasattr(self, attr): keys removed by __delattr__
# (which also drops them from _default_value) or never set are excluded,
# whereas the removed '-' versions delegated to as_dict() unfiltered.
def keys(self):
- return self.as_dict().keys()
+ return [attr for attr in self._default_value.keys()
+ if hasattr(self, attr)]
# Values in the same order as keys(); uses self[attr] (i.e. __getitem__).
def values(self):
- return self.as_dict().values()
+ return [self[attr] for attr in self._default_value.keys()
+ if hasattr(self, attr)]
# (key, value) pairs in the same order as keys().
def items(self):
- return self.as_dict().items()
+ return [(attr, self[attr]) for attr in self._default_value.keys()
+ if hasattr(self, attr)]
+
# Subscript access: obj[key] -> getattr(self, key), or `default` (None)
# when the attribute is missing -- i.e. this behaves like dict.get(),
# never raising KeyError.
# NOTE(review): Python's `obj[key]` syntax only ever passes `key`, so the
# `default` parameter is reachable only by calling __getitem__ directly.
+ def __getitem__(self, key, default=None):
+ try:
+ return getattr(self, key)
+ except AttributeError:
+ return default
+
# Subscript assignment: obj[key] = value delegates to setattr, so any
# per-attribute validation in the class's __setattr__ still runs.
+ def __setitem__(self, key, value):
+ setattr(self, key, value)
def __repr__(self):
return '{}({})'.format(
def __eq__(self, other):
return (isinstance(other, self.__class__)
- and self.as_dict() == other.as_dict())
+ and all([self[key] == other[key] for key in self.keys()]))
def __ne__(self, other):
return not(isinstance(other, self.__class__)
import xml.etree.ElementTree as ET
import ast
root = ET.fromstring(xml_string)
- for key in self.keys():
- child = root.find(key)
+ for child in root:
+ key = child.tag
if child.text:
- self.__setattr__(key, ast.literal_eval(child.text))
+ self[key] = ast.literal_eval(child.text)
class IdMetadata(MetadataObject):
Attributes
----------
- data : numpy array or list ?
- dataType : type
- dataMode : str
- dataMode describe the type of the data :
- - 'value' for values
- - 'label' for label data
+ data : numpy array
+ time : numpy array of float
+ duration : numpy array of float
+
'''
from collections import OrderedDict
# Define default values
- _default_value = OrderedDict([('data', None),
- ('dataType', ''),
- ('dataMode', '')])
+ _default_value = OrderedDict([('value', None),
+ ('label', []),
+ ('time', []),
+ ('duration', [])])
def __setattr__(self, name, value):
- # Set Data with the proper type
- if name == 'data':
- if value is None:
- value = []
- # make a numpy.array out of list
- if type(value) is list:
- value = numpy.array(value)
- # serialize using numpy
- if type(value) in numpy_data_types:
- value = value.tolist()
- if type(value) not in [list, str, int, long, float, complex, type(None)] + numpy_data_types:
- raise TypeError('AnalyzerResult can not accept type %s' %
- type(value))
-
- # TODO : guess dataType from value and set datType with:
- #super(AnalyzerData, self).__setattr__('dataType', dataType)
+ if value is not None:
+ # Set Data with the proper type
+ if name == 'value':
+ value = numpy.asarray(value)
+ if value.dtype.type not in numpy_data_types:
+ raise TypeError(
+ 'AnalyzerResult can not accept type %s for %s' %
+ (value.dtype.type, name))
+ if value.shape == ():
+ value.resize((1,))
+
+ elif name == 'label':
+ try:
+ value = numpy.asarray(value, dtype='int')
+ except ValueError:
+ raise TypeError(
+ 'AnalyzerResult can not accept type %s for %s' %
+ (value.dtype.type, name))
+
+ elif name in ['time', 'duration']:
+ try:
+ value = numpy.asfarray(value)
+ except ValueError:
+ raise TypeError(
+ 'AnalyzerResult can not accept type %s for %s' %
+ (value.dtype.type, name))
+ elif name == 'dataType':
+ return
super(AnalyzerData, self).__setattr__(name, value)
# Equality for AnalyzerData: same class AND every key's value equal.
# numpy.array_equal is used because attributes are numpy arrays, for
# which `==` is elementwise and not directly usable in all().
# NOTE(review): the AttributeError fallback presumably guards against
# environments where array_equal chokes on some stored value types;
# it reduces the elementwise `==` result with logical_and instead --
# confirm which numpy versions actually need this path.
+ def __eq__(self, other):
+ try:
+ return (isinstance(other, self.__class__) and
+ all([numpy.array_equal (self[key], other[key])
+ for key in self.keys()]))
+ except AttributeError:
+ #print self
+ #print [self[key] == other[key] for key in self.keys()]
+ return (isinstance(other, self.__class__) and
+ all([bool(numpy.logical_and.reduce((self[key] == other[key]).ravel()))
+ for key in self.keys()]))
+
+
def __ne__(self, other):
    """Return True when `other` differs from self (negation of __eq__).

    Bug fixed: the previous body was
        not(isinstance(other, self.__class__) or any([...]))
    With `or`, any same-class operand made the whole expression
    `not(True or ...)` == False, so `a != b` was ALWAYS False for two
    AnalyzerData instances, even with different data.  Delegating to
    __eq__ keeps the two operators consistent by construction (including
    the array-aware comparison and its AttributeError fallback).
    """
    return not self.__eq__(other)
+
# Serialize this metadata object to an XML string: one <Metadata> root,
# one child element per set key.  Non-empty values are stored as
# repr(value.tolist()) in the element text (re-parsed later with
# ast.literal_eval in from_xml) and the numpy dtype is kept in a
# 'dtype' attribute so the array can be rebuilt exactly.
# NOTE(review): `value not in [None, []]` performs `==` against a numpy
# array; this relies on legacy numpy/py2 semantics returning a scalar --
# on modern numpy it would raise on ambiguous truth value.  Confirm the
# targeted numpy version.
+ def to_xml(self):
+ import xml.etree.ElementTree as ET
+ root = ET.Element('Metadata')
+
+ for key in self.keys():
+ child = ET.SubElement(root, key)
+ value = getattr(self, key)
+ if value not in [None, []]:
+ child.text = repr(value.tolist())
+ child.set('dtype',value.dtype.__str__())
+
+ return ET.tostring(root, encoding="utf-8", method="xml")
+
# Inverse of to_xml: walk every child of the parsed root, using the
# element tag as the key; non-empty text is parsed with
# ast.literal_eval (safe -- literals only, no code execution) and
# rebuilt into a numpy array with the dtype recorded by to_xml.
# Assignment goes through self[key], so __setattr__ validation applies.
+ def from_xml(self, xml_string):
+ import xml.etree.ElementTree as ET
+ import ast
+ root = ET.fromstring(xml_string)
+ for child in root:
+ key = child.tag
+ if child.text:
+ self[key] = numpy.asarray(ast.literal_eval(child.text),
+ dtype=child.get('dtype'))
+
+
class AnalyzerParameters(dict):
if child.text:
self.set(child.tag, ast.literal_eval(child.text))
# AnalyzerParameters already subclasses dict, so expose the object
# itself to satisfy the as_dict() protocol shared with MetadataObject.
# NOTE(review): this returns self, NOT a copy -- mutating the returned
# mapping mutates the parameters.
+ def as_dict(self):
+ return self
+
class newAnalyzerResult(MetadataObject):
"""
Attributes
----------
- data : MetadataObject
- idMetadata : MetadataObject
- audioMetadata : MetadataObject
- frameMetadata : MetadataObject
- labelMetadata : MetadataObject
- parameters : dict
+ dataMode : str
+ dataMode describes the type of data :
+ - 'value' for values
+ - 'label' for label data see LabelMetadata
+ timeMode : str
+ timeMode describes the correspondance between data values and time
+ - 'framewise'
+ - 'global'
+ - 'segment'
+ - 'event'
+ data : AnalyzerData
+ idMetadata : IdMetadata
+ audioMetadata : AudioMetadata
+ frameMetadata : FrameMetadata
+ labelMetadata : LabelMetadata
+ parameters : AnalyzerParameters Object
"""
from collections import OrderedDict
# Define default values as an OrderDict
# in order to keep the order of the keys for display
- _default_value = OrderedDict([('idMetadata', None),
+ _default_value = OrderedDict([('dataMode', None),
+ ('timeMode', None),
+ ('idMetadata', None),
('data', None),
('audioMetadata', None),
('frameMetadata', None),
('parameters', None)
])
+ _validDataMode = ['value', 'label', None]
+ _validTimeMode = ['framewise', 'global', 'segment', 'event', None]
+
# Construct a result, then set dataMode/timeMode through normal
# attribute assignment so __setattr__ can validate them against
# _validDataMode/_validTimeMode and prune the sub-metadata objects
# that do not apply to the chosen modes.
+ def __init__(self, dataMode=None,
+ timeMode=None):
+ super(newAnalyzerResult, self).__init__()
+ self.dataMode = dataMode
+ self.timeMode = timeMode
+
+
def __setattr__(self, name, value):
setFuncDict = {'idMetadata': IdMetadata,
'data': AnalyzerData,
setFunc = setFuncDict[name]
if isinstance(value, setFunc):
super(newAnalyzerResult, self).__setattr__(name, value)
+ return
elif isinstance(value, dict):
- super(newAnalyzerResult, self).__setattr__(name, setFunc(**value))
- elif value in [[], None, '']:
+ for (sub_name, sub_value) in value.items():
+ self[name][sub_name] = sub_value
+ #super(newAnalyzerResult, self).__setattr__(name, setFunc(**value))
+ return
+ elif value is None:
super(newAnalyzerResult, self).__setattr__(name, setFunc())
+ return
else:
raise TypeError('Wrong argument')
- elif name == 'parameters':
- if value:
- super(newAnalyzerResult, self).__setattr__(name, value)
+ elif name == 'dataMode':
+ if self[name] is not None:
+ raise AttributeError("The value of attribute ''timeMode'' \\\
+ can not change after setup")
+ if value == 'value':
+ # Initialize data with:
+ # 'value', 'dataType'
+ # Remove labelMetadata
+ del self.labelMetadata
+ del self.data.label
+ elif value == 'label':
+ # Initialize data with:
+ # 'label', 'dataType'
+ del self.data.value
+ # TODO : restore labelMetadata if needed
+ # Initialize labelMetadata
+ pass
+ elif value is None:
+ pass
else:
- super(newAnalyzerResult, self).__setattr__(name, {})
-
- def as_dict(self):
-
- def makeDict(val):
- if isinstance(val, MetadataObject):
- return val.as_dict()
- elif isinstance(val, dict) or val in [None, []]:
- return val
+ raise ValueError('Argument ''dataMode''=%s should be in %s'
+ % (value, self._validDataMode))
+ elif name == 'timeMode':
+ if self[name] is not None:
+ raise AttributeError("The value of attribute ''timeMode'' \\\
+ can not change after setup")
+
+ if value == 'framewise':
+ # Initialize frameMetadata
+ # Remove time and duration from data
+ del self.data.time
+ del self.data.duration
+ pass
+ elif value == 'global':
+ # Remove time and duration from data
+ del self.data.time
+ del self.data.duration
+ # Remove frameMetadata
+ del self.frameMetadata
+
+ pass
+ elif value == 'segment':
+ # Remove frameMetadata
+ del self.frameMetadata
+ elif value == 'event':
+ # Remove frameMetadata
+ del self.frameMetadata
+ # Remove duration from data
+ del self.data.duration
+
+ pass
+ elif value is None:
+ pass
else:
- print val
- raise TypeError('Argument must be a dict or a MetadataObject')
+ raise ValueError('Argument ''timeMode''=%s should be in %s'
+ % (value, self._validTimeMode))
+ super(newAnalyzerResult, self).__setattr__(name, value)
- return dict((att, makeDict(getattr(self, att)))
- for att in self._default_value.keys())
# Flatten the result to a plain dict: every sub-object exposing
# as_dict() (idMetadata, data, audioMetadata, ...) is recursively
# converted, then the two plain-string mode attributes -- which have no
# as_dict() of their own -- are appended explicitly.
+ def as_dict(self):
+ return dict([(key, self[key].as_dict())
+ for key in self.keys() if hasattr(self[key],'as_dict')]+
+ [('dataMode', self.dataMode), ('timeMode', self.timeMode)])
def to_xml(self):
import xml.etree.ElementTree as ET
root.metadata = {'name': self.idMetadata.name,
'id': self.idMetadata.id}
- for key in self._default_value:
- child = ET.fromstring(getattr(self, key).to_xml())
+ for key in self.keys():
+ if key in ['dataMode', 'timeMode']:
+ child = ET.SubElement(root, key)
+ child.text = str(self[key])
+ else:
+ child = ET.fromstring(self[key].to_xml())
child.tag = key
root.append(child)
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_string)
- result = newAnalyzerResult()
- for key in result.keys():
- child = root.find(key)
- child_string = ET.tostring(child)
- result.__getattribute__(key).from_xml(child_string)
+ dataModeChild = root.find('dataMode')
+ timeModeChild = root.find('timeMode')
+ result = newAnalyzerResult(dataMode=dataModeChild.text,
+ timeMode=timeModeChild.text)
+ for child in root:
+ key = child.tag
+ if key not in ['dataMode', 'timeMode']:
+ child_string = ET.tostring(child)
+ result[key].from_xml(child_string)
return result
def __repr__(self):
return self.to_json()
-
def __eq__(self, other):
-
return (isinstance(other, self.__class__)
and self.as_dict() == other.as_dict())
class AnalyzerResultContainer(object):
+ '''
+ >>> from timeside.decoder import FileDecoder
+ >>> #from timeside.analyzer.core import Analyzer
+ >>> #from timeside.analyzer import AnalyzerResultContainer, newAnalyzerResult
+ >>> wavFile = 'tests/samples/sweep.wav'
+ >>> d = FileDecoder(wavFile, start=1)
+
+ >>> a = Analyzer()
+ >>> (d|a).run() #doctest: +ELLIPSIS
+ <timeside.core.ProcessPipe object at 0x...>
+ >>> a.new_result() #doctest: +ELLIPSIS
+ newAnalyzerResult(dataMode=None, timeMode=None, idMetadata=IdMetadata(id='', name='', unit='', description='', date='...', version='0.4.4', author='TimeSide'), data=AnalyzerData(data=None, time=None, duration=None, dataType=None), audioMetadata=AudioMetadata(uri='file:///home/thomas/code/timeside/TimeSide/tests/samples/sweep.wav', start=1.0, duration=7.0, channels=None, channelsManagement=''), frameMetadata=FrameMetadata(samplerate=None, blocksize=None, stepsize=None), labelMetadata=LabelMetadata(label=None, description=None, labelType='mono'), parameters={})
+ >>> resContainer = AnalyzerResultContainer()
+ '''
def __init__(self, analyzer_results=None):
self.results = []
if analyzer_results is not None:
def __eq__(self, other):
if hasattr(other, 'results'):
other = other.results
-
return self.results == other
def __ne__(self, other):
return not self.__eq__(other)
- def __ne__(self, other):
- return not self.__eq__(other)
-
def add_result(self, analyzer_result):
if type(analyzer_result) == list:
for res in analyzer_result:
self.add_result(res)
return
+ # Check result
if not (isinstance(analyzer_result, AnalyzerResult)
or isinstance(analyzer_result, newAnalyzerResult)):
raise TypeError('only AnalyzerResult can be added')
+
self.results += [analyzer_result]
def to_xml(self, data_list=None):
root = ET.Element('timeside')
for result in data_list:
-
if result:
root.append(ET.fromstring(result.to_xml()))
def from_xml(self, xml_string):
import xml.etree.ElementTree as ET
- import ast
results = AnalyzerResultContainer()
# TODO : from file
return results
-
def to_json(self):
#if data_list == None: data_list = self.results
import simplejson as json
- return json.dumps([res.as_dict() for res in self])
+
+ # Define Specialize JSON encoder for numpy array
+ def NumpyArrayEncoder(obj):
+ if isinstance(obj, numpy.ndarray):
+ return {'numpyArray': obj.tolist(),
+ 'dtype': obj.dtype.__str__()}
+ raise TypeError(repr(obj) + " is not JSON serializable")
+
+ return json.dumps([res.as_dict() for res in self],
+ default=NumpyArrayEncoder)
def from_json(self, json_str):
import simplejson as json
- results_json = json.loads(json_str)
+
+ # Define Specialize JSON decoder for numpy array
+ def NumpyArrayDecoder(obj):
+ if isinstance(obj, dict) and 'numpyArray' in obj:
+ numpy_obj = numpy.asarray(obj['numpyArray'],
+ dtype=obj['dtype'])
+ return numpy_obj
+ else:
+ return obj
+
+ results_json = json.loads(json_str, object_hook=NumpyArrayDecoder)
results = AnalyzerResultContainer()
for res_json in results_json:
- res = newAnalyzerResult()
- res.idMetadata = res_json['idMetadata']
- res.data = res_json['data']
- res.audioMetadata = res_json['audioMetadata']
- res.frameMetadata = res_json['frameMetadata']
- res.labelMetadata = res_json['labelMetadata']
- res.parameters = res_json['parameters']
+ res = newAnalyzerResult(dataMode=res_json['dataMode'],
+ timeMode=res_json['timeMode'])
+ for key in res_json.keys():
+ if key not in ['dataMode', 'timeMode']:
+ res[key] = res_json[key]
results.add_result(res)
return results
def to_yaml(self):
#if data_list == None: data_list = self.results
import yaml
+
+ # Define Specialize Yaml encoder for numpy array
+ def numpyArray_representer(dumper, obj):
+ return dumper.represent_mapping(u'!numpyArray',
+ {'dtype': obj.dtype.__str__(),
+ 'array': obj.tolist()})
+
+ yaml.add_representer(numpy.ndarray, numpyArray_representer)
+
return yaml.dump([res.as_dict() for res in self])
def from_yaml(self, yaml_str):
import yaml
+ # Define Specialize Yaml encoder for numpy array
+ def numpyArray_constructor(loader, node):
+ mapping = loader.construct_mapping(node, deep=True)
+ return numpy.asarray(mapping['array'], dtype=mapping['dtype'])
+
+ yaml.add_constructor(u'!numpyArray', numpyArray_constructor)
+
results_yaml = yaml.load(yaml_str)
results = AnalyzerResultContainer()
for res_yaml in results_yaml:
res = newAnalyzerResult()
- for key in res.keys():
- res.__setattr__(key, res_yaml[key])
+ for key in res_yaml.keys():
+ res[key] = res_yaml[key]
results.add_result(res)
return results
def to_numpy(self, output_file, data_list=None):
if data_list is None:
data_list = self.results
- import numpy
numpy.save(output_file, data_list)
def from_numpy(self, input_file):
- import numpy
return numpy.load(input_file)
def to_hdf5(self, output_file, data_list=None):
import h5py
- # Open HDF5 file and save dataset
- h5_file = h5py.File(output_file, 'w') # overwrite any existing file
- try:
+ # Open HDF5 file and save dataset (overwrite any existing file)
+ with h5py.File(output_file, 'w') as h5_file:
for res in data_list:
# Save results in HDF5 Dataset
group = h5_file.create_group(res.idMetadata.id)
+ group.attrs['dataMode'] = res['dataMode']
+ group.attrs['timeMode'] = res['timeMode']
for key in res.keys():
- if key == 'data':
- dset = group.create_dataset(key,
- data=res.data.data)
- # Save associated metadata
- attrs = res.data.keys()
- attrs.remove('data')
- for name in attrs:
- dset.attrs[name] = res.data.__getattribute__(name)
- else:
+ if key not in ['dataMode', 'timeMode', 'data']:
subgroup = group.create_group(key)
- attrs = res.__getattribute__(key).keys()
+
+ # Write attributes
+ attrs = res[key].keys()
for name in attrs:
- value = res.__getattribute__(key).__getattribute__(name)
- if value:
- subgroup.attrs[name] = res.__getattribute__(key).__getattribute__(name)
- #dset.attrs["name"] = data['name']
- except TypeError:
- raise
- finally:
- h5_file.close() # Close the HDF5 file
+ if res[key][name] is not None:
+ subgroup.attrs[name] = res[key][name]
+
+ # Write Datasets
+ key = 'data'
+ subgroup = group.create_group(key)
+ for dsetName in res[key].keys():
+ if res[key][dsetName] is not None:
+ if res[key][dsetName].dtype == 'object':
+ # Handle numpy type = object as vlen string
+ subgroup.create_dataset(dsetName,
+ data=res[key][dsetName].tolist().__repr__(),
+ dtype=h5py.special_dtype(vlen=str))
+ else:
+ subgroup.create_dataset(dsetName,
+ data=res[key][dsetName])
+
+
def from_hdf5(self, input_file):
import h5py
data_list = AnalyzerResultContainer()
try:
for (group_name, group) in h5_file.items():
- result = newAnalyzerResult()
+
+ result = newAnalyzerResult(dataMode=group.attrs['dataMode'],
+ timeMode=group.attrs['timeMode'])
# Read Sub-Group
for subgroup_name, subgroup in group.items():
+ # Read attributes
+ for name, value in subgroup.attrs.items():
+ result[subgroup_name][name] = value
+
if subgroup_name == 'data':
- dset = subgroup
- # Load value from the hdf5 dataset and store in data
- # FIXME : the following conditional statement is to prevent
- # reading an empty dataset.
- # see : https://github.com/h5py/h5py/issues/281
- # It should be fixed by the next h5py version
- if dset.shape != (0,):
- result.data.data = dset[...]
- else:
- result.data.data = []
- # Load Audio metadata
- for name, value in dset.attrs.items():
- result.data.__setattr__(name, value)
- else:
- # Load Audio metadata
- for name, value in subgroup.attrs.items():
- result.__getattribute__(subgroup_name).__setattr__(name, value)
+ for dsetName, dset in subgroup.items():
+ # Load value from the hdf5 dataset and store in data
+ # FIXME : the following conditional statement is to prevent
+ # reading an empty dataset.
+ # see : https://github.com/h5py/h5py/issues/281
+ # It should be fixed by the next h5py version
+ if dset.shape != (0,):
+ if h5py.check_dtype(vlen=dset.dtype):
+ # to deal with VLEN data used for list of list
+ result[subgroup_name][dsetName] = eval(
+ dset[...].tolist())
+ else:
+ result[subgroup_name][dsetName] = dset[...]
+ else:
+ result[subgroup_name][dsetName] = []
data_list.add_result(result)
except TypeError:
super(Analyzer, self).setup(channels, samplerate,
blocksize, totalframes)
- # Set default values for output_* attributes
+ # Set default values for result_* attributes
# may be overwritten by the analyzer
- self.output_channels = self.input_channels
- self.output_samplerate = self.input_samplerate
- self.output_blocksize = self.input_blocksize
- self.output_stepsize = self.input_blocksize
+ self.result_channels = self.input_channels
+ self.result_samplerate = self.input_samplerate
+ self.result_blocksize = self.input_blocksize
+ self.result_stepsize = self.input_stepsize
def results(self):
container = AnalyzerResultContainer()
def unit():
return ""
- def new_result(self, dataMode='value', resultType='framewise'):
+ def new_result(self, dataMode=newAnalyzerResult._default_value['dataMode'],
+ timeMode=newAnalyzerResult._default_value['timeMode']):
'''
Create a new result
from datetime import datetime
- result = newAnalyzerResult()
+ result = newAnalyzerResult(dataMode=dataMode, timeMode=timeMode)
# Automatically write known metadata
result.idMetadata = IdMetadata(date=datetime.now().replace(microsecond=0).isoformat(' '),
version=TimeSideVersion,
author='TimeSide')
- result.audioMetadata = AudioMetadata(uri=self.mediainfo()['uri'])
+ result.audioMetadata = AudioMetadata(uri=self.mediainfo()['uri'],
+ start=self.mediainfo()['start'],
+ duration=self.mediainfo()['duration'])
- result.data = AnalyzerData(dataMode=dataMode)
+ result.data = AnalyzerData()
if dataMode == 'value':
pass
# raise ArgError('')
pass
- if resultType == 'framewise':
+ if timeMode == 'framewise':
result.frameMetadata = FrameMetadata(
- samplerate=self.output_samplerate,
- blocksize=self.output_blocksize,
- stepsize=self.input_stepsize)
- elif resultType == 'value':
+ samplerate=self.result_samplerate,
+ blocksize=self.result_blocksize,
+ stepsize=self.result_stepsize)
+ elif timeMode == 'global':
# None : handle by data
pass
- elif resultType == 'segment':
+ elif timeMode == 'segment':
# None : handle by data
pass
- elif resultType == 'event':
+ elif timeMode == 'event':
# None : handle by data, duration = 0
pass
else:
# raise ArgError('')
pass
- return result
\ No newline at end of file
+ return result
+
+
+if __name__ == "__main__":
+ import doctest
+ doctest.testmod()