From: yomguy Date: Mon, 21 Mar 2011 22:00:03 +0000 (+0000) Subject: add vorbis streaming test, add queue to vorbis encoder X-Git-Tag: 0.3.2~61 X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=8b2b024692a630bb2361bb344f753fc26488f4f9;p=timeside.git add vorbis streaming test, add queue to vorbis encoder --- diff --git a/timeside/encoder/ogg.py b/timeside/encoder/ogg.py index 3da76c6..647e16e 100644 --- a/timeside/encoder/ogg.py +++ b/timeside/encoder/ogg.py @@ -22,6 +22,7 @@ from timeside.core import Processor, implements, interfacedoc from timeside.api import IEncoder from numpy import array, frombuffer, getbuffer, float32 +import Queue import pygst pygst.require('0.10') @@ -43,6 +44,9 @@ class VorbisEncoder(Processor): if not self.filename and not self.streaming: raise Exception('Must give an output') + + self.eod = False + self.buffer_size = 8192 @interfacedoc def setup(self, channels=None, samplerate=None, nframes=None): @@ -79,9 +83,18 @@ class VorbisEncoder(Processor): rate=(int)%d""" % (int(channels), int(samplerate))) self.src.set_property("caps", srccaps) + if self.streaming: + self.queue = Queue.Queue(self.buffer_size) + self.app.set_property('emit-signals', True) + self.app.connect("new-buffer", self.buffer) + # start pipeline self.pipeline.set_state(gst.STATE_PLAYING) + def buffer(self, appsink): + data = appsink.props.last_buffer.data + self.queue.put_nowait(data) + @staticmethod @interfacedoc def id(): @@ -114,15 +127,16 @@ class VorbisEncoder(Processor): @interfacedoc def process(self, frames, eod=False): + self.eod = eod + print frames.shape buf = self.numpy_array_to_gst_buffer(frames) self.src.emit('push-buffer', buf) if self.streaming: pull = self.app.emit('pull-buffer') # if eod: self.src.emit('end-of-stream') - if not self.streaming: - return frames, eod - else: - return pull, eod + if self.streaming: + self.chunk = self.queue.get(self.buffer_size) + return frames, eod def numpy_array_to_gst_buffer(self, frames): """ gstreamer buffer to numpy array conversion """ diff --git a/timeside/tests/api/test_lolevel_streaming_vorbis.py b/timeside/tests/api/test_lolevel_streaming_vorbis.py new file mode 100644 index 0000000..39a2652 --- /dev/null +++ b/timeside/tests/api/test_lolevel_streaming_vorbis.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- + +from timeside.core import * +from timeside.decoder import * +from timeside.analyzer import * +from timeside.encoder import * +from timeside.api import * + +import sys +if len(sys.argv) > 1: + source = sys.argv[1] +else: + import os.path + source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac") + +decoder = FileDecoder(source) +print "Creating decoder with id=%s for: %s" % (decoder.id(), source) +decoder.setup() +channels = decoder.channels() +print 'channels :', channels +samplerate = decoder.samplerate() +nframes = decoder.nframes() + +dest1 = "/tmp/test_filesink.ogg" +dest2 = "/tmp/test_appsink.ogg" +f = open(dest2,'w') + +streaming=True +encoder = VorbisEncoder(dest1, streaming=True) +encoder.setup(channels=channels, samplerate=samplerate) + +while True: + encoder.process(*decoder.process()) + if streaming: + f.write(encoder.chunk) + if encoder.eod : + break + +f.close() +print encoder.pipe