from vamp_plugin import VampSimpleHost
from irit_speech_entropy import IRITSpeechEntropy
from irit_speech_4hz import IRITSpeech4Hz
+from odf import OnsetDetectionFunction
implements(IAnalyzer)
def __init__(self):
+ super(AubioMelEnergy, self).__init__()
self.input_blocksize = 1024
self.input_stepsize = self.input_blocksize / 4
melenergy.parameters = dict(n_filters=self.n_filters,
n_coeffs=self.n_coeffs)
melenergy.data_object.value = self.melenergy_results
- self._results.add(melenergy)
+ self.pipe.results.add(melenergy)
implements(IAnalyzer)
def __init__(self):
+ super(AubioMfcc, self).__init__()
self.input_blocksize = 1024
self.input_stepsize = self.input_blocksize / 4
mfcc.parameters = dict(n_filters=self.n_filters,
n_coeffs=self.n_coeffs)
mfcc.data_object.value = self.mfcc_results
- self._results.add(mfcc)
+ self.pipe.results.add(mfcc)
implements(IAnalyzer) # TODO check if needed with inheritance
def __init__(self):
+ super(AubioPitch, self).__init__()
self.input_blocksize = 2048
self.input_stepsize = self.input_blocksize / 2
# setup
pitch.data_object.value = self.pitches
- self._results.add(pitch)
+ self.pipe.results.add(pitch)
implements(IAnalyzer)
def __init__(self):
+ super(AubioSpecdesc, self).__init__()
self.input_blocksize = 1024
self.input_stepsize = self.input_blocksize / 4
res_specdesc.id_metadata.name += ' ' + method
res_specdesc.data_object.value = self.specdesc_results[method]
- self._results.add(res_specdesc)
+ self.pipe.results.add(res_specdesc)
implements(IAnalyzer)
def __init__(self):
+ super(AubioTemporal, self).__init__()
self.input_blocksize = 1024
self.input_stepsize = 256
onsets.data_object.time = self.onsets
onsets.label_metadata.label = {1: 'Onset'}
- self._results.add(onsets)
+ self.pipe.results.add(onsets)
#---------------------------------
# Onset Rate
else:
onsetrate.data_object.value = []
- self._results.add(onsetrate)
+ self.pipe.results.add(onsetrate)
#---------------------------------
# Beats
beats.label_metadata.label = {1: 'Beat'}
- self._results.add(beats)
+ self.pipe.results.add(beats)
#---------------------------------
# BPM
else:
bpm.data_object.value = []
- self._results.add(bpm)
+ self.pipe.results.add(bpm)
data_mode_child = root.find('data_mode')
time_mode_child = root.find('time_mode')
result = analyzer_result_factory(data_mode=data_mode_child.text,
- time_mode=time_mode_child.text)
+ time_mode=time_mode_child.text)
for child in root:
key = child.tag
if key not in ['data_mode', 'time_mode']:
return self.id_metadata.unit
-
class ValueObject(AnalyzerResult):
def __init__(self):
Generic class for the analyzers
'''
+ def __init__(self):
+ super(Analyzer, self).__init__()
+
def setup(self, channels=None, samplerate=None,
blocksize=None, totalframes=None):
super(Analyzer, self).setup(channels, samplerate,
def results(self):
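+ # Select, from the pipe-wide results container, only the results
+ # produced by this analyzer (result ids are prefixed with its id)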
return AnalyzerResultContainer(
- [self._results[key] for key in self._results.keys()
+ [self.pipe.results[key] for key in self.pipe.results.keys()
if key.split('.')[0] == self.id()])
@staticmethod
from datetime import datetime
- result = analyzer_result_factory(data_mode=data_mode, time_mode=time_mode)
+ result = analyzer_result_factory(data_mode=data_mode,
+ time_mode=time_mode)
# Automatically write known metadata
result.id_metadata.date = datetime.now().replace(
dc_result = self.new_result(data_mode='value', time_mode='global')
dc_result.data_object.value = numpy.round(
numpy.mean(100 * self.values), 3)
- self._results.add(dc_result)
+ self.pipe.results.add(dc_result)
max_level.id_metadata.name += ' ' + "Max"
max_level.data_object.value = np.round(20*np.log10(self.max_value), 3)
- self._results.add(max_level)
+ self.pipe.results.add(max_level)
# RMS level
rms_level = self.new_result(data_mode='value', time_mode='global')
rms_level.data_object.value = np.round(20*np.log10(
np.sqrt(np.mean(self.mean_values))), 3)
- self._results.add(rms_level)
+ self.pipe.results.add(rms_level)
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2013 Paul Brossier <piem@piem.org>
+
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+
+# Author: Paul Brossier <piem@piem.org>
+
+from timeside.core import implements, interfacedoc
+from timeside.analyzer.core import Analyzer
+from timeside.analyzer import Spectrogram
+from timeside.api import IAnalyzer
+import numpy as np
+from numpy import pi as Pi
+from scipy import signal
+
+
+class OnsetDetectionFunction(Analyzer):
+ implements(IAnalyzer)
+
+ def __init__(self, blocksize=1024, stepsize=None):
+ super(OnsetDetectionFunction, self).__init__()
+
+ self.input_blocksize = blocksize
+ if stepsize:
+ self.input_stepsize = stepsize
+ else:
+ self.input_stepsize = blocksize / 2
+
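+ # Declare the Spectrogram analyzer as a parent: the pipe inserts it
+ # ahead of this processor, so its result is available in post_process()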
+ self.parents.append(Spectrogram(blocksize=self.input_blocksize,
+ stepsize=self.input_stepsize))
+
+ @interfacedoc
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
+ super(OnsetDetectionFunction, self).setup(channels, samplerate,
+ blocksize, totalframes)
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "odf"
+
+ @staticmethod
+ @interfacedoc
+ def name():
+ return "Onset Detection Function"
+
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return ""
+
+ def process(self, frames, eod=False):
+ return frames, eod
+
+ def post_process(self):
+
+ # Fetch the Spectrogram parent's output from the shared pipe results,
+ # keyed by the parent analyzer's id
+ spectrogram = self.pipe.results['spectrogram_analyzer'].data
+
+ # Low-pass filtering of the spectrogram amplitude along the time axis
+ S = signal.lfilter(signal.hann(15), 1, abs(spectrogram), axis=0)
+ # Clip small value to a minimal threshold
+ np.maximum(S, 1e-9, out=S)
+
+ S = np.log10(S)
+ # S[S<1e-3]=0
+ np.maximum(S, 1e-3, out=S)
+
+ # Differentiator filter
+ df_filter = signal.fir_filter_design.remez(31, [0, 0.5], [Pi],
+ type='differentiator')
+
+ S_diff = signal.lfilter(df_filter, 1, S, axis=0)
+ S_diff[S_diff < 1e-10] = 0
+
+ # Summation along the frequency axis
+ odf_diff = S_diff.sum(axis=1)
+ odf_diff = odf_diff / np.median(odf_diff) # Normalize
+
+ odf = self.new_result(data_mode='value', time_mode='framewise')
+ #odf.parameters = {'FFT_SIZE': self.FFT_SIZE}
+ odf.data_object.value = odf_diff
+ self.pipe.results.add(odf)
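+
+ # Hypothetical usage sketch (illustration only; the audio path is a
+ # placeholder, the other names are defined in this changeset):
+ #
+ #   decoder = FileDecoder('/path/to/audio.wav')
+ #   odf = OnsetDetectionFunction(blocksize=1024, stepsize=512)
+ #   ProcessPipe(decoder, odf).run()  # the Spectrogram parent is added automatically
+ #   print odf.results                # this analyzer's results only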
class Spectrogram(Analyzer):
- implements(IAnalyzer) # TODO check if needed with inheritance
+ implements(IAnalyzer)
- def __init__(self):
- self.input_blocksize = 2048
- self.input_stepsize = self.input_blocksize / 2
+ def __init__(self, blocksize=2048, stepsize=None):
+ super(Spectrogram, self).__init__()
+
+ self.input_blocksize = blocksize
+ if stepsize:
+ self.input_stepsize = stepsize
+ else:
+ self.input_stepsize = blocksize / 2
@interfacedoc
def setup(self, channels=None, samplerate=None,
spectrogram = self.new_result(data_mode='value', time_mode='framewise')
spectrogram.parameters = {'FFT_SIZE': self.FFT_SIZE}
spectrogram.data_object.value = self.values
- self._results.add(spectrogram)
+ self.pipe.results.add(spectrogram)
implements(IAnalyzer)
def __init__(self, plugin_list=None):
-
+ super(VampSimpleHost, self).__init__()
if plugin_list is None:
plugin_list = self.get_plugins_list()
#plugin_list = [['vamp-example-plugins', 'percussiononsets', 'detectionfunction']]
plugin_res.id_metadata.name += ' ' + \
' '.join(plugin_line[1:])
- self._results.add(plugin_res)
+ self.pipe.results.add(plugin_res)
@staticmethod
def vamp_plugin(plugin, wavfile):
def post_process(self):
waveform = self.new_result(data_mode='value', time_mode='framewise')
waveform.data_object.value = np.asarray(self.values).flatten()
- self._results.add(waveform)
+ self.pipe.results.add(waveform)
implements(IAnalyzer)
def __init__(self, yaafeSpecification=None):
+ super(Yaafe, self).__init__()
+
# Check arguments
if yaafeSpecification is None:
yaafeSpecification = FeaturePlan(sample_rate=32000)
result.data_object.value = self.yaafe_engine.readOutput(featName)
# Store results in Container
if len(result.data_object.value):
- self._results.add(result)
+ self.pipe.results.add(result)
# implementations should always call the parent method
- def mediainfo():
+ def mediainfo(self):
"""
Information about the media object
uri
duration
"""
+ @property
+ def parents(self):
+ """
+ Return the processor's parents
+ """
+
+
class IEncoder(IProcessor):
"""Encoder driver interface. Each encoder is expected to support a specific
format."""
# This file defines a generic object interface mechanism and
-# a way to determine which components implements a given interface.
+# a way to determine which components implement a given interface.
#
# For example, the following defines the Music class as implementing the
# listenable interface.
class Processor(Component):
- """Base component class of all processors"""
+ """Base component class of all processors
+
+ Attributes:
+ parents : List of parent Processors that must be processed
+ before the current Processor
+ pipe : The current ProcessPipe in which the Processor will run
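+
+ A processor that needs another analyzer's output declares it as a
+ parent, e.g. (as OnsetDetectionFunction does in this changeset):
+
+ self.parents.append(Spectrogram(blocksize=self.input_blocksize,
+ stepsize=self.input_stepsize))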
+ """
__metaclass__ = MetaProcessor
abstract()
implements(IProcessor)
+ def __init__(self):
+ super(Processor, self).__init__()
+
+ self.parents = []
+ self.source_mediainfo = None
+ self.pipe = None
+
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None,
totalframes=None):
"""Return a processor by its id"""
if not _processors.has_key(processor_id):
raise Error("No processor registered with id: '%s'"
- % processor_id)
+ % processor_id)
return _processors[processor_id]
class ProcessPipe(object):
- """Handle a pipe of processors"""
+ """Handle a pipe of processors
+
+ Attributes:
+ processors : List of all the processors in the pipe
+ results : Container for the results of every analyzer in the pipe
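+
+ For example (a sketch; the audio path is a placeholder):
+
+ pipe = ProcessPipe(FileDecoder('/path/to/audio.wav'), OnsetDetectionFunction())
+ pipe.run()
+ all_results = pipe.results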
+"""
def __init__(self, *others):
self.processors = []
self |= others
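+ # Imported locally rather than at module level to avoid a circular
+ # import (timeside.analyzer.core itself imports from timeside.core)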
+ from timeside.analyzer.core import AnalyzerResultContainer
+ self.results = AnalyzerResultContainer()
+
+ for proc in self.processors:
+ proc.pipe = self
+
def __or__(self, other):
return ProcessPipe(self, other)
def __ior__(self, other):
if isinstance(other, Processor):
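+ # Register the processor's parents first so they are set up and run
+ # ahead of it in the pipe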
+ self |= other.parents
self.processors.append(other)
elif isinstance(other, ProcessPipe):
self.processors.extend(other.processors)
the pipe. Also returns the pipe itself."""
source = self.processors[0]
- items = self.processors[1:]
+ items = self.processors[1:]
source.setup()
last = source
- from timeside.analyzer.core import AnalyzerResultContainer
- self._results = AnalyzerResultContainer()
-
# setup/reset processors and configure properties throughout the pipe
for item in items:
- item.setup(channels = last.channels(),
- samplerate = last.samplerate(),
- blocksize = last.blocksize(),
- totalframes = last.totalframes())
+ item.setup(channels=last.channels(),
+ samplerate=last.samplerate(),
+ blocksize=last.blocksize(),
+ totalframes=last.totalframes())
item.source_mediainfo = source.mediainfo()
- item._results = self._results
last = item
# now stream audio data along the pipe
for item in items:
item.release()
- @property
- def results(self):
- """
- Results Container for all the analyzers of the Pipe process
- """
- if hasattr(self, '_results'):
- return self._results
\ No newline at end of file
duration : float
duration of the segment in seconds
'''
+ super(FileDecoder, self).__init__()
# is this a file?
import os.path
lower_freq = 20
def __init__(self, width=1024, height=256, bg_color=None, color_scheme='default'):
+ super(Grapher, self).__init__()
self.bg_color = bg_color
self.color_scheme = color_scheme
self.graph = None