From a953586ab98543a82b465fa2c1b42fec759db69c Mon Sep 17 00:00:00 2001
From: Guillaume Pellerin
Date: Mon, 6 Jan 2014 23:14:44 +0100
Subject: [PATCH] split decoder modules

---
 tests/test_array_decoding.py     |   2 +-
 tests/test_decoding.py           |   2 +-
 tests/test_graphing.py           |   2 +-
 timeside/analyzer/spectrogram.py |   6 +-
 timeside/decoder/__init__.py     |   4 +-
 timeside/decoder/array.py        | 145 +++++++
 timeside/decoder/core.py         | 682 +------------------------------
 timeside/decoder/file.py         | 322 +++++++++++++++
 timeside/decoder/live.py         | 299 ++++++++++++++
 timeside/decoder/utils.py        |  13 +
 10 files changed, 789 insertions(+), 688 deletions(-)
 create mode 100644 timeside/decoder/array.py
 create mode 100644 timeside/decoder/file.py
 create mode 100644 timeside/decoder/live.py

diff --git a/tests/test_array_decoding.py b/tests/test_array_decoding.py
index 1c740b9..9d94b66 100644
--- a/tests/test_array_decoding.py
+++ b/tests/test_array_decoding.py
@@ -1,7 +1,7 @@
 #! /usr/bin/env python
 
 from __future__ import division
-from timeside.decoder.core import ArrayDecoder
+from timeside.decoder import ArrayDecoder
 
 from unit_timeside import *
 
diff --git a/tests/test_decoding.py b/tests/test_decoding.py
index 72986f7..d922512 100755
--- a/tests/test_decoding.py
+++ b/tests/test_decoding.py
@@ -2,7 +2,7 @@
 
 from __future__ import division
 
-from timeside.decoder.core import FileDecoder
+from timeside.decoder import FileDecoder
 from unit_timeside import *
 
 import os.path
diff --git a/tests/test_graphing.py b/tests/test_graphing.py
index cfb4a17..bf1d721 100755
--- a/tests/test_graphing.py
+++ b/tests/test_graphing.py
@@ -1,6 +1,6 @@
 #! /usr/bin/env python
 
-from timeside.decoder.core import FileDecoder
+from timeside.decoder import FileDecoder
 from unit_timeside import *
 
 import os.path
diff --git a/timeside/analyzer/spectrogram.py b/timeside/analyzer/spectrogram.py
index 587d7f4..3357448 100644
--- a/timeside/analyzer/spectrogram.py
+++ b/timeside/analyzer/spectrogram.py
@@ -29,6 +29,9 @@ import numpy as np
 class Spectrogram(Analyzer):
     implements(IAnalyzer)
 
+    values = []
+    FFT_SIZE = 2048
+
     def __init__(self, blocksize=2048, stepsize=None):
         super(Spectrogram, self).__init__()
 
@@ -44,9 +47,6 @@ class Spectrogram(Analyzer):
         super(Spectrogram, self).setup(channels, samplerate,
                                        blocksize, totalframes)
 
-        self.values = []
-        self.FFT_SIZE = 2048
-
     @staticmethod
     @interfacedoc
     def id():
diff --git a/timeside/decoder/__init__.py b/timeside/decoder/__init__.py
index fcb882b..ab1c3a4 100644
--- a/timeside/decoder/__init__.py
+++ b/timeside/decoder/__init__.py
@@ -1,3 +1,5 @@
 # -*- coding: utf-8 -*-
 
-from core import *
+from file import *
+from array import *
+from live import *
diff --git a/timeside/decoder/array.py b/timeside/decoder/array.py
new file mode 100644
index 0000000..e13408b
--- /dev/null
+++ b/timeside/decoder/array.py
@@ -0,0 +1,145 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2007-2013 Parisson
+# Copyright (c) 2007 Olivier Guilyardi
+# Copyright (c) 2007-2013 Guillaume Pellerin
+# Copyright (c) 2010-2013 Paul Brossier
+#
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide. If not, see .
+
+# Authors:
+# Paul Brossier
+# Guillaume Pellerin
+# Thomas Fillon
+
+
+from timeside.decoder.core import *
+
+
+class ArrayDecoder(Decoder):
+    """ Decoder taking Numpy array as input"""
+    implements(IDecoder)
+
+    output_blocksize = 8*1024
+
+    # IProcessor methods
+
+    @staticmethod
+    @interfacedoc
+    def id():
+        return "array_dec"
+
+    def __init__(self, samples, samplerate=44100, start=0, duration=None):
+        '''
+        Construct a new ArrayDecoder from an numpy array
+
+        Parameters
+        ----------
+        samples : numpy array of dimension 1 (mono) or 2 (multichannel)
+            if shape = (n) or (n,1) : n samples, mono
+            if shape = (n,m) : n samples with m channels
+        start : float
+            start time of the segment in seconds
+        duration : float
+            duration of the segment in seconds
+        '''
+        super(ArrayDecoder, self).__init__(start=start, duration=duration)
+
+        # Check array dimension
+        if samples.ndim > 2:
+            raise TypeError('Wrong number of dimensions for argument samples')
+        if samples.ndim == 1:
+            samples = samples[:, np.newaxis]  # reshape to 2D array
+
+        self.samples = samples  # Create a 2 dimensions array
+        self.input_samplerate = samplerate
+        self.input_channels = self.samples.shape[1]
+
+        self.uri = '_'.join(['raw_audio_array',
+                             'x'.join([str(dim) for dim in samples.shape]),
+                             samples.dtype.type.__name__])
+
+        self.frames = self.get_frames()
+
+    def setup(self, channels=None, samplerate=None, blocksize=None):
+
+        # the output data format we want
+        if blocksize:
+            self.output_blocksize = blocksize
+        if samplerate:
+            self.output_samplerate = int(samplerate)
+        if channels:
+            self.output_channels = int(channels)
+
+        if self.uri_duration is None:
+            self.uri_duration = (len(self.samples) / self.input_samplerate
+                                 - self.uri_start)
+
+        if self.is_segment:
+            start_index = self.uri_start * self.input_samplerate
+            stop_index = start_index + int(np.ceil(self.uri_duration
+                                                   * self.input_samplerate))
+            stop_index = min(stop_index, len(self.samples))
+            self.samples = self.samples[start_index:stop_index]
+
+        if not self.output_samplerate:
+            self.output_samplerate = self.input_samplerate
+
+        if not self.output_channels:
+            self.output_channels = self.input_channels
+
+        self.input_totalframes = len(self.samples)
+        self.input_duration = self.input_totalframes / self.input_samplerate
+        self.input_width = self.samples.itemsize * 8
+
+    def get_frames(self):
+        "Define an iterator that will return frames at the given blocksize"
+        nb_frames = self.input_totalframes // self.output_blocksize
+
+        if self.input_totalframes % self.output_blocksize == 0:
+            nb_frames -= 1  # Last frame must send eod=True
+
+        for index in xrange(0,
+                            nb_frames * self.output_blocksize,
+                            self.output_blocksize):
+            yield (self.samples[index:index+self.output_blocksize], False)
+
+        yield (self.samples[nb_frames * self.output_blocksize:], True)
+
+    @interfacedoc
+    def process(self):
+        return self.frames.next()
+
+    ## IDecoder methods
+    @interfacedoc
+    def format(self):
+        import re
+        base_type = re.search('^[a-z]*', self.samples.dtype.name).group(0)
+        return 'audio/x-raw-'+base_type
+
+    @interfacedoc
+    def metadata(self):
+        return None
+
+
+if __name__ == "__main__":
+    # Run doctest from __main__ and unittest from tests
+    from tests.unit_timeside import run_test_module
+    # load corresponding tests
+    from tests import test_array_decoding
+
+    run_test_module([test_array_decoding])
diff --git
a/timeside/decoder/core.py b/timeside/decoder/core.py index 719175a..1eba577 100644 --- a/timeside/decoder/core.py +++ b/timeside/decoder/core.py @@ -32,7 +32,7 @@ from timeside.core import Processor, implements, interfacedoc, abstract from timeside.api import IDecoder from timeside.tools import * -from utils import get_uri, get_media_uri_info +from utils import get_uri, get_media_uri_info, stack import Queue from gst import _gst as gst @@ -43,20 +43,6 @@ GST_APPSINK_MAX_BUFFERS = 10 QUEUE_SIZE = 10 -def stack(process_func): - - import functools - - @functools.wraps(process_func) - def wrapper(decoder): - # Processing - frames, eod = process_func(decoder) - if decoder.stack: - decoder.process_pipe.frames_stack.append((frames, eod)) - return frames, eod - return wrapper - - class Decoder(Processor): """General abstract base class for Decoder """ @@ -123,669 +109,3 @@ class Decoder(Processor): def stop(self): self.src.send_event(gst.event_new_eos()) - -class FileDecoder(Decoder): - """ gstreamer-based decoder """ - implements(IDecoder) - - output_blocksize = 8*1024 - - pipeline = None - mainloopthread = None - - # IProcessor methods - - @staticmethod - @interfacedoc - def id(): - return "gst_dec" - - def __init__(self, uri, start=0, duration=None, stack=False): - - """ - Construct a new FileDecoder - - Parameters - ---------- - uri : str - uri of the media - start : float - start time of the segment in seconds - duration : float - duration of the segment in seconds - - """ - - super(FileDecoder, self).__init__(start=start, duration=duration) - - self.from_stack = False - self.stack = stack - - self.uri = get_uri(uri) - - def set_uri_default_duration(self): - # Set the duration from the length of the file - uri_total_duration = get_media_uri_info(self.uri)['duration'] - self.uri_duration = uri_total_duration - self.uri_start - - def setup(self, channels=None, samplerate=None, blocksize=None): - - self.eod = False - self.last_buffer = None - - if self.from_stack: - return - - if self.stack: - self.process_pipe.frames_stack = [] - - if self.uri_duration is None: - self.set_uri_default_duration() - - # a lock to wait wait for gstreamer thread to be ready - import threading - self.discovered_cond = threading.Condition(threading.Lock()) - self.discovered = False - - # the output data format we want - if blocksize: - self.output_blocksize = blocksize - if samplerate: - self.output_samplerate = int(samplerate) - if channels: - self.output_channels = int(channels) - - if self.is_segment: - # Create the pipe with Gnonlin gnlurisource - self.pipe = ''' gnlurisource name=src uri={uri} - start=0 - duration={uri_duration} - media-start={uri_start} - media-duration={uri_duration} - ! audioconvert name=audioconvert - ! audioresample - ! appsink name=sink sync=False async=True - '''.format(uri = self.uri, - uri_start = np.uint64(round(self.uri_start * gst.SECOND)), - uri_duration = np.int64(round(self.uri_duration * gst.SECOND))) - # convert uri_start and uri_duration to nanoseconds - else: - # Create the pipe with standard Gstreamer uridecodbin - self.pipe = ''' uridecodebin name=src uri={uri} - ! audioconvert name=audioconvert - ! audioresample - ! 
appsink name=sink sync=False async=True - '''.format(uri = self.uri) - - self.pipeline = gst.parse_launch(self.pipe) - - if self.output_channels: - caps_channels = int(self.output_channels) - else: - caps_channels = "[ 1, 2 ]" - if self.output_samplerate: - caps_samplerate = int(self.output_samplerate) - else: - caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }" - sink_caps = gst.Caps("""audio/x-raw-float, - endianness=(int)1234, - channels=(int)%s, - width=(int)32, - rate=(int)%s""" % (caps_channels, caps_samplerate)) - - self.src = self.pipeline.get_by_name('src') - self.conv = self.pipeline.get_by_name('audioconvert') - self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb) - - self.sink = self.pipeline.get_by_name('sink') - self.sink.set_property("caps", sink_caps) - self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS) - self.sink.set_property("drop", False) - self.sink.set_property('emit-signals', True) - self.sink.connect("new-buffer", self._on_new_buffer_cb) - - self.bus = self.pipeline.get_bus() - self.bus.add_signal_watch() - self.bus.connect('message', self._on_message_cb) - - self.queue = Queue.Queue(QUEUE_SIZE) - - import threading - - class MainloopThread(threading.Thread): - def __init__(self, mainloop): - threading.Thread.__init__(self) - self.mainloop = mainloop - - def run(self): - self.mainloop.run() - self.mainloop = gobject.MainLoop() - self.mainloopthread = MainloopThread(self.mainloop) - self.mainloopthread.start() - #self.mainloopthread = get_loop_thread() - ##self.mainloop = self.mainloopthread.mainloop - - # start pipeline - self.pipeline.set_state(gst.STATE_PLAYING) - - self.discovered_cond.acquire() - while not self.discovered: - #print 'waiting' - self.discovered_cond.wait() - self.discovered_cond.release() - - if not hasattr(self, 'input_samplerate'): - if hasattr(self, 'error_msg'): - raise IOError(self.error_msg) - else: - raise IOError('no known audio stream found') - - def _notify_caps_cb(self, pad, args): - self.discovered_cond.acquire() - - caps = pad.get_negotiated_caps() - if not caps: - pad.info("no negotiated caps available") - self.discovered = True - self.discovered_cond.notify() - self.discovered_cond.release() - return - # the caps are fixed - # We now get the total length of that stream - q = gst.query_new_duration(gst.FORMAT_TIME) - pad.info("sending duration query") - if pad.get_peer().query(q): - format, length = q.parse_duration() - if format == gst.FORMAT_TIME: - pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),)) - else: - pad.info("got duration : %d [format:%d]" % (length, format)) - else: - length = -1 - gst.warning("duration query failed") - - # We store the caps and length in the proper location - if "audio" in caps.to_string(): - self.input_samplerate = caps[0]["rate"] - if not self.output_samplerate: - self.output_samplerate = self.input_samplerate - self.input_channels = caps[0]["channels"] - if not self.output_channels: - self.output_channels = self.input_channels - self.input_duration = length / 1.e9 - - self.input_totalframes = int(self.input_duration * self.input_samplerate) - if "x-raw-float" in caps.to_string(): - self.input_width = caps[0]["width"] - else: - self.input_width = caps[0]["depth"] - - self.discovered = True - self.discovered_cond.notify() - self.discovered_cond.release() - - def _on_message_cb(self, bus, message): - t = message.type - if t == gst.MESSAGE_EOS: - self.queue.put(gst.MESSAGE_EOS) - self.pipeline.set_state(gst.STATE_NULL) - 
self.mainloop.quit() - elif t == gst.MESSAGE_ERROR: - self.pipeline.set_state(gst.STATE_NULL) - err, debug = message.parse_error() - self.discovered_cond.acquire() - self.discovered = True - self.mainloop.quit() - self.error_msg = "Error: %s" % err, debug - self.discovered_cond.notify() - self.discovered_cond.release() - elif t == gst.MESSAGE_TAG: - # TODO - # msg.parse_tags() - pass - - def _on_new_buffer_cb(self, sink): - buf = sink.emit('pull-buffer') - new_array = gst_buffer_to_numpy_array(buf, self.output_channels) - #print 'processing new buffer', new_array.shape - if self.last_buffer is None: - self.last_buffer = new_array - else: - self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0) - while self.last_buffer.shape[0] >= self.output_blocksize: - new_block = self.last_buffer[:self.output_blocksize] - self.last_buffer = self.last_buffer[self.output_blocksize:] - #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape - self.queue.put([new_block, False]) - - @interfacedoc - @stack - def process(self): - buf = self.queue.get() - if buf == gst.MESSAGE_EOS: - return self.last_buffer, True - frames, eod = buf - return frames, eod - - @interfacedoc - def totalframes(self): - if self.input_samplerate == self.output_samplerate: - return self.input_totalframes - else: - ratio = self.output_samplerate / self.input_samplerate - return int(self.input_totalframes * ratio) - - @interfacedoc - def release(self): - if self.stack: - self.stack = False - self.from_stack = True - pass - - ## IDecoder methods - - @interfacedoc - def format(self): - # TODO check - if self.mimetype == 'application/x-id3': - self.mimetype = 'audio/mpeg' - return self.mimetype - - @interfacedoc - def encoding(self): - # TODO check - return self.mimetype.split('/')[-1] - - @interfacedoc - def resolution(self): - # TODO check: width or depth? - return self.input_width - - @interfacedoc - def metadata(self): - # TODO check - return self.tags - - -class LiveDecoder(Decoder): - """ gstreamer-based decoder from live source""" - implements(IDecoder) - - output_blocksize = 8*1024 - - pipeline = None - mainloopthread = None - - # IProcessor methods - - @staticmethod - @interfacedoc - def id(): - return "gst_live_dec" - - def __init__(self, num_buffers=-1): - - """ - Construct a new LiveDecoder capturing audio from alsasrc - - Parameters - ---------- - num_buffers : - Number of buffers to output before sending EOS (-1 = unlimited). 
- (Allowed values: >= -1, Default value: -1) - - - Examples - -------- - - >>> import timeside - - >>> live = timeside.decoder.LiveDecoder(num_buffers=25) - >>> a = timeside.analyzer.Waveform() - >>> e = timeside.encoder.Mp3Encoder('test_live.mp3', overwrite=True) - >>> pipe = (live | a | e) - >>> pipe.run() - - >>> import matplotlib.pyplot as plt # doctest: +SKIP - >>> plt.plot(a.results['waveform_analyzer'].time, # doctest: +SKIP - a.results['waveform_analyzer'].data) # doctest: +SKIP - >>> plt.show() # doctest: +SKIP - - """ - - super(Decoder, self).__init__() - self.num_buffers = num_buffers - self.uri = None - self.uri_start = 0 - self.uri_duration = None - self.is_segment = False - - def setup(self, channels=None, samplerate=None, blocksize=None): - - self.eod = False - self.last_buffer = None - - # a lock to wait wait for gstreamer thread to be ready - import threading - self.discovered_cond = threading.Condition(threading.Lock()) - self.discovered = False - - # the output data format we want - if blocksize: - self.output_blocksize = blocksize - if samplerate: - self.output_samplerate = int(samplerate) - if channels: - self.output_channels = int(channels) - - # Create the pipe with standard Gstreamer uridecodbin - self.pipe = '''alsasrc num-buffers=%d name=src - ! audioconvert name=audioconvert - ! audioresample - ! appsink name=sink sync=False async=True - ''' % self.num_buffers - - self.pipeline = gst.parse_launch(self.pipe) - - if self.output_channels: - caps_channels = int(self.output_channels) - else: - caps_channels = "[ 1, 2 ]" - if self.output_samplerate: - caps_samplerate = int(self.output_samplerate) - else: - caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }" - sink_caps = gst.Caps("""audio/x-raw-float, - endianness=(int)1234, - channels=(int)%s, - width=(int)32, - rate=(int)%s""" % (caps_channels, caps_samplerate)) - - self.src = self.pipeline.get_by_name('src') - self.conv = self.pipeline.get_by_name('audioconvert') - self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb) - - self.sink = self.pipeline.get_by_name('sink') - self.sink.set_property("caps", sink_caps) - self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS) - self.sink.set_property("drop", False) - self.sink.set_property('emit-signals', True) - self.sink.connect("new-buffer", self._on_new_buffer_cb) - - self.bus = self.pipeline.get_bus() - self.bus.add_signal_watch() - self.bus.connect('message', self._on_message_cb) - - self.queue = Queue.Queue(QUEUE_SIZE) - - import threading - - class MainloopThread(threading.Thread): - def __init__(self, mainloop): - threading.Thread.__init__(self) - self.mainloop = mainloop - - def run(self): - self.mainloop.run() - self.mainloop = gobject.MainLoop() - self.mainloopthread = MainloopThread(self.mainloop) - self.mainloopthread.start() - #self.mainloopthread = get_loop_thread() - ##self.mainloop = self.mainloopthread.mainloop - - # start pipeline - self.pipeline.set_state(gst.STATE_PLAYING) - - self.discovered_cond.acquire() - while not self.discovered: - #print 'waiting' - self.discovered_cond.wait() - self.discovered_cond.release() - - if not hasattr(self, 'input_samplerate'): - if hasattr(self, 'error_msg'): - raise IOError(self.error_msg) - else: - raise IOError('no known audio stream found') - - def _notify_caps_cb(self, pad, args): - self.discovered_cond.acquire() - - caps = pad.get_negotiated_caps() - if not caps: - pad.info("no negotiated caps available") - self.discovered = True - 
self.discovered_cond.notify() - self.discovered_cond.release() - return - # the caps are fixed - # We now get the total length of that stream - q = gst.query_new_duration(gst.FORMAT_TIME) - pad.info("sending duration query") - if pad.get_peer().query(q): - format, length = q.parse_duration() - if format == gst.FORMAT_TIME: - pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),)) - else: - pad.info("got duration : %d [format:%d]" % (length, format)) - else: - length = -1 - gst.warning("duration query failed") - - # We store the caps and length in the proper location - if "audio" in caps.to_string(): - self.input_samplerate = caps[0]["rate"] - if not self.output_samplerate: - self.output_samplerate = self.input_samplerate - self.input_channels = caps[0]["channels"] - if not self.output_channels: - self.output_channels = self.input_channels - self.input_duration = length / 1.e9 - - self.input_totalframes = int(self.input_duration * self.input_samplerate) - if "x-raw-float" in caps.to_string(): - self.input_width = caps[0]["width"] - else: - self.input_width = caps[0]["depth"] - - self.discovered = True - self.discovered_cond.notify() - self.discovered_cond.release() - - def _on_message_cb(self, bus, message): - t = message.type - if t == gst.MESSAGE_EOS: - self.queue.put(gst.MESSAGE_EOS) - self.pipeline.set_state(gst.STATE_NULL) - self.mainloop.quit() - elif t == gst.MESSAGE_ERROR: - self.pipeline.set_state(gst.STATE_NULL) - err, debug = message.parse_error() - self.discovered_cond.acquire() - self.discovered = True - self.mainloop.quit() - self.error_msg = "Error: %s" % err, debug - self.discovered_cond.notify() - self.discovered_cond.release() - elif t == gst.MESSAGE_TAG: - # TODO - # msg.parse_tags() - pass - - def _on_new_buffer_cb(self, sink): - buf = sink.emit('pull-buffer') - new_array = gst_buffer_to_numpy_array(buf, self.output_channels) - - #print 'processing new buffer', new_array.shape - if self.last_buffer is None: - self.last_buffer = new_array - else: - self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0) - while self.last_buffer.shape[0] >= self.output_blocksize: - new_block = self.last_buffer[:self.output_blocksize] - self.last_buffer = self.last_buffer[self.output_blocksize:] - #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape - self.queue.put([new_block, False]) - - @interfacedoc - def process(self): - buf = self.queue.get() - if buf == gst.MESSAGE_EOS: - return self.last_buffer, True - - frames, eod = buf - return frames, eod - - @interfacedoc - def totalframes(self): - if self.input_samplerate == self.output_samplerate: - return self.input_totalframes - else: - ratio = self.output_samplerate / self.input_samplerate - return int(self.input_totalframes * ratio) - - @interfacedoc - def release(self): - if self.stack: - self.stack = False - self.from_stack = True - pass - - ## IDecoder methods - - @interfacedoc - def format(self): - # TODO check - if self.mimetype == 'application/x-id3': - self.mimetype = 'audio/mpeg' - return self.mimetype - - @interfacedoc - def encoding(self): - # TODO check - return self.mimetype.split('/')[-1] - - @interfacedoc - def resolution(self): - # TODO check: width or depth? 
- return self.input_width - - @interfacedoc - def metadata(self): - # TODO check - return self.tags - - -class ArrayDecoder(Decoder): - """ Decoder taking Numpy array as input""" - implements(IDecoder) - - output_blocksize = 8*1024 - - # IProcessor methods - - @staticmethod - @interfacedoc - def id(): - return "array_dec" - - def __init__(self, samples, samplerate=44100, start=0, duration=None): - ''' - Construct a new ArrayDecoder from an numpy array - - Parameters - ---------- - samples : numpy array of dimension 1 (mono) or 2 (multichannel) - if shape = (n) or (n,1) : n samples, mono - if shape = (n,m) : n samples with m channels - start : float - start time of the segment in seconds - duration : float - duration of the segment in seconds - ''' - super(ArrayDecoder, self).__init__(start=start, duration=duration) - - # Check array dimension - if samples.ndim > 2: - raise TypeError('Wrong number of dimensions for argument samples') - if samples.ndim == 1: - samples = samples[:, np.newaxis] # reshape to 2D array - - self.samples = samples # Create a 2 dimensions array - self.input_samplerate = samplerate - self.input_channels = self.samples.shape[1] - - self.uri = '_'.join(['raw_audio_array', - 'x'.join([str(dim) for dim in samples.shape]), - samples.dtype.type.__name__]) - - self.frames = self.get_frames() - - def setup(self, channels=None, samplerate=None, blocksize=None): - - # the output data format we want - if blocksize: - self.output_blocksize = blocksize - if samplerate: - self.output_samplerate = int(samplerate) - if channels: - self.output_channels = int(channels) - - if self.uri_duration is None: - self.uri_duration = (len(self.samples) / self.input_samplerate - - self.uri_start) - - if self.is_segment: - start_index = self.uri_start * self.input_samplerate - stop_index = start_index + int(np.ceil(self.uri_duration - * self.input_samplerate)) - stop_index = min(stop_index, len(self.samples)) - self.samples = self.samples[start_index:stop_index] - - if not self.output_samplerate: - self.output_samplerate = self.input_samplerate - - if not self.output_channels: - self.output_channels = self.input_channels - - self.input_totalframes = len(self.samples) - self.input_duration = self.input_totalframes / self.input_samplerate - self.input_width = self.samples.itemsize * 8 - - def get_frames(self): - "Define an iterator that will return frames at the given blocksize" - nb_frames = self.input_totalframes // self.output_blocksize - - if self.input_totalframes % self.output_blocksize == 0: - nb_frames -= 1 # Last frame must send eod=True - - for index in xrange(0, - nb_frames * self.output_blocksize, - self.output_blocksize): - yield (self.samples[index:index+self.output_blocksize], False) - - yield (self.samples[nb_frames * self.output_blocksize:], True) - - @interfacedoc - def process(self): - return self.frames.next() - - ## IDecoder methods - @interfacedoc - def format(self): - import re - base_type = re.search('^[a-z]*', self.samples.dtype.name).group(0) - return 'audio/x-raw-'+base_type - - @interfacedoc - def metadata(self): - return None - - -if __name__ == "__main__": - # Run doctest from __main__ and unittest from tests - from tests.unit_timeside import run_test_module - # load corresponding tests - from tests import test_decoding, test_array_decoding - - run_test_module([test_decoding, test_array_decoding]) diff --git a/timeside/decoder/file.py b/timeside/decoder/file.py new file mode 100644 index 0000000..d48a38b --- /dev/null +++ b/timeside/decoder/file.py @@ -0,0 +1,322 @@ 
+#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright (c) 2007-2013 Parisson +# Copyright (c) 2007 Olivier Guilyardi +# Copyright (c) 2007-2013 Guillaume Pellerin +# Copyright (c) 2010-2013 Paul Brossier +# +# This file is part of TimeSide. + +# TimeSide is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. + +# TimeSide is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with TimeSide. If not, see . + +# Authors: +# Paul Brossier +# Guillaume Pellerin +# Thomas Fillon + + +from timeside.decoder.core import * + + +class FileDecoder(Decoder): + """ gstreamer-based decoder """ + implements(IDecoder) + + output_blocksize = 8*1024 + + pipeline = None + mainloopthread = None + + # IProcessor methods + + @staticmethod + @interfacedoc + def id(): + return "gst_dec" + + def __init__(self, uri, start=0, duration=None, stack=False): + + """ + Construct a new FileDecoder + + Parameters + ---------- + uri : str + uri of the media + start : float + start time of the segment in seconds + duration : float + duration of the segment in seconds + + """ + + super(FileDecoder, self).__init__(start=start, duration=duration) + + self.from_stack = False + self.stack = stack + + self.uri = get_uri(uri) + + def set_uri_default_duration(self): + # Set the duration from the length of the file + uri_total_duration = get_media_uri_info(self.uri)['duration'] + self.uri_duration = uri_total_duration - self.uri_start + + def setup(self, channels=None, samplerate=None, blocksize=None): + + self.eod = False + self.last_buffer = None + + if self.from_stack: + return + + if self.stack: + self.process_pipe.frames_stack = [] + + if self.uri_duration is None: + self.set_uri_default_duration() + + # a lock to wait wait for gstreamer thread to be ready + import threading + self.discovered_cond = threading.Condition(threading.Lock()) + self.discovered = False + + # the output data format we want + if blocksize: + self.output_blocksize = blocksize + if samplerate: + self.output_samplerate = int(samplerate) + if channels: + self.output_channels = int(channels) + + if self.is_segment: + # Create the pipe with Gnonlin gnlurisource + self.pipe = ''' gnlurisource name=src uri={uri} + start=0 + duration={uri_duration} + media-start={uri_start} + media-duration={uri_duration} + ! audioconvert name=audioconvert + ! audioresample + ! appsink name=sink sync=False async=True + '''.format(uri = self.uri, + uri_start = np.uint64(round(self.uri_start * gst.SECOND)), + uri_duration = np.int64(round(self.uri_duration * gst.SECOND))) + # convert uri_start and uri_duration to nanoseconds + else: + # Create the pipe with standard Gstreamer uridecodbin + self.pipe = ''' uridecodebin name=src uri={uri} + ! audioconvert name=audioconvert + ! audioresample + ! 
appsink name=sink sync=False async=True + '''.format(uri = self.uri) + + self.pipeline = gst.parse_launch(self.pipe) + + if self.output_channels: + caps_channels = int(self.output_channels) + else: + caps_channels = "[ 1, 2 ]" + if self.output_samplerate: + caps_samplerate = int(self.output_samplerate) + else: + caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }" + sink_caps = gst.Caps("""audio/x-raw-float, + endianness=(int)1234, + channels=(int)%s, + width=(int)32, + rate=(int)%s""" % (caps_channels, caps_samplerate)) + + self.src = self.pipeline.get_by_name('src') + self.conv = self.pipeline.get_by_name('audioconvert') + self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb) + + self.sink = self.pipeline.get_by_name('sink') + self.sink.set_property("caps", sink_caps) + self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS) + self.sink.set_property("drop", False) + self.sink.set_property('emit-signals', True) + self.sink.connect("new-buffer", self._on_new_buffer_cb) + + self.bus = self.pipeline.get_bus() + self.bus.add_signal_watch() + self.bus.connect('message', self._on_message_cb) + + self.queue = Queue.Queue(QUEUE_SIZE) + + import threading + + class MainloopThread(threading.Thread): + def __init__(self, mainloop): + threading.Thread.__init__(self) + self.mainloop = mainloop + + def run(self): + self.mainloop.run() + self.mainloop = gobject.MainLoop() + self.mainloopthread = MainloopThread(self.mainloop) + self.mainloopthread.start() + #self.mainloopthread = get_loop_thread() + ##self.mainloop = self.mainloopthread.mainloop + + # start pipeline + self.pipeline.set_state(gst.STATE_PLAYING) + + self.discovered_cond.acquire() + while not self.discovered: + #print 'waiting' + self.discovered_cond.wait() + self.discovered_cond.release() + + if not hasattr(self, 'input_samplerate'): + if hasattr(self, 'error_msg'): + raise IOError(self.error_msg) + else: + raise IOError('no known audio stream found') + + def _notify_caps_cb(self, pad, args): + self.discovered_cond.acquire() + + caps = pad.get_negotiated_caps() + if not caps: + pad.info("no negotiated caps available") + self.discovered = True + self.discovered_cond.notify() + self.discovered_cond.release() + return + # the caps are fixed + # We now get the total length of that stream + q = gst.query_new_duration(gst.FORMAT_TIME) + pad.info("sending duration query") + if pad.get_peer().query(q): + format, length = q.parse_duration() + if format == gst.FORMAT_TIME: + pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),)) + else: + pad.info("got duration : %d [format:%d]" % (length, format)) + else: + length = -1 + gst.warning("duration query failed") + + # We store the caps and length in the proper location + if "audio" in caps.to_string(): + self.input_samplerate = caps[0]["rate"] + if not self.output_samplerate: + self.output_samplerate = self.input_samplerate + self.input_channels = caps[0]["channels"] + if not self.output_channels: + self.output_channels = self.input_channels + self.input_duration = length / 1.e9 + + self.input_totalframes = int(self.input_duration * self.input_samplerate) + if "x-raw-float" in caps.to_string(): + self.input_width = caps[0]["width"] + else: + self.input_width = caps[0]["depth"] + + self.discovered = True + self.discovered_cond.notify() + self.discovered_cond.release() + + def _on_message_cb(self, bus, message): + t = message.type + if t == gst.MESSAGE_EOS: + self.queue.put(gst.MESSAGE_EOS) + self.pipeline.set_state(gst.STATE_NULL) + 
self.mainloop.quit() + elif t == gst.MESSAGE_ERROR: + self.pipeline.set_state(gst.STATE_NULL) + err, debug = message.parse_error() + self.discovered_cond.acquire() + self.discovered = True + self.mainloop.quit() + self.error_msg = "Error: %s" % err, debug + self.discovered_cond.notify() + self.discovered_cond.release() + elif t == gst.MESSAGE_TAG: + # TODO + # msg.parse_tags() + pass + + def _on_new_buffer_cb(self, sink): + buf = sink.emit('pull-buffer') + new_array = gst_buffer_to_numpy_array(buf, self.output_channels) + #print 'processing new buffer', new_array.shape + if self.last_buffer is None: + self.last_buffer = new_array + else: + self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0) + while self.last_buffer.shape[0] >= self.output_blocksize: + new_block = self.last_buffer[:self.output_blocksize] + self.last_buffer = self.last_buffer[self.output_blocksize:] + #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape + self.queue.put([new_block, False]) + + @interfacedoc + @stack + def process(self): + buf = self.queue.get() + if buf == gst.MESSAGE_EOS: + return self.last_buffer, True + frames, eod = buf + return frames, eod + + @interfacedoc + def totalframes(self): + if self.input_samplerate == self.output_samplerate: + return self.input_totalframes + else: + ratio = self.output_samplerate / self.input_samplerate + return int(self.input_totalframes * ratio) + + @interfacedoc + def release(self): + if self.stack: + self.stack = False + self.from_stack = True + pass + + ## IDecoder methods + + @interfacedoc + def format(self): + # TODO check + if self.mimetype == 'application/x-id3': + self.mimetype = 'audio/mpeg' + return self.mimetype + + @interfacedoc + def encoding(self): + # TODO check + return self.mimetype.split('/')[-1] + + @interfacedoc + def resolution(self): + # TODO check: width or depth? + return self.input_width + + @interfacedoc + def metadata(self): + # TODO check + return self.tags + + +if __name__ == "__main__": + # Run doctest from __main__ and unittest from tests + from tests.unit_timeside import run_test_module + # load corresponding tests + from tests import test_decoding + + run_test_module([test_decoding]) diff --git a/timeside/decoder/live.py b/timeside/decoder/live.py new file mode 100644 index 0000000..260b55c --- /dev/null +++ b/timeside/decoder/live.py @@ -0,0 +1,299 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright (c) 2007-2013 Parisson +# Copyright (c) 2007 Olivier Guilyardi +# Copyright (c) 2007-2013 Guillaume Pellerin +# Copyright (c) 2010-2013 Paul Brossier +# +# This file is part of TimeSide. + +# TimeSide is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. + +# TimeSide is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with TimeSide. If not, see . 
+ +# Authors: +# Paul Brossier +# Guillaume Pellerin +# Thomas Fillon + + +from timeside.decoder.core import * + + +class LiveDecoder(Decoder): + """ gstreamer-based decoder from live source""" + implements(IDecoder) + + output_blocksize = 8*1024 + + pipeline = None + mainloopthread = None + input_src = 'alsasrc' + + @staticmethod + @interfacedoc + def id(): + return "gst_live_dec" + + def __init__(self, num_buffers=-1): + + """ + Construct a new LiveDecoder capturing audio from alsasrc + + Parameters + ---------- + num_buffers : + Number of buffers to output before sending EOS (-1 = unlimited). + (Allowed values: >= -1, Default value: -1) + + + Examples + -------- + + >>> import timeside + + >>> live = timeside.decoder.LiveDecoder(num_buffers=25) + >>> a = timeside.analyzer.Waveform() + >>> e = timeside.encoder.Mp3Encoder('test_live.mp3', overwrite=True) + >>> pipe = (live | a | e) + >>> pipe.run() + + >>> import matplotlib.pyplot as plt # doctest: +SKIP + >>> plt.plot(a.results['waveform_analyzer'].time, # doctest: +SKIP + a.results['waveform_analyzer'].data) # doctest: +SKIP + >>> plt.show() # doctest: +SKIP + + """ + + super(Decoder, self).__init__() + self.num_buffers = num_buffers + self.uri = None + self.uri_start = 0 + self.uri_duration = None + self.is_segment = False + + def setup(self, channels=None, samplerate=None, blocksize=None): + + self.eod = False + self.last_buffer = None + + # a lock to wait wait for gstreamer thread to be ready + import threading + self.discovered_cond = threading.Condition(threading.Lock()) + self.discovered = False + + # the output data format we want + if blocksize: + self.output_blocksize = blocksize + if samplerate: + self.output_samplerate = int(samplerate) + if channels: + self.output_channels = int(channels) + + # Create the pipe with standard Gstreamer uridecodbin + self.pipe = '''%s num-buffers=%d name=src + ! audioconvert name=audioconvert + ! audioresample + ! 
appsink name=sink sync=False async=True + ''' % (self.input_src, self.num_buffers) + + self.pipeline = gst.parse_launch(self.pipe) + + if self.output_channels: + caps_channels = int(self.output_channels) + else: + caps_channels = "[ 1, 2 ]" + if self.output_samplerate: + caps_samplerate = int(self.output_samplerate) + else: + caps_samplerate = "{ 8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000 }" + sink_caps = gst.Caps("""audio/x-raw-float, + endianness=(int)1234, + channels=(int)%s, + width=(int)32, + rate=(int)%s""" % (caps_channels, caps_samplerate)) + + self.src = self.pipeline.get_by_name('src') + self.conv = self.pipeline.get_by_name('audioconvert') + self.conv.get_pad("sink").connect("notify::caps", self._notify_caps_cb) + + self.sink = self.pipeline.get_by_name('sink') + self.sink.set_property("caps", sink_caps) + self.sink.set_property('max-buffers', GST_APPSINK_MAX_BUFFERS) + self.sink.set_property("drop", False) + self.sink.set_property('emit-signals', True) + self.sink.connect("new-buffer", self._on_new_buffer_cb) + + self.bus = self.pipeline.get_bus() + self.bus.add_signal_watch() + self.bus.connect('message', self._on_message_cb) + + self.queue = Queue.Queue(QUEUE_SIZE) + + import threading + + class MainloopThread(threading.Thread): + def __init__(self, mainloop): + threading.Thread.__init__(self) + self.mainloop = mainloop + + def run(self): + self.mainloop.run() + self.mainloop = gobject.MainLoop() + self.mainloopthread = MainloopThread(self.mainloop) + self.mainloopthread.start() + #self.mainloopthread = get_loop_thread() + ##self.mainloop = self.mainloopthread.mainloop + + # start pipeline + self.pipeline.set_state(gst.STATE_PLAYING) + + self.discovered_cond.acquire() + while not self.discovered: + #print 'waiting' + self.discovered_cond.wait() + self.discovered_cond.release() + + if not hasattr(self, 'input_samplerate'): + if hasattr(self, 'error_msg'): + raise IOError(self.error_msg) + else: + raise IOError('no known audio stream found') + + def _notify_caps_cb(self, pad, args): + self.discovered_cond.acquire() + + caps = pad.get_negotiated_caps() + if not caps: + pad.info("no negotiated caps available") + self.discovered = True + self.discovered_cond.notify() + self.discovered_cond.release() + return + # the caps are fixed + # We now get the total length of that stream + q = gst.query_new_duration(gst.FORMAT_TIME) + pad.info("sending duration query") + if pad.get_peer().query(q): + format, length = q.parse_duration() + if format == gst.FORMAT_TIME: + pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),)) + else: + pad.info("got duration : %d [format:%d]" % (length, format)) + else: + length = -1 + gst.warning("duration query failed") + + # We store the caps and length in the proper location + if "audio" in caps.to_string(): + self.input_samplerate = caps[0]["rate"] + if not self.output_samplerate: + self.output_samplerate = self.input_samplerate + self.input_channels = caps[0]["channels"] + if not self.output_channels: + self.output_channels = self.input_channels + self.input_duration = length / 1.e9 + + self.input_totalframes = int(self.input_duration * self.input_samplerate) + if "x-raw-float" in caps.to_string(): + self.input_width = caps[0]["width"] + else: + self.input_width = caps[0]["depth"] + + self.discovered = True + self.discovered_cond.notify() + self.discovered_cond.release() + + def _on_message_cb(self, bus, message): + t = message.type + if t == gst.MESSAGE_EOS: + self.queue.put(gst.MESSAGE_EOS) + 
self.pipeline.set_state(gst.STATE_NULL) + self.mainloop.quit() + elif t == gst.MESSAGE_ERROR: + self.pipeline.set_state(gst.STATE_NULL) + err, debug = message.parse_error() + self.discovered_cond.acquire() + self.discovered = True + self.mainloop.quit() + self.error_msg = "Error: %s" % err, debug + self.discovered_cond.notify() + self.discovered_cond.release() + elif t == gst.MESSAGE_TAG: + # TODO + # msg.parse_tags() + pass + + def _on_new_buffer_cb(self, sink): + buf = sink.emit('pull-buffer') + new_array = gst_buffer_to_numpy_array(buf, self.output_channels) + + #print 'processing new buffer', new_array.shape + if self.last_buffer is None: + self.last_buffer = new_array + else: + self.last_buffer = np.concatenate((self.last_buffer, new_array), axis=0) + while self.last_buffer.shape[0] >= self.output_blocksize: + new_block = self.last_buffer[:self.output_blocksize] + self.last_buffer = self.last_buffer[self.output_blocksize:] + #print 'queueing', new_block.shape, 'remaining', self.last_buffer.shape + self.queue.put([new_block, False]) + + @interfacedoc + def process(self): + buf = self.queue.get() + if buf == gst.MESSAGE_EOS: + return self.last_buffer, True + + frames, eod = buf + return frames, eod + + @interfacedoc + def totalframes(self): + if self.input_samplerate == self.output_samplerate: + return self.input_totalframes + else: + ratio = self.output_samplerate / self.input_samplerate + return int(self.input_totalframes * ratio) + + @interfacedoc + def release(self): + if self.stack: + self.stack = False + self.from_stack = True + pass + + ## IDecoder methods + + @interfacedoc + def format(self): + # TODO check + if self.mimetype == 'application/x-id3': + self.mimetype = 'audio/mpeg' + return self.mimetype + + @interfacedoc + def encoding(self): + # TODO check + return self.mimetype.split('/')[-1] + + @interfacedoc + def resolution(self): + # TODO check: width or depth? + return self.input_width + + @interfacedoc + def metadata(self): + # TODO check + return self.tags + diff --git a/timeside/decoder/utils.py b/timeside/decoder/utils.py index 64146a5..d96adb4 100644 --- a/timeside/decoder/utils.py +++ b/timeside/decoder/utils.py @@ -137,6 +137,19 @@ def get_media_uri_info(uri): return info +def stack(process_func): + + import functools + + @functools.wraps(process_func) + def wrapper(decoder): + # Processing + frames, eod = process_func(decoder) + if decoder.stack: + decoder.process_pipe.frames_stack.append((frames, eod)) + return frames, eod + return wrapper + if __name__ == "__main__": # Run doctest from __main__ and unittest from tests -- 2.39.5
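
Note on the resulting API: the new timeside/decoder/__init__.py re-exports every decoder, so client code imports FileDecoder, ArrayDecoder and LiveDecoder from the package root instead of timeside.decoder.core, exactly as the updated tests do. Below is a minimal sketch of that usage, assuming the TimeSide API as it appears in this patch; the sine-wave buffer, the samplerate value and the direct setup()/process() loop are illustrative only (a ProcessPipe normally drives these calls for you).

    import numpy as np

    from timeside.decoder import ArrayDecoder  # was: from timeside.decoder.core import ArrayDecoder

    # One second of a 440 Hz test tone (illustrative values only).
    samplerate = 44100
    t = np.arange(samplerate) / float(samplerate)
    samples = 0.5 * np.sin(2 * np.pi * 440 * t)

    decoder = ArrayDecoder(samples, samplerate=samplerate)
    decoder.setup(channels=1, samplerate=samplerate)

    eod = False
    while not eod:
        # Each call returns one block of at most output_blocksize frames,
        # with eod=True on the final block (see get_frames() in array.py).
        frames, eod = decoder.process()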