]> git.parisson.com Git - timeside.git/commitdiff
timeside/encoder/gstutils.py: prepare for more factorise
authorPaul Brossier <piem@piem.org>
Wed, 1 Aug 2012 04:21:43 +0000 (22:21 -0600)
committerPaul Brossier <piem@piem.org>
Wed, 1 Aug 2012 04:21:43 +0000 (22:21 -0600)
timeside/encoder/gstutils.py

index 3183cfb2eba5aaf7de59217eb5ad7d06f77d99e6..efd02bd29eb35c4c3bc74276ccf77d60cf0330a3 100644 (file)
@@ -1,9 +1,13 @@
-from numpy import getbuffer
+from timeside.core import Processor, implements, interfacedoc
+from timeside.api import IEncoder
+
+from numpy import array, getbuffer, frombuffer
 
 import pygst
 pygst.require('0.10')
 import gst
 import gobject
+
 gobject.threads_init()
 
 def numpy_array_to_gst_buffer(frames):
@@ -11,3 +15,59 @@ def numpy_array_to_gst_buffer(frames):
     buf = gst.Buffer(getbuffer(frames.astype("float32")))
     return buf
 
+def gst_buffer_to_numpy_array(buf, chan):
+    """ gstreamer buffer to numpy array conversion """
+    samples = frombuffer(buf.data, dtype='float32')
+    samples.resize([len(samples)/chan, chan])
+    return samples
+
+class GstEncoder(Processor):
+
+    def release(self):
+        while self.bus.have_pending():
+          self.bus.pop()
+
+    def __del__(self):
+        self.release()
+
+    def start_pipeline(self, channels, samplerate):
+        self.pipeline = gst.parse_launch(self.pipe)
+        # store a pointer to appsrc in our encoder object
+        self.src = self.pipeline.get_by_name('src')
+        # store a pointer to appsink in our encoder object
+        self.app = self.pipeline.get_by_name('app')
+
+        srccaps = gst.Caps("""audio/x-raw-float,
+            endianness=(int)1234,
+            channels=(int)%s,
+            width=(int)32,
+            rate=(int)%d""" % (int(channels), int(samplerate)))
+        self.src.set_property("caps", srccaps)
+        self.src.set_property('emit-signals', True)
+
+        self.bus = self.pipeline.get_bus()
+        self.bus.add_signal_watch()
+        self.bus.connect("message", self.on_message)
+
+        # start pipeline
+        self.pipeline.set_state(gst.STATE_PLAYING)
+
+    def on_message(self, bus, message):
+        t = message.type
+        if t == gst.MESSAGE_EOS:
+            print "stopping pipeline"
+            self.pipeline.set_state(gst.STATE_NULL)
+        elif t == gst.MESSAGE_ERROR:
+            self.pipeline.set_state(gst.STATE_NULL)
+            err, debug = message.parse_error()
+            print "Error: %s" % err, debug
+
+    def process(self, frames, eod=False):
+        self.eod = eod
+        buf = numpy_array_to_gst_buffer(frames)
+        self.src.emit('push-buffer', buf)
+        if self.eod:
+            self.src.emit('end-of-stream')
+        if self.streaming:
+            self.chunk = self.app.emit('pull-buffer')
+        return frames, eod