--- /dev/null
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Author : Thomas Fillon <thomas@parisson.com>
+
+from unit_timeside import unittest, TestRunner
+import timeside
+from timeside.decoder.file import FileDecoder
+import os
+
+
+class TestProcessPipe(unittest.TestCase):
+ """Test process pipe"""
+
+ def test_Pipe(self):
+ """Test process pipe (Quick and dirty)"""
+ # TODO clean up and complete
+
+ source = os.path.join(os.path.dirname(__file__),
+ "samples", "guitar.wav")
+
+ pipe = timeside.core.ProcessPipe()
+ dec = FileDecoder(source)
+ pipe.append_processor(dec)
+ self.assertRaises(TypeError, pipe.append_processor, object())
+ dec2 = FileDecoder(source)
+ self.assertRaises(ValueError, pipe.append_processor, dec2)
+
+ a = timeside.analyzer.odf.OnsetDetectionFunction()
+ abis = timeside.analyzer.odf.OnsetDetectionFunction()
+
+ a2 = timeside.analyzer.spectrogram.Spectrogram()
+ pipe2 = (dec | a | a2 | abis)
+ self.assertEqual(len(pipe2.processors), 4)
+ #pipe2.draw_graph()
+
+if __name__ == '__main__':
+ unittest.main(testRunner=TestRunner())
import re
import numpy
import uuid
+import networkx as nx
import gobject
gobject.threads_init()
abstract()
implements(IProcessor)
+ type = ''
+
def __init__(self):
super(Processor, self).__init__()
def __or__(self, other):
return ProcessPipe(self, other)
+ def __eq__(self, other):
+ return (self.id() == other.id() and
+ self.get_parameters() == other.get_parameters())
+
+ def __repr__(self):
+ return self.id() + '\n' + self.get_parameters()
+
class FixedSizeInputAdapter(object):
input buffers."""
def __init__(self, buffer_size, channels, pad=False):
- """Construct a new adapter: buffer_size is the desired buffer size in frames,
- channels the number of channels, and pad indicates whether the last block should
- be padded with zeros."""
+ """Construct a new adapter:
+ buffer_size is the desired buffer size in frames,
+ channels the number of channels, and pad indicates whether the last
+ block should be padded with zeros."""
self.buffer = numpy.empty((buffer_size, channels))
self.buffer_size = buffer_size
"""Return a processor by its id"""
if not processor_id in _processors:
raise PIDError("No processor registered with id: '%s'"
- % processor_id)
+ % processor_id)
return _processors[processor_id]
self._streamer = None
self._stream_thread = False
self._is_running = False
+ self._graph = nx.DiGraph(name='ProcessPipe')
self |= others
from timeside.analyzer.core import AnalyzerResultContainer
self.results = AnalyzerResultContainer()
+ def append_processor(self, proc, source_proc=None):
+ "Append a new processor to the pipe"
+ if source_proc is None and len(self.processors):
+ source_proc = self.processors[0]
+
+ if source_proc and not isinstance(source_proc, Processor):
+ raise TypeError('source_proc must be a Processor or None')
+
+ if not isinstance(proc, Processor):
+ raise TypeError('proc must be a Processor or None')
+
+ if proc.type == 'decoder' and len(self.processors):
+ raise ValueError('Only the first processor in a pipe could be a Decoder')
+
+ # TODO : check if the processor is already in the pipe
+ if source_proc:
+ for child in self._graph.neighbors_iter(source_proc.uuid()):
+ child_proc = self._graph.node[child]['processor']
+ if proc == child_proc:
+ proc.UUID = child_proc.UUID
+ break
+ if not self._graph.has_node(proc.uuid()):
+ self.processors.append(proc) # Add processor to the pipe
+ self._graph.add_node(proc.uuid(), processor=proc, id=proc.id())
+ if source_proc:
+ self._graph.add_edge(self.processors[0].uuid(), proc.uuid(),
+ type='audio_source')
+ proc.process_pipe = self
+ # Add an edge between each parent and proc
+ for parent in proc.parents:
+ self._graph.add_edge(parent.uuid(), proc.uuid(),
+ type='data_source')
+
+ def append_pipe(self, proc_pipe):
+ "Append a sub-pipe to the pipe"
+
+ for proc in proc_pipe.processors:
+ self.append_processor(proc)
+
+ def draw_graph(self):
+ import matplotlib.pyplot as plt
+
+ elarge = [(u, v) for (u, v, d) in self._graph.edges(data=True)
+ if d['type'] == 'audio_source']
+ esmall = [(u, v) for (u, v, d) in self._graph.edges(data=True)
+ if d['type'] == 'data_source']
+
+ pos = nx.shell_layout(self._graph) # positions for all nodes
+
+ # nodes
+ nx.draw_networkx_nodes(self._graph, pos, node_size=700)
+
+ # edges
+ nx.draw_networkx_edges(self._graph, pos, edgelist=elarge,
+ arrows=True)
+ nx.draw_networkx_edges(self._graph, pos, edgelist=esmall,
+ alpha=0.5, edge_color='b',
+ style='dashed', arrows=True)
+
+ # labels
+ labels = {node: repr(self._graph.node[node]['processor']) for node in self._graph.nodes()}
+ nx.draw_networkx_labels(self._graph, pos, labels, font_size=20,
+ font_family='sans-serif')
+
+ plt.axis('off')
+ plt.show() # display
+
def __or__(self, other):
self |= other
return self
if isinstance(other, Processor):
for parent in other.parents:
self |= parent
- self.processors.append(other)
- other.process_pipe = self
+ self.append_processor(other)
+
elif isinstance(other, ProcessPipe):
- self.processors.extend(other.processors)
+ self.append_pipe(other)
else:
try:
iter(other)
except TypeError:
- raise Error(
- "Can not add this type of object to a pipe: %s", str(other))
+ raise Error("Can not add this type of object to a pipe: %s",
+ str(other))
for item in other:
self |= item
source = self.processors[0]
items = self.processors[1:]
- # Check if any processor in items need to force the asmplerate
+ # Check if any processor in items need to force the samplerate
force_samplerate = set([item.force_samplerate for item in items
if item.force_samplerate])
if force_samplerate: