Running a pipe with previously decoded frames
===============================================
-Example of use of the `stack` option in :func:`timeside.core.ProcessPipe.run` to run a pipe with previously decoded frames stacked in memory on a second pass.
+Example of use of the `stack` argument in :class:`timeside.decoder.core.FileDecoder` to run a pipe with previously decoded frames stacked in memory on a second pass.
+
+First, let's import everything and define the audio file source :
>>> import timeside
>>> import numpy as np
>>> audio_file = 'http://github.com/yomguy/timeside-samples/raw/master/samples/sweep.mp3'
->>> decoder = timeside.decoder.FileDecoder(audio_file)
+
+Then let's setup a :class:`FileDecoder <timeside.decoder.core.FileDecoder>` with argument `stack=True` (default argument is `stack=False`) :
+
+>>> decoder = timeside.decoder.FileDecoder(audio_file, stack=True)
Setup an arbitrary analyzer to check that decoding process from file and from stack are equivalent:
>>> print pipe.processors #doctest: +ELLIPSIS
[<timeside.decoder.core.FileDecoder object at 0x...>, <timeside.analyzer.aubio_pitch.AubioPitch object at 0x...>]
-If the pipe is run with the default argument `stack=False`, the other processes of the pipe are released from the pipe after the run and only the :class:`fileDecoder <timeside.decoder.core.FileDecoder>` is kept in the pipe:
+After the pipe has been run, the other processes of the pipe are removed from the pipe and only the :class:`FileDecoder <timeside.decoder.core.FileDecoder>` is kept :
>>> pipe.run()
>>> print pipe.processors #doctest: +ELLIPSIS
[<timeside.decoder.core.FileDecoder object at 0x...>]
+The processed frames are stored in the pipe attribute `frames_stack` as a list of frames :
-If the pipe is run with the argument `stack=True`, the processed frames are stored in the pipe attribute `frames_stack`.
-The other processes of the pipe are also released from the pipe after the run but the :class:`fileDecoder <timeside.decoder.core.FileDecoder>` is replaced by an :class:`ArrayDecoder <timeside.decoder.core.ArrayDecoder>`:
-
->>> pipe = (decoder | pitch_on_file)
->>> pipe.run(stack=True)
->>> print pipe.processors #doctest: +ELLIPSIS
-[<timeside.decoder.core.ArrayDecoder object at 0x...>]
+>>> print type(pipe.frames_stack)
+<type 'list'>
-The stack
+First frame :
->>> pipe.frames_stack #doctest: +ELLIPSIS
-array([[...]], dtype=float32)
+>>> print pipe.frames_stack[0] #doctest: +ELLIPSIS
+(array([[...]], dtype=float32), False)
+Last frame :
-Then we can run a second pipe with the previously decoded frames and pass the frames to the same analyzer.
+>>> print pipe.frames_stack[-1] #doctest: +ELLIPSIS
+(array([[...]], dtype=float32), True)
-Define a second analyzer equivalent to the previous one:
+If the pipe is used for a second run, the processed frames stored in the stack are passed to the other processors without decoding the audio source again.
+Let's define a second analyzer equivalent to the previous one:
>>> pitch_on_stack = timeside.analyzer.AubioPitch()
>>> pipe |= pitch_on_stack
>>> print pipe.processors #doctest: +ELLIPSIS
-[<timeside.decoder.core.ArrayDecoder object at 0x...>, <timeside.analyzer.aubio_pitch.AubioPitch object at 0x...>]
-
+[<timeside.decoder.core.FileDecoder object at 0x...>, <timeside.analyzer.aubio_pitch.AubioPitch object at 0x...>]
-Run the pipe:
+And run the pipe:
>>> pipe.run()
pipe += ' | '
return pipe
- def run(self, channels=None, samplerate=None, blocksize=None, stack=None):
+ def run(self, channels=None, samplerate=None, blocksize=None):
"""Setup/reset all processors in cascade and stream audio data along
- the pipe. Also returns the pipe itself."""
+ the pipe."""
source = self.processors[0]
items = self.processors[1:]
source.setup(channels=channels, samplerate=samplerate,
blocksize=blocksize)
- if stack is None:
- self.stack = False
- else:
- self.stack = stack
-
- if self.stack:
- self.frames_stack = []
-
last = source
# setup/reset processors and configure properties throughout the pipe
eod = False
while not eod:
frames, eod = source.process()
- if self.stack:
- self.frames_stack.append(frames)
for item in items:
frames, eod = item.process(frames, eod)
item.post_process()
# Release processors
- if self.stack:
- if not isinstance(self.frames_stack, numpy.ndarray):
- self.frames_stack = numpy.vstack(self.frames_stack)
- from timeside.decoder.core import ArrayDecoder
- new_source = ArrayDecoder(samples=self.frames_stack,
- samplerate=source.samplerate())
- new_source.setup(channels=source.channels(),
- samplerate=source.samplerate(),
- blocksize=source.blocksize())
- self.processors[0] = new_source
-
for item in items:
item.release()
self.processors.remove(item)
QUEUE_SIZE = 10
+def stack(process_func):
+
+ import functools
+
+ @functools.wraps(process_func)
+ def wrapper(decoder):
+ # Processing
+ frames, eod = process_func(decoder)
+ if decoder.stack:
+ decoder.process_pipe.frames_stack.append((frames, eod))
+ return frames, eod
+ return wrapper
+
+
class FileDecoder(Processor):
""" gstreamer-based decoder """
implements(IDecoder)
def id():
return "gst_dec"
- def __init__(self, uri, start=0, duration=None):
+ def __init__(self, uri, start=0, duration=None, stack=False):
"""
Construct a new FileDecoder
start time of the segment in seconds
duration : float
duration of the segment in seconds
+
"""
super(FileDecoder, self).__init__()
+ self.from_stack = False
+ self.stack = stack
+
self.uri = get_uri(uri)
self.uri_start = float(start)
else:
self.uri_duration = duration
- if start==0 and duration is None:
+ if start == 0 and duration is None:
self.is_segment = False
else:
self.is_segment = True
+
def set_uri_default_duration(self):
# Set the duration from the length of the file
uri_total_duration = get_media_uri_info(self.uri)['duration']
def setup(self, channels=None, samplerate=None, blocksize=None):
+ self.eod = False
+ self.last_buffer = None
+
+ if self.from_stack:
+ return
+
+ if self.stack:
+ self.process_pipe.frames_stack = []
+
if self.uri_duration is None:
self.set_uri_default_duration()
#self.mainloopthread = get_loop_thread()
##self.mainloop = self.mainloopthread.mainloop
- self.eod = False
-
- self.last_buffer = None
-
# start pipeline
self.pipeline.set_state(gst.STATE_PLAYING)
self.queue.put([new_block, False])
@interfacedoc
- def process(self, frames=None, eod=False):
+ @stack
+ def process(self):
buf = self.queue.get()
if buf == gst.MESSAGE_EOS:
return self.last_buffer, True
@interfacedoc
def release(self):
+ if self.stack:
+ self.stack = False
+ self.from_stack = True
pass
@interfacedoc
yield (self.samples[nb_frames * self.output_blocksize:], True)
@interfacedoc
- def process(self, frames=None, eod=False):
-
+ def process(self):
return self.frames.next()
@interfacedoc