From: Paul Brossier Date: Fri, 15 Apr 2011 09:55:37 +0000 (+0000) Subject: timeside/decoder/core.py: fix new decoder, was missing mainloop.run(), improve X-Git-Tag: 0.3.2~39 X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=6fb32d978486b8c2cd6291f922a030b82fb0a8b8;p=timeside.git timeside/decoder/core.py: fix new decoder, was missing mainloop.run(), improve --- diff --git a/timeside/decoder/core.py b/timeside/decoder/core.py index 697a7b9..0a8b69f 100644 --- a/timeside/decoder/core.py +++ b/timeside/decoder/core.py @@ -25,6 +25,9 @@ from timeside.core import Processor, implements, interfacedoc from timeside.api import IDecoder from numpy import array, frombuffer, getbuffer, float32, append +from timeside.decoder.old import * + +from timeside.decoder.sink import * import pygst pygst.require('0.10') @@ -32,54 +35,6 @@ import gst import gobject gobject.threads_init() -class TimesideSink(gst.BaseSink): - """ - a simple sink element with a hopsize property to adjust the size of the buffer emitted - """ - _caps = gst.caps_from_string('audio/x-raw-float, \ - rate=[ 1, 2147483647 ], \ - channels=[ 1, 2147483647 ], \ - endianness={ 1234, 4321 }, \ - width=32') - - __gsttemplates__ = ( - gst.PadTemplate ("sink", - gst.PAD_SINK, - gst.PAD_ALWAYS, - _caps), - ) - - def __init__(self, name): - self.__gobject_init__() - self.set_name(name) - self.adapter = gst.Adapter() - self.set_property('sync', False) - - def set_property(self, name, value): - if name == 'hopsize': - # blocksize is in byte, convert from hopsize - from struct import calcsize - self.set_property('blocksize', value * calcsize('f')) - else: - super(gst.BaseSink, self).set_property(name, value) - - def do_render(self, buffer): - self.adapter.push(buffer) - return gst.FLOW_OK - - def pull(self): - # TODO use signals - blocksize = self.get_property('blocksize') - remaining = self.adapter.available() - if remaining == 0: - return None - if remaining >= blocksize: - return self.adapter.take_buffer(blocksize) - if remaining < blocksize and remaining > 0: - return self.adapter.take_buffer(remaining) - -gobject.type_register(TimesideSink) - class FileDecoder(Processor): """ gstreamer-based decoder """ implements(IDecoder) @@ -92,7 +47,7 @@ class FileDecoder(Processor): audiorate = None audionframes = None mimetype = '' - + # IProcessor methods @staticmethod @@ -101,24 +56,23 @@ class FileDecoder(Processor): return "gstreamerdec" def setup(self, channels = None, samplerate = None, nframes = None): - # the output data format we want caps = "audio/x-raw-float, width=32" src = gst.element_factory_make('uridecodebin') src.set_property('uri', self.uri) src.connect('pad-added', self.source_pad_added_cb) - src.connect('pad-removed', self.source_pad_removed_cb) conv = gst.element_factory_make('audioconvert') self.conv = conv + self.apad = self.conv.get_pad('sink') capsfilter = gst.element_factory_make('capsfilter') capsfilter.set_property('caps', gst.caps_from_string(caps)) sink = TimesideSink("sink") - sink.set_property("sync", False) sink.set_property("hopsize", 8*1024) + sink.set_property("sync", False) self.pipe = '''uridecodebin uri="%s" name=src ! audioconvert @@ -128,25 +82,47 @@ class FileDecoder(Processor): self.sink = sink # TODO #self.sink.set_property('emit-signals', True) - # adjust length of emitted buffers - self.src = src #self.pipeline.get_by_name('src') self.pipeline = gst.Pipeline() self.pipeline.add(src, conv, capsfilter, sink) + self.bus = self.pipeline.get_bus() + self.bus.add_signal_watch() + self.bus.connect('message::eos', self.on_eos) + self.bus.connect('message::tag', self.on_tag) + self.bus.connect('message::error', self.on_error) + gst.element_link_many(conv, capsfilter, sink) + self.mainloop = gobject.MainLoop() + # start pipeline self.pipeline.set_state(gst.STATE_PLAYING) + self.mainloop.run() def source_pad_added_cb(self, src, pad): name = pad.get_caps()[0].get_name() if name == 'audio/x-raw-float' or name == 'audio/x-raw-int': - pad.link(self.conv.get_pad("sink")) + if not self.apad.is_linked(): + pad.link(self.conv.get_pad("sink")) - def source_pad_removed_cb(self, src, pad): - pad.unlink(self.conv.get_pad("sink")) + def on_eos(self, bus, msg): + #print 'on_eos' + self.pipeline.set_state(gst.STATE_NULL) + self.mainloop.quit() + def on_tag(self, bus, msg): + taglist = msg.parse_tag() + """ + print 'on_tag:' + for key in taglist.keys(): + print '\t%s = %s' % (key, taglist[key]) + """ + + def on_error(self, bus, msg): + error = msg.parse_error() + print 'on_error:', error[1] + self.mainloop.quit() @interfacedoc def channels(self): @@ -171,146 +147,6 @@ class FileDecoder(Processor): return array([0.]), True if buf == None: return array([0.]), True - samples = self.gst_buffer_to_numpy_array(buf) - return samples, False - - @interfacedoc - def release(self): - # nothing to do for now - pass - - ## IDecoder methods - - @interfacedoc - def __init__(self, uri): - # is this a file? - import os.path - if os.path.exists(uri): - # get the absolute path - uri = os.path.abspath(uri) - # first run the file/uri through the discover pipeline - self.discover(uri) - # and make a uri of it - from urllib import quote - self.uri = 'file://'+quote(uri) - else: - self.uri = uri - - @interfacedoc - def format(self): - # TODO check - if self.mimetype == 'application/x-id3': - self.mimetype = 'audio/mpeg' - return self.mimetype - - @interfacedoc - def encoding(self): - # TODO check - return self.mimetype.split('/')[-1] - - @interfacedoc - def resolution(self): - # TODO check: width or depth? - return self.audiowidth - - @interfacedoc - def metadata(self): - # TODO check - return self.tags - - ## gst.extend discoverer - - def discover(self, path): - """ gstreamer based helper function to get file attributes """ - from gst.extend import discoverer - d = discoverer.Discoverer(path, timeout = self.MAX_DISCOVERY_TIME) - d.connect('discovered', self.discovered) - self.mainloop = gobject.MainLoop() - d.discover() - self.mainloop.run() - - def discovered(self, d, is_media): - """ gstreamer based helper executed upon discover() completion """ - if is_media and d.is_audio: - # copy the discoverer attributes to self - self.audiorate = d.audiorate - self.mimetype= d.mimetype - self.audiochannels = d.audiochannels - self.audiowidth = d.audiowidth - # conversion from time in nanoseconds to frames - from math import ceil - duration = d.audiorate * d.audiolength * 1.e-9 - self.audionframes = int (ceil ( duration ) ) - self.tags = d.tags - elif not d.is_audio: - print "error, no audio found!" - else: - print "fail", path - self.mainloop.quit() - - def gst_buffer_to_numpy_array(self, buf): - """ gstreamer buffer to numpy array conversion """ - chan = self.audiochannels - samples = frombuffer(buf.data, dtype=float32) - samples.resize([len(samples)/chan, chan]) - return samples - -class FileDecoderOld(Processor): - """ gstreamer-based decoder """ - implements(IDecoder) - - # duration ms, before discovery process times out - MAX_DISCOVERY_TIME = 3000 - - audioformat = None - audiochannels = None - audiorate = None - audionframes = None - mimetype = '' - - # IProcessor methods - - @staticmethod - @interfacedoc - def id(): - return "gstreamerdecold" - - def setup(self, channels = None, samplerate = None, nframes = None): - # the output data format we want - caps = "audio/x-raw-float, width=32" - pipeline = gst.parse_launch('''uridecodebin uri="%s" - ! audioconvert - ! %s - ! appsink name=sink sync=False ''' % (self.uri, caps)) - # store a pointer to appsink in our decoder object - self.sink = pipeline.get_by_name('sink') - # adjust length of emitted buffers - # self.sink.set_property('blocksize', 0x10000) - # start pipeline - pipeline.set_state(gst.STATE_PLAYING) - - @interfacedoc - def channels(self): - return self.audiochannels - - @interfacedoc - def samplerate(self): - return self.audiorate - - @interfacedoc - def nframes(self): - return self.audionframes - - @interfacedoc - def process(self, frames = None, eod = False): - try: - buf = self.sink.emit('pull-buffer') - except SystemError, e: - # should never happen - print 'SystemError', e - return array([0.]), True - if buf == None: - return array([0.]), True return self.gst_buffer_to_numpy_array(buf), False @interfacedoc @@ -393,52 +229,3 @@ class FileDecoderOld(Processor): samples = frombuffer(buf.data, dtype=float32) samples.resize([len(samples)/chan, chan]) return samples - - -class SubProcessPipe: - - def __init__(self, command, stdin=None): - """Read media and stream data through a generator. - Taken from Telemeta (see http://telemeta.org)""" - - self.buffer_size = 0xFFFF - - if not stdin: - stdin = subprocess.PIPE - - self.proc = subprocess.Popen(command.encode('utf-8'), - shell = True, - bufsize = self.buffer_size, - stdin = stdin, - stdout = subprocess.PIPE, - close_fds = True) - - self.input = self.proc.stdin - self.output = self.proc.stdout - - -class DecoderSubProcessCore(Processor): - """Defines the main parts of the decoding tools : - paths, metadata parsing, data streaming thru system command""" - - def __init__(self): - self.command = 'ffmpeg -i "%s" -f wav - ' - - def process(self, source, options=None): - """Encode and stream audio data through a generator""" - - command = self.command % source - proc = SubProcessPipe(command) - return proc.output - - #while True: - #__chunk = proc.output.read(self.proc.buffer_size) - #status = proc.poll() - #if status != None and status != 0: - #raise ExportProcessError('Command failure:', command, proc) - #if len(__chunk) == 0: - #break - #yield __chunk - - -