- Add a stream() function to the pipe to get an iterator that streams (for Telemeta HttpResponse)
- inside stream(), run pipe.run() inside a thread and get back appsink signal through a Queue
# -*- coding: utf-8 -*-
from timeside.core import *
-from timeside.decoder import *
-from timeside.analyzer import *
-from timeside.encoder import *
-from timeside.api import *
+from timeside.decoder import FileDecoder
+from timeside.encoder import Mp3Encoder
import sys
if len(sys.argv) > 1:
source= os.path.join (os.path.dirname(__file__), "../samples/sweep.flac")
decoder = FileDecoder(source)
+
print "Creating decoder with id=%s for: %s" % (decoder.id(), source)
decoder.setup()
+
channels = decoder.channels()
print 'channels :', channels
samplerate = decoder.samplerate()
-nframes = decoder.nframes()
+#nframes = decoder.nframes()
dest1 = "/tmp/test_filesink.mp3"
dest2 = "/tmp/test_appsink.mp3"
f = open(dest2,'w')
streaming=True
-encoder = Mp3Encoder(dest1, streaming=True)
-encoder.setup(channels=channels, samplerate=samplerate)
-
+encoder = Mp3Encoder(dest1, streaming=True, overwrite=True)
+encoder.setup(channels=channels, samplerate=samplerate,
+ blocksize=decoder.blocksize(), totalframes=decoder.totalframes())
while True:
encoder.process(*decoder.process())
if streaming:
f.write(encoder.chunk)
- if encoder.eod :
+ if encoder.eod:
break
f.close()
print encoder.pipe
+
+import os
+dest1_size = os.path.getsize(dest1)
+dest2_size = os.path.getsize(dest2)
+
+print "sizes : %d , %d" % (dest1_size, dest2_size)
+
+assert os.path.getsize(dest1)==os.path.getsize(dest2)
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from timeside.core import *
+from timeside.decoder import FileDecoder
+from timeside.encoder import Mp3Encoder
+
+import sys
+#if len(sys.argv) > 1:
+# source = sys.argv[1]
+#else:
+# import os.path
+audio_file = '/home/thomas/code/timeside/TimeSide/tests/samples/sweep.flac'
+# source= os.path.join (os.path.dirname(__file__), audio_file)
+
+decoder = FileDecoder(audio_file)
+
+print "Creating decoder with id=%s for: %s" % (decoder.id(), audio_file)
+
+dest1 = "/tmp/test_filesink.mp3"
+dest2 = "/tmp/test_appsink.mp3"
+f = open(dest2,'w')
+
+
+streaming=True
+encoder = Mp3Encoder(dest1, streaming=streaming, overwrite=True)
+
+pipe = (decoder | encoder)
+print pipe
+#pipe.run()
+
+for chunk in pipe.stream():
+ f.write(chunk)
+#while True:
+# encoder.process(*decoder.process())
+# if streaming:
+# f.write(encoder.chunk)
+# if encoder.eod:
+# break
+
+f.close()
+#print encoder.pipe
+
+import os
+dest1_size = os.path.getsize(dest1)
+dest2_size = os.path.getsize(dest2)
+
+print "sizes : %d , %d" % (dest1_size, dest2_size)
+
+assert os.path.getsize(dest1)==os.path.getsize(dest2)
import numpy
import uuid
+import gobject
+gobject.threads_init()
+
__all__ = ['Processor', 'MetaProcessor', 'implements', 'abstract',
'interfacedoc', 'processors', 'get_processor', 'ProcessPipe',
'FixedSizeInputAdapter']
def __init__(self, *others):
self.processors = []
+ self._streamer = None
+ self._stream_thread = False
+ self._is_running = False
+
self |= others
from timeside.analyzer.core import AnalyzerResultContainer
return pipe
def run(self, channels=None, samplerate=None, blocksize=None):
- """Setup/reset all processors in cascade and stream audio data along
- the pipe."""
+ """Setup/reset all processors in cascade"""
source = self.processors[0]
items = self.processors[1:]
samplerate=last.samplerate(),
blocksize=last.blocksize(),
totalframes=last.totalframes())
+ self._register_streamer(item)
last = item
# now stream audio data along the pipe
+ if self._stream_thread:
+ self._running_cond.acquire()
+ self._is_running = True
+ self._running_cond.notify()
+ self._running_cond.release()
+
eod = False
if source.id() == 'gst_live_dec':
for item in items:
item.release()
self.processors.remove(item)
+
+ self._is_running = False
+
+ def stream(self):
+ self._stream_thread = True
+ import threading
+ class MainloopThread(threading.Thread):
+ def __init__(self, mainloop):
+ threading.Thread.__init__(self)
+ self.mainloop = mainloop
+
+ def run(self):
+ self.mainloop.run()
+ self.mainloop = gobject.MainLoop()
+ self.mainloopthread = MainloopThread(self.mainloop)
+ self.mainloopthread.start()
+
+ class PipeThread(threading.Thread):
+ def __init__(self, process_pipe):
+ super(PipeThread, self).__init__(name='pipe_thread')
+ self.process_pipe = process_pipe
+
+ def run(self):
+ self.process_pipe.run()
+
+ pipe_thread = PipeThread(self)
+ pipe_thread.start()
+
+ # wait for pipe thread to be ready to stream
+ self._running_cond = threading.Condition(threading.Lock())
+ self._running_cond.acquire()
+ while not self._is_running:
+ self._running_cond.wait()
+ self._running_cond.release()
+
+ if self._streamer is None:
+ raise TypeError('Function only available in streaming mode')
+ print 'streaming'
+
+ while pipe_thread.is_alive():
+ #yield count
+ chunk = self._streamer.get_stream_chunk()
+ #time.sleep(0.1)
+ if chunk is not None:
+ yield chunk # print len(chunk)
+ else:
+ break
+
+
+ return
+
+ def _register_streamer(self, processor):
+ if hasattr(processor, 'streaming') and processor.streaming:
+ if self._streamer is None:
+ self._streamer = processor
+ else:
+ raise TypeError('More than one streaming processor in pipe')
+ print self._streamer
+
from timeside.api import IEncoder
from timeside.tools import *
-from gst import _gst as gst
+#from gst import _gst as gst
+import pygst
+pygst.require('0.10')
+import gst
+
+import gobject
+gobject.threads_init()
+
+# Streaming queue configuration
+QUEUE_SIZE = 10
+GST_APPSINK_MAX_BUFFERS = 10
class GstEncoder(Processor):
self.metadata = None
self.num_samples = 0
+ self._chunk_len = 0
+
@interfacedoc
def release(self):
if hasattr(self, 'eod') and hasattr(self, 'mainloopthread'):
self.pipeline = gst.parse_launch(self.pipe)
# store a pointer to appsrc in our encoder object
self.src = self.pipeline.get_by_name('src')
- # store a pointer to appsink in our encoder object
- self.app = self.pipeline.get_by_name('app')
+
+ if self.streaming:
+ import Queue
+ self._streaming_queue = Queue.Queue(QUEUE_SIZE)
+ # store a pointer to appsink in our encoder object
+ self.app = self.pipeline.get_by_name('app')
+ self.app.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS)
+ self.app.set_property("drop", False)
+ self.app.set_property('emit-signals', True)
+ self.app.connect("new-buffer", self._on_new_buffer_streaming)
+ self.app.connect('new-preroll', self._on_new_preroll_streaming)
srccaps = gst.Caps("""audio/x-raw-float,
endianness=(int)1234,
def _on_message_cb(self, bus, message):
t = message.type
if t == gst.MESSAGE_EOS:
+
+ if self.streaming:
+ self._streaming_queue.put(gst.MESSAGE_EOS)
+
self.end_cond.acquire()
self.pipeline.set_state(gst.STATE_NULL)
self.mainloop.quit()
self.end_cond.notify()
self.end_cond.release()
+ def _on_new_buffer_streaming(self, appsink):
+ #print 'pull-buffer'
+ chunk = appsink.emit('pull-buffer')
+ if chunk == gst.MESSAGE_EOS:
+ print 'chunk is eos *************'
+ raise TypeError
+ else:
+ self._chunk_len += len(chunk)
+ print 'new buffer', self._chunk_len
+
+ if appsink.get_property('eos'):
+ print 'property EOS'
+ #print 'put buffer in queue'
+ self._streaming_queue.put(chunk)
+ #print 'qsize : %d' % self._streaming_queue.qsize()
+ #print 'put ok'
+
+ def _on_new_preroll_streaming(self, appsink):
+ print 'preroll'
+ chunk = appsink.emit('pull-preroll')
+ self._streaming_queue.put(chunk)
+
@interfacedoc
def process(self, frames, eod=False):
self.eod = eod
if self.eod:
self.src.emit('end-of-stream')
if self.streaming:
- self.chunk = self.app.emit('pull-buffer')
+ pass #self.chunk = self.app.emit('pull-buffer')
return frames, eod
+ def get_stream_chunk(self):
+ if self.streaming:
+ #if not self.app.get_property('eos'):
+ #print 'get chunk from queue'
+ #print 'qsize : %d' % self._streaming_queue.qsize()
+ chunk = self._streaming_queue.get(block=True)
+ if chunk == gst.MESSAGE_EOS:
+ return None
+ else:
+ self._streaming_queue.task_done()
+ return chunk
+
+ print 'new buffer', self._chunk_len
+
+ else:
+ raise TypeError('function only available in streaming mode')
if __name__ == "__main__":
# Run doctest from __main__ and unittest from test_analyzer_preprocessors
if self.filename and self.streaming:
self.pipe += ''' ! tee name=t
- ! queue ! filesink location=%s
- t. ! queue ! appsink name=app sync=False
+ ! queue2 ! filesink location=%s
+ t. ! queue2 ! appsink name=app sync=False
''' % self.filename
elif self.filename :
self.pipe += '! filesink location=%s async=False sync=False ' % self.filename
else:
- self.pipe += '! queue ! appsink name=app sync=False '
+ self.pipe += '! queue ! appsink name=app sync=False async=True'
self.start_pipeline(channels, samplerate)