WITH_AUBIO = False
else:
del aubio
+
# Yaafe
try:
WITH_YAAFE = True
WITH_YAAFE = False
else:
del yaafelib
+
# Vamp Plugins
try:
from . vamp_plugin import VampSimpleHost
# Author: Paul Brossier <piem@piem.org>
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.analyzer.core import Analyzer
from timeside.api import IAnalyzer
from timeside.analyzer.preprocessors import downmix_to_mono, frames_adapter
from timeside.analyzer.utils import melFilterBank, computeModulation
from timeside.analyzer.utils import segmentFromValues
from timeside.api import IAnalyzer
-from numpy import array, hamming, dot, mean, float, isnan
+from numpy import array, hamming, dot, mean, float
from numpy.fft import rfft
from scipy.signal import firwin, lfilter
# Low-pass filtering of the spectrogram amplitude along the time axis
S = signal.lfilter(signal.hann(15)[8:], 1, abs(spectrogram), axis=0)
- import matplotlib.pyplot as plt
-# plt.figure()
-# plt.imshow(np.log10(abs(spectrogram)), origin='lower', aspect='auto', interpolation='nearest')
-
# Clip small value to a minimal threshold
np.maximum(S, 1e-9, out=S)
from timeside.api import IAnalyzer
import numpy as np
-from timeside.analyzer.preprocessors import downmix_to_mono, frames_adapter
-
class Waveform(Analyzer):
#from timeside.analyzer import WITH_YAAFE
WITH_YAAFE = True
if WITH_YAAFE:
- from yaafelib import *
+ import yaafelib
import numpy
from timeside.analyzer.preprocessors import downmix_to_mono
# Check arguments
if yaafeSpecification is None:
- yaafeSpecification = FeaturePlan(sample_rate=32000)
+ yaafeSpecification = yaafelib.FeaturePlan(sample_rate=32000)
# add feature definitions manually
yaafeSpecification.addFeature(
'mfcc: MFCC blockSize=512 stepSize=256')
- if isinstance(yaafeSpecification, DataFlow):
+ if isinstance(yaafeSpecification, yaafelib.DataFlow):
self.dataFlow = yaafeSpecification
- elif isinstance(yaafeSpecification, FeaturePlan):
+ elif isinstance(yaafeSpecification, yaafelib.FeaturePlan):
self.featurePlan = yaafeSpecification
self.dataFlow = self.featurePlan.getDataFlow()
else:
raise TypeError("'%s' Type must be either '%s' or '%s'" %
(str(yaafeSpecification),
- str(DataFlow),
- str(FeaturePlan)))
+ str(yaafelib.DataFlow),
+ str(yaafelib.FeaturePlan)))
self.yaafe_engine = None
@interfacedoc
blocksize=None, totalframes=None):
super(Yaafe, self).setup(channels, samplerate, blocksize, totalframes)
# Configure a YAAFE engine
- self.yaafe_engine = Engine()
+ self.yaafe_engine = yaafelib.Engine()
self.yaafe_engine.load(self.dataFlow)
self.yaafe_engine.reset()
self.input_samplerate = samplerate
# Guillaume Pellerin <yomguy@parisson.com>
# Thomas Fillon <thomas@parisson.com>
-
-from timeside.decoder.core import *
+from timeside.core import implements, interfacedoc
+from timeside.decoder.core import Decoder, IDecoder
import numpy as np
from timeside.core import Processor, implements, interfacedoc, abstract
from timeside.api import IDecoder
-from timeside.tools import *
class Decoder(Processor):
@interfacedoc
def resolution(self):
return self.input_width
-
- def stop(self):
- self.src.send_event(gst.event_new_eos())
from __future__ import division
from timeside.decoder.core import Decoder, IDecoder, implements, interfacedoc
-from timeside.tools.gstutils import MainloopThread, gobject, gst_buffer_to_numpy_array
+from timeside.tools.gstutils import MainloopThread, gobject
+from timeside.tools.gstutils import gst_buffer_to_numpy_array
import threading
from timeside.decoder.utils import get_uri, get_media_uri_info, stack, get_sha1
# TODO check
return self.tags
+ def stop(self):
+ self.src.send_event(gst.event_new_eos())
if __name__ == "__main__":
import doctest
from __future__ import division
-from timeside.decoder.core import *
-from timeside.tools.gstutils import MainloopThread
-
+from timeside.decoder.core import Decoder, IDecoder, interfacedoc, implements
+from timeside.tools.gstutils import MainloopThread, gobject
+from . file import FileDecoder
import Queue
-from gst import _gst as gst
+import threading
+from gst import _gst as gst
GST_APPSINK_MAX_BUFFERS = 10
QUEUE_SIZE = 10
-class LiveDecoder(Decoder):
-
+class LiveDecoder(FileDecoder):
""" gstreamer-based decoder from live source"""
implements(IDecoder)
- output_blocksize = 8 * 1024
-
- pipeline = None
- mainloopthread = None
+ # IProcessor methods
@staticmethod
@interfacedoc
self.last_buffer = None
# a lock to wait wait for gstreamer thread to be ready
- import threading
self.discovered_cond = threading.Condition(threading.Lock())
self.discovered = False
else:
raise IOError('no known audio stream found')
- def _notify_caps_cb(self, pad, args):
- self.discovered_cond.acquire()
-
- caps = pad.get_negotiated_caps()
- if not caps:
- pad.info("no negotiated caps available")
- self.discovered = True
- self.discovered_cond.notify()
- self.discovered_cond.release()
- return
- # the caps are fixed
- # We now get the total length of that stream
- q = gst.query_new_duration(gst.FORMAT_TIME)
- pad.info("sending duration query")
- if pad.get_peer().query(q):
- format, length = q.parse_duration()
- if format == gst.FORMAT_TIME:
- pad.info("got duration (time) : %s" % (gst.TIME_ARGS(length),))
- else:
- pad.info("got duration : %d [format:%d]" % (length, format))
- else:
- length = -1
- gst.warning("duration query failed")
-
- # We store the caps and length in the proper location
- if "audio" in caps.to_string():
- self.input_samplerate = caps[0]["rate"]
- if not self.output_samplerate:
- self.output_samplerate = self.input_samplerate
- self.input_channels = caps[0]["channels"]
- if not self.output_channels:
- self.output_channels = self.input_channels
- self.input_duration = length / 1.e9
-
- self.input_totalframes = int(
- self.input_duration * self.input_samplerate)
- if "x-raw-float" in caps.to_string():
- self.input_width = caps[0]["width"]
- else:
- self.input_width = caps[0]["depth"]
-
- self.discovered = True
- self.discovered_cond.notify()
- self.discovered_cond.release()
-
- def _on_message_cb(self, bus, message):
- t = message.type
- if t == gst.MESSAGE_EOS:
- self.queue.put(gst.MESSAGE_EOS)
- self.pipeline.set_state(gst.STATE_NULL)
- self.mainloop.quit()
- elif t == gst.MESSAGE_ERROR:
- self.pipeline.set_state(gst.STATE_NULL)
- err, debug = message.parse_error()
- self.discovered_cond.acquire()
- self.discovered = True
- self.mainloop.quit()
- self.error_msg = "Error: %s" % err, debug
- self.discovered_cond.notify()
- self.discovered_cond.release()
- elif t == gst.MESSAGE_TAG:
- # TODO
- # msg.parse_tags()
- pass
-
- def _on_new_buffer_cb(self, sink):
- buf = sink.emit('pull-buffer')
- new_array = gst_buffer_to_numpy_array(buf, self.output_channels)
-
- # print 'processing new buffer', new_array.shape
- if self.last_buffer is None:
- self.last_buffer = new_array
- else:
- self.last_buffer = np.concatenate(
- (self.last_buffer, new_array), axis=0)
- while self.last_buffer.shape[0] >= self.output_blocksize:
- new_block = self.last_buffer[:self.output_blocksize]
- self.last_buffer = self.last_buffer[self.output_blocksize:]
- # print 'queueing', new_block.shape, 'remaining',
- # self.last_buffer.shape
- self.queue.put([new_block, False])
-
@interfacedoc
def process(self):
buf = self.queue.get()
frames, eod = buf
return frames, eod
- @interfacedoc
- def totalframes(self):
- if self.input_samplerate == self.output_samplerate:
- return self.input_totalframes
- else:
- ratio = self.output_samplerate / self.input_samplerate
- return int(self.input_totalframes * ratio)
-
- @interfacedoc
def release(self):
- if self.stack:
- self.stack = False
- self.from_stack = True
+ # TODO : check if stack support is needed here
+ #if self.stack:
+ # self.stack = False
+ # self.from_stack = True
pass
# IDecoder methods
-
- @interfacedoc
- def format(self):
- # TODO check
- if self.mimetype == 'application/x-id3':
- self.mimetype = 'audio/mpeg'
- return self.mimetype
-
- @interfacedoc
- def encoding(self):
- # TODO check
- return self.mimetype.split('/')[-1]
-
- @interfacedoc
- def resolution(self):
- # TODO check: width or depth?
- return self.input_width
-
- @interfacedoc
- def metadata(self):
- # TODO check
- return self.tags
from timeside.core import implements, interfacedoc
from timeside.encoder.core import GstEncoder
from timeside.api import IEncoder
-from timeside.tools import *
class AudioSink(GstEncoder):
from timeside.core import Processor, implements, interfacedoc
from timeside.component import abstract
from timeside.api import IEncoder
-from timeside.tools import numpy_array_to_gst_buffer, MainloopThread
+from timeside.tools.gstutils import numpy_array_to_gst_buffer, MainloopThread
import pygst
pygst.require('0.10')
# Author: Guillaume Pellerin <yomguy@parisson.com>
-from timeside.core import Processor, implements, interfacedoc
+from timeside.core import implements, interfacedoc
from timeside.encoder.core import GstEncoder
from timeside.api import IEncoder
-from timeside.tools import *
class FlacEncoder(GstEncoder):
from timeside.core import implements, interfacedoc
from timeside.encoder.core import GstEncoder
from timeside.api import IEncoder
-from timeside.tools import *
class AacEncoder(GstEncoder):
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-from timeside.core import Processor, implements, interfacedoc
+from timeside.core import implements, interfacedoc
from timeside.encoder.core import GstEncoder
from timeside.api import IEncoder
-from timeside.tools import *
class VorbisEncoder(GstEncoder):
# Author: Paul Brossier <piem@piem.org>
-from timeside.core import Processor, implements, interfacedoc
+from timeside.core import implements, interfacedoc
from timeside.encoder.core import GstEncoder
from timeside.api import IEncoder
-from timeside.tools import *
class WavEncoder(GstEncoder):
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-from timeside.core import Processor, implements, interfacedoc
+from timeside.core import implements, interfacedoc
from timeside.encoder.core import GstEncoder
from timeside.api import IEncoder
-from timeside.tools import *
class WebMEncoder(GstEncoder):
# Guillaume Pellerin <yomguy@parisson.com>
-import optparse
import math
-import sys
import numpy
try:
- from PIL import ImageFilter, ImageChops, Image, ImageDraw, ImageColor, ImageEnhance
+ from PIL import Image, ImageDraw
except ImportError:
- import ImageFilter
- import ImageChops
import Image
import ImageDraw
- import ImageColor
- import ImageEnhance
-from timeside.core import *
+from timeside.core import Processor, implements, interfacedoc, abstract
+from timeside.core import FixedSizeInputAdapter
from timeside.api import IGrapher
-from timeside.grapher.color_schemes import default_color_schemes
-from utils import *
+from . utils import smooth, im_watermark, normalize
class Spectrum(object):
""" FFT based frequency analysis of audio frames."""
- def __init__(self, fft_size, samplerate, blocksize, totalframes, lower, higher, window_function=None):
+ def __init__(self, fft_size, samplerate, blocksize,
+ totalframes, lower, higher, window_function=None):
self.fft_size = fft_size
self.window = window_function(self.fft_size)
self.window_function = window_function
self.window = self.window_function(self.blocksize)
def process(self, frames, eod, spec_range=120.0):
- """ Returns a tuple containing the spectral centroid and the spectrum (dB scales) of the input audio frames.
+ """ Returns a tuple containing the spectral centroid and
+ the spectrum (dB scales) of the input audio frames.
FFT window sizes are adatable to the input frame size."""
samples = frames[:, 0]
(energy * (length - 1)) * \
self.samplerate * 0.5
# clip > log10 > scale between 0 and 1
- spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) -
- self.lower_log) / (self.higher_log - self.lower_log)
+ spectral_centroid = (math.log10(self.clip(spectral_centroid,
+ self.lower,
+ self.higher)) -
+ self.lower_log) / (self.higher_log -
+ self.lower_log)
return (spectral_centroid, db_spectrum)
from timeside.core import implements, interfacedoc, abstract, get_processor
from timeside.api import IGrapher
-from core import Grapher, Image
+from core import Grapher
from .. import analyzer
from timeside.core import implements, interfacedoc
from timeside.api import IGrapher
-from timeside.grapher.core import *
+#from timeside.grapher.core import *
from timeside.grapher.spectrogram_log import SpectrogramLog
from timeside.core import implements, interfacedoc
from timeside.api import IGrapher
-from timeside.grapher.core import *
+from timeside.grapher.core import Grapher, Image
+from timeside.grapher.color_schemes import default_color_schemes
+from . utils import interpolate_colors
+import math
class SpectrogramLog(Grapher):
- """ Builds a PIL image representing a spectrogram of the audio stream (level vs. frequency vs. time).
- Adds pixels iteratively thanks to the adapter providing fixed size frame buffers."""
+ """ Builds a PIL image representing a spectrogram of the audio stream
+ (level vs. frequency vs. time).
+ Adds pixels iteratively thanks to the adapter providing
+ fixed size frame buffers."""
implements(IGrapher)
@interfacedoc
- def __init__(self, width=1024, height=256, bg_color=(0, 0, 0), color_scheme='default'):
+ def __init__(self, width=1024, height=256, bg_color=(0, 0, 0),
+ color_scheme='default'):
super(SpectrogramLog, self).__init__(
width, height, bg_color, color_scheme)
self.lower_freq = 100
return "Spectrogram Log"
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
+ def setup(self, channels=None, samplerate=None, blocksize=None,
+ totalframes=None):
super(SpectrogramLog, self).setup(
channels, samplerate, blocksize, totalframes)
self.image = self.image.convert("P")
# Guillaume Pellerin <yomguy@parisson.com>
try:
- from PIL import ImageFilter, ImageChops, Image, ImageDraw, ImageColor, ImageEnhance
+ from PIL import Image, ImageDraw, ImageColor, ImageEnhance
except ImportError:
- import ImageFilter
- import ImageChops
import Image
import ImageDraw
import ImageColor
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.api import IGrapher
-from timeside.grapher.core import *
+from . utils import peaks, interpolate_colors
from timeside.grapher.waveform_simple import Waveform
+from timeside.grapher.color_schemes import default_color_schemes
class WaveformCentroid(Waveform):
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.api import IGrapher
-from timeside.grapher.core import *
-from timeside.grapher.waveform_simple import Waveform
+#from timeside.grapher.core import *
+from . waveform_simple import Waveform
+from . utils import peaks
+
+import numpy
class WaveformContourBlack(Waveform):
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.api import IGrapher
-from timeside.grapher.core import *
+from timeside.grapher.core import Grapher
+from . utils import peaks
class Waveform(Grapher):
# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.core import implements, interfacedoc
from timeside.api import IGrapher
-from timeside.grapher.core import *
+#from timeside.grapher.core import *
from timeside.grapher.waveform_simple import Waveform
+from . utils import peaks
class WaveformTransparent(Waveform):
- """ Builds a PIL image representing a transparent waveform of the audio stream.
+ """ Builds a PIL image representing a transparent waveform
+ of the audio stream.
"""
implements(IGrapher)
@interfacedoc
- def __init__(self, width=1024, height=256, bg_color=None, color_scheme='default'):
+ def __init__(self, width=1024, height=256, bg_color=None,
+ color_scheme='default'):
super(WaveformTransparent, self).__init__(
width, height, bg_color, color_scheme)
self.line_color = (255, 255, 255)
return "Waveform transparent"
@interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
+ def setup(self, channels=None, samplerate=None, blocksize=None,
+ totalframes=None):
super(WaveformTransparent, self).setup(
channels, samplerate, blocksize, totalframes)
-from cache import Cache
-from logger import Logger
-from gstutils import *