Setup an arbitrary analyzer to check that decoding process from file and from stack are equivalent:
->>> pitch_on_file = get_processor('aubio_pitch')()
->>> pipe = (decoder | pitch_on_file)
+>>> pitch = get_processor('aubio_pitch')()
+>>> pipe = (decoder | pitch)
>>> print pipe.processors #doctest: +ELLIPSIS
-[<timeside.decoder.file.FileDecoder object at 0x...>, <timeside.analyzer.aubio.aubio_pitch.AubioPitch object at 0x...>]
+[gst_dec-{}, aubio_pitch-{}]
-After the pipe has been run, the other processes of the pipe are removed from the pipe and only the :class:`FileDecoder <timeside.decoder.file.FileDecoder>` is kept :
+
+Run the pipe:
>>> pipe.run()
->>> print pipe.processors #doctest: +ELLIPSIS
-[<timeside.decoder.file.FileDecoder object at 0x...>]
The processed frames are stored in the pipe attribute `frames_stack` as a list of frames :
(array([[...]], dtype=float32), True)
If the pipe is used for a second run, the processed frames stored in the stack are passed to the other processors without decoding the audio source again.
-Let's define a second analyzer equivalent to the previous one:
-
->>> pitch_on_stack = get_processor('aubio_pitch')()
-
-Add it to the pipe:
-
->>> pipe |= pitch_on_stack
->>> print pipe.processors #doctest: +ELLIPSIS
-[<timeside.decoder.file.FileDecoder object at 0x...>, <timeside.analyzer.aubio.aubio_pitch.AubioPitch object at 0x...>]
-
-And run the pipe:
-
->>> pipe.run()
-
-Assert that the frames passed to the two analyzers are the same, we check that the results of these analyzers are equivalent:
-
->>> np.allclose(pitch_on_file.results.get_result_by_id('aubio_pitch.pitch').data,
-... pitch_on_stack.results.get_result_by_id('aubio_pitch.pitch').data)
-True
-
(decoder | self.analyzer).run()
results = self.analyzer.results
for result_id in self.expected.keys():
- result = results.get_result_by_id(result_id)
+ result = results[result_id]
self.assertEquals(result.data_object.value,
self.expected[result_id])
(decoder | self.analyzer).run()
results = self.analyzer.results
for result_id in self.expected.keys():
- result = results.get_result_by_id(result_id)
+ result = results[result_id]
self.assertEquals(result.data_object.value,
self.expected[result_id])
#print results
"""Internal function that test if there is NaN in the results
of a given analyzer"""
- pipe = (self.decoder | analyzer_cls())
+ analyzer = analyzer_cls()
+ pipe = (self.decoder | analyzer)
pipe.run()
- for key, result in pipe.results.items():
+ for key, result in analyzer.results.items():
if 'value' in result.data_object.keys():
# Test for NaN
self.assertFalse(np.any(np.isnan(result.data)),
"""Internal function that test if there is NaN in the results
of a given analyzer"""
- pipe = (self.decoder | analyzer_cls())
+ analyzer = analyzer_cls()
+ pipe = (self.decoder | analyzer)
pipe.run()
- for key, result in pipe.results.items():
+ for key, result in analyzer.results.items():
if 'value' in result.data_object.keys():
# Test for NaN
self.assertFalse(np.any(np.isnan(result.data)),
start=self.start,
duration=self.duration,
stack=True)
- level_on_file = Level()
- pipe = (decoder | level_on_file)
+ level = Level()
+ pipe = (decoder | level)
pipe.run()
self.assertIsInstance(pipe.frames_stack, list)
- results_on_file = pipe.results.get_result_by_id(
- 'level.rms').data.copy()
+ results_on_file = level.results['level.rms'].data.copy()
# If the pipe is used for a second run, the processed frames stored
# in the stack are passed to the other processors
# without decoding the audio source again.
- #Let's define a second analyzer equivalent to the previous one:
- # Remove level_on_file from pipe
- pipe.processors.pop()
- level_on_stack = Level()
- pipe |= level_on_stack
+ pipe.results = {} # to be sure the previous results are deleted
pipe.run()
# to assert that the frames passed to the two analyzers are the same,
# we check that the results of these analyzers are equivalent:
- results_on_stack = pipe.results.get_result_by_id('level.rms').data
+ results_on_stack = level.results['level.rms'].data
self.assertEqual(results_on_stack,
results_on_file)
melenergy.parameters = dict(n_filters=self.n_filters,
n_coeffs=self.n_coeffs)
melenergy.data_object.value = self.melenergy_results
- self.process_pipe.results.add(melenergy)
+ self.add_result(melenergy)
mfcc.parameters = dict(n_filters=self.n_filters,
n_coeffs=self.n_coeffs)
mfcc.data_object.value = self.mfcc_results
- self.process_pipe.results.add(mfcc)
+ self.add_result(mfcc)
pitch.id_metadata.name += ' ' + "pitch"
pitch.id_metadata.unit = "Hz"
pitch.data_object.value = self.pitches
- self.process_pipe.results.add(pitch)
+ self.add_result(pitch)
pitch_confidence = self.new_result(
data_mode='value', time_mode='framewise')
pitch_confidence.id_metadata.name += ' ' + "pitch confidence"
pitch_confidence.id_metadata.unit = None
pitch_confidence.data_object.value = self.pitch_confidences
- self.process_pipe.results.add(pitch_confidence)
+ self.add_result(pitch_confidence)
res_specdesc.id_metadata.name = ' ' + method
res_specdesc.data_object.value = self.specdesc_results[method]
- self.process_pipe.results.add(res_specdesc)
+ self.add_result(res_specdesc)
onsets.data_object.label = numpy.ones(len(self.onsets))
onsets.data_object.label_metadata.label = {1: 'Onset'}
- self.process_pipe.results.add(onsets)
+ self.add_result(onsets)
#---------------------------------
# Onset Rate: Segment (time, duration, value)
onsetrate.data_object.value = []
onsetrate.data_object.time = []
- self.process_pipe.results.add(onsetrate)
+ self.add_result(onsetrate)
#---------------------------------
# Beats: Event (time, "Beat")
beats.data_object.label = numpy.ones(len(self.beats))
beats.data_object.label_metadata.label = {1: 'Beat'}
- self.process_pipe.results.add(beats)
+ self.add_result(beats)
#---------------------------------
# Beat confidences: Event (time, value)
beat_confidences.data_object.time = self.beats
beat_confidences.data_object.value = self.beat_confidences
- self.process_pipe.results.add(beat_confidences)
+ self.add_result(beat_confidences)
#---------------------------------
# BPM: Segment (time, duration, value)
else:
bpm.data_object.value = []
- self.process_pipe.results.add(bpm)
+ self.add_result(bpm)
self.result_blocksize = self.input_blocksize
self.result_stepsize = self.input_stepsize
+ def add_result(self, result):
+ if not self.uuid() in self.process_pipe.results:
+ self.process_pipe.results[self.uuid()] = AnalyzerResultContainer()
+ self.process_pipe.results[self.uuid()][result.id] = result
+
@property
def results(self):
- return AnalyzerResultContainer(
- [self.process_pipe.results[key]
- for key in self.process_pipe.results.keys()
- if key.startswith(self.uuid())])
+ return self.process_pipe.results[self.uuid()]
@staticmethod
def id():
dc_result = self.new_result(data_mode='value', time_mode='global')
dc_result.data_object.value = numpy.round(
numpy.mean(100 * self.values), 3)
- self.process_pipe.results.add(dc_result)
+ self.add_result(dc_result)
self._aubio_pitch_analyzer = AubioPitch(blocksize_s=self.wLen,
stepsize_s=self.wStep)
- self.parents.append(self._aubio_pitch_analyzer)
+ self.parents['aubio_pitch'] = self._aubio_pitch_analyzer
@interfacedoc
def setup(self, channels=None, samplerate=None,
'''
aubio_res_id = 'aubio_pitch.pitch_confidence'
- pipe_results = self.process_pipe.results
- pitch_confidences = pipe_results.get_result_by_id(aubio_res_id).data
+ aubio_uuid = self.parents['aubio_pitch'].uuid()
+ aubio_results = self.process_pipe.results[aubio_uuid]
+
+ pitch_confidences = aubio_results[aubio_res_id].data
nb_frameDecision = int(self.decisionLen / self.wStep)
epsilon = numpy.spacing(pitch_confidences[0])
conf.id_metadata.name += ' ' + 'Yin Confidence'
conf.data_object.value = pitch_confidences
- self.process_pipe.results.add(conf)
+ self.add_result(conf)
convert = {False: 0, True: 1}
label = {0: 'Poly', 1: 'Mono'}
segs.data_object.duration = [(float(s[1] - s[0]+1) * self.decisionLen)
for s in segList]
- self.process_pipe.results.add(segs)
+ self.add_result(segs)
return
def monoLikelihood(self, m, v):
for s in segsList]
segs.data_object.duration = [(float(s[1] - s[0]) * step)
for s in segsList]
- self.process_pipe.results.add(segs)
+ self.add_result(segs)
def release(self):
self._buffer.close()
modEnergy.data_object.value = conf
- self.process_pipe.results.add(modEnergy)
+ self.add_result(modEnergy)
# Segment
convert = {False: 0, True: 1}
self.samplerate())
for s in segList]
- self.process_pipe.results.add(segs)
+ self.add_result(segs)
# Median filter on decision
segs = self.new_result(data_mode='label', time_mode='segment')
self.samplerate())
for s in segList_filt]
- self.process_pipe.results.add(segs)
+ self.add_result(segs)
conf.id_metadata.name += ' ' + 'Confidence'
conf.data_object.value = confEntropy
- self.process_pipe.results.add(conf)
+ self.add_result(conf)
# Binary Entropy
binaryEntropy = modulentropy > self.threshold
self.samplerate())
for s in segList]
- self.process_pipe.results.add(segs)
+ self.add_result(segs)
return
max_level.data_object.value = np.round(
20 * np.log10(self.max_value), 3)
- self.process_pipe.results.add(max_level)
+ self.add_result(max_level)
# RMS level
rms_level = self.new_result(data_mode='value', time_mode='global')
rms_val = MACHINE_EPSILON
rms_level.data_object.value = np.round(20 * np.log10(rms_val), 3)
- self.process_pipe.results.add(rms_level)
+ self.add_result(rms_level)
'mfccd2: MFCC CepsIgnoreFirstCoeff=0 blockSize=1024 stepSize=256 > Derivate DOrder=2')
spec.addFeature('zcr: ZCR blockSize=1024 stepSize=256')
parent_analyzer = get_processor('yaafe')(spec)
- self.parents.append(parent_analyzer)
+ self.parents['yaafe'] = parent_analyzer
# informative parameters
# these are not really taken into account by the system
return frames, eod
def post_process(self):
- yaafe_result = self.process_pipe.results
- mfcc = yaafe_result.get_result_by_id(
- 'yaafe.mfcc')['data_object']['value']
- mfccd1 = yaafe_result.get_result_by_id(
- 'yaafe.mfccd1')['data_object']['value']
- mfccd2 = yaafe_result.get_result_by_id(
- 'yaafe.mfccd2')['data_object']['value']
- zcr = yaafe_result.get_result_by_id(
- 'yaafe.zcr')['data_object']['value']
+ yaafe_result = self.process_pipe.results[self.parents['yaafe'].uuid()]
+ mfcc = yaafe_result['yaafe.mfcc']['data_object']['value']
+ mfccd1 = yaafe_result['yaafe.mfccd1']['data_object']['value']
+ mfccd2 = yaafe_result['yaafe.mfccd2']['data_object']['value']
+ zcr = yaafe_result['yaafe.zcr']['data_object']['value']
features = np.concatenate((mfcc, mfccd1, mfccd2, zcr), axis=1)
sad_result.id_metadata.name += ' ' + \
'Speech Activity Detection Log Likelihood Difference'
sad_result.data_object.value = res
- self.process_pipe.results.add(sad_result)
+ self.add_result(sad_result)
else:
self.input_stepsize = blocksize / 2
- self.parents.append(Spectrogram(blocksize=self.input_blocksize,
- stepsize=self.input_stepsize))
+ self.parents['spectrogram'] = Spectrogram(
+ blocksize=self.input_blocksize,
+ stepsize=self.input_stepsize)
@interfacedoc
def setup(self, channels=None, samplerate=None,
#spectrogram = self.parents()[0]['spectrogram_analyzer'].data
results = self.process_pipe.results
-
- spectrogram = results.get_result_by_id('spectrogram_analyzer').data
+ parent_uuid = self.parents['spectrogram'].uuid()
+ spectrogram = results[parent_uuid]['spectrogram_analyzer'].data
#spectrogram = self.pipe._results[self.parents()[0].id]
# Low-pass filtering of the spectrogram amplitude along the time axis
odf = self.new_result(data_mode='value', time_mode='framewise')
#odf.parameters = {'FFT_SIZE': self.FFT_SIZE}
odf.data_object.value = odf_diff
- self.process_pipe.results.add(odf)
+ self.add_result(odf)
# Define Parameters
class _Param(HasTraits):
FFT_SIZE = Int()
+ input_blocksize = Int()
+ input_stepsize = Int()
def __init__(self, blocksize=2048, stepsize=None, fft_size=None):
super(Spectrogram, self).__init__()
spectrogram.data_object.y_value = (np.arange(0, nb_freq) *
self.samplerate() / self.FFT_SIZE)
- self.process_pipe.results.add(spectrogram)
+ self.add_result(spectrogram)
plugin_res.id_metadata.name += ' ' + \
' '.join(plugin_line[1:])
- self.process_pipe.results.add(plugin_res)
+ self.add_result(plugin_res)
@staticmethod
def vamp_plugin(plugin, wavfile):
def post_process(self):
waveform = self.new_result(data_mode='value', time_mode='framewise')
waveform.data_object.value = np.vstack(self.values)
- self.process_pipe.results.add(waveform)
+ self.add_result(waveform)
# Store results in Container
if len(result.data_object.value):
- self.process_pipe.results.add(result)
+ self.add_result(result)
Attributes:
- parents : List of parent Processors that must be processed
+ parents : Dictionary of parent Processors that must be processed
before the current Processor
pipe : The ProcessPipe in which the Processor will run
"""
def __init__(self):
super(Processor, self).__init__()
- self.parents = []
+ self.parents = {}
self.source_mediainfo = None
self.process_pipe = None
self.UUID = uuid.uuid4()
self.get_parameters() == other.get_parameters())
def __repr__(self):
- return self.id() + '\n' + self.get_parameters()
+ return '-'.join([self.id(), self.get_parameters()])
class FixedSizeInputAdapter(object):
Attributes:
processor: List of all processors in the Process pipe
- results : Results Container for all the analyzers of the Pipe process
+ results : Dictionary of Results Containers from all the analyzers
+ in the Pipe process
"""
def __init__(self, *others):
self |= others
- from timeside.analyzer.core import AnalyzerResultContainer
- self.results = AnalyzerResultContainer()
+ self.results = {}
def append_processor(self, proc, source_proc=None):
"Append a new processor to the pipe"
type='audio_source')
proc.process_pipe = self
# Add an edge between each parent and proc
- for parent in proc.parents:
+ for parent in proc.parents.values():
self._graph.add_edge(parent.uuid(), proc.uuid(),
type='data_source')
def __ior__(self, other):
if isinstance(other, Processor):
- for parent in other.parents:
+ for parent in other.parents.values():
self |= parent
self.append_processor(other)
@interfacedoc
def post_process(self):
pipe_result = self.process_pipe.results
- parent_result = pipe_result.get_result_by_id(self._result_id)
+ parent_uuid = self.parents['analyzer'].uuid()
+ parent_result = pipe_result[parent_uuid][self._result_id]
fg_image = parent_result._render_PIL((self.image_width,
self.image_height), self.dpi)
if self._background:
- bg_result = pipe_result.get_result_by_id(self._bg_id)
+ bg_uuid = self.parents['bg_analyzer'].uuid()
+ bg_result = pipe_result[bg_uuid][self._bg_id]
bg_image = bg_result._render_PIL((self.image_width,
self.image_height), self.dpi)
# convert image to grayscale
self._background = True
bg_analyzer = get_processor('waveform_analyzer')()
self._bg_id = bg_analyzer.id()
- self.parents.append(bg_analyzer)
+ self.parents['bg_analyzer'] = bg_analyzer
elif background == 'spectrogram':
self._background = True
bg_analyzer = get_processor('spectrogram_analyzer')()
self._bg_id = bg_analyzer.id()
- self.parents.append(bg_analyzer)
+ self.parents['bg_analyzer'] = bg_analyzer
else:
self._background = None
- self.parents.append(analyzer(**analyzer_parameters))
+ parent_analyzer = analyzer(**analyzer_parameters)
+ self.parents['analyzer'] = parent_analyzer
# TODO : make it generic when analyzer will be "atomize"
+ self._parent_uuid = parent_analyzer.uuid()
self._result_id = result_id
@staticmethod