git.parisson.com Git - timeside.git/commitdiff
split decoder modules
author Guillaume Pellerin <yomguy@parisson.com>
Mon, 6 Jan 2014 22:14:44 +0000 (23:14 +0100)
committer Guillaume Pellerin <yomguy@parisson.com>
Mon, 6 Jan 2014 22:14:44 +0000 (23:14 +0100)
tests/test_array_decoding.py
tests/test_decoding.py
tests/test_graphing.py
timeside/analyzer/spectrogram.py
timeside/decoder/__init__.py
timeside/decoder/array.py [new file with mode: 0644]
timeside/decoder/core.py
timeside/decoder/file.py [new file with mode: 0644]
timeside/decoder/live.py [new file with mode: 0644]
timeside/decoder/utils.py

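This commit splits timeside/decoder/core.py into dedicated modules: FileDecoder moves to file.py, ArrayDecoder to array.py and LiveDecoder to live.py, while the abstract Decoder base class stays in core.py and the stack decorator moves to utils.py. The package __init__ re-exports the concrete decoders, so callers can drop the .core suffix. A minimal sketch of the import surface after the split (only names shown in this diff are assumed to exist):

    >>> from timeside.decoder import FileDecoder, ArrayDecoder, LiveDecoder
    >>> from timeside.decoder.core import Decoder   # abstract base class, unchanged location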
index 1c740b9363d7607ba72ee0cb88e7fb6573ea0a5b..9d94b663c50e3b89404bb78dc6e0c41c78b0e928 100644 (file)
@@ -1,7 +1,7 @@
 #! /usr/bin/env python
 from __future__ import division
 
-from timeside.decoder.core import ArrayDecoder
+from timeside.decoder import ArrayDecoder
 from unit_timeside import *
 
 
index 72986f79c6304c3861579fb6a61b90532c2898f4..d9225127e1fad29f3fe857fd222b9ae4073f8624 100755 (executable)
@@ -2,7 +2,7 @@
 
 from __future__ import division
 
-from timeside.decoder.core import FileDecoder
+from timeside.decoder import FileDecoder
 from unit_timeside import *
 
 import os.path
index cfb4a1779db201d4435e21a96354ae9fad836048..bf1d7218a73fa91884c8add0bc2fc2b001c89431 100755 (executable)
@@ -1,6 +1,6 @@
 #! /usr/bin/env python
 
-from timeside.decoder.core import FileDecoder
+from timeside.decoder import FileDecoder
 from unit_timeside import *
 
 import os.path
index 587d7f4c99f15016b191fa0fb7c27aae9fc8de56..3357448d9fa731765eedf0d4f4fe1c59f0432eae 100644 (file)
@@ -29,6 +29,9 @@ import numpy as np
 class Spectrogram(Analyzer):
     implements(IAnalyzer)
 
+    values = []
+    FFT_SIZE = 2048
+
     def __init__(self, blocksize=2048, stepsize=None):
         super(Spectrogram, self).__init__()
 
@@ -44,9 +47,6 @@ class Spectrogram(Analyzer):
         super(Spectrogram, self).setup(channels, samplerate,
               blocksize, totalframes)
 
-        self.values = []
-        self.FFT_SIZE = 2048
-
     @staticmethod
     @interfacedoc
     def id():
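In this hunk, values and FFT_SIZE become class attributes instead of being set in setup(). FFT_SIZE is a constant, but values is a mutable list, so every Spectrogram instance shares the same object until something rebinds self.values on the instance. A small, self-contained illustration of that Python behaviour (class A is invented for the example):

    class A(object):
        values = []            # one list, stored on the class

    a, b = A(), A()
    a.values.append(1)
    assert b.values == [1]     # both instances see the same list
    a.values = []              # rebinding on the instance restores isolation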
index fcb882bb2d705e1f759e734fd4a7d34868b68976..ab1c3a43f382bbd54e8acef4ed95bd93c178f5e8 100644 (file)
@@ -1,3 +1,5 @@
 # -*- coding: utf-8 -*-
 
-from core import *
+from file import *
+from array import *
+from live import *
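The package __init__ now aggregates the three new modules with star imports. These are Python 2 implicit relative imports; under Python 3 the equivalent spelling would need to be explicit (a hedged sketch, not part of this commit):

    from .file import *
    from .array import *
    from .live import *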
diff --git a/timeside/decoder/array.py b/timeside/decoder/array.py
new file mode 100644 (file)
index 0000000..e13408b
--- /dev/null
@@ -0,0 +1,145 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2007-2013 Parisson
+# Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com>
+# Copyright (c) 2007-2013 Guillaume Pellerin <pellerin@parisson.com>
+# Copyright (c) 2010-2013 Paul Brossier <piem@piem.org>
+#
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide.  If not, see <http://www.gnu.org/licenses/>.
+
+# Authors:
+# Paul Brossier <piem@piem.org>
+# Guillaume Pellerin <yomguy@parisson.com>
+# Thomas Fillon <thomas@parisson.com>
+
+
+from timeside.decoder.core import *
+
+
+class ArrayDecoder(Decoder):
+    """ Decoder taking Numpy array as input"""
+    implements(IDecoder)
+
+    output_blocksize = 8*1024
+
+    # IProcessor methods
+
+    @staticmethod
+    @interfacedoc
+    def id():
+        return "array_dec"
+
+    def __init__(self, samples, samplerate=44100, start=0, duration=None):
+        '''
+            Construct a new ArrayDecoder from a numpy array
+
+            Parameters
+            ----------
+            samples : numpy array of dimension 1 (mono) or 2 (multichannel)
+                    if shape = (n) or (n,1) : n samples, mono
+                    if shape = (n,m) : n samples with m channels
+            start : float
+                start time of the segment in seconds
+            duration : float
+                duration of the segment in seconds
+        '''
+        super(ArrayDecoder, self).__init__(start=start, duration=duration)
+
+        # Check array dimension
+        if samples.ndim > 2:
+            raise TypeError('Wrong number of dimensions for argument samples')
+        if samples.ndim == 1:
+            samples = samples[:, np.newaxis]  # reshape to 2D array
+
+        self.samples = samples  # a 2D array: (n_frames, n_channels)
+        self.input_samplerate = samplerate
+        self.input_channels = self.samples.shape[1]
+
+        self.uri = '_'.join(['raw_audio_array',
+                            'x'.join([str(dim) for dim in samples.shape]),
+                             samples.dtype.type.__name__])
+
+        self.frames = self.get_frames()
+
+    def setup(self, channels=None, samplerate=None, blocksize=None):
+
+        # the output data format we want
+        if blocksize:
+            self.output_blocksize = blocksize
+        if samplerate:
+            self.output_samplerate = int(samplerate)
+        if channels:
+            self.output_channels = int(channels)
+
+        if self.uri_duration is None:
+            self.uri_duration = (len(self.samples) / self.input_samplerate
+                                 - self.uri_start)
+
+        if self.is_segment:
+            start_index = self.uri_start * self.input_samplerate
+            stop_index = start_index + int(np.ceil(self.uri_duration
+                                           * self.input_samplerate))
+            stop_index = min(stop_index, len(self.samples))
+            self.samples = self.samples[start_index:stop_index]
+
+        if not self.output_samplerate:
+            self.output_samplerate = self.input_samplerate
+
+        if not self.output_channels:
+            self.output_channels = self.input_channels
+
+        self.input_totalframes = len(self.samples)
+        self.input_duration = self.input_totalframes / self.input_samplerate
+        self.input_width = self.samples.itemsize * 8
+
+    def get_frames(self):
+        "Define an iterator that will return frames at the given blocksize"
+        nb_frames = self.input_totalframes // self.output_blocksize
+
+        if self.input_totalframes % self.output_blocksize == 0:
+            nb_frames -= 1  # Last frame must send eod=True
+
+        for index in xrange(0,
+                            nb_frames * self.output_blocksize,
+                            self.output_blocksize):
+            yield (self.samples[index:index+self.output_blocksize], False)
+
+        yield (self.samples[nb_frames * self.output_blocksize:], True)
+
+    @interfacedoc
+    def process(self):
+        return self.frames.next()
+
+    ## IDecoder methods
+    @interfacedoc
+    def format(self):
+        import re
+        base_type = re.search('^[a-z]*', self.samples.dtype.name).group(0)
+        return 'audio/x-raw-'+base_type
+
+    @interfacedoc
+    def metadata(self):
+        return None
+
+
+if __name__ == "__main__":
+    # Run doctest from __main__ and unittest from tests
+    from tests.unit_timeside import run_test_module
+    # load corresponding tests
+    from tests import test_array_decoding
+
+    run_test_module([test_array_decoding])
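ArrayDecoder serves an in-memory numpy array in blocks of output_blocksize frames, ending with a final block flagged eod=True. A minimal usage sketch, assuming the default setup() arguments and a mono signal (the sine wave and its parameters are invented for the example):

    >>> import numpy as np
    >>> from timeside.decoder import ArrayDecoder
    >>> samples = np.sin(2 * np.pi * 440 * np.arange(44100) / 44100.)
    >>> decoder = ArrayDecoder(samples, samplerate=44100)
    >>> decoder.setup()
    >>> frames, eod = decoder.process()    # first block, shape (8192, 1), eod is False
    >>> while not eod:
    ...     frames, eod = decoder.process()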
index 719175acd90e10515708c5432bf6d3e38ac5ef0e..1eba57768a8a0a5eaa7653d6967bf2cd85521a3f 100644 (file)
@@ -32,7 +32,7 @@ from timeside.core import Processor, implements, interfacedoc, abstract
 from timeside.api import IDecoder
 from timeside.tools import *
 
-from utils import get_uri, get_media_uri_info
+from utils import get_uri, get_media_uri_info, stack
 
 import Queue
 from gst import _gst as gst
@@ -43,20 +43,6 @@ GST_APPSINK_MAX_BUFFERS = 10
 QUEUE_SIZE = 10
 
 
-def stack(process_func):
-
-    import functools
-
-    @functools.wraps(process_func)
-    def wrapper(decoder):
-        # Processing
-        frames, eod = process_func(decoder)
-        if decoder.stack:
-            decoder.process_pipe.frames_stack.append((frames, eod))
-        return frames, eod
-    return wrapper
-
-
 class Decoder(Processor):
     """General abstract base class for Decoder
     """
@@ -123,669 +109,3 @@ class Decoder(Processor):
     def stop(self):
         self.src.send_event(gst.event_new_eos())
 
-
-class FileDecoder(Decoder):
-    """ gstreamer-based decoder """
-    implements(IDecoder)
-
-    output_blocksize = 8*1024
-
-    pipeline = None
-    mainloopthread = None
-
-    # IProcessor methods
-
-    @staticmethod
-    @interfacedoc
-    def id():
-        return "gst_dec"
-
-    def __init__(self, uri, start=0, duration=None, stack=False):
-
-        """
-        Construct a new FileDecoder
-
-        Parameters
-        ----------
-        uri : str
-            uri of the media
-        start : float
-            start time of the segment in seconds
-        duration : float
-            duration of the segment in seconds
-
-        """
-
-        super(FileDecoder, self).__init__(start=start, duration=duration)
-
-        self.from_stack = False
-        self.stack = stack
-
-        self.uri = get_uri(uri)
-
-    def set_uri_default_duration(self):
-        # Set the duration from the length of the file
-        uri_total_duration = get_media_uri_info(self.uri)['duration']
-        self.uri_duration = uri_total_duration - self.uri_start
-
-    def setup(self, channels=None, samplerate=None, blocksize=None):
-
-        self.eod = False
-        self.last_buffer = None
-
-        if self.from_stack:
-            return
-
-        if self.stack:
-            self.process_pipe.frames_stack = []
-
-        if self.uri_duration is None:
-            self.set_uri_default_duration()
-
-        # a lock to wait wait for gstreamer thread to be ready
-        import threading
-        self.discovered_cond = threading.Condition(threading.Lock())
-        self.discovered = False
-
-        # the output data format we want
-        if blocksize:
-            self.output_blocksize = blocksize
-        if samplerate:
-            self.output_samplerate = int(samplerate)
-        if channels:
-            self.output_channels = int(channels)
-
-        if self.is_segment:
-            # Create the pipe with Gnonlin gnlurisource
-            self.pipe = ''' gnlurisource name=src uri={uri}
-                            start=0
-                            duration={uri_duration}
-                            media-start={uri_start}
-                            media-duration={uri_duration}
-                            ! audioconvert name=audioconvert
-                            ! audioresample
-                            ! appsink name=sink sync=False async=True
-                            '''.format(uri = self.uri,
-                                       uri_start = np.uint64(round(self.uri_start * gst.SECOND)),
-                                       uri_duration = np.int64(round(self.uri_duration * gst.SECOND)))
-                                       # convert uri_start and uri_duration to nanoseconds
-        else:
-            # Create the pipe with standard Gstreamer uridecodbin
-            self.pipe = ''' uridecodebin name=src uri={uri}
-                           ! audioconvert name=audioconvert
-                           ! audioresample
-                           ! appsink name=sink sync=False async=True
-                           '''.format(uri = self.uri)
-
-        self.pipeline = gst.parse_launch(self.pipe)
-
-        if self.output_channels:
-            caps_channels = int(self.output_channels)
-        else:
-            caps_channels = "[ 1, 2 ]"
-        if self.output_samplerate:
-            caps_samplerate = int(self.output_samplerate)
-        else:
-            caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }"
-        sink_caps = gst.Caps("""audio/x-raw-float,
-            endianness=(int)1234,
-            channels=(int)%s,
-            width=(int)32,
-            rate=(int)%s""" % (caps_channels, caps_samplerate))
-
-        self.src = self.pipeline.get_by_name('src')
-        self.conv = self.pipeline.get_by_name('audioconvert')
-        self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
-
-        self.sink = self.pipeline.get_by_name('sink')
-        self.sink.set_property("caps", sink_caps)
-        self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
-        self.sink.set_property("drop", False)
-        self.sink.set_property('emit-signals', True)
-        self.sink.connect("new-buffer", self._on_new_buffer_cb)
-
-        self.bus = self.pipeline.get_bus()
-        self.bus.add_signal_watch()
-        self.bus.connect('message', self._on_message_cb)
-
-        self.queue = Queue.Queue(QUEUE_SIZE)
-
-        import threading
-
-        class MainloopThread(threading.Thread):
-            def __init__(self, mainloop):
-                threading.Thread.__init__(self)
-                self.mainloop = mainloop
-
-            def run(self):
-                self.mainloop.run()
-        self.mainloop = gobject.MainLoop()
-        self.mainloopthread = MainloopThread(self.mainloop)
-        self.mainloopthread.start()
-        #self.mainloopthread = get_loop_thread()
-        ##self.mainloop = self.mainloopthread.mainloop
-
-        # start pipeline
-        self.pipeline.set_state(gst.STATE_PLAYING)
-
-        self.discovered_cond.acquire()
-        while not self.discovered:
-            #print 'waiting'
-            self.discovered_cond.wait()
-        self.discovered_cond.release()
-
-        if not hasattr(self, 'input_samplerate'):
-            if hasattr(self, 'error_msg'):
-                raise IOError(self.error_msg)
-            else:
-                raise IOError('no known audio stream found')
-
-    def _notify_caps_cb(self, pad, args):
-        self.discovered_cond.acquire()
-
-        caps = pad.get_negotiated_caps()
-        if not caps:
-            pad.info("no negotiated caps available")
-            self.discovered = True
-            self.discovered_cond.notify()
-            self.discovered_cond.release()
-            return
-        # the caps are fixed
-        # We now get the total length of that stream
-        q = gst.query_new_duration(gst.FORMAT_TIME)
-        pad.info("sending duration query")
-        if pad.get_peer().query(q):
-            format, length = q.parse_duration()
-            if format == gst.FORMAT_TIME:
-                pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),))
-            else:
-                pad.info("got duration : %d [format:%d]" % (length, format))
-        else:
-            length = -1
-            gst.warning("duration query failed")
-
-        # We store the caps and length in the proper location
-        if "audio" in caps.to_string():
-            self.input_samplerate = caps[0]["rate"]
-            if not self.output_samplerate:
-                self.output_samplerate = self.input_samplerate
-            self.input_channels = caps[0]["channels"]
-            if not self.output_channels:
-                self.output_channels = self.input_channels
-            self.input_duration = length / 1.e9
-
-            self.input_totalframes = int(self.input_duration * self.input_samplerate)
-            if "x-raw-float" in caps.to_string():
-                self.input_width = caps[0]["width"]
-            else:
-                self.input_width = caps[0]["depth"]
-
-        self.discovered = True
-        self.discovered_cond.notify()
-        self.discovered_cond.release()
-
-    def _on_message_cb(self, bus, message):
-        t = message.type
-        if t == gst.MESSAGE_EOS:
-            self.queue.put(gst.MESSAGE_EOS)
-            self.pipeline.set_state(gst.STATE_NULL)
-            self.mainloop.quit()
-        elif t == gst.MESSAGE_ERROR:
-            self.pipeline.set_state(gst.STATE_NULL)
-            err, debug = message.parse_error()
-            self.discovered_cond.acquire()
-            self.discovered = True
-            self.mainloop.quit()
-            self.error_msg = "Error: %s" % err, debug
-            self.discovered_cond.notify()
-            self.discovered_cond.release()
-        elif t == gst.MESSAGE_TAG:
-            # TODO
-            # msg.parse_tags()
-            pass
-
-    def _on_new_buffer_cb(self, sink):
-        buf = sink.emit('pull-buffer')
-        new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
-        #print 'processing new buffer', new_array.shape
-        if self.last_buffer is None:
-            self.last_buffer = new_array
-        else:
-            self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0)
-        while self.last_buffer.shape[0] >= self.output_blocksize:
-            new_block = self.last_buffer[:self.output_blocksize]
-            self.last_buffer = self.last_buffer[self.output_blocksize:]
-            #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape
-            self.queue.put([new_block, False])
-
-    @interfacedoc
-    @stack
-    def process(self):
-        buf = self.queue.get()
-        if buf == gst.MESSAGE_EOS:
-            return self.last_buffer, True
-        frames, eod = buf
-        return frames, eod
-
-    @interfacedoc
-    def totalframes(self):
-        if self.input_samplerate == self.output_samplerate:
-            return self.input_totalframes
-        else:
-            ratio = self.output_samplerate / self.input_samplerate
-            return int(self.input_totalframes * ratio)
-
-    @interfacedoc
-    def release(self):
-        if self.stack:
-            self.stack = False
-            self.from_stack = True
-        pass
-
-    ## IDecoder methods
-
-    @interfacedoc
-    def format(self):
-        # TODO check
-        if self.mimetype == 'application/x-id3':
-            self.mimetype = 'audio/mpeg'
-        return self.mimetype
-
-    @interfacedoc
-    def encoding(self):
-        # TODO check
-        return self.mimetype.split('/')[-1]
-
-    @interfacedoc
-    def resolution(self):
-        # TODO check: width or depth?
-        return self.input_width
-
-    @interfacedoc
-    def metadata(self):
-        # TODO check
-        return self.tags
-
-
-class LiveDecoder(Decoder):
-    """ gstreamer-based decoder from live source"""
-    implements(IDecoder)
-
-    output_blocksize = 8*1024
-
-    pipeline = None
-    mainloopthread = None
-
-    # IProcessor methods
-
-    @staticmethod
-    @interfacedoc
-    def id():
-        return "gst_live_dec"
-
-    def __init__(self, num_buffers=-1):
-
-        """
-        Construct a new LiveDecoder capturing audio from alsasrc
-
-        Parameters
-        ----------
-        num_buffers :
-            Number of buffers to output before sending EOS (-1 = unlimited).
-            (Allowed values: >= -1, Default value: -1)
-
-
-        Examples
-        --------
-
-        >>> import timeside
-
-        >>> live = timeside.decoder.LiveDecoder(num_buffers=25)
-        >>> a = timeside.analyzer.Waveform()
-        >>> e = timeside.encoder.Mp3Encoder('test_live.mp3', overwrite=True)
-        >>> pipe = (live | a | e)
-        >>> pipe.run()
-
-        >>> import matplotlib.pyplot as plt # doctest: +SKIP
-        >>> plt.plot(a.results['waveform_analyzer'].time, # doctest: +SKIP
-                 a.results['waveform_analyzer'].data) # doctest: +SKIP
-        >>> plt.show() # doctest: +SKIP
-
-        """
-
-        super(Decoder, self).__init__()
-        self.num_buffers = num_buffers
-        self.uri = None
-        self.uri_start = 0
-        self.uri_duration = None
-        self.is_segment = False
-
-    def setup(self, channels=None, samplerate=None, blocksize=None):
-
-        self.eod = False
-        self.last_buffer = None
-
-        # a lock to wait wait for gstreamer thread to be ready
-        import threading
-        self.discovered_cond = threading.Condition(threading.Lock())
-        self.discovered = False
-
-        # the output data format we want
-        if blocksize:
-            self.output_blocksize = blocksize
-        if samplerate:
-            self.output_samplerate = int(samplerate)
-        if channels:
-            self.output_channels = int(channels)
-
-        # Create the pipe with standard Gstreamer uridecodbin
-        self.pipe = '''alsasrc num-buffers=%d name=src
-                       ! audioconvert name=audioconvert
-                       ! audioresample
-                       ! appsink name=sink sync=False async=True
-                       ''' % self.num_buffers
-
-        self.pipeline = gst.parse_launch(self.pipe)
-
-        if self.output_channels:
-            caps_channels = int(self.output_channels)
-        else:
-            caps_channels = "[ 1, 2 ]"
-        if self.output_samplerate:
-            caps_samplerate = int(self.output_samplerate)
-        else:
-            caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }"
-        sink_caps = gst.Caps("""audio/x-raw-float,
-            endianness=(int)1234,
-            channels=(int)%s,
-            width=(int)32,
-            rate=(int)%s""" % (caps_channels, caps_samplerate))
-
-        self.src = self.pipeline.get_by_name('src')
-        self.conv = self.pipeline.get_by_name('audioconvert')
-        self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
-
-        self.sink = self.pipeline.get_by_name('sink')
-        self.sink.set_property("caps", sink_caps)
-        self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
-        self.sink.set_property("drop", False)
-        self.sink.set_property('emit-signals', True)
-        self.sink.connect("new-buffer", self._on_new_buffer_cb)
-
-        self.bus = self.pipeline.get_bus()
-        self.bus.add_signal_watch()
-        self.bus.connect('message', self._on_message_cb)
-
-        self.queue = Queue.Queue(QUEUE_SIZE)
-
-        import threading
-
-        class MainloopThread(threading.Thread):
-            def __init__(self, mainloop):
-                threading.Thread.__init__(self)
-                self.mainloop = mainloop
-
-            def run(self):
-                self.mainloop.run()
-        self.mainloop = gobject.MainLoop()
-        self.mainloopthread = MainloopThread(self.mainloop)
-        self.mainloopthread.start()
-        #self.mainloopthread = get_loop_thread()
-        ##self.mainloop = self.mainloopthread.mainloop
-
-        # start pipeline
-        self.pipeline.set_state(gst.STATE_PLAYING)
-
-        self.discovered_cond.acquire()
-        while not self.discovered:
-            #print 'waiting'
-            self.discovered_cond.wait()
-        self.discovered_cond.release()
-
-        if not hasattr(self, 'input_samplerate'):
-            if hasattr(self, 'error_msg'):
-                raise IOError(self.error_msg)
-            else:
-                raise IOError('no known audio stream found')
-
-    def _notify_caps_cb(self, pad, args):
-        self.discovered_cond.acquire()
-
-        caps = pad.get_negotiated_caps()
-        if not caps:
-            pad.info("no negotiated caps available")
-            self.discovered = True
-            self.discovered_cond.notify()
-            self.discovered_cond.release()
-            return
-        # the caps are fixed
-        # We now get the total length of that stream
-        q = gst.query_new_duration(gst.FORMAT_TIME)
-        pad.info("sending duration query")
-        if pad.get_peer().query(q):
-            format, length = q.parse_duration()
-            if format == gst.FORMAT_TIME:
-                pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),))
-            else:
-                pad.info("got duration : %d [format:%d]" % (length, format))
-        else:
-            length = -1
-            gst.warning("duration query failed")
-
-        # We store the caps and length in the proper location
-        if "audio" in caps.to_string():
-            self.input_samplerate = caps[0]["rate"]
-            if not self.output_samplerate:
-                self.output_samplerate = self.input_samplerate
-            self.input_channels = caps[0]["channels"]
-            if not self.output_channels:
-                self.output_channels = self.input_channels
-            self.input_duration = length / 1.e9
-
-            self.input_totalframes = int(self.input_duration * self.input_samplerate)
-            if "x-raw-float" in caps.to_string():
-                self.input_width = caps[0]["width"]
-            else:
-                self.input_width = caps[0]["depth"]
-
-        self.discovered = True
-        self.discovered_cond.notify()
-        self.discovered_cond.release()
-
-    def _on_message_cb(self, bus, message):
-        t = message.type
-        if t == gst.MESSAGE_EOS:
-            self.queue.put(gst.MESSAGE_EOS)
-            self.pipeline.set_state(gst.STATE_NULL)
-            self.mainloop.quit()
-        elif t == gst.MESSAGE_ERROR:
-            self.pipeline.set_state(gst.STATE_NULL)
-            err, debug = message.parse_error()
-            self.discovered_cond.acquire()
-            self.discovered = True
-            self.mainloop.quit()
-            self.error_msg = "Error: %s" % err, debug
-            self.discovered_cond.notify()
-            self.discovered_cond.release()
-        elif t == gst.MESSAGE_TAG:
-            # TODO
-            # msg.parse_tags()
-            pass
-
-    def _on_new_buffer_cb(self, sink):
-        buf = sink.emit('pull-buffer')
-        new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
-
-        #print 'processing new buffer', new_array.shape
-        if self.last_buffer is None:
-            self.last_buffer = new_array
-        else:
-            self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0)
-        while self.last_buffer.shape[0] >= self.output_blocksize:
-            new_block = self.last_buffer[:self.output_blocksize]
-            self.last_buffer = self.last_buffer[self.output_blocksize:]
-            #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape
-            self.queue.put([new_block, False])
-
-    @interfacedoc
-    def process(self):
-        buf = self.queue.get()
-        if buf == gst.MESSAGE_EOS:
-            return self.last_buffer, True
-
-        frames, eod = buf
-        return frames, eod
-
-    @interfacedoc
-    def totalframes(self):
-        if self.input_samplerate == self.output_samplerate:
-            return self.input_totalframes
-        else:
-            ratio = self.output_samplerate / self.input_samplerate
-            return int(self.input_totalframes * ratio)
-
-    @interfacedoc
-    def release(self):
-        if self.stack:
-            self.stack = False
-            self.from_stack = True
-        pass
-
-    ## IDecoder methods
-
-    @interfacedoc
-    def format(self):
-        # TODO check
-        if self.mimetype == 'application/x-id3':
-            self.mimetype = 'audio/mpeg'
-        return self.mimetype
-
-    @interfacedoc
-    def encoding(self):
-        # TODO check
-        return self.mimetype.split('/')[-1]
-
-    @interfacedoc
-    def resolution(self):
-        # TODO check: width or depth?
-        return self.input_width
-
-    @interfacedoc
-    def metadata(self):
-        # TODO check
-        return self.tags
-
-
-class ArrayDecoder(Decoder):
-    """ Decoder taking Numpy array as input"""
-    implements(IDecoder)
-
-    output_blocksize = 8*1024
-
-    # IProcessor methods
-
-    @staticmethod
-    @interfacedoc
-    def id():
-        return "array_dec"
-
-    def __init__(self, samples, samplerate=44100, start=0, duration=None):
-        '''
-            Construct a new ArrayDecoder from an numpy array
-
-            Parameters
-            ----------
-            samples : numpy array of dimension 1 (mono) or 2 (multichannel)
-                    if shape = (n) or (n,1) : n samples, mono
-                    if shape = (n,m) : n samples with m channels
-            start : float
-                start time of the segment in seconds
-            duration : float
-                duration of the segment in seconds
-        '''
-        super(ArrayDecoder, self).__init__(start=start, duration=duration)
-
-        # Check array dimension
-        if samples.ndim > 2:
-            raise TypeError('Wrong number of dimensions for argument samples')
-        if samples.ndim == 1:
-            samples = samples[:, np.newaxis]  # reshape to 2D array
-
-        self.samples = samples  # Create a 2 dimensions array
-        self.input_samplerate = samplerate
-        self.input_channels = self.samples.shape[1]
-
-        self.uri = '_'.join(['raw_audio_array',
-                            'x'.join([str(dim) for dim in samples.shape]),
-                             samples.dtype.type.__name__])
-
-        self.frames = self.get_frames()
-
-    def setup(self, channels=None, samplerate=None, blocksize=None):
-
-        # the output data format we want
-        if blocksize:
-            self.output_blocksize = blocksize
-        if samplerate:
-            self.output_samplerate = int(samplerate)
-        if channels:
-            self.output_channels = int(channels)
-
-        if self.uri_duration is None:
-            self.uri_duration = (len(self.samples) / self.input_samplerate
-                                 - self.uri_start)
-
-        if self.is_segment:
-            start_index = self.uri_start * self.input_samplerate
-            stop_index = start_index + int(np.ceil(self.uri_duration
-                                           * self.input_samplerate))
-            stop_index = min(stop_index, len(self.samples))
-            self.samples = self.samples[start_index:stop_index]
-
-        if not self.output_samplerate:
-            self.output_samplerate = self.input_samplerate
-
-        if not self.output_channels:
-            self.output_channels = self.input_channels
-
-        self.input_totalframes = len(self.samples)
-        self.input_duration = self.input_totalframes / self.input_samplerate
-        self.input_width = self.samples.itemsize * 8
-
-    def get_frames(self):
-        "Define an iterator that will return frames at the given blocksize"
-        nb_frames = self.input_totalframes // self.output_blocksize
-
-        if self.input_totalframes % self.output_blocksize == 0:
-            nb_frames -= 1  # Last frame must send eod=True
-
-        for index in xrange(0,
-                            nb_frames * self.output_blocksize,
-                            self.output_blocksize):
-            yield (self.samples[index:index+self.output_blocksize], False)
-
-        yield (self.samples[nb_frames * self.output_blocksize:], True)
-
-    @interfacedoc
-    def process(self):
-        return self.frames.next()
-
-    ## IDecoder methods
-    @interfacedoc
-    def format(self):
-        import re
-        base_type = re.search('^[a-z]*', self.samples.dtype.name).group(0)
-        return 'audio/x-raw-'+base_type
-
-    @interfacedoc
-    def metadata(self):
-        return None
-
-
-if __name__ == "__main__":
-    # Run doctest from __main__ and unittest from tests
-    from tests.unit_timeside import run_test_module
-    # load corresponding tests
-    from tests import test_decoding, test_array_decoding
-
-    run_test_module([test_decoding, test_array_decoding])
diff --git a/timeside/decoder/file.py b/timeside/decoder/file.py
new file mode 100644 (file)
index 0000000..d48a38b
--- /dev/null
@@ -0,0 +1,322 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2007-2013 Parisson
+# Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com>
+# Copyright (c) 2007-2013 Guillaume Pellerin <pellerin@parisson.com>
+# Copyright (c) 2010-2013 Paul Brossier <piem@piem.org>
+#
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide.  If not, see <http://www.gnu.org/licenses/>.
+
+# Authors:
+# Paul Brossier <piem@piem.org>
+# Guillaume Pellerin <yomguy@parisson.com>
+# Thomas Fillon <thomas@parisson.com>
+
+
+from timeside.decoder.core import *
+
+
+class FileDecoder(Decoder):
+    """ gstreamer-based decoder """
+    implements(IDecoder)
+
+    output_blocksize = 8*1024
+
+    pipeline = None
+    mainloopthread = None
+
+    # IProcessor methods
+
+    @staticmethod
+    @interfacedoc
+    def id():
+        return "gst_dec"
+
+    def __init__(self, uri, start=0, duration=None, stack=False):
+
+        """
+        Construct a new FileDecoder
+
+        Parameters
+        ----------
+        uri : str
+            uri of the media
+        start : float
+            start time of the segment in seconds
+        duration : float
+            duration of the segment in seconds
+
+        """
+
+        super(FileDecoder, self).__init__(start=start, duration=duration)
+
+        self.from_stack = False
+        self.stack = stack
+
+        self.uri = get_uri(uri)
+
+    def set_uri_default_duration(self):
+        # Set the duration from the length of the file
+        uri_total_duration = get_media_uri_info(self.uri)['duration']
+        self.uri_duration = uri_total_duration - self.uri_start
+
+    def setup(self, channels=None, samplerate=None, blocksize=None):
+
+        self.eod = False
+        self.last_buffer = None
+
+        if self.from_stack:
+            return
+
+        if self.stack:
+            self.process_pipe.frames_stack = []
+
+        if self.uri_duration is None:
+            self.set_uri_default_duration()
+
+        # a lock to wait for the gstreamer thread to be ready
+        import threading
+        self.discovered_cond = threading.Condition(threading.Lock())
+        self.discovered = False
+
+        # the output data format we want
+        if blocksize:
+            self.output_blocksize = blocksize
+        if samplerate:
+            self.output_samplerate = int(samplerate)
+        if channels:
+            self.output_channels = int(channels)
+
+        if self.is_segment:
+            # Create the pipe with Gnonlin gnlurisource
+            self.pipe = ''' gnlurisource name=src uri={uri}
+                            start=0
+                            duration={uri_duration}
+                            media-start={uri_start}
+                            media-duration={uri_duration}
+                            ! audioconvert name=audioconvert
+                            ! audioresample
+                            ! appsink name=sink sync=False async=True
+                            '''.format(uri = self.uri,
+                                       uri_start = np.uint64(round(self.uri_start * gst.SECOND)),
+                                       uri_duration = np.int64(round(self.uri_duration * gst.SECOND)))
+                                       # convert uri_start and uri_duration to nanoseconds
+        else:
+            # Create the pipe with the standard GStreamer uridecodebin
+            self.pipe = ''' uridecodebin name=src uri={uri}
+                           ! audioconvert name=audioconvert
+                           ! audioresample
+                           ! appsink name=sink sync=False async=True
+                           '''.format(uri = self.uri)
+
+        self.pipeline = gst.parse_launch(self.pipe)
+
+        if self.output_channels:
+            caps_channels = int(self.output_channels)
+        else:
+            caps_channels = "[ 1, 2 ]"
+        if self.output_samplerate:
+            caps_samplerate = int(self.output_samplerate)
+        else:
+            caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }"
+        sink_caps = gst.Caps("""audio/x-raw-float,
+            endianness=(int)1234,
+            channels=(int)%s,
+            width=(int)32,
+            rate=(int)%s""" % (caps_channels, caps_samplerate))
+
+        self.src = self.pipeline.get_by_name('src')
+        self.conv = self.pipeline.get_by_name('audioconvert')
+        self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
+
+        self.sink = self.pipeline.get_by_name('sink')
+        self.sink.set_property("caps", sink_caps)
+        self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
+        self.sink.set_property("drop", False)
+        self.sink.set_property('emit-signals', True)
+        self.sink.connect("new-buffer", self._on_new_buffer_cb)
+
+        self.bus = self.pipeline.get_bus()
+        self.bus.add_signal_watch()
+        self.bus.connect('message', self._on_message_cb)
+
+        self.queue = Queue.Queue(QUEUE_SIZE)
+
+        import threading
+
+        class MainloopThread(threading.Thread):
+            def __init__(self, mainloop):
+                threading.Thread.__init__(self)
+                self.mainloop = mainloop
+
+            def run(self):
+                self.mainloop.run()
+        self.mainloop = gobject.MainLoop()
+        self.mainloopthread = MainloopThread(self.mainloop)
+        self.mainloopthread.start()
+        #self.mainloopthread = get_loop_thread()
+        ##self.mainloop = self.mainloopthread.mainloop
+
+        # start pipeline
+        self.pipeline.set_state(gst.STATE_PLAYING)
+
+        self.discovered_cond.acquire()
+        while not self.discovered:
+            #print 'waiting'
+            self.discovered_cond.wait()
+        self.discovered_cond.release()
+
+        if not hasattr(self, 'input_samplerate'):
+            if hasattr(self, 'error_msg'):
+                raise IOError(self.error_msg)
+            else:
+                raise IOError('no known audio stream found')
+
+    def _notify_caps_cb(self, pad, args):
+        self.discovered_cond.acquire()
+
+        caps = pad.get_negotiated_caps()
+        if not caps:
+            pad.info("no negotiated caps available")
+            self.discovered = True
+            self.discovered_cond.notify()
+            self.discovered_cond.release()
+            return
+        # the caps are fixed
+        # We now get the total length of that stream
+        q = gst.query_new_duration(gst.FORMAT_TIME)
+        pad.info("sending duration query")
+        if pad.get_peer().query(q):
+            format, length = q.parse_duration()
+            if format == gst.FORMAT_TIME:
+                pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),))
+            else:
+                pad.info("got duration : %d [format:%d]" % (length, format))
+        else:
+            length = -1
+            gst.warning("duration query failed")
+
+        # We store the caps and length in the proper location
+        if "audio" in caps.to_string():
+            self.input_samplerate = caps[0]["rate"]
+            if not self.output_samplerate:
+                self.output_samplerate = self.input_samplerate
+            self.input_channels = caps[0]["channels"]
+            if not self.output_channels:
+                self.output_channels = self.input_channels
+            self.input_duration = length / 1.e9
+
+            self.input_totalframes = int(self.input_duration * self.input_samplerate)
+            if "x-raw-float" in caps.to_string():
+                self.input_width = caps[0]["width"]
+            else:
+                self.input_width = caps[0]["depth"]
+
+        self.discovered = True
+        self.discovered_cond.notify()
+        self.discovered_cond.release()
+
+    def _on_message_cb(self, bus, message):
+        t = message.type
+        if t == gst.MESSAGE_EOS:
+            self.queue.put(gst.MESSAGE_EOS)
+            self.pipeline.set_state(gst.STATE_NULL)
+            self.mainloop.quit()
+        elif t == gst.MESSAGE_ERROR:
+            self.pipeline.set_state(gst.STATE_NULL)
+            err, debug = message.parse_error()
+            self.discovered_cond.acquire()
+            self.discovered = True
+            self.mainloop.quit()
+            self.error_msg = "Error: %s %s" % (err, debug)
+            self.discovered_cond.notify()
+            self.discovered_cond.release()
+        elif t == gst.MESSAGE_TAG:
+            # TODO
+            # msg.parse_tags()
+            pass
+
+    def _on_new_buffer_cb(self, sink):
+        buf = sink.emit('pull-buffer')
+        new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
+        #print 'processing new buffer', new_array.shape
+        if self.last_buffer is None:
+            self.last_buffer = new_array
+        else:
+            self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0)
+        while self.last_buffer.shape[0] >= self.output_blocksize:
+            new_block = self.last_buffer[:self.output_blocksize]
+            self.last_buffer = self.last_buffer[self.output_blocksize:]
+            #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape
+            self.queue.put([new_block, False])
+
+    @interfacedoc
+    @stack
+    def process(self):
+        buf = self.queue.get()
+        if buf == gst.MESSAGE_EOS:
+            return self.last_buffer, True
+        frames, eod = buf
+        return frames, eod
+
+    @interfacedoc
+    def totalframes(self):
+        if self.input_samplerate == self.output_samplerate:
+            return self.input_totalframes
+        else:
+            ratio = self.output_samplerate / self.input_samplerate
+            return int(self.input_totalframes * ratio)
+
+    @interfacedoc
+    def release(self):
+        if self.stack:
+            self.stack = False
+            self.from_stack = True
+        pass
+
+    ## IDecoder methods
+
+    @interfacedoc
+    def format(self):
+        # TODO check
+        if self.mimetype == 'application/x-id3':
+            self.mimetype = 'audio/mpeg'
+        return self.mimetype
+
+    @interfacedoc
+    def encoding(self):
+        # TODO check
+        return self.mimetype.split('/')[-1]
+
+    @interfacedoc
+    def resolution(self):
+        # TODO check: width or depth?
+        return self.input_width
+
+    @interfacedoc
+    def metadata(self):
+        # TODO check
+        return self.tags
+
+
+if __name__ == "__main__":
+    # Run doctest from __main__ and unittest from tests
+    from tests.unit_timeside import run_test_module
+    # load corresponding tests
+    from tests import test_decoding
+
+    run_test_module([test_decoding])
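FileDecoder keeps the behaviour it had in core.py: it decodes a media file, or a start/duration segment of it, through a GStreamer pipeline and feeds blocks to the rest of a TimeSide pipe. A short usage sketch in the style of the LiveDecoder doctest below ('sweep.wav' is a placeholder path, not a file shipped with this commit):

    >>> import timeside
    >>> decoder = timeside.decoder.FileDecoder('sweep.wav', start=1.0, duration=5.0)
    >>> analyzer = timeside.analyzer.Waveform()
    >>> pipe = (decoder | analyzer)
    >>> pipe.run()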
diff --git a/timeside/decoder/live.py b/timeside/decoder/live.py
new file mode 100644 (file)
index 0000000..260b55c
--- /dev/null
@@ -0,0 +1,299 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2007-2013 Parisson
+# Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com>
+# Copyright (c) 2007-2013 Guillaume Pellerin <pellerin@parisson.com>
+# Copyright (c) 2010-2013 Paul Brossier <piem@piem.org>
+#
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide.  If not, see <http://www.gnu.org/licenses/>.
+
+# Authors:
+# Paul Brossier <piem@piem.org>
+# Guillaume Pellerin <yomguy@parisson.com>
+# Thomas Fillon <thomas@parisson.com>
+
+
+from timeside.decoder.core import *
+
+
+class LiveDecoder(Decoder):
+    """ gstreamer-based decoder from live source"""
+    implements(IDecoder)
+
+    output_blocksize = 8*1024
+
+    pipeline = None
+    mainloopthread = None
+    input_src = 'alsasrc'
+
+    @staticmethod
+    @interfacedoc
+    def id():
+        return "gst_live_dec"
+
+    def __init__(self, num_buffers=-1):
+
+        """
+        Construct a new LiveDecoder capturing audio from alsasrc
+
+        Parameters
+        ----------
+        num_buffers :
+            Number of buffers to output before sending EOS (-1 = unlimited).
+            (Allowed values: >= -1, Default value: -1)
+
+
+        Examples
+        --------
+
+        >>> import timeside
+
+        >>> live = timeside.decoder.LiveDecoder(num_buffers=25)
+        >>> a = timeside.analyzer.Waveform()
+        >>> e = timeside.encoder.Mp3Encoder('test_live.mp3', overwrite=True)
+        >>> pipe = (live | a | e)
+        >>> pipe.run()
+
+        >>> import matplotlib.pyplot as plt # doctest: +SKIP
+        >>> plt.plot(a.results['waveform_analyzer'].time, # doctest: +SKIP
+                 a.results['waveform_analyzer'].data) # doctest: +SKIP
+        >>> plt.show() # doctest: +SKIP
+
+        """
+
+        super(Decoder, self).__init__()
+        self.num_buffers = num_buffers
+        self.uri = None
+        self.uri_start = 0
+        self.uri_duration = None
+        self.is_segment = False
+
+    def setup(self, channels=None, samplerate=None, blocksize=None):
+
+        self.eod = False
+        self.last_buffer = None
+
+        # a lock to wait for the gstreamer thread to be ready
+        import threading
+        self.discovered_cond = threading.Condition(threading.Lock())
+        self.discovered = False
+
+        # the output data format we want
+        if blocksize:
+            self.output_blocksize = blocksize
+        if samplerate:
+            self.output_samplerate = int(samplerate)
+        if channels:
+            self.output_channels = int(channels)
+
+        # Create the capture pipe from the live input source (alsasrc by default)
+        self.pipe = '''%s num-buffers=%d name=src
+                       ! audioconvert name=audioconvert
+                       ! audioresample
+                       ! appsink name=sink sync=False async=True
+                       ''' % (self.input_src, self.num_buffers)
+
+        self.pipeline = gst.parse_launch(self.pipe)
+
+        if self.output_channels:
+            caps_channels = int(self.output_channels)
+        else:
+            caps_channels = "[ 1, 2 ]"
+        if self.output_samplerate:
+            caps_samplerate = int(self.output_samplerate)
+        else:
+            caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }"
+        sink_caps = gst.Caps("""audio/x-raw-float,
+            endianness=(int)1234,
+            channels=(int)%s,
+            width=(int)32,
+            rate=(int)%s""" % (caps_channels, caps_samplerate))
+
+        self.src = self.pipeline.get_by_name('src')
+        self.conv = self.pipeline.get_by_name('audioconvert')
+        self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb)
+
+        self.sink = self.pipeline.get_by_name('sink')
+        self.sink.set_property("caps", sink_caps)
+        self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
+        self.sink.set_property("drop", False)
+        self.sink.set_property('emit-signals', True)
+        self.sink.connect("new-buffer", self._on_new_buffer_cb)
+
+        self.bus = self.pipeline.get_bus()
+        self.bus.add_signal_watch()
+        self.bus.connect('message', self._on_message_cb)
+
+        self.queue = Queue.Queue(QUEUE_SIZE)
+
+        import threading
+
+        class MainloopThread(threading.Thread):
+            def __init__(self, mainloop):
+                threading.Thread.__init__(self)
+                self.mainloop = mainloop
+
+            def run(self):
+                self.mainloop.run()
+        self.mainloop = gobject.MainLoop()
+        self.mainloopthread = MainloopThread(self.mainloop)
+        self.mainloopthread.start()
+        #self.mainloopthread = get_loop_thread()
+        ##self.mainloop = self.mainloopthread.mainloop
+
+        # start pipeline
+        self.pipeline.set_state(gst.STATE_PLAYING)
+
+        self.discovered_cond.acquire()
+        while not self.discovered:
+            #print 'waiting'
+            self.discovered_cond.wait()
+        self.discovered_cond.release()
+
+        if not hasattr(self, 'input_samplerate'):
+            if hasattr(self, 'error_msg'):
+                raise IOError(self.error_msg)
+            else:
+                raise IOError('no known audio stream found')
+
+    def _notify_caps_cb(self, pad, args):
+        self.discovered_cond.acquire()
+
+        caps = pad.get_negotiated_caps()
+        if not caps:
+            pad.info("no negotiated caps available")
+            self.discovered = True
+            self.discovered_cond.notify()
+            self.discovered_cond.release()
+            return
+        # the caps are fixed
+        # We now get the total length of that stream
+        q = gst.query_new_duration(gst.FORMAT_TIME)
+        pad.info("sending duration query")
+        if pad.get_peer().query(q):
+            format, length = q.parse_duration()
+            if format == gst.FORMAT_TIME:
+                pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),))
+            else:
+                pad.info("got duration : %d [format:%d]" % (length, format))
+        else:
+            length = -1
+            gst.warning("duration query failed")
+
+        # We store the caps and length in the proper location
+        if "audio" in caps.to_string():
+            self.input_samplerate = caps[0]["rate"]
+            if not self.output_samplerate:
+                self.output_samplerate = self.input_samplerate
+            self.input_channels = caps[0]["channels"]
+            if not self.output_channels:
+                self.output_channels = self.input_channels
+            self.input_duration = length / 1.e9
+
+            self.input_totalframes = int(self.input_duration * self.input_samplerate)
+            if "x-raw-float" in caps.to_string():
+                self.input_width = caps[0]["width"]
+            else:
+                self.input_width = caps[0]["depth"]
+
+        self.discovered = True
+        self.discovered_cond.notify()
+        self.discovered_cond.release()
+
+    def _on_message_cb(self, bus, message):
+        t = message.type
+        if t == gst.MESSAGE_EOS:
+            self.queue.put(gst.MESSAGE_EOS)
+            self.pipeline.set_state(gst.STATE_NULL)
+            self.mainloop.quit()
+        elif t == gst.MESSAGE_ERROR:
+            self.pipeline.set_state(gst.STATE_NULL)
+            err, debug = message.parse_error()
+            self.discovered_cond.acquire()
+            self.discovered = True
+            self.mainloop.quit()
+            self.error_msg = "Error: %s %s" % (err, debug)
+            self.discovered_cond.notify()
+            self.discovered_cond.release()
+        elif t == gst.MESSAGE_TAG:
+            # TODO
+            # msg.parse_tags()
+            pass
+
+    def _on_new_buffer_cb(self, sink):
+        buf = sink.emit('pull-buffer')
+        new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
+
+        #print 'processing new buffer', new_array.shape
+        if self.last_buffer is None:
+            self.last_buffer = new_array
+        else:
+            self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0)
+        while self.last_buffer.shape[0] >= self.output_blocksize:
+            new_block = self.last_buffer[:self.output_blocksize]
+            self.last_buffer = self.last_buffer[self.output_blocksize:]
+            #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape
+            self.queue.put([new_block, False])
+
+    @interfacedoc
+    def process(self):
+        buf = self.queue.get()
+        if buf == gst.MESSAGE_EOS:
+            return self.last_buffer, True
+
+        frames, eod = buf
+        return frames, eod
+
+    @interfacedoc
+    def totalframes(self):
+        if self.input_samplerate == self.output_samplerate:
+            return self.input_totalframes
+        else:
+            ratio = self.output_samplerate / self.input_samplerate
+            return int(self.input_totalframes * ratio)
+
+    @interfacedoc
+    def release(self):
+        if self.stack:
+            self.stack = False
+            self.from_stack = True
+        pass
+
+    ## IDecoder methods
+
+    @interfacedoc
+    def format(self):
+        # TODO check
+        if self.mimetype == 'application/x-id3':
+            self.mimetype = 'audio/mpeg'
+        return self.mimetype
+
+    @interfacedoc
+    def encoding(self):
+        # TODO check
+        return self.mimetype.split('/')[-1]
+
+    @interfacedoc
+    def resolution(self):
+        # TODO check: width or depth?
+        return self.input_width
+
+    @interfacedoc
+    def metadata(self):
+        # TODO check
+        return self.tags
+
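Compared with the version removed from core.py, LiveDecoder gains an input_src class attribute (defaulting to 'alsasrc') that is interpolated into the capture pipeline, so the source element can be swapped without overriding setup(). A hedged sketch of that pattern; PulseLiveDecoder is an invented name and pulsesrc availability depends on the local GStreamer install:

    class PulseLiveDecoder(LiveDecoder):
        """Capture from PulseAudio instead of ALSA."""
        input_src = 'pulsesrc'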
index 64146a5945b107c27b862d4475ab1818f8c74a99..d96adb44fa2adec89c3e10ca96476539f54cef25 100644 (file)
@@ -137,6 +137,19 @@ def get_media_uri_info(uri):
     return info
 
 
+def stack(process_func):
+
+    import functools
+
+    @functools.wraps(process_func)
+    def wrapper(decoder):
+        # Processing
+        frames, eod = process_func(decoder)
+        if decoder.stack:
+            decoder.process_pipe.frames_stack.append((frames, eod))
+        return frames, eod
+    return wrapper
+
 
 if __name__ == "__main__":
     # Run doctest from __main__ and unittest from tests
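The stack decorator removed from core.py reappears here unchanged: it wraps a decoder's process() so that, when decoder.stack is true, each (frames, eod) pair is also appended to process_pipe.frames_stack for later reuse. A self-contained illustration of that behaviour; the two dummy classes are invented stand-ins for the real ProcessPipe and Decoder:

    from timeside.decoder.utils import stack

    class _Pipe(object):                 # stand-in for ProcessPipe
        def __init__(self):
            self.frames_stack = []

    class _DummyDecoder(object):         # stand-in for a Decoder with stacking enabled
        stack = True

        def __init__(self):
            self.process_pipe = _Pipe()

        @stack
        def process(self):
            return 'frames', True

    d = _DummyDecoder()
    d.process()                          # returns ('frames', True)
    assert d.process_pipe.frames_stack == [('frames', True)]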