"Test the low level streaming features"
def setUp(self):
- self.samplerate, self.channels, self.nframes = None, None, None
+ self.samplerate, self.channels, self.blocksize = None, None, None
def testWav(self):
"Test wav decoding"
def tearDown(self):
decoder = FileDecoder(self.source)
- decoder.setup(samplerate = self.samplerate, channels = self.channels, nframes = self.nframes)
+ decoder.setup(samplerate = self.samplerate, channels = self.channels, blocksize = self.blocksize)
totalframes = 0.
frames, eod = decoder.process()
totalframes += frames.shape[0]
if eod or decoder.eod: break
- self.assertEquals(frames.shape[0], decoder.output_nframes )
+ self.assertEquals(frames.shape[0], decoder.output_blocksize )
self.assertEquals(frames.shape[1], decoder.channels() )
ratio = decoder.output_samplerate / float(decoder.input_samplerate)
if 0:
- print "input / output_samplerate:", decoder.input_samplerate, '/', decoder.output_samplerate,
- print "ratio:", ratio
- print "input / output_channels:", decoder.input_channels, decoder.output_channels
- print "input_duration:", decoder.input_duration
- print "input_total_frames:", decoder.input_total_frames
+ print "input / output_samplerate:", decoder.input_samplerate, '/', decoder.output_samplerate,
+ print "ratio:", ratio
+ print "input / output_channels:", decoder.input_channels, decoder.output_channels
+ print "input_duration:", decoder.input_duration
+ print "input_totalframes:", decoder.input_totalframes
# FIXME compute actual number of frames from file
if ratio == 1:
class TestDecodingStereo(TestDecoding):
def setUp(self):
- self.samplerate, self.channels, self.nframes = None, 2, None
+ self.samplerate, self.channels, self.blocksize = None, 2, None
class TestDecodingMonoUpsampling(TestDecoding):
def setUp(self):
- self.samplerate, self.channels, self.nframes = 48000, None, None
+ self.samplerate, self.channels, self.blocksize = 48000, None, None
class TestDecodingMonoDownsampling(TestDecoding):
def setUp(self):
- self.samplerate, self.channels, self.nframes = 16000, None, None
+ self.samplerate, self.channels, self.blocksize = 16000, None, None
class TestDecodingStereoDownsampling(TestDecoding):
def setUp(self):
- self.samplerate, self.channels, self.nframes = 32000, 2, None
+ self.samplerate, self.channels, self.blocksize = 32000, 2, None
class TestDecodingStereoUpsampling(TestDecoding):
def setUp(self):
- self.samplerate, self.channels, self.nframes = 96000, 2, None
+ self.samplerate, self.channels, self.blocksize = 96000, 2, None
-class TestDecodingShortframes(TestDecoding):
+class TestDecodingShortBlock(TestDecoding):
def setUp(self):
- self.samplerate, self.channels, self.nframes = None, None, 256
+ self.samplerate, self.channels, self.blocksize = None, None, 256
-class TestDecodingLongframes(TestDecoding):
+class TestDecodingLongBlock(TestDecoding):
def setUp(self):
- self.samplerate, self.channels, self.nframes = None, None, 1024*8*2
+ self.samplerate, self.channels, self.blocksize = None, None, 1024*8*2
if __name__ == '__main__':
unittest.main(testRunner=TestRunner())
# be raised by MetaProcessor if the id is malformed or not unique amongst
# registered processors.
- def setup(self, channels=None, samplerate=None, nframes=None):
+ def setup(self, channels=None, samplerate=None, blocksize=None):
"""Allocate internal resources and reset state, so that this processor is
ready for a new run.
- The channels, samplerate and/or nframes arguments may be required by
+ The channels, samplerate and/or blocksize arguments may be required by
processors which accept input. An error will occur if any of
these arguments is passed to an output-only processor such as a decoder.
"""
"""Samplerate of the data returned by process(). May be different from
the samplerate passed to setup()"""
- def nframes():
+ def blocksize():
"""The total number of frames that this processor can output, or None if
the duration is unknown."""
"""Media item visualizer driver interface"""
# implementation: graphers which need to know the total number of frames
- # should raise an exception in setup() if the nframesĀ argument is None
+ # should raise an exception in setup() if the totalframes argument is None
def __init__(self, width, height):
"""Create a new grapher. width and height are generally
import re
import numpy
-__all__ = ['Processor', 'MetaProcessor', 'implements', 'abstract',
+__all__ = ['Processor', 'MetaProcessor', 'implements', 'abstract',
'interfacedoc', 'processors', 'get_processor', 'ProcessPipe',
'FixedSizeInputAdapter']
implements(IProcessor)
@interfacedoc
- def setup(self, channels=None, samplerate=None, nframes=None):
+ def setup(self, channels=None, samplerate=None, blocksize=None):
self.input_channels = channels
self.input_samplerate = samplerate
- self.input_nframes = nframes
+ self.input_blocksize = blocksize
- # default channels(), samplerate() and nframes() implementations returns
- # the input characteristics, but processors may change this behaviour by
+ # default channels(), samplerate() and blocksize() implementations returns
+ # the input characteristics, but processors may change this behaviour by
# overloading those methods
@interfacedoc
def channels(self):
return self.input_samplerate
@interfacedoc
- def nframes(self):
- return self.input_nframes
+ def blocksize(self):
+ return self.input_blocksize
@interfacedoc
def process(self, frames, eod):
def __init__(self, buffer_size, channels, pad=False):
"""Construct a new adapter: buffer_size is the desired buffer size in frames,
- channels the number of channels, and pad indicates whether the last block should
+ channels the number of channels, and pad indicates whether the last block should
be padded with zeros."""
self.buffer = numpy.empty((buffer_size, channels))
self.len = 0
self.pad = pad
- def nframes(self, input_nframes):
+ def blocksize(self, input_blocksize):
"""Return the total number of frames that this adapter will output according to the
- input_nframes argument"""
+ input_blocksize argument"""
- nframes = input_nframes
+ blocksize = input_blocksize
if self.pad:
- mod = input_nframes % self.buffer_size
+ mod = input_blocksize % self.buffer_size
if mod:
- nframes += self.buffer_size - mod
+ blocksize += self.buffer_size - mod
- return nframes
+ return blocksize
def process(self, frames, eod):
- """Returns an iterator over tuples of the form (buffer, eod) where buffer is a
+ """Returns an iterator over tuples of the form (buffer, eod) where buffer is a
fixed-sized block of data, and eod indicates whether this is the last block.
In case padding is deactivated the last block may be smaller than the buffer size.
"""
yield block, True
self.len = 0
-
+
def processors(interface=IProcessor, recurse=True):
"""Returns the processors implementing a given interface and, if recurse,
any of the descendants of this interface."""
return implementations(interface, recurse)
-
def get_processor(processor_id):
"""Return a processor by its id"""
if not _processors.has_key(processor_id):
- raise Error("No processor registered with id: '%s'"
+ raise Error("No processor registered with id: '%s'"
% processor_id)
return _processors[processor_id]
for item in other:
self |= item
- return self
+ return self
def run(self):
"""Setup/reset all processors in cascade and stream audio data along
source.setup()
last = source
for item in items:
- item.setup(last.channels(), last.samplerate(), last.nframes())
+ item.setup(last.channels(), last.samplerate(), last.blocksize())
last = item
# now stream audio data along the pipe
for item in items:
frames, eod = item.process(frames, eod)
- return self
-
+ return self
implements(IDecoder)
mimetype = ''
- output_nframes = 8*1024
+ output_blocksize = 8*1024
output_samplerate = 44100
output_channels = 1
else:
self.uri = uri
- def setup(self, channels = None, samplerate = None, nframes = None):
+ def setup(self, channels = None, samplerate = None, blocksize = None):
# the output data format we want
- if nframes: self.output_nframes = nframes
+ if blocksize: self.output_blocksize = blocksize
if samplerate: self.output_samplerate = int(samplerate)
if channels: self.output_channels = int(channels)
uri = self.uri
self.input_samplerate = caps[0]["rate"]
self.input_channels = caps[0]["channels"]
self.input_duration = length / 1.e9
- self.input_total_frames = int(self.input_duration * self.input_samplerate)
+ self.input_totalframes = int(self.input_duration * self.input_samplerate)
if "x-raw-float" in caps.to_string():
self.input_width = caps[0]["width"]
else:
self.last_buffer = new_array
else:
self.last_buffer = concatenate((self.last_buffer, new_array), axis=0)
- while self.last_buffer.shape[0] >= self.output_nframes:
- new_block = self.last_buffer[:self.output_nframes]
- self.last_buffer = self.last_buffer[self.output_nframes:]
+ while self.last_buffer.shape[0] >= self.output_blocksize:
+ new_block = self.last_buffer[:self.output_blocksize]
+ self.last_buffer = self.last_buffer[self.output_blocksize:]
#print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape
self.queue.put( [new_block, False ] )
return self.output_samplerate
@interfacedoc
- def nframes(self):
- return self.input_total_frames
+ def blocksize(self):
+ return self.output_blocksize
+
+ def totalframes(self):
+ if self.input_samplerate == self.output_samplerate:
+ return self.input_totalframes
+ else:
+ ratio = self.input_totalframes / self.output_samplerate
+ return self.input_totalframes * ratio
@interfacedoc
def release(self):
## IDecoder methods
- @interfacedoc
@interfacedoc
def format(self):
# TODO check
return self.tags
def duration(self):
- return self.duration
+ if self.input_samplerate == self.output_samplerate:
+ return self.input_duration
+ else:
+ ratio = self.input_totalframes / self.output_samplerate
+ return self.input_duration * ratio