From 0ccd782ca45a49c0308a16bc26d151e0ad74d191 Mon Sep 17 00:00:00 2001 From: Paul Brossier Date: Mon, 18 Mar 2013 18:08:37 -0500 Subject: [PATCH] timeside/analyzers: add aubio pitch, onset, update onsetrate and bpm --- tests/testaubio_bpm.py | 27 +++++++++++ tests/testaubio_onset.py | 27 +++++++++++ tests/testaubio_onsetrate.py | 27 +++++++++++ tests/testaubio_pitch.py | 27 +++++++++++ timeside/analyzer/aubio_bpm.py | 15 +++--- timeside/analyzer/aubio_onset.py | 70 ++++++++++++++++++++++++++++ timeside/analyzer/aubio_onsetrate.py | 19 ++++---- timeside/analyzer/aubio_pitch.py | 69 +++++++++++++++++++++++++++ 8 files changed, 267 insertions(+), 14 deletions(-) create mode 100755 tests/testaubio_bpm.py create mode 100755 tests/testaubio_onset.py create mode 100755 tests/testaubio_onsetrate.py create mode 100755 tests/testaubio_pitch.py create mode 100644 timeside/analyzer/aubio_onset.py create mode 100644 timeside/analyzer/aubio_pitch.py diff --git a/tests/testaubio_bpm.py b/tests/testaubio_bpm.py new file mode 100755 index 0000000..37d8640 --- /dev/null +++ b/tests/testaubio_bpm.py @@ -0,0 +1,27 @@ +#! /usr/bin/env python + +from unittest import TestCase +from timeside.decoder import * +from timeside.analyzer.aubio_bpm import AubioBPM + +class TestAubioBPM(TestCase): + + def setUp(self): + self.analyzer = AubioBPM() + + def testOnSweep(self): + "runs on sweep" + self.source = os.path.join (os.path.dirname(__file__), "samples", "sweep.wav") + + def testOnGuitar(self): + "runs on guitar" + self.source = os.path.join (os.path.dirname(__file__), "samples", "guitar.wav") + + def tearDown(self): + decoder = FileDecoder(self.source) + (decoder | self.analyzer).run() + #print 'result:', self.analyzer.result() + +if __name__ == '__main__': + from unittest import main + main() diff --git a/tests/testaubio_onset.py b/tests/testaubio_onset.py new file mode 100755 index 0000000..0edfe5b --- /dev/null +++ b/tests/testaubio_onset.py @@ -0,0 +1,27 @@ +#! /usr/bin/env python + +from unittest import TestCase +from timeside.decoder import * +from timeside.analyzer.aubio_onset import AubioOnset + +class TestAubioOnset(TestCase): + + def setUp(self): + self.analyzer = AubioOnset() + + def testOnSweep(self): + "runs on sweep" + self.source = os.path.join (os.path.dirname(__file__), "samples", "sweep.wav") + + def testOnGuitar(self): + "runs on guitar" + self.source = os.path.join (os.path.dirname(__file__), "samples", "guitar.wav") + + def tearDown(self): + decoder = FileDecoder(self.source) + (decoder | self.analyzer).run() + #print "result:", self.analyzer.result() + +if __name__ == '__main__': + from unittest import main + main() diff --git a/tests/testaubio_onsetrate.py b/tests/testaubio_onsetrate.py new file mode 100755 index 0000000..56e5530 --- /dev/null +++ b/tests/testaubio_onsetrate.py @@ -0,0 +1,27 @@ +#! /usr/bin/env python + +from unittest import TestCase +from timeside.decoder import * +from timeside.analyzer.aubio_onsetrate import AubioOnsetRate + +class TestAubioOnset(TestCase): + + def setUp(self): + self.analyzer = AubioOnsetRate() + + def testOnSweep(self): + "runs on sweep" + self.source = os.path.join (os.path.dirname(__file__), "samples", "sweep.wav") + + def testOnGuitar(self): + "runs on guitar" + self.source = os.path.join (os.path.dirname(__file__), "samples", "guitar.wav") + + def tearDown(self): + decoder = FileDecoder(self.source) + (decoder | self.analyzer).run() + #print "result:", self.analyzer.result() + +if __name__ == '__main__': + from unittest import main + main() diff --git a/tests/testaubio_pitch.py b/tests/testaubio_pitch.py new file mode 100755 index 0000000..02990d0 --- /dev/null +++ b/tests/testaubio_pitch.py @@ -0,0 +1,27 @@ +#! /usr/bin/env python + +from unittest import TestCase +from timeside.decoder import * +from timeside.analyzer.aubio_pitch import AubioPitch + +class TestAubioPitch(TestCase): + + def setUp(self): + self.analyzer = AubioPitch() + + def testOnSweep(self): + "runs on sweep" + self.source = os.path.join (os.path.dirname(__file__), "samples", "sweep.wav") + + def testOnGuitar(self): + "runs on guitar" + self.source = os.path.join (os.path.dirname(__file__), "samples", "guitar.wav") + + def tearDown(self): + decoder = FileDecoder(self.source) + (decoder | self.analyzer).run() + #print "result:", self.analyzer.result() + +if __name__ == '__main__': + from unittest import main + main() diff --git a/timeside/analyzer/aubio_bpm.py b/timeside/analyzer/aubio_bpm.py index 5eee47e..b1537ce 100644 --- a/timeside/analyzer/aubio_bpm.py +++ b/timeside/analyzer/aubio_bpm.py @@ -28,11 +28,12 @@ class AubioBPM(Processor): implements(IValueAnalyzer) @interfacedoc - def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): + def setup(self, channels=None, samplerate=None, blocksize=None, + totalframes=None): super(AubioBPM, self).setup(channels, samplerate, blocksize, totalframes) self.win_s = 512 self.hop_s = self.win_s / 2 - self.t = tempo("default", self.win_s, self.hop_s, 1) + self.t = tempo("default", self.win_s, self.hop_s, samplerate) self.block_read = 0 self.beats = [] @@ -56,14 +57,16 @@ class AubioBPM(Processor): def process(self, frames, eod=False): i = 0 - while not eod and i < frames.shape[0]: - isbeat = self.t(frames[0,i:i+self.hop_s]) - if isbeat: self.beats += [(isbeat[0] + self.block_read * self.hop_s ) / 44100. ] + while i < frames.shape[0]: + downmixed = frames[i:i+self.hop_s, :].sum(axis = -1) + isbeat = self.t(downmixed) + if isbeat: self.beats += [(isbeat[0] + self.block_read * self.hop_s ) / self.samplerate() ] i += self.hop_s self.block_read += 1 return frames, eod def result(self): - periods = [60./(b - a) for a,b in zip(self.beats[:-1],self.beats[1:])] + if len(self.beats) < 2: return 0 from numpy import median + periods = [60./(b - a) for a,b in zip(self.beats[:-1],self.beats[1:])] return median (periods) diff --git a/timeside/analyzer/aubio_onset.py b/timeside/analyzer/aubio_onset.py new file mode 100644 index 0000000..48bab84 --- /dev/null +++ b/timeside/analyzer/aubio_onset.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2013 Paul Brossier + +# This file is part of TimeSide. + +# TimeSide is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. + +# TimeSide is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with TimeSide. If not, see . + +# Author: Paul Brossier + +from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter +from timeside.analyzer.core import * +from timeside.api import IValueAnalyzer +from aubio import onset + +class AubioOnset(Processor): + implements(IValueAnalyzer) + + @interfacedoc + def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): + super(AubioOnset, self).setup(channels, samplerate, blocksize, totalframes) + self.win_s = 512 + self.hop_s = self.win_s / 2 + self.t = onset("default", self.win_s, self.hop_s, samplerate) + self.block_read = 0 + self.onsets = [] + + @staticmethod + @interfacedoc + def id(): + return "aubio_onset" + + @staticmethod + @interfacedoc + def name(): + return "aubio onset rate" + + @staticmethod + @interfacedoc + def unit(): + return "seconds" + + def __str__(self): + return "%s %s" % (str(self.value), unit()) + + def process(self, frames, eod=False): + i = 0 + while i < frames.shape[0]: + downmixed = frames[i:i+self.hop_s, :].sum(axis = -1) + isonset = self.t(downmixed) + if isonset: + #print self.t.get_last_onset_s() + self.onsets += [self.t.get_last_onset_s()] + i += self.hop_s + self.block_read += 1 + return frames, eod + + def result(self): + return self.onsets diff --git a/timeside/analyzer/aubio_onsetrate.py b/timeside/analyzer/aubio_onsetrate.py index e7417ae..7cfc2d5 100644 --- a/timeside/analyzer/aubio_onsetrate.py +++ b/timeside/analyzer/aubio_onsetrate.py @@ -29,10 +29,10 @@ class AubioOnsetRate(Processor): @interfacedoc def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): - super(AubioOnsetRate, self).setup(channels, samplerate, blocksize, totalframes) + super(AubioOnsetRate, self).setup(channels, samplerate, blocksize, totalframes) self.win_s = 512 self.hop_s = self.win_s / 2 - self.t = onset("default", self.win_s, self.hop_s, 1) + self.t = onset("default", self.win_s, self.hop_s, samplerate) self.block_read = 0 self.onsets = [] @@ -56,14 +56,17 @@ class AubioOnsetRate(Processor): def process(self, frames, eod=False): i = 0 - while not eod and i < frames.shape[0]: - isonset = self.t(frames[0,i:i+self.hop_s]) - if isonset: self.onsets += [(isonset[0] + self.block_read * self.hop_s ) / 44100. ] - i += self.hop_s - self.block_read += 1 + while i < frames.shape[0]: + downmixed = frames[i:i+self.hop_s, :].sum(axis = -1) + isonset = self.t(downmixed) + if isonset: + self.onsets += [self.t.get_last_onset_s()] + i += self.hop_s + self.block_read += 1 return frames, eod def result(self): - periods = [60./(b - a) for a,b in zip(self.onsets[:-1],self.onsets[1:])] + if len(self.onsets) < 2: return 0 from numpy import mean + periods = [60./(b - a) for a,b in zip(self.onsets[:-1],self.onsets[1:])] return mean(periods) diff --git a/timeside/analyzer/aubio_pitch.py b/timeside/analyzer/aubio_pitch.py new file mode 100644 index 0000000..074622a --- /dev/null +++ b/timeside/analyzer/aubio_pitch.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2013 Paul Brossier + +# This file is part of TimeSide. + +# TimeSide is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. + +# TimeSide is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with TimeSide. If not, see . + +# Author: Paul Brossier + +from timeside.core import Processor, implements, interfacedoc, FixedSizeInputAdapter +from timeside.analyzer.core import * +from timeside.api import IValueAnalyzer +from aubio import pitch + +class AubioPitch(Processor): + implements(IValueAnalyzer) + + @interfacedoc + def setup(self, channels=None, samplerate=None, blocksize=None, totalframes=None): + super(AubioPitch, self).setup(channels, samplerate, blocksize, totalframes) + self.win_s = 2048 + self.hop_s = self.win_s / 2 + self.p = pitch("default", self.win_s, self.hop_s, samplerate) + self.p.set_unit("freq") + self.block_read = 0 + self.pitches = [] + + @staticmethod + @interfacedoc + def id(): + return "aubio_pitch" + + @staticmethod + @interfacedoc + def name(): + return "aubio pitch" + + @staticmethod + @interfacedoc + def unit(): + return "Hz" + + def __str__(self): + return "%s %s" % (str(self.value), unit()) + + def process(self, frames, eod=False): + i = 0 + while i < frames.shape[0]: + downmixed = frames[i:i+self.hop_s, :].sum(axis = -1) + self.pitches += [self.p(downmixed)[0]] + i += self.hop_s + self.block_read += 1 + return frames, eod + + def result(self): + return self.pitches + -- 2.39.5