]> git.parisson.com Git - timeside.git/commitdiff
Add test for encoder streaming
authorThomas Fillon <thomas@parisson.com>
Fri, 14 Feb 2014 10:58:26 +0000 (11:58 +0100)
committerThomas Fillon <thomas@parisson.com>
Fri, 14 Feb 2014 10:58:26 +0000 (11:58 +0100)
tests/sandbox/test_lolevel_streaming_threaded.py
tests/test_transcoding_streaming.py [new file with mode: 0644]

index f9831e35050087bce43773fee131d0be7164bb7f..c3a97eb7e3452556a2a142de755551b823056480 100644 (file)
@@ -48,11 +48,3 @@ dest2_size = os.path.getsize(dest2)
 print "Filesink filesize: %d" % dest1_size
 print "Appsink filesize: %d" % dest2_size
 #assert os.path.getsize(dest1) == os.path.getsize(dest2)
-
-# Sometime randomly freeze
-# Appsink file is always 1 buffer longer than filesink
-# TODO : Try to transcode with a pure gstreamer pipe to see the file length
-# maybe appsink is fine but filesink not ? just to be checked
-
-# INFO : sweep.mp3 transcoded with pure gst from sweep.flac with the same parameters
-# as in the timeside mp3 encoder gives 70535 bytes
diff --git a/tests/test_transcoding_streaming.py b/tests/test_transcoding_streaming.py
new file mode 100644 (file)
index 0000000..457f6eb
--- /dev/null
@@ -0,0 +1,89 @@
+#! /usr/bin/env python
+
+from __future__ import division
+
+from timeside.core import *
+from timeside.decoder import *
+from timeside.analyzer import *
+from timeside.encoder import *
+from timeside.component import *
+
+from unit_timeside import *
+from tools import tmp_file_sink
+import os.path
+
+
+class TestTranscodingStreaming(unittest.TestCase):
+    "Test transcoding and streaming"
+
+    def setUp(self):
+        self.source = os.path.join(os.path.dirname(__file__),
+                                   "samples/sweep.wav")
+        self.test_duration = True
+        self.test_channels = True
+        self.filesize_delta = None
+
+    def testMp3(self):
+        "Test conversion to mp3"
+        self.encoder_function = Mp3Encoder
+        self.filesize_delta = 156
+
+    def testOgg(self):
+        "Test conversion to ogg"
+        self.encoder_function = VorbisEncoder
+
+    def testWebM(self):
+        "Test conversion to webm"
+        self.encoder_function = WebMEncoder
+        self.test_duration = False  # webmmux encoder with streamable=true
+                                    # does not return a valid duration
+
+    def tearDown(self):
+        decoder = FileDecoder(self.source)
+
+        file_extension = '.' + self.encoder_function.file_extension()
+
+        self.target_filesink = tmp_file_sink(prefix=self.__class__.__name__,
+                                             suffix=file_extension)
+
+        self.target_appsink = tmp_file_sink(prefix=self.__class__.__name__,
+                                            suffix=file_extension)
+
+        encoder = self.encoder_function(self.target_filesink, streaming=True)
+        pipe = (decoder | encoder)
+
+        with open(self.target_appsink, 'w') as f:
+            for chunk in pipe.stream():
+                f.write(chunk)
+
+        decoder_encoded = FileDecoder(self.target_filesink)
+
+        pipe2 = ProcessPipe(decoder_encoded)
+        pipe2.run()
+
+        import os
+        filesink_size = os.path.getsize(self.target_filesink)
+        appsink_size = os.path.getsize(self.target_appsink)
+
+        os.unlink(self.target_filesink)
+        os.unlink(self.target_appsink)
+        #print decoder.channels(), decoder.samplerate(), written_frames
+        #print media_channels
+
+        if self.test_channels:
+            self.assertEqual(decoder.channels(), decoder_encoded.channels())
+        else:
+            self.assertEqual(2, decoder_encoded.channels())  # voaacenc bug ?
+
+        self.assertEqual(decoder.samplerate(),
+                         decoder_encoded.samplerate())
+
+        if self.test_duration:
+            self.assertAlmostEqual(decoder.input_duration,
+                                   decoder_encoded.input_duration,
+                                   delta=0.2)
+        self.assertAlmostEqual(filesink_size, appsink_size,
+                               delta=self.filesize_delta)
+
+if __name__ == '__main__':
+    unittest.main(testRunner=TestRunner())