From 45427cb3bbacdb20aec69c6c6b53e87981adea82 Mon Sep 17 00:00:00 2001 From: Guillaume Pellerin Date: Mon, 28 Oct 2013 11:23:10 +0100 Subject: [PATCH] refactor and simplify the grapher processor system, add Grapher abstract generic class --- tests/api/test_pipe_spectrogram.py | 4 +- tests/api/test_pipe_waveform.py | 8 +- tests/tools.py | 1 + timeside/core.py | 4 +- timeside/grapher/__init__.py | 2 +- timeside/grapher/core.py | 389 +++--------------- timeside/grapher/spectrogram.py | 68 +-- timeside/grapher/utils.py | 67 ++- .../{waveform.py => waveform_centroid.py} | 47 +-- ...ontour_bk.py => waveform_contour_black.py} | 55 ++- ...ontour_wh.py => waveform_contour_white.py} | 45 +- timeside/grapher/waveform_simple.py | 52 +-- 12 files changed, 222 insertions(+), 520 deletions(-) rename timeside/grapher/{waveform.py => waveform_centroid.py} (56%) rename timeside/grapher/{waveform_contour_bk.py => waveform_contour_black.py} (57%) rename timeside/grapher/{waveform_contour_wh.py => waveform_contour_white.py} (51%) diff --git a/tests/api/test_pipe_spectrogram.py b/tests/api/test_pipe_spectrogram.py index 49eaa8c..a16874e 100644 --- a/tests/api/test_pipe_spectrogram.py +++ b/tests/api/test_pipe_spectrogram.py @@ -22,9 +22,9 @@ for source, image in test_dict.iteritems(): image = img_dir + os.sep + image print 'Test : decoder(%s) | waveform (%s)' % (source, image) decoder = FileDecoder(audio) - spectrogram = Spectrogram(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + spectrogram = Spectrogram() (decoder | spectrogram).run() - print 'frames per pixel = ', spectrogram.graph.samples_per_pixel + print 'frames per pixel = ', spectrogram.samples_per_pixel print "render spectrogram to: %s" % image spectrogram.render(image) diff --git a/tests/api/test_pipe_waveform.py b/tests/api/test_pipe_waveform.py index bbfb54a..a028df5 100644 --- a/tests/api/test_pipe_waveform.py +++ b/tests/api/test_pipe_waveform.py @@ -9,7 +9,7 @@ from timeside.grapher import * sample_dir = '../samples' img_dir = '../results/img' if not os.path.exists(img_dir): - os.mkdir(img_dir) + os.makedirs(img_dir) test_dict = {'sweep.wav': 'waveform_wav.png', 'sweep.flac': 'waveform_flac.png', @@ -22,10 +22,8 @@ for source, image in test_dict.iteritems(): image = img_dir + os.sep + image print 'Test : decoder(%s) | waveform (%s)' % (source, image) decoder = FileDecoder(audio) - waveform = Waveform(width=1024, height=256, bg_color=(0,0,0), color_scheme='default') + waveform = Waveform(width=1024, height=256, bg_color=(255,255,255), color_scheme='default') (decoder | waveform).run() - print 'frames per pixel = ', waveform.graph.samples_per_pixel + print 'frames per pixel = ', waveform.samples_per_pixel print "render waveform to: %s" % image waveform.render(image) - - diff --git a/tests/tools.py b/tests/tools.py index 7f566f1..f17477d 100644 --- a/tests/tools.py +++ b/tests/tools.py @@ -1,6 +1,7 @@ import os import urllib + def check_samples(): url = 'http://github.com/yomguy/timeside-samples/raw/master/samples/' samples = ['guitar.wav', 'sweep.wav', 'sweep_mono.wav', 'sweep_32000.wav', 'sweep.flac', 'sweep.ogg', 'sweep.mp3', 'sweep_source.wav'] diff --git a/timeside/core.py b/timeside/core.py index 901c5bc..3bfdd16 100644 --- a/timeside/core.py +++ b/timeside/core.py @@ -193,11 +193,13 @@ class FixedSizeInputAdapter(object): yield block, True self.len = 0 + def processors(interface=IProcessor, recurse=True): """Returns the processors implementing a given interface and, if recurse, any of the descendants of this interface.""" 
    return implementations(interface, recurse)

+
 def get_processor(processor_id):
     """Return a processor by its id"""
     if not _processors.has_key(processor_id):
         raise Error("No processor registered with id: '%s'" % processor_id)
     return _processors[processor_id]

@@ -269,8 +271,6 @@ class ProcessPipe(object):
         for item in items:
             item.release()

-        #return self
-
     @property
     def results(self):
         """
diff --git a/timeside/grapher/__init__.py b/timeside/grapher/__init__.py
index 74981e9..4ca76af 100644
--- a/timeside/grapher/__init__.py
+++ b/timeside/grapher/__init__.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-

 from core import *
-from waveform import *
+from waveform_centroid import *
 from spectrogram import *
-from waveform_contour_bk import *
-from waveform_contour_wh import *
+from waveform_contour_black import *
+from waveform_contour_white import *
diff --git a/timeside/grapher/core.py b/timeside/grapher/core.py
index 354e60c..bc79f25 100644
--- a/timeside/grapher/core.py
+++ b/timeside/grapher/core.py
@@ -29,14 +29,15 @@ try:
 except ImportError:
     import ImageFilter, ImageChops, Image, ImageDraw, ImageColor, ImageEnhance

-from timeside.core import FixedSizeInputAdapter
+from timeside.core import *
 from timeside.grapher.color_schemes import default_color_schemes
 from utils import *

+
 class Spectrum(object):
     """ FFT based frequency analysis of audio frames."""

-    def __init__(self, fft_size, nframes, samplerate, lower, higher, window_function=numpy.hanning):
+    def __init__(self, fft_size, totalframes, samplerate, lower, higher, window_function=numpy.hanning):
         self.fft_size = fft_size
         self.window = window_function(self.fft_size)
         self.window_function = window_function
@@ -46,11 +47,12 @@ class Spectrum(object):
         self.lower_log = math.log10(self.lower)
         self.higher_log = math.log10(self.higher)
         self.clip = lambda val, low, high: min(high, max(low, val))
-        self.nframes = nframes
+        self.totalframes = totalframes
         self.samplerate = samplerate

     def process(self, frames, eod, spec_range=120.0):
-        """ Returns a tuple containing the spectral centroid and the spectrum (dB scales) of the input audio frames. FFT window sizes are adatable to the input frame size."""
+        """ Returns a tuple containing the spectral centroid and the spectrum (dB scales) of the input audio frames.
+        FFT window sizes are adaptable to the input frame size."""

         samples = frames[:,0]
         nsamples = len(samples)
@@ -84,76 +86,68 @@ class Spectrum(object):
             spectral_centroid = (spectrum * self.spectrum_range).sum() / (energy * (length - 1)) * self.samplerate * 0.5
             # clip > log10 > scale between 0 and 1
-            spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - self.lower_log) / (self.higher_log - self.lower_log)
+            spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - \
+                                 self.lower_log) / (self.higher_log - self.lower_log)

         return (spectral_centroid, db_spectrum)

+class Grapher(Processor):
+    '''
+    Generic abstract class for the graphers
+    '''

-class WaveformImage(object):
-    """ Builds a PIL image representing a waveform of the audio stream.
-    Adds pixels iteratively thanks to the adapter providing fixed size frame buffers.
-    Peaks are colored relative to the spectral centroids of each frame packet.
""" + fft_size = 0x400 + frame_cursor = 0 + pixel_cursor = 0 + lower_freq = 200 + higher_freq = 22050 - def __init__(self, image_width, image_height, nframes, samplerate, - fft_size, bg_color, color_scheme): - self.image_width = image_width - self.image_height = image_height - self.nframes = nframes - self.samplerate = samplerate - self.fft_size = fft_size + def __init__(self, width=1024, height=256, bg_color=(255,255,255), color_scheme='default'): self.bg_color = bg_color self.color_scheme = color_scheme + self.graph = None + self.image_width = width + self.image_height = height + self.bg_color = bg_color + self.color_scheme = color_scheme + self.previous_x, self.previous_y = None, None - if isinstance(color_scheme, dict): - colors = color_scheme['waveform'] - else: - colors = default_color_schemes[color_scheme]['waveform'] - - self.color_lookup = interpolate_colors(colors) + @staticmethod + def id(): + return "generic_grapher" - self.samples_per_pixel = self.nframes / float(self.image_width) - self.buffer_size = int(round(self.samples_per_pixel, 0)) - self.pixels_adapter = FixedSizeInputAdapter(self.buffer_size, 1, pad=False) - self.pixels_adapter_nframes = self.pixels_adapter.blocksize(self.nframes) + @staticmethod + def name(): + return "Generic grapher" - self.lower = 800 - self.higher = 12000 - self.spectrum = Spectrum(self.fft_size, self.nframes, self.samplerate, self.lower, self.higher, numpy.hanning) + def set_colors(self, bg_color, color_scheme): + self.bg_color = bg_color + self.color_color_scheme = color_scheme + def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): + super(Grapher, self).setup(channels, samplerate, blocksize, totalframes) + self.samplerate = samplerate + self.blocksize = blocksize + self.totalframes = totalframes self.image = Image.new("RGBA", (self.image_width, self.image_height), self.bg_color) + self.samples_per_pixel = self.totalframes / float(self.image_width) + self.buffer_size = int(round(self.samples_per_pixel, 0)) + self.pixels_adapter = FixedSizeInputAdapter(self.buffer_size, 1, pad=False) + self.pixels_adapter_totalframes = self.pixels_adapter.blocksize(self.totalframes) + self.spectrum = Spectrum(self.fft_size, self.totalframes, self.samplerate, + self.lower_freq, self.higher_freq, numpy.hanning) self.pixel = self.image.load() self.draw = ImageDraw.Draw(self.image) - self.previous_x, self.previous_y = None, None - self.frame_cursor = 0 - self.pixel_cursor = 0 - - def peaks(self, samples): - """ Find the minimum and maximum peak of the samples. - Returns that pair in the order they were found. - So if min was found first, it returns (min, max) else the other way around. 
""" - max_index = numpy.argmax(samples) - max_value = samples[max_index] - min_index = numpy.argmin(samples) - min_value = samples[min_index] + def watermark(self, text, font=None, color=(255, 255, 255), opacity=.6, margin=(5,5)): + self.image = im_watermark(text, text, color=color, opacity=opacity, margin=margin) - if min_index < max_index: - return (min_value, max_value) - else: - return (max_value, min_value) - - def color_from_value(self, value): - """ given a value between 0 and 1, return an (r,g,b) tuple """ - - return ImageColor.getrgb("hsl(%d,%d%%,%d%%)" % (int( (1.0 - value) * 360 ), 80, 50)) - - def draw_peaks(self, x, peaks, spectral_centroid): + def draw_centroid_peaks(self, x, peaks, spectral_centroid): """ draw 2 peaks at x using the spectral_centroid for color """ y1 = self.image_height * 0.5 - peaks[0] * (self.image_height - 4) * 0.5 y2 = self.image_height * 0.5 - peaks[1] * (self.image_height - 4) * 0.5 - line_color = self.color_lookup[int(spectral_centroid*255.0)] if self.previous_y: @@ -162,9 +156,25 @@ class WaveformImage(object): self.draw.line([x, y1, x, y2], line_color) self.previous_x, self.previous_y = x, y2 - self.draw_anti_aliased_pixels(x, y1, y2, line_color) + def draw_simple_peaks(self, x, peaks, line_color): + """ draw 2 peaks at x using the spectral_centroid for color """ + + y1 = self.image_height * 0.5 - peaks[0] * (self.image_height - 4) * 0.5 + y2 = self.image_height * 0.5 - peaks[1] * (self.image_height - 4) * 0.5 + + if self.previous_y and x < self.image_width-1: + if y1 < y2: + self.draw.line((x, 0, x, y1), line_color) + self.draw.line((x, self.image_height , x, y2), line_color) + else: + self.draw.line((x, 0, x, y2), line_color) + self.draw.line((x, self.image_height , x, y1), line_color) + else: + self.draw.line((x, 0, x, self.image_height), line_color) + self.previous_x, self.previous_y = x, y1 + def draw_anti_aliased_pixels(self, x, y1, y2, color): """ vertical anti-aliasing at y1 and y2 """ @@ -174,11 +184,9 @@ class WaveformImage(object): if alpha > 0.0 and alpha < 1.0 and y_max_int + 1 < self.image_height: current_pix = self.pixel[int(x), y_max_int + 1] - r = int((1-alpha)*current_pix[0] + alpha*color[0]) g = int((1-alpha)*current_pix[1] + alpha*color[1]) b = int((1-alpha)*current_pix[2] + alpha*color[2]) - self.pixel[x, y_max_int + 1] = (r,g,b) y_min = min(y1, y2) @@ -187,78 +195,20 @@ class WaveformImage(object): if alpha > 0.0 and alpha < 1.0 and y_min_int - 1 >= 0: current_pix = self.pixel[x, y_min_int - 1] - r = int((1-alpha)*current_pix[0] + alpha*color[0]) g = int((1-alpha)*current_pix[1] + alpha*color[1]) b = int((1-alpha)*current_pix[2] + alpha*color[2]) - self.pixel[x, y_min_int - 1] = (r,g,b) - def process(self, frames, eod): - if len(frames) != 1: - buffer = frames[:,0].copy() - buffer.shape = (len(buffer),1) - for samples, end in self.pixels_adapter.process(buffer, eod): - if self.pixel_cursor < self.image_width: - (spectral_centroid, db_spectrum) = self.spectrum.process(samples, True) - peaks = self.peaks(samples) - self.draw_peaks(self.pixel_cursor, peaks, spectral_centroid) - self.pixel_cursor += 1 - - def watermark(self, text, color=None, opacity=.6, margin=(10,10)): - self.image = im_watermark(self.image, text, color=color, opacity=opacity, margin=margin) - - def save(self, filename): - """ Apply last 2D transforms and write all pixels to the file. 
""" - - # middle line (0 for none) - a = 1 - for x in range(self.image_width): - self.pixel[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pixel[x, self.image_height/2])) - self.image.save(filename) - - def release(self): - pass - - -class WaveformImageJoyContour(WaveformImage): - - def __init__(self, image_width, image_height, nframes, samplerate, - fft_size, bg_color, color_scheme, ndiv=1, symetry=None, color_offset=160): - WaveformImage.__init__(self, image_width, image_height, nframes, samplerate, - fft_size, bg_color, color_scheme) - self.contour = numpy.zeros(self.image_width) - self.centroids = numpy.zeros(self.image_width) - self.ndiv = ndiv - self.x = numpy.r_[0:self.image_width-1:1] - self.dx1 = self.x[1]-self.x[0] - self.symetry = symetry - self.color_offset = color_offset - - def get_peaks_contour(self, x, peaks, spectral_centroid=None): - self.contour[x] = numpy.max(peaks) - self.centroids[x] = spectral_centroid - - def mean(self, samples): - return numpy.mean(samples) - - def normalize(self, contour): - contour = contour-min(contour) - return contour/max(contour) - def draw_peaks_contour(self): contour = self.contour.copy() - - # Smoothing contour = smooth(contour, window_len=16) - - # Normalize - contour = self.normalize(contour) + contour = normalize(contour) # Scaling #ratio = numpy.mean(contour)/numpy.sqrt(2) ratio = 1 - contour = self.normalize(numpy.expm1(contour/ratio))*(1-10**-6) + contour = normalize(numpy.expm1(contour/ratio))*(1-10**-6) # Spline #contour = cspline1d(contour) @@ -309,213 +259,6 @@ class WaveformImageJoyContour(WaveformImage): self.draw.point((x, y+height), line_color) self.previous_x, self.previous_y = x, y - def process(self, frames, eod): - if len(frames) != 1: - buffer = frames[:,0].copy() - buffer.shape = (len(buffer),1) - for samples, end in self.pixels_adapter.process(buffer, eod): - if self.pixel_cursor < self.image_width: - peaks = self.peaks(samples) - self.get_peaks_contour(self.pixel_cursor, peaks) - self.pixel_cursor += 1 - if eod: - self.draw_peaks_contour() - - def watermark(self, text, color=None, opacity=.6, margin=(10,10)): - self.image = im_watermark(self.image, text, color=color, opacity=opacity, margin=margin) - - def save(self, filename): - """ Apply last 2D transforms and write all pixels to the file. """ - # middle line (0 for none) - a = 1 - for x in range(self.image_width): - self.pixel[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pixel[x, self.image_height/2])) - #self.image = self.image.transpose(Image.FLIP_TOP_BOTTOM) - self.image.save(filename) - - def release(self): - pass - - -class WaveformImageSimple(object): - """ Builds a PIL image representing a waveform of the audio stream. - Adds pixels iteratively thanks to the adapter providing fixed size frame buffers. 
- """ - - def __init__(self, image_width, image_height, nframes, samplerate, fft_size, bg_color, color_scheme): - self.image_width = image_width - self.image_height = image_height - self.nframes = nframes - self.samplerate = samplerate - self.fft_size = fft_size - self.bg_color = bg_color - self.color_scheme = color_scheme - - if isinstance(color_scheme, dict): - colors = color_scheme['waveform'] - else: - colors = default_color_schemes[color_scheme]['waveform'] - self.line_color = colors[0] - - self.samples_per_pixel = self.nframes / float(self.image_width) - self.buffer_size = int(round(self.samples_per_pixel, 0)) - self.pixels_adapter = FixedSizeInputAdapter(self.buffer_size, 1, pad=False) - self.pixels_adapter_nframes = self.pixels_adapter.blocksize(self.nframes) - - self.image = Image.new("RGBA", (self.image_width, self.image_height)) - self.pixel = self.image.load() - self.draw = ImageDraw.Draw(self.image) - self.previous_x, self.previous_y = None, None - self.frame_cursor = 0 - self.pixel_cursor = 0 - - def normalize(self, contour): - contour = contour-min(contour) - return contour/max(contour) - - def peaks(self, samples): - """ Find the minimum and maximum peak of the samples. - Returns that pair in the order they were found. - So if min was found first, it returns (min, max) else the other way around. """ - - max_index = numpy.argmax(samples) - max_value = samples[max_index] - - min_index = numpy.argmin(samples) - min_value = samples[min_index] - - if min_index < max_index: - return (min_value, max_value) - else: - return (max_value, min_value) - - def draw_peaks(self, x, peaks): - """ draw 2 peaks at x using the spectral_centroid for color """ - - y1 = self.image_height * 0.5 - peaks[0] * (self.image_height - 4) * 0.5 - y2 = self.image_height * 0.5 - peaks[1] * (self.image_height - 4) * 0.5 - - if self.previous_y and x < self.image_width-1: - if y1 < y2: - self.draw.line((x, 0, x, y1), self.line_color) - self.draw.line((x, self.image_height , x, y2), self.line_color) - else: - self.draw.line((x, 0, x, y2), self.line_color) - self.draw.line((x, self.image_height , x, y1), self.line_color) - else: - self.draw.line((x, 0, x, self.image_height), self.line_color) - - self.previous_x, self.previous_y = x, y1 - - def process(self, frames, eod): - if len(frames) != 1: - buffer = frames[:,0] - buffer.shape = (len(buffer),1) - for samples, end in self.pixels_adapter.process(buffer, eod): - if self.pixel_cursor < self.image_width-1: - self.draw_peaks(self.pixel_cursor, self.peaks(samples)) - self.pixel_cursor += 1 - if self.pixel_cursor == self.image_width-1: - self.draw_peaks(self.pixel_cursor, (0, 0)) - self.pixel_cursor += 1 - - def watermark(self, text, color=None, opacity=.6, margin=(10,10)): - self.image = im_watermark(self.image, text, color=color, opacity=opacity, margin=margin) - - def save(self, filename): - """ Apply last 2D transforms and write all pixels to the file. """ - - # middle line (0 for none) - a = 1 - for x in range(self.image_width): - self.pixel[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pixel[x, self.image_height/2])) - self.image.save(filename) - - def release(self): - pass - - -class SpectrogramImage(object): - """ Builds a PIL image representing a spectrogram of the audio stream (level vs. frequency vs. time). 
- Adds pixels iteratively thanks to the adapter providing fixed size frame buffers.""" - - def __init__(self, image_width, image_height, nframes, samplerate, fft_size, bg_color=None, color_scheme='default'): - self.image_width = image_width - self.image_height = image_height - self.nframes = nframes - self.samplerate = samplerate - self.fft_size = fft_size - self.color_scheme = color_scheme - - if isinstance(color_scheme, dict): - colors = color_scheme['spectrogram'] - else: - colors = default_color_schemes[color_scheme]['spectrogram'] - - self.image = Image.new("P", (self.image_height, self.image_width)) - self.image.putpalette(interpolate_colors(colors, True)) - - self.samples_per_pixel = self.nframes / float(self.image_width) - self.buffer_size = int(round(self.samples_per_pixel, 0)) - self.pixels_adapter = FixedSizeInputAdapter(self.buffer_size, 1, pad=False) - self.pixels_adapter_nframes = self.pixels_adapter.blocksize(self.nframes) - - self.lower = 100 - self.higher = 22050 - self.spectrum = Spectrum(self.fft_size, self.nframes, self.samplerate, self.lower, self.higher, numpy.hanning) - - # generate the lookup which translates y-coordinate to fft-bin - self.y_to_bin = [] - f_min = float(self.lower) - f_max = float(self.higher) - y_min = math.log10(f_min) - y_max = math.log10(f_max) - for y in range(self.image_height): - freq = math.pow(10.0, y_min + y / (image_height - 1.0) *(y_max - y_min)) - bin = freq / 22050.0 * (self.fft_size/2 + 1) - - if bin < self.fft_size/2: - alpha = bin - int(bin) - - self.y_to_bin.append((int(bin), alpha * 255)) - - # this is a bit strange, but using image.load()[x,y] = ... is - # a lot slower than using image.putadata and then rotating the image - # so we store all the pixels in an array and then create the image when saving - self.pixels = [] - self.pixel_cursor = 0 - - def draw_spectrum(self, x, spectrum): - for (index, alpha) in self.y_to_bin: - self.pixels.append( int( ((255.0-alpha) * spectrum[index] + alpha * spectrum[index + 1] )) ) - - for y in range(len(self.y_to_bin), self.image_height): - self.pixels.append(0) - - def process(self, frames, eod): - if len(frames) != 1: - buffer = frames[:,0].copy() - buffer.shape = (len(buffer),1) - - # FIXME : breaks spectrum linearity - for samples, end in self.pixels_adapter.process(buffer, eod): - if self.pixel_cursor < self.image_width: - (spectral_centroid, db_spectrum) = self.spectrum.process(samples, True) - self.draw_spectrum(self.pixel_cursor, db_spectrum) - self.pixel_cursor += 1 - - def watermark(self, text, color=None, opacity=.6, margin=(10,10)): - #self.image = im_watermark(self.image, text, color=color, opacity=opacity, margin=margin) - pass - - def save(self, filename): - """ Apply last 2D transforms and write all pixels to the file. """ - self.image.putdata(self.pixels) - self.image.transpose(Image.ROTATE_90).save(filename) - - def release(self): - pass - if __name__ == "__main__": import doctest diff --git a/timeside/grapher/spectrogram.py b/timeside/grapher/spectrogram.py index 8a01666..d5626a5 100644 --- a/timeside/grapher/spectrogram.py +++ b/timeside/grapher/spectrogram.py @@ -24,18 +24,30 @@ from timeside.api import IGrapher from timeside.grapher.core import * -class Spectrogram(Processor): - implements(IGrapher) +class Spectrogram(Grapher): + """ Builds a PIL image representing a spectrogram of the audio stream (level vs. frequency vs. time). 
+ Adds pixels iteratively thanks to the adapter providing fixed size frame buffers.""" - FFT_SIZE = 0x400 + implements(IGrapher) @interfacedoc def __init__(self, width=1024, height=256, bg_color=(0,0,0), color_scheme='default'): - self.width = width - self.height = height - self.bg_color = bg_color - self.color_scheme = color_scheme - self.graph = None + super(Spectrogram, self).__init__(width, height, bg_color, color_scheme) + self.colors = default_color_schemes[color_scheme]['spectrogram'] + self.pixels = [] + + # generate the lookup which translates y-coordinate to fft-bin + self.y_to_bin = [] + f_min = float(self.lower_freq) + f_max = float(self.higher_freq) + y_min = math.log10(f_min) + y_max = math.log10(f_max) + for y in range(self.image_height): + freq = math.pow(10.0, y_min + y / (self.image_height - 1.0) *(y_max - y_min)) + bin = freq / 22050.0 * (self.fft_size/2 + 1) + if bin < self.fft_size/2: + alpha = bin - int(bin) + self.y_to_bin.append((int(bin), alpha * 255)) @staticmethod @interfacedoc @@ -47,29 +59,33 @@ class Spectrogram(Processor): def name(): return "Spectrogram" - @interfacedoc - def set_colors(self, background, scheme): - self.bg_color = background - self.color_scheme = scheme - @interfacedoc def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): super(Spectrogram, self).setup(channels, samplerate, blocksize, totalframes) - self.graph = SpectrogramImage(self.width, self.height, totalframes, - self.samplerate(), self.FFT_SIZE, - bg_color=self.bg_color, color_scheme=self.color_scheme) + self.spectrum = Spectrum(self.fft_size, self.totalframes, self.samplerate, + self.lower_freq, self.higher_freq, numpy.hanning) + self.image = Image.new("P", (self.image_height, self.image_width)) + self.image.putpalette(interpolate_colors(self.colors, True)) + + def draw_spectrum(self, x, spectrum): + for (index, alpha) in self.y_to_bin: + self.pixels.append( int( ((255.0-alpha) * spectrum[index] + alpha * spectrum[index + 1] )) ) + for y in range(len(self.y_to_bin), self.image_height): + self.pixels.append(0) @interfacedoc def process(self, frames, eod=False): - self.graph.process(frames, eod) + if len(frames) != 1: + buffer = frames[:,0].copy() + buffer.shape = (len(buffer),1) + for samples, end in self.pixels_adapter.process(buffer, eod): + if self.pixel_cursor < self.image_width: + (spectral_centroid, db_spectrum) = self.spectrum.process(samples, True) + self.draw_spectrum(self.pixel_cursor, db_spectrum) + self.pixel_cursor += 1 return frames, eod - @interfacedoc - def render(self, output=None): - if output: - self.graph.save(output) - return self.graph.image - - def watermark(self, text, font=None, color=(255, 255, 255), opacity=.6, margin=(5,5)): - self.graph.watermark(text, color=color, opacity=0.9, margin=margin) - + def render(self, filename): + """ Apply last 2D transforms and write all pixels to the file. """ + self.image.putdata(self.pixels) + self.image.transpose(Image.ROTATE_90).save(filename) diff --git a/timeside/grapher/utils.py b/timeside/grapher/utils.py index 36b6973..178e76d 100644 --- a/timeside/grapher/utils.py +++ b/timeside/grapher/utils.py @@ -63,41 +63,6 @@ def interpolate_colors(colors, flat=False, num_colors=256): return palette -class Noise(object): - """A class that mimics audiolab.sndfile but generates noise instead of reading - a wave file. Additionally it can be told to have a "broken" header and thus crashing - in the middle of the file. 
    Also useful for testing ultra-short files of 20 samples."""
-
-    def __init__(self, num_frames, has_broken_header=False):
-        self.seekpoint = 0
-        self.num_frames = num_frames
-        self.has_broken_header = has_broken_header
-
-    def seek(self, seekpoint):
-        self.seekpoint = seekpoint
-
-    def get_nframes(self):
-        return self.num_frames
-
-    def get_samplerate(self):
-        return 44100
-
-    def get_channels(self):
-        return 1
-
-    def read_frames(self, frames_to_read):
-        if self.has_broken_header and self.seekpoint + frames_to_read > self.num_frames / 2:
-            raise IOError()
-
-        num_frames_left = self.num_frames - self.seekpoint
-        if num_frames_left < frames_to_read:
-            will_read = num_frames_left
-        else:
-            will_read = frames_to_read
-        self.seekpoint += will_read
-        return numpy.random.random(will_read)*2 - 1
-
-
 def downsample(vector, factor):
     """
     downsample(vector, factor):
@@ -211,3 +176,35 @@
     if opacity != 1:
         textlayer = reduce_opacity(textlayer,opacity)
     return Image.composite(textlayer, im, textlayer)
+
+
+def peaks(samples):
+    """ Find the minimum and maximum peak of the samples.
+    Returns that pair in the order they were found.
+    So if min was found first, it returns (min, max) else the other way around. """
+    max_index = numpy.argmax(samples)
+    max_value = samples[max_index]
+
+    min_index = numpy.argmin(samples)
+    min_value = samples[min_index]
+
+    if min_index < max_index:
+        return (min_value, max_value)
+    else:
+        return (max_value, min_value)
+
+
+def color_from_value(value):
+    """ given a value between 0 and 1, return an (r,g,b) tuple """
+    return ImageColor.getrgb("hsl(%d,%d%%,%d%%)" % (int( (1.0 - value) * 360 ), 80, 50))
+
+
+def mean(samples):
+    return numpy.mean(samples)
+
+
+def normalize(contour):
+    contour = contour-min(contour)
+    return contour/max(contour)
+
+
diff --git a/timeside/grapher/waveform.py b/timeside/grapher/waveform_centroid.py
similarity index 56%
rename from timeside/grapher/waveform.py
rename to timeside/grapher/waveform_centroid.py
index 426ad72..3e1de94 100644
--- a/timeside/grapher/waveform.py
+++ b/timeside/grapher/waveform_centroid.py
@@ -22,54 +22,41 @@
 from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter
 from timeside.api import IGrapher
 from timeside.grapher.core import *
+from timeside.grapher.waveform_simple import Waveform

-class Waveform(Processor):
-    implements(IGrapher)
+class WaveformCentroid(Waveform):
+    """ Builds a PIL image representing a waveform of the audio stream.
+    Peaks are colored relative to the spectral centroids of each frame buffer.
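+
+    A minimal usage sketch following the pipe API exercised in the updated
+    tests above (the sample filename and output path are placeholders):
+
+        decoder = FileDecoder('sweep.wav')
+        waveform = WaveformCentroid(width=1024, height=256)
+        (decoder | waveform).run()
+        waveform.render('waveform_centroid.png')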
""" - FFT_SIZE = 0x400 + implements(IGrapher) @interfacedoc def __init__(self, width=1024, height=256, bg_color=(0,0,0), color_scheme='default'): - self.width = width - self.height = height - self.bg_color = bg_color - self.color_scheme = color_scheme - self.graph = None + super(WaveformCentroid, self).__init__(width, height, bg_color, color_scheme) @staticmethod @interfacedoc def id(): - return "waveform" + return "waveform_centroid" @staticmethod @interfacedoc def name(): - return "Waveform" - - @interfacedoc - def set_colors(self, background, scheme): - self.bg_color = background - self.color_scheme = scheme + return "Spectral centroid waveform" @interfacedoc def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): - super(Waveform, self).setup(channels, samplerate, blocksize, totalframes) - self.graph = WaveformImage(self.width, self.height, totalframes, - self.samplerate(), self.FFT_SIZE, - bg_color=self.bg_color, - color_scheme=self.color_scheme) + super(WaveformCentroid, self).setup(channels, samplerate, blocksize, totalframes) @interfacedoc def process(self, frames, eod=False): - self.graph.process(frames, eod) + if len(frames) != 1: + buffer = frames[:,0].copy() + buffer.shape = (len(buffer),1) + for samples, end in self.pixels_adapter.process(buffer, eod): + if self.pixel_cursor < self.image_width: + (spectral_centroid, db_spectrum) = self.spectrum.process(samples, True) + self.draw_centroid_peaks(self.pixel_cursor, peaks(samples), spectral_centroid) + self.pixel_cursor += 1 return frames, eod - - @interfacedoc - def render(self, output=None): - if output: - self.graph.save(output) - return self.graph.image - - def watermark(self, text, font=None, color=(255, 255, 255), opacity=.6, margin=(5,5)): - self.graph.watermark(text, color=color, opacity=opacity, margin=margin) diff --git a/timeside/grapher/waveform_contour_bk.py b/timeside/grapher/waveform_contour_black.py similarity index 57% rename from timeside/grapher/waveform_contour_bk.py rename to timeside/grapher/waveform_contour_black.py index 6c16707..3edf273 100644 --- a/timeside/grapher/waveform_contour_bk.py +++ b/timeside/grapher/waveform_contour_black.py @@ -22,60 +22,51 @@ from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter from timeside.api import IGrapher from timeside.grapher.core import * +from timeside.grapher.waveform_simple import Waveform -class WaveformContourBlack(Processor): - implements(IGrapher) +class WaveformContourBlack(Waveform): - FFT_SIZE = 0x400 + implements(IGrapher) @interfacedoc def __init__(self, width=1024, height=256, bg_color=(0,0,0), color_scheme='default'): - self.width = width - self.height = height - self.bg_color = bg_color - self.color_scheme = color_scheme - self.graph = None + super(WaveformContourBlack, self).__init__(width, height, bg_color, color_scheme) + self.contour = numpy.zeros(self.image_width) + self.centroids = numpy.zeros(self.image_width) self.ndiv = 4 + self.x = numpy.r_[0:self.image_width-1:1] + self.dx1 = self.x[1]-self.x[0] self.symetry = True + self.color_offset = 160 @staticmethod @interfacedoc def id(): - return "waveform_contour_bk" + return "waveform_contour_black" @staticmethod @interfacedoc def name(): return "Contour black" - @interfacedoc - def set_colors(self, background, scheme): - self.bg_color = background - self.color_scheme = scheme - @interfacedoc def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): super(WaveformContourBlack, self).setup(channels, samplerate, 
blocksize, totalframes) - self.graph = WaveformImageJoyContour(self.width, self.height, totalframes, - self.samplerate(), self.FFT_SIZE, - bg_color=self.bg_color, - color_scheme=self.color_scheme, - ndiv=self.ndiv, symetry=self.symetry) + + def get_peaks_contour(self, x, peaks, spectral_centroid=None): + self.contour[x] = numpy.max(peaks) + self.centroids[x] = spectral_centroid @interfacedoc def process(self, frames, eod=False): - self.graph.process(frames, eod) + if len(frames) != 1: + buffer = frames[:,0].copy() + buffer.shape = (len(buffer),1) + for samples, end in self.pixels_adapter.process(buffer, eod): + if self.pixel_cursor < self.image_width: + self.get_peaks_contour(self.pixel_cursor, peaks(samples)) + self.pixel_cursor += 1 + if eod: + self.draw_peaks_contour() return frames, eod - - @interfacedoc - def render(self, output): - if output: - self.graph.save(output) - return self.graph.image - - def release(self): - self.graph.release() - - def watermark(self, text, font=None, color=(255, 255, 255), opacity=.6, margin=(5,5)): - self.graph.watermark(text, color=color, opacity=opacity, margin=margin) diff --git a/timeside/grapher/waveform_contour_wh.py b/timeside/grapher/waveform_contour_white.py similarity index 51% rename from timeside/grapher/waveform_contour_wh.py rename to timeside/grapher/waveform_contour_white.py index c4681b4..e0e51af 100644 --- a/timeside/grapher/waveform_contour_wh.py +++ b/timeside/grapher/waveform_contour_white.py @@ -22,61 +22,28 @@ from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter from timeside.api import IGrapher from timeside.grapher.core import * +from timeside.grapher.waveform_contour_bk import WaveformContourBlack -class WaveformContourWhite(Processor): - implements(IGrapher) +class WaveformContourWhite(WaveformContourBlack): - FFT_SIZE = 0x400 + implements(IGrapher) @interfacedoc def __init__(self, width=1024, height=256, bg_color=(255,255,255), color_scheme='default'): - self.width = width - self.height = height - self.bg_color = bg_color - self.color_scheme = color_scheme - self.graph = None - self.ndiv = 4 - self.symetry = True + super(WaveformContourWhite, self).__init__(width, height, bg_color, color_scheme) + self.color_offset = 60 @staticmethod @interfacedoc def id(): - return "waveform_contour_wh" + return "waveform_contour_white" @staticmethod @interfacedoc def name(): return "Contour white" - @interfacedoc - def set_colors(self, background, scheme): - self.bg_color = background - self.color_scheme = scheme - @interfacedoc def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): super(WaveformContourWhite, self).setup(channels, samplerate, blocksize, totalframes) - self.graph = WaveformImageJoyContour(self.width, self.height, totalframes, - self.samplerate(), self.FFT_SIZE, - bg_color=self.bg_color, - color_scheme=self.color_scheme, - ndiv=self.ndiv, symetry=self.symetry, - color_offset=60) - - @interfacedoc - def process(self, frames, eod=False): - self.graph.process(frames, eod) - return frames, eod - - @interfacedoc - def render(self, output): - if output: - self.graph.save(output) - return self.graph.image - - def release(self): - self.graph.release() - - def watermark(self, text, font=None, color=(0, 0, 0), opacity=.6, margin=(5,5)): - self.graph.watermark(text, color=color, opacity=opacity, margin=margin) diff --git a/timeside/grapher/waveform_simple.py b/timeside/grapher/waveform_simple.py index 63c88c3..2a5abb1 100644 --- a/timeside/grapher/waveform_simple.py +++ 
b/timeside/grapher/waveform_simple.py
@@ -24,51 +24,53 @@
 from timeside.api import IGrapher
 from timeside.grapher.core import *

-class WaveformSimple(Processor):
-    implements(IGrapher)
+class Waveform(Grapher):
+    """ Builds a PIL image representing a waveform of the audio stream.
+    Adds pixels iteratively thanks to the adapter providing fixed size frame buffers.
+    """

-    FFT_SIZE = 0x400
+    implements(IGrapher)

     @interfacedoc
-    def __init__(self, width=572, height=74, bg_color=None, color_scheme='awdio'):
-        self.width = width
-        self.height = height
-        self.bg_color = bg_color
-        self.color_scheme = color_scheme
+    def __init__(self, width=1024, height=256, bg_color=(255,255,255), color_scheme='default'):
+        super(Waveform, self).__init__(width, height, bg_color, color_scheme)
+        self.line_color = (0,0,0)
+        colors = default_color_schemes[color_scheme]['waveform']
+        self.color_lookup = interpolate_colors(colors)

     @staticmethod
     @interfacedoc
     def id():
-        return "waveform_simple"
+        return "waveform"

     @staticmethod
     @interfacedoc
     def name():
         return "Waveform simple"

-    @interfacedoc
-    def set_colors(self, background, scheme):
-        self.bg_color = background
-        self.color_scheme = scheme

     @interfacedoc
     def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None):
-        super(WaveformSimple, self).setup(channels, samplerate, blocksize, totalframes)
-        self.graph = WaveformImageSimple(self.width, self.height, self.totalframes(),
-                                         self.samplerate(), self.FFT_SIZE,
-                                         bg_color=self.bg_color,
-                                         color_scheme=self.color_scheme)
+        super(Waveform, self).setup(channels, samplerate, blocksize, totalframes)

     @interfacedoc
     def process(self, frames, eod=False):
-        self.graph.process(frames, eod)
+        if len(frames) != 1:
+            buffer = frames[:,0]
+            buffer.shape = (len(buffer),1)
+            for samples, end in self.pixels_adapter.process(buffer, eod):
+                if self.pixel_cursor < self.image_width-1:
+                    self.draw_simple_peaks(self.pixel_cursor, peaks(samples), self.line_color)
+                    self.pixel_cursor += 1
+                if self.pixel_cursor == self.image_width-1:
+                    self.draw_simple_peaks(self.pixel_cursor, peaks(samples), self.line_color)
+                    self.pixel_cursor += 1
         return frames, eod

     @interfacedoc
     def render(self, output):
         if output:
-            self.graph.save(output)
-        return self.graph.image
-
-    def watermark(self, text, font=None, color=(255, 255, 255), opacity=.6, margin=(5,5)):
-        self.graph.watermark(text, color=color, opacity=opacity, margin=margin)
+            # middle line (0 for none)
+            a = 1
+            for x in range(self.image_width):
+                self.pixel[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pixel[x, self.image_height/2]))
+            self.image.save(output)
+        return self.image
--
2.39.5
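
A minimal end-to-end sketch of the refactored grapher pipeline, mirroring the updated tests above (the decoder import location, sample path and output names are assumptions, not part of the patch):

    import os

    from timeside.decoder import *
    from timeside.grapher import *

    audio = os.path.join('../samples', 'sweep.wav')

    # graphers are now plain Processor subclasses of the new Grapher base,
    # so they plug straight into a pipe with the decoder
    decoder = FileDecoder(audio)
    waveform = Waveform(width=1024, height=256, bg_color=(255,255,255), color_scheme='default')
    (decoder | waveform).run()

    # adapter state now lives on the grapher itself instead of a .graph object
    print 'frames per pixel = ', waveform.samples_per_pixel
    waveform.render('waveform.png')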