from __future__ import division
-from timeside.core import Processor, implements, interfacedoc
+from timeside.core import Processor, implements, interfacedoc, abstract
from timeside.api import IDecoder
from timeside.tools import *
return wrapper
-class FileDecoder(Processor):
- """ gstreamer-based decoder """
+class Decoder(Processor):
+ """General abstract base class for Decoder
+ """
implements(IDecoder)
+ abstract()
mimetype = ''
- output_blocksize = 8*1024
output_samplerate = None
- output_channels = None
+ output_channels = None
+
+ def __init__(self, start=0, duration=None):
+ super(Decoder, self).__init__()
+
+ self.uri_start = float(start)
+ if duration:
+ self.uri_duration = float(duration)
+ else:
+ self.uri_duration = duration
+
+ if start == 0 and duration is None:
+ self.is_segment = False
+ else:
+ self.is_segment = True
+
+ @interfacedoc
+ def channels(self):
+ return self.output_channels
+
+ @interfacedoc
+ def samplerate(self):
+ return self.output_samplerate
+
+ @interfacedoc
+ def blocksize(self):
+ return self.output_blocksize
- pipeline = None
- mainloopthread = None
+ @interfacedoc
+ def totalframes(self):
+ return self.input_totalframes
+
+ @interfacedoc
+ def release(self):
+ pass
+
+ @interfacedoc
+ def mediainfo(self):
+ return dict(uri=self.uri,
+ duration=self.uri_duration,
+ start=self.uri_start,
+ is_segment=self.is_segment,
+ samplerate=self.input_samplerate)
+
+ def __del__(self):
+ self.release()
+
+ @interfacedoc
+ def encoding(self):
+ return self.format().split('/')[-1]
+
+ @interfacedoc
+ def resolution(self):
+ return self.input_width
+
+
+class FileDecoder(Decoder):
+ """ gstreamer-based decoder """
+ implements(IDecoder)
+
+ output_blocksize = 8*1024
+
+ pipeline = None
+ mainloopthread = None
# IProcessor methods
"""
- super(FileDecoder, self).__init__()
+ super(FileDecoder, self).__init__(start=start, duration=duration)
self.from_stack = False
self.stack = stack
self.uri = get_uri(uri)
- self.uri_start = float(start)
- if duration:
- self.uri_duration = float(duration)
- else:
- self.uri_duration = duration
-
- if start == 0 and duration is None:
- self.is_segment = False
- else:
- self.is_segment = True
-
-
def set_uri_default_duration(self):
# Set the duration from the length of the file
uri_total_duration = get_media_uri_info(self.uri)['duration']
frames, eod = buf
return frames, eod
- @interfacedoc
- def channels(self):
- return self.output_channels
-
- @interfacedoc
- def samplerate(self):
- return self.output_samplerate
-
- @interfacedoc
- def blocksize(self):
- return self.output_blocksize
-
@interfacedoc
def totalframes(self):
if self.input_samplerate == self.output_samplerate:
self.from_stack = True
pass
- @interfacedoc
- def mediainfo(self):
- return dict(uri=self.uri,
- duration=self.uri_duration,
- start=self.uri_start,
- is_segment=self.is_segment,
- samplerate=self.input_samplerate)
-
- def __del__(self):
- self.release()
-
## IDecoder methods
@interfacedoc
return self.tags
-class ArrayDecoder(Processor):
+class ArrayDecoder(Decoder):
""" Decoder taking Numpy array as input"""
implements(IDecoder)
- mimetype = ''
output_blocksize = 8*1024
- output_samplerate = None
- output_channels = None
# IProcessor methods
duration : float
duration of the segment in seconds
'''
- super(ArrayDecoder, self).__init__()
+ super(ArrayDecoder, self).__init__(start=start, duration=duration)
# Check array dimension
if samples.ndim > 2:
'x'.join([str(dim) for dim in samples.shape]),
samples.dtype.type.__name__])
- self.uri_start = float(start)
- if duration:
- self.uri_duration = float(duration)
- else:
- self.uri_duration = duration
-
- if start == 0 and duration is None:
- self.is_segment = False
- else:
- self.is_segment = True
-
self.frames = self.get_frames()
def setup(self, channels=None, samplerate=None, blocksize=None):
def process(self):
return self.frames.next()
- @interfacedoc
- def channels(self):
- return self.output_channels
-
- @interfacedoc
- def samplerate(self):
- return self.output_samplerate
-
- @interfacedoc
- def blocksize(self):
- return self.output_blocksize
-
- @interfacedoc
- def totalframes(self):
- if self.input_samplerate == self.output_samplerate:
- return self.input_totalframes
- else:
- ratio = self.output_samplerate / self.input_samplerate
- return int(self.input_totalframes * ratio)
-
- @interfacedoc
- def release(self):
- pass
-
- @interfacedoc
- def mediainfo(self):
- return dict(uri=self.uri,
- duration=self.uri_duration,
- start=self.uri_start,
- is_segment=self.is_segment,
- samplerate=self.input_samplerate)
-
- def __del__(self):
- self.release()
-
## IDecoder methods
@interfacedoc
def format(self):
base_type = re.search('^[a-z]*', self.samples.dtype.name).group(0)
return 'audio/x-raw-'+base_type
- @interfacedoc
- def encoding(self):
- return self.format().split('/')[-1]
-
- @interfacedoc
- def resolution(self):
- return self.input_width
-
@interfacedoc
def metadata(self):
return None