from timeside.api import IValueAnalyzer
from aubio import pitch
-class AubioPitch(Processor):
- implements(IValueAnalyzer)
+class AubioPitch(Analyzer):
+ implements(IAnalyzer) # TODO check if needed with inheritance
+
+ def __init__(self):
+ self.input_blocksize = 2048
+ self.input_stepsize = self.input_blocksize / 2
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
super(AubioPitch, self).setup(channels, samplerate, blocksize, totalframes)
- self.win_s = 2048
- self.hop_s = self.win_s / 2
- self.p = pitch("default", self.win_s, self.hop_s, samplerate)
+ self.p = pitch("default", self.input_blocksize, self.input_stepsize,
+ samplerate)
self.p.set_unit("freq")
self.block_read = 0
self.pitches = []
return "pitch values"
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.hop_s):
- #time = self.block_read * self.hop_s * 1. / self.samplerate()
+ for samples in downsample_blocking(frames, self.input_stepsize):
+ #time = self.block_read * self.input_stepsize * 1. / self.samplerate()
self.pitches += [self.p(samples)[0]]
self.block_read += 1
return frames, eod
def results(self):
- container = AnalyzerResultContainer()
- pitch = AnalyzerResult()
+ container = super(AubioPitch, self).results()
+
+ pitch = self.new_result(dataMode='value', resultType='framewise')
+ pitch.idMetadata.id = "aubio_pitch"
+ pitch.idMetadata.name = "f0 (aubio)"
+ pitch.idMetadata.unit = 'Hz'
# Get metadata
- samplerate = self.samplerate()
- blocksize = self.win_s
- stepsize = self.hop_s
-
- # Set metadata
- pitch.metadata = AnalyzerMetadata(id="aubio_pitch",
- name="f0 (aubio)",
- unit='Hz',
- samplerate = samplerate,
- blocksize = blocksize,
- stepsize = stepsize)
+
+ # parameters : None # TODO check with Piem "default" and "freq" in setup
+
+
# Set Data
self.pitches = numpy.array(self.pitches)
- pitch.data = self.pitches
+ pitch.data.data = self.pitches
+ pitch.data.dataType = float
container.add_result(pitch)
return container
from aubio import onset, tempo
-class AubioTemporal(Processor):
+class AubioTemporal(Analyzer):
implements(IAnalyzer)
+ def __init__(self):
+ self.input_blocksize = 1024
+ self.input_stepsize = 256
+
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
super(AubioTemporal, self).setup(channels, samplerate, blocksize, totalframes)
- self.win_s = 1024
- self.hop_s = 256
- self.o = onset("default", self.win_s, self.hop_s, samplerate)
- self.t = tempo("default", self.win_s, self.hop_s, samplerate)
+ self.o = onset("default", self.input_blocksize, self.input_stepsize, samplerate)
+ self.t = tempo("default", self.input_blocksize, self.input_stepsize, samplerate)
self.block_read = 0
self.onsets = []
self.beats = []
return "%s %s" % (str(self.value), unit())
def process(self, frames, eod=False):
- for samples in downsample_blocking(frames, self.hop_s):
+ for samples in downsample_blocking(frames, self.input_stepsize):
if self.o(samples):
self.onsets += [self.o.get_last_s()]
if self.t(samples):
return frames, eod
def results(self):
- # Get common metadata
- commonAttr = dict(samplerate=self.samplerate(),
- blocksize=self.win_s,
- stepsize=self.hop_s)
- # FIXME : Onsets, beat and onset rate are not frame based Results
- # samplerate, blocksize, etc. are not appropriate here
- # Those might be some kind of "AnalyzerSegmentResults"
-
- # list of onset locations
- onsets = AnalyzerResult()
- # Set metadata
- onsetsAttr = dict(id="aubio_onset",
- name="onsets (aubio)",
- unit="s")
- onsets.metadata = dict(onsetsAttr.items() + commonAttr.items())
- # Set Data
- onsets.data = self.onsets
-
- # list of inter-onset intervals, in beats per minute
- onsetrate = AnalyzerResult()
+
+ container = super(AubioTemporal, self).results()
+
+ #---------------------------------
+ # Onsets
+ #---------------------------------
+ onsets = self.new_result(dataMode='label', resultType='event')
+
+ onsets.idMetadata.id = "aubio_onset"
+ onsets.idMetadata.name = "onsets (aubio)"
+ onsets.idMetadata.unit = 's'
+
+ # Set Data , dataMode='label', resultType='event'
+ # Event = list of (time, labelId)
+ onsets.data.data = [(time,1) for time in self.onsets]
+
+ onsets.labelMetadata.label = {1: 'Onset'}
+
+ container.add_result(onsets)
+
+ #---------------------------------
+ # Onset Rate
+ #---------------------------------
+ onsetrate = self.new_result(dataMode='value', resultType='event')
# Set metadata
- onsetrateAttr = dict(id="aubio_onset_rate",
- name="onset rate (aubio)",
- unit="bpm")
- onsetrate.metadata = dict(onsetrateAttr.items() + commonAttr.items())
- # Set Data
+ onsetrate.idMetadata.id = "aubio_onset_rate"
+ onsetrate.idMetadata.name="onset rate (aubio)"
+ onsetrate.idMetadata.unit="bpm"
+
+ # Set Data , dataMode='value', resultType='event'
+ # Event = list of (time, value)
if len(self.onsets) > 1:
periods = 60. / numpy.diff(self.onsets)
- onsetrate.data = periods
+ onsetrate.data.data = zip(periods,self.onsets[:-1])
else:
- onsetrate.data = []
+ onsetrate.data.data = []
- # list of beat locations
- beats = AnalyzerResult()
+ container.add_result(onsetrate)
+
+ #---------------------------------
+ # Beats
+ #---------------------------------
+ beats = self.new_result(dataMode='label', resultType='segment')
# Set metadata
- beatsAttr = dict(id="aubio_beat",
- name="beats (aubio)",
- unit="s")
- beats.metadata = dict(beatsAttr.items() + commonAttr.items())
- # Set Data
- beats.data = self.beats
-
- # list of inter-beat intervals, in beats per minute
- bpm = AnalyzerResult()
+ beats.idMetadata.id="aubio_beat"
+ beats.idMetadata.name="beats (aubio)"
+ beats.idMetadata.unit="s"
+
+ # Set Data, dataMode='label', resultType='segment'
+ # Segment = list of (time, duration, labelId)
+ if len(self.beats) > 1:
+ duration = numpy.diff(self.beats)
+ duration = numpy.append(duration,duration[-1])
+ beats.data.data = [(time,dur,1) for (time, dur) in zip(self.beats, duration)]
+ else:
+ beats.data.data = []
+ beats.labelMetadata.label = {1: 'Beat'}
+
+ container.add_result(beats)
+
+ #---------------------------------
+ # BPM
+ #---------------------------------
+ bpm = self.new_result(dataMode='value', resultType='segment')
# Set metadata
- bpmAttr = dict(id="aubio_bpm",
- name="bpm (aubio)",
- unit="bpm")
- bpm.metadata = dict(bpmAttr.items() + commonAttr.items())
- # Set Data
+ bpm.idMetadata.id="aubio_bpm"
+ bpm.idMetadata.name="bpm (aubio)"
+ bpm.idMetadata.unit="bpm"
+
+ # Set Data, dataMode='value', resultType='segment'
if len(self.beats) > 1:
periods = 60. / numpy.diff(self.beats)
- bpm.data = periods
+ periods = numpy.append(periods,periods[-1])
+
+ bpm.data.data = zip(self.beats, duration, periods)
+
else:
- bpm.data = []
+ bpm.data.data = []
+
+ container.add_result(bpm)
- return AnalyzerResultContainer([onsets, onsetrate, beats, bpm])
+ return container
# Authors:
# Guillaume Pellerin <yomguy at parisson.com>
# Paul Brossier <piem@piem.org>
+# Thomas Fillon <thomas at parisson.com>
from utils import downsample_blocking
-
+from timeside.core import Processor, implements, interfacedoc
+from timeside.api import IAnalyzer
+from timeside import __version__ as TimeSideVersion
import numpy
numpy_data_types = [
#'float128',
('date', ''),
('version', ''),
('author', '')])
- # HINT :
- # from datetime import datetime
- #date = datetime.now().replace(microsecond=0).isoformat(' ')
+
class AudioMetadata(MetadataObject):
'''
# Define default values
_default_value = OrderedDict([('label', None),
('description', None),
- ('labelType', None)])
+ ('labelType', 'mono')])
class FrameMetadata(MetadataObject):
('blocksize', None),
('stepsize', None)])
-
class AnalyserData(MetadataObject):
'''
Metadata object to handle Frame related Metadata
('dataType', ''),
('dataMode', '')])
+ def __setattr__(self, name, value):
+ # Set Data with the proper type
+ if name == 'data':
+ if value is None:
+ value = []
+ # make a numpy.array out of list
+ if type(value) is list:
+ value = numpy.array(value)
+ # serialize using numpy
+ if type(value) in numpy_data_types:
+ value = value.tolist()
+ if type(value) not in [list, str, int, long, float, complex, type(None)] + numpy_data_types:
+ raise TypeError('AnalyzerResult can not accept type %s' %
+ type(value))
+
+ # TODO : guess dataType from value and set datType with:
+ #super(AnalyserData, self).__setattr__('dataType', dataType)
-class newAnalyzerResults(MetadataObject):
+ super(AnalyserData, self).__setattr__(name, value)
+
+
+class newAnalyzerResult(MetadataObject):
"""
Object that contains the metadata and parameters of an analyzer process
from collections import OrderedDict
# Define default values as an OrderDict
# in order to keep the order of the keys for display
- _default_value = OrderedDict([('data', None),
- ('idMetadata', None),
+ _default_value = OrderedDict([('idMetadata', None),
+ ('data', None),
('audioMetadata', None),
('frameMetadata', None),
('labelMetadata', None),
for res in analyzer_result:
self.add_result(res)
return
- if type(analyzer_result) != AnalyzerResult:
+ if not (isinstance(analyzer_result, AnalyzerResult)
+ or isinstance(analyzer_result, newAnalyzerResult)):
raise TypeError('only AnalyzerResult can be added')
self.results += [analyzer_result]
finally:
h5_file.close() # Close the HDF5 file
- return data_list
\ No newline at end of file
+ return data_list
+
+
class Analyzer(Processor):
    '''
    Generic base class for the analyzers.

    Provides the common setup() defaults, an empty results() container for
    subclasses to extend via super(), and the new_result() factory that
    pre-fills the metadata this base class knows about.
    '''

    implements(IAnalyzer)

    @interfacedoc
    def setup(self, channels=None, samplerate=None, blocksize=None,
              totalframes=None):
        super(Analyzer, self).setup(channels, samplerate, blocksize,
                                    totalframes)

        # Default the output_* attributes to the input characteristics;
        # concrete analyzers may overwrite them after calling this setup().
        self.output_channels = self.input_channels
        self.output_samplerate = self.input_samplerate
        self.output_blocksize = self.input_blocksize
        # NOTE(review): defaults to input_blocksize rather than
        # input_stepsize — looks intentional (no overlap by default) but
        # worth confirming, since new_result() reports input_stepsize.
        self.output_stepsize = self.input_blocksize

    def results(self):
        # Base implementation returns an empty container; subclasses call
        # this via super() and add their own results to it.
        container = AnalyzerResultContainer()
        return container

    @staticmethod
    @interfacedoc
    def id():
        return "analyzer"

    @staticmethod
    @interfacedoc
    def name():
        return "Generic analyzer"

    @staticmethod
    @interfacedoc
    def unit():
        return ""

    def new_result(self, dataMode='value', resultType='framewise'):
        '''
        Create a new newAnalyzerResult with the metadata this base class
        can fill in automatically (date, version, author, audio uri).

        Parameters
        ----------
        dataMode : str
            'value' or 'label'
        resultType : str
            'framewise', 'event' or 'segment'

        Returns
        -------
        newAnalyzerResult

        Raises
        ------
        ValueError
            If dataMode or resultType is not one of the supported values.
            (The previous version silently accepted anything, and also
            listed 'value' — a dataMode — among the resultType branches.)
        '''
        from datetime import datetime

        if dataMode not in ('value', 'label'):
            raise ValueError("unknown dataMode: %r" % (dataMode,))
        if resultType not in ('framewise', 'event', 'segment'):
            raise ValueError("unknown resultType: %r" % (resultType,))

        result = newAnalyzerResult()
        # Automatically write known metadata
        result.idMetadata = IdMetadata(
            date=datetime.now().replace(microsecond=0).isoformat(' '),
            version=TimeSideVersion,
            author='TimeSide')
        result.audioMetadata = AudioMetadata(uri=self.mediainfo()['uri'])

        result.data = AnalyserData(dataMode=dataMode)

        # Only label results carry label metadata.
        if dataMode == 'label':
            result.labelMetadata = LabelMetadata()

        if resultType == 'framewise':
            result.frameMetadata = FrameMetadata(
                samplerate=self.output_samplerate,
                blocksize=self.output_blocksize,
                stepsize=self.input_stepsize)
        # 'event' and 'segment' results carry their timing (and, for
        # events, an implicit duration of 0) inside the data itself.

        return result
# implementations should always call the parent method
+ def mediainfo():
+ """
+ Information about the media object
+ uri
+ start
+ duration
+ """
class IEncoder(IProcessor):
"""Encoder driver interface. Each encoder is expected to support a specific
implements(IProcessor)
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
- self.input_channels = channels
- self.input_samplerate = samplerate
- self.input_blocksize = blocksize
- self.input_totalframes = totalframes
+ def setup(self, channels=None, samplerate=None, blocksize=None,
+ totalframes=None):
+ self.source_channels = channels
+ self.source_samplerate = samplerate
+ self.source_blocksize = blocksize
+ self.source_totalframes = totalframes
+
+ # If empty Set default values for input_* attributes
+ # may be setted by the processor during __init__()
+ if not hasattr(self, 'input_channels'):
+ self.input_channels = self.source_channels
+ if not hasattr(self, 'input_samplerate'):
+ self.input_samplerate = self.source_samplerate
+ if not hasattr(self, 'input_blocksize'):
+ self.input_blocksize = self.source_blocksize
+ if not hasattr(self, 'input_stepsize'):
+ self.input_stepsize = self.source_blocksize
+
# default channels(), samplerate() and blocksize() implementations returns
- # the input characteristics, but processors may change this behaviour by
+ # the source characteristics, but processors may change this behaviour by
# overloading those methods
@interfacedoc
def channels(self):
- return self.input_channels
+ return self.source_channels
@interfacedoc
def samplerate(self):
- return self.input_samplerate
+ return self.source_samplerate
@interfacedoc
def blocksize(self):
- return self.input_blocksize
+ return self.source_blocksize
@interfacedoc
def totalframes(self):
- return self.input_totalframes
+ return self.source_totalframes
@interfacedoc
def process(self, frames, eod):
def release(self):
pass
+ @interfacedoc
+ def mediainfo(self):
+ return self.source_mediainfo
+
def __del__(self):
self.release()
samplerate = last.samplerate(),
blocksize = last.blocksize(),
totalframes = last.totalframes())
+ item.source_mediainfo = source.mediainfo()
last = item
# now stream audio data along the pipe
for item in items:
item.release()
- return self
\ No newline at end of file
+ return self
def release(self):
pass
+ @interfacedoc
+ def mediainfo(self):
+ return dict(uri=self.uri)
+ # TODO : for segment support :
+ #return dict(uri=self.uri, duration=self.input_duration, segment_start=self.segment_start, segment_duration=self.segment_duration)
+
def __del__(self):
self.release()
@interfacedoc
def metadata(self):
# TODO check
- return self.tags
\ No newline at end of file
+ return self.tags
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
super(VorbisEncoder, self).setup(channels, samplerate, blocksize, totalframes)
-
self.pipe = ''' appsrc name=src
! audioconvert
! vorbisenc