image = img_dir + os.sep + image
print 'Test : decoder(%s) | waveform (%s)' % (source, image)
decoder = FileDecoder(audio)
- spectrogram = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+ spectrogram = Spectrogram()
(decoder | spectrogram).run()
- print 'frames per pixel = ', spectrogram.graph.samples_per_pixel
+ print 'frames per pixel = ', spectrogram.samples_per_pixel
print "render spectrogram to: %s" % image
spectrogram.render(image)
sample_dir = '../samples'
img_dir = '../results/img'
if not os.path.exists(img_dir):
- os.mkdir(img_dir)
+ os.makedirs(img_dir)
test_dict = {'sweep.wav': 'waveform_wav.png',
'sweep.flac': 'waveform_flac.png',
image = img_dir + os.sep + image
print 'Test : decoder(%s) | waveform (%s)' % (source, image)
decoder = FileDecoder(audio)
- waveform = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default')
+ waveform = Waveform(width=1024, height=256, bg_color=(255,255,255), color_scheme='default')
(decoder | waveform).run()
- print 'frames per pixel = ', waveform.graph.samples_per_pixel
+ print 'frames per pixel = ', waveform.samples_per_pixel
print "render waveform to: %s" % image
waveform.render(image)
-
-
import os
import urllib
+
def check_samples():
url = 'http://github.com/yomguy/timeside-samples/raw/master/samples/'
samples = ['guitar.wav', 'sweep.wav', 'sweep_mono.wav', 'sweep_32000.wav', 'sweep.flac', 'sweep.ogg', 'sweep.mp3', 'sweep_source.wav']
yield block, True
self.len = 0
+
def processors(interface=IProcessor, recurse=True):
    """List the processors implementing *interface*.

    When *recurse* is true, processors implementing any descendant of
    the interface are returned as well.
    """
    found = implementations(interface, recurse)
    return found
+
def get_processor(processor_id):
"""Return a processor by its id"""
if not _processors.has_key(processor_id):
for item in items:
item.release()
- #return self
-
@property
def results(self):
"""
# -*- coding: utf-8 -*-
from core import *
-from waveform import *
+from waveform_centroid import *
from spectrogram import *
from waveform_contour_bk import *
from waveform_contour_wh import *
except ImportError:
import ImageFilter, ImageChops, Image, ImageDraw, ImageColor, ImageEnhance
-from timeside.core import FixedSizeInputAdapter
+from timeside.core import *
from timeside.grapher.color_schemes import default_color_schemes
from utils import *
+
class Spectrum(object):
""" FFT based frequency analysis of audio frames."""
- def __init__(self, fft_size, nframes, samplerate, lower, higher, window_function=numpy.hanning):
+ def __init__(self, fft_size, totalframes, samplerate, lower, higher, window_function=numpy.hanning):
self.fft_size = fft_size
self.window = window_function(self.fft_size)
self.window_function = window_function
self.lower_log = math.log10(self.lower)
self.higher_log = math.log10(self.higher)
self.clip = lambda val, low, high: min(high, max(low, val))
- self.nframes = nframes
+ self.totalframes = totalframes
self.samplerate = samplerate
def process(self, frames, eod, spec_range=120.0):
- """ Returns a tuple containing the spectral centroid and the spectrum (dB scales) of the input audio frames. FFT window sizes are adatable to the input frame size."""
+ """ Returns a tuple containing the spectral centroid and the spectrum (dB scales) of the input audio frames.
+ FFT window sizes are adatable to the input frame size."""
samples = frames[:,0]
nsamples = len(samples)
spectral_centroid = (spectrum * self.spectrum_range).sum() / (energy * (length - 1)) * self.samplerate * 0.5
# clip > log10 > scale between 0 and 1
- spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - self.lower_log) / (self.higher_log - self.lower_log)
+ spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - \
+ self.lower_log) / (self.higher_log - self.lower_log)
return (spectral_centroid, db_spectrum)
class Grapher(Processor):
    '''
    Generic abstract class for the graphers.

    Concrete graphers (waveform, spectrogram, ...) inherit the common
    image geometry, color handling and FFT configuration defined here.
    '''
    # FFT window size in samples (1024), handed to the Spectrum analyzer.
    fft_size = 0x400
    # Cursors advanced while frames stream through process().
    frame_cursor = 0
    pixel_cursor = 0
    # Frequency bounds (Hz) of the spectral analysis.
    lower_freq = 200
    higher_freq = 22050
- def __init__(self, image_width, image_height, nframes, samplerate,
- fft_size, bg_color, color_scheme):
- self.image_width = image_width
- self.image_height = image_height
- self.nframes = nframes
- self.samplerate = samplerate
- self.fft_size = fft_size
+ def __init__(self, width=1024, height=256, bg_color=(255,255,255), color_scheme='default'):
self.bg_color = bg_color
self.color_scheme = color_scheme
+ self.graph = None
+ self.image_width = width
+ self.image_height = height
+ self.bg_color = bg_color
+ self.color_scheme = color_scheme
+ self.previous_x, self.previous_y = None, None
- if isinstance(color_scheme, dict):
- colors = color_scheme['waveform']
- else:
- colors = default_color_schemes[color_scheme]['waveform']
-
- self.color_lookup = interpolate_colors(colors)
    @staticmethod
    def id():
        # Unique string identifier of this grapher (processor registry key).
        return "generic_grapher"
- self.samples_per_pixel = self.nframes / float(self.image_width)
- self.buffer_size = int(round(self.samples_per_pixel, 0))
- self.pixels_adapter = FixedSizeInputAdapter(self.buffer_size, 1, pad=False)
- self.pixels_adapter_nframes = self.pixels_adapter.blocksize(self.nframes)
    @staticmethod
    def name():
        # Human-readable display name of this grapher.
        return "Generic grapher"
- self.lower = 800
- self.higher = 12000
- self.spectrum = Spectrum(self.fft_size, self.nframes, self.samplerate, self.lower, self.higher, numpy.hanning)
+ def set_colors(self, bg_color, color_scheme):
+ self.bg_color = bg_color
+ self.color_color_scheme = color_scheme
    def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
        """Allocate the image, the pixel adapter and the FFT analyzer.

        Called once the stream parameters are known; must run before
        process() receives any frames.
        """
        super(Grapher, self).setup(channels, samplerate, blocksize, totalframes)
        self.samplerate = samplerate
        self.blocksize = blocksize
        self.totalframes = totalframes
        self.image = Image.new("RGBA", (self.image_width, self.image_height), self.bg_color)
        # One image column covers this many audio frames.
        self.samples_per_pixel = self.totalframes / float(self.image_width)
        self.buffer_size = int(round(self.samples_per_pixel, 0))
        # Re-buffers the incoming stream into one chunk per pixel column.
        self.pixels_adapter = FixedSizeInputAdapter(self.buffer_size, 1, pad=False)
        self.pixels_adapter_totalframes = self.pixels_adapter.blocksize(self.totalframes)
        self.spectrum = Spectrum(self.fft_size, self.totalframes, self.samplerate,
                                 self.lower_freq, self.higher_freq, numpy.hanning)
        self.pixel = self.image.load()
        self.draw = ImageDraw.Draw(self.image)
- self.previous_x, self.previous_y = None, None
- self.frame_cursor = 0
- self.pixel_cursor = 0
-
- def peaks(self, samples):
- """ Find the minimum and maximum peak of the samples.
- Returns that pair in the order they were found.
- So if min was found first, it returns (min, max) else the other way around. """
- max_index = numpy.argmax(samples)
- max_value = samples[max_index]
- min_index = numpy.argmin(samples)
- min_value = samples[min_index]
+ def watermark(self, text, font=None, color=(255, 255, 255), opacity=.6, margin=(5,5)):
+ self.image = im_watermark(text, text, color=color, opacity=opacity, margin=margin)
- if min_index < max_index:
- return (min_value, max_value)
- else:
- return (max_value, min_value)
-
- def color_from_value(self, value):
- """ given a value between 0 and 1, return an (r,g,b) tuple """
-
- return ImageColor.getrgb("hsl(%d,%d%%,%d%%)" % (int( (1.0 - value) * 360 ), 80, 50))
-
- def draw_peaks(self, x, peaks, spectral_centroid):
    def draw_centroid_peaks(self, x, peaks, spectral_centroid):
        """Draw the (min, max) peak pair at column *x*, colored by spectral centroid.

        The centroid, normalized to [0, 1], selects one of the 256
        interpolated scheme colors.
        """
        # Map normalized peak amplitudes to pixel rows (4 px vertical margin).
        y1 = self.image_height * 0.5 - peaks[0] * (self.image_height - 4) * 0.5
        y2 = self.image_height * 0.5 - peaks[1] * (self.image_height - 4) * 0.5
        line_color = self.color_lookup[int(spectral_centroid*255.0)]
        if self.previous_y:
            self.draw.line([x, y1, x, y2], line_color)
        self.previous_x, self.previous_y = x, y2
        self.draw_anti_aliased_pixels(x, y1, y2, line_color)
    def draw_simple_peaks(self, x, peaks, line_color):
        """Draw the peak pair at column *x* in a single flat *line_color*.

        The column is painted from the top and bottom edges towards the
        envelope, leaving the waveform itself unpainted.
        (Docstring fixed: no spectral centroid is involved here.)
        """
        y1 = self.image_height * 0.5 - peaks[0] * (self.image_height - 4) * 0.5
        y2 = self.image_height * 0.5 - peaks[1] * (self.image_height - 4) * 0.5

        if self.previous_y and x < self.image_width-1:
            if y1 < y2:
                self.draw.line((x, 0, x, y1), line_color)
                self.draw.line((x, self.image_height , x, y2), line_color)
            else:
                self.draw.line((x, 0, x, y2), line_color)
                self.draw.line((x, self.image_height , x, y1), line_color)
        else:
            # First column (or last): paint the full height.
            self.draw.line((x, 0, x, self.image_height), line_color)
        self.previous_x, self.previous_y = x, y1
+
def draw_anti_aliased_pixels(self, x, y1, y2, color):
""" vertical anti-aliasing at y1 and y2 """
if alpha > 0.0 and alpha < 1.0 and y_max_int + 1 < self.image_height:
current_pix = self.pixel[int(x), y_max_int + 1]
-
r = int((1-alpha)*current_pix[0] + alpha*color[0])
g = int((1-alpha)*current_pix[1] + alpha*color[1])
b = int((1-alpha)*current_pix[2] + alpha*color[2])
-
self.pixel[x, y_max_int + 1] = (r,g,b)
y_min = min(y1, y2)
if alpha > 0.0 and alpha < 1.0 and y_min_int - 1 >= 0:
current_pix = self.pixel[x, y_min_int - 1]
-
r = int((1-alpha)*current_pix[0] + alpha*color[0])
g = int((1-alpha)*current_pix[1] + alpha*color[1])
b = int((1-alpha)*current_pix[2] + alpha*color[2])
-
self.pixel[x, y_min_int - 1] = (r,g,b)
- def process(self, frames, eod):
- if len(frames) != 1:
- buffer = frames[:,0].copy()
- buffer.shape = (len(buffer),1)
- for samples, end in self.pixels_adapter.process(buffer, eod):
- if self.pixel_cursor < self.image_width:
- (spectral_centroid, db_spectrum) = self.spectrum.process(samples, True)
- peaks = self.peaks(samples)
- self.draw_peaks(self.pixel_cursor, peaks, spectral_centroid)
- self.pixel_cursor += 1
-
- def watermark(self, text, color=None, opacity=.6, margin=(10,10)):
- self.image = im_watermark(self.image, text, color=color, opacity=opacity, margin=margin)
-
- def save(self, filename):
- """ Apply last 2D transforms and write all pixels to the file. """
-
- # middle line (0 for none)
- a = 1
- for x in range(self.image_width):
- self.pixel[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pixel[x, self.image_height/2]))
- self.image.save(filename)
-
- def release(self):
- pass
-
-
-class WaveformImageJoyContour(WaveformImage):
-
- def __init__(self, image_width, image_height, nframes, samplerate,
- fft_size, bg_color, color_scheme, ndiv=1, symetry=None, color_offset=160):
- WaveformImage.__init__(self, image_width, image_height, nframes, samplerate,
- fft_size, bg_color, color_scheme)
- self.contour = numpy.zeros(self.image_width)
- self.centroids = numpy.zeros(self.image_width)
- self.ndiv = ndiv
- self.x = numpy.r_[0:self.image_width-1:1]
- self.dx1 = self.x[1]-self.x[0]
- self.symetry = symetry
- self.color_offset = color_offset
-
- def get_peaks_contour(self, x, peaks, spectral_centroid=None):
- self.contour[x] = numpy.max(peaks)
- self.centroids[x] = spectral_centroid
-
- def mean(self, samples):
- return numpy.mean(samples)
-
- def normalize(self, contour):
- contour = contour-min(contour)
- return contour/max(contour)
-
def draw_peaks_contour(self):
contour = self.contour.copy()
-
- # Smoothing
contour = smooth(contour, window_len=16)
-
- # Normalize
- contour = self.normalize(contour)
+ contour = normalize(contour)
# Scaling
#ratio = numpy.mean(contour)/numpy.sqrt(2)
ratio = 1
- contour = self.normalize(numpy.expm1(contour/ratio))*(1-10**-6)
+ contour = normalize(numpy.expm1(contour/ratio))*(1-10**-6)
# Spline
#contour = cspline1d(contour)
self.draw.point((x, y+height), line_color)
self.previous_x, self.previous_y = x, y
- def process(self, frames, eod):
- if len(frames) != 1:
- buffer = frames[:,0].copy()
- buffer.shape = (len(buffer),1)
- for samples, end in self.pixels_adapter.process(buffer, eod):
- if self.pixel_cursor < self.image_width:
- peaks = self.peaks(samples)
- self.get_peaks_contour(self.pixel_cursor, peaks)
- self.pixel_cursor += 1
- if eod:
- self.draw_peaks_contour()
-
- def watermark(self, text, color=None, opacity=.6, margin=(10,10)):
- self.image = im_watermark(self.image, text, color=color, opacity=opacity, margin=margin)
-
- def save(self, filename):
- """ Apply last 2D transforms and write all pixels to the file. """
- # middle line (0 for none)
- a = 1
- for x in range(self.image_width):
- self.pixel[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pixel[x, self.image_height/2]))
- #self.image = self.image.transpose(Image.FLIP_TOP_BOTTOM)
- self.image.save(filename)
-
- def release(self):
- pass
-
-
class WaveformImageSimple(object):
    """ Builds a PIL image representing a waveform of the audio stream.
    Adds pixels iteratively thanks to the adapter providing fixed size frame buffers.
    """

    def __init__(self, image_width, image_height, nframes, samplerate, fft_size, bg_color, color_scheme):
        # Target image geometry and stream parameters.
        self.image_width = image_width
        self.image_height = image_height
        self.nframes = nframes
        self.samplerate = samplerate
        self.fft_size = fft_size
        self.bg_color = bg_color
        self.color_scheme = color_scheme

        # A custom scheme may be passed as a dict; otherwise look it up by name.
        if isinstance(color_scheme, dict):
            colors = color_scheme['waveform']
        else:
            colors = default_color_schemes[color_scheme]['waveform']
        # Flat single-color rendering: only the first scheme color is used.
        self.line_color = colors[0]

        # One image column covers this many audio frames; the adapter
        # re-buffers the incoming stream into chunks of that size.
        self.samples_per_pixel = self.nframes / float(self.image_width)
        self.buffer_size = int(round(self.samples_per_pixel, 0))
        self.pixels_adapter = FixedSizeInputAdapter(self.buffer_size, 1, pad=False)
        self.pixels_adapter_nframes = self.pixels_adapter.blocksize(self.nframes)

        self.image = Image.new("RGBA", (self.image_width, self.image_height))
        self.pixel = self.image.load()
        self.draw = ImageDraw.Draw(self.image)
        # Last drawn column, used to join consecutive peak columns.
        self.previous_x, self.previous_y = None, None
        self.frame_cursor = 0
        self.pixel_cursor = 0

    def normalize(self, contour):
        # Linear rescale to [0, 1] (assumes a non-flat contour — TODO confirm).
        contour = contour-min(contour)
        return contour/max(contour)

    def peaks(self, samples):
        """ Find the minimum and maximum peak of the samples.
        Returns that pair in the order they were found.
        So if min was found first, it returns (min, max) else the other way around. """

        max_index = numpy.argmax(samples)
        max_value = samples[max_index]

        min_index = numpy.argmin(samples)
        min_value = samples[min_index]

        if min_index < max_index:
            return (min_value, max_value)
        else:
            return (max_value, min_value)

    def draw_peaks(self, x, peaks):
        """ Draw the peak pair at column x in the flat line color.
        The column is painted from the top and bottom edges towards the
        envelope, leaving the waveform itself unpainted. """

        # Map normalized peak amplitudes to pixel rows (4 px vertical margin).
        y1 = self.image_height * 0.5 - peaks[0] * (self.image_height - 4) * 0.5
        y2 = self.image_height * 0.5 - peaks[1] * (self.image_height - 4) * 0.5

        if self.previous_y and x < self.image_width-1:
            if y1 < y2:
                self.draw.line((x, 0, x, y1), self.line_color)
                self.draw.line((x, self.image_height , x, y2), self.line_color)
            else:
                self.draw.line((x, 0, x, y2), self.line_color)
                self.draw.line((x, self.image_height , x, y1), self.line_color)
        else:
            # First or last column: paint the full height.
            self.draw.line((x, 0, x, self.image_height), self.line_color)

        self.previous_x, self.previous_y = x, y1

    def process(self, frames, eod):
        """Consume a frame block, drawing one column per pixel buffer."""
        if len(frames) != 1:  # NOTE(review): 1-frame buffers skipped — presumably EOD padding; confirm
            buffer = frames[:,0]
            buffer.shape = (len(buffer),1)
            for samples, end in self.pixels_adapter.process(buffer, eod):
                if self.pixel_cursor < self.image_width-1:
                    self.draw_peaks(self.pixel_cursor, self.peaks(samples))
                    self.pixel_cursor += 1
                # Close the image with a zero-amplitude final column.
                if self.pixel_cursor == self.image_width-1:
                    self.draw_peaks(self.pixel_cursor, (0, 0))
                    self.pixel_cursor += 1

    def watermark(self, text, color=None, opacity=.6, margin=(10,10)):
        # Overlay semi-transparent text on the rendered image.
        self.image = im_watermark(self.image, text, color=color, opacity=opacity, margin=margin)

    def save(self, filename):
        """ Apply last 2D transforms and write all pixels to the file. """

        # middle line (0 for none)
        a = 1
        for x in range(self.image_width):
            self.pixel[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pixel[x, self.image_height/2]))
        self.image.save(filename)

    def release(self):
        # No resources to free; kept for processor API symmetry.
        pass
-
-
class SpectrogramImage(object):
    """ Builds a PIL image representing a spectrogram of the audio stream (level vs. frequency vs. time).
    Adds pixels iteratively thanks to the adapter providing fixed size frame buffers."""

    def __init__(self, image_width, image_height, nframes, samplerate, fft_size, bg_color=None, color_scheme='default'):
        # bg_color is accepted for API symmetry but unused (paletted image).
        self.image_width = image_width
        self.image_height = image_height
        self.nframes = nframes
        self.samplerate = samplerate
        self.fft_size = fft_size
        self.color_scheme = color_scheme

        # A custom scheme may be passed as a dict; otherwise look it up by name.
        if isinstance(color_scheme, dict):
            colors = color_scheme['spectrogram']
        else:
            colors = default_color_schemes[color_scheme]['spectrogram']

        # Paletted image built transposed (height x width); rotated at save time.
        self.image = Image.new("P", (self.image_height, self.image_width))
        self.image.putpalette(interpolate_colors(colors, True))

        # One image column covers this many audio frames.
        self.samples_per_pixel = self.nframes / float(self.image_width)
        self.buffer_size = int(round(self.samples_per_pixel, 0))
        self.pixels_adapter = FixedSizeInputAdapter(self.buffer_size, 1, pad=False)
        self.pixels_adapter_nframes = self.pixels_adapter.blocksize(self.nframes)

        # Frequency range (Hz) mapped onto the vertical axis.
        self.lower = 100
        self.higher = 22050
        self.spectrum = Spectrum(self.fft_size, self.nframes, self.samplerate, self.lower, self.higher, numpy.hanning)

        # generate the lookup which translates y-coordinate to fft-bin
        # (log-frequency scale between self.lower and self.higher)
        self.y_to_bin = []
        f_min = float(self.lower)
        f_max = float(self.higher)
        y_min = math.log10(f_min)
        y_max = math.log10(f_max)
        for y in range(self.image_height):
            freq = math.pow(10.0, y_min + y / (image_height - 1.0) *(y_max - y_min))
            bin = freq / 22050.0 * (self.fft_size/2 + 1)

            if bin < self.fft_size/2:
                alpha = bin - int(bin)

                self.y_to_bin.append((int(bin), alpha * 255))

        # this is a bit strange, but using image.load()[x,y] = ... is
        # a lot slower than using image.putadata and then rotating the image
        # so we store all the pixels in an array and then create the image when saving
        self.pixels = []
        self.pixel_cursor = 0

    def draw_spectrum(self, x, spectrum):
        # Interpolate between adjacent FFT bins for each output row.
        for (index, alpha) in self.y_to_bin:
            self.pixels.append( int( ((255.0-alpha) * spectrum[index] + alpha * spectrum[index + 1] )) )

        # Pad rows above the highest mapped bin with palette index 0.
        for y in range(len(self.y_to_bin), self.image_height):
            self.pixels.append(0)

    def process(self, frames, eod):
        if len(frames) != 1:  # NOTE(review): 1-frame buffers skipped — presumably EOD padding; confirm
            buffer = frames[:,0].copy()
            buffer.shape = (len(buffer),1)

            # FIXME : breaks spectrum linearity
            for samples, end in self.pixels_adapter.process(buffer, eod):
                if self.pixel_cursor < self.image_width:
                    (spectral_centroid, db_spectrum) = self.spectrum.process(samples, True)
                    self.draw_spectrum(self.pixel_cursor, db_spectrum)
                    self.pixel_cursor += 1

    def watermark(self, text, color=None, opacity=.6, margin=(10,10)):
        # Watermarking intentionally disabled for paletted spectrogram images.
        #self.image = im_watermark(self.image, text, color=color, opacity=opacity, margin=margin)
        pass

    def save(self, filename):
        """ Apply last 2D transforms and write all pixels to the file. """
        self.image.putdata(self.pixels)
        self.image.transpose(Image.ROTATE_90).save(filename)

    def release(self):
        # No resources to free; kept for processor API symmetry.
        pass
-
if __name__ == "__main__":
import doctest
from timeside.grapher.core import *
class Spectrogram(Grapher):
    """ Builds a PIL image representing a spectrogram of the audio stream (level vs. frequency vs. time).
    Adds pixels iteratively thanks to the adapter providing fixed size frame buffers."""

    implements(IGrapher)

    @interfacedoc
    def __init__(self, width=1024, height=256, bg_color=(0,0,0), color_scheme='default'):
        super(Spectrogram, self).__init__(width, height, bg_color, color_scheme)
        self.colors = default_color_schemes[color_scheme]['spectrogram']
        # Pixel rows accumulated by draw_spectrum(); written at render time.
        self.pixels = []

        # generate the lookup which translates y-coordinate to fft-bin
        # (log-frequency scale between lower_freq and higher_freq)
        self.y_to_bin = []
        f_min = float(self.lower_freq)
        f_max = float(self.higher_freq)
        y_min = math.log10(f_min)
        y_max = math.log10(f_max)
        for y in range(self.image_height):
            freq = math.pow(10.0, y_min + y / (self.image_height - 1.0) *(y_max - y_min))
            bin = freq / 22050.0 * (self.fft_size/2 + 1)
            if bin < self.fft_size/2:
                alpha = bin - int(bin)
                self.y_to_bin.append((int(bin), alpha * 255))

    @staticmethod
    @interfacedoc
    def name():
        return "Spectrogram"

    @interfacedoc
    def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
        super(Spectrogram, self).setup(channels, samplerate, blocksize, totalframes)
        self.spectrum = Spectrum(self.fft_size, self.totalframes, self.samplerate,
                                 self.lower_freq, self.higher_freq, numpy.hanning)
        # Paletted image built transposed (height x width); rotated at render time.
        self.image = Image.new("P", (self.image_height, self.image_width))
        self.image.putpalette(interpolate_colors(self.colors, True))

    def draw_spectrum(self, x, spectrum):
        # Interpolate between adjacent FFT bins for each output row.
        for (index, alpha) in self.y_to_bin:
            self.pixels.append( int( ((255.0-alpha) * spectrum[index] + alpha * spectrum[index + 1] )) )
        # Pad rows above the highest mapped bin with palette index 0.
        for y in range(len(self.y_to_bin), self.image_height):
            self.pixels.append(0)

    @interfacedoc
    def process(self, frames, eod=False):
        if len(frames) != 1:  # NOTE(review): 1-frame buffers skipped — presumably EOD padding; confirm
            buffer = frames[:,0].copy()
            buffer.shape = (len(buffer),1)
            for samples, end in self.pixels_adapter.process(buffer, eod):
                if self.pixel_cursor < self.image_width:
                    (spectral_centroid, db_spectrum) = self.spectrum.process(samples, True)
                    self.draw_spectrum(self.pixel_cursor, db_spectrum)
                    self.pixel_cursor += 1
        return frames, eod

    def render(self, filename):
        """ Apply last 2D transforms and write all pixels to the file. """
        self.image.putdata(self.pixels)
        self.image.transpose(Image.ROTATE_90).save(filename)
return palette
class Noise(object):
    """Stand-in for audiolab.sndfile that synthesizes white noise.

    Mimics the sndfile reading API but generates random samples instead
    of decoding a file. Can optionally simulate a corrupt header by
    raising IOError once reads pass the midpoint; also handy for testing
    ultra-short files of 20 samples.
    """

    def __init__(self, num_frames, has_broken_header=False):
        self.num_frames = num_frames
        self.has_broken_header = has_broken_header
        self.seekpoint = 0

    def seek(self, seekpoint):
        self.seekpoint = seekpoint

    def get_nframes(self):
        return self.num_frames

    def get_samplerate(self):
        return 44100

    def get_channels(self):
        return 1

    def read_frames(self, frames_to_read):
        # Simulate a broken file: fail as soon as reads cross the midpoint.
        if self.has_broken_header and self.seekpoint + frames_to_read > self.num_frames / 2:
            raise IOError()
        remaining = self.num_frames - self.seekpoint
        will_read = remaining if remaining < frames_to_read else frames_to_read
        self.seekpoint += will_read
        # Uniform noise in [-1, 1).
        return numpy.random.random(will_read)*2 - 1
-
-
def downsample(vector, factor):
"""
downsample(vector, factor):
if opacity != 1:
textlayer = reduce_opacity(textlayer,opacity)
return Image.composite(textlayer, im, textlayer)
+
+
def peaks(samples):
    """Return the extreme sample values, ordered by position.

    Yields (min, max) if the minimum occurs first in the buffer,
    otherwise (max, min).
    """
    lo_pos = numpy.argmin(samples)
    hi_pos = numpy.argmax(samples)
    lo = samples[lo_pos]
    hi = samples[hi_pos]
    return (lo, hi) if lo_pos < hi_pos else (hi, lo)
+
+
def color_from_value(value):
    """Map a *value* in [0, 1] to an (r, g, b) tuple on an HSL sweep.

    The hue runs from 360 down to 0 as value goes from 0 to 1, at 80%
    saturation and 50% lightness.

    NOTE(review): this module-level function kept a stray, unused
    ``self`` parameter from its former life as a method, forcing callers
    to pass a dummy first argument; the parameter is removed.
    """
    return ImageColor.getrgb("hsl(%d,%d%%,%d%%)" % (int( (1.0 - value) * 360 ), 80, 50))
+
+
def mean(samples):
    """Arithmetic mean of the sample buffer."""
    return numpy.asarray(samples).mean()
+
+
def normalize(contour):
    """Linearly rescale *contour* so its values span [0, 1].

    Returns an all-zero array for a flat contour instead of dividing by
    zero (which previously produced NaNs / runtime warnings). Also
    accepts any array-like input by coercing to a float ndarray.
    """
    contour = numpy.asarray(contour, dtype=float)
    low = contour.min()
    span = contour.max() - low
    if span == 0:
        return numpy.zeros_like(contour)
    return (contour - low) / span
+
+
+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2007-2010 Guillaume Pellerin <yomguy@parisson.com>
-# Copyright (c) 2010 Olivier Guilyardi <olivier@samalyse.com>
-
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-
-
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.api import IGrapher
-from timeside.grapher.core import *
-
-
class Waveform(Processor):
    """Grapher processor rendering the audio stream as a waveform image (IGrapher)."""

    implements(IGrapher)

    # FFT window size handed to the WaveformImage backend.
    FFT_SIZE = 0x400

    @interfacedoc
    def __init__(self, width=1024, height=256, bg_color=(0,0,0), color_scheme='default'):
        self.width = width
        self.height = height
        self.bg_color = bg_color
        self.color_scheme = color_scheme
        # Drawing is delegated to a WaveformImage, built in setup().
        self.graph = None

    @staticmethod
    @interfacedoc
    def id():
        return "waveform"

    @staticmethod
    @interfacedoc
    def name():
        return "Waveform"

    @interfacedoc
    def set_colors(self, background, scheme):
        self.bg_color = background
        self.color_scheme = scheme

    @interfacedoc
    def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
        super(Waveform, self).setup(channels, samplerate, blocksize, totalframes)
        self.graph = WaveformImage(self.width, self.height, totalframes,
                                   self.samplerate(), self.FFT_SIZE,
                                   bg_color=self.bg_color,
                                   color_scheme=self.color_scheme)

    @interfacedoc
    def process(self, frames, eod=False):
        # Pass-through processor: drawing is a side effect on self.graph.
        self.graph.process(frames, eod)
        return frames, eod

    @interfacedoc
    def render(self, output=None):
        # Save to file when a path is given; always return the PIL image.
        if output:
            self.graph.save(output)
        return self.graph.image

    def watermark(self, text, font=None, color=(255, 255, 255), opacity=.6, margin=(5,5)):
        # font accepted for API compatibility but not forwarded — TODO confirm intended.
        self.graph.watermark(text, color=color, opacity=opacity, margin=margin)
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2007-2010 Guillaume Pellerin <yomguy@parisson.com>
+# Copyright (c) 2010 Olivier Guilyardi <olivier@samalyse.com>
+
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+
+
+from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.api import IGrapher
+from timeside.grapher.core import *
+from timeside.grapher.waveform_simple import Waveform
+
+
class WaveformCentroid(Waveform):
    """Waveform grapher whose peak columns are tinted by spectral centroid.

    Renders the stream's envelope like Waveform, but each vertical peak
    pair is colored according to the spectral centroid of its frame buffer.
    """

    implements(IGrapher)

    @interfacedoc
    def __init__(self, width=1024, height=256, bg_color=(0,0,0), color_scheme='default'):
        super(WaveformCentroid, self).__init__(width, height, bg_color, color_scheme)

    @staticmethod
    @interfacedoc
    def id():
        return "waveform_centroid"

    @staticmethod
    @interfacedoc
    def name():
        return "Spectral centroid waveform"

    @interfacedoc
    def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
        super(WaveformCentroid, self).setup(channels, samplerate, blocksize, totalframes)

    @interfacedoc
    def process(self, frames, eod=False):
        if len(frames) != 1:
            # Work on the first channel only, shaped (nsamples, 1) for the adapter.
            mono = frames[:, 0].copy()
            mono.shape = (len(mono), 1)
            for samples, end in self.pixels_adapter.process(mono, eod):
                if self.pixel_cursor >= self.image_width:
                    continue
                centroid, db_spectrum = self.spectrum.process(samples, True)
                self.draw_centroid_peaks(self.pixel_cursor, peaks(samples), centroid)
                self.pixel_cursor += 1
        return frames, eod
+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2007-2010 Guillaume Pellerin <yomguy@parisson.com>
-# Copyright (c) 2010 Olivier Guilyardi <olivier@samalyse.com>
-
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-
-
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.api import IGrapher
-from timeside.grapher.core import *
-
-
class WaveformContourBlack(Processor):
    """Grapher processor rendering a smoothed contour waveform on black (IGrapher)."""

    implements(IGrapher)

    # FFT window size handed to the image backend.
    FFT_SIZE = 0x400

    @interfacedoc
    def __init__(self, width=1024, height=256, bg_color=(0,0,0), color_scheme='default'):
        self.width = width
        self.height = height
        self.bg_color = bg_color
        self.color_scheme = color_scheme
        # Drawing is delegated to a WaveformImageJoyContour, built in setup().
        self.graph = None
        # Contour subdivision count and symmetry flag forwarded to the backend.
        self.ndiv = 4
        self.symetry = True

    @staticmethod
    @interfacedoc
    def id():
        return "waveform_contour_bk"

    @staticmethod
    @interfacedoc
    def name():
        return "Contour black"

    @interfacedoc
    def set_colors(self, background, scheme):
        self.bg_color = background
        self.color_scheme = scheme

    @interfacedoc
    def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
        super(WaveformContourBlack, self).setup(channels, samplerate, blocksize, totalframes)
        self.graph = WaveformImageJoyContour(self.width, self.height, totalframes,
                                             self.samplerate(), self.FFT_SIZE,
                                             bg_color=self.bg_color,
                                             color_scheme=self.color_scheme,
                                             ndiv=self.ndiv, symetry=self.symetry)

    @interfacedoc
    def process(self, frames, eod=False):
        # Pass-through processor: drawing is a side effect on self.graph.
        self.graph.process(frames, eod)
        return frames, eod

    @interfacedoc
    def render(self, output):
        # Save when a path is given; always return the PIL image.
        if output:
            self.graph.save(output)
        return self.graph.image

    def release(self):
        self.graph.release()

    def watermark(self, text, font=None, color=(255, 255, 255), opacity=.6, margin=(5,5)):
        # font accepted for API compatibility but not forwarded — TODO confirm intended.
        self.graph.watermark(text, color=color, opacity=opacity, margin=margin)
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2007-2010 Guillaume Pellerin <yomguy@parisson.com>
+# Copyright (c) 2010 Olivier Guilyardi <olivier@samalyse.com>
+
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+
+
+from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.api import IGrapher
+from timeside.grapher.core import *
+from timeside.grapher.waveform_simple import Waveform
+
+
+class WaveformContourBlack(Waveform):
+
+ implements(IGrapher)
+
+ @interfacedoc
+ def __init__(self, width=1024, height=256, bg_color=(0,0,0), color_scheme='default'):
+ super(WaveformContourBlack, self).__init__(width, height, bg_color, color_scheme)
+ self.contour = numpy.zeros(self.image_width)
+ self.centroids = numpy.zeros(self.image_width)
+ self.ndiv = 4
+ self.x = numpy.r_[0:self.image_width-1:1]
+ self.dx1 = self.x[1]-self.x[0]
+ self.symetry = True
+ self.color_offset = 160
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "waveform_contour_black"
+
+ @staticmethod
+ @interfacedoc
+ def name():
+ return "Contour black"
+
+ @interfacedoc
+ def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
+ super(WaveformContourBlack, self).setup(channels, samplerate, blocksize, totalframes)
+
+ def get_peaks_contour(self, x, peaks, spectral_centroid=None):
+ self.contour[x] = numpy.max(peaks)
+ self.centroids[x] = spectral_centroid
+
+ @interfacedoc
+ def process(self, frames, eod=False):
+ if len(frames) != 1:
+ buffer = frames[:,0].copy()
+ buffer.shape = (len(buffer),1)
+ for samples, end in self.pixels_adapter.process(buffer, eod):
+ if self.pixel_cursor < self.image_width:
+ self.get_peaks_contour(self.pixel_cursor, peaks(samples))
+ self.pixel_cursor += 1
+ if eod:
+ self.draw_peaks_contour()
+ return frames, eod
+++ /dev/null
-# -*- coding: utf-8 -*-
-#
-# Copyright (c) 2007-2010 Guillaume Pellerin <yomguy@parisson.com>
-# Copyright (c) 2010 Olivier Guilyardi <olivier@samalyse.com>
-
-# This file is part of TimeSide.
-
-# TimeSide is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-
-# TimeSide is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
-
-
-from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
-from timeside.api import IGrapher
-from timeside.grapher.core import *
-
-
-class WaveformContourWhite(Processor):
- implements(IGrapher)
-
- FFT_SIZE = 0x400
-
- @interfacedoc
- def __init__(self, width=1024, height=256, bg_color=(255,255,255), color_scheme='default'):
- self.width = width
- self.height = height
- self.bg_color = bg_color
- self.color_scheme = color_scheme
- self.graph = None
- self.ndiv = 4
- self.symetry = True
-
- @staticmethod
- @interfacedoc
- def id():
- return "waveform_contour_wh"
-
- @staticmethod
- @interfacedoc
- def name():
- return "Contour white"
-
- @interfacedoc
- def set_colors(self, background, scheme):
- self.bg_color = background
- self.color_scheme = scheme
-
- @interfacedoc
- def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
- super(WaveformContourWhite, self).setup(channels, samplerate, blocksize, totalframes)
- self.graph = WaveformImageJoyContour(self.width, self.height, totalframes,
- self.samplerate(), self.FFT_SIZE,
- bg_color=self.bg_color,
- color_scheme=self.color_scheme,
- ndiv=self.ndiv, symetry=self.symetry,
- color_offset=60)
-
- @interfacedoc
- def process(self, frames, eod=False):
- self.graph.process(frames, eod)
- return frames, eod
-
- @interfacedoc
- def render(self, output):
- if output:
- self.graph.save(output)
- return self.graph.image
-
- def release(self):
- self.graph.release()
-
- def watermark(self, text, font=None, color=(0, 0, 0), opacity=.6, margin=(5,5)):
- self.graph.watermark(text, color=color, opacity=opacity, margin=margin)
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2007-2010 Guillaume Pellerin <yomguy@parisson.com>
+# Copyright (c) 2010 Olivier Guilyardi <olivier@samalyse.com>
+
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+
+
+from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
+from timeside.api import IGrapher
+from timeside.grapher.core import *
+from timeside.grapher.waveform_contour_bk import WaveformContourBlack
+
+
+class WaveformContourWhite(WaveformContourBlack):
+
+ implements(IGrapher)
+
+ @interfacedoc
+ def __init__(self, width=1024, height=256, bg_color=(255,255,255), color_scheme='default'):
+ super(WaveformContourWhite, self).__init__(width, height, bg_color, color_scheme)
+ self.color_offset = 60
+
+ @staticmethod
+ @interfacedoc
+ def id():
+ return "waveform_contour_white"
+
+ @staticmethod
+ @interfacedoc
+ def name():
+ return "Contour white"
+
+ @interfacedoc
+ def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
+ super(WaveformContourWhite, self).setup(channels, samplerate, blocksize, totalframes)
from timeside.grapher.core import *
-class WaveformSimple(Processor):
- implements(IGrapher)
+class Waveform(Grapher):
+ """ Builds a PIL image representing a waveform of the audio stream.
+    Pixels are added iteratively as the adapter delivers fixed-size frame buffers.
+ """
- FFT_SIZE = 0x400
+ implements(IGrapher)
@interfacedoc
- def __init__(self, width=572, height=74, bg_color=None, color_scheme='awdio'):
- self.width = width
- self.height = height
- self.bg_color = bg_color
- self.color_scheme = color_scheme
+ def __init__(self, width=1024, height=256, bg_color=(255,255,255), color_scheme='default'):
+ super(Waveform, self).__init__(width, height, bg_color, color_scheme)
+ self.line_color = (0,0,0)
+ colors = default_color_schemes[color_scheme]['waveform']
+ self.color_lookup = interpolate_colors(colors)
@staticmethod
@interfacedoc
def id():
- return "waveform_simple"
+ return "waveform"
@staticmethod
@interfacedoc
def name():
return "Waveform simple"
- @interfacedoc
- def set_colors(self, background, scheme):
- self.bg_color = background
- self.color_scheme = scheme
-
@interfacedoc
def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
- super(WaveformSimple, self).setup(channels, samplerate, blocksize, totalframes)
- self.graph = WaveformImageSimple(self.width, self.height, self.totalframes(),
- self.samplerate(), self.FFT_SIZE,
- bg_color=self.bg_color,
- color_scheme=self.color_scheme)
+ super(Waveform, self).setup(channels, samplerate, blocksize, totalframes)
@interfacedoc
def process(self, frames, eod=False):
- self.graph.process(frames, eod)
+ if len(frames) != 1:
+ buffer = frames[:,0]
+ buffer.shape = (len(buffer),1)
+ for samples, end in self.pixels_adapter.process(buffer, eod):
+ if self.pixel_cursor < self.image_width-1:
+ self.draw_simple_peaks(self.pixel_cursor, peaks(samples), self.line_color)
+ self.pixel_cursor += 1
+ if self.pixel_cursor == self.image_width-1:
+ self.draw_simple_peaks(self.pixel_cursor, peaks(samples), self.line_color)
+ self.pixel_cursor += 1
return frames, eod
@interfacedoc
def render(self, output):
if output:
- self.graph.save(output)
- return self.graph.image
-
- def watermark(self, text, font=None, color=(255, 255, 255), opacity=.6, margin=(5,5)):
- self.graph.watermark(text, color=color, opacity=opacity, margin=margin)
+ a = 1
+ for x in range(self.image_width):
+ self.pixel[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pixel[x, self.image_height/2]))
+ self.image.save(output)
+ return self.image