+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from tools import check_samples
-
-check_samples()
from unit_timeside import unittest, TestRunner
from timeside.decoder.file import FileDecoder
from timeside.analyzer.dc import MeanDCShift
-import os
+from timeside.tools.data_samples import samples as ts_samples
class TestAnalyzerDC(unittest.TestCase):
def testOnSweep(self):
"runs on sweep"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "sweep.wav")
+ self.source = ts_samples["sweep.wav"]
- self.expected = {'mean_dc_shift': -0.000}
+ self.expected = {'mean_dc_shift': 0.004}
- def testOnGuitar(self):
- "runs on guitar"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "guitar.wav")
- self.expected = {'mean_dc_shift': 0.054}
+ def testOnScale(self):
+ "runs on C4 Scale"
+ self.source = ts_samples["C4_scale.wav"]
+ self.expected = {'mean_dc_shift': 0.034}
def tearDown(self):
decoder = FileDecoder(self.source)
from unit_timeside import unittest, TestRunner
from timeside.decoder.file import FileDecoder
from timeside.analyzer.level import Level
-import os
+from timeside.tools.data_samples import samples as ts_samples
class TestAnalyzerLevel(unittest.TestCase):
def testOnSweep(self):
"runs on sweep"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "sweep.wav")
+ self.source = ts_samples["sweep.wav"]
- max_level_value = -6.021
- rms_level_value = -9.856
+ max_level_value = 0
+ rms_level_value = -2.995
self.expected = {'level.max': max_level_value,
'level.rms': rms_level_value}
- def testOnGuitar(self):
- "runs on guitar"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "guitar.wav")
+ def testOnC4_Scale(self):
+ "runs on C4 scale"
+ self.source = ts_samples["C4_scale.wav"]
- max_level_value = -4.054
- rms_level_value = -21.945
+ max_level_value = 0
+ rms_level_value = -3.705
self.expected = {'level.max': max_level_value,
'level.rms': rms_level_value}
import timeside
from timeside.decoder.file import FileDecoder
import numpy as np
-import os
+from timeside.tools.data_samples import samples as ts_samples
class TestAnalyzers_with_default(unittest.TestCase):
"""Test analyzer with default parameters"""
def setUp(self):
- source = os.path.join(os.path.dirname(__file__),
- "samples", "guitar.wav")
+ source = ts_samples["C4_scale.wav"]
self.decoder = FileDecoder(source)
#! /usr/bin/env python
from unit_timeside import unittest, TestRunner
-import os
from timeside.decoder.file import FileDecoder
from timeside.core import get_processor
from timeside import _WITH_AUBIO
+from timeside.tools.data_samples import samples as ts_samples
@unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
def testOnSweep(self):
"runs on sweep"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "sweep.wav")
+ self.source = ts_samples["sweep.wav"]
def testOnGuitar(self):
"runs on guitar"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "guitar.wav")
+ self.source = ts_samples["C4_scale.wav"]
def tearDown(self):
decoder = FileDecoder(self.source)
#! /usr/bin/env python
from unit_timeside import unittest, TestRunner
-import os
from timeside.decoder.file import FileDecoder
from timeside.core import get_processor
from timeside import _WITH_AUBIO
+from timeside.tools.data_samples import samples as ts_samples
@unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
def testOnSweep(self):
"runs on sweep"
- self.source = os.path.join (os.path.dirname(__file__), "samples", "sweep.wav")
+ self.source = ts_samples["sweep.wav"]
- def testOnGuitar(self):
- "runs on guitar"
- self.source = os.path.join (os.path.dirname(__file__), "samples", "guitar.wav")
+ def testOnScale(self):
+ "runs on C4 scale"
+ self.source = ts_samples["C4_scale.wav"]
def tearDown(self):
decoder = FileDecoder(self.source)
#! /usr/bin/env python
from unit_timeside import unittest, TestRunner
-import os
from timeside.decoder.file import FileDecoder
from timeside.core import get_processor
from timeside import _WITH_AUBIO
+from timeside.tools.data_samples import samples as ts_samples
@unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
def testOnSweep(self):
"runs on sweep"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "sweep.wav")
+ self.source = ts_samples["sweep.wav"]
- def testOnGuitar(self):
- "runs on guitar"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "guitar.wav")
+ def testOnC4Scale(self):
+ "runs on C4 scale"
+ self.source = ts_samples["C4_scale.wav"]
def tearDown(self):
decoder = FileDecoder(self.source)
#! /usr/bin/env python
from unit_timeside import unittest, TestRunner
-import os
from timeside.decoder.file import FileDecoder
from timeside.core import get_processor
from timeside import _WITH_AUBIO
+from timeside.tools.data_samples import samples as ts_samples
@unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
def testOnSweep(self):
"runs on sweep"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "sweep.wav")
+ self.source = ts_samples["sweep.wav"]
- def testOnGuitar(self):
- "runs on guitar"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "guitar.wav")
+ def testOnC4Scale(self):
+ "runs on C4 scale"
+ self.source = ts_samples["C4_scale.wav"]
def tearDown(self):
decoder = FileDecoder(self.source)
#! /usr/bin/env python
from unit_timeside import unittest, TestRunner
-import os
from timeside.decoder.file import FileDecoder
from timeside.core import get_processor
from timeside import _WITH_AUBIO
+from timeside.tools.data_samples import samples as ts_samples
@unittest.skipIf(not _WITH_AUBIO, 'Aubio library is not available')
def testOnSweep(self):
"runs on sweep"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "sweep.wav")
+ self.source = ts_samples["sweep.wav"]
- def testOnGuitar(self):
- "runs on guitar"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples", "guitar.wav")
+ def testOnC4Scale(self):
+ "runs on C4 scale"
+ self.source = ts_samples["C4_scale.wav"]
def tearDown(self):
decoder = FileDecoder(self.source)
from __future__ import division
-from numpy import arange, sin
-from unit_timeside import *
+from unit_timeside import unittest, TestRunner
from timeside.decoder.utils import get_uri, get_media_uri_info, path2uri
+from timeside.tools.data_samples import samples as ts_samples
import os.path
"Missing file raise IOerror"
self.source = os.path.join(os.path.dirname(__file__),
"a_missing_file_blahblah.wav")
+
def testNotValidUri(self):
"Not valid uri raise IOerror"
self.source = os.path.join("://not/a/valid/uri/parisson.com")
self.test_exact_duration = True
self.expected_channels = 2
self.expected_samplerate = 44100
- self.expected_depth = 16
+ self.expected_depth = 0 # ?
def testWav(self):
"Test wav decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.wav")
-
+ self.source = ts_samples["sweep.wav"]
def testWavMono(self):
"Test mono wav decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep_mono.wav")
+ self.source = ts_samples["sweep_mono.wav"]
self.expected_channels = 1
def testWav32k(self):
"Test 32kHz wav decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep_32000.wav")
+ self.source = ts_samples["sweep_32000.wav"]
self.expected_samplerate = 32000
def testFlac(self):
"Test flac decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.flac")
+ self.source = ts_samples["sweep.flac"]
+ self.expected_depth = 24
def testOgg(self):
"Test ogg decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.ogg")
+ self.source = ts_samples["sweep.ogg"]
+ self.test_exact_duration = False
self.expected_depth = 0 # ?
def testMp3(self):
"Test mp3 decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.mp3")
+ self.source = ts_samples["sweep.mp3"]
self.expected_depth = 32
self.test_exact_duration = False
-
def tearDown(self):
uri = get_uri(self.source)
uri_info = get_media_uri_info(uri)
self.assertAlmostEqual(self.source_duration,
uri_info['duration'],
places=1)
- self.assertEqual(self.expected_channels, uri_info['streams'][0]['channels'])
- self.assertEqual(self.expected_samplerate, uri_info['streams'][0]['samplerate'])
+ self.assertEqual(self.expected_channels,
+ uri_info['streams'][0]['channels'])
+ self.assertEqual(self.expected_samplerate,
+ uri_info['streams'][0]['samplerate'])
self.assertEqual(self.expected_depth, uri_info['streams'][0]['depth'])
+
if __name__ == '__main__':
unittest.main(testRunner=TestRunner())
from timeside.core import ProcessPipe
from unit_timeside import *
+from timeside.tools.data_samples import samples as ts_samples
import os.path
def testWav(self):
"Test wav decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.wav")
+ self.source = ts_samples["sweep.wav"]
def testWavMono(self):
"Test mono wav decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep_mono.wav")
+ self.source = ts_samples["sweep_mono.wav"]
self.expected_channels = 1
def testWav32k(self):
"Test 32kHz wav decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep_32000.wav")
+ self.source = ts_samples["sweep_32000.wav"]
expected_samplerate = 32000
ratio = expected_samplerate / self.expected_samplerate
def testFlac(self):
"Test flac decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.flac")
+ self.source = ts_samples["sweep.flac"]
self.expected_mime_type = 'audio/x-flac'
def testOgg(self):
"Test ogg decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.ogg")
+ self.source = ts_samples["sweep.ogg"]
- self.expected_totalframes = 352832
+ self.expected_totalframes = 352960
self.expected_mime_type = 'application/ogg'
self.test_exact_duration = False
def testMp3(self):
"Test mp3 decoding"
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.mp3")
+ self.source = ts_samples["sweep.mp3"]
self.expected_totalframes = 353664
self.expected_mime_type = 'audio/mpeg'
self.assertEqual(decoder.mime_type(), self.expected_mime_type)
- expected_totalframes = [self.expected_totalframes, self.expected_totalframes +32] # +32 to handle some issue with ogg
- self.assertIn(totalframes, expected_totalframes)
+ #expected_totalframes = [self.expected_totalframes, self.expected_totalframes +32] # +32 to handle some issue with ogg
+
+ self.assertEqual(totalframes, self.expected_totalframes)
input_duration = decoder.input_totalframes / decoder.input_samplerate
output_duration = decoder.totalframes() / decoder.output_samplerate
def testMp3(self):
"Test mp3 decoding"
super(TestDecodingSegment, self).testMp3()
- self.expected_totalframes = 310715 # was 308701 ?
+ self.expected_totalframes = 309565 # was 308701 ?
class TestDecodingSegmentBadParameters(unittest.TestCase):
def testOgg(self):
"Test ogg decoding"
super(TestDecodingMonoUpsampling, self).testOgg()
- self.expected_totalframes = 384000
+ self.expected_totalframes = 384140
class TestDecodingMonoDownsampling(TestDecoding):
def testOgg(self):
"Test ogg decoding"
super(TestDecodingMonoDownsampling, self).testOgg()
- self.expected_totalframes = 127980
+ self.expected_totalframes = 128027
def testMp3(self):
"Test mp3 decoding"
def testOgg(self):
"Test ogg decoding"
super(TestDecodingStereoDownsampling, self).testOgg()
- self.expected_totalframes = 255992
+ self.expected_totalframes = 256085
def testMp3(self):
"Test mp3 decoding"
def testOgg(self):
"Test ogg decoding"
super(TestDecodingStereoUpsampling, self).testOgg()
- self.expected_totalframes = 768000
+ self.expected_totalframes = 768279
def testMp3(self):
"Test mp3 decoding"
from unit_timeside import unittest, TestRunner
import timeside
from timeside.decoder.file import FileDecoder
-import os
+from timeside.tools.data_samples import samples as ts_samples
class TestProcessPipe(unittest.TestCase):
"""Test process pipe (Quick and dirty)"""
# TODO clean up and complete
- source = os.path.join(os.path.dirname(__file__),
- "samples", "guitar.wav")
+ source = ts_samples["sweep.wav"]
+
pipe = timeside.core.ProcessPipe()
dec = FileDecoder(source)
from timeside.core import get_processor, ProcessPipe
from timeside.decoder.file import FileDecoder
-from timeside.component import *
-
-from unit_timeside import *
+#from timeside.component import *
+import os
+from unit_timeside import unittest, TestRunner
from tools import tmp_file_sink
-import os.path
+from timeside.tools.data_samples import samples as ts_samples
class TestTranscodingFromWav(unittest.TestCase):
"Test transcoding from wav"
def setUp(self):
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.wav")
+ self.source = ts_samples["sweep.wav"]
self.test_duration = True
self.test_channels = True
file_extension = '.' + encoder_cls.file_extension()
self.target = tmp_file_sink(prefix=self.__class__.__name__,
- suffix=file_extension)
+ suffix=file_extension)
encoder = encoder_cls(self.target)
(decoder | encoder).run()
pipe = ProcessPipe(decoder_encoded)
pipe.run()
- import os
os.unlink(self.target)
#print decoder.channels(), decoder.samplerate(), written_frames
decoder_encoded.input_duration,
delta=0.2)
+
class TestTranscodingFromMonoWav(TestTranscodingFromWav):
"Test transcoding from a mono wav"
def setUp(self):
super(TestTranscodingFromMonoWav, self).setUp()
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep_mono.wav")
+ self.source = ts_samples["sweep_mono.wav"]
def testM4a(self):
"Test conversion to m4a"
def setUp(self):
super(TestTranscodingFromAnotherWav, self).setUp()
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/guitar.wav") # Mono
+ self.source = ts_samples["C4_scale.wav"] # Mono
class TestTranscodingFromMp3(TestTranscodingFromWav):
def setUp(self):
super(TestTranscodingFromMp3, self).setUp()
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.mp3")
+ self.source = ts_samples["sweep.mp3"]
class TestTranscodingFromFlac(TestTranscodingFromWav):
def setUp(self):
super(TestTranscodingFromFlac, self).setUp()
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.flac")
+ self.source = ts_samples["sweep.flac"]
class TestTranscodingFromOgg(TestTranscodingFromWav):
def setUp(self):
super(TestTranscodingFromOgg, self).setUp()
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.ogg")
-
-
-
+ self.source = ts_samples["sweep.ogg"]
class TestTranscodingFrom32kHzWav(TestTranscodingFromWav):
def setUp(self):
super(TestTranscodingFrom32kHzWav, self).setUp()
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep_32000.wav")
+ self.source = ts_samples["sweep_32000.wav"]
class TestTranscodingFromMissingFile(TestTranscodingFromWav):
#from timeside.encoder import *
#from timeside.component import *
-from unit_timeside import *
+from unit_timeside import unittest, TestRunner
from tools import tmp_file_sink
-import os.path
+from timeside.tools.data_samples import samples as ts_samples
class TestTranscodingStreaming(unittest.TestCase):
"Test transcoding and streaming"
def setUp(self):
- self.source = os.path.join(os.path.dirname(__file__),
- "samples/sweep.wav")
+ self.source = ts_samples["sweep.wav"]
self.test_duration = True
self.test_channels = True
self.filesize_delta = None
-import os
-import urllib
+import tempfile
-def check_samples():
- url = 'http://github.com/yomguy/timeside-samples/raw/master/samples/'
- samples = ['guitar.wav', 'sweep.wav', 'sweep_mono.wav', 'sweep_32000.wav', 'sweep.flac', 'sweep.ogg', 'sweep.mp3', 'sweep_source.wav']
- path = os.path.normpath(os.path.dirname(__file__))
- dir = path + os.sep + 'samples'
-
- if not os.path.exists(dir):
- os.makedirs(dir)
-
- for sample in samples:
- path = dir + os.sep + sample
- if not os.path.exists(path):
- print 'downloading: ' + sample
- f = open(path, 'w')
- u = urllib.urlopen(url+sample)
- f.write(u.read())
- f.close()
-
-
-def tmp_file_sink(prefix=None, suffix = None):
- import tempfile
+def tmp_file_sink(prefix=None, suffix=None):
tmpfile = tempfile.NamedTemporaryFile(delete=True,
prefix=prefix,
suffix=suffix)
tmpfile.close()
- return tmpfile.name
\ No newline at end of file
+ return tmpfile.name
import doctest
import sys
import time
-from tools import check_samples
-
-check_samples()
class _TextTestResult(unittest.TestResult):
self.stream = _WritelnDecorator(stream)
self.descriptions = descriptions
self.verbosity = verbosity
- check_samples()
def _makeResult(self):
return _TextTestResult(self.stream, self.descriptions, self.verbosity)
'''
Return the secure hash digest with sha1 algorithm for a given file
- >>> wav_file = 'tests/samples/guitar.wav' # doctest: +SKIP
- >>> print sha1sum_file(wav_file) # doctest: +SKIP
- #08301c3f9a8d60926f31e253825cc74263e52ad1
+ >>> from timeside.tools.data_samples import samples as ts_samples
+ >>> wav_file = ts_samples["C4_scale.wav"]
+ >>> print sha1sum_file(wav_file)
+ a598e78d0b5c90da54a77e34c083abdcd38d42ba
'''
import hashlib
import io
def sha1sum_url(url):
'''Return the secure hash digest with sha1 algorithm for a given url
- >>> url = 'https://github.com/yomguy/timeside-samples/raw/master/samples/guitar.wav'
+ >>> url = "https://github.com/yomguy/timeside-samples/raw/master/samples/guitar.wav"
>>> print sha1sum_url(url)
08301c3f9a8d60926f31e253825cc74263e52ad1
- >>> wav_file = 'tests/samples/guitar.wav' # doctest: +SKIP
- >>> uri = get_uri(wav_file)
- >>> print sha1sum_url(uri)
- 08301c3f9a8d60926f31e253825cc74263e52ad1
'''
import hashlib
>>> import timeside
- >>> wav_file = 'tests/samples/guitar.wav' # doctest: +SKIP
+ >>> from timeside.tools.data_samples import samples as ts_samples
+ >>> wav_file = ts_samples['C4_scale.wav']
>>> d = timeside.decoder.file.FileDecoder(wav_file)
>>> e = timeside.encoder.audiosink.AudioSink()
>>> (d|e).run() # doctest: +SKIP
self.metadata = metadata
-# Define global variables for use with doctest
-DOCTEST_ALIAS = {'wav_file':
- 'https://github.com/yomguy/timeside-samples/raw/master/samples/guitar.wav'}
-
if __name__ == "__main__":
import doctest
- doctest.testmod(extraglobs=DOCTEST_ALIAS)
+ doctest.testmod()
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Created on Tue Oct 7 09:19:37 2014
+
+@author: thomas
+"""
+
+from __future__ import division
+
+import pygst
+pygst.require("0.10")
+import gobject
+import gst
+import numpy
+import scipy.signal.waveforms
+import os.path
+
+#import timeside
+
+
+class NumpySrc:
+ def __init__(self, array, samplerate):
+ self.appsrc = gst.element_factory_make("appsrc")
+ self.pos = 0
+ self.samplerate = samplerate
+ if array.ndim == 1:
+ array.resize((array.shape[0], 1))
+ self.length, self.channels = array.shape
+ self.array = array.astype("float32")
+ self.per_sample = gst.SECOND // samplerate
+ self.fac = self.channels * array.dtype.itemsize
+ #self.appsrc.set_property("size", (self.length * self.channels *
+ # array.dtype.itemsize))
+ self.appsrc.set_property("format", gst.FORMAT_TIME)
+ capstr = """audio/x-raw-float,
+ width=%d,
+ depth=%d,
+ rate=%d,
+ channels=%d,
+ endianness=(int)1234,
+ signed=true""" % (32,#self.array.dtype.itemsize*8,
+ 32,#self.array.dtype.itemsize*8,
+ self.samplerate,
+ self.channels)
+ print array.dtype.itemsize*8
+ self.appsrc.set_property("caps", gst.caps_from_string(capstr))
+ self.appsrc.set_property("stream-type", 0) # Seekable
+ self.appsrc.set_property('block', True)
+
+ self.appsrc.connect("need-data", self.need_data)
+ self.appsrc.connect("seek-data", self.seek_data)
+ self.appsrc.connect("enough-data", self.enough_data)
+
+ def need_data(self, element, length):
+ #length = length // 64
+ if self.pos >= self.array.shape[0]:
+ element.emit("end-of-stream")
+ else:
+ avalaible_sample = self.length - self.pos
+ if avalaible_sample < length:
+ length = avalaible_sample
+ buf = gst.Buffer(self.array[self.pos:self.pos+length, :].data)
+
+ buf.timestamp = self.pos * self.per_sample
+ buf.duration = int(length*self.per_sample)
+ element.emit("push-buffer", buf)
+ self.pos += length
+
+ def seek_data(self, element, npos):
+ print 'seek %d' % npos
+ self.pos = npos // self.per_sample
+ return True
+
+ def enough_data(self, element):
+ print "----------- enough data ---------------"
+
+
+class SampleArray(object):
+ """Base Class for generating a data sample array"""
+
+ def __init__(self, duration=10, samplerate=44100):
+ self.samplerate = int(samplerate)
+ self.num_samples = numpy.ceil(duration * self.samplerate)
+ self.array = NotImplemented
+
+ @property
+ def time_samples(self):
+ return numpy.arange(0, self.num_samples)
+
+ @property
+ def duration(self):
+ return self.num_samples / self.samplerate
+
+ def __add__(self, other):
+ if not self.samplerate == other.samplerate:
+ raise ValueError("Samplerates mismatch")
+
+ sum_ = SampleArray(samplerate=self.samplerate)
+ sum_.num_samples = self.num_samples + other.num_samples
+ sum_.array = numpy.vstack([self.array, other.array])
+ return sum_
+
+ def __and__(self, other):
+ if not self.samplerate == other.samplerate:
+ raise ValueError("Samplerates mismatch")
+ if not self.num_samples == other.num_samples:
+ raise ValueError("Number of samples mismatch")
+
+ and_ = SampleArray(samplerate=self.samplerate)
+ and_.num_samples = self.num_samples
+ and_.array = numpy.hstack([self.array, other.array])
+ return and_
+
+
+class SineArray(SampleArray):
+ """Class for generating a Sine array"""
+ def __init__(self, frequency=440, duration=10, samplerate=44100,
+ channels=1):
+ super(SineArray, self).__init__(duration=duration,
+ samplerate=samplerate)
+ self.frequency = frequency
+ self.array = numpy.sin((2 * numpy.pi * self.frequency *
+ self.time_samples / self.samplerate))
+ self.array.resize(self.num_samples, 1)
+
+
+class SweepArray(SampleArray):
+ """Class for generating a Sweep array"""
+ def __init__(self, f0=20, f1=None, duration=10, samplerate=44100,
+ method='logarithmic'):
+ super(SweepArray, self).__init__(duration=duration,
+ samplerate=samplerate)
+
+ self.f0 = f0 / samplerate
+ if f1 is None:
+ self.f1 = 0.5
+ else:
+ self.f1 = f1 / samplerate
+ self.method = method
+ self.array = scipy.signal.waveforms.chirp(t=self.time_samples,
+ f0=self.f0,
+ t1=self.time_samples[-1],
+ f1=self.f1,
+ method=self.method)
+ self.array.resize(self.num_samples, 1)
+
+
+class WhiteNoiseArray(SampleArray):
+ """Class for generating a white noise array"""
+ def __init__(self, duration=10, samplerate=44100):
+ super(WhiteNoiseArray, self).__init__(duration=duration,
+ samplerate=samplerate)
+ array = numpy.random.randn(self.num_samples, 1)
+ self.array = array / abs(array).max()
+
+
+class SilenceArray(SampleArray):
+ """Class for generating a silence"""
+ def __init__(self, duration=10, samplerate=44100):
+ super(SilenceArray, self).__init__(duration=duration,
+ samplerate=samplerate)
+
+ self.array = numpy.zeros((self.num_samples, 1))
+
+
+class gst_BuildSample(object):
+ def __init__(self, sample_array, output_file, gst_audio_encoder):
+ if not isinstance(sample_array, SampleArray):
+ raise ValueError("array must be a SampleArray subclass")
+ self.sample_array = sample_array
+ self.samplerate = self.sample_array.samplerate
+ self.output_file = output_file
+ if not isinstance(gst_audio_encoder, list):
+ gst_audio_encoder = [gst_audio_encoder]
+ self.gst_audio_encoder = gst_audio_encoder
+
+ def run(self):
+ pipeline = gst.Pipeline("pipeline")
+ gobject.threads_init()
+ mainloop = gobject.MainLoop()
+
+ numpy_src = NumpySrc(array=self.sample_array.array,
+ samplerate=self.samplerate)
+
+ converter = gst.element_factory_make('audioconvert', 'converter')
+
+ encoder_muxer = []
+ for enc in self.gst_audio_encoder:
+ encoder_muxer.append(gst.element_factory_make(enc))
+
+ filesink = gst.element_factory_make('filesink', 'sink')
+ filesink.set_property('location', self.output_file)
+
+ pipe_elements = [numpy_src.appsrc, converter]
+ pipe_elements.extend(encoder_muxer)
+ pipe_elements.append(filesink)
+
+ pipeline.add(*pipe_elements)
+ gst.element_link_many(*pipe_elements)
+
+ def _on_new_pad(self, source, pad, target_pad):
+ print 'on_new_pad'
+ if not pad.is_linked():
+ if target_pad.is_linked():
+ target_pad.get_peer().unlink(target_pad)
+ pad.link(target_pad)
+
+ def on_eos(bus, msg):
+ pipeline.set_state(gst.STATE_NULL)
+ mainloop.quit()
+
+ def on_error(bus, msg):
+ err, debug_info = msg.parse_error()
+ print ("Error received from element %s: %s" % (msg.src.get_name(),
+ err))
+ print ("Debugging information: %s" % debug_info)
+ mainloop.quit()
+
+ pipeline.set_state(gst.STATE_PLAYING)
+ bus = pipeline.get_bus()
+ bus.add_signal_watch()
+ bus.connect('message::eos', on_eos)
+ bus.connect("message::error", on_error)
+
+ mainloop.run()
+
+
+def generate_sample_file(filename, samples_dir, gst_audio_encoder,
+ sample_array, overwrite=False):
+ sample_file = os.path.join(samples_dir, filename)
+
+ if overwrite or not os.path.exists(sample_file):
+ gst_builder = gst_BuildSample(sample_array=sample_array,
+ output_file=sample_file,
+ gst_audio_encoder=gst_audio_encoder)
+ gst_builder.run()
+ return sample_file
+
+
+def generateSamples(overwrite=False):
+ from timeside import __file__ as ts_file
+ ts_path = os.path.split(os.path.abspath(ts_file))[0]
+ tests_dir = os.path.abspath(os.path.join(ts_path, '../tests'))
+ if os.path.isdir(tests_dir):
+ samples_dir = os.path.abspath(os.path.join(tests_dir, 'samples'))
+ if not os.path.isdir(samples_dir):
+ os.makedirs(samples_dir)
+ else:
+ import tempfile
+ samples_dir = tempfile.mkdtemp(suffix="ts_samples")
+
+ samples = dict()
+
+ # --------- Sweeps ---------
+ # sweep 44100 mono wav
+ filename = 'sweep_mono.wav'
+ samplerate = 44100
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_mono,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sweep 44100 stereo wav
+ sweep_stereo = sweep_mono & sweep_mono
+ filename = 'sweep.wav'
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sweep 44100 stereo mp3
+ filename = 'sweep.mp3'
+ gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux']
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sweep 44100 stereo flac
+ filename = 'sweep.flac'
+ gst_audio_encoder = 'flacenc'
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sweep 44100 stereo ogg
+ filename = 'sweep.ogg'
+ gst_audio_encoder = ['vorbisenc', 'oggmux']
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sweep 32000 stereo wav
+ samplerate = 32000
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sweep_stereo = sweep_mono & sweep_mono
+
+ filename = 'sweep_32000.wav'
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SweepArray(duration=8, samplerate=samplerate)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # --------- Sines ---------
+ # sine at 440Hz, 44100 mono wav
+ filename = 'sine440Hz_mono.wav'
+ samplerate = 44100
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_mono,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sine at 440Hz, 44100 stereo wav
+ filename = 'sine440Hz.wav'
+ sweep_stereo = sweep_mono & sweep_mono
+ gst_audio_encoder = 'wavenc'
+ sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sine at 440Hz, 44100 stereo mp3
+ filename = 'sine440Hz.mp3'
+ gst_audio_encoder = ['lamemp3enc', 'xingmux', 'id3v2mux']
+ sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # sine at 440Hz, 44100 stereo ogg
+ filename = 'sine440Hz.ogg'
+ gst_audio_encoder = ['vorbisenc', 'oggmux']
+ sweep_mono = SineArray(duration=8, samplerate=samplerate, frequency=440)
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=sweep_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # --------- Equal tempered scale ---------
+ filename = 'C4_scale.wav'
+ samplerate = 44100
+ f_C4 = 261.63
+ f_D4 = 293.66
+ f_E4 = 329.63
+ f_F4 = 349.23
+ f_G4 = 392.00
+ f_A4 = 440.00
+ f_B4 = 493.88
+ f_C5 = 523.25
+ sineC4 = SineArray(duration=1, samplerate=samplerate, frequency=f_C4)
+ sineD4 = SineArray(duration=1, samplerate=samplerate, frequency=f_D4)
+ sineE4 = SineArray(duration=1, samplerate=samplerate, frequency=f_E4)
+ sineF4 = SineArray(duration=1, samplerate=samplerate, frequency=f_F4)
+ sineG4 = SineArray(duration=1, samplerate=samplerate, frequency=f_G4)
+ sineA4 = SineArray(duration=1, samplerate=samplerate, frequency=f_A4)
+ sineB4 = SineArray(duration=1, samplerate=samplerate, frequency=f_B4)
+ sineC5 = SineArray(duration=1, samplerate=samplerate, frequency=f_C5)
+
+ silence = SilenceArray(duration=0.2, samplerate=samplerate)
+
+ scale = (sineC4 + silence + sineD4 + silence + sineE4 + silence +
+ sineF4 + silence + sineG4 + silence + sineA4 + silence +
+ sineB4 + silence + sineC5)
+
+ gst_audio_encoder = 'wavenc'
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=scale,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # --------- White noise ---------
+ # white noise - 44100Hz mono
+ filename = 'white_noise_mono.wav'
+ samplerate = 44100
+ noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
+ gst_audio_encoder = 'wavenc'
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=noise,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # white noise - 44100Hz stereo
+ filename = 'white_noise.wav'
+ samplerate = 44100
+ noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
+ noise_stereo = noise & noise
+ gst_audio_encoder = 'wavenc'
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=noise_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ # white noise - 32000Hz stereo
+ filename = 'white_noise_32000.wav'
+ samplerate = 32000
+ noise = WhiteNoiseArray(duration=8, samplerate=samplerate)
+ noise_stereo = noise & noise
+ gst_audio_encoder = 'wavenc'
+ sample_file = generate_sample_file(filename, samples_dir,
+ gst_audio_encoder,
+ sample_array=noise_stereo,
+ overwrite=overwrite)
+ samples.update({filename: sample_file})
+
+ return samples
+
+
+samples = generateSamples()
+
+
+if __name__ == '__main__':
+ generateSamples()