From 4b594347fb6590a1bd242d9d5bc5fb102e7264c5 Mon Sep 17 00:00:00 2001 From: yomguy Date: Wed, 17 Feb 2010 02:21:17 +0000 Subject: [PATCH] prepare graph api and new core --- graph/{wav2png.py => core.py} | 219 +++++++++++++--------------------- graph/spectrogram_audiolab.py | 2 +- graph/waveform_audiolab.py | 2 +- tests/api/examples.py | 31 +++-- 4 files changed, 102 insertions(+), 152 deletions(-) rename graph/{wav2png.py => core.py} (76%) diff --git a/graph/wav2png.py b/graph/core.py similarity index 76% rename from graph/wav2png.py rename to graph/core.py index cd6b27e..75976d8 100644 --- a/graph/wav2png.py +++ b/graph/core.py @@ -20,7 +20,6 @@ # # Authors: # Bram de Jong -# Contributors: # Guillaume Pellerin @@ -28,6 +27,8 @@ import optparse, math, sys import ImageFilter, ImageChops, Image, ImageDraw, ImageColor import numpy import scikits.audiolab as audiolab +import Queue + color_schemes = { 'default': { @@ -47,103 +48,29 @@ color_schemes = { } } -class TestAudioFile(object): - """A class that mimics audiolab.sndfile but generates noise instead of reading - a wave file. Additionally it can be told to have a "broken" header and thus crashing - in the middle of the file. Also useful for testing ultra-short files of 20 samples.""" - def __init__(self, num_frames, has_broken_header=False): - self.seekpoint = 0 - self.num_frames = num_frames - self.has_broken_header = has_broken_header - - def seek(self, seekpoint): - self.seekpoint = seekpoint - - def get_nframes(self): - return self.num_frames - - def get_samplerate(self): - return 44100 - - def get_channels(self): - return 1 - - def read_frames(self, frames_to_read): - if self.has_broken_header and self.seekpoint + frames_to_read > self.num_frames / 2: - raise IOError() - - num_frames_left = self.num_frames - self.seekpoint - if num_frames_left < frames_to_read: - will_read = num_frames_left - else: - will_read = frames_to_read - self.seekpoint += will_read - return numpy.random.random(will_read)*2 - 1 - class AudioProcessor(object): - def __init__(self, audio_file, fft_size, window_function=numpy.ones): + def __init__(self, fft_size, channels, window_function=numpy.ones): self.fft_size = fft_size + self.channels = channels self.window = window_function(self.fft_size) - self.audio_file = audio_file - self.frames = audio_file.get_nframes() - self.samplerate = audio_file.get_samplerate() - self.channels = audio_file.get_channels() self.spectrum_range = None self.lower = 100 self.higher = 22050 self.lower_log = math.log10(self.lower) self.higher_log = math.log10(self.higher) self.clip = lambda val, low, high: min(high, max(low, val)) + self.q = Queue.Queue() - def read(self, start, size, resize_if_less=False): - """ read size samples starting at start, if resize_if_less is True and less than size - samples are read, resize the array to size and fill with zeros """ - - # number of zeros to add to start and end of the buffer - add_to_start = 0 - add_to_end = 0 - - if start < 0: - # the first FFT window starts centered around zero - if size + start <= 0: - if resize_if_less: - return numpy.zeros(size) - else: - return numpy.array([]) - else: - self.audio_file.seek(0) - - add_to_start = -start # remember: start is negative! - to_read = size + start - - if to_read > self.frames: - add_to_end = to_read - self.frames - to_read = self.frames - else: - self.audio_file.seek(start) - - to_read = size - if start + to_read >= self.frames: - to_read = self.frames - start - add_to_end = size - to_read - - try: - samples = self.audio_file.read_frames(to_read) - except IOError: - # this can happen for wave files with broken headers... - if resize_if_less: - return numpy.zeros(size) - else: - return numpy.zeros(2) - + def put(self, samples, eod): + """ Put frames of the first channel in the queue""" + # convert to mono by selecting left channel only if self.channels > 1: samples = samples[:,0] - if resize_if_less and (add_to_start > 0 or add_to_end > 0): - if add_to_start > 0: - samples = numpy.concatenate((numpy.zeros(add_to_start), samples), axis=1) + if eod: + samples = numpy.concatenate((numpy.zeros(add_to_start), samples), axis=1) if add_to_end > 0: samples = numpy.resize(samples, size) @@ -152,11 +79,9 @@ class AudioProcessor(object): return samples - def spectral_centroid(self, seek_point, spec_range=120.0): + def spectral_centroid(self, samples, spec_range=120.0): """ starting at seek_point read fft_size samples, and calculate the spectral centroid """ - samples = self.read(seek_point - self.fft_size/2, self.fft_size, True) - samples *= self.window fft = numpy.fft.fft(samples) spectrum = numpy.abs(fft[:fft.shape[0] / 2 + 1]) / float(self.fft_size) # normalized abs(FFT) between 0 and 1 @@ -171,7 +96,7 @@ class AudioProcessor(object): if energy > 1e-20: # calculate the spectral centroid - if self.spectrum_range == None: + if not self.spectrum_range: self.spectrum_range = numpy.arange(length) spectral_centroid = (spectrum * self.spectrum_range).sum() / (energy * (length - 1)) * self.samplerate * 0.5 @@ -263,31 +188,32 @@ def interpolate_colors(colors, flat=False, num_colors=256): class WaveformImage(object): - def __init__(self, image_width, image_height, bg_color = None, color_scheme = None): - if not bg_color: - bg_color = (0,0,0) - if not color_scheme: - color_scheme = 'default' - - self.image = Image.new("RGB", (image_width, image_height), bg_color) - + + def __init__(self, image_width, image_height, nframes, bg_color=None, color_scheme=None, filename=None): self.image_width = image_width self.image_height = image_height - + self.nframes = nframes + self.bg_color = bg_color + if not bg_color: + self.bg_color = (0,0,0) + self.color_scheme = color_scheme + if not color_scheme: + self.color_scheme = 'default' + self.filename = filename + self.image = Image.new("RGB", (self.image_width, self.image_height), self.bg_color) + self.samples_per_pixel = self.nframes / float(self.image_width) + self.processor = AudioProcessor(self.fft_size, numpy.hanning) self.draw = ImageDraw.Draw(self.image) self.previous_x, self.previous_y = None, None - - colors = color_schemes[color_scheme]['waveform'] - + colors = color_schemes[self.color_scheme]['waveform'] # this line gets the old "screaming" colors back... # colors = [self.color_from_value(value/29.0) for value in range(0,30)] - self.color_lookup = interpolate_colors(colors) - self.pix = self.image.load() + self.pixel = self.image.load() def color_from_value(self, value): """ given a value between 0 and 1, return an (r,g,b) tuple """ - + return ImageColor.getrgb("hsl(%d,%d%%,%d%%)" % (int( (1.0 - value) * 360 ), 80, 50)) def draw_peaks(self, x, peaks, spectral_centroid): @@ -315,35 +241,42 @@ class WaveformImage(object): alpha = y_max - y_max_int if alpha > 0.0 and alpha < 1.0 and y_max_int + 1 < self.image_height: - current_pix = self.pix[x, y_max_int + 1] + current_pix = self.pixel[x, y_max_int + 1] r = int((1-alpha)*current_pix[0] + alpha*color[0]) g = int((1-alpha)*current_pix[1] + alpha*color[1]) b = int((1-alpha)*current_pix[2] + alpha*color[2]) - self.pix[x, y_max_int + 1] = (r,g,b) + self.pixel[x, y_max_int + 1] = (r,g,b) y_min = min(y1, y2) y_min_int = int(y_min) alpha = 1.0 - (y_min - y_min_int) if alpha > 0.0 and alpha < 1.0 and y_min_int - 1 >= 0: - current_pix = self.pix[x, y_min_int - 1] + current_pix = self.pixel[x, y_min_int - 1] r = int((1-alpha)*current_pix[0] + alpha*color[0]) g = int((1-alpha)*current_pix[1] + alpha*color[1]) b = int((1-alpha)*current_pix[2] + alpha*color[2]) - self.pix[x, y_min_int - 1] = (r,g,b) + self.pixel[x, y_min_int - 1] = (r,g,b) - def save(self, filename): - # draw a zero "zero" line + def process(self, frames): + + #for x in range(self.image_width): + seek_point = int(x * self.samples_per_pixel) + next_seek_point = int((x + 1) * self.samples_per_pixel) + (spectral_centroid, db_spectrum) = self.processor.spectral_centroid(seek_point) + peaks = self.processor.peaks(seek_point, next_seek_point) + self.draw_peaks(x, peaks, spectral_centroid) + + def save(self): a = 25 for x in range(self.image_width): - self.pix[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pix[x, self.image_height/2])) - - self.image.save(filename) - + self.pixel[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pixel[x, self.image_height/2])) + self.image.save(self.filename) + class SpectrogramImage(object): def __init__(self, image_width, image_height, fft_size, bg_color = None, color_scheme = None): @@ -395,33 +328,6 @@ class SpectrogramImage(object): self.image.transpose(Image.ROTATE_90).save(filename) -def create_wavform_png(input_filename, output_filename_w, image_width, image_height, fft_size, - bg_color = None, color_scheme = None): - audio_file = audiolab.sndfile(input_filename, 'read') - - samples_per_pixel = audio_file.get_nframes() / float(image_width) - processor = AudioProcessor(audio_file, fft_size, numpy.hanning) - - waveform = WaveformImage(image_width, image_height, bg_color, color_scheme) - - for x in range(image_width): - - if x % (image_width/10) == 0: - sys.stdout.write('.') - sys.stdout.flush() - - seek_point = int(x * samples_per_pixel) - next_seek_point = int((x + 1) * samples_per_pixel) - - (spectral_centroid, db_spectrum) = processor.spectral_centroid(seek_point) - peaks = processor.peaks(seek_point, next_seek_point) - - waveform.draw_peaks(x, peaks, spectral_centroid) - - waveform.save(output_filename_w) - - print " done" - def create_spectrogram_png(input_filename, output_filename_s, image_width, image_height, fft_size, bg_color = None, color_scheme = None): audio_file = audiolab.sndfile(input_filename, 'read') @@ -446,3 +352,38 @@ def create_spectrogram_png(input_filename, output_filename_s, image_width, image print " done" + + +class Noise(object): + """A class that mimics audiolab.sndfile but generates noise instead of reading + a wave file. Additionally it can be told to have a "broken" header and thus crashing + in the middle of the file. Also useful for testing ultra-short files of 20 samples.""" + def __init__(self, num_frames, has_broken_header=False): + self.seekpoint = 0 + self.num_frames = num_frames + self.has_broken_header = has_broken_header + + def seek(self, seekpoint): + self.seekpoint = seekpoint + + def get_nframes(self): + return self.num_frames + + def get_samplerate(self): + return 44100 + + def get_channels(self): + return 1 + + def read_frames(self, frames_to_read): + if self.has_broken_header and self.seekpoint + frames_to_read > self.num_frames / 2: + raise IOError() + + num_frames_left = self.num_frames - self.seekpoint + if num_frames_left < frames_to_read: + will_read = num_frames_left + else: + will_read = frames_to_read + self.seekpoint += will_read + return numpy.random.random(will_read)*2 - 1 + diff --git a/graph/spectrogram_audiolab.py b/graph/spectrogram_audiolab.py index 59c9c1d..6f25f30 100644 --- a/graph/spectrogram_audiolab.py +++ b/graph/spectrogram_audiolab.py @@ -22,7 +22,7 @@ from timeside.core import * from timeside.api import IGrapher from tempfile import NamedTemporaryFile -from timeside.graph.wav2png import * +from timeside.graph.core import * class SpectrogramGrapherAudiolab(Processor): """Spectrogram graph driver (python style thanks to wav2png.py and scikits.audiolab)""" diff --git a/graph/waveform_audiolab.py b/graph/waveform_audiolab.py index 513ec97..8b9c1ff 100644 --- a/graph/waveform_audiolab.py +++ b/graph/waveform_audiolab.py @@ -22,7 +22,7 @@ from timeside.core import * from timeside.api import IGrapher from tempfile import NamedTemporaryFile -from timeside.graph.wav2png import * +from timeside.graph.core import * class WaveFormGrapherAudiolab(Processor): """WaveForm graph driver (python style thanks to wav2png.py and scikits.audiolab)""" diff --git a/tests/api/examples.py b/tests/api/examples.py index 7578443..db84a9c 100644 --- a/tests/api/examples.py +++ b/tests/api/examples.py @@ -19,7 +19,7 @@ class FileDecoder(Processor): self.filename = filename # The file has to be opened here so that nframes(), samplerate(), # etc.. work before setup() is called. - self.file = audiolab.sndfile(self.filename, 'read') + self.file = audiolab.Sndfile(self.filename, 'r') self.position = 0 @interfacedoc @@ -36,31 +36,31 @@ class FileDecoder(Processor): @interfacedoc def channels(self): - return self.file.get_channels() + return self.file.channels @interfacedoc def samplerate(self): - return self.file.get_samplerate() + return self.file.samplerate @interfacedoc def duration(self): - return self.file.get_nframes() / self.file.get_samplerate() + return self.file.nframes / self.file.samplerate @interfacedoc def nframes(self): - return self.file.get_nframes() + return self.file.nframes @interfacedoc def format(self): - return self.file.get_file_format() + return self.file.file_format @interfacedoc def encoding(self): - return self.file.get_encoding() + return self.file.encoding @interfacedoc def resolution(self): resolution = None - encoding = self.file.get_encoding() + encoding = self.file.encoding if encoding == "pcm8": resolution = 8 @@ -209,7 +209,8 @@ class Waveform(Processor): implements(IGrapher) @interfacedoc - def __init__(self, width, height, output=None): + def __init__(self, width, height, nframes, output=None): + self.nframes = nframes self.filename = output self.image = None if width: @@ -247,8 +248,16 @@ class Waveform(Processor): Processor.setup(self, channels, samplerate) if self.image: self.image.close() - self.image = WaveformImage(self.width, self.height, self.bg_color, self.color_scheme, self.filename) - + self.image = WaveformImage(self.width, self.height, self.nframes) + + @interfacedoc + def process(self, frames, eod=False): + self.image.process(frames) + if eod: + self.image.close() + self.image = None + return frames, eod + @interfacedoc def render(self): self.image.process() -- 2.39.5