from core import *
+from metadata import Metadata
import decode
import encode
import analyze
from timeside.component import Interface
class IProcessor(Interface):
- """Base processor interface"""
+ """Common processor interface"""
@staticmethod
def id():
- """Return a short alphanumeric, lower-case string which uniquely
- identify this processor. Only letters and digits are allowed.
- An exception will be raised by MetaProcessor if the id is malformed or
- not unique amongst registered processors.
+ """Short alphanumeric, lower-case string which uniquely identify this
+ processor, suitable for use as an HTTP/GET argument value, in filenames,
+ etc..."""
+
+ # implementation: only letters and digits are allowed. An exception will
+ # be raised by MetaProcessor if the id is malformed or not unique amongst
+ # registered processors.
+
+ def buffersize(self):
+ """Buffersize this processor operates on, that is; the number of frames
+ expected/returned by process()."""
+
+ def set_buffersize(self, value):
+ """Set the buffer size used by this processor."""
+
+ def set_input_format(self, nchannels, samplerate):
+ """Set the format of input data passed to process(). It is required to call
+ this method before calling process(), except for output-only processors."""
+
+ def input_format(self):
+ """Return a tuple of the form (nchannels, samplerate) indicating the
+ format of the data expected by process(), with the same values as the
+ nchannels and samplerate arguments passed to the constructor."""
+
+ def output_format(self):
+ """Return a tuple of the form (nchannels, samplerate) indicating the
+ format of the data returned by process(). These may differ from the values
+ passed to the constructor (ie: stereo-to-mono effect, samplerate converter,
+ etc...)"""
+
+ def process(self, frames=None):
+ """Process buffersize input frames and return buffersize output frames, both
+ as numpy arrays, where columns are channels. An input/output of less than
+ buffersize frames (or None) means that the end-of-data has been reached (the
+ caller must ensure that this happens).
- Typically this identifier is likely to be used during HTTP requests
- and be passed as a GET parameter. Thus it should be as short as possible."""
+ Output-only processors (such as decoders) will raise an exception if the
+ frames argument is not None. All processors (even encoders) return data,
+ even if that means returning the input unchanged.
+
+ Warning: it is required to call set_input_format() before this method
+ for processors which accept input."""
class IEncoder(IProcessor):
"""Encoder driver interface. Each encoder is expected to support a specific
format."""
- def __init__(self, output, nchannels, samplerate):
- """The constructor must always accept the output, nchannels and samplerate
- arguments. It may accepts extra arguments such as bitrate, depth, etc..,
- but these must be optionnal, that is have a default value.
-
- The output must either be a filepath or a callback function/method for
- for the streaming mode. The callback must accept one argument which is
- block of binary data.
- """
+ def __init__(self, output):
+ """Create a new encoder. output can either be a filename or a python callback
+ function/method for streaming mode.
+
+ The streaming callback prototype is: callback(data, eod)
+ Where data is a block of binary data of an undetermined size, and eod
+ True when end-of-data is reached."""
+
+ # implementation: the constructor must always accept the output argument. It may
+ # accepts extra arguments such as bitrate, depth, etc.., but these must be optionnal,
+ # that is have a default value.
@staticmethod
def format():
"""Return the mime type corresponding to this encode format"""
def set_metadata(self, metadata):
- """metadata is a tuple containing tuples for each descriptor return by
- the dc.Ressource of the item, in the model order :
- ((name1, value1),(name2, value2),(name1, value3), ...)"""
-
- def update(self):
- """Updates the metadata into the file passed as the output argument
- to the constructor. This method can't be called in streaming
- mode."""
-
- def process(self, frames):
- """Encode buffersize frames passed as a numpy array, where columns are channels.
+ """Set the metadata to be embedded in the encoded output.
- A number of frames less than buffersize means that the end of the data
- has been reached, and that the encoder should close the output file, stream,
- etc...
-
- In streaming mode the callback passed to the constructor is called whenever
- a block of encoded data is ready."""
+ In non-streaming mode, this method updates the metadata directly into the
+ output file, without re-encoding the audio data, provided this file already
+ exists.
+
+ It isn't required to call this method, but if called, it must be before
+ process()."""
class IDecoder(IProcessor):
"""Decoder driver interface. Decoders are different of encoders in that
export any static method, all informations are dynamic."""
def __init__(self, filename):
- """Create a new decoder for filename. Implementations of this interface
- may accept optionnal arguments after filename."""
-
- def channels():
- """Return the number of channels"""
-
- def samplerate():
- """Return the samplerate"""
+ """Create a new decoder for filename."""
+ # implementation: additional optionnal arguments are allowed
def duration():
"""Return the duration in seconds"""
+ def nframes():
+ """Return the number of frames"""
+
def format():
"""Return a user-friendly file format string"""
def resolution():
"""Return the sample depth"""
- def process(self):
- """Return a generator over the decoded data, as numpy arrays, where columns are
- channels, each array containing buffersize frames or less if the end of file
- has been reached."""
+ def metadata(self):
+ """Return the metadata embedded into the encoded stream, if any."""
class IGrapher(IProcessor):
"""Media item visualizer driver interface"""
def __init__(self, width, height):
- """Create a new grapher. Implementations of this interface
- may accept optionnal arguments. width and height are generally
+ """Create a new grapher. width and height are generally
in pixels but could be something else for eg. svg rendering, etc.."""
+ # implementation: additional optionnal arguments are allowed
@staticmethod
def name():
"""Return the graph name, such as "Waveform", "Spectral view",
etc.. """
+ def set_nframes(self, nframes):
+ """Duration in frames of the input data. Must be called before process()."""
+
def set_colors(self, background=None, scheme=None):
"""Set the colors used for image generation. background is a RGB tuple,
and scheme a a predefined color theme name"""
pass
- def process(self, frames):
- """Process a block of buffersize frames passed as a numpy array, where
- columns are channels. Passing less than buffersize frames means that
- the end of data has been reached"""
-
def render(self):
"""Return a PIL Image object visually representing all of the data passed
by repeatedly calling process()"""
for each block of data (as in Vamp)."""
def __init__(self):
- """Create a new analyzer. Implementations of this interface
- may accept optionnal arguments."""
+ """Create a new analyzer."""
+ # implementation: additional optionnal arguments are allowed
@staticmethod
def name():
def unit():
"""Return the unit of the data such as "dB", "seconds", etc... """
- def process(self, frames):
- """Process a block of buffersize frames passed as a numpy array, where
- columns are channels. Passing less than buffersize frames means that
- the end of data has been reached"""
-
class IValueAnalyzer(IAnalyzer):
"""Interface for analyzers which return a single numeric value from result()"""
"""Return the final result of the analysis performed over the data passed by
repeatedly calling process()"""
+class IEffect(IProcessor):
+ """Effect processor interface"""
+
+ def __init__(self):
+ """Create a new effect."""
+ # implementation: additional optionnal arguments are allowed
+
+ @staticmethod
+ def name():
+ """Return the effect name"""
+
# implementing a given interface are not automatically considered to implement this
# interface too.
-__all__ = ['Component', 'MetaComponent', 'implements', 'Interface', 'implementations']
+__all__ = ['Component', 'MetaComponent', 'implements', 'abstract',
+ 'interfacedoc', 'Interface', 'implementations', 'ComponentError']
class Interface(object):
"""Marker base class for interfaces."""
def implements(*interfaces):
"""Registers the interfaces implemented by a component when placed in the
class header"""
- _implements.extend(interfaces)
+ MetaComponent.implements.extend(interfaces)
-def implementations(interface, recurse=True):
+def abstract():
+ """Declare a component as abstract when placed in the class header"""
+ MetaComponent.abstract = True
+
+def implementations(interface, recurse=True, abstract=False):
"""Returns the components implementing interface, and if recurse, any of
- the descendants of interface."""
+ the descendants of interface. If abstract is True, also return the
+ abstract implementations."""
result = []
- find_implementations(interface, recurse, result)
+ find_implementations(interface, recurse, abstract, result)
return result
-_implementations = []
-_implements = []
+def interfacedoc(func):
+ if isinstance(func, staticmethod):
+ raise ComponentError("@interfacedoc can't handle staticmethod (try to put @staticmethod above @interfacedoc)")
+
+ if not func.__doc__:
+ func.__doc__ = "@interfacedoc"
+ func._interfacedoc = True
+ return func
class MetaComponent(type):
"""Metaclass of the Component class, used mainly to register the interface
declared to be implemented by a component."""
+
+ implementations = []
+ implements = []
+ abstract = False
+
def __new__(cls, name, bases, d):
new_class = type.__new__(cls, name, bases, d)
- if _implements:
- for i in _implements:
- _implementations.append((i, new_class))
- del _implements[:]
+
+ # Register implementations
+ if MetaComponent.implements:
+ for i in MetaComponent.implements:
+ MetaComponent.implementations.append({
+ 'interface': i,
+ 'class': new_class,
+ 'abstract': MetaComponent.abstract})
+
+ # Propagate @interfacedoc
+ for name in new_class.__dict__:
+ member = new_class.__dict__[name]
+ if isinstance(member, staticmethod):
+ member = getattr(new_class, name)
+
+ if member.__doc__ == "@interfacedoc":
+ if_member = None
+ for i in MetaComponent.implements:
+ if hasattr(i, name):
+ if_member = getattr(i, name)
+ if not if_member:
+ raise ComponentError("@interfacedoc: %s.%s: no such member in implemented interfaces: %s"
+ % (new_class.__name__, name, str(MetaComponent.implements)))
+ member.__doc__ = if_member.__doc__
+
+ MetaComponent.implements = []
+ MetaComponent.abstract = False
+
return new_class
class Component(object):
if item not in list1:
list1.append(item)
-def find_implementations(interface, recurse, result):
+def find_implementations(interface, recurse, abstract, result):
"""Find implementations of an interface or of one of its descendants and
extend result with the classes found."""
- for i, cls in _implementations:
- if (i == interface):
- extend_unique(result, [cls])
+ for item in MetaComponent.implementations:
+ if (item['interface'] == interface and (abstract or not item['abstract'])):
+ extend_unique(result, [item['class']])
if recurse:
subinterfaces = interface.__subclasses__()
if subinterfaces:
for i in subinterfaces:
- find_implementations(i, recurse, result)
+ find_implementations(i, recurse, abstract, result)
+class ComponentError(Exception):
+ pass
from timeside.exceptions import Error, ApiError
import re
-__all__ = ['Processor', 'MetaProcessor', 'implements', 'processors',
- 'get_processor']
+__all__ = ['Processor', 'MetaProcessor', 'implements', 'abstract',
+ 'interfacedoc', 'processors', 'get_processor']
_processors = {}
"""Metaclass of the Processor class, used mainly for ensuring that processor
id's are wellformed and unique"""
- valid_id = re.compile("^[a-z][a-z0-9]*$")
+ valid_id = re.compile("^[a-z][_a-z0-9]*$")
def __new__(cls, name, bases, d):
new_class = MetaComponent.__new__(cls, name, bases, d)
"""Base component class of all processors"""
__metaclass__ = MetaProcessor
+ abstract()
+ implements(IProcessor)
+
DEFAULT_BUFFERSIZE = 0x10000
MIN_BUFFERSIZE = 0x1000
__buffersize = DEFAULT_BUFFERSIZE
+ @interfacedoc
def buffersize(self):
- """Get the current buffer size"""
- return __buffersize
+ return self.__buffersize
def set_buffersize(self, value):
"""Set the buffer size used by this processor. The buffersize must be a
self.__buffersize = value
+ @interfacedoc
+ def set_input_format(self, nchannels=None, samplerate=None):
+ self.input_channels = nchannels
+ self.input_samplerate = samplerate
+
+ @interfacedoc
+ def input_format(self):
+ return (self.input_channels, self.input_samplerate)
+
+ @interfacedoc
+ def output_format(self):
+ return (self.input_channels, self.input_samplerate)
+
+ @interfacedoc
+ def process(self, frames):
+ return frames
def processors(interface=IProcessor, recurse=True):
"""Returns the processors implementing a given interface and, if recurse,
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2007-2009 Parisson
+# Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com>
+# Copyright (c) 2007-2009 Guillaume Pellerin <pellerin@parisson.com>
+#
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+
+class Metadata(object):
+ pass
+
+
--- /dev/null
+from timeside.core import Processor, implements, interfacedoc
+from timeside.api import *
+from timeside import Metadata
+from scikits import audiolab
+import numpy
+
+class AudiolabDecoder(Processor):
+ """A simple audiolab-based example decoder"""
+ implements(IDecoder)
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "test_audiolabdec"
+
+ @interfacedoc
+ def __init__(self, filename):
+ self.file = audiolab.sndfile(filename, 'read')
+ self.position = 0
+
+ @interfacedoc
+ def output_format(self):
+ return (self.file.get_channels(), self.file.get_samplerate())
+
+ @interfacedoc
+ def duration(self):
+ return self.file.get_nframes() / self.file.get_samplerate()
+
+ @interfacedoc
+ def nframes(self):
+ return self.file.get_nframes()
+
+ @interfacedoc
+ def format(self):
+ return self.file.get_file_format()
+
+ @interfacedoc
+ def encoding(self):
+ return self.file.get_encoding()
+
+ @interfacedoc
+ def resolution(self):
+ resolution = None
+ encoding = self.file.get_encoding()
+
+ if encoding == "pcm8":
+ resolution = 8
+ elif encoding == "pcm16":
+ resolution = 16
+ elif encoding == "pcm32":
+ resolution = 32
+
+ return resolution
+
+ @interfacedoc
+ def metadata(self):
+ #TODO
+ return Metadata()
+
+ @interfacedoc
+ def process(self, frames=None):
+ if frames:
+ raise Exception("Decoder doesn't accept input frames")
+
+ buffersize = self.buffersize()
+
+ # Need this because audiolab raises a bogus exception when asked
+ # to read passed the end of the file
+ toread = self.nframes() - self.position
+ if toread > buffersize:
+ toread = buffersize
+
+ frames = self.file.read_frames(toread)
+
+ self.position += toread
+
+ if toread < buffersize:
+ self.file.close()
+
+ return frames
+
+class MaxLevelAnalyzer(Processor):
+ implements(IValueAnalyzer)
+
+ @interfacedoc
+ def __init__(self):
+ self.max_value = 0
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "test_maxlevel"
+
+ @staticmethod
+ @interfacedoc
+ def name():
+ return "Max level test analyzer"
+
+ @staticmethod
+ @interfacedoc
+ def unit():
+ # power? amplitude?
+ return ""
+
+ def process(self, frames=None):
+ max = frames.max()
+ if max > self.max_value:
+ self.max_value = max
+
+ return frames
+
+ def result(self):
+ return self.max_value
+
+class GainEffect(Processor):
+ implements(IEffect)
+
+ @interfacedoc
+ def __init__(self, gain=1.0):
+ self.gain = gain
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "test_gain"
+
+ @staticmethod
+ @interfacedoc
+ def name():
+ return "Gain test effect"
+
+ def process(self, frames=None):
+ return numpy.multiply(frames, self.gain)
+
+class WavEncoder(Processor):
+ implements(IEncoder)
+
+ def __init__(self, output):
+ self.file = None
+ if isinstance(output, basestring):
+ self.filename = output
+ else:
+ raise Exception("Streaming not supported")
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "test_wavenc"
+
+ @staticmethod
+ @interfacedoc
+ def description():
+ return "Hackish wave encoder"
+
+ @staticmethod
+ @interfacedoc
+ def file_extension():
+ return "wav"
+
+ @staticmethod
+ @interfacedoc
+ def mime_type():
+ return "audio/x-wav"
+
+ @interfacedoc
+ def set_metadata(self, metadata):
+ #TODO
+ pass
+
+ @interfacedoc
+ def process(self, frames):
+ if not self.file:
+ # Can't open the file in constructor because input_channels and input_samplerate
+ # aren't available before set_input_format() has been called
+ info = audiolab.formatinfo("wav", "pcm16")
+ self.file = audiolab.sndfile(self.filename, "write", format=info, channels=self.input_channels,
+ samplerate=self.input_samplerate)
+ self.file.write_frames(frames)
+ if len(frames) < self.buffersize():
+ self.file.close()
+
+ return frames
--- /dev/null
+from timeside.tests.api import examples
+import os
+
+source=os.path.dirname(__file__) + "../samples/sweep_source.wav"
+
+Decoder = examples.AudiolabDecoder
+print "Creating decoder with id=%s for: %s" % (Decoder.id(), source)
+decoder = Decoder(source)
+nchannels, samplerate = decoder.output_format()
+print "Stats: duration=%f, nframes=%d, nchannels=%d, samplerate=%d, resolution=%d" % (
+ decoder.duration(), decoder.nframes(), nchannels, samplerate, decoder.resolution())
+
+analyzer = examples.MaxLevelAnalyzer()
+analyzer.set_input_format(nchannels, samplerate)
+
+while True:
+ frames = decoder.process()
+ analyzer.process(frames)
+ if len(frames) < decoder.buffersize():
+ break
+
+max_level = analyzer.result()
+print "Max level: %f" % max_level
+
+destination = "normalized.wav"
+Encoder = examples.WavEncoder
+print "Creating encoder with id=%s for: %s" % (Encoder.id(), destination)
+encoder = Encoder(destination)
+decoder = Decoder(source)
+
+nchannels, samplerate = decoder.output_format()
+encoder.set_input_format(nchannels, samplerate)
+
+gain = 1
+if max_level > 0:
+ gain = 0.9 / max_level
+
+effect = examples.GainEffect(gain)
+
+print "Applying effect id=%s with gain=%f" % (effect.id(), gain)
+
+while True:
+ frames = decoder.process()
+ encoder.process(effect.process(frames))
+ if len(frames) < decoder.buffersize():
+ break
+
-from timeside.core import *
+from timeside.component import *
from sys import stdout
class I1(Interface):
class I9(I8):
pass
+class I10(Interface):
+ def test(self):
+ """testdoc"""
+
+ @staticmethod
+ def teststatic(self):
+ """teststaticdoc"""
+
+class I11(Interface):
+ pass
+
class C1(Component):
implements(I1)
class C9(Component):
implements(I8, I9)
+class C10(Component):
+ implements(I10)
+
+ @interfacedoc
+ def test(self):
+ pass
+
+ @staticmethod
+ @interfacedoc
+ def teststatic(self):
+ pass
+
+class C11(Component):
+ abstract()
+ implements(I11)
+
def list_equals(list1, list2):
if len(list1) != len(list2):
return False
def test(desc, actual, expected):
stdout.write(desc + ": ")
- if list_equals(actual, expected):
+ equals = False
+ if isinstance(actual, list) and isinstance(expected, list):
+ equals = list_equals(actual, expected)
+ else:
+ equals = (actual == expected)
+
+ if equals:
stdout.write("OK\n")
else:
stdout.write("FAILED\n")
test("Test whether a component implements an interface's parent", implementations(I5), [C5])
test("Test that a component doesn't implement the interface implemented by its parent", implementations(I7), [C6])
test("Test implementation redundancy across inheritance", implementations(I8), [C8, C9])
+test("Test abstract implementation 1/2", implementations(I11), [])
+test("Test abstract implementation 2/2", implementations(I11, abstract=True), [C11])
+test("Test @interfacedoc", C10.test.__doc__, "testdoc")
+test("Test @interfacedoc on static method", C10.teststatic.__doc__, "teststaticdoc")
+stdout.write("Test @interfacedoc on static method (decorators reversed): ")
+
+try:
+
+ class BogusDoc1(Component):
+ implements(I10)
+
+ @interfacedoc
+ @staticmethod
+ def teststatic(self):
+ pass
+
+ stdout.write("FAILED\n")
+
+except ComponentError:
+ stdout.write("OK\n")
+
+stdout.write("Test @interfacedoc with unexistant method in interface: ")
+try:
+ class BogusDoc2(Component):
+ implements(I10)
+
+ @interfacedoc
+ def nosuchmethod(self):
+ pass
+
+ stdout.write("FAILED\n")
+
+except ComponentError:
+ stdout.write("OK\n")