From f0b1cce9af51e165acbdfe4036a0a24e9de70f9d Mon Sep 17 00:00:00 2001 From: Paul Brossier Date: Wed, 1 Aug 2012 12:53:58 -0600 Subject: [PATCH] timeside/decoder/: simplify, use bus.pop() and gst_parse_launch(), remove all mainloops and older code --- timeside/decoder/__init__.py | 2 - timeside/decoder/core.py | 178 +++++++++++++---------------------- timeside/decoder/old.py | 173 ---------------------------------- timeside/decoder/sink.py | 74 --------------- 4 files changed, 64 insertions(+), 363 deletions(-) delete mode 100644 timeside/decoder/old.py delete mode 100644 timeside/decoder/sink.py diff --git a/timeside/decoder/__init__.py b/timeside/decoder/__init__.py index 0741aad..78457b3 100644 --- a/timeside/decoder/__init__.py +++ b/timeside/decoder/__init__.py @@ -2,5 +2,3 @@ import timeside.core from core import * -from sink import * -#from old import * diff --git a/timeside/decoder/core.py b/timeside/decoder/core.py index 0a1d8f7..d6e7f5d 100644 --- a/timeside/decoder/core.py +++ b/timeside/decoder/core.py @@ -26,15 +26,7 @@ from timeside.core import Processor, implements, interfacedoc from timeside.api import IDecoder -from numpy import array, frombuffer, getbuffer, float32, append -from timeside.decoder.sink import * - -import time -import pygst -pygst.require('0.10') -import gst -import gobject -gobject.threads_init() +from timeside.encoder.gstutils import * class FileDecoder(Processor): """ gstreamer-based decoder """ @@ -58,91 +50,58 @@ class FileDecoder(Processor): def setup(self, channels = None, samplerate = None, nframes = None): # the output data format we want - caps = "audio/x-raw-float, width=32" - - src = gst.element_factory_make('uridecodebin') - src.set_property('uri', self.uri) - src.connect('pad-added', self.source_pad_added_cb) - - conv = gst.element_factory_make('audioconvert') - self.conv = conv - self.apad = self.conv.get_pad('sink') - - capsfilter = gst.element_factory_make('capsfilter') - capsfilter.set_property('caps', gst.caps_from_string(caps)) - - sink = TimesideSink("sink") - sink.set_property("hopsize", 8*1024) - sink.set_property("sync", False) - - self.pipe = '''uridecodebin uri="%s" name=src - ! audioconvert - ! %s - ! timesidesink name=sink sync=False ''' % (self.uri, caps) - - self.sink = sink - # TODO - #self.sink.set_property('emit-signals', True) - - self.pipeline = gst.Pipeline() - self.pipeline.add(src, conv, capsfilter, sink) + blocksize = 8*1024 + uri = self.uri + self.pipe = ''' uridecodebin uri=%(uri)s + ! audioconvert + ! appsink name=sink blocksize=%(blocksize)s sync=False async=True emit-signals=True + ''' % locals() + self.pipeline = gst.parse_launch(self.pipe) + + sink_caps = gst.Caps("""audio/x-raw-float, + endianness=(int)1234, + channels=(int)%s, + width=(int)32, + rate=(int)%d""" % (int(self.audiochannels), int(self.audiorate))) + + self.sink = self.pipeline.get_by_name('sink') + self.sink.set_property("caps", sink_caps) + self.sink.set_property('emit-signals', True) self.bus = self.pipeline.get_bus() self.bus.add_signal_watch() - self.bus.connect('message::eos', self.on_eos) - self.bus.connect('message::tag', self.on_tag) - self.bus.connect('message::error', self.on_error) - - gst.element_link_many(conv, capsfilter, sink) - - self.mainloop = gobject.MainLoop() + self.bus.connect('message', self.on_message) # start pipeline self.pipeline.set_state(gst.STATE_PLAYING) - # self.mainloop.run() - # NOTE: instead of running the mainloop on the main thread, put it in a - # thread so that the main thread start calling process and pull from - # buffers from timesidesink - - import threading - class MainloopThread(threading.Thread): - def __init__(self, mainloop): - threading.Thread.__init__(self) - self.mainloop = mainloop - - def run(self): - self.mainloop.run() - self.mainloopthread = MainloopThread(self.mainloop) - self.mainloopthread.start() - - #FIXME: prevent mp3 encoder from hanging - time.sleep(0.1) - - def source_pad_added_cb(self, src, pad): - name = pad.get_caps()[0].get_name() - if name == 'audio/x-raw-float' or name == 'audio/x-raw-int': - if not self.apad.is_linked(): - pad.link(self.conv.get_pad("sink")) - - def on_eos(self, bus, msg): - #print 'on_eos' - self.sink.on_eos() - self.pipeline.set_state(gst.STATE_NULL) - self.mainloop.quit() - - def on_tag(self, bus, msg): - taglist = msg.parse_tag() - """ - print 'on_tag:' - for key in taglist.keys(): - print '\t%s = %s' % (key, taglist[key]) - """ + def on_message(self, bus, message): + t = message.type + if t == gst.MESSAGE_EOS: + self.pipeline.set_state(gst.STATE_NULL) + elif t == gst.MESSAGE_ERROR: + self.pipeline.set_state(gst.STATE_NULL) + err, debug = message.parse_error() + print "Error: %s" % err, debug + elif t == gst.MESSAGE_TAG: + # TODO + # msg.parse_tags() + pass - def on_error(self, bus, msg): - error = msg.parse_error() - print 'on_error:', error[1] - self.mainloop.quit() + @interfacedoc + def process(self, frames = None, eod = False): + self.eod = eod + if self.eod: + self.src.emit('end-of-stream') + try: + buf = self.sink.emit('pull-buffer') + except SystemError, e: + # should never happen + print 'SystemError', e + return array([0.]), True + if buf == None: + return array([0.]), True + return gst_buffer_to_numpy_array(buf, self.audiochannels), self.eod @interfacedoc def channels(self): @@ -156,23 +115,13 @@ class FileDecoder(Processor): def nframes(self): return self.audionframes - @interfacedoc - def process(self, frames = None, eod = False): - try: - #buf = self.sink.emit('pull-buffer') - buf = self.sink.pull() - except SystemError, e: - # should never happen - print 'SystemError', e - return array([0.]), True - if buf == None: - return array([0.]), True - return self.gst_buffer_to_numpy_array(buf), False - @interfacedoc def release(self): - self.mainloopthread.join() - pass + while self.bus.have_pending(): + self.bus.pop() + + def __del__(self): + self.release() ## IDecoder methods @@ -223,12 +172,21 @@ class FileDecoder(Processor): from gst.extend import discoverer d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME) d.connect('discovered', self.discovered) - self.mainloop = gobject.MainLoop() + self.bus = d.get_bus() + self.bus.add_signal_watch() + self.pipeline = d + self.bus.connect("message", self.on_message) + + # start pipeline d.discover() - self.mainloop.run() + self.pipeline.set_state(gst.STATE_PLAYING) + while self.bus.have_pending(): + self.bus.pop() + d.print_info() def discovered(self, d, is_media): """ gstreamer based helper executed upon discover() completion """ + from math import ceil if is_media and d.is_audio: # copy the discoverer attributes to self self.audiorate = d.audiorate @@ -238,19 +196,11 @@ class FileDecoder(Processor): # conversion from time in nanoseconds to seconds self.duration = d.audiolength * 1.e-9 # conversion from time in nanoseconds to frames - from math import ceil - duration = d.audiorate * d.audiolength * 1.e-9 - self.audionframes = int (ceil ( duration ) ) + self.audionframes = int ( ceil (d.audiorate * d.audiolength * 1.e-9) ) + # copy tags self.tags = d.tags elif not d.is_audio: print "error, no audio found!" else: print "fail", path - self.mainloop.quit() - - def gst_buffer_to_numpy_array(self, buf): - """ gstreamer buffer to numpy array conversion """ - chan = self.audiochannels - samples = frombuffer(buf.data, dtype=float32) - samples.resize([len(samples)/chan, chan]) - return samples + self.pipeline.set_state(gst.STATE_NULL) diff --git a/timeside/decoder/old.py b/timeside/decoder/old.py deleted file mode 100644 index f2971cb..0000000 --- a/timeside/decoder/old.py +++ /dev/null @@ -1,173 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# Copyright (C) 2007-2009 Parisson -# Copyright (c) 2007 Olivier Guilyardi -# Copyright (c) 2007-2009 Guillaume Pellerin -# -# This file is part of TimeSide. - -# TimeSide is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 2 of the License, or -# (at your option) any later version. - -# TimeSide is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with TimeSide. If not, see . - -# Author: Paul Brossier - -from timeside.core import Processor, implements, interfacedoc -from timeside.api import IDecoder -from numpy import array, frombuffer, getbuffer, float32, append - -import pygst -pygst.require('0.10') -import gst -import gobject -gobject.threads_init() - -class FileDecoderOld(Processor): - """ gstreamer-based decoder """ - #implements(IDecoder) - - # duration ms, before discovery process times out - MAX_DISCOVERY_TIME = 3000 - - audioformat = None - audiochannels = None - audiorate = None - audionframes = None - mimetype = '' - - # IProcessor methods - - @staticmethod - @interfacedoc - def id(): - return "gstreamerdecold" - - def setup(self, channels = None, samplerate = None, nframes = None): - # the output data format we want - caps = "audio/x-raw-float, width=32" - pipeline = gst.parse_launch('''uridecodebin uri="%s" - ! audioconvert - ! %s - ! appsink name=sink sync=False ''' % (self.uri, caps)) - # store a pointer to appsink in our decoder object - self.sink = pipeline.get_by_name('sink') - # adjust length of emitted buffers - # self.sink.set_property('blocksize', 0x10000) - # start pipeline - pipeline.set_state(gst.STATE_PLAYING) - #self.mainloop.run() - - @interfacedoc - def channels(self): - return self.audiochannels - - @interfacedoc - def samplerate(self): - return self.audiorate - - @interfacedoc - def nframes(self): - return self.audionframes - - @interfacedoc - def process(self, frames = None, eod = False): - try: - buf = self.sink.emit('pull-buffer') - except SystemError, e: - # should never happen - print 'SystemError', e - return array([0.]), True - if buf == None: - return array([0.]), True - return self.gst_buffer_to_numpy_array(buf), False - - @interfacedoc - def release(self): - # nothing to do for now - pass - - ## IDecoder methods - - @interfacedoc - def __init__(self, uri): - # is this a file? - import os.path - if os.path.exists(uri): - # get the absolute path - uri = os.path.abspath(uri) - # first run the file/uri through the discover pipeline - self.discover(uri) - # and make a uri of it - from urllib import quote - self.uri = 'file://'+quote(uri) - else: - self.uri = uri - - @interfacedoc - def format(self): - # TODO check - if self.mimetype == 'application/x-id3': - self.mimetype = 'audio/mpeg' - return self.mimetype - - @interfacedoc - def encoding(self): - # TODO check - return self.mimetype.split('/')[-1] - - @interfacedoc - def resolution(self): - # TODO check: width or depth? - return self.audiowidth - - @interfacedoc - def metadata(self): - # TODO check - return self.tags - - ## gst.extend discoverer - - def discover(self, path): - """ gstreamer based helper function to get file attributes """ - from gst.extend import discoverer - d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME) - d.connect('discovered', self.discovered) - self.mainloop = gobject.MainLoop() - d.discover() - self.mainloop.run() - - def discovered(self, d, is_media): - """ gstreamer based helper executed upon discover() completion """ - if is_media and d.is_audio: - # copy the discoverer attributes to self - self.audiorate = d.audiorate - self.mimetype= d.mimetype - self.audiochannels = d.audiochannels - self.audiowidth = d.audiowidth - # conversion from time in nanoseconds to frames - from math import ceil - duration = d.audiorate * d.audiolength * 1.e-9 - self.audionframes = int (ceil ( duration ) ) - self.tags = d.tags - elif not d.is_audio: - print "error, no audio found!" - else: - print "fail", path - self.mainloop.quit() - - def gst_buffer_to_numpy_array(self, buf): - """ gstreamer buffer to numpy array conversion """ - chan = self.audiochannels - samples = frombuffer(buf.data, dtype=float32) - samples.resize([len(samples)/chan, chan]) - return samples diff --git a/timeside/decoder/sink.py b/timeside/decoder/sink.py deleted file mode 100644 index 878811e..0000000 --- a/timeside/decoder/sink.py +++ /dev/null @@ -1,74 +0,0 @@ -import pygst -pygst.require('0.10') -import gst -import gobject -gobject.threads_init() - -class TimesideSink(gst.BaseSink): - """ - a simple sink element with a hopsize property to adjust the size of the buffer emitted - """ - _caps = gst.caps_from_string('audio/x-raw-float, \ - rate=[ 1, 2147483647 ], \ - channels=[ 1, 2147483647 ], \ - endianness={ 1234, 4321 }, \ - width=32') - - __gsttemplates__ = ( - gst.PadTemplate ("sink", - gst.PAD_SINK, - gst.PAD_ALWAYS, - _caps), - ) - - def __init__(self, name): - import threading - self.cv = threading.Condition(threading.Lock()) - self.eos = 0 - self.__gobject_init__() - self.set_name(name) - self.adapter = gst.Adapter() - self.set_property('sync', False) - - def set_property(self, name, value): - if name == 'hopsize': - # blocksize is in byte, convert from hopsize - from struct import calcsize - self.set_property('blocksize', value * calcsize('f')) - else: - super(gst.BaseSink, self).set_property(name, value) - - def do_render(self, buffer): - self.cv.acquire() - self.adapter.push(buffer) - if self.adapter.available() == 0: self.eos = 1 - self.cv.notify() - self.cv.release() - return gst.FLOW_OK - - def on_eos(self): - self.cv.acquire() - self.eos = 1 - self.cv.notify() - self.cv.release() - - def pull(self): - self.cv.acquire() - while not self.adapter.available(): - if self.eos: break - self.cv.wait() - # TODO use signals - blocksize = self.get_property('blocksize') - remaining = self.adapter.available() - if remaining == 0: - ret = None - if remaining >= blocksize: - ret = self.adapter.take_buffer(blocksize) - if remaining < blocksize and remaining > 0: - ret = self.adapter.take_buffer(remaining) - self.cv.release() - return ret - -gobject.type_register(TimesideSink) - - -- 2.39.5