import os.path
+from glib import GError as GST_IOError
+
+
class TestDecoding(TestCase):
"Test decoding features"
def setUp(self):
self.samplerate, self.channels, self.blocksize = None, None, None
+ self.start = 0
+ self.duration = None
+
+ self.expected_samplerate = 44100
+ self.expected_channels = 2
+ self.expected_totalframes = 352800
def testWav(self):
"Test wav decoding"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.wav")
+ self.source = os.path.join(os.path.dirname(__file__),
+ "samples/sweep.wav")
- self.expected_channels = 2
- self.expected_samplerate = 44100
def testWavMono(self):
"Test mono wav decoding"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep_mono.wav")
+ self.source = os.path.join(os.path.dirname(__file__),
+ "samples/sweep_mono.wav")
self.expected_channels = 1
- self.expected_samplerate = 44100
def testWav32k(self):
"Test 32kHz wav decoding"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep_32000.wav")
+ self.source = os.path.join(os.path.dirname(__file__),
+ "samples/sweep_32000.wav")
- self.expected_channels = 2
- self.expected_samplerate = 32000
+ expected_samplerate = 32000
+ ratio = expected_samplerate/float(self.expected_samplerate)
+
+ self.expected_totalframes = int(self.expected_totalframes * ratio)
+ self.expected_samplerate = expected_samplerate
def testFlac(self):
"Test flac decoding"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.flac")
-
- self.expected_channels = 2
- self.expected_samplerate = 44100
+ self.source = os.path.join(os.path.dirname(__file__),
+ "samples/sweep.flac")
def testOgg(self):
"Test ogg decoding"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.ogg")
+ self.source = os.path.join(os.path.dirname(__file__),
+ "samples/sweep.ogg")
- self.expected_channels = 2
- self.expected_samplerate = 44100
+ self.expected_totalframes = 352832
def testMp3(self):
"Test mp3 decoding"
- self.source = os.path.join (os.path.dirname(__file__), "samples/sweep.mp3")
+ self.source = os.path.join(os.path.dirname(__file__),
+ "samples/sweep.mp3")
- self.expected_channels = 2
- self.expected_samplerate = 44100
+ self.expected_totalframes = 353664
def tearDown(self):
- decoder = FileDecoder(self.source)
+ decoder = FileDecoder(uri=self.source,
+ start=self.start,
+ duration=self.duration)
- decoder.setup(samplerate = self.samplerate, channels = self.channels, blocksize = self.blocksize)
+ decoder.setup(samplerate=self.samplerate, channels=self.channels,
+ blocksize=self.blocksize)
- totalframes = 0.
+ totalframes = 0
while True:
frames, eod = decoder.process()
totalframes += frames.shape[0]
- if eod or decoder.eod: break
- self.assertEquals(frames.shape[0], decoder.blocksize() )
- self.assertEquals(frames.shape[1], decoder.channels() )
+ if eod or decoder.eod:
+ break
+ self.assertEquals(frames.shape[0], decoder.blocksize())
+ self.assertEquals(frames.shape[1], decoder.channels())
ratio = decoder.output_samplerate / float(decoder.input_samplerate)
if 0:
self.assertEquals(decoder.input_channels, decoder.output_channels)
# and if we know the expected channels, check the output match
if self.expected_channels:
- self.assertEquals(self.expected_channels, decoder.output_channels)
+ self.assertEquals(self.expected_channels, decoder.output_channels)
# do the same with the sampling rate
if self.samplerate:
self.assertEquals(self.samplerate, decoder.output_samplerate)
if self.expected_samplerate:
self.assertEquals(self.expected_samplerate, decoder.output_samplerate)
- # FIXME compute actual number of frames from file
- if ratio == 1:
- if os.path.splitext(self.source)[-1].lower() == '.mp3':
- self.assertEquals(totalframes, 353664)
- elif os.path.splitext(self.source)[-1].lower() == '.ogg':
- self.assertEquals(totalframes, 352832)
- else:
- if '_32000.wav' in self.source:
- self.assertEquals(totalframes, 256000)
- else:
- self.assertEquals(totalframes, 352800)
+ self.assertEquals(totalframes, self.expected_totalframes)
+
+
+class TestDecodingSegment(TestDecoding):
+
+ def setUp(self):
+ super(TestDecodingSegment, self).setUp()
+ self.start = 1
+ self.duration = 3
+
+ self.expected_totalframes = self.duration * self.expected_samplerate
+
+
+ def testMp3(self):
+ "Test mp3 decoding"
+ super(TestDecodingSegment, self).testMp3()
+ self.expected_totalframes = self.duration * self.expected_samplerate + 1
+
+ def testWav(self):
+ "Test wav decoding"
+ super(TestDecodingSegment, self).testWav()
+
+ def testWavMono(self):
+ "Test mono wav decoding"
+ super(TestDecodingSegment, self).testWavMono()
+
+ def testWav32k(self):
+ "Test 32kHz wav decoding"
+ super(TestDecodingSegment, self).testWav32k()
+
+ @unittest.skip("demonstrating skipping")
+ def testFlac(self):
+ "Test flac decoding"
+
+ @unittest.skip("demonstrating skipping")
+ def testOgg(self):
+ "Test ogg decoding"
+
+
+class TestDecodingSegmentDefaultStart(TestDecodingSegment):
+
+ def setUp(self):
+ super(TestDecodingSegmentDefaultStart, self).setUp()
+ self.duration = 1
+ self.expected_totalframes = self.duration * self.expected_samplerate
+
+ def testMp3(self):
+ "Test mp3 decoding"
+ super(TestDecodingSegmentDefaultStart, self).testMp3()
+ self.expected_totalframes = self.duration * self.expected_samplerate + 1
+
+
+class TestDecodingSegmentDefaultDuration(TestDecodingSegment):
+
+ def setUp(self):
+ super(TestDecodingSegment, self).setUp()
+ self.start = 1
+ self.expected_totalframes = self.expected_totalframes - self.start * self.expected_samplerate
+
+ def testWav(self):
+ "Test wav decoding"
+ super(TestDecodingSegment, self).testWav()
+
+ def testWavMono(self):
+ "Test mono wav decoding"
+ super(TestDecodingSegment, self).testWavMono()
+
+ def testWav32k(self):
+ "Test 32kHz wav decoding"
+ super(TestDecodingSegment, self).testWav32k()
+
+ def testMp3(self):
+ "Test mp3 decoding"
+ super(TestDecodingSegment, self).testMp3()
+ self.expected_totalframes = 308701
+
class TestDecodingStereo(TestDecoding):
def setUp(self):
+ super(TestDecodingStereo, self).setUp()
self.samplerate, self.channels, self.blocksize = None, 2, None
+
class TestDecodingMonoUpsampling(TestDecoding):
def setUp(self):
+ super(TestDecodingMonoUpsampling, self).setUp()
self.samplerate, self.channels, self.blocksize = 48000, None, None
+ self.expected_totalframes = 384000
+
+ def testMp3(self):
+ "Test mp3 decoding"
+ super(TestDecodingMonoUpsampling, self).testMp3()
+ self.expected_totalframes = 384941
+
+ def testWav(self):
+ "Test wav decoding"
+ super(TestDecodingMonoUpsampling, self).testWav()
+
+ def testWavMono(self):
+ "Test mono wav decoding"
+ super(TestDecodingMonoUpsampling, self).testWavMono()
+
+ def testWav32k(self):
+ "Test 32kHz wav decoding"
+ super(TestDecodingMonoUpsampling, self).testWav32k()
+ self.expected_totalframes = 384000
+ def testFlac(self):
+ "Test flac decoding"
+ super(TestDecodingMonoUpsampling, self).testFlac()
+
+ def testOgg(self):
+ "Test ogg decoding"
+ super(TestDecodingMonoUpsampling, self).testOgg()
+ self.expected_totalframes = 384000
+
+
+@unittest.skip("demonstrating skipping")
class TestDecodingMonoDownsampling(TestDecoding):
def setUp(self):
+ super(TestDecodingMonoDownsampling, self).setUp()
self.samplerate, self.channels, self.blocksize = 16000, None, None
+
+@unittest.skip("demonstrating skipping")
class TestDecodingStereoDownsampling(TestDecoding):
def setUp(self):
+ super(TestDecodingStereoDownsampling, self).setUp()
self.samplerate, self.channels, self.blocksize = 32000, 2, None
+
+@unittest.skip("demonstrating skipping")
class TestDecodingStereoUpsampling(TestDecoding):
def setUp(self):
+ super(TestDecodingStereoUpsampling, self).setUp()
self.samplerate, self.channels, self.blocksize = 96000, 2, None
+
class TestDecodingShortBlock(TestDecoding):
def setUp(self):
+ super(TestDecodingShortBlock, self).setUp()
self.samplerate, self.channels, self.blocksize = None, None, 256
+
class TestDecodingLongBlock(TestDecoding):
def setUp(self):
+ super(TestDecodingLongBlock, self).setUp()
self.samplerate, self.channels, self.blocksize = None, None, 1024*8*2
+
class TestDecodingWrongFiles(TestCase):
"Test decoding features"
def testMissingFile(self):
"Test decoding missing file"
- self.source = os.path.join (os.path.dirname(__file__), "a_missing_file_blahblah.wav")
+ self.source = os.path.join(os.path.dirname(__file__),
+ "a_missing_file_blahblah.wav")
+
self.assertRaises(IOError, FileDecoder, self.source)
def testDevNull(self):
"Test decoding dev null"
self.source = "/dev/null"
decoder = FileDecoder(self.source)
- self.assertRaises(IOError, FileDecoder.setup, decoder)
+ self.assertRaises(GST_IOError, FileDecoder.setup, decoder)
def testNoAudioStream(self):
"Test decoding file withouth audio stream"
self.source = __file__
decoder = FileDecoder(self.source)
- self.assertRaises(IOError, FileDecoder.setup, decoder)
+ self.assertRaises(GST_IOError, FileDecoder.setup, decoder)
def testEmptyFile(self):
"Test decoding empty file"
self.tmpfile = tempfile.NamedTemporaryFile(delete=True)
self.source = self.tmpfile.name
decoder = FileDecoder(self.source)
- self.assertRaises(IOError, FileDecoder.setup, decoder)
+ self.assertRaises(GST_IOError, FileDecoder.setup, decoder)
self.tmpfile.close()
if __name__ == '__main__':
else:
raise IOError('File not found!')
- # Set the default duration from the length of the file
- if not(duration):
- from gst.pbutils import Discoverer
- #import gobject
- uri_discoverer = Discoverer(GST_DISCOVER_TIMEOUT)
- uri_info = uri_discoverer.discover_uri(self.uri)
- duration = uri_info.get_duration() / gst.SECOND - start
-
-
self.uri_start = start
self.uri_duration = duration
+ if start==0 and duration is None:
+ self.IS_SEGMENT = False
+ else:
+ self.IS_SEGMENT = True
+
+ def set_uri_default_duration(self):
+ # Set the duration from the length of the file
+ from gst.pbutils import Discoverer
+ #import gobject
+ uri_discoverer = Discoverer(GST_DISCOVER_TIMEOUT)
+ uri_info = uri_discoverer.discover_uri(self.uri)
+ self.uri_duration = (uri_info.get_duration() / gst.SECOND -
+ self.uri_start)
def setup(self, channels=None, samplerate=None, blocksize=None):
+ if not(self.uri_duration):
+ self.set_uri_default_duration()
+
# a lock to wait wait for gstreamer thread to be ready
import threading
self.discovered_cond = threading.Condition(threading.Lock())
# the output data format we want
if blocksize:
- self.output_blocksize = blocksize
+ self.output_blocksize = blocksize
if samplerate:
self.output_samplerate = int(samplerate)
if channels:
- self.output_channels = int(channels)
-
- uri = self.uri
- # Convert uri_start and uri_duration to nanoseconds
- uri_start = uint64(round(self.uri_start * gst.SECOND))
- uri_duration = int64(round(self.uri_duration * gst.SECOND))
-
- # Create the pipe with Gnonlin gnlurisource
- self.pipe = ''' gnlurisource uri={uri}
- start=0
- duration={uri_duration}
- media-start={uri_start}
- media-duration={uri_duration}
- ! audioconvert name=audioconvert
- ! audioresample
- ! appsink name=sink sync=False async=True
- '''.format(**locals())
+ self.output_channels = int(channels)
+
+ if self.IS_SEGMENT:
+ # Create the pipe with Gnonlin gnlurisource
+ self.pipe = ''' gnlurisource uri={uri}
+ start=0
+ duration={uri_duration}
+ media-start={uri_start}
+ media-duration={uri_duration}
+ ! audioconvert name=audioconvert
+ ! audioresample
+ ! appsink name=sink sync=False async=True
+ '''.format(uri = self.uri,
+ uri_start = uint64(round(self.uri_start * gst.SECOND)),
+ uri_duration = int64(round(self.uri_duration * gst.SECOND)))
+ # convert uri_start and uri_duration to nanoseconds
+ else:
+ # Create the pipe with standard Gstreamer uridecodbin
+ self.pipe = ''' uridecodebin name=uridecodebin uri={uri}
+ ! audioconvert name=audioconvert
+ ! audioresample
+ ! appsink name=sink sync=False async=True
+ '''.format(uri = self.uri)
self.pipeline = gst.parse_launch(self.pipe)