]> git.parisson.com Git - timeside.git/commitdiff
timeside/decoder/: simplify, use bus.pop() and gst_parse_launch(), remove all mainloo...
authorPaul Brossier <piem@piem.org>
Wed, 1 Aug 2012 18:53:58 +0000 (12:53 -0600)
committerPaul Brossier <piem@piem.org>
Wed, 1 Aug 2012 18:53:58 +0000 (12:53 -0600)
timeside/decoder/__init__.py
timeside/decoder/core.py
timeside/decoder/old.py [deleted file]
timeside/decoder/sink.py [deleted file]

index 0741aadb6c4e846dde88c06d3c70ef954fa54f49..78457b38c474f99342fb5953a59e133efa8bd2c0 100644 (file)
@@ -2,5 +2,3 @@
 
 import timeside.core
 from core import *
-from sink import *
-#from old import *
index 0a1d8f7842eb13406ce93e50d7e9796977862415..d6e7f5d519b146d6590d092d0cfd4ee90f860a16 100644 (file)
 
 from timeside.core import Processor, implements, interfacedoc
 from timeside.api import IDecoder
-from numpy import array, frombuffer, getbuffer, float32, append
-from timeside.decoder.sink import *
-
-import time
-import pygst
-pygst.require('0.10')
-import gst
-import gobject
-gobject.threads_init()
+from timeside.encoder.gstutils import *
 
 class FileDecoder(Processor):
     """ gstreamer-based decoder """
@@ -58,91 +50,58 @@ class FileDecoder(Processor):
 
     def setup(self, channels = None, samplerate = None, nframes = None):
         # the output data format we want
-        caps = "audio/x-raw-float, width=32"
-
-        src = gst.element_factory_make('uridecodebin')
-        src.set_property('uri', self.uri)
-        src.connect('pad-added', self.source_pad_added_cb)
-
-        conv = gst.element_factory_make('audioconvert')
-        self.conv = conv
-        self.apad = self.conv.get_pad('sink')
-
-        capsfilter = gst.element_factory_make('capsfilter')
-        capsfilter.set_property('caps', gst.caps_from_string(caps))
-
-        sink = TimesideSink("sink")
-        sink.set_property("hopsize", 8*1024)
-        sink.set_property("sync", False)
-
-        self.pipe = '''uridecodebin uri="%s" name=src
-            ! audioconvert
-            ! %s
-            ! timesidesink name=sink sync=False ''' % (self.uri, caps)
-
-        self.sink = sink
-        # TODO
-        #self.sink.set_property('emit-signals', True)
-
-        self.pipeline = gst.Pipeline()
-        self.pipeline.add(src, conv, capsfilter, sink)
+        blocksize = 8*1024
+        uri = self.uri
+        self.pipe = ''' uridecodebin uri=%(uri)s
+                  ! audioconvert
+                  ! appsink name=sink blocksize=%(blocksize)s sync=False async=True emit-signals=True
+                  ''' % locals()
+        self.pipeline = gst.parse_launch(self.pipe)
+
+        sink_caps = gst.Caps("""audio/x-raw-float,
+            endianness=(int)1234,
+            channels=(int)%s,
+            width=(int)32,
+            rate=(int)%d""" % (int(self.audiochannels), int(self.audiorate)))
+
+        self.sink = self.pipeline.get_by_name('sink')
+        self.sink.set_property("caps", sink_caps)
+        self.sink.set_property('emit-signals', True)
 
         self.bus = self.pipeline.get_bus()
         self.bus.add_signal_watch()
-        self.bus.connect('message::eos', self.on_eos)
-        self.bus.connect('message::tag', self.on_tag)
-        self.bus.connect('message::error', self.on_error)
-
-        gst.element_link_many(conv, capsfilter, sink)
-
-        self.mainloop = gobject.MainLoop()
+        self.bus.connect('message', self.on_message)
 
         # start pipeline
         self.pipeline.set_state(gst.STATE_PLAYING)
 
