--- /dev/null
+# -*- coding: utf-8 -*-
+# Author : Thomas fillon <thomas@parisson.com>
+
+#! /usr/bin/env python
+
+from unit_timeside import *
+from timeside.decoder import *
+from timeside.analyzer.preprocessors import downmix_to_mono, frames_adapter
+import numpy as np
+
+BLOCKSIZE = 1024
+STEPSIZE = 256
+
+class FakeAnalyzer(object):
+ def __init__(self, blocksize=BLOCKSIZE, stepsize=STEPSIZE):
+ self.frames = [] # Container for the frame as viewed by process
+ self.input_blocksize = blocksize
+ self.input_stepsize= stepsize
+
+ def process(self, frames, eod):
+ self.frames.append(frames)
+ return frames, eod
+
+
+class TestAnalyzerPreProcessors(TestCase):
+
+ def tearDown(self):
+
+ analyzer = FakeAnalyzer()
+
+ process_output = []
+ for frames, eod in zip(self.input_frames, self.input_eod):
+ process_output.append(analyzer.decorated_process(frames, eod))
+
+ output_frames = np.asarray([frames for frames, _ in process_output])
+ output_eod = [eod for _, eod in process_output]
+
+ self.assertEqual(self.input_frames.tolist(), output_frames.tolist())
+ self.assertEqual(self.input_eod, output_eod)
+ self.assertEqual(self.process_frames.tolist(),
+ np.asarray(analyzer.frames).tolist())
+
+
+class TestDownmixToMono(TestAnalyzerPreProcessors):
+
+ def setUp(self):
+ self.decorator = downmix_to_mono
+ # Decorate the process
+ FakeAnalyzer.decorated_process = self.decorator(FakeAnalyzer.process)
+
+ def test_on_mono(self):
+ "Run on stereo, eod = False"
+ self.input_frames = np.random.randn(30, 4096)
+ self.input_eod = np.repeat(False, 30).tolist()
+ self.process_frames = self.input_frames
+
+ def test_on_stereo(self):
+ "Run on stereo, eod = False"
+ self.input_frames = np.random.randn(30, 4096, 2)
+ self.input_eod = np.repeat(False, 30).tolist()
+ self.process_frames = self.input_frames.mean(axis=-1)
+
+ def test_on_multichannel(self):
+ "Run on multi-channel, eod = False"
+ self.input_frames = np.random.randn(30, 4096, 6)
+ self.input_eod = np.repeat(False, 30).tolist()
+ self.process_frames = self.input_frames.mean(axis=-1)
+
+ def test_on_mono_eod_true(self):
+ "Run on mono, last eod = True"
+ self.input_frames = np.random.randn(30, 4096)
+ self.input_eod = np.repeat(False, 30).tolist()
+ self.input_eod[-1] = True
+ self.process_frames = self.input_frames
+
+ def test_on_stereo_eod_true(self):
+ "Run on stereo, last eod = True"
+ self.input_frames = np.random.randn(30, 4096, 2)
+ self.input_eod = np.repeat(False, 30).tolist()
+ self.input_eod[-1] = True
+ self.process_frames = self.input_frames.mean(axis=-1)
+
+ def test_on_multichannel_eod_true(self):
+ "Run on multi-channel, last eod = True"
+ self.input_frames = np.random.randn(30, 4096, 6)
+ self.input_eod = np.repeat(False, 30).tolist()
+ self.input_eod[-1] = True
+ self.process_frames = self.input_frames.mean(axis=-1)
+
+class TestFramesAdapter(TestAnalyzerPreProcessors, TestCase):
+
+ def setUp(self):
+ self.decorator = frames_adapter
+ # Decorate the process
+ FakeAnalyzer.decorated_process = self.decorator(FakeAnalyzer.process)
+
+ def test_on_mono(self):
+ "Run on mono"
+ self.input_frames = np.arange(0, 2500).reshape(5, -1)
+ self.input_eod = [False, False, False, False, False]
+
+ self.process_frames = np.asarray([range(0, 1024),
+ range(256, 1280),
+ range(512, 1536),
+ range(768, 1792),
+ range(1024, 2048),
+ range(1280, 2304)])
+
+ def test_on_stereo(self):
+ "Run on stereo"
+ self.input_frames = np.arange(0, 5000).reshape(5, -1, 2)
+ self.input_eod = [False, False, False, False, False]
+
+ self.process_frames = np.asarray([np.arange(0, 2048).reshape(-1, 2),
+ np.arange(512, 2560).reshape(-1, 2),
+ np.arange(1024, 3072).reshape(-1, 2),
+ np.arange(1536, 3584).reshape(-1, 2),
+ np.arange(2048, 4096).reshape(-1, 2),
+ np.arange(2560, 4608).reshape(-1, 2)])
+ def test_on_mono_eod_true(self):
+ "Run on mono, last eod = True"
+ self.input_frames = np.arange(0, 2500).reshape(5, -1)
+ self.input_eod = [False, False, False, False, True]
+ last_frames = range(1536, 2500)
+ last_frames.extend([0]*60)
+ self.process_frames = np.asarray([range(0, 1024),
+ range(256, 1280),
+ range(512, 1536),
+ range(768, 1792),
+ range(1024, 2048),
+ range(1280, 2304),
+ last_frames])
+
+ def test_on_stereo_eod_true(self):
+ "Run on stereo, last eod = True"
+ self.input_frames = np.arange(0, 5000).reshape(5, -1, 2)
+ self.input_eod = [False, False, False, False, True]
+ last_frames = np.hstack([np.arange(3072, 5000),
+ np.zeros((120,))]).reshape(-1, 2)
+ self.process_frames = np.asarray([np.arange(0, 2048).reshape(-1, 2),
+ np.arange(512, 2560).reshape(-1, 2),
+ np.arange(1024, 3072).reshape(-1, 2),
+ np.arange(1536, 3584).reshape(-1, 2),
+ np.arange(2048, 4096).reshape(-1, 2),
+ np.arange(2560, 4608).reshape(-1, 2),
+ last_frames])
+
+if __name__ == '__main__':
+ unittest.main(testRunner=TestRunner())
--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2009-2013 Parisson SARL
+#
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author : Thomas fillon <thomas@parisson.fr>
+
+
+def downmix_to_mono(process_func):
+ '''
+ Pre-processing decorator that downmixes frames from multi-channel to mono
+ Downmix by averaging all channels
+ >>> @downmix_to_mono
+ ... def process(analyzer,frames,eod):
+ ... print 'Frames, eod inside process :'
+ ... print frames, eod
+ ... return frames, eod
+ ...
+ >>> import numpy as np
+ >>> frames = np.asarray([[1,2],[3,4],[5,6],[7,8],[9,10]])
+ >>> eod = False
+ >>> frames_, eod_ = process(object(),frames,eod)
+ Frames, eod inside process :
+ [ 1.5 3.5 5.5 7.5 9.5] False
+
+ Outside Process frames and eod are preserved :
+ >>> frames_
+ array([[ 1, 2],
+ [ 3, 4],
+ [ 5, 6],
+ [ 7, 8],
+ [ 9, 10]])
+ >>> eod_
+ False
+ '''
+
+ import functools
+
+ @functools.wraps(process_func)
+ def wrapper(analyzer, frames, eod):
+ # Pre-processing
+ if frames.ndim > 1:
+ downmix_frames = frames.mean(axis=-1)
+ else:
+ downmix_frames = frames
+ # Processing
+ process_func(analyzer, downmix_frames, eod)
+
+ return frames, eod
+ return wrapper
+
+
+def frames_adapter(process_func):
+ '''
+ Pre-processing decorator that adapt frames to match blocksize and stepsize
+ >>> @frames_adapter
+ ... def process(analyzer,frames,eod):
+ ... analyzer.frames.append(frames)
+ ... return frames, eod
+ ...
+ >>> class Fake_Analyzer(object):
+ ... def __init__(self):
+ ... self.input_blocksize = 4
+ ... self.input_stepsize = 3
+ ... self.frames = [] # Container for the frame as viewed by process
+ >>> import numpy as np
+ >>> analyzer = Fake_Analyzer()
+ >>> frames = np.asarray(range(0,12))
+ >>> eod = False
+ >>> frames_, eod_ = process(analyzer,frames,eod)
+
+ Inside the process the frames have been adapted to match input_blocksize
+ and input_stepsize
+ >>> analyzer.frames
+ [array([0, 1, 2, 3]), array([3, 4, 5, 6]), array([6, 7, 8, 9])]
+
+ Outside the process, the original frames and eod are preserved :
+ >>> frames_
+ array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
+ >>> eod_
+ False
+
+ Releasing the process with eod=True will zeropad the last frame if necessary
+ >>> frames = np.asarray(range(12,14))
+ >>> eod = True
+ >>> frames_, eod_ = process(analyzer,frames,eod)
+ >>> analyzer.frames
+ [array([0, 1, 2, 3]), array([3, 4, 5, 6]), array([6, 7, 8, 9]), array([ 9, 10, 11, 12]), array([12, 13, 0, 0])]
+ '''
+
+ import functools
+ import numpy as np
+
+ class framesBuffer(object):
+
+ def __init__(self, blocksize, stepsize):
+ self.blocksize = blocksize
+ self.stepsize = stepsize
+ self.stack = None
+
+ def frames(self, frames, eod):
+ if self.stack is not None:
+ stack = np.concatenate([self.stack, frames])
+ else:
+ stack = frames.copy()
+
+ stack_length = len(stack)
+
+ nb_frames = (
+ stack_length - self.blocksize + self.stepsize) // self.stepsize
+ nb_frames = max(nb_frames, 0)
+ frames_length = nb_frames * self.stepsize + \
+ self.blocksize - self.stepsize
+ last_block_size = stack_length - frames_length
+
+ if eod:
+ # Final zeropadding
+ pad_shape = tuple(
+ self.blocksize - last_block_size if i == 0 else x
+ for i, x in enumerate(frames.shape))
+ stack = np.concatenate([stack, np.zeros(pad_shape,
+ dtype=frames.dtype)])
+ nb_frames += 1
+
+ self.stack = stack[nb_frames * self.stepsize:]
+
+ eod_list = np.repeat(False, nb_frames)
+ if eod and len(eod_list):
+ eod_list[-1] = eod
+
+ for n in xrange(nb_frames):
+ yield (
+ stack[
+ n * self.stepsize:n * self.stepsize + self.blocksize],
+ eod_list[n])
+
+ @functools.wraps(process_func)
+ def wrapper(analyzer, frames, eod):
+ # Pre-processing
+ if not hasattr(analyzer, 'frames_buffer'):
+ analyzer.frames_buffer = framesBuffer(analyzer.input_blocksize,
+ analyzer.input_stepsize)
+
+ # Processing
+ for adapted_frames, adapted_eod in analyzer.frames_buffer.frames(frames, eod):
+ process_func(analyzer, adapted_frames, adapted_eod)
+
+ return frames, eod
+ return wrapper
+
+
+if __name__ == "__main__":
+ import doctest
+ doctest.testmod()