- make nframes() a common IProcessor method, and add an nframes argument to IProcessor.setup()
- release processors from Processor destructor
- remove IDecoder.duration(), redundant with nframes and samplerate
# be raised by MetaProcessor if the id is malformed or not unique amongst
# registered processors.
- def setup(self, channels=None, samplerate=None):
- """Setup this processor to handle incoming data with nchannels at samplerate.
- Also reset the processor internal state so that it can be reused."""
+ def setup(self, channels=None, samplerate=None, nframes=None):
+ """Allocate internal resources and reset state, so that this processor is
+ ready for a new run.
+
+ The channels, samplerate and/or nframes arguments may be required by
+ processors which accept input. An error will occur if any of
+ these arguments is passed to an output-only processor such as a decoder.
+ """
def channels(self):
"""Number of channels in the data returned by process(). May be different from
"""Samplerate of the data returned by process(). May be different from
the samplerate passed to setup()"""
+ def nframes():
+ """The total number of frames that this processor can output, or None if
+ the duration is unknown."""
+
def process(self, frames=None, eod=False):
"""Process input frames and return a (output_frames, eod) tuple.
Both input and output frames are numpy arrays, where columns are
"""Release resources owned by this processor. The processor cannot
be used anymore after calling this method."""
+ # implementations should always call the parent method
+
class IEncoder(IProcessor):
"""Encoder driver interface. Each encoder is expected to support a specific
format."""
True when end-of-data is reached."""
# implementation: the constructor must always accept the output argument. It may
- # accepts extra arguments such as bitrate, depth, etc.., but these must be optionnal,
- # that is have a default value.
+ # accept extra arguments such as bitrate, depth, etc.., but these must be optionnal
@staticmethod
def format():
"""Create a new decoder for filename."""
# implementation: additional optionnal arguments are allowed
- def duration():
- """Return the duration in seconds"""
-
- def nframes():
- """Return the number of frames"""
-
def format():
"""Return a user-friendly file format string"""
class IGrapher(IProcessor):
"""Media item visualizer driver interface"""
+ # implementation: graphers which need to know the total number of frames
+ # should raise an exception in setup() if the nframesĀ argument is None
+
def __init__(self, width, height):
"""Create a new grapher. width and height are generally
in pixels but could be something else for eg. svg rendering, etc.. """
- # implementation: additional optionnal arguments (such as the total number
- # of frames, etc...) are allowed
+ # implementation: additional optionnal arguments are allowed
@staticmethod
def name():
from timeside.api import IProcessor
from timeside.exceptions import Error, ApiError
import re
+import numpy
__all__ = ['Processor', 'MetaProcessor', 'implements', 'abstract',
- 'interfacedoc', 'processors', 'get_processor', 'ProcessPipe']
+ 'interfacedoc', 'processors', 'get_processor', 'ProcessPipe',
+ 'FixedSizeInputAdapter']
_processors = {}
implements(IProcessor)
@interfacedoc
- def setup(self, channels=None, samplerate=None):
+ def setup(self, channels=None, samplerate=None, nframes=None):
self.input_channels = channels
self.input_samplerate = samplerate
+ self.input_nframes = nframes
+ # default channels(), samplerate() and nframes() implementations returns
+ # the input characteristics, but processors may change this behaviour by
+ # overloading those methods
@interfacedoc
def channels(self):
- # default implementation returns the input channels, but processors may change
- # this behaviour by overloading this method
return self.input_channels
@interfacedoc
def samplerate(self):
- # default implementation returns the input samplerate, but processors may change
- # this behaviour by overloading this method
return self.input_samplerate
@interfacedoc
- def process(self, frames):
- return frames
+ def nframes(self):
+ return self.input_nframes
+
+ @interfacedoc
+ def process(self, frames, eod):
+ return frames, eod
@interfacedoc
def release(self):
pass
+ def __del__(self):
+ self.release()
+
def __or__(self, other):
return ProcessPipe(self, other)
+class FixedSizeInputAdapter(object):
+ """Utility to make it easier to write processors which require fixed
+ size input buffers."""
+
+ def __init__(self, buffer_size, channels, pad=False):
+ """Construct a new adapter: buffer_size is the desired buffer size in frames,
+ channels the number of channels, and pad indicates whether the last block should
+ be padded with zero. """
+
+ self.buffer = numpy.empty((buffer_size, channels))
+ self.buffer_size = buffer_size
+ self.len = 0
+ self.pad = pad
+
+ def nframes(self, input_nframes):
+ """Return the total number of frames that this adapter will output according to the
+ input_nframes argument"""
+
+ nframes = input_nframes
+ if self.pad:
+ mod = input_nframes % self.buffer_size
+ if mod:
+ nframes += self.buffer_size - mod
+
+ return nframes
+
+
+ def process(self, frames, eod):
+ """Returns an iterator over tuples of the form (buffer, eod) where buffer is a
+ fixed-sized block of data, and eod indicates whether this is the last block.
+ In case padding is deactivated the last block may be smaller than the buffer size.
+ """
+ src_index = 0
+ remaining = len(frames)
+
+ while remaining:
+ space = self.buffer_size - self.len
+ copylen = remaining < space and remaining or space
+ self.buffer[self.len:self.len + copylen] = frames[src_index:src_index + copylen]
+ remaining -= copylen
+ src_index += copylen
+ self.len += copylen
+
+ if self.len == self.buffer_size:
+ yield self.buffer, False
+ self.len = 0
+
+ if eod and self.len:
+ block = self.buffer
+ if self.pad:
+ self.buffer[self.len:self.buffer_size] = 0
+ else:
+ block = self.buffer[0:self.len]
+
+ yield block, True
+ self.len = 0
+
def processors(interface=IProcessor, recurse=True):
"""Returns the processors implementing a given interface and, if recurse,
any of the descendants of this interface."""
def __init__(self, *others):
self.processors = []
- for p in others:
- self |= p
+ self |= others
def __or__(self, other):
return ProcessPipe(self, other)
elif isinstance(other, ProcessPipe):
self.processors.extend(other.processors)
else:
- raise Error("Piped item is neither a Processor nor a ProcessPipe")
+ try:
+ iter(other)
+ except TypeError:
+ raise Error("Can not add this type of object to a pipe: %s", str(other))
+
+ for item in other:
+ self |= item
return self
def run(self):
"""Setup/reset all processors in cascade and stream audio data along
- the pipe"""
+ the pipe. Also returns the pipe itself."""
source = self.processors[0]
items = self.processors[1:]
source.setup()
last = source
for item in items:
- item.setup(last.channels(), last.samplerate())
+ item.setup(last.channels(), last.samplerate(), last.nframes())
last = item
# now stream audio data along the pipe
for item in items:
frames, eod = item.process(frames, eod)
+ return self
+
self.position = 0
@interfacedoc
- def setup(self, channels=None, samplerate=None):
- Processor.setup(self, channels, samplerate)
+ def setup(self):
+ super(FileDecoder, self).setup()
if self.position != 0:
self.file.seek(0);
self.position = 0
def release(self):
+ super(FileDecoder, self).release()
if self.file:
self.file.close()
self.file = None
def samplerate(self):
return self.file.samplerate
- @interfacedoc
- def duration(self):
- return self.file.nframes / self.file.samplerate
-
@interfacedoc
def nframes(self):
return self.file.nframes
implements(IValueAnalyzer)
@interfacedoc
- def setup(self, channels=None, samplerate=None):
- Processor.setup(self, channels, samplerate)
+ def setup(self, channels=None, samplerate=None, nframes=None):
+ super(MaxLevel, self).setup(channels, samplerate, nframes)
self.max_value = 0
@staticmethod
raise Exception("Streaming not supported")
@interfacedoc
- def setup(self, channels=None, samplerate=None):
- Processor.setup(self, channels, samplerate)
+ def setup(self, channels=None, samplerate=None, nframes=None):
+ super(WavEncoder, self).setup(channels, samplerate, nframes)
if self.file:
self.file.close()
self.color_scheme = scheme
@interfacedoc
- def setup(self, channels=None, samplerate=None):
- Processor.setup(self, channels, samplerate)
+ def setup(self, channels=None, samplerate=None, nframes=None):
+ super(Waveform, self).setup(channels, samplerate, nframes)
if self.image:
self.image.close()
self.image = WaveformImage(self.width, self.height, self.nframes)
#self.image.save()
#return self.image
+class Duration(Processor):
+ """A rather useless duration analyzer. Its only purpose is to test the
+ nframes characteristic."""
+ implements(IValueAnalyzer)
+
+ @interfacedoc
+ def setup(self, channels, samplerate, nframes):
+ if not nframes:
+ raise Exception("nframes argument required")
+ super(Duration, self).setup(channels, samplerate, nframes)
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "test_duration"
+
+ @staticmethod
+ @interfacedoc
+ def name():
+ return "Duration analyzer"
+
+ @staticmethod
+ @interfacedoc
+ def unit():
+ return "seconds"
+
+ def result(self):
+ return self.input_nframes / float(self.input_samplerate)
+
decoder.setup()
nchannels = decoder.channels()
samplerate = decoder.samplerate()
+nframes = decoder.nframes()
analyzer.setup(nchannels, samplerate)
print "Stats: duration=%f, nframes=%d, nchannels=%d, samplerate=%d, resolution=%d" % (
- decoder.duration(), decoder.nframes(), nchannels, samplerate, decoder.resolution())
+ nframes / float(samplerate), nframes, nchannels, samplerate, decoder.resolution())
while True:
frames, eod = decoder.process()
max_level = analyzer.result()
print "Max level: %f" % max_level
-analyzer.release()
-
destination = "normalized.wav"
Encoder = examples.WavEncoder
print "Creating encoder with id=%s for: %s" % (Encoder.id(), destination)
if eod:
break
-decoder.release()
-effect.release()
-encoder.release()
print "Normalizing %s" % source
decoder = examples.FileDecoder(source)
maxlevel = examples.MaxLevel()
+duration = examples.Duration()
-(decoder | maxlevel).run()
+(decoder | maxlevel | duration).run()
gain = 1
if maxlevel.result() > 0:
print "input maxlevel: %f" % maxlevel.result()
print "gain: %f" % gain
+print "duration: %f %s" % (duration.result(), duration.unit())
gain = examples.Gain(gain)
encoder = examples.WavEncoder("normalized.wav")
--- /dev/null
+from timeside.core import FixedSizeInputAdapter
+from sys import stdout
+import numpy
+
+def test(adapter, data, eod, expected):
+ expected.reverse()
+ for buffer, _eod in adapter.process(data, eod):
+ a = expected.pop()
+ if not numpy.array_equiv(buffer, a):
+ raise Exception("\n-- Actual --\n%s\n -- Expected -- \n%s\n" % (str(buffer), str(a)))
+
+ if expected:
+ raise Exception("trailing expected data: %s" % expected)
+
+ stdout.write(".")
+
+data = numpy.arange(44).reshape(2,22).transpose()
+
+adapter = FixedSizeInputAdapter(4, 2)
+stdout.write("%s simple test" % adapter.__class__.__name__)
+
+expected = len(data)
+actual = adapter.nframes(len(data))
+if actual != expected:
+ raise Exception("%d != %d nframes", (actual, expected))
+
+test(adapter, data[0:1], False, [])
+test(adapter, data[1:5], False, [data[0:4]])
+test(adapter, data[5:12], False, [data[4:8], data[8:12]])
+test(adapter, data[12:13], False, [])
+test(adapter, data[13:14], False, [])
+test(adapter, data[14:18], False, [data[12:16]])
+test(adapter, data[18:20], False, [data[16:20]])
+test(adapter, data[20:21], False, [])
+test(adapter, data[21:22], True, [data[20:22]])
+stdout.write(" OK\n")
+
+adapter = FixedSizeInputAdapter(4, 2, pad=True)
+stdout.write("%s padding test" % adapter.__class__.__name__)
+
+expected = len(data) + 2
+actual = adapter.nframes(len(data))
+if actual != expected:
+ raise Exception("%d != %d nframes", (actual, expected))
+
+test(adapter, data[0:21], False, [data[0:4], data[4:8], data[8:12], data[12:16], data[16:20]])
+test(adapter, data[21:22], True, [[
+ [20, 42],
+ [21, 43],
+ [0, 0],
+ [0, 0]
+]])
+stdout.write(" OK\n")