-        # self.mainloop.run()
-        # NOTE: instead of running the mainloop on the main thread, put it in a
-        # thread so that the main thread start calling process and pull from
-        # buffers from timesidesink
-
-        import threading
-        class MainloopThread(threading.Thread):
-            def __init__(self, mainloop):
-                threading.Thread.__init__(self)
-                self.mainloop = mainloop
-
-            def run(self):
-                self.mainloop.run()
-        self.mainloopthread = MainloopThread(self.mainloop)
-        self.mainloopthread.start()
-
-        #FIXME: prevent mp3 encoder from hanging
-        time.sleep(0.1)
-
-    def source_pad_added_cb(self, src, pad):
-        name = pad.get_caps()[0].get_name()
-        if name == 'audio/x-raw-float' or name == 'audio/x-raw-int':
-            if not self.apad.is_linked():
-                pad.link(self.conv.get_pad("sink"))
-
-    def on_eos(self, bus, msg):
-        #print 'on_eos'
-        self.sink.on_eos()
-        self.pipeline.set_state(gst.STATE_NULL)
-        self.mainloop.quit()
-
-    def on_tag(self, bus, msg):
-        taglist = msg.parse_tag()
-        """
-        print 'on_tag:'
-        for key in taglist.keys():
-            print '\t%s = %s' % (key, taglist[key])
-        """
+    def on_message(self, bus, message):
+        t = message.type
+        if t == gst.MESSAGE_EOS:
+            self.pipeline.set_state(gst.STATE_NULL)
+        elif t == gst.MESSAGE_ERROR:
+            self.pipeline.set_state(gst.STATE_NULL)
+            err, debug = message.parse_error()
+            print "Error: %s" % err, debug
+        elif t == gst.MESSAGE_TAG:
+            # TODO
+            # msg.parse_tags()
+            pass
 
-    def on_error(self, bus, msg):
-        error = msg.parse_error()
-        print 'on_error:', error[1]
-        self.mainloop.quit()
+    @interfacedoc
+    def process(self, frames = None, eod = False):
+        self.eod = eod
+        if self.eod:
+            self.src.emit('end-of-stream')
+        try:
+            buf = self.sink.emit('pull-buffer')
+        except SystemError, e:
+            # should never happen
+            print 'SystemError', e
+            return array([0.]), True
+        if buf == None:
+            return array([0.]), True
+        return gst_buffer_to_numpy_array(buf, self.audiochannels), self.eod
 
     @interfacedoc
     def channels(self):
@@ -156,23 +115,13 @@ class FileDecoder(Processor):
     def nframes(self):
         return self.audionframes
 
-    @interfacedoc
-    def process(self, frames = None, eod = False):
-        try:
-            #buf = self.sink.emit('pull-buffer')
-            buf = self.sink.pull()
-        except SystemError, e:
-            # should never happen
-            print 'SystemError', e
-            return array([0.]), True
-        if buf == None:
-            return array([0.]), True
-        return self.gst_buffer_to_numpy_array(buf), False
-
     @interfacedoc
     def release(self):
-        self.mainloopthread.join()
-        pass
+        while self.bus.have_pending():
+          self.bus.pop()
+
+    def __del__(self):
+        self.release()
 
     ## IDecoder methods
 
@@ -223,12 +172,21 @@ class FileDecoder(Processor):
         from gst.extend import discoverer
         d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME)
         d.connect('discovered', self.discovered)
-        self.mainloop = gobject.MainLoop()
+        self.bus = d.get_bus()
+        self.bus.add_signal_watch()
+        self.pipeline = d
+        self.bus.connect("message", self.on_message)
+
+        # start pipeline
         d.discover()
-        self.mainloop.run()
+        self.pipeline.set_state(gst.STATE_PLAYING)
+        while self.bus.have_pending():
+            self.bus.pop()
+        d.print_info()
 
     def discovered(self, d, is_media):
         """ gstreamer based helper executed upon discover() completion """
+        from math import ceil
         if is_media and d.is_audio:
             # copy the discoverer attributes to self
             self.audiorate = d.audiorate
@@ -238,19 +196,11 @@ class FileDecoder(Processor):
             # conversion from time in nanoseconds to seconds
             self.duration = d.audiolength * 1.e-9
             # conversion from time in nanoseconds to frames
