]> git.parisson.com Git - timeside.git/commitdiff
Analyzers : Add preprocessors as process decorators
authorThomas Fillon <thomas@parisson.com>
Fri, 15 Nov 2013 13:57:19 +0000 (14:57 +0100)
committerThomas Fillon <thomas@parisson.com>
Fri, 15 Nov 2013 13:57:19 +0000 (14:57 +0100)
tests/test_analyzer_preprocessors.py [new file with mode: 0644]
timeside/analyzer/preprocessors.py [new file with mode: 0644]

diff --git a/tests/test_analyzer_preprocessors.py b/tests/test_analyzer_preprocessors.py
new file mode 100644 (file)
index 0000000..b4b703b
--- /dev/null
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# Author : Thomas fillon <thomas@parisson.com>
+
+#! /usr/bin/env python
+
+from unit_timeside import *
+from timeside.decoder import *
+from timeside.analyzer.preprocessors import downmix_to_mono, frames_adapter
+import numpy as np
+
+BLOCKSIZE = 1024
+STEPSIZE = 256
+
+class FakeAnalyzer(object):
+    def __init__(self, blocksize=BLOCKSIZE, stepsize=STEPSIZE):
+        self.frames = []  # Container for the frame as viewed by process
+        self.input_blocksize = blocksize
+        self.input_stepsize= stepsize
+
+    def process(self, frames, eod):
+        self.frames.append(frames)
+        return frames, eod
+
+
+class TestAnalyzerPreProcessors(TestCase):
+
+    def tearDown(self):
+
+        analyzer = FakeAnalyzer()
+
+        process_output = []
+        for frames, eod in zip(self.input_frames, self.input_eod):
+            process_output.append(analyzer.decorated_process(frames, eod))
+
+        output_frames = np.asarray([frames for frames, _ in process_output])
+        output_eod = [eod for _, eod in process_output]
+
+        self.assertEqual(self.input_frames.tolist(), output_frames.tolist())
+        self.assertEqual(self.input_eod, output_eod)
+        self.assertEqual(self.process_frames.tolist(),
+                         np.asarray(analyzer.frames).tolist())
+
+
+class TestDownmixToMono(TestAnalyzerPreProcessors):
+
+    def setUp(self):
+        self.decorator = downmix_to_mono
+        # Decorate the process
+        FakeAnalyzer.decorated_process = self.decorator(FakeAnalyzer.process)
+
+    def test_on_mono(self):
+        "Run on stereo, eod = False"
+        self.input_frames = np.random.randn(30, 4096)
+        self.input_eod = np.repeat(False, 30).tolist()
+        self.process_frames = self.input_frames
+
+    def test_on_stereo(self):
+        "Run on stereo, eod = False"
+        self.input_frames = np.random.randn(30, 4096, 2)
+        self.input_eod = np.repeat(False, 30).tolist()
+        self.process_frames = self.input_frames.mean(axis=-1)
+
+    def test_on_multichannel(self):
+        "Run on multi-channel, eod = False"
+        self.input_frames = np.random.randn(30, 4096, 6)
+        self.input_eod = np.repeat(False, 30).tolist()
+        self.process_frames = self.input_frames.mean(axis=-1)
+
+    def test_on_mono_eod_true(self):
+        "Run on mono, last eod = True"
+        self.input_frames = np.random.randn(30, 4096)
+        self.input_eod = np.repeat(False, 30).tolist()
+        self.input_eod[-1] = True
+        self.process_frames = self.input_frames
+
+    def test_on_stereo_eod_true(self):
+        "Run on stereo, last eod = True"
+        self.input_frames = np.random.randn(30, 4096, 2)
+        self.input_eod = np.repeat(False, 30).tolist()
+        self.input_eod[-1] = True
+        self.process_frames = self.input_frames.mean(axis=-1)
+
+    def test_on_multichannel_eod_true(self):
+        "Run on multi-channel, last eod = True"
+        self.input_frames = np.random.randn(30, 4096, 6)
+        self.input_eod = np.repeat(False, 30).tolist()
+        self.input_eod[-1] = True
+        self.process_frames = self.input_frames.mean(axis=-1)
+
+class TestFramesAdapter(TestAnalyzerPreProcessors, TestCase):
+
+    def setUp(self):
+        self.decorator = frames_adapter
+        # Decorate the process
+        FakeAnalyzer.decorated_process = self.decorator(FakeAnalyzer.process)
+
+    def test_on_mono(self):
+        "Run on mono"
+        self.input_frames = np.arange(0, 2500).reshape(5, -1)
+        self.input_eod = [False, False, False, False, False]
+
+        self.process_frames = np.asarray([range(0, 1024),
+                                          range(256, 1280),
+                                          range(512, 1536),
+                                          range(768, 1792),
+                                          range(1024, 2048),
+                                          range(1280, 2304)])
+
+    def test_on_stereo(self):
+        "Run on stereo"
+        self.input_frames = np.arange(0, 5000).reshape(5, -1, 2)
+        self.input_eod = [False, False, False, False, False]
+
+        self.process_frames = np.asarray([np.arange(0, 2048).reshape(-1, 2),
+                                          np.arange(512, 2560).reshape(-1, 2),
+                                          np.arange(1024, 3072).reshape(-1, 2),
+                                          np.arange(1536, 3584).reshape(-1, 2),
+                                          np.arange(2048, 4096).reshape(-1, 2),
+                                          np.arange(2560, 4608).reshape(-1, 2)])
+    def test_on_mono_eod_true(self):
+        "Run on mono, last eod = True"
+        self.input_frames = np.arange(0, 2500).reshape(5, -1)
+        self.input_eod = [False, False, False, False, True]
+        last_frames = range(1536, 2500)
+        last_frames.extend([0]*60)
+        self.process_frames = np.asarray([range(0, 1024),
+                                          range(256, 1280),
+                                          range(512, 1536),
+                                          range(768, 1792),
+                                          range(1024, 2048),
+                                          range(1280, 2304),
+                                          last_frames])
+
+    def test_on_stereo_eod_true(self):
+        "Run on stereo, last eod = True"
+        self.input_frames = np.arange(0, 5000).reshape(5, -1, 2)
+        self.input_eod = [False, False, False, False, True]
+        last_frames = np.hstack([np.arange(3072, 5000),
+                                 np.zeros((120,))]).reshape(-1, 2)
+        self.process_frames = np.asarray([np.arange(0, 2048).reshape(-1, 2),
+                                          np.arange(512, 2560).reshape(-1, 2),
+                                          np.arange(1024, 3072).reshape(-1, 2),
+                                          np.arange(1536, 3584).reshape(-1, 2),
+                                          np.arange(2048, 4096).reshape(-1, 2),
+                                          np.arange(2560, 4608).reshape(-1, 2),
+                                          last_frames])
+
+if __name__ == '__main__':
+    unittest.main(testRunner=TestRunner())
diff --git a/timeside/analyzer/preprocessors.py b/timeside/analyzer/preprocessors.py
new file mode 100644 (file)
index 0000000..c96a236
--- /dev/null
@@ -0,0 +1,168 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2009-2013 Parisson SARL
+#
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author : Thomas fillon <thomas@parisson.fr>
+
+
+def downmix_to_mono(process_func):
+    '''
+    Pre-processing decorator that downmixes frames from multi-channel to mono
+    Downmix by averaging all channels
+    >>> @downmix_to_mono
+    ... def process(analyzer,frames,eod):
+    ...     print 'Frames, eod inside process :'
+    ...     print frames, eod
+    ...     return frames, eod
+    ...
+    >>> import numpy as np
+    >>> frames = np.asarray([[1,2],[3,4],[5,6],[7,8],[9,10]])
+    >>> eod = False
+    >>> frames_, eod_ = process(object(),frames,eod)
+    Frames, eod inside process :
+    [ 1.5  3.5  5.5  7.5  9.5] False
+
+    Outside Process frames and eod are preserved :
+    >>> frames_
+    array([[ 1,  2],
+           [ 3,  4],
+           [ 5,  6],
+           [ 7,  8],
+           [ 9, 10]])
+    >>> eod_
+    False
+    '''
+
+    import functools
+
+    @functools.wraps(process_func)
+    def wrapper(analyzer, frames, eod):
+        # Pre-processing
+        if frames.ndim > 1:
+            downmix_frames = frames.mean(axis=-1)
+        else:
+            downmix_frames = frames
+        # Processing
+        process_func(analyzer, downmix_frames, eod)
+
+        return frames, eod
+    return wrapper
+
+
+def frames_adapter(process_func):
+    '''
+    Pre-processing decorator that adapt frames to match blocksize and stepsize
+    >>> @frames_adapter
+    ... def process(analyzer,frames,eod):
+    ...     analyzer.frames.append(frames)
+    ...     return frames, eod
+    ...
+    >>> class Fake_Analyzer(object):
+    ...     def __init__(self):
+    ...         self.input_blocksize = 4
+    ...         self.input_stepsize = 3
+    ...         self.frames = [] # Container for the frame as viewed by process
+    >>> import numpy as np
+    >>> analyzer = Fake_Analyzer()
+    >>> frames = np.asarray(range(0,12))
+    >>> eod = False
+    >>> frames_, eod_ = process(analyzer,frames,eod)
+
+    Inside the process the frames have been adapted to match input_blocksize
+    and input_stepsize
+    >>> analyzer.frames
+    [array([0, 1, 2, 3]), array([3, 4, 5, 6]), array([6, 7, 8, 9])]
+
+    Outside the process, the original frames and eod are preserved :
+    >>> frames_
+    array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11])
+    >>> eod_
+    False
+
+    Releasing the process with eod=True will zeropad the last frame if necessary
+    >>> frames = np.asarray(range(12,14))
+    >>> eod = True
+    >>> frames_, eod_ = process(analyzer,frames,eod)
+    >>> analyzer.frames
+    [array([0, 1, 2, 3]), array([3, 4, 5, 6]), array([6, 7, 8, 9]), array([ 9, 10, 11, 12]), array([12, 13,  0,  0])]
+    '''
+
+    import functools
+    import numpy as np
+
+    class framesBuffer(object):
+
+        def __init__(self, blocksize, stepsize):
+            self.blocksize = blocksize
+            self.stepsize = stepsize
+            self.stack = None
+
+        def frames(self, frames, eod):
+            if self.stack is not None:
+                stack = np.concatenate([self.stack, frames])
+            else:
+                stack = frames.copy()
+
+            stack_length = len(stack)
+
+            nb_frames = (
+                stack_length - self.blocksize + self.stepsize) // self.stepsize
+            nb_frames = max(nb_frames, 0)
+            frames_length = nb_frames * self.stepsize + \
+                self.blocksize - self.stepsize
+            last_block_size = stack_length - frames_length
+
+            if eod:
+                # Final zeropadding
+                pad_shape = tuple(
+                    self.blocksize - last_block_size if i == 0 else x
+                    for i, x in enumerate(frames.shape))
+                stack = np.concatenate([stack, np.zeros(pad_shape,
+                                                        dtype=frames.dtype)])
+                nb_frames += 1
+
+            self.stack = stack[nb_frames * self.stepsize:]
+
+            eod_list = np.repeat(False, nb_frames)
+            if eod and len(eod_list):
+                eod_list[-1] = eod
+
+            for n in xrange(nb_frames):
+                yield (
+                    stack[
+                        n * self.stepsize:n * self.stepsize + self.blocksize],
+                    eod_list[n])
+
+    @functools.wraps(process_func)
+    def wrapper(analyzer, frames, eod):
+        # Pre-processing
+        if not hasattr(analyzer, 'frames_buffer'):
+            analyzer.frames_buffer = framesBuffer(analyzer.input_blocksize,
+                                                  analyzer.input_stepsize)
+
+        # Processing
+        for adapted_frames, adapted_eod in analyzer.frames_buffer.frames(frames, eod):
+            process_func(analyzer, adapted_frames, adapted_eod)
+
+        return frames, eod
+    return wrapper
+
+
+if __name__ == "__main__":
+    import doctest
+    doctest.testmod()