From 7f5f6c0ccbd1ad58e93e88bd4fc833839f8df35f Mon Sep 17 00:00:00 2001 From: Paul Brossier Date: Tue, 31 Jul 2012 22:21:43 -0600 Subject: [PATCH] timeside/encoder/gstutils.py: prepare for more factorise --- timeside/encoder/gstutils.py | 62 +++++++++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/timeside/encoder/gstutils.py b/timeside/encoder/gstutils.py index 3183cfb..efd02bd 100644 --- a/timeside/encoder/gstutils.py +++ b/timeside/encoder/gstutils.py @@ -1,9 +1,13 @@ -from numpy import getbuffer +from timeside.core import Processor, implements, interfacedoc +from timeside.api import IEncoder + +from numpy import array, getbuffer, frombuffer import pygst pygst.require('0.10') import gst import gobject + gobject.threads_init() def numpy_array_to_gst_buffer(frames): @@ -11,3 +15,59 @@ def numpy_array_to_gst_buffer(frames): buf = gst.Buffer(getbuffer(frames.astype("float32"))) return buf +def gst_buffer_to_numpy_array(buf, chan): + """ gstreamer buffer to numpy array conversion """ + samples = frombuffer(buf.data, dtype='float32') + samples.resize([len(samples)/chan, chan]) + return samples + +class GstEncoder(Processor): + + def release(self): + while self.bus.have_pending(): + self.bus.pop() + + def __del__(self): + self.release() + + def start_pipeline(self, channels, samplerate): + self.pipeline = gst.parse_launch(self.pipe) + # store a pointer to appsrc in our encoder object + self.src = self.pipeline.get_by_name('src') + # store a pointer to appsink in our encoder object + self.app = self.pipeline.get_by_name('app') + + srccaps = gst.Caps("""audio/x-raw-float, + endianness=(int)1234, + channels=(int)%s, + width=(int)32, + rate=(int)%d""" % (int(channels), int(samplerate))) + self.src.set_property("caps", srccaps) + self.src.set_property('emit-signals', True) + + self.bus = self.pipeline.get_bus() + self.bus.add_signal_watch() + self.bus.connect("message", self.on_message) + + # start pipeline + self.pipeline.set_state(gst.STATE_PLAYING) + + def on_message(self, bus, message): + t = message.type + if t == gst.MESSAGE_EOS: + print "stopping pipeline" + self.pipeline.set_state(gst.STATE_NULL) + elif t == gst.MESSAGE_ERROR: + self.pipeline.set_state(gst.STATE_NULL) + err, debug = message.parse_error() + print "Error: %s" % err, debug + + def process(self, frames, eod=False): + self.eod = eod + buf = numpy_array_to_gst_buffer(frames) + self.src.emit('push-buffer', buf) + if self.eod: + self.src.emit('end-of-stream') + if self.streaming: + self.chunk = self.app.emit('pull-buffer') + return frames, eod -- 2.39.5