from __future__ import division
from timeside.decoder.core import FileDecoder
+from timeside.core import ProcessPipe
from unit_timeside import *
import os.path
self.expected_totalframes = 310715 # was 308701 ?
+class TestDecodingSegmentBadParameters(unittest.TestCase):
+
+ def setUp(self):
+ self.source = os.path.join(os.path.dirname(__file__),
+ "samples/sweep.wav")
+
+ def test_bad_start_value(self):
+ "Test decoding segment with start value exceeding the media duration"
+ decoder = FileDecoder(self.source, start=10)
+ pipe = ProcessPipe(decoder)
+ self.assertRaises(ValueError, pipe.run)
+
+ def test_bad_duration_value(self):
+ "Test decoding segment with a too large duration value argument"
+ decoder = FileDecoder(self.source, duration=10)
+ pipe = ProcessPipe(decoder)
+ self.assertRaises(ValueError, pipe.run)
+
+
class TestDecodingStereo(TestDecoding):
def setUp(self):
"Test decoding missing file"
self.source = os.path.join(os.path.dirname(__file__),
"a_missing_file_blahblah.wav")
-
self.assertRaises(IOError, FileDecoder, self.source)
def testDevNull(self):
"Test decoding dev null"
self.source = "/dev/null"
- decoder = FileDecoder(self.source)
- self.assertRaises(IOError, FileDecoder.setup, decoder)
+ with self.assertRaises(IOError):
+ FileDecoder(self.source)
def testNoAudioStream(self):
"Test decoding file withouth audio stream"
self.source = __file__
- decoder = FileDecoder(self.source)
- self.assertRaises(IOError, FileDecoder.setup, decoder)
+ with self.assertRaises(IOError):
+ FileDecoder(self.source)
def testEmptyFile(self):
"Test decoding empty file"
import tempfile
self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
self.source = self.tmpfile.name
- decoder = FileDecoder(self.source)
- self.assertRaises(IOError, FileDecoder.setup, decoder)
+ with self.assertRaises(IOError):
+ FileDecoder(self.source)
+
self.tmpfile.close()
if __name__ == '__main__':
self.stack = stack
self.uri = get_uri(uri)
-
- def set_uri_default_duration(self):
- # Set the duration from the length of the file
- uri_total_duration = get_media_uri_info(self.uri)['duration']
- self.uri_duration = uri_total_duration - self.uri_start
+ self.uri_total_duration = get_media_uri_info(self.uri)['duration']
def setup(self, channels=None, samplerate=None, blocksize=None):
self.process_pipe.frames_stack = []
if self.uri_duration is None:
- self.set_uri_default_duration()
+ # Set the duration from the length of the file
+ self.uri_duration = self.uri_total_duration - self.uri_start
+
+ if self.is_segment:
+ # Check start and duration value
+ if self.uri_start > self.uri_total_duration:
+ raise ValueError(('Segment start time value exceed media ' +
+ 'duration'))
+
+ if self.uri_start + self.uri_duration > self.uri_total_duration:
+ raise ValueError("""Segment duration value is too large \
+ given the media duration""")
# a lock to wait wait for gstreamer thread to be ready
import threading