-            from math import ceil
-            duration = d.audiorate * d.audiolength * 1.e-9
-            self.audionframes = int (ceil ( duration ) )
+            self.audionframes = int ( ceil (d.audiorate * d.audiolength * 1.e-9) )
+            # copy tags
             self.tags = d.tags
         elif not d.is_audio:
             print "error, no audio found!"
         else:
             print "fail", path
-        self.mainloop.quit()
-
-    def gst_buffer_to_numpy_array(self, buf):
-        """ gstreamer buffer to numpy array conversion """
-        chan = self.audiochannels
-        samples = frombuffer(buf.data, dtype=float32)
-        samples.resize([len(samples)/chan, chan])
-        return samples
+        self.pipeline.set_state(gst.STATE_NULL)
diff --git a/timeside/decoder/old.py b/timeside/decoder/old.py
deleted file mode 100644 (file)
index f2971cb..0000000
+++ /dev/null
@@ -1,173 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2007-2009 Parisson
-# Copyright (c) 2007 Olivier Guilyardi <olivier@samalyse.com>
-# Copyright (c) 2007-2009 Guillaume Pellerin <pellerin@parisson.com>
-#
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide.  If not, see <http://www.gnu.org/licenses/>.
-
-# Author: Paul Brossier <piem@piem.org>
-
-from timeside.core import Processor, implements, interfacedoc
-from timeside.api import IDecoder
-from numpy import array, frombuffer, getbuffer, float32, append
-
-import pygst
-pygst.require('0.10')
-import gst
-import gobject
-gobject.threads_init()
-
-class FileDecoderOld(Processor):
-    """ gstreamer-based decoder """
-    #implements(IDecoder)
-
-    # duration ms, before discovery process times out
-    MAX_DISCOVERY_TIME = 3000
-
-    audioformat = None
-    audiochannels = None
-    audiorate = None
-    audionframes = None
-    mimetype = ''
-
-    # IProcessor methods
-
-    @staticmethod
-    @interfacedoc
-    def id():
-        return "gstreamerdecold"
-
-    def setup(self, channels = None, samplerate = None, nframes = None):
-        # the output data format we want
-        caps = "audio/x-raw-float, width=32"
-        pipeline = gst.parse_launch('''uridecodebin uri="%s"
-            ! audioconvert
-            ! %s
-            ! appsink name=sink sync=False ''' % (self.uri, caps))
-        # store a pointer to appsink in our decoder object
-        self.sink = pipeline.get_by_name('sink')
-        # adjust length of emitted buffers
-        # self.sink.set_property('blocksize', 0x10000)
-        # start pipeline
-        pipeline.set_state(gst.STATE_PLAYING)
-        #self.mainloop.run()
-
-    @interfacedoc
-    def channels(self):
-        return  self.audiochannels
-
-    @interfacedoc
-    def samplerate(self):
-        return self.audiorate
-
-    @interfacedoc
-    def nframes(self):
-        return self.audionframes
-
-    @interfacedoc
-    def process(self, frames = None, eod = False):
-        try:
-            buf = self.sink.emit('pull-buffer')
-        except SystemError, e:
-            # should never happen
-            print 'SystemError', e
-            return array([0.]), True
-        if buf == None:
-            return array([0.]), True
-        return self.gst_buffer_to_numpy_array(buf), False
-
-    @interfacedoc
-    def release(self):
-        # nothing to do for now
-        pass
-
-    ## IDecoder methods
-
-    @interfacedoc
-    def __init__(self, uri):
-        # is this a file?
-        import os.path
-        if os.path.exists(uri):
-            # get the absolute path
-            uri = os.path.abspath(uri)
-            # first run the file/uri through the discover pipeline
-            self.discover(uri)
-            # and make a uri of it
-            from urllib import quote
-            self.uri = 'file://'+quote(uri)
-        else:
-            self.uri = uri
-
-    @interfacedoc
-    def format(self):
-        # TODO check
-        if self.mimetype == 'application/x-id3':
-            self.mimetype = 'audio/mpeg'
-        return self.mimetype
-
-    @interfacedoc
-    def encoding(self):
-        # TODO check
-        return self.mimetype.split('/')[-1]
-
-    @interfacedoc
-    def resolution(self):
-        # TODO check: width or depth?
-        return self.audiowidth
-
-    @interfacedoc
-    def metadata(self):
-        # TODO check
-        return self.tags
-
-    ## gst.extend discoverer
-
-    def discover(self, path):
-        """ gstreamer based helper function to get file attributes """
-        from gst.extend import discoverer
-        d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME)
-        d.connect('discovered', self.discovered)
-        self.mainloop = gobject.MainLoop()
-        d.discover()
-        self.mainloop.run()
-
-    def discovered(self, d, is_media):
-        """ gstreamer based helper executed upon discover() completion """
-        if is_media and d.is_audio:
-            # copy the discoverer attributes to self
-            self.audiorate = d.audiorate
-            self.mimetype= d.mimetype
-            self.audiochannels = d.audiochannels
-            self.audiowidth = d.audiowidth
-            # conversion from time in nanoseconds to frames
-            from math import ceil
-            duration = d.audiorate * d.audiolength * 1.e-9
-            self.audionframes = int (ceil ( duration ) )
-            self.tags = d.tags
-        elif not d.is_audio:
-            print "error, no audio found!"
-        else:
-            print "fail", path
-        self.mainloop.quit()
-
-    def gst_buffer_to_numpy_array(self, buf):
-        """ gstreamer buffer to numpy array conversion """
-        chan = self.audiochannels
-        samples = frombuffer(buf.data, dtype=float32)
-        samples.resize([len(samples)/chan, chan])
-        return samples
diff --git a/timeside/decoder/sink.py b/timeside/decoder/sink.py
deleted file mode 100644 (file)
index 878811e..0000000
+++ /dev/null
@@ -1,74 +0,0 @@
-import pygst
-pygst.require('0.10')
-import gst
-import gobject
-gobject.threads_init()
-
-class TimesideSink(gst.BaseSink):
-    """
-    a simple sink element with a hopsize property to adjust the size of the buffer emitted
-    """
-    _caps = gst.caps_from_string('audio/x-raw-float, \
-                    rate=[ 1, 2147483647 ], \
-                    channels=[ 1, 2147483647 ], \
-                    endianness={ 1234, 4321 }, \
-                    width=32')
-
-    __gsttemplates__ = ( 
-            gst.PadTemplate ("sink",
-                gst.PAD_SINK,
-                gst.PAD_ALWAYS,
-                _caps),
-            )
-
-    def __init__(self, name):
-        import threading
-        self.cv = threading.Condition(threading.Lock())
-        self.eos = 0
-        self.__gobject_init__()
-        self.set_name(name)
-        self.adapter = gst.Adapter()
-        self.set_property('sync', False)
-
-    def set_property(self, name, value): 
-        if name == 'hopsize':
-            # blocksize is in byte, convert from hopsize 
-            from struct import calcsize
-            self.set_property('blocksize', value * calcsize('f'))
-        else:
-            super(gst.BaseSink, self).set_property(name, value)
-
-    def do_render(self, buffer):
-        self.cv.acquire()
-        self.adapter.push(buffer)
-        if self.adapter.available() == 0: self.eos = 1
-        self.cv.notify()
-        self.cv.release()
-        return gst.FLOW_OK
-
-    def on_eos(self):
-        self.cv.acquire()
-        self.eos = 1
-        self.cv.notify()
-        self.cv.release()
-
-    def pull(self):
-        self.cv.acquire()
-        while not self.adapter.available():
-            if self.eos: break
-            self.cv.wait()
-        # TODO use signals
-        blocksize = self.get_property('blocksize')
-        remaining = self.adapter.available()
-        if remaining == 0:
-            ret = None
-        if remaining >= blocksize:
-            ret = self.adapter.take_buffer(blocksize)
-        if remaining < blocksize and remaining > 0:
-            ret = self.adapter.take_buffer(remaining)
-        self.cv.release()
-        return ret
-
-gobject.type_register(TimesideSink)
-
-