import scikits.audiolab as audiolab
class AudioProcessor(Component):
    """Audio analysis helper: reads a sound file via scikits.audiolab and
    computes FFT-based features (see spectral_centroid / read below).

    NOTE(review): spectral_centroid() also reads self.window,
    self.spectrum_range, self.lower / self.higher and their *_log variants,
    which are not initialised here — presumably set in code elided from this
    fragment; confirm against the full source.
    """

    def __init__(self):
        self.fft_size = 2048               # FFT window length in samples
        self.window_function = numpy.ones  # rectangular window by default
        # clamp val into [low, high]
        self.clip = lambda val, low, high: min(high, max(low, val))
def pre_process(self, media_item):
    """Open *media_item* with audiolab and cache frame count and sample rate.

    NOTE(review): per the diff, media_item is now expected to be a file
    path directly (previously media_item.file.path) — confirm callers
    were updated accordingly.
    """
    wav_file = media_item
    self.audio_file = audiolab.sndfile(wav_file, 'read')
    self.frames = self.audio_file.get_nframes()
    self.samplerate = self.audio_file.get_samplerate()
def get_samples(self):
    """Read every frame from the opened audio file.

    Returns the full buffer produced by ``audio_file.read_frames``
    (one value per frame, per channel).
    """
    # self.frames is cached by pre_process(), so this reads the whole file.
    return self.audio_file.read_frames(self.frames)
-
+
# NOTE(review): this method appears truncated in this diff fragment — the
# left-channel selection promised by the comment and the return statement
# are missing; reconstruct from the full source before relying on it.
def get_mono_samples(self):
# convert to mono by selecting left channel only
samples = self.get_samples()
# NOTE(review): this block is a whitespace-stripped diff fragment. The '-'/'+'
# lines are diff markers, and several context lines were elided (the bodies of
# the start<0 branches, the else: of the outer if, and the IOError handler),
# so the control flow below is NOT complete — do not edit without the full
# source.
def read(self, start, size, resize_if_less=False):
""" read size samples starting at start, if resize_if_less is True and less than size
samples are read, resize the array to size and fill with zeros """
-
+
# number of zeros to add to start and end of the buffer
add_to_start = 0
add_to_end = 0
-
+
if start < 0:
# the first FFT window starts centered around zero
if size + start <= 0:
to_read = self.frames
else:
self.audio_file.seek(start)
-
+
# NOTE(review): lines between the seek above and here were elided in the diff.
to_read = size
if start + to_read >= self.frames:
to_read = self.frames - start
add_to_end = size - to_read
-
+
try:
samples = self.audio_file.read_frames(to_read)
except IOError:
# NOTE(review): the IOError handler body was elided; the padding logic
# below presumably belongs on the success path — confirm against source.
if resize_if_less and (add_to_start > 0 or add_to_end > 0):
if add_to_start > 0:
samples = numpy.concatenate((numpy.zeros(add_to_start), samples), axis=1)
-
+
if add_to_end > 0:
# grow to the requested size and zero-fill the tail
samples = numpy.resize(samples, size)
samples[size - add_to_end:] = 0
-
+
return samples
def spectral_centroid(self, seek_point, spec_range=120.0):
    """Read fft_size samples centered on seek_point and compute the spectral centroid.

    Returns a ``(spectral_centroid, db_spectrum)`` pair, both scaled to
    [0, 1]; *spec_range* is the dB range mapped onto [0, 1] for the
    spectrum.
    """
    # window is centered on seek_point; read(..., True) zero-pads at the edges.
    # '//' keeps the offset an int (valid Python 2 and 3; plain '/' breaks
    # indexing under Python 3 float division).
    samples = self.read(seek_point - self.fft_size // 2, self.fft_size, True)
    samples *= self.window
    fft = numpy.fft.fft(samples)
    # normalized abs(FFT) between 0 and 1 — keep only the positive-frequency half
    spectrum = numpy.abs(fft[:fft.shape[0] // 2 + 1]) / float(self.fft_size)
    length = numpy.float64(spectrum.shape[0])

    # scale the db spectrum from [-spec_range db .. 0 db] -> [0 .. 1]
    db_spectrum = ((20 * (numpy.log10(spectrum + 1e-30))).clip(-spec_range, 0.0) + spec_range) / spec_range

    energy = spectrum.sum()
    spectral_centroid = 0

    if energy > 1e-20:
        # lazily build the bin-index vector used for the weighted mean.
        # 'is None' is required: once spectrum_range is an ndarray,
        # '== None' compares elementwise and the if-test misbehaves.
        if self.spectrum_range is None:
            self.spectrum_range = numpy.arange(length)

        # weighted mean of bin indices, rescaled to Hz (nyquist = samplerate / 2)
        spectral_centroid = (spectrum * self.spectrum_range).sum() / (energy * (length - 1)) * self.samplerate * 0.5

        # clip > log10 > scale between 0 and 1
        spectral_centroid = (math.log10(self.clip(spectral_centroid, self.lower, self.higher)) - self.lower_log) / (self.higher_log - self.lower_log)

    return (spectral_centroid, db_spectrum)
""" read all samples between start_seek and end_seek, then find the minimum and maximum peak
in that range. Returns that pair in the order they were found. So if min was found first,
it returns (min, max) else the other way around. """
-
+
# larger blocksizes are faster but take more mem...
# Aha, Watson, a clue, a tradeof!
block_size = 4096
-
+
max_index = -1
max_value = -1
min_index = -1
min_value = 1
-
+
if end_seek > self.frames:
end_seek = self.frames
-
+
if block_size > end_seek - start_seek:
block_size = end_seek - start_seek
-
+
if block_size <= 1:
samples = self.read(start_seek, 1)
return samples[0], samples[0]
elif block_size == 2:
samples = self.read(start_seek, True)
return samples[0], samples[1]
-
+
for i in range(start_seek, end_seek, block_size):
samples = self.read(i, block_size)
-
+
local_max_index = numpy.argmax(samples)
local_max_value = samples[local_max_index]
-
+
if local_max_value > max_value:
max_value = local_max_value
max_index = local_max_index
-
+
local_min_index = numpy.argmin(samples)
local_min_value = samples[local_min_index]
-
+
if local_min_value < min_value:
min_value = local_min_value
min_index = local_min_index
return (max_value, min_value)
-
+
import timeside
from timeside.core import Component, ExtensionPoint, ComponentManager
+
class TestAnalyzers(Component):
    """List the registered analyzers and run each one on a media file."""
    analyzers = ExtensionPoint(timeside.analyze.IAnalyzer)

    def list(self):
        # collect one descriptor dict per registered analyzer
        analyzers = []
        for analyzer in self.analyzers:
            analyzers.append({'name': analyzer.name(),
                              })
        print(analyzers)

    def run(self, media):
        print('\n=== Analyzer testing ===\n')
        for analyzer in self.analyzers:
            # local renamed from 'id' to avoid shadowing the builtin
            analyzer_id = analyzer.id()
            value = analyzer.render(media)
            print(analyzer_id + ' = ' + str(value) + ' ' + analyzer.unit())
+
class TestDecoders(Component):
    """List the registered decoders."""
    decoders = ExtensionPoint(timeside.decode.IDecoder)

    def list(self):
        decoders = []
        for decoder in self.decoders:
            # NOTE(review): the remaining dict keys, closing brace and the
            # final print were elided in this diff fragment; reconstructed
            # to mirror TestAnalyzers.list — confirm against full source.
            decoders.append({'format': decoder.format(),
                             })
        print(decoders)
class TestEncoders(Component):
    """List the registered encoders."""
    encoders = ExtensionPoint(timeside.encode.IEncoder)

    def list(self):
        encoders = []
        for encoder in self.encoders:
            # NOTE(review): the remaining dict keys, closing brace and the
            # final print were elided in this diff fragment; reconstructed
            # to mirror TestAnalyzers.list — confirm against full source.
            encoders.append({'format': encoder.format(),
                             })
        print(encoders)
class TestGraphers(Component):
    """List the registered graphers and render each one to a PNG file."""
    graphers = ExtensionPoint(timeside.graph.IGrapher)

    def list(self):
        graphers = []
        for grapher in self.graphers:
            graphers.append({'id': grapher.id(),
                             })
        print(graphers)

    def run(self, media):
        print('\n=== Grapher testing ===\n')
        for grapher in self.graphers:
            # locals renamed from 'id'/'file' to avoid shadowing builtins
            grapher_id = grapher.id()
            image = grapher.render(media)
            file_path = 'results/' + grapher_id + '.png'
            # PNG data is binary: open in 'wb' (the original 'w' corrupts
            # output on platforms with newline translation); the with-block
            # guarantees the file is closed even if a write raises.
            with open(file_path, 'wb') as out:
                for chunk in image:
                    out.write(chunk)
            print('Image exported to :' + file_path)
+
if __name__ == '__main__':
    # sample file fed to the analyzer and grapher smoke tests
    sample = 'samples/sweep.wav'
    comp_mgr = ComponentManager()
    a = TestAnalyzers(comp_mgr)
    d = TestDecoders(comp_mgr)
    e = TestEncoders(comp_mgr)
    g = TestGraphers(comp_mgr)
    a.list()
    d.list()
    e.list()
    g.list()
    a.run(sample)
    #d.run()
    #e.run()
    g.run(sample)