self.eod = False
self.metadata = None
+ self.num_samples = 0
def release(self):
if hasattr(self, 'eod') and hasattr(self, 'mainloopthread'):
self.src.set_property('emit-signals', True)
self.src.set_property('num-buffers', -1)
self.src.set_property('block', True)
+ self.src.set_property('do-timestamp', True)
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.end_reached = True
self.end_cond.notify()
self.end_cond.release()
+
elif t == gst.MESSAGE_ERROR:
self.end_cond.acquire()
self.pipeline.set_state(gst.STATE_NULL)
def process(self, frames, eod=False):
self.eod = eod
- buf = numpy_array_to_gst_buffer(frames)
+ if eod:
+ self.num_samples += frames.shape[0]
+ else:
+ self.num_samples += self.blocksize()
+ buf = numpy_array_to_gst_buffer(frames, self.blocksize(),self.num_samples, self.samplerate())
self.src.emit('push-buffer', buf)
if self.eod:
self.src.emit('end-of-stream')
import pygst
pygst.require('0.10')
+import gst
import gobject
gobject.threads_init()
-def numpy_array_to_gst_buffer(frames):
+
+def numpy_array_to_gst_buffer(frames, CHUNK_SIZE, num_samples, SAMPLE_RATE):
from gst import Buffer
""" gstreamer buffer to numpy array conversion """
buf = Buffer(getbuffer(frames.astype("float32")))
+ #Set its timestamp and duration
+ buf.timestamp = gst.util_uint64_scale(num_samples, gst.SECOND, SAMPLE_RATE)
+ buf.duration = gst.util_uint64_scale(CHUNK_SIZE, gst.SECOND, SAMPLE_RATE)
+
return buf
+
def gst_buffer_to_numpy_array(buf, chan):
""" gstreamer buffer to numpy array conversion """
samples = frombuffer(buf.data, dtype='float32')