def process(self, source, options=None):
"""Encode and stream audio data through a generator"""
- proc = SubProcessPipe(self.command) % source
-
- while True:
- __chunk = proc.output.read(self.proc.buffer_size)
- status = proc.poll()
- if status != None and status != 0:
- raise ExportProcessError('Command failure:', command, proc)
- if len(__chunk) == 0:
- break
- yield __chunk
+ command = self.command % source
+ proc = SubProcessPipe(command)
+ return proc.output
+
+ #while True:
+ #__chunk = proc.output.read(self.proc.buffer_size)
+ #status = proc.poll()
+ #if status != None and status != 0:
+ #raise ExportProcessError('Command failure:', command, proc)
+ #if len(__chunk) == 0:
+ #break
+ #yield __chunk
proc = SubProcessPipe(command, stdin)
while True:
- __chunk = proc.output.read(self.proc.buffer_size)
- status = proc.poll()
- if status != None and status != 0:
- raise EncodeProcessError('Command failure:', command, proc)
+ __chunk = proc.output.read(proc.buffer_size)
+ #status = proc.poll()
+ #if status != None and status != 0:
+ #raise EncodeProcessError('Command failure:', command, proc)
if len(__chunk) == 0:
break
yield __chunk
def process(self, source, metadata, options=None):
buffer_size = 0xFFFF
- args = self.get_args(options)
+ self.options = options
+ args = self.get_args()
args = ' '.join(args)
- ext = self.get_file_extension()
+ ext = self.file_extension()
command = 'flac -c %s -' % args
stream = self.core_process(command, source)
- temp_file = NamedTemporaryFile(delete=False)
+ temp_file = NamedTemporaryFile()
for __chunk in stream:
temp_file.write(__chunk)
temp_file.flush()
- self.write_tags(temp_file)
+ #self.write_tags(temp_file)
while True:
__chunk = temp_file.read(buffer_size)
def process(self, source, metadata, options=None):
self.metadata = metadata
self.options = options
- args = get_args(options)
+ args = self.get_args()
args = ' '.join(args)
command = 'lame %s - -' % args
def process(self, source, metadata, options=None):
self.metadata = metadata
self.options = options
- args = self.get_args(options)
+ args = self.get_args()
args = ' '.join(args)
command = 'oggenc %s -' % args
value = analyzer.render(media)
print id + ' = ' + str(value) + ' ' + analyzer.unit()
+
class TestDecoders(Component):
decoders = ExtensionPoint(timeside.decode.IDecoder)
})
print decoders
+ def run(self, media, format):
+ for decoder in self.decoders:
+ if decoder.format() == format:
+ break
+ return decoder.process(media)
+
class TestEncoders(Component):
encoders = ExtensionPoint(timeside.encode.IEncoder)
})
print encoders
+ def run(self, source, metadata):
+ print '\n=== Encoder testing ===\n'
+ for encoder in self.encoders:
+ format = encoder.format()
+ ext = encoder.file_extension()
+ stream = encoder.process(source, metadata)
+ file_path = 'results/sweep' + '.' + ext
+ file = open(file_path, 'w')
+ for chunk in stream:
+ file.write(chunk)
+ print 'Sound exported to :' + file_path
+ file.close()
+
+
class TestGraphers(Component):
graphers = ExtensionPoint(timeside.graph.IGrapher)
print 'Image exported to :' + file_path
file.close()
+
+
if __name__ == '__main__':
sample = 'samples/sweep.wav'
+ metadata = {'creator': 'yomguy', 'date': '2009', 'name': 'test'}
comp_mgr = ComponentManager()
a = TestAnalyzers(comp_mgr)
d = TestDecoders(comp_mgr)
e = TestEncoders(comp_mgr)
g = TestGraphers(comp_mgr)
- a.list()
- d.list()
- e.list()
- g.list()
- a.run(sample)
- #d.run()
- #e.run()
- g.run(sample)
+ #a.list()
+ #d.list()
+ #e.list()
+ #g.list()
+ #a.run(sample)
+ #g.run(sample)
+ audio = d.run(sample, 'WAV')
+ e.run(audio, metadata)