- Add functions data(), time() and duration() to AnalyzerResult to easilly acces those values
- Modify Analyzer id to get results with the following format : 'AnalyzerID.ResultID' (e.g. 'aubio_temporal.beat', aubio_temporal.bpm', ...)
- Analyzer result() function now return only the results in the container that match the Analyzer id (e.g. 'aubio_temporal.*')
- Add Waveform Analyzer and Spectrogram Analyzers (mainly for demonstration and illustration purpose)
- some PEP8 refactoring (with autopep8)
from unit_timeside import *
from timeside.analyzer.core import *
+from timeside.__init__ import __version__
from numpy import ones, array
from math import pi
+
verbose = 0
self.result = AnalyzerResult(dataMode='value', timeMode='framewise')
from datetime import datetime
self.result.idMetadata = dict(date=datetime.now().replace(microsecond=0).isoformat(' '),
- version=TimeSideVersion,
+ version=__version__,
author='TimeSide',
id="foo_bar",
name="Foo bar",
max_level_value = -6.021
rms_level_value = -9.856
- self.expected = {'max_level':max_level_value , 'rms_level':rms_level_value }
+ self.expected = {'level.max':max_level_value , 'level.rms':rms_level_value }
def testOnGuitar(self):
"runs on guitar"
max_level_value = -4.258
rms_level_value = -21.945
- self.expected = {'max_level':max_level_value , 'rms_level':rms_level_value }
+ self.expected = {'level.max':max_level_value , 'level.rms':rms_level_value }
def tearDown(self):
from aubio_mfcc import *
from aubio_melenergy import *
from aubio_specdesc import *
-from yaafe import * # TF : add Yaafe analyzer
\ No newline at end of file
+from yaafe import * # TF : add Yaafe analyzer
+from spectrogram import Spectrogram
+from waveform import Waveform
\ No newline at end of file
# Author: Paul Brossier <piem@piem.org>
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
from utils import downsample_blocking
import numpy
from aubio import filterbank, pvoc
+
class AubioMelEnergy(Analyzer):
implements(IAnalyzer)
def __init__(self):
self.input_blocksize = 1024
- self.input_stepsize = self.input_blocksize/4
+ self.input_stepsize = self.input_blocksize / 4
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
- super(AubioMelEnergy, self).setup(channels, samplerate, blocksize, totalframes)
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
+ super(AubioMelEnergy, self).setup(
+ channels, samplerate, blocksize, totalframes)
self.n_filters = 40
self.n_coeffs = 13
self.pvoc = pvoc(self.input_blocksize, self.input_stepsize)
@staticmethod
@interfacedoc
def id():
- return "aubio_mel_analyzer"
+ return "aubio_melenergy"
@staticmethod
@interfacedoc
def name():
- return "Mel Energy analysis (aubio)"
+ return "Mel Energy (aubio)"
+
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return ""
def process(self, frames, eod=False):
for samples in downsample_blocking(frames, self.input_stepsize):
# TODO : check pourquoi on utilise pas le blocksize ?
fftgrain = self.pvoc(samples)
- self.melenergy_results = numpy.vstack( [ self.melenergy_results, self.melenergy(fftgrain) ])
+ self.melenergy_results = numpy.vstack(
+ [self.melenergy_results, self.melenergy(fftgrain)])
self.block_read += 1
return frames, eod
melenergy = self.new_result(dataMode='value', timeMode='framewise')
# Metadata
- melenergy.parameters = dict(n_filters= self.n_filters,
- n_coeffs= self.n_coeffs)
- # Set metadata
- melenergy.idMetadata.id ="aubio_melenergy"
- melenergy.idMetadata.name = "melenergy (aubio)"
- melenergy.idMetadata.unit=''
-
+ melenergy.parameters = dict(n_filters=self.n_filters,
+ n_coeffs=self.n_coeffs)
# Set Data
melenergy.dataObject.value = self.melenergy_results
self._results.add(melenergy)
-
# Author: Paul Brossier <piem@piem.org>
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
from utils import downsample_blocking
import numpy
from aubio import mfcc, pvoc
-from math import isnan
class AubioMfcc(Analyzer):
implements(IAnalyzer)
def __init__(self):
self.input_blocksize = 1024
- self.input_stepsize = self.input_blocksize/4
+ self.input_stepsize = self.input_blocksize / 4
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
- super(AubioMfcc, self).setup(channels, samplerate, blocksize, totalframes)
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
+ super(AubioMfcc, self).setup(
+ channels, samplerate, blocksize, totalframes)
self.n_filters = 40
self.n_coeffs = 13
self.pvoc = pvoc(self.input_blocksize, self.input_stepsize)
- self.mfcc = mfcc(self.input_blocksize, self.n_filters, self.n_coeffs, samplerate)
+ self.mfcc = mfcc(self.input_blocksize,
+ self.n_filters,
+ self.n_coeffs,
+ samplerate)
self.block_read = 0
self.mfcc_results = numpy.zeros([self.n_coeffs, ])
@staticmethod
@interfacedoc
def id():
- return "aubio_mfcc_analyzer"
+ return "aubio_mfcc"
@staticmethod
@interfacedoc
def name():
- return "MFCC analysis (aubio)"
+ return "MFCC (aubio)"
+
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return ""
def process(self, frames, eod=False):
for samples in downsample_blocking(frames, self.input_stepsize):
- #time = self.block_read * self.input_stepsize * 1. / self.samplerate()
fftgrain = self.pvoc(samples)
coeffs = self.mfcc(fftgrain)
self.mfcc_results = numpy.vstack((self.mfcc_results, coeffs))
def release(self):
# MFCC
mfcc = self.new_result(dataMode='value', timeMode='framewise')
- parameters = dict(n_filters= self.n_filters,
- n_coeffs= self.n_coeffs)
- mfcc.idMetadata.id = "aubio_mfcc"
- mfcc.idMetadata.name = "mfcc (aubio)"
- mfcc.idMetadata.unit = ""
- mfcc.parameters = parameters
+ mfcc.parameters = dict(n_filters=self.n_filters,
+ n_coeffs=self.n_coeffs)
mfcc.dataObject.value = self.mfcc_results
self._results.add(mfcc)
# Author: Paul Brossier <piem@piem.org>
from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.analyzer.core import *
-from timeside.api import IValueAnalyzer
+from timeside.analyzer.core import Analyzer
+from timeside.api import IAnalyzer
+from utils import downsample_blocking
from aubio import pitch
+
class AubioPitch(Analyzer):
- implements(IAnalyzer) # TODO check if needed with inheritance
+ implements(IAnalyzer) # TODO check if needed with inheritance
def __init__(self):
self.input_blocksize = 2048
self.input_stepsize = self.input_blocksize / 2
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
- super(AubioPitch, self).setup(channels, samplerate, blocksize, totalframes)
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
+ super(AubioPitch, self).setup(channels,
+ samplerate,
+ blocksize,
+ totalframes)
self.p = pitch("default", self.input_blocksize, self.input_stepsize,
samplerate)
self.p.set_unit("freq")
@staticmethod
@interfacedoc
def unit():
- return ""
+ return "Hz"
def __str__(self):
return "pitch values"
# set Result
pitch = self.new_result(dataMode='value', timeMode='framewise')
- pitch.idMetadata.id = "aubio_pitch"
- pitch.idMetadata.name = "f0 (aubio)"
- pitch.idMetadata.unit = 'Hz'
-
- # parameters : None # TODO check with Piem "default" and "freq" in setup
+ # parameters : None # TODO check with Piem "default" and "freq" in
+ # setup
# Set Data
- pitch.dataObject.value = numpy.array(self.pitches)
+ pitch.dataObject.value = self.pitches
self._results.add(pitch)
-
from timeside.api import IAnalyzer
from utils import downsample_blocking
-import numpy
from aubio import specdesc, pvoc
+
class AubioSpecdesc(Analyzer):
implements(IAnalyzer)
-
def __init__(self):
self.input_blocksize = 1024
self.input_stepsize = self.input_blocksize / 4
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
- super(AubioSpecdesc, self).setup(channels, samplerate, blocksize, totalframes)
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
+ super(
+ AubioSpecdesc,
+ self).setup(
+ channels,
+ samplerate,
+ blocksize,
+ totalframes)
self.block_read = 0
self.pvoc = pvoc(self.input_blocksize, self.input_stepsize)
- self.methods = ['default', 'energy', 'hfc', 'complex', 'phase', 'specdiff', 'kl',
- 'mkl', 'specflux', 'centroid', 'slope', 'rolloff', 'spread', 'skewness',
- 'kurtosis', 'decrease']
+ self.methods = [
+ 'default', 'energy', 'hfc', 'complex', 'phase', 'specdiff', 'kl',
+ 'mkl', 'specflux', 'centroid', 'slope', 'rolloff', 'spread', 'skewness',
+ 'kurtosis', 'decrease']
self.specdesc = {}
self.specdesc_results = {}
for method in self.methods:
@staticmethod
@interfacedoc
def id():
- return "aubio_specdesc_analyzer"
+ return "aubio_specdesc"
@staticmethod
@interfacedoc
def name():
return "Spectral Descriptor (aubio)"
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return ""
+
def process(self, frames, eod=False):
for samples in downsample_blocking(frames, self.input_stepsize):
fftgrain = self.pvoc(samples)
for method in self.methods:
- self.specdesc_results[method] += [self.specdesc[method](fftgrain)[0]]
+ self.specdesc_results[method] += [
+ self.specdesc[method](fftgrain)[0]]
return frames, eod
def release(self):
- unit = ""
-
# For each method store results in container
for method in self.methods:
res_specdesc = self.new_result(dataMode='value',
timeMode='framewise')
# Set metadata
- res_specdesc.idMetadata.id = '_'.join(["aubio_specdesc", method])
- res_specdesc.idMetadata.name = ' '.join(["spectral descriptor", method, "(aubio)"])
-
- res_specdesc.idMetadata.unit = unit
-
+ res_specdesc.idMetadata.id += '.' + method
+ res_specdesc.idMetadata.name = ' ' + method
res_specdesc.dataObject.value = self.specdesc_results[method]
self._results.add(res_specdesc)
-
# Author: Paul Brossier <piem@piem.org>
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
from utils import downsample_blocking
import numpy
+
class AubioTemporal(Analyzer):
implements(IAnalyzer)
self.input_stepsize = 256
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
- super(AubioTemporal, self).setup(channels, samplerate, blocksize, totalframes)
- self.o = onset("default", self.input_blocksize, self.input_stepsize, samplerate)
- self.t = tempo("default", self.input_blocksize, self.input_stepsize, samplerate)
+ def setup(self,
+ channels=None,
+ samplerate=None,
+ blocksize=None,
+ totalframes=None):
+ super(AubioTemporal, self).setup(
+ channels, samplerate, blocksize, totalframes)
+ self.o = onset(
+ "default", self.input_blocksize, self.input_stepsize, samplerate)
+ self.t = tempo(
+ "default", self.input_blocksize, self.input_stepsize, samplerate)
self.block_read = 0
self.onsets = []
self.beats = []
@staticmethod
@interfacedoc
def unit():
- return "seconds"
+ return ""
def __str__(self):
- return "%s %s" % (str(self.value), unit())
+ return "%s %s" % (str(self.value), self.unit())
def process(self, frames, eod=False):
for samples in downsample_blocking(frames, self.input_stepsize):
#---------------------------------
onsets = self.new_result(dataMode='label', timeMode='event')
- onsets.idMetadata.id = "aubio_onset"
- onsets.idMetadata.name = "onsets (aubio)"
+ onsets.idMetadata.id += '.' + 'onset'
+ onsets.idMetadata.name += ' ' + 'Onset'
onsets.idMetadata.unit = 's'
# Set Data , dataMode='label', timeMode='event'
#---------------------------------
onsetrate = self.new_result(dataMode='value', timeMode='event')
# Set metadata
- onsetrate.idMetadata.id = "aubio_onset_rate"
- onsetrate.idMetadata.name = "onset rate (aubio)"
+ onsetrate.idMetadata.id += '.' + "onset_rate"
+ onsetrate.idMetadata.name = " " + "Onset Rate"
onsetrate.idMetadata.unit = "bpm"
# Set Data , dataMode='value', timeMode='event'
#---------------------------------
beats = self.new_result(dataMode='label', timeMode='segment')
# Set metadata
- beats.idMetadata.id = "aubio_beat"
- beats.idMetadata.name = "beats (aubio)"
+ beats.idMetadata.id += '.' + "beat"
+ beats.idMetadata.name += " " + "Beats"
beats.idMetadata.unit = "s"
# Set Data, dataMode='label', timeMode='segment'
# Segment = list of (time, duration, labelId)
if len(self.beats) > 1:
duration = numpy.diff(self.beats)
- duration = numpy.append(duration,duration[-1])
+ duration = numpy.append(duration, duration[-1])
beats.dataObject.time = self.beats
beats.dataObject.duration = duration
beats.dataObject.label = numpy.ones(len(self.beats))
#---------------------------------
bpm = self.new_result(dataMode='value', timeMode='segment')
# Set metadata
- bpm.idMetadata.id = "aubio_bpm"
- bpm.idMetadata.name = "bpm (aubio)"
+ bpm.idMetadata.id += '.' + "bpm"
+ bpm.idMetadata.name += ' ' + "bpm"
bpm.idMetadata.unit = "bpm"
# Set Data, dataMode='value', timeMode='segment'
# Guillaume Pellerin <yomguy at parisson.com>
# Paul Brossier <piem@piem.org>
# Thomas Fillon <thomas at parisson.com>
+from __future__ import division
-from utils import downsample_blocking
from timeside.core import Processor, implements, interfacedoc
from timeside.api import IAnalyzer
from timeside.__init__ import __version__
% (value, self._validTimeMode))
super(AnalyzerResult, self).__setattr__(name, value)
+ def __len__(self):
+ if self.dataMode == 'value':
+ return len(self.dataObject.value)
+ else:
+ return len(self.dataObject.label)
+
def as_dict(self):
return dict([(key, self[key].as_dict())
for key in self.keys() if hasattr(self[key], 'as_dict')] +
[('dataMode', self.dataMode), ('timeMode', self.timeMode)])
# TODO : check if it can be simplified now
+
def to_xml(self):
import xml.etree.ElementTree as ET
root = ET.Element('result')
dataModeChild = root.find('dataMode')
timeModeChild = root.find('timeMode')
result = AnalyzerResult(dataMode=dataModeChild.text,
- timeMode=timeModeChild.text)
+ timeMode=timeModeChild.text)
for child in root:
key = child.tag
if key not in ['dataMode', 'timeMode']:
return result
def data(self):
- return {key: self.dataObject[key] for key in ['value', 'label'] if key in self.dataObject.keys()}
+ if self.dataMode is None:
+ return (
+ {key: self.dataObject[key]
+ for key in ['value', 'label'] if key in self.dataObject.keys()}
+ )
+ elif self.dataMode is 'value':
+ return self.dataObject.value
+ elif self.dataMode is 'label':
+ return self.dataObject.label
def time(self):
if self.timeMode == 'global':
return self.audioMetadata.start
elif self.timeMode == 'framewise':
return (self.audioMetadata.start +
- self.frameMetadata.stepsize * numpy.arange(0,len(self)))
+ self.frameMetadata.stepsize /
+ self.frameMetadata.samplerate *
+ numpy.arange(0, len(self)))
else:
- return self.dataObject.time
+ return self.audioMetadata.start + self.dataObject.time
pass
def duration(self):
if self.timeMode == 'global':
return self.audioMetadata.duration
elif self.timeMode == 'framewise':
- return self.frameMetadata.blockwise * numpy.ones(len(self))
+ return (self.frameMetadata.blockwise /
+ self.frameMetadata.samplerate
+ * numpy.ones(len(self)))
elif self.timeMode == 'event':
return numpy.zeros(len(self))
elif self.timeMode == 'segment':
return self.dataObject.duration
- def __len__(self):
- if self.dataMode == 'value':
- return len(self.dataObject.value)
- else:
- return len(self.dataObject.label)
-
# @property
# def properties(self):
# max=numpy.max(self.data, axis=0),
# min=numpy.min(self.data, axis=0)
# )
-# # ajouter size
+# ajouter size
# return(prop)
'''
def __init__(self, analyzer_results=None):
- super(AnalyzerResultContainer,self).__init__()
+ super(AnalyzerResultContainer, self).__init__()
if analyzer_results is not None:
self.add(analyzer_results)
# def __repr__(self):
# return [res.as_dict() for res in self.values()].__repr__()
- #def __eq__(self, other):
- #if hasattr(other, 'results'):
+ # def __eq__(self, other):
+ # if hasattr(other, 'results'):
# other = other.results
# return self == other
- #def __ne__(self, other):
+ # def __ne__(self, other):
# return not self.__eq__(other)
def add(self, analyzer_result):
for res_json in results_json:
res = AnalyzerResult(dataMode=res_json['dataMode'],
- timeMode=res_json['timeMode'])
+ timeMode=res_json['timeMode'])
for key in res_json.keys():
if key not in ['dataMode', 'timeMode']:
res[key] = res_json[key]
for (group_name, group) in h5_file.items():
result = AnalyzerResult(dataMode=group.attrs['dataMode'],
- timeMode=group.attrs['timeMode'])
+ timeMode=group.attrs['timeMode'])
# Read Sub-Group
for subgroup_name, subgroup in group.items():
# Read attributes
self.result_stepsize = self.input_stepsize
def results(self):
- #TODO :return self._results[id=analyzerID]
- return self._results
+ # TODO :return self._results[id=analyzerID]
+ return {key:self._results[key] for key in self._results.keys()
+ if key.split('.')[0] == self.id()}
@staticmethod
@interfacedoc
microsecond=0).isoformat(' ')
result.idMetadata.version = __version__
result.idMetadata.author = 'TimeSide'
+ result.idMetadata.id = self.id()
+ result.idMetadata.name = self.name()
+ result.idMetadata.unit = self.unit()
result.audioMetadata.uri = self.mediainfo()['uri']
result.audioMetadata.start = self.mediainfo()['start']
# Author: Guillaume Pellerin <yomguy@parisson.com>
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IValueAnalyzer
import numpy
+
class MeanDCShift(Analyzer):
implements(IValueAnalyzer)
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
- super(MeanDCShift, self).setup(channels, samplerate, blocksize, totalframes)
+ def setup(self, channels=None,
+ samplerate=None,
+ blocksize=None,
+ totalframes=None):
+ super(MeanDCShift, self).setup(
+ channels, samplerate, blocksize, totalframes)
self.values = numpy.array([0])
@staticmethod
@interfacedoc
def id():
- return "dc_analyzer"
+ return "mean_dc_shift"
@staticmethod
@interfacedoc
def name():
- return "Mean DC shift analyzer"
+ return "Mean DC shift"
+
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return "%"
def process(self, frames, eod=False):
if frames.size:
def release(self):
dc_result = self.new_result(dataMode='value', timeMode='global')
- # Set metadata
- dc_result.idMetadata.id = "mean_dc_shift"
- dc_result.idMetadata.name = "Mean DC shift"
- dc_result.idMetadata.unit = "%"
+
# Set Data
- dc_result.dataObject.value = numpy.round(numpy.mean(100*self.values),3)
- self._results.add(dc_result)
\ No newline at end of file
+ dc_result.dataObject.value = numpy.round(
+ numpy.mean(100 * self.values), 3)
+ self._results.add(dc_result)
# Author: Guillaume Pellerin <yomguy@parisson.com>
-from timeside.core import Processor, implements, interfacedoc, \
- FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IValueAnalyzer
-import numpy
+import numpy as np
class Level(Analyzer):
# max_level
self.max_value = 0
# rms_level
- self.mean_values = numpy.array([])
+ self.mean_values = np.array([])
@staticmethod
@interfacedoc
def id():
- return "level_analyzer"
+ return "level"
@staticmethod
@interfacedoc
def name():
- return "level analyzer"
+ return "Level Analyzer"
+
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return "dBFS"
def process(self, frames, eod=False):
if frames.size:
if max_value > self.max_value:
self.max_value = max_value
# rms_level
- self.mean_values = numpy.append(self.mean_values,
- numpy.mean(numpy.square(frames)))
+ self.mean_values = np.append(self.mean_values,
+ np.mean(np.square(frames)))
return frames, eod
def release(self):
# Max level
max_level = self.new_result(dataMode='value', timeMode='global')
- max_level.idMetadata.id = "max_level"
- max_level.idMetadata.name = "Max level"
- max_level.idMetadata.unit = "dBFS"
+ max_level.idMetadata.id += '.' + "max"
+ max_level.idMetadata.name += ' ' + "Max"
- max_level.dataObject.value = numpy.round(20*numpy.log10(self.max_value), 3)
+ max_level.dataObject.value = np.round(20*np.log10(self.max_value), 3)
self._results.add(max_level)
# RMS level
rms_level = self.new_result(dataMode='value', timeMode='global')
- rms_level.idMetadata.id = "rms_level"
- rms_level.idMetadata.name="RMS level"
- rms_level.idMetadata.unit="dBFS"
+ rms_level.idMetadata.id += '.' + "rms"
+ rms_level.idMetadata.name += ' ' + "RMS"
- rms_level.dataObject.value = numpy.round(20*numpy.log10(
- numpy.sqrt(numpy.mean(self.mean_values))), 3)
+ rms_level.dataObject.value = np.round(20*np.log10(
+ np.sqrt(np.mean(self.mean_values))), 3)
self._results.add(rms_level)
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2013 Paul Brossier <piem@piem.org>
+
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+
+# Author: Paul Brossier <piem@piem.org>
+
+from timeside.core import implements, interfacedoc
+from timeside.analyzer.core import Analyzer
+from timeside.api import IAnalyzer
+from utils import downsample_blocking
+import numpy as np
+
+
+class Spectrogram(Analyzer):
+ implements(IAnalyzer) # TODO check if needed with inheritance
+
+ def __init__(self):
+ self.input_blocksize = 2048
+ self.input_stepsize = self.input_blocksize / 2
+
+ @interfacedoc
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
+ super(
+ Spectrogram,
+ self).setup(
+ channels,
+ samplerate,
+ blocksize,
+ totalframes)
+
+ self.values = []
+ self.FFT_SIZE = 2048
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "spectrogram_analyzer"
+
+ @staticmethod
+ @interfacedoc
+ def name():
+ return "Spectrogram Analyzer"
+
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return ""
+
+ def process(self, frames, eod=False):
+ for samples in downsample_blocking(frames, self.input_stepsize):
+ #time = self.block_read * self.input_stepsize * 1. / self.samplerate()
+ self.values.append(np.abs(np.fft.rfft(samples, self.FFT_SIZE)))
+
+ return frames, eod
+
+ def release(self):
+ # set Result
+ spectrogram = self.new_result(dataMode='value', timeMode='framewise')
+
+ # parameters :
+ spectrogram.parameters = {'FFT_SIZE': self.FFT_SIZE}
+
+ # Set Data
+ spectrogram.dataObject.value = self.values
+
+ self._results.add(spectrogram)
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2013 Paul Brossier <piem@piem.org>
+
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+
+# Author: Paul Brossier <piem@piem.org>
+
+from timeside.core import implements, interfacedoc
+from timeside.analyzer.core import Analyzer
+from timeside.api import IAnalyzer
+from utils import downsample_blocking
+import numpy as np
+
+
+class Waveform(Analyzer):
+ implements(IAnalyzer) # TODO check if needed with inheritance
+
+ def __init__(self):
+ self.input_blocksize = 2048
+ self.input_stepsize = self.input_blocksize / 2
+
+ @interfacedoc
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
+ super(
+ Waveform,
+ self).setup(
+ channels,
+ samplerate,
+ blocksize,
+ totalframes)
+
+ self.values = []
+ self.result_blocksize = 1
+ self.result_stepsize = 1
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "waveform_analyzer"
+
+ @staticmethod
+ @interfacedoc
+ def name():
+ return "Waveform Analyzer"
+
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return ""
+
+ def process(self, frames, eod=False):
+ for samples in downsample_blocking(frames, self.input_blocksize):
+ self.values.append(samples)
+
+ return frames, eod
+
+ def release(self):
+ # set Result
+ waveform = self.new_result(dataMode='value', timeMode='framewise')
+
+ # Set Data
+ waveform.dataObject.value = np.asarray(self.values).flatten()
+
+ self._results.add(waveform)
@author: Thomas Fillon
"""
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
from yaafelib import *
def __init__(self, yaafeSpecification):
# Check arguments
- if isinstance(yaafeSpecification,DataFlow):
+ if isinstance(yaafeSpecification, DataFlow):
self.dataFlow = yaafeSpecification
- elif isinstance(yaafeSpecification,FeaturePlan):
+ elif isinstance(yaafeSpecification, FeaturePlan):
self.featurePlan = yaafeSpecification
self.dataFlow = self.featurePlan.getDataFlow()
else:
- raise TypeError("'%s' Type must be either '%s' or '%s'" % (str(yaafeSpecification),str(DataFlow),str(FeaturePlan)))
+ raise TypeError("'%s' Type must be either '%s' or '%s'" %
+ (str(yaafeSpecification),
+ str(DataFlow),
+ str(FeaturePlan)))
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
+ def setup(self, channels=None, samplerate=None,
+ blocksize=None, totalframes=None):
super(Yaafe, self).setup(channels, samplerate, blocksize, totalframes)
# Configure a YAAFE engine
self.yaafe_engine = Engine()
def name():
return "Yaafe Descriptor"
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return ''
+
def process(self, frames, eod=False):
# do process things...
- # Downmixing to mono and convert to float64 for compatibility with Yaafe
- yaafe_frames = frames.sum(axis=-1,dtype=numpy.float64) / frames.shape[-1]
+ # Downmixing to mono and convert to float64 for compatibility with
+ # Yaafe
+ yaafe_frames = frames.sum(
+ axis=-1, dtype=numpy.float64) / frames.shape[-1]
# Reshape for compatibility with Yaafe input format
- yaafe_frames.shape = (1,yaafe_frames.shape[0])
+ yaafe_frames.shape = (1, yaafe_frames.shape[0])
# write audio array on 'audio' input
- self.yaafe_engine.writeInput('audio',yaafe_frames)
+ self.yaafe_engine.writeInput('audio', yaafe_frames)
# process available data
self.yaafe_engine.process()
if eod:
if len(featNames) == 0:
raise KeyError('Yaafe engine did not return any feature')
for featName in featNames:
- # Define ID fields
- id = 'yaafe_' + featName
- name = 'Yaafe ' + featName
- # Get results from Yaafe engine
result = self.new_result(dataMode='value', timeMode='framewise')
- result.idMetadata.id = id
- result.idMetadata.name = name
- result.idMetadata.unit = ''
+ result.idMetadata.id += '.' + featName
+ result.idMetadata.name += ' ' + featName
# Read Yaafe Results
result.dataObject.value = self.yaafe_engine.readOutput(featName)
# Store results in Container
if len(result.dataObject.value):
self._results.add(result)
-
-