last = item
# now stream audio data along the pipe
- self._is_running = True
+ if self._stream_thread:
+ self._running_cond.acquire()
++ self._is_running = True
++ if self._stream_thread:
+ self._running_cond.notify()
+ self._running_cond.release()
+
eod = False
if source.id() == 'gst_live_dec':
from timeside.core import Processor, implements, interfacedoc
- from timeside.component import implements, abstract
+ from timeside.component import abstract
from timeside.api import IEncoder
- from timeside.tools import *
-
- from gst import _gst as gst
- import threading
+ from timeside.tools import numpy_array_to_gst_buffer, MainloopThread
-#from gst import _gst as gst
+ import pygst
+ pygst.require('0.10')
+ import gst
- class MainloopThread(threading.Thread):
+ import gobject
+ gobject.threads_init()
- def __init__(self, mainloop):
- threading.Thread.__init__(self)
- self.mainloop = mainloop
+ import threading
- def run(self):
- self.mainloop.run()
+ # Streaming queue configuration
+ QUEUE_SIZE = 10
+ GST_APPSINK_MAX_BUFFERS = 10
class GstEncoder(Processor):
self.end_cond.notify()
self.end_cond.release()
+ def _on_new_buffer_streaming(self, appsink):
+ #print 'pull-buffer'
+ chunk = appsink.emit('pull-buffer')
+ self._streaming_queue.put(chunk)
+
+ def _on_new_preroll_streaming(self, appsink):
+ # print 'preroll'
+ chunk = appsink.emit('pull-preroll')
+ self._streaming_queue.put(chunk)
+
+ @interfacedoc
+ def set_metadata(self, metadata):
+ self.metadata = metadata
+
@interfacedoc
def process(self, frames, eod=False):
self.eod = eod
if eod:
- self.num_samples += frames.shape[0]
+ self.num_samples += frames.shape[0]
else:
self.num_samples += self.blocksize()
- buf = numpy_array_to_gst_buffer(frames, frames.shape[0], self.num_samples, self.samplerate())
++
+ buf = numpy_array_to_gst_buffer(frames, frames.shape[0],
+ self.num_samples, self.samplerate())
++
self.src.emit('push-buffer', buf)
if self.eod:
self.src.emit('end-of-stream')