#
# Authors:
# Bram de Jong <bram.dejong at domain.com where domain in gmail>
-# Guillaume Pellerin <pellerin@parisson.com>
+# Guillaume Pellerin <yomguy@parisson.com>
import optparse, math, sys
class AudioProcessor(object):
- def __init__(self, fft_size, channels, window_function=numpy.ones):
+ def __init__(self, buffer_size, fft_size, channels, nframes, samplerate, window_function=numpy.ones):
+ self.buffer_size = buffer_size
self.fft_size = fft_size
self.channels = channels
self.window = window_function(self.fft_size)
self.higher_log = math.log10(self.higher)
self.clip = lambda val, low, high: min(high, max(low, val))
self.q = Queue.Queue()
+ self.nframes = nframes
+ self.samplerate = samplerate
+
+ def read(self, samples, start, size, resize_if_less=False):
+ """ read size samples starting at start, if resize_if_less is True and less than size
+ samples are read, resize the array to size and fill with zeros """
+
+ # number of zeros to add to start and end of the buffer
+ add_to_start = 0
+ add_to_end = 0
+
+ if start < 0:
+ # the first FFT window starts centered around zero
+ if size + start <= 0:
+ if resize_if_less:
+ return numpy.zeros(size)
+ else:
+ return numpy.array([])
+ else:
+ add_to_start = -start # remember: start is negative!
+ to_read = size + start
+
+ if to_read > self.buffer_size:
+ add_to_end = to_read - self.buffer_size
+ to_read = self.buffer_size
+ else:
+ to_read = size
+ if start + to_read >= self.buffer_size:
+ to_read = self.buffer_size - start
+ add_to_end = size - to_read
- def put(self, samples, eod):
- """ Put frames of the first channel in the queue"""
-
- # convert to mono by selecting left channel only
- if self.channels > 1:
- samples = samples[:,0]
+ if resize_if_less and (add_to_start > 0 or add_to_end > 0):
+ if add_to_start > 0:
+ samples = numpy.concatenate((numpy.zeros(add_to_start), samples), axis=1)
- if eod:
- samples = numpy.concatenate((numpy.zeros(add_to_start), samples), axis=1)
-
if add_to_end > 0:
samples = numpy.resize(samples, size)
samples[size - add_to_end:] = 0
-
- return samples
+ return samples
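    # Worked example (illustration only, not part of the patch), assuming buffer_size = 8
    # and fft_size = 4:
    #   read(buf, -2, 4) computes add_to_start = 2, to_read = 2, i.e. the first window is
    #   centered on frame 0 and needs 2 leading zeros;
    #   read(buf, 6, 4) computes to_read = 2, add_to_end = 2, i.e. the window runs past
    #   the end of the buffer and needs 2 trailing zeros.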
- def spectral_centroid(self, samples, spec_range=120.0):
+ def spectral_centroid(self, samples, cursor, spec_range=120.0):
""" starting at seek_point read fft_size samples, and calculate the spectral centroid """
-
+
+ #print 'Buffer size = ', len(samples)
+ #samples = self.read(samples, cursor - self.fft_size/2, self.fft_size, True)
+ #print 'FFT Buffer size = ', len(samples)
samples *= self.window
fft = numpy.fft.fft(samples)
spectrum = numpy.abs(fft[:fft.shape[0] / 2 + 1]) / float(self.fft_size) # normalized abs(FFT) between 0 and 1
length = numpy.float64(spectrum.shape[0])
-
+
# scale the db spectrum from [- spec_range db ... 0 db] > [0..1]
db_spectrum = ((20*(numpy.log10(spectrum + 1e-30))).clip(-spec_range, 0.0) + spec_range)/spec_range
-
+
energy = spectrum.sum()
spectral_centroid = 0
-
+
if energy > 1e-20:
# calculate the spectral centroid
-
- if not self.spectrum_range:
+
+ if self.spectrum_range is None:
self.spectrum_range = numpy.arange(length)
-
+
spectral_centroid = (spectrum * self.spectrum_range).sum() / (energy * (length - 1)) * self.samplerate * 0.5
-
+
# clip > log10 > scale between 0 and 1
spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - self.lower_log) / (self.higher_log - self.lower_log)
-
+
return (spectral_centroid, db_spectrum)
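    # A minimal standalone sketch (illustration only, not part of the patch; all values
    # hypothetical) of the same centroid computation: for a pure 1 kHz sine at 44.1 kHz
    # the centroid lands close to 1 kHz before the clip/log10 scaling applied above.
    #
    #   samplerate, fft_size = 44100, 2048
    #   t = numpy.arange(fft_size) / float(samplerate)
    #   samples = numpy.sin(2 * numpy.pi * 1000.0 * t) * numpy.hanning(fft_size)
    #   spectrum = numpy.abs(numpy.fft.fft(samples)[:fft_size / 2 + 1]) / float(fft_size)
    #   bins = numpy.arange(spectrum.shape[0])
    #   centroid_hz = (spectrum * bins).sum() / spectrum.sum() * samplerate / float(fft_size)
    #   # centroid_hz is approximately 1000.0; samplerate / fft_size is the width of one
    #   # bin, which matches the samplerate * 0.5 / (length - 1) factor used above.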
- def peaks(self, start_seek, end_seek):
+ def peaks(self, samples):
""" read all samples between start_seek and end_seek, then find the minimum and maximum peak
in that range. Returns that pair in the order they were found. So if min was found first,
it returns (min, max) else the other way around. """
-
- # larger blocksizes are faster but take more mem...
- # Aha, Watson, a clue, a tradeof!
- block_size = 4096
-
- max_index = -1
- max_value = -1
- min_index = -1
- min_value = 1
-
- if end_seek > self.frames:
- end_seek = self.frames
-
- if block_size > end_seek - start_seek:
- block_size = end_seek - start_seek
-
- if block_size <= 1:
- samples = self.read(start_seek, 1)
- return samples[0], samples[0]
- elif block_size == 2:
- samples = self.read(start_seek, True)
- return samples[0], samples[1]
-
- for i in range(start_seek, end_seek, block_size):
- samples = self.read(i, block_size)
-
- local_max_index = numpy.argmax(samples)
- local_max_value = samples[local_max_index]
-
- if local_max_value > max_value:
- max_value = local_max_value
- max_index = local_max_index
-
- local_min_index = numpy.argmin(samples)
- local_min_value = samples[local_min_index]
-
- if local_min_value < min_value:
- min_value = local_min_value
- min_index = local_min_index
+
+ max_index = numpy.argmax(samples)
+ max_value = samples[max_index]
+
+ min_index = numpy.argmin(samples)
+ min_value = samples[min_index]
if min_index < max_index:
return (min_value, max_value)
else:
return (max_value, min_value)
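    # Illustration (not part of the patch; proc stands for a hypothetical AudioProcessor
    # instance): the order of the returned pair reflects which extreme occurs first.
    #
    #   proc.peaks(numpy.array([0.0, -0.8, 0.3, 0.9]))   # -> (-0.8, 0.9), minimum came first
    #   proc.peaks(numpy.array([0.0, 0.9, -0.8, 0.3]))   # -> (0.9, -0.8), maximum came first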
-
def interpolate_colors(colors, flat=False, num_colors=256):
""" given a list of colors, create a larger list of colors interpolating
the first one. If flatten is True a list of numers will be returned. If
False, a list of (r,g,b) tuples. num_colors is the number of colors wanted
in the final list """
-
+
palette = []
-
+
for i in range(num_colors):
index = (i * (len(colors) - 1))/(num_colors - 1.0)
index_int = int(index)
alpha = index - float(index_int)
-
+
if alpha > 0:
    r = (1.0 - alpha) * colors[index_int][0] + alpha * colors[index_int + 1][0]
    g = (1.0 - alpha) * colors[index_int][1] + alpha * colors[index_int + 1][1]
    b = (1.0 - alpha) * colors[index_int][2] + alpha * colors[index_int + 1][2]
else:
    r = (1.0 - alpha) * colors[index_int][0]
    g = (1.0 - alpha) * colors[index_int][1]
    b = (1.0 - alpha) * colors[index_int][2]
-
+
if flat:
palette.extend((int(r), int(g), int(b)))
else:
palette.append((int(r), int(g), int(b)))
-
+
return palette
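# Illustration (not part of the patch): interpolating between two colors yields a smooth
# 256-entry ramp; with flat=True the values are flattened to [r0, g0, b0, r1, g1, b1, ...],
# the layout that Image.putpalette() expects below.
#
#   grey = interpolate_colors([(0, 0, 0), (255, 255, 255)])
#   grey[0], grey[255]   # -> (0, 0, 0), (255, 255, 255); grey[128] is close to (128, 128, 128)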
-
+
class WaveformImage(object):
-
- def __init__(self, image_width, image_height, nframes, bg_color=None, color_scheme=None, filename=None):
+
+ def __init__(self, image_width, image_height, buffer_size, fft_size, nframes, nframes_adapter,
+ samplerate, channels, bg_color=None, color_scheme=None, filename=None):
self.image_width = image_width
self.image_height = image_height
+ self.fft_size = fft_size
+ self.buffer_size = buffer_size
self.nframes = nframes
+ self.nframes_adapter = nframes_adapter
+ self.samplerate = samplerate
+ self.channels = channels
+ self.filename = filename
+ self.samples_per_pixel = self.nframes / float(self.image_width)
+ self.nbuffers = self.nframes / self.buffer_size
+ self.pixel_per_buffer = self.buffer_size / self.samples_per_pixel
+ #print 'pixels per buffer = ', self.pixel_per_buffer
+ #print 'nframes (orig) = ', self.nframes
+ #print 'nframes (adapter) = ', self.nframes_adapter
+ #print 'frames per pixel = ', self.samples_per_pixel
+ #print 'nbuffers = ', self.nbuffers
+
self.bg_color = bg_color
if not bg_color:
self.bg_color = (0,0,0)
self.color_scheme = color_scheme
- if not color_scheme:
+ if not color_scheme:
self.color_scheme = 'default'
- self.filename = filename
- self.image = Image.new("RGB", (self.image_width, self.image_height), self.bg_color)
- self.samples_per_pixel = self.nframes / float(self.image_width)
- self.processor = AudioProcessor(self.fft_size, numpy.hanning)
- self.draw = ImageDraw.Draw(self.image)
- self.previous_x, self.previous_y = None, None
colors = color_schemes[self.color_scheme]['waveform']
# this line gets the old "screaming" colors back...
# colors = [self.color_from_value(value/29.0) for value in range(0,30)]
self.color_lookup = interpolate_colors(colors)
+ self.image = Image.new("RGB", (self.image_width, self.image_height), self.bg_color)
+ self.processor = AudioProcessor(self.buffer_size, self.fft_size, self.channels, self.nframes, self.samplerate, numpy.hanning)
+ self.draw = ImageDraw.Draw(self.image)
+ self.previous_x, self.previous_y = None, None
self.pixel = self.image.load()
+ self.frame_cursor = 0
+ self.pixel_cursor = 0
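    # Worked example of the bookkeeping above (illustration only, hypothetical values):
    # with nframes = 441000 (10 s at 44100 Hz), image_width = 500 and buffer_size = 8192,
    #   samples_per_pixel = 441000 / 500.0  = 882.0
    #   nbuffers          = 441000 // 8192  = 53
    #   pixel_per_buffer  = 8192 / 882.0   ~= 9.29
    # Since pixel_per_buffer is not an integer, the int() truncation in process() draws
    # slightly fewer than image_width columns (the FIXME noted there).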
def color_from_value(self, value):
""" given a value between 0 and 1, return an (r,g,b) tuple """
-
+
return ImageColor.getrgb("hsl(%d,%d%%,%d%%)" % (int( (1.0 - value) * 360 ), 80, 50))
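    # Illustration of the mapping above: value 0.25 gives hue 270 (violet), 0.5 gives
    # hue 180 (cyan) and 0.75 gives hue 90 (yellow-green); 0.0 and 1.0 both give red,
    # since hue 360 wraps around to hue 0.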
-
+
def draw_peaks(self, x, peaks, spectral_centroid):
""" draw 2 peaks at x using the spectral_centroid for color """
y1 = self.image_height * 0.5 - peaks[0] * (self.image_height - 4) * 0.5
y2 = self.image_height * 0.5 - peaks[1] * (self.image_height - 4) * 0.5
-
+
line_color = self.color_lookup[int(spectral_centroid*255.0)]
-
+
if self.previous_y is not None:
self.draw.line([self.previous_x, self.previous_y, x, y1, x, y2], line_color)
else:
self.draw.line([x, y1, x, y2], line_color)
-
+
self.previous_x, self.previous_y = x, y2
-
+
self.draw_anti_aliased_pixels(x, y1, y2, line_color)
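    # Worked example (illustration only): with image_height = 256, a peak of +1.0 maps to
    # y = 128 - 126 = 2 near the top, -1.0 maps to y = 254 near the bottom and 0.0 maps to
    # the middle at 128, leaving a 2-pixel margin for the anti-aliasing below.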
-
+
def draw_anti_aliased_pixels(self, x, y1, y2, color):
""" vertical anti-aliasing at y1 and y2 """
y_max = max(y1, y2)
y_max_int = int(y_max)
alpha = y_max - y_max_int
-
+
if alpha > 0.0 and alpha < 1.0 and y_max_int + 1 < self.image_height:
current_pix = self.pixel[x, y_max_int + 1]
-
+
r = int((1-alpha)*current_pix[0] + alpha*color[0])
g = int((1-alpha)*current_pix[1] + alpha*color[1])
b = int((1-alpha)*current_pix[2] + alpha*color[2])
-
+
self.pixel[x, y_max_int + 1] = (r,g,b)
-
+
y_min = min(y1, y2)
y_min_int = int(y_min)
alpha = 1.0 - (y_min - y_min_int)
-
+
if alpha > 0.0 and alpha < 1.0 and y_min_int - 1 >= 0:
current_pix = self.pixel[x, y_min_int - 1]
-
+
r = int((1-alpha)*current_pix[0] + alpha*color[0])
g = int((1-alpha)*current_pix[1] + alpha*color[1])
b = int((1-alpha)*current_pix[2] + alpha*color[2])
-
+
self.pixel[x, y_min_int - 1] = (r,g,b)
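    # Worked example (illustration only): if y_max = 10.3 then alpha = 0.3 and the pixel at
    # y = 11 becomes 0.7 * background + 0.3 * line color, so a (255, 0, 0) line over a black
    # background leaves (76, 0, 0) there.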
-
- def process(self, frames):
- #for x in range(self.image_width):
- seek_point = int(x * self.samples_per_pixel)
- next_seek_point = int((x + 1) * self.samples_per_pixel)
- (spectral_centroid, db_spectrum) = self.processor.spectral_centroid(seek_point)
- peaks = self.processor.peaks(seek_point, next_seek_point)
- self.draw_peaks(x, peaks, spectral_centroid)
+ def process(self, frames, eod):
+ buffer = numpy.transpose(frames)[0].copy()
+ buffer_copy = buffer.copy()
+
+ #if eod:
+ #buffer_size = self.nframes_adapter - self.nframes
+ #print buffer_size
+ #self.pixel_per_buffer = buffer_size / self.samples_per_pixel
+ #print self.pixel_per_buffer
+ ##buffer = buffer[0:buffer_size].copy()
+
+ for x in range(int(self.pixel_per_buffer)):
+ # FIXME : the peaks don't scale to the image width because self.pixel_per_buffer is not an integer
+ # Will be fixed later...
+ frame_cursor = int(x * self.samples_per_pixel)
+ frame_next_cursor = int((x + 1) * self.samples_per_pixel)
+ buffer_copy_trim = buffer[frame_cursor:frame_next_cursor].copy()
+ peaks = self.processor.peaks(buffer_copy_trim)
+ (spectral_centroid, db_spectrum) = self.processor.spectral_centroid(buffer_copy, frame_cursor)
+ self.draw_peaks(self.pixel_cursor, peaks, spectral_centroid)
+ self.pixel_cursor += 1
+
+ #print self.pixel_cursor
def save(self):
    # brighten the horizontal center line a little so the zero axis stays visible
    a = 25
    for x in range(self.image_width):
        self.pixel[x, self.image_height/2] = tuple(map(lambda p: p+a, self.pixel[x, self.image_height/2]))
    self.image.save(self.filename)
-
+
class SpectrogramImage(object):
def __init__(self, image_width, image_height, fft_size, bg_color = None, color_scheme = None):
#FIXME: bg_color is ignored
- if not color_scheme:
+ if not color_scheme:
color_scheme = 'default'
self.image = Image.new("P", (image_height, image_width))
-
+
self.image_width = image_width
self.image_height = image_height
self.fft_size = fft_size
-
+
colors = color_schemes[color_scheme]['spectrogram']
-
+
self.image.putpalette(interpolate_colors(colors, True))
# generate the lookup which translates y-coordinate to fft-bin
if bin < self.fft_size/2:
alpha = bin - int(bin)
-
+
self.y_to_bin.append((int(bin), alpha * 255))
-
+
# this is a bit strange, but using image.load()[x,y] = ... is
# a lot slower than using image.putadata and then rotating the image
# so we store all the pixels in an array and then create the image when saving
self.pixels = []
-
+
def draw_spectrum(self, x, spectrum):
for (index, alpha) in self.y_to_bin:
self.pixels.append( int( ((255.0-alpha) * spectrum[index] + alpha * spectrum[index + 1] )) )
-
+
for y in range(len(self.y_to_bin), self.image_height):
self.pixels.append(0)
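    # Worked example (illustration only): for a y-coordinate that falls exactly halfway
    # between two fft bins, y_to_bin holds alpha = 127.5, so spectrum values 0.2 and 0.6
    # are written as int(127.5 * 0.2 + 127.5 * 0.6) = 102, rescaling the normalized
    # spectrum to the 0..255 palette range.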
samples_per_pixel = audio_file.get_nframes() / float(image_width)
processor = AudioProcessor(audio_file, fft_size, numpy.hanning)
-
+
spectrogram = SpectrogramImage(image_width, image_height, fft_size, bg_color, color_scheme)
-
+
for x in range(image_width):
-
+
if x % (image_width/10) == 0:
sys.stdout.write('.')
sys.stdout.flush()
-
+
seek_point = int(x * samples_per_pixel)
- next_seek_point = int((x + 1) * samples_per_pixel)
- (spectral_centroid, db_spectrum) = processor.spectral_centroid(seek_point)
+ next_seek_point = int((x + 1) * samples_per_pixel)
+ (spectral_centroid, db_spectrum) = processor.spectral_centroid(seek_point)
spectrogram.draw_spectrum(x, db_spectrum)
-
+
spectrogram.save(output_filename_s)
-
+
print " done"
else:
will_read = frames_to_read
self.seekpoint += will_read
- return numpy.random.random(will_read)*2 - 1
+ return numpy.random.random(will_read)*2 - 1