From ebdf90cb5f0c6d1b505a14f33fcd24681a1f61ac Mon Sep 17 00:00:00 2001 From: Olivier Guilyardi Date: Wed, 17 Feb 2010 22:05:23 +0000 Subject: [PATCH] - add FixedSizeInputAdapter utility - make nframes() a common IProcessor method, and add an nframes argument to IProcessor.setup() - release processors from Processor destructor - remove IDecoder.duration(), redundant with nframes and samplerate --- api.py | 32 ++++++++----- core.py | 99 ++++++++++++++++++++++++++++++++++----- tests/api/examples.py | 50 +++++++++++++++----- tests/api/test_lolevel.py | 8 +--- tests/api/test_pipe.py | 4 +- tests/testinputadapter.py | 53 +++++++++++++++++++++ 6 files changed, 201 insertions(+), 45 deletions(-) create mode 100644 tests/testinputadapter.py diff --git a/api.py b/api.py index a4a5601..30a5f10 100644 --- a/api.py +++ b/api.py @@ -34,9 +34,14 @@ class IProcessor(Interface): # be raised by MetaProcessor if the id is malformed or not unique amongst # registered processors. - def setup(self, channels=None, samplerate=None): - """Setup this processor to handle incoming data with nchannels at samplerate. - Also reset the processor internal state so that it can be reused.""" + def setup(self, channels=None, samplerate=None, nframes=None): + """Allocate internal resources and reset state, so that this processor is + ready for a new run. + + The channels, samplerate and/or nframes arguments may be required by + processors which accept input. An error will occur if any of + these arguments is passed to an output-only processor such as a decoder. + """ def channels(self): """Number of channels in the data returned by process(). May be different from @@ -46,6 +51,10 @@ class IProcessor(Interface): """Samplerate of the data returned by process(). May be different from the samplerate passed to setup()""" + def nframes(): + """The total number of frames that this processor can output, or None if + the duration is unknown.""" + def process(self, frames=None, eod=False): """Process input frames and return a (output_frames, eod) tuple. Both input and output frames are numpy arrays, where columns are @@ -62,6 +71,8 @@ class IProcessor(Interface): """Release resources owned by this processor. The processor cannot be used anymore after calling this method.""" + # implementations should always call the parent method + class IEncoder(IProcessor): """Encoder driver interface. Each encoder is expected to support a specific format.""" @@ -75,8 +86,7 @@ class IEncoder(IProcessor): True when end-of-data is reached.""" # implementation: the constructor must always accept the output argument. It may - # accepts extra arguments such as bitrate, depth, etc.., but these must be optionnal, - # that is have a default value. + # accept extra arguments such as bitrate, depth, etc.., but these must be optionnal @staticmethod def format(): @@ -118,12 +128,6 @@ class IDecoder(IProcessor): """Create a new decoder for filename.""" # implementation: additional optionnal arguments are allowed - def duration(): - """Return the duration in seconds""" - - def nframes(): - """Return the number of frames""" - def format(): """Return a user-friendly file format string""" @@ -139,12 +143,14 @@ class IDecoder(IProcessor): class IGrapher(IProcessor): """Media item visualizer driver interface""" + # implementation: graphers which need to know the total number of frames + # should raise an exception in setup() if the nframes argument is None + def __init__(self, width, height): """Create a new grapher. width and height are generally in pixels but could be something else for eg. svg rendering, etc.. """ - # implementation: additional optionnal arguments (such as the total number - # of frames, etc...) are allowed + # implementation: additional optionnal arguments are allowed @staticmethod def name(): diff --git a/core.py b/core.py index 9a6f449..6606e8e 100644 --- a/core.py +++ b/core.py @@ -21,9 +21,11 @@ from timeside.component import * from timeside.api import IProcessor from timeside.exceptions import Error, ApiError import re +import numpy __all__ = ['Processor', 'MetaProcessor', 'implements', 'abstract', - 'interfacedoc', 'processors', 'get_processor', 'ProcessPipe'] + 'interfacedoc', 'processors', 'get_processor', 'ProcessPipe', + 'FixedSizeInputAdapter'] _processors = {} @@ -56,33 +58,97 @@ class Processor(Component): implements(IProcessor) @interfacedoc - def setup(self, channels=None, samplerate=None): + def setup(self, channels=None, samplerate=None, nframes=None): self.input_channels = channels self.input_samplerate = samplerate + self.input_nframes = nframes + # default channels(), samplerate() and nframes() implementations returns + # the input characteristics, but processors may change this behaviour by + # overloading those methods @interfacedoc def channels(self): - # default implementation returns the input channels, but processors may change - # this behaviour by overloading this method return self.input_channels @interfacedoc def samplerate(self): - # default implementation returns the input samplerate, but processors may change - # this behaviour by overloading this method return self.input_samplerate @interfacedoc - def process(self, frames): - return frames + def nframes(self): + return self.input_nframes + + @interfacedoc + def process(self, frames, eod): + return frames, eod @interfacedoc def release(self): pass + def __del__(self): + self.release() + def __or__(self, other): return ProcessPipe(self, other) +class FixedSizeInputAdapter(object): + """Utility to make it easier to write processors which require fixed + size input buffers.""" + + def __init__(self, buffer_size, channels, pad=False): + """Construct a new adapter: buffer_size is the desired buffer size in frames, + channels the number of channels, and pad indicates whether the last block should + be padded with zero. """ + + self.buffer = numpy.empty((buffer_size, channels)) + self.buffer_size = buffer_size + self.len = 0 + self.pad = pad + + def nframes(self, input_nframes): + """Return the total number of frames that this adapter will output according to the + input_nframes argument""" + + nframes = input_nframes + if self.pad: + mod = input_nframes % self.buffer_size + if mod: + nframes += self.buffer_size - mod + + return nframes + + + def process(self, frames, eod): + """Returns an iterator over tuples of the form (buffer, eod) where buffer is a + fixed-sized block of data, and eod indicates whether this is the last block. + In case padding is deactivated the last block may be smaller than the buffer size. + """ + src_index = 0 + remaining = len(frames) + + while remaining: + space = self.buffer_size - self.len + copylen = remaining < space and remaining or space + self.buffer[self.len:self.len + copylen] = frames[src_index:src_index + copylen] + remaining -= copylen + src_index += copylen + self.len += copylen + + if self.len == self.buffer_size: + yield self.buffer, False + self.len = 0 + + if eod and self.len: + block = self.buffer + if self.pad: + self.buffer[self.len:self.buffer_size] = 0 + else: + block = self.buffer[0:self.len] + + yield block, True + self.len = 0 + def processors(interface=IProcessor, recurse=True): """Returns the processors implementing a given interface and, if recurse, any of the descendants of this interface.""" @@ -102,8 +168,7 @@ class ProcessPipe(object): def __init__(self, *others): self.processors = [] - for p in others: - self |= p + self |= others def __or__(self, other): return ProcessPipe(self, other) @@ -114,13 +179,19 @@ class ProcessPipe(object): elif isinstance(other, ProcessPipe): self.processors.extend(other.processors) else: - raise Error("Piped item is neither a Processor nor a ProcessPipe") + try: + iter(other) + except TypeError: + raise Error("Can not add this type of object to a pipe: %s", str(other)) + + for item in other: + self |= item return self def run(self): """Setup/reset all processors in cascade and stream audio data along - the pipe""" + the pipe. Also returns the pipe itself.""" source = self.processors[0] items = self.processors[1:] @@ -129,7 +200,7 @@ class ProcessPipe(object): source.setup() last = source for item in items: - item.setup(last.channels(), last.samplerate()) + item.setup(last.channels(), last.samplerate(), last.nframes()) last = item # now stream audio data along the pipe @@ -139,3 +210,5 @@ class ProcessPipe(object): for item in items: frames, eod = item.process(frames, eod) + return self + diff --git a/tests/api/examples.py b/tests/api/examples.py index 4df0001..40ebe7b 100644 --- a/tests/api/examples.py +++ b/tests/api/examples.py @@ -24,13 +24,14 @@ class FileDecoder(Processor): self.position = 0 @interfacedoc - def setup(self, channels=None, samplerate=None): - Processor.setup(self, channels, samplerate) + def setup(self): + super(FileDecoder, self).setup() if self.position != 0: self.file.seek(0); self.position = 0 def release(self): + super(FileDecoder, self).release() if self.file: self.file.close() self.file = None @@ -43,10 +44,6 @@ class FileDecoder(Processor): def samplerate(self): return self.file.samplerate - @interfacedoc - def duration(self): - return self.file.nframes / self.file.samplerate - @interfacedoc def nframes(self): return self.file.nframes @@ -101,8 +98,8 @@ class MaxLevel(Processor): implements(IValueAnalyzer) @interfacedoc - def setup(self, channels=None, samplerate=None): - Processor.setup(self, channels, samplerate) + def setup(self, channels=None, samplerate=None, nframes=None): + super(MaxLevel, self).setup(channels, samplerate, nframes) self.max_value = 0 @staticmethod @@ -162,8 +159,8 @@ class WavEncoder(Processor): raise Exception("Streaming not supported") @interfacedoc - def setup(self, channels=None, samplerate=None): - Processor.setup(self, channels, samplerate) + def setup(self, channels=None, samplerate=None, nframes=None): + super(WavEncoder, self).setup(channels, samplerate, nframes) if self.file: self.file.close() @@ -245,8 +242,8 @@ class Waveform(Processor): self.color_scheme = scheme @interfacedoc - def setup(self, channels=None, samplerate=None): - Processor.setup(self, channels, samplerate) + def setup(self, channels=None, samplerate=None, nframes=None): + super(Waveform, self).setup(channels, samplerate, nframes) if self.image: self.image.close() self.image = WaveformImage(self.width, self.height, self.nframes) @@ -268,4 +265,33 @@ class Waveform(Processor): #self.image.save() #return self.image +class Duration(Processor): + """A rather useless duration analyzer. Its only purpose is to test the + nframes characteristic.""" + implements(IValueAnalyzer) + + @interfacedoc + def setup(self, channels, samplerate, nframes): + if not nframes: + raise Exception("nframes argument required") + super(Duration, self).setup(channels, samplerate, nframes) + + @staticmethod + @interfacedoc + def id(): + return "test_duration" + + @staticmethod + @interfacedoc + def name(): + return "Duration analyzer" + + @staticmethod + @interfacedoc + def unit(): + return "seconds" + + def result(self): + return self.input_nframes / float(self.input_samplerate) + diff --git a/tests/api/test_lolevel.py b/tests/api/test_lolevel.py index c508511..d2d75be 100644 --- a/tests/api/test_lolevel.py +++ b/tests/api/test_lolevel.py @@ -10,10 +10,11 @@ analyzer = examples.MaxLevel() decoder.setup() nchannels = decoder.channels() samplerate = decoder.samplerate() +nframes = decoder.nframes() analyzer.setup(nchannels, samplerate) print "Stats: duration=%f, nframes=%d, nchannels=%d, samplerate=%d, resolution=%d" % ( - decoder.duration(), decoder.nframes(), nchannels, samplerate, decoder.resolution()) + nframes / float(samplerate), nframes, nchannels, samplerate, decoder.resolution()) while True: frames, eod = decoder.process() @@ -24,8 +25,6 @@ while True: max_level = analyzer.result() print "Max level: %f" % max_level -analyzer.release() - destination = "normalized.wav" Encoder = examples.WavEncoder print "Creating encoder with id=%s for: %s" % (Encoder.id(), destination) @@ -49,6 +48,3 @@ while True: if eod: break -decoder.release() -effect.release() -encoder.release() diff --git a/tests/api/test_pipe.py b/tests/api/test_pipe.py index 9bf6d1b..571646f 100644 --- a/tests/api/test_pipe.py +++ b/tests/api/test_pipe.py @@ -10,8 +10,9 @@ source = os.path.join(os.path.dirname(__file__), "../samples/guitar.wav") print "Normalizing %s" % source decoder = examples.FileDecoder(source) maxlevel = examples.MaxLevel() +duration = examples.Duration() -(decoder | maxlevel).run() +(decoder | maxlevel | duration).run() gain = 1 if maxlevel.result() > 0: @@ -19,6 +20,7 @@ if maxlevel.result() > 0: print "input maxlevel: %f" % maxlevel.result() print "gain: %f" % gain +print "duration: %f %s" % (duration.result(), duration.unit()) gain = examples.Gain(gain) encoder = examples.WavEncoder("normalized.wav") diff --git a/tests/testinputadapter.py b/tests/testinputadapter.py new file mode 100644 index 0000000..e42f6e6 --- /dev/null +++ b/tests/testinputadapter.py @@ -0,0 +1,53 @@ +from timeside.core import FixedSizeInputAdapter +from sys import stdout +import numpy + +def test(adapter, data, eod, expected): + expected.reverse() + for buffer, _eod in adapter.process(data, eod): + a = expected.pop() + if not numpy.array_equiv(buffer, a): + raise Exception("\n-- Actual --\n%s\n -- Expected -- \n%s\n" % (str(buffer), str(a))) + + if expected: + raise Exception("trailing expected data: %s" % expected) + + stdout.write(".") + +data = numpy.arange(44).reshape(2,22).transpose() + +adapter = FixedSizeInputAdapter(4, 2) +stdout.write("%s simple test" % adapter.__class__.__name__) + +expected = len(data) +actual = adapter.nframes(len(data)) +if actual != expected: + raise Exception("%d != %d nframes", (actual, expected)) + +test(adapter, data[0:1], False, []) +test(adapter, data[1:5], False, [data[0:4]]) +test(adapter, data[5:12], False, [data[4:8], data[8:12]]) +test(adapter, data[12:13], False, []) +test(adapter, data[13:14], False, []) +test(adapter, data[14:18], False, [data[12:16]]) +test(adapter, data[18:20], False, [data[16:20]]) +test(adapter, data[20:21], False, []) +test(adapter, data[21:22], True, [data[20:22]]) +stdout.write(" OK\n") + +adapter = FixedSizeInputAdapter(4, 2, pad=True) +stdout.write("%s padding test" % adapter.__class__.__name__) + +expected = len(data) + 2 +actual = adapter.nframes(len(data)) +if actual != expected: + raise Exception("%d != %d nframes", (actual, expected)) + +test(adapter, data[0:21], False, [data[0:4], data[4:8], data[8:12], data[12:16], data[16:20]]) +test(adapter, data[21:22], True, [[ + [20, 42], + [21, 43], + [0, 0], + [0, 0] +]]) +stdout.write(" OK\n") -- 2.39.5