From b5ff38a87cb91a333700aa6f0f00c9b6bbad107f Mon Sep 17 00:00:00 2001 From: Thomas Fillon Date: Fri, 15 Nov 2013 14:57:19 +0100 Subject: [PATCH] Analyzers : Add preprocessors as process decorators --- tests/test_analyzer_preprocessors.py | 149 ++++++++++++++++++++++++ timeside/analyzer/preprocessors.py | 168 +++++++++++++++++++++++++++ 2 files changed, 317 insertions(+) create mode 100644 tests/test_analyzer_preprocessors.py create mode 100644 timeside/analyzer/preprocessors.py diff --git a/tests/test_analyzer_preprocessors.py b/tests/test_analyzer_preprocessors.py new file mode 100644 index 0000000..b4b703b --- /dev/null +++ b/tests/test_analyzer_preprocessors.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# Author : Thomas fillon + +#! /usr/bin/env python + +from unit_timeside import * +from timeside.decoder import * +from timeside.analyzer.preprocessors import downmix_to_mono, frames_adapter +import numpy as np + +BLOCKSIZE = 1024 +STEPSIZE = 256 + +class FakeAnalyzer(object): + def __init__(self, blocksize=BLOCKSIZE, stepsize=STEPSIZE): + self.frames = [] # Container for the frame as viewed by process + self.input_blocksize = blocksize + self.input_stepsize= stepsize + + def process(self, frames, eod): + self.frames.append(frames) + return frames, eod + + +class TestAnalyzerPreProcessors(TestCase): + + def tearDown(self): + + analyzer = FakeAnalyzer() + + process_output = [] + for frames, eod in zip(self.input_frames, self.input_eod): + process_output.append(analyzer.decorated_process(frames, eod)) + + output_frames = np.asarray([frames for frames, _ in process_output]) + output_eod = [eod for _, eod in process_output] + + self.assertEqual(self.input_frames.tolist(), output_frames.tolist()) + self.assertEqual(self.input_eod, output_eod) + self.assertEqual(self.process_frames.tolist(), + np.asarray(analyzer.frames).tolist()) + + +class TestDownmixToMono(TestAnalyzerPreProcessors): + + def setUp(self): + self.decorator = downmix_to_mono + # Decorate the process + FakeAnalyzer.decorated_process = self.decorator(FakeAnalyzer.process) + + def test_on_mono(self): + "Run on stereo, eod = False" + self.input_frames = np.random.randn(30, 4096) + self.input_eod = np.repeat(False, 30).tolist() + self.process_frames = self.input_frames + + def test_on_stereo(self): + "Run on stereo, eod = False" + self.input_frames = np.random.randn(30, 4096, 2) + self.input_eod = np.repeat(False, 30).tolist() + self.process_frames = self.input_frames.mean(axis=-1) + + def test_on_multichannel(self): + "Run on multi-channel, eod = False" + self.input_frames = np.random.randn(30, 4096, 6) + self.input_eod = np.repeat(False, 30).tolist() + self.process_frames = self.input_frames.mean(axis=-1) + + def test_on_mono_eod_true(self): + "Run on mono, last eod = True" + self.input_frames = np.random.randn(30, 4096) + self.input_eod = np.repeat(False, 30).tolist() + self.input_eod[-1] = True + self.process_frames = self.input_frames + + def test_on_stereo_eod_true(self): + "Run on stereo, last eod = True" + self.input_frames = np.random.randn(30, 4096, 2) + self.input_eod = np.repeat(False, 30).tolist() + self.input_eod[-1] = True + self.process_frames = self.input_frames.mean(axis=-1) + + def test_on_multichannel_eod_true(self): + "Run on multi-channel, last eod = True" + self.input_frames = np.random.randn(30, 4096, 6) + self.input_eod = np.repeat(False, 30).tolist() + self.input_eod[-1] = True + self.process_frames = self.input_frames.mean(axis=-1) + +class TestFramesAdapter(TestAnalyzerPreProcessors, TestCase): + + def setUp(self): + self.decorator = frames_adapter + # Decorate the process + FakeAnalyzer.decorated_process = self.decorator(FakeAnalyzer.process) + + def test_on_mono(self): + "Run on mono" + self.input_frames = np.arange(0, 2500).reshape(5, -1) + self.input_eod = [False, False, False, False, False] + + self.process_frames = np.asarray([range(0, 1024), + range(256, 1280), + range(512, 1536), + range(768, 1792), + range(1024, 2048), + range(1280, 2304)]) + + def test_on_stereo(self): + "Run on stereo" + self.input_frames = np.arange(0, 5000).reshape(5, -1, 2) + self.input_eod = [False, False, False, False, False] + + self.process_frames = np.asarray([np.arange(0, 2048).reshape(-1, 2), + np.arange(512, 2560).reshape(-1, 2), + np.arange(1024, 3072).reshape(-1, 2), + np.arange(1536, 3584).reshape(-1, 2), + np.arange(2048, 4096).reshape(-1, 2), + np.arange(2560, 4608).reshape(-1, 2)]) + def test_on_mono_eod_true(self): + "Run on mono, last eod = True" + self.input_frames = np.arange(0, 2500).reshape(5, -1) + self.input_eod = [False, False, False, False, True] + last_frames = range(1536, 2500) + last_frames.extend([0]*60) + self.process_frames = np.asarray([range(0, 1024), + range(256, 1280), + range(512, 1536), + range(768, 1792), + range(1024, 2048), + range(1280, 2304), + last_frames]) + + def test_on_stereo_eod_true(self): + "Run on stereo, last eod = True" + self.input_frames = np.arange(0, 5000).reshape(5, -1, 2) + self.input_eod = [False, False, False, False, True] + last_frames = np.hstack([np.arange(3072, 5000), + np.zeros((120,))]).reshape(-1, 2) + self.process_frames = np.asarray([np.arange(0, 2048).reshape(-1, 2), + np.arange(512, 2560).reshape(-1, 2), + np.arange(1024, 3072).reshape(-1, 2), + np.arange(1536, 3584).reshape(-1, 2), + np.arange(2048, 4096).reshape(-1, 2), + np.arange(2560, 4608).reshape(-1, 2), + last_frames]) + +if __name__ == '__main__': + unittest.main(testRunner=TestRunner()) diff --git a/timeside/analyzer/preprocessors.py b/timeside/analyzer/preprocessors.py new file mode 100644 index 0000000..c96a236 --- /dev/null +++ b/timeside/analyzer/preprocessors.py @@ -0,0 +1,168 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2009-2013 Parisson SARL +# +# This file is part of TimeSide. + +# TimeSide is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. + +# TimeSide is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with TimeSide. If not, see . +# +# Author : Thomas fillon + + +def downmix_to_mono(process_func): + ''' + Pre-processing decorator that downmixes frames from multi-channel to mono + Downmix by averaging all channels + >>> @downmix_to_mono + ... def process(analyzer,frames,eod): + ... print 'Frames, eod inside process :' + ... print frames, eod + ... return frames, eod + ... + >>> import numpy as np + >>> frames = np.asarray([[1,2],[3,4],[5,6],[7,8],[9,10]]) + >>> eod = False + >>> frames_, eod_ = process(object(),frames,eod) + Frames, eod inside process : + [ 1.5 3.5 5.5 7.5 9.5] False + + Outside Process frames and eod are preserved : + >>> frames_ + array([[ 1, 2], + [ 3, 4], + [ 5, 6], + [ 7, 8], + [ 9, 10]]) + >>> eod_ + False + ''' + + import functools + + @functools.wraps(process_func) + def wrapper(analyzer, frames, eod): + # Pre-processing + if frames.ndim > 1: + downmix_frames = frames.mean(axis=-1) + else: + downmix_frames = frames + # Processing + process_func(analyzer, downmix_frames, eod) + + return frames, eod + return wrapper + + +def frames_adapter(process_func): + ''' + Pre-processing decorator that adapt frames to match blocksize and stepsize + >>> @frames_adapter + ... def process(analyzer,frames,eod): + ... analyzer.frames.append(frames) + ... return frames, eod + ... + >>> class Fake_Analyzer(object): + ... def __init__(self): + ... self.input_blocksize = 4 + ... self.input_stepsize = 3 + ... self.frames = [] # Container for the frame as viewed by process + >>> import numpy as np + >>> analyzer = Fake_Analyzer() + >>> frames = np.asarray(range(0,12)) + >>> eod = False + >>> frames_, eod_ = process(analyzer,frames,eod) + + Inside the process the frames have been adapted to match input_blocksize + and input_stepsize + >>> analyzer.frames + [array([0, 1, 2, 3]), array([3, 4, 5, 6]), array([6, 7, 8, 9])] + + Outside the process, the original frames and eod are preserved : + >>> frames_ + array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]) + >>> eod_ + False + + Releasing the process with eod=True will zeropad the last frame if necessary + >>> frames = np.asarray(range(12,14)) + >>> eod = True + >>> frames_, eod_ = process(analyzer,frames,eod) + >>> analyzer.frames + [array([0, 1, 2, 3]), array([3, 4, 5, 6]), array([6, 7, 8, 9]), array([ 9, 10, 11, 12]), array([12, 13, 0, 0])] + ''' + + import functools + import numpy as np + + class framesBuffer(object): + + def __init__(self, blocksize, stepsize): + self.blocksize = blocksize + self.stepsize = stepsize + self.stack = None + + def frames(self, frames, eod): + if self.stack is not None: + stack = np.concatenate([self.stack, frames]) + else: + stack = frames.copy() + + stack_length = len(stack) + + nb_frames = ( + stack_length - self.blocksize + self.stepsize) // self.stepsize + nb_frames = max(nb_frames, 0) + frames_length = nb_frames * self.stepsize + \ + self.blocksize - self.stepsize + last_block_size = stack_length - frames_length + + if eod: + # Final zeropadding + pad_shape = tuple( + self.blocksize - last_block_size if i == 0 else x + for i, x in enumerate(frames.shape)) + stack = np.concatenate([stack, np.zeros(pad_shape, + dtype=frames.dtype)]) + nb_frames += 1 + + self.stack = stack[nb_frames * self.stepsize:] + + eod_list = np.repeat(False, nb_frames) + if eod and len(eod_list): + eod_list[-1] = eod + + for n in xrange(nb_frames): + yield ( + stack[ + n * self.stepsize:n * self.stepsize + self.blocksize], + eod_list[n]) + + @functools.wraps(process_func) + def wrapper(analyzer, frames, eod): + # Pre-processing + if not hasattr(analyzer, 'frames_buffer'): + analyzer.frames_buffer = framesBuffer(analyzer.input_blocksize, + analyzer.input_stepsize) + + # Processing + for adapted_frames, adapted_eod in analyzer.frames_buffer.frames(frames, eod): + process_func(analyzer, adapted_frames, adapted_eod) + + return frames, eod + return wrapper + + +if __name__ == "__main__": + import doctest + doctest.testmod() -- 2.39.5