- Automatic discovering of processors by walking through timeside decoder, analyzer, encoder and grapher subpackages.
- Empty corresponding __init__ files i.e. remove explicit import of processors
from unit_timeside import unittest, TestRunner
from timeside.analyzer.core import AnalyzerResult, AnalyzerResultContainer
-from timeside.__init__ import __version__
+from timeside import __version__
import numpy as np
from math import pi
samplerate = 16000 # LimsiSad require Fs = 16000 Hz
duration = 10
samples = np.zeros((duration * samplerate, 1))
- self.decoder = timeside.decoder.ArrayDecoder(samples,
- samplerate=samplerate)
+ decoder_cls = timeside.core.get_processor('array_dec')
+ self.decoder = decoder_cls(samples, samplerate=samplerate)
def _perform_test(self, analyzer_cls):
"""Internal function that test if there is NaN in the results
samplerate = 16000 # LimsiSad require Fs = 16000 Hz
duration = 10
samples = -1000*np.ones((duration * samplerate, 1))
- self.decoder = timeside.decoder.ArrayDecoder(samples,
- samplerate=samplerate)
+ decoder_cls = timeside.core.get_processor('array_dec')
+ self.decoder = decoder_cls(samples, samplerate=samplerate)
def _tests_factory(test_class, test_doc, list_analyzers, skip_reasons={}):
from timeside.decoder.file import FileDecoder
from timeside.analyzer import WITH_AUBIO
if WITH_AUBIO:
- from timeside.analyzer.aubio_melenergy import AubioMelEnergy
+ from timeside.analyzer.aubio.aubio_melenergy import AubioMelEnergy
import os
@unittest.skipIf(not WITH_AUBIO, 'Aubio library is not available')
from timeside.decoder.file import FileDecoder
from timeside.analyzer import WITH_AUBIO
if WITH_AUBIO:
- from timeside.analyzer.aubio_mfcc import AubioMfcc
+ from timeside.analyzer.aubio.aubio_mfcc import AubioMfcc
import os
from timeside.decoder.file import FileDecoder
from timeside.analyzer import WITH_AUBIO
if WITH_AUBIO:
- from timeside.analyzer.aubio_pitch import AubioPitch
+ from timeside.analyzer.aubio.aubio_pitch import AubioPitch
import os
from timeside.decoder.file import FileDecoder
from timeside.analyzer import WITH_AUBIO
if WITH_AUBIO:
- from timeside.analyzer.aubio_specdesc import AubioSpecdesc
+ from timeside.analyzer.aubio.aubio_specdesc import AubioSpecdesc
import os
from timeside.decoder.file import FileDecoder
from timeside.analyzer import WITH_AUBIO
if WITH_AUBIO:
- from timeside.analyzer.aubio_temporal import AubioTemporal
+ from timeside.analyzer.aubio.aubio_temporal import AubioTemporal
import os
from __future__ import division
from timeside.decoder.file import FileDecoder
-from timeside.analyzer import Level
+from timeside.analyzer.level import Level
from timeside.core import ProcessPipe
-import numpy as np
from unit_timeside import *
import os.path
from __future__ import division
-from timeside.core import *
+from timeside.core import get_processor, ProcessPipe
from timeside.decoder.file import FileDecoder
-from timeside.analyzer import *
-from timeside.encoder import *
from timeside.component import *
from unit_timeside import *
def testWav(self):
"Test conversion to wav"
- self.encoder_function = WavEncoder
+ self.encoder_id = 'gst_wav_enc'
def testMp3(self):
"Test conversion to mp3"
- self.encoder_function = Mp3Encoder
+ self.encoder_id = 'gst_mp3_enc'
def testOgg(self):
"Test conversion to ogg"
- self.encoder_function = VorbisEncoder
+ self.encoder_id = 'gst_vorbis_enc'
def testWebM(self):
"Test conversion to webm"
- self.encoder_function = WebMEncoder
+ self.encoder_id = 'gst_webm_enc'
self.test_duration = False # webmmux encoder with streamable=true
# does not return a valid duration
def testM4a(self):
"Test conversion to m4a"
- self.encoder_function = AacEncoder
+ self.encoder_id = 'gst_aac_enc'
def tearDown(self):
decoder = FileDecoder(self.source)
+ encoder_cls = get_processor(self.encoder_id)
-
- file_extension = '.' + self.encoder_function.file_extension()
+ file_extension = '.' + encoder_cls.file_extension()
self.target = tmp_file_sink(prefix=self.__class__.__name__,
suffix=file_extension)
- encoder = self.encoder_function(self.target)
+ encoder = encoder_cls(self.target)
(decoder | encoder).run()
decoder_encoded = FileDecoder(self.target)
- from timeside.analyzer import Waveform
- a = Waveform() # Arbitrary analyzer for running the next pipe
- (decoder_encoded | a).run()
+ pipe = ProcessPipe(decoder_encoded)
+ pipe.run()
import os
os.unlink(self.target)
from __future__ import division
-#from timeside.core import *
+from timeside.core import get_processor, ProcessPipe
from timeside.decoder.file import FileDecoder
#from timeside.analyzer import *
-from timeside.encoder import *
+#from timeside.encoder import *
#from timeside.component import *
from unit_timeside import *
def testMp3(self):
"Test conversion to mp3"
- self.encoder_function = Mp3Encoder
+ self.encoder_id = 'gst_mp3_enc'
self.filesize_delta = 156
def testOgg(self):
"Test conversion to ogg"
- self.encoder_function = VorbisEncoder
+ self.encoder_id = 'gst_vorbis_enc'
def testOpus(self):
"Test conversion to opus"
- self.encoder_function = OpusEncoder
+ self.encoder_id = 'gst_opus_enc'
self.expected_sample_rate = 48000
def testWebM(self):
"Test conversion to webm"
- self.encoder_function = WebMEncoder
+ self.encoder_id = 'gst_webm_enc'
self.test_duration = False # webmmux encoder with streamable=true
# does not return a valid duration
def tearDown(self):
decoder = FileDecoder(self.source)
+ encoder_cls = get_processor(self.encoder_id)
- file_extension = '.' + self.encoder_function.file_extension()
+ file_extension = '.' + encoder_cls.file_extension()
self.target_filesink = tmp_file_sink(prefix=self.__class__.__name__,
suffix=file_extension)
self.target_appsink = tmp_file_sink(prefix=self.__class__.__name__,
suffix=file_extension)
- encoder = self.encoder_function(self.target_filesink, streaming=True)
+ encoder = encoder_cls(self.target_filesink, streaming=True)
pipe = (decoder | encoder)
with open(self.target_appsink, 'w') as f:
# -*- coding: utf-8 -*-
+from __future__ import absolute_import
-import api
-import core
-import decoder
-import analyzer
-import grapher
-import encoder
+from . import api
+from . import core
+from . import decoder
+from . import analyzer
+from . import grapher
+from . import encoder
__version__ = '0.5.5'
__all__ = ['api', 'core', 'decoder', 'analyzer', 'grapher', 'encoder']
-print __file__
-print __name__
def _discover_modules():
import sys
import pkgutil
import importlib
- pkg = 'timeside'
- #__import__(pkg)
- package = sys.modules[pkg]
- prefix = pkg + "."
- for importer, modname, ispkg in pkgutil.iter_modules(package.__path__,
- prefix):
- #print modname
- if modname.split('.')[1] in ['decoder', 'analyzer', 'encoder', 'grapher']:
- print modname
- mod = importlib.import_module(modname)
+ #pkg_path = os.path.abspath()
- del mod
- del modname
+ #__import__(pkg)
+ proc_modules = ['decoder', 'analyzer', 'encoder', 'grapher']
+
+ for module in proc_modules:
+ pkg = '.'.join([__name__, module])
+ importlib.import_module(pkg)
+ package = sys.modules[pkg]
+ prefix = pkg + "."
+
+ for importer, modname, ispkg in pkgutil.walk_packages(package.__path__,
+ prefix):
+ try:
+ importlib.import_module(modname)
+ #__import__(modname)
+ except ImportError as e:
+ if e.message.find('yaafelib'):
+ print 'No Yaafe'
+ elif e.message.find('aubio'):
+ print 'No aubio'
+ else:
+ raise e
_discover_modules()
# -*- coding: utf-8 -*-
+from __future__ import absolute_import
# ----- Load external libraries ------
# Aubio
try:
WITH_AUBIO = True
- from aubio_temporal import AubioTemporal
- from aubio_pitch import AubioPitch
- from aubio_mfcc import AubioMfcc
- from aubio_melenergy import AubioMelEnergy
- from aubio_specdesc import AubioSpecdesc
+ import aubio
except ImportError:
WITH_AUBIO = False
-
+else:
+ del aubio
# Yaafe
try:
WITH_YAAFE = True
- from yaafe import Yaafe
-
+ import yaafelib
except ImportError:
WITH_YAAFE = False
-
+else:
+ del yaafelib
# Vamp Plugins
try:
- from vamp_plugin import VampSimpleHost
+ from . vamp_plugin import VampSimpleHost
VampSimpleHost.SimpleHostProcess(['-v'])
WITH_VAMP = True
except OSError:
WITH_VAMP = False
-
-
-# ----- Load timeside analyzers ------
-from level import Level
-from dc import MeanDCShift
-from spectrogram import Spectrogram
-from waveform import Waveform
-from irit_speech_entropy import IRITSpeechEntropy
-from irit_speech_4hz import IRITSpeech4Hz
-from odf import OnsetDetectionFunction
-if WITH_YAAFE:
- from limsi_sad import LimsiSad
+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2013 Paul Brossier <piem@piem.org>
-
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-
-# Author: Paul Brossier <piem@piem.org>
-
-from timeside.core import implements, interfacedoc
-from timeside.analyzer.core import Analyzer
-from timeside.api import IAnalyzer
-from preprocessors import downmix_to_mono, frames_adapter
-
-import numpy
-from aubio import filterbank, pvoc
-
-
-class AubioMelEnergy(Analyzer):
-
- """Aubio Mel Energy analyzer"""
- implements(IAnalyzer)
-
- def __init__(self):
- super(AubioMelEnergy, self).__init__()
- self.input_blocksize = 1024
- self.input_stepsize = self.input_blocksize / 4
-
- @interfacedoc
- def setup(self, channels=None, samplerate=None,
- blocksize=None, totalframes=None):
- super(AubioMelEnergy, self).setup(
- channels, samplerate, blocksize, totalframes)
- self.n_filters = 40
- self.n_coeffs = 13
- self.pvoc = pvoc(self.input_blocksize, self.input_stepsize)
- self.melenergy = filterbank(self.n_filters, self.input_blocksize)
- self.melenergy.set_mel_coeffs_slaney(samplerate)
- self.block_read = 0
- self.melenergy_results = []
-
- @staticmethod
- @interfacedoc
- def id():
- return "aubio_melenergy"
-
- @staticmethod
- @interfacedoc
- def name():
- return "Mel Energy (aubio)"
-
- @staticmethod
- @interfacedoc
- def unit():
- return ""
-
- @downmix_to_mono
- @frames_adapter
- def process(self, frames, eod=False):
-
- fftgrain = self.pvoc(frames)
- self.melenergy_results.append(self.melenergy(fftgrain))
- self.block_read += 1
- return frames, eod
-
- def post_process(self):
- melenergy = self.new_result(data_mode='value', time_mode='framewise')
- melenergy.parameters = dict(n_filters=self.n_filters,
- n_coeffs=self.n_coeffs)
- melenergy.data_object.value = self.melenergy_results
- self.process_pipe.results.add(melenergy)
+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2013 Paul Brossier <piem@piem.org>
-
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-
-# Author: Paul Brossier <piem@piem.org>
-
-from timeside.core import implements, interfacedoc
-from timeside.analyzer.core import Analyzer
-from timeside.api import IAnalyzer
-from preprocessors import downmix_to_mono, frames_adapter
-
-import numpy
-from aubio import mfcc, pvoc
-
-
-class AubioMfcc(Analyzer):
-
- """Aubio MFCC analyzer"""
- implements(IAnalyzer)
-
- def __init__(self):
- super(AubioMfcc, self).__init__()
- self.input_blocksize = 1024
- self.input_stepsize = self.input_blocksize / 4
-
- @interfacedoc
- def setup(self, channels=None, samplerate=None,
- blocksize=None, totalframes=None):
- super(AubioMfcc, self).setup(
- channels, samplerate, blocksize, totalframes)
- self.n_filters = 40
- self.n_coeffs = 13
- self.pvoc = pvoc(self.input_blocksize, self.input_stepsize)
- self.mfcc = mfcc(self.input_blocksize,
- self.n_filters,
- self.n_coeffs,
- samplerate)
- self.block_read = 0
- self.mfcc_results = numpy.zeros([self.n_coeffs, ])
-
- @staticmethod
- @interfacedoc
- def id():
- return "aubio_mfcc"
-
- @staticmethod
- @interfacedoc
- def name():
- return "MFCC (aubio)"
-
- @staticmethod
- @interfacedoc
- def unit():
- return ""
-
- @downmix_to_mono
- @frames_adapter
- def process(self, frames, eod=False):
- fftgrain = self.pvoc(frames)
- coeffs = self.mfcc(fftgrain)
- self.mfcc_results = numpy.vstack((self.mfcc_results, coeffs))
- self.block_read += 1
- return frames, eod
-
- def post_process(self):
- mfcc = self.new_result(data_mode='value', time_mode='framewise')
- mfcc.parameters = dict(n_filters=self.n_filters,
- n_coeffs=self.n_coeffs)
- mfcc.data_object.value = self.mfcc_results
- self.process_pipe.results.add(mfcc)
+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2013 Paul Brossier <piem@piem.org>
-
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-
-# Author: Paul Brossier <piem@piem.org>
-
-from ..core import implements, interfacedoc
-from .core import Analyzer
-from ..api import IAnalyzer
-from preprocessors import downmix_to_mono, frames_adapter
-from aubio import pitch
-import numpy as np
-
-
-class AubioPitch(Analyzer):
-
- """Aubio Pitch estimation analyzer"""
- implements(IAnalyzer) # TODO check if needed with inheritance
-
- def __init__(self):
- super(AubioPitch, self).__init__()
- self.input_blocksize = 2048
- self.input_stepsize = self.input_blocksize / 2
-
- @interfacedoc
- def setup(self, channels=None, samplerate=None,
- blocksize=None, totalframes=None):
- super(AubioPitch, self).setup(channels,
- samplerate,
- blocksize,
- totalframes)
- self.aubio_pitch = pitch(
- "default", self.input_blocksize, self.input_stepsize,
- samplerate)
- self.aubio_pitch.set_unit("freq")
- self.block_read = 0
- self.pitches = []
- self.pitch_confidences = []
-
- @staticmethod
- @interfacedoc
- def id():
- return "aubio_pitch"
-
- @staticmethod
- @interfacedoc
- def name():
- return "f0 (aubio)"
-
- @staticmethod
- @interfacedoc
- def unit():
- return "Hz"
-
- def __str__(self):
- return "pitch values"
-
- @downmix_to_mono
- @frames_adapter
- def process(self, frames, eod=False):
- #time = self.block_read * self.input_stepsize * 1. / self.samplerate()
- self.pitches += [self.aubio_pitch(frames)[0]]
- self.pitch_confidences += [
- np.nan_to_num(self.aubio_pitch.get_confidence())]
- self.block_read += 1
- return frames, eod
-
- def post_process(self):
- pitch = self.new_result(data_mode='value', time_mode='framewise')
-
- # parameters : None # TODO check with Piem "default" and "freq" in
- # setup
-
- pitch.id_metadata.id += '.' + "pitch"
- pitch.id_metadata.name += ' ' + "pitch"
- pitch.id_metadata.unit = "Hz"
- pitch.data_object.value = self.pitches
- self.process_pipe.results.add(pitch)
-
- pitch_confidence = self.new_result(
- data_mode='value', time_mode='framewise')
- pitch_confidence.id_metadata.id += '.' + "pitch_confidence"
- pitch_confidence.id_metadata.name += ' ' + "pitch confidence"
- pitch_confidence.id_metadata.unit = None
- pitch_confidence.data_object.value = self.pitch_confidences
- self.process_pipe.results.add(pitch_confidence)
+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2013 Paul Brossier <piem@piem.org>
-
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-
-# Author: Paul Brossier <piem@piem.org>
-
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.analyzer.core import Analyzer
-from timeside.api import IAnalyzer
-from preprocessors import downmix_to_mono, frames_adapter
-
-from aubio import specdesc, pvoc
-
-
-class AubioSpecdesc(Analyzer):
-
- """Aubio Spectral Descriptors collection analyzer"""
- implements(IAnalyzer)
-
- def __init__(self):
- super(AubioSpecdesc, self).__init__()
- self.input_blocksize = 1024
- self.input_stepsize = self.input_blocksize / 4
-
- @interfacedoc
- def setup(self, channels=None, samplerate=None,
- blocksize=None, totalframes=None):
- super(
- AubioSpecdesc,
- self).setup(
- channels,
- samplerate,
- blocksize,
- totalframes)
- self.block_read = 0
- self.pvoc = pvoc(self.input_blocksize, self.input_stepsize)
- self.methods = [
- 'default', 'energy', 'hfc', 'complex', 'phase', 'specdiff', 'kl',
- 'mkl', 'specflux', 'centroid', 'slope', 'rolloff', 'spread', 'skewness',
- 'kurtosis', 'decrease']
- self.specdesc = {}
- self.specdesc_results = {}
- for method in self.methods:
- self.specdesc[method] = specdesc(method, self.input_blocksize)
- self.specdesc_results[method] = []
-
- @staticmethod
- @interfacedoc
- def id():
- return "aubio_specdesc"
-
- @staticmethod
- @interfacedoc
- def name():
- return "Spectral Descriptor (aubio)"
-
- @staticmethod
- @interfacedoc
- def unit():
- return ""
-
- @downmix_to_mono
- @frames_adapter
- def process(self, frames, eod=False):
- fftgrain = self.pvoc(frames)
- for method in self.methods:
- self.specdesc_results[method] += [
- self.specdesc[method](fftgrain)[0]]
- return frames, eod
-
- def post_process(self):
-
- # For each method store results in container
- for method in self.methods:
- res_specdesc = self.new_result(data_mode='value',
- time_mode='framewise')
- # Set metadata
- res_specdesc.id_metadata.id += '.' + method
- res_specdesc.id_metadata.name = ' ' + method
- res_specdesc.data_object.value = self.specdesc_results[method]
-
- self.process_pipe.results.add(res_specdesc)
+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2013 Paul Brossier <piem@piem.org>
-
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-
-# Author: Paul Brossier <piem@piem.org>
-
-from timeside.core import implements, interfacedoc
-from timeside.analyzer.core import Analyzer
-from timeside.api import IAnalyzer
-from preprocessors import downmix_to_mono, frames_adapter
-from aubio import onset, tempo
-
-import numpy
-
-
-class AubioTemporal(Analyzer):
-
- """Aubio Temporal analyzer"""
- implements(IAnalyzer)
-
- def __init__(self):
- super(AubioTemporal, self).__init__()
- self.input_blocksize = 1024
- self.input_stepsize = 256
-
- @interfacedoc
- def setup(self,
- channels=None,
- samplerate=None,
- blocksize=None,
- totalframes=None):
- super(AubioTemporal, self).setup(
- channels, samplerate, blocksize, totalframes)
- self.o = onset(
- "default", self.input_blocksize, self.input_stepsize, samplerate)
- self.t = tempo(
- "default", self.input_blocksize, self.input_stepsize, samplerate)
- self.block_read = 0
- self.onsets = []
- self.beats = []
- self.beat_confidences = []
-
- @staticmethod
- @interfacedoc
- def id():
- return "aubio_temporal"
-
- @staticmethod
- @interfacedoc
- def name():
- return "onsets (aubio)"
-
- @staticmethod
- @interfacedoc
- def unit():
- return ""
-
- def __str__(self):
- return "%s %s" % (str(self.value), self.unit())
-
- @downmix_to_mono
- @frames_adapter
- def process(self, frames, eod=False):
- if self.o(frames):
- self.onsets += [self.o.get_last_s()]
- if self.t(frames):
- self.beats += [self.t.get_last_s()]
- self.beat_confidences += [self.t.get_confidence()]
- self.block_read += 1
- return frames, eod
-
- def post_process(self):
-
- #---------------------------------
- # Onsets: Event (time, "Onset")
- #---------------------------------
- onsets = self.new_result(data_mode='label', time_mode='event')
- onsets.id_metadata.id += '.' + 'onset'
- onsets.id_metadata.name += ' ' + 'Onset'
- onsets.id_metadata.unit = 's'
- onsets.data_object.time = self.onsets
- onsets.data_object.label = numpy.ones(len(self.onsets))
- onsets.label_metadata.label = {1: 'Onset'}
-
- self.process_pipe.results.add(onsets)
-
- #---------------------------------
- # Onset Rate: Segment (time, duration, value)
- #---------------------------------
- onsetrate = self.new_result(data_mode='value', time_mode='segment')
- onsetrate.id_metadata.id += '.' + "onset_rate"
- onsetrate.id_metadata.name = " " + "Onset Rate"
- onsetrate.id_metadata.unit = "bpm"
- if len(self.onsets) > 1:
- periods = numpy.diff(self.onsets)
- periods = numpy.append(periods, periods[-1])
- onsetrate.data_object.time = self.onsets
- onsetrate.data_object.duration = periods
- onsetrate.data_object.value = 60. / periods
- else:
- onsetrate.data_object.value = []
- onsetrate.data_object.time = []
-
- self.process_pipe.results.add(onsetrate)
-
- #---------------------------------
- # Beats: Event (time, "Beat")
- #---------------------------------
- beats = self.new_result(data_mode='label', time_mode='event')
- beats.id_metadata.id += '.' + "beat"
- beats.id_metadata.name += " " + "Beats"
- beats.id_metadata.unit = "s"
- beats.data_object.time = self.beats
- beats.data_object.label = numpy.ones(len(self.beats))
- beats.label_metadata.label = {1: 'Beat'}
-
- self.process_pipe.results.add(beats)
-
- #---------------------------------
- # Beat confidences: Event (time, value)
- #---------------------------------
- beat_confidences = self.new_result(
- data_mode='value', time_mode='event')
- beat_confidences.id_metadata.id += '.' + "beat_confidence"
- beat_confidences.id_metadata.name += " " + "Beat confidences"
- beat_confidences.id_metadata.unit = None
- beat_confidences.data_object.time = self.beats
- beat_confidences.data_object.value = self.beat_confidences
-
- self.process_pipe.results.add(beat_confidences)
-
- #---------------------------------
- # BPM: Segment (time, duration, value)
- #---------------------------------
- bpm = self.new_result(data_mode='value', time_mode='segment')
- bpm.id_metadata.id += '.' + "bpm"
- bpm.id_metadata.name += ' ' + "bpm"
- bpm.id_metadata.unit = "bpm"
- if len(self.beats) > 1:
- periods = numpy.diff(self.beats)
- periods = numpy.append(periods, periods[-1])
- bpm.data_object.time = self.beats
- bpm.data_object.duration = periods
- bpm.data_object.value = 60. / periods
- else:
- bpm.data_object.value = []
-
- self.process_pipe.results.add(bpm)
from timeside.analyzer.core import Analyzer
from timeside.api import IValueAnalyzer
import numpy as np
-from .utils import MACHINE_EPSILON
+from timeside.analyzer.utils import MACHINE_EPSILON
class Level(Analyzer):
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
import timeside
-from timeside.analyzer import WITH_YAAFE
+#from timeside.analyzer import WITH_YAAFE
+WITH_YAAFE = True
if WITH_YAAFE:
from yaafe import Yaafe
import yaafelib
from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
-from timeside.analyzer import Spectrogram
+from .spectrogram import Spectrogram
from timeside.api import IAnalyzer
import numpy as np
from numpy import pi as Pi
from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
-from preprocessors import downmix_to_mono, frames_adapter
+from timeside.analyzer.preprocessors import downmix_to_mono, frames_adapter
import numpy as np
from timeside.api import IAnalyzer
import numpy as np
-from preprocessors import downmix_to_mono, frames_adapter
+from timeside.analyzer.preprocessors import downmix_to_mono, frames_adapter
class Waveform(Analyzer):
from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
-from timeside.analyzer import WITH_YAAFE
+#from timeside.analyzer import WITH_YAAFE
+WITH_YAAFE = True
if WITH_YAAFE:
from yaafelib import *
import numpy
# You should have received a copy of the GNU General Public License
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import absolute_import
-from timeside.component import Interface
+from .component import Interface
class IProcessor(Interface):
# You should have received a copy of the GNU General Public License
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-from timeside.component import *
-from timeside.api import IProcessor
-from timeside.exceptions import Error, ApiError
-
+from .component import Component, MetaComponent, abstract
+from .component import implements, implementations, interfacedoc
+from .api import IProcessor
+from .exceptions import Error, ApiError
import re
-import time
import numpy
import uuid
class MetaProcessor(MetaComponent):
-
- """Metaclass of the Processor class, used mainly for ensuring that processor
- id's are wellformed and unique"""
+ """Metaclass of the Processor class, used mainly for ensuring
+ that processor id's are wellformed and unique"""
valid_id = re.compile("^[a-z][_a-z0-9]*$")
pass
else:
raise ApiError("%s and %s have the same id: '%s'"
- % (new_class.__name__, _processors[id].__name__, id))
+ % (new_class.__name__,
+ _processors[id].__name__, id))
if not MetaProcessor.valid_id.match(id):
raise ApiError("%s has a malformed id: '%s'"
% (new_class.__name__, id))
Attributes:
parents : List of parent Processors that must be processed
before the current Processor
- pipe : The current ProcessPipe in which the Processor will run
+ pipe : The ProcessPipe in which the Processor will run
"""
__metaclass__ = MetaProcessor
-# -*- coding: utf-8 -*-
-
-#from file import FileDecoder
-#from array import ArrayDecoder
-#from live import LiveDecoder
-#
-#__all__ = ['FileDecoder', 'ArrayDecoder', 'LiveDecoder']
from timeside.decoder.core import *
-
+import numpy as np
class ArrayDecoder(Decoder):
from timeside.api import IDecoder
from timeside.tools import *
-from utils import get_uri, get_media_uri_info, stack, get_sha1
-
-import Queue
-from gst import _gst as gst
-import numpy as np
-
-
-GST_APPSINK_MAX_BUFFERS = 10
-QUEUE_SIZE = 10
-
class Decoder(Processor):
from __future__ import division
-from timeside.decoder.core import *
-from timeside.tools.gstutils import MainloopThread
+from timeside.decoder.core import Decoder, IDecoder, implements, interfacedoc
+from timeside.tools.gstutils import MainloopThread, gobject, gst_buffer_to_numpy_array
import threading
+from timeside.decoder.utils import get_uri, get_media_uri_info, stack, get_sha1
+
+import Queue
+from gst import _gst as gst
+
+GST_APPSINK_MAX_BUFFERS = 10
+QUEUE_SIZE = 10
+
+import numpy as np
+
class FileDecoder(Decoder):
from timeside.decoder.core import *
from timeside.tools.gstutils import MainloopThread
+import Queue
+from gst import _gst as gst
+
+
+GST_APPSINK_MAX_BUFFERS = 10
+QUEUE_SIZE = 10
+
class LiveDecoder(Decoder):
-# -*- coding: utf-8 -*-
-#
-#from ogg import VorbisEncoder
-#from wav import WavEncoder
-#from mp3 import Mp3Encoder
-#from flac import FlacEncoder
-#from m4a import AacEncoder
-#from webm import WebMEncoder
-#from audiosink import AudioSink
-#from opus import OpusEncoder
-#
-#__all__ = ['VorbisEncoder', 'WavEncoder', 'Mp3Encoder', 'FlacEncoder',
-# 'AacEncoder', 'WebMEncoder', 'AudioSink', 'OpusEncoder']
-# -*- coding: utf-8 -*-
-
-from waveform_simple import Waveform
-from waveform_centroid import WaveformCentroid
-from waveform_transparent import WaveformTransparent
-from waveform_contour import WaveformContourBlack, WaveformContourWhite
-from spectrogram_log import SpectrogramLog
-from spectrogram_lin import SpectrogramLinear
-from render_analyzers import DisplayOnsetDetectionFunction, DisplayWaveform
-from render_analyzers import Display4hzSpeechSegmentation
-
-__all__ = ['Waveform', 'WaveformCentroid', 'WaveformTransparent',
- 'WaveformContourBlack', 'WaveformContourWhite',
- 'SpectrogramLog', 'SpectrogramLinear',
- 'DisplayOnsetDetectionFunction', 'DisplayWaveform',
- 'Display4hzSpeechSegmentation']
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
-from timeside.core import implements, interfacedoc, abstract
+from timeside.core import implements, interfacedoc, abstract, get_processor
from timeside.api import IGrapher
from core import Grapher, Image
-from timeside import analyzer
+from .. import analyzer
class DisplayAnalyzer(Grapher):
# From here define new Grapher based on Analyzers
if analyzer.WITH_AUBIO:
- aubiopitch = analyzer.AubioPitch()
+ aubiopitch = get_processor('aubio_pitch')
DisplayAubioPitch = DisplayAnalyzer.create(analyzer=aubiopitch,
result_id='aubio_pitch.pitch',
grapher_id='grapher_aubio_pitch',
grapher_name='Aubio Pitch')
-odf = analyzer.OnsetDetectionFunction()
+odf = get_processor('odf')
DisplayOnsetDetectionFunction = DisplayAnalyzer.create(analyzer=odf,
result_id='odf',
grapher_id='grapher_odf',
grapher_name='Onset detection function')
-wav = analyzer.Waveform()
+wav = get_processor('waveform_analyzer')
DisplayWaveform = DisplayAnalyzer.create(analyzer=wav,
result_id='waveform_analyzer',
grapher_id='grapher_waveform',
grapher_name='Waveform from Analyzer')
-irit4hz = analyzer.IRITSpeech4Hz()
+irit4hz = get_processor('irit_speech_4hz')
Display4hzSpeechSegmentation = DisplayAnalyzer.create(analyzer=irit4hz,
result_id='irit_speech_4hz.segments',
grapher_id='grapher_irit_speech_4hz_segments',