]> git.parisson.com Git - timeside.git/commitdiff
feat(ProcessPipe): add a graph model to the pipe, prevent doublon of processors in...
authorThomas Fillon <thomas@parisson.com>
Sat, 23 Aug 2014 13:41:37 +0000 (15:41 +0200)
committerThomas Fillon <thomas@parisson.com>
Sat, 23 Aug 2014 13:41:37 +0000 (15:41 +0200)
tests/test_analyzers_default.py [changed mode: 0644->0755]
tests/test_core_processors.py
tests/test_process_pipe.py [new file with mode: 0644]
timeside/core.py

old mode 100644 (file)
new mode 100755 (executable)
index 960709d740b9f1af40a83083d5c87d77cf652986..d621e54b572fb69dbf4470e17313919a68ca6b1b 100644 (file)
@@ -11,6 +11,7 @@ from timeside.component import implements, interfacedoc
 SAMPLE_RATE_FORCED = 16000
 OTHER_SAMPLE_RATE = 32000
 
+
 class TestForceSampleRate(unittest.TestCase):
 
     class Dummy_Processor(Processor):
diff --git a/tests/test_process_pipe.py b/tests/test_process_pipe.py
new file mode 100644 (file)
index 0000000..468e689
--- /dev/null
@@ -0,0 +1,38 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Author : Thomas Fillon <thomas@parisson.com>
+
+from unit_timeside import unittest, TestRunner
+import timeside
+from timeside.decoder.file import FileDecoder
+import os
+
+
+class TestProcessPipe(unittest.TestCase):
+    """Test process pipe"""
+
+    def test_Pipe(self):
+        """Test process pipe (Quick and dirty)"""
+        # TODO clean up and complete
+
+        source = os.path.join(os.path.dirname(__file__),
+                              "samples", "guitar.wav")
+
+        pipe = timeside.core.ProcessPipe()
+        dec = FileDecoder(source)
+        pipe.append_processor(dec)
+        self.assertRaises(TypeError, pipe.append_processor, object())
+        dec2 = FileDecoder(source)
+        self.assertRaises(ValueError, pipe.append_processor, dec2)
+
+        a = timeside.analyzer.odf.OnsetDetectionFunction()
+        abis = timeside.analyzer.odf.OnsetDetectionFunction()
+
+        a2 = timeside.analyzer.spectrogram.Spectrogram()
+        pipe2 = (dec | a | a2 | abis)
+        self.assertEqual(len(pipe2.processors), 4)
+        #pipe2.draw_graph()
+
+if __name__ == '__main__':
+    unittest.main(testRunner=TestRunner())
index 7de441770b63f4bfaaa469dffd5d32b278ea2e24..6b43135e44966e7a49dc1582dcf4bc83d74a0e71 100644 (file)
@@ -27,6 +27,7 @@ from .tools.parameters import HasParam
 import re
 import numpy
 import uuid
+import networkx as nx
 
 import gobject
 gobject.threads_init()
@@ -83,6 +84,8 @@ class Processor(Component, HasParam):
     abstract()
     implements(IProcessor)
 
+    type = ''
+
     def __init__(self):
         super(Processor, self).__init__()
 
@@ -167,6 +170,13 @@ class Processor(Component, HasParam):
     def __or__(self, other):
         return ProcessPipe(self, other)
 
+    def __eq__(self, other):
+        return (self.id() == other.id() and
+                self.get_parameters() == other.get_parameters())
+
+    def __repr__(self):
+        return self.id() + '\n' + self.get_parameters()
+
 
 class FixedSizeInputAdapter(object):
 
@@ -174,9 +184,10 @@ class FixedSizeInputAdapter(object):
     input buffers."""
 
     def __init__(self, buffer_size, channels, pad=False):
-        """Construct a new adapter: buffer_size is the desired buffer size in frames,
-        channels the number of channels, and pad indicates whether the last block should
-        be padded with zeros."""
+        """Construct a new adapter:
+        buffer_size is the desired buffer size in frames,
+        channels the number of channels, and pad indicates whether the last
+        block should be padded with zeros."""
 
         self.buffer = numpy.empty((buffer_size, channels))
         self.buffer_size = buffer_size
@@ -243,7 +254,7 @@ def get_processor(processor_id):
     """Return a processor by its id"""
     if not processor_id in _processors:
         raise PIDError("No processor registered with id: '%s'"
-                    % processor_id)
+                       % processor_id)
 
     return _processors[processor_id]
 
@@ -272,12 +283,80 @@ class ProcessPipe(object):
         self._streamer = None
         self._stream_thread = False
         self._is_running = False
+        self._graph = nx.DiGraph(name='ProcessPipe')
 
         self |= others
 
         from timeside.analyzer.core import AnalyzerResultContainer
         self.results = AnalyzerResultContainer()
 
+    def append_processor(self, proc, source_proc=None):
+        "Append a new processor to the pipe"
+        if source_proc is None and len(self.processors):
+            source_proc = self.processors[0]
+
+        if source_proc and not isinstance(source_proc, Processor):
+            raise TypeError('source_proc must be a Processor or None')
+
+        if not isinstance(proc, Processor):
+            raise TypeError('proc must be a Processor or None')
+
+        if proc.type == 'decoder' and len(self.processors):
+            raise ValueError('Only the first processor in a pipe could be a Decoder')
+
+        # TODO : check if the processor is already in the pipe
+        if source_proc:
+            for child in self._graph.neighbors_iter(source_proc.uuid()):
+                child_proc = self._graph.node[child]['processor']
+                if proc == child_proc:
+                    proc.UUID = child_proc.UUID
+                    break
+        if not self._graph.has_node(proc.uuid()):
+            self.processors.append(proc)  # Add processor to the pipe
+            self._graph.add_node(proc.uuid(), processor=proc, id=proc.id())
+            if source_proc:
+                self._graph.add_edge(self.processors[0].uuid(), proc.uuid(),
+                                     type='audio_source')
+            proc.process_pipe = self
+            # Add an edge between each parent and proc
+            for parent in proc.parents:
+                    self._graph.add_edge(parent.uuid(), proc.uuid(),
+                                         type='data_source')
+
+    def append_pipe(self, proc_pipe):
+        "Append a sub-pipe to the pipe"
+
+        for proc in proc_pipe.processors:
+            self.append_processor(proc)
+
+    def draw_graph(self):
+        import matplotlib.pyplot as plt
+
+        elarge = [(u, v) for (u, v, d) in self._graph.edges(data=True)
+                  if d['type'] == 'audio_source']
+        esmall = [(u, v) for (u, v, d) in self._graph.edges(data=True)
+                  if d['type'] == 'data_source']
+
+        pos = nx.shell_layout(self._graph)  # positions for all nodes
+
+        # nodes
+        nx.draw_networkx_nodes(self._graph, pos, node_size=700)
+
+        # edges
+        nx.draw_networkx_edges(self._graph, pos, edgelist=elarge,
+                               arrows=True)
+        nx.draw_networkx_edges(self._graph, pos, edgelist=esmall,
+                               alpha=0.5, edge_color='b',
+                               style='dashed', arrows=True)
+
+        # labels
+        labels = {node: repr(self._graph.node[node]['processor']) for node in self._graph.nodes()}
+        nx.draw_networkx_labels(self._graph, pos, labels, font_size=20,
+                                font_family='sans-serif')
+
+        plt.axis('off')
+        plt.show()  # display
+
     def __or__(self, other):
         self |= other
         return self
@@ -286,16 +365,16 @@ class ProcessPipe(object):
         if isinstance(other, Processor):
             for parent in other.parents:
                 self |= parent
-            self.processors.append(other)
-            other.process_pipe = self
+            self.append_processor(other)
+
         elif isinstance(other, ProcessPipe):
-            self.processors.extend(other.processors)
+            self.append_pipe(other)
         else:
             try:
                 iter(other)
             except TypeError:
-                raise Error(
-                    "Can not add this type of object to a pipe: %s", str(other))
+                raise Error("Can not add this type of object to a pipe: %s",
+                            str(other))
 
             for item in other:
                 self |= item
@@ -316,7 +395,7 @@ class ProcessPipe(object):
         source = self.processors[0]
         items = self.processors[1:]
 
-        # Check if any processor in items need to force the asmplerate
+        # Check if any processor in items need to force the samplerate
         force_samplerate = set([item.force_samplerate for item in items
                                 if item.force_samplerate])
         if force_samplerate: