#! /usr/bin/env python
-from unit_timeside import *
+from unit_timeside import unittest, TestRunner
from timeside.decoder.file import FileDecoder
from timeside.analyzer.dc import MeanDCShift
import os
+
class TestAnalyzerDC(unittest.TestCase):
    def setUp(self):
        self.analyzer = MeanDCShift()

    def testOnSweep(self):
        "runs on sweep"
-        self.source = os.path.join (os.path.dirname(__file__), "samples", "sweep.wav")
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples", "sweep.wav")
        self.expected = {'mean_dc_shift': -0.000}

    def testOnGuitar(self):
        "runs on guitar"
-        self.source = os.path.join (os.path.dirname(__file__), "samples", "guitar.wav")
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples", "guitar.wav")
        self.expected = {'mean_dc_shift': 0.054}

    def tearDown(self):
        decoder = FileDecoder(self.source)
        (decoder | self.analyzer).run()
        results = self.analyzer.results
-        for key in self.expected.keys():
-            self.assertEquals(results[key].data_object.value, self.expected[key])
+        for result_id in self.expected.keys():
+            result = results.get_result_by_id(result_id)
+            self.assertEquals(result.data_object.value,
+                              self.expected[result_id])

if __name__ == '__main__':
    unittest.main(testRunner=TestRunner())
#! /usr/bin/env python
-from unit_timeside import *
+from unit_timeside import unittest, TestRunner
from timeside.decoder.file import FileDecoder
from timeside.analyzer.level import Level
import os
+
class TestAnalyzerLevel(unittest.TestCase):
    def setUp(self):
        self.analyzer = Level()

    def testOnSweep(self):
        "runs on sweep"
-        self.source = os.path.join (os.path.dirname(__file__), "samples", "sweep.wav")
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples", "sweep.wav")
        max_level_value = -6.021
        rms_level_value = -9.856
-        self.expected = {'level.max':max_level_value , 'level.rms':rms_level_value }
+        self.expected = {'level.max': max_level_value,
+                         'level.rms': rms_level_value}

    def testOnGuitar(self):
        "runs on guitar"
-        self.source = os.path.join (os.path.dirname(__file__), "samples", "guitar.wav")
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples", "guitar.wav")
        max_level_value = -4.054
        rms_level_value = -21.945
-        self.expected = {'level.max':max_level_value , 'level.rms':rms_level_value }
-
+        self.expected = {'level.max': max_level_value,
+                         'level.rms': rms_level_value}

    def tearDown(self):
        decoder = FileDecoder(self.source)
        (decoder | self.analyzer).run()
        results = self.analyzer.results
-        for key in self.expected.keys():
-            self.assertEquals(results[key].data_object.value, self.expected[key])
+        for result_id in self.expected.keys():
+            result = results.get_result_by_id(result_id)
+            self.assertEquals(result.data_object.value,
+                              self.expected[result_id])
        #print results
        #print results.to_yaml()
        #print results.to_json()
from timeside.decoder.file import FileDecoder
from timeside.analyzer.level import Level
from timeside.core import ProcessPipe
-from unit_timeside import *
+from unit_timeside import unittest, TestRunner
import os.path
        self.assertIsInstance(pipe.frames_stack, list)
-        results_on_file = pipe.results['level.rms'].data.copy()
+        results_on_file = pipe.results.get_result_by_id(
+            'level.rms').data.copy()
        # If the pipe is used for a second run, the processed frames stored
        # in the stack are passed to the other processors.
        # To assert that the frames passed to the two analyzers are the same,
        # we check that the results of these analyzers are equivalent:
-        results_on_stack = pipe.results['level.rms'].data
+        results_on_stack = pipe.results.get_result_by_id('level.rms').data
        self.assertEqual(results_on_stack,
                         results_on_file)
        self.assertNotIn(name, _parameters.trait_names())

    def test_validate_True(self):
-        "Validate parameters format against Traits specification : pass"
+        "Validate parameters with good format"
        # Validate from dict
        self.assertEqual(self.param_dict,
-                         self.has_param_cls.validate_parameters(self.param_dict))
+                         self.has_param_cls.validate_parameters(
+                             self.param_dict))
        # Validate from JSON
        param_json = json.dumps(self.param_dict)
        self.assertEqual(self.param_dict,
                         self.has_param_cls.validate_parameters(param_json))

    def test_validate_False(self):
-        "Validate parameters format against Traits specification : reject"
+        "Validate parameters with bad format"
bad_param = {"param1": "", "param2": 0, "param3": 0.0,
"param4": 3.3} # Param4 is a Float (it should be a int)
        # Validate from dict
-        self.assertRaises(ValueError, self.has_param_cls.validate_parameters, bad_param)
+        self.assertRaises(ValueError, self.has_param_cls.validate_parameters,
+                          bad_param)
        # Validate from JSON
        bad_param_json = json.dumps(bad_param)
        self.assertRaises(ValueError, self.has_param_cls.validate_parameters,
                          bad_param_json)
>>> a = Analyzer()
>>> (d|a).run()
>>> a.new_result() #doctest: +ELLIPSIS
- FrameValueResult(id_metadata=IdMetadata(id='analyzer', name='Generic analyzer', unit='', description='', date='...', version='...', author='TimeSide', uuid='...'), data_object=DataObject(value=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='...', start=0.0, duration=8.0..., is_segment=False, sha1='...', channels=2, channelsManagement=''), frame_metadata=FrameMetadata(samplerate=44100, blocksize=8192, stepsize=8192), parameters={})
+ FrameValueResult(id_metadata=IdMetadata(id='analyzer', name='Generic analyzer', unit='', description='', date='...', version='...', author='TimeSide', proc_uuid='...', res_uuid='...'), data_object=DataObject(value=array([], dtype=float64)), audio_metadata=AudioMetadata(uri='...', start=0.0, duration=8.0..., is_segment=False, sha1='...', channels=2, channelsManagement=''), frame_metadata=FrameMetadata(samplerate=44100, blocksize=8192, stepsize=8192), parameters={})
>>> resContainer = timeside.analyzer.core.AnalyzerResultContainer()
'''
        self.__setitem__(res_uuid, analyzer_result)

+    def get_result_by_id(self, result_id):
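+        """Return the first stored result whose id_metadata.id matches
+        result_id, or None if no result matches."""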
+        for res in self.values():
+            if res.id_metadata.id == result_id:
+                return res
+
    def to_xml(self, output_file=None):
        import xml.etree.ElementTree as ET
        return frames, eod

    def post_process(self):
-        mfcc = self.process_pipe.results['yaafe.mfcc']['data_object']['value']
-        mfccd1 = self.process_pipe.results[
-            'yaafe.mfccd1']['data_object']['value']
-        mfccd2 = self.process_pipe.results[
-            'yaafe.mfccd2']['data_object']['value']
-        zcr = self.process_pipe.results['yaafe.zcr']['data_object']['value']
+        yaafe_result = self.process_pipe.results
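+        # Each Yaafe feature is fetched from the result container by its id.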
+        mfcc = yaafe_result.get_result_by_id(
+            'yaafe.mfcc')['data_object']['value']
+        mfccd1 = yaafe_result.get_result_by_id(
+            'yaafe.mfccd1')['data_object']['value']
+        mfccd2 = yaafe_result.get_result_by_id(
+            'yaafe.mfccd2')['data_object']['value']
+        zcr = yaafe_result.get_result_by_id(
+            'yaafe.zcr')['data_object']['value']
        features = np.concatenate((mfcc, mfccd1, mfccd2, zcr), axis=1)
    def post_process(self):
        #spectrogram = self.parents()[0]['spectrogram_analyzer'].data
-        spectrogram = self.process_pipe.results['spectrogram_analyzer'].data
+        results = self.process_pipe.results
+
+        spectrogram = results.get_result_by_id('spectrogram_analyzer').data
        #spectrogram = self.pipe._results[self.parents()[0].id]
        # Low-pass filtering of the spectrogram amplitude along the time axis
    @interfacedoc
    def post_process(self):
-        parent_result = self.process_pipe.results[self._result_id]
+        pipe_result = self.process_pipe.results
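+        # Look up the parent processor's result by its id, then render it.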
+        parent_result = pipe_result.get_result_by_id(self._result_id)
        self.image = parent_result._render_PIL((self.image_width,
                                                self.image_height), self.dpi)