import ImageFilter, ImageChops, Image, ImageDraw, ImageColor
import numpy
import scikits.audiolab as audiolab
+from timeside.core import FixedSizeInputAdapter
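+# FixedSizeInputAdapter re-buffers an incoming stream of audio frames into
+# fixed-size blocks, which is what lets the classes below process audio
+# incrementally instead of reading whole buffers up front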
import Queue
-class AudioProcessor(object):
-    def __init__(self, buffer_size, fft_size, channels, nframes, samplerate, window_function=numpy.ones):
-        self.buffer_size = buffer_size
+class SpectralCentroid(object):
+
+    def __init__(self, fft_size, nframes, samplerate, window_function=numpy.ones):
        self.fft_size = fft_size
-        self.channels = channels
+        self.buffer_size = self.fft_size
        self.window = window_function(self.fft_size)
        self.spectrum_range = None
        self.lower = 100
        self.q = Queue.Queue()
        self.nframes = nframes
        self.samplerate = samplerate
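+        # re-cut the incoming frames into blocks of exactly fft_size mono samples;
+        # pad=True zero-pads the final, shorter block so the FFT below always gets
+        # a full window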
+        self.spectrum_adapter = FixedSizeInputAdapter(self.fft_size, 1, pad=True)
-    def read(self, samples, start, size, resize_if_less=False):
+    def trim(self, samples, start, size, resize_if_less=False):
        """ read size samples starting at start, if resize_if_less is True and less than size
        samples are read, resize the array to size and fill with zeros """
        return samples
-    def spectral_centroid(self, samples, cursor, spec_range=120.0):
-        """ starting at seek_point read fft_size samples, and calculate the spectral centroid """
+    def process(self, frames, eod, spec_range=120.0):
-        #print 'Buffer size = ', len(samples)
-        #samples = self.read(samples, cursor - self.fft_size/2, self.fft_size, True)
-        #print 'FFT Buffer size = ', len(samples)
+        for buffer, end in self.spectrum_adapter.process(frames, True):
+            samples = buffer[:,0].copy()
+            if end:
+                break
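+        # after the loop, samples holds the last fft_size-sample block produced by
+        # the adapter for this call; the unchanged code below windows it and takes
+        # a single FFT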
        samples *= self.window
        fft = numpy.fft.fft(samples)
        spectrum = numpy.abs(fft[:fft.shape[0] / 2 + 1]) / float(self.fft_size) # normalized abs(FFT) between 0 and 1
        length = numpy.float64(spectrum.shape[0])  # number of bins in the half spectrum, needed below
        # scale the db spectrum from [- spec_range db ... 0 db] > [0..1]
        db_spectrum = ((20*(numpy.log10(spectrum + 1e-30))).clip(-spec_range, 0.0) + spec_range)/spec_range
-
        energy = spectrum.sum()
        spectral_centroid = 0
        if energy > 1e-20:
            # calculate the spectral centroid
-
            if self.spectrum_range == None:
                self.spectrum_range = numpy.arange(length)
            spectral_centroid = (spectrum * self.spectrum_range).sum() / (energy * (length - 1)) * self.samplerate * 0.5
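            # i.e. the energy-weighted mean bin index, mapped onto 0..Nyquist:
            # (spectrum * index).sum() / spectrum.sum() / (length - 1) * samplerate / 2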
-
            # clip > log10 > scale between 0 and 1
            spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - self.lower_log) / (self.higher_log - self.lower_log)
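        # return the log-compressed centroid mapped to [0, 1] together with the dB
        # spectrum, which is also scaled to [0, 1]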
        return (spectral_centroid, db_spectrum)
-    def peaks(self, samples):
-        """ read all samples between start_seek and end_seek, then find the minimum and maximum peak
-        in that range. Returns that pair in the order they were found. So if min was found first,
-        it returns (min, max) else the other way around. """
-
-        max_index = numpy.argmax(samples)
-        max_value = samples[max_index]
-
-        min_index = numpy.argmin(samples)
-        min_value = samples[min_index]
-
-        if min_index < max_index:
-            return (min_value, max_value)
-        else:
-            return (max_value, min_value)
-
def interpolate_colors(colors, flat=False, num_colors=256):
""" given a list of colors, create a larger list of colors interpolating
the first one. If flatten is True a list of numers will be returned. If
class WaveformImage(object):
-    def __init__(self, image_width, image_height, buffer_size, fft_size, nframes, nframes_adapter,
-                 samplerate, channels, bg_color=None, color_scheme=None, filename=None):
+    def __init__(self, image_width, image_height, nframes, samplerate, buffer_size, bg_color=None, color_scheme=None, filename=None):
+
        self.image_width = image_width
        self.image_height = image_height
-        self.fft_size = fft_size
-        self.buffer_size = buffer_size
        self.nframes = nframes
-        self.nframes_adapter = nframes_adapter
        self.samplerate = samplerate
-        self.channels = channels
+        self.fft_size = buffer_size
+        self.buffer_size = buffer_size
        self.filename = filename
-        self.samples_per_pixel = self.nframes / float(self.image_width)
-        self.nbuffers = self.nframes / self.buffer_size
-        self.pixel_per_buffer = self.buffer_size / self.samples_per_pixel
-        #print 'pixels per buffer = ', self.pixel_per_buffer
-        #print 'nframes (orig) = ', self.nframes
-        #print 'nframes (adapter) = ', self.nframes_adapter
-        #print 'frames per pixel = ', self.samples_per_pixel
-        #print 'nbuffers = ', self.nbuffers
        self.bg_color = bg_color
        if not bg_color:
        # this line gets the old "screaming" colors back...
        # colors = [self.color_from_value(value/29.0) for value in range(0,30)]
        self.color_lookup = interpolate_colors(colors)
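        # 256-entry color lookup table; when a column is drawn, the spectral
        # centroid value (in [0, 1]) is scaled to an index into this table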
+
+        self.samples_per_pixel = self.nframes / float(self.image_width)
+        #self.nbuffers = self.nframes / self.buffer_size
+        #self.pixel_per_buffer = self.buffer_size / self.samples_per_pixel
+
+        self.peaks_adapter = FixedSizeInputAdapter(int(self.samples_per_pixel), 1, pad=False)
+        self.peaks_adapter_nframes = self.peaks_adapter.nframes(self.nframes)
+        self.spectral_centroid = SpectralCentroid(self.fft_size, self.nframes, self.samplerate, numpy.hanning)
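+        # peaks_adapter re-cuts the mono stream into one block per image column
+        # (samples_per_pixel frames, truncated to an int, with no padding), and
+        # SpectralCentroid runs a Hanning-windowed FFT of size fft_size to pick
+        # the color of each column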
+
        self.image = Image.new("RGB", (self.image_width, self.image_height), self.bg_color)
-        self.processor = AudioProcessor(self.buffer_size, self.fft_size, self.channels, self.nframes, self.samplerate, numpy.hanning)
+        self.pixel = self.image.load()
        self.draw = ImageDraw.Draw(self.image)
        self.previous_x, self.previous_y = None, None
-        self.pixel = self.image.load()
        self.frame_cursor = 0
        self.pixel_cursor = 0
+    def peaks(self, samples):
+        """ find the minimum and maximum peak in the given block of samples.
+        Returns that pair in the order they were found. So if min was found first,
+        it returns (min, max) else the other way around. """
+
+        max_index = numpy.argmax(samples)
+        max_value = samples[max_index]
+
+        min_index = numpy.argmin(samples)
+        min_value = samples[min_index]
+
+        if min_index < max_index:
+            return (min_value, max_value)
+        else:
+            return (max_value, min_value)
+
    def color_from_value(self, value):
        """ given a value between 0 and 1, return an (r,g,b) tuple """
        self.pixel[x, y_min_int - 1] = (r,g,b)
    def process(self, frames, eod):
-        buffer = numpy.transpose(frames)[0].copy()
-        buffer_copy = buffer.copy()
-
-        #if eod:
-        #buffer_size = self.nframes_adapter - self.nframes
-        #print buffer_size
-        #self.pixel_per_buffer = buffer_size / self.samples_per_pixel
-        #print self.pixel_per_buffer
-        ##buffer = buffer[0:buffer_size].copy()
-
-        for x in range(int(self.pixel_per_buffer)):
-            # FIXME : the peaks don't scale to the image width because self.pixel_per_buffer is not an integer
-            # Will be fixed later...
-            frame_cursor = int(x * self.samples_per_pixel)
-            frame_next_cursor = int((x + 1) * self.samples_per_pixel)
-            buffer_copy_trim = buffer[frame_cursor:frame_next_cursor].copy()
-            peaks = self.processor.peaks(buffer_copy_trim)
-            (spectral_centroid, db_spectrum) = self.processor.spectral_centroid(buffer_copy, frame_cursor)
+        if len(frames) == 1:
+            frames.shape = (len(frames),1)
+            buffer = frames.copy()
+        else:
+            buffer = frames[:,0].copy()
+            buffer.shape = (len(buffer),1)
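+        # whatever the input layout, reduce it to a (nframes, 1) mono buffer so
+        # that the adapters below always see a single channel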
+
+        for samples, end in self.peaks_adapter.process(buffer, eod):
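+            # each block yielded here covers roughly samples_per_pixel input
+            # frames, i.e. one image column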
+            if self.pixel_cursor == self.image_width:
+                break
+            peaks = self.peaks(samples)
+            # FIXME : too much repeated spectrum computation cycle
+            (spectral_centroid, db_spectrum) = self.spectral_centroid.process(buffer, True)
            self.draw_peaks(self.pixel_cursor, peaks, spectral_centroid)
            self.pixel_cursor += 1
-            #print self.pixel_cursor
-
    def save(self):
        a = 25
        for x in range(self.image_width):
        print " done"
-
class Noise(object):
"""A class that mimics audiolab.sndfile but generates noise instead of reading
a wave file. Additionally it can be told to have a "broken" header and thus crashing