From: Thomas Fillon Date: Fri, 14 Feb 2014 10:53:54 +0000 (+0100) Subject: Merge fix_streaming_bugbranch in dev + Core : enable to run pipe with only a decoder X-Git-Tag: 0.5.4~6 X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=d4a7b068a8a75a78246bc7fca46415da34a0364b;p=timeside.git Merge fix_streaming_bugbranch in dev + Core : enable to run pipe with only a decoder --- d4a7b068a8a75a78246bc7fca46415da34a0364b diff --cc timeside/core.py index 1f6e13b,df01a7b..5d16bd4 --- a/timeside/core.py +++ b/timeside/core.py @@@ -298,6 -305,12 +305,13 @@@ class ProcessPipe(object) last = item # now stream audio data along the pipe + if self._stream_thread: + self._running_cond.acquire() - self._is_running = True ++ self._is_running = True ++ if self._stream_thread: + self._running_cond.notify() + self._running_cond.release() + eod = False if source.id() == 'gst_live_dec': diff --cc timeside/encoder/core.py index 24cc823,db34c41..df0d3d8 --- a/timeside/encoder/core.py +++ b/timeside/encoder/core.py @@@ -20,22 -23,23 +23,22 @@@ from timeside.core import Processor, implements, interfacedoc - from timeside.component import implements, abstract + from timeside.component import abstract from timeside.api import IEncoder - from timeside.tools import * - - from gst import _gst as gst - import threading + from timeside.tools import numpy_array_to_gst_buffer, MainloopThread -#from gst import _gst as gst + import pygst + pygst.require('0.10') + import gst - class MainloopThread(threading.Thread): + import gobject + gobject.threads_init() - def __init__(self, mainloop): - threading.Thread.__init__(self) - self.mainloop = mainloop + import threading - def run(self): - self.mainloop.run() + # Streaming queue configuration + QUEUE_SIZE = 10 + GST_APPSINK_MAX_BUFFERS = 10 class GstEncoder(Processor): @@@ -127,18 -146,25 +144,31 @@@ self.end_cond.notify() self.end_cond.release() + def _on_new_buffer_streaming(self, appsink): + #print 'pull-buffer' + chunk = appsink.emit('pull-buffer') + self._streaming_queue.put(chunk) + + def _on_new_preroll_streaming(self, appsink): + # print 'preroll' + chunk = appsink.emit('pull-preroll') + self._streaming_queue.put(chunk) + + @interfacedoc + def set_metadata(self, metadata): + self.metadata = metadata + @interfacedoc def process(self, frames, eod=False): self.eod = eod if eod: - self.num_samples += frames.shape[0] + self.num_samples += frames.shape[0] else: self.num_samples += self.blocksize() - buf = numpy_array_to_gst_buffer(frames, frames.shape[0], self.num_samples, self.samplerate()) ++ + buf = numpy_array_to_gst_buffer(frames, frames.shape[0], + self.num_samples, self.samplerate()) ++ self.src.emit('push-buffer', buf) if self.eod: self.src.emit('end-of-stream') diff --cc timeside/encoder/mp3.py index 61b9e4c,2d5addc..a69b6e5 --- a/timeside/encoder/mp3.py +++ b/timeside/encoder/mp3.py @@@ -27,7 -28,8 +28,7 @@@ from timeside.core import implements, interfacedoc from timeside.encoder.core import GstEncoder from timeside.api import IEncoder - from timeside.tools import * + -import mutagen class Mp3Encoder(GstEncoder): diff --cc timeside/tools/gstutils.py index cb64068,2722085..67e09c6 --- a/timeside/tools/gstutils.py +++ b/timeside/tools/gstutils.py @@@ -6,8 -6,10 +6,10 @@@ import gs import gobject gobject.threads_init() + import threading + -def numpy_array_to_gst_buffer(frames, CHUNK_SIZE, num_samples, SAMPLE_RATE): +def numpy_array_to_gst_buffer(frames, chunk_size, num_samples, sample_rate): from gst import Buffer """ gstreamer buffer to numpy array conversion """ buf = Buffer(getbuffer(frames.astype("float32")))