From: Thomas Fillon Date: Sat, 23 Aug 2014 13:41:37 +0000 (+0200) Subject: feat(ProcessPipe): add a graph model to the pipe, prevent doublon of processors in... X-Git-Tag: 0.6~4^2~41 X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=39bf9c63b8b4a462bf7c7f3b74e2b0a6774dc067;p=timeside.git feat(ProcessPipe): add a graph model to the pipe, prevent doublon of processors in the pipe --- diff --git a/tests/test_analyzers_default.py b/tests/test_analyzers_default.py old mode 100644 new mode 100755 diff --git a/tests/test_core_processors.py b/tests/test_core_processors.py index 960709d..d621e54 100644 --- a/tests/test_core_processors.py +++ b/tests/test_core_processors.py @@ -11,6 +11,7 @@ from timeside.component import implements, interfacedoc SAMPLE_RATE_FORCED = 16000 OTHER_SAMPLE_RATE = 32000 + class TestForceSampleRate(unittest.TestCase): class Dummy_Processor(Processor): diff --git a/tests/test_process_pipe.py b/tests/test_process_pipe.py new file mode 100644 index 0000000..468e689 --- /dev/null +++ b/tests/test_process_pipe.py @@ -0,0 +1,38 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- + +# Author : Thomas Fillon + +from unit_timeside import unittest, TestRunner +import timeside +from timeside.decoder.file import FileDecoder +import os + + +class TestProcessPipe(unittest.TestCase): + """Test process pipe""" + + def test_Pipe(self): + """Test process pipe (Quick and dirty)""" + # TODO clean up and complete + + source = os.path.join(os.path.dirname(__file__), + "samples", "guitar.wav") + + pipe = timeside.core.ProcessPipe() + dec = FileDecoder(source) + pipe.append_processor(dec) + self.assertRaises(TypeError, pipe.append_processor, object()) + dec2 = FileDecoder(source) + self.assertRaises(ValueError, pipe.append_processor, dec2) + + a = timeside.analyzer.odf.OnsetDetectionFunction() + abis = timeside.analyzer.odf.OnsetDetectionFunction() + + a2 = timeside.analyzer.spectrogram.Spectrogram() + pipe2 = (dec | a | a2 | abis) + self.assertEqual(len(pipe2.processors), 4) + #pipe2.draw_graph() + +if __name__ == '__main__': + unittest.main(testRunner=TestRunner()) diff --git a/timeside/core.py b/timeside/core.py index 7de4417..6b43135 100644 --- a/timeside/core.py +++ b/timeside/core.py @@ -27,6 +27,7 @@ from .tools.parameters import HasParam import re import numpy import uuid +import networkx as nx import gobject gobject.threads_init() @@ -83,6 +84,8 @@ class Processor(Component, HasParam): abstract() implements(IProcessor) + type = '' + def __init__(self): super(Processor, self).__init__() @@ -167,6 +170,13 @@ class Processor(Component, HasParam): def __or__(self, other): return ProcessPipe(self, other) + def __eq__(self, other): + return (self.id() == other.id() and + self.get_parameters() == other.get_parameters()) + + def __repr__(self): + return self.id() + '\n' + self.get_parameters() + class FixedSizeInputAdapter(object): @@ -174,9 +184,10 @@ class FixedSizeInputAdapter(object): input buffers.""" def __init__(self, buffer_size, channels, pad=False): - """Construct a new adapter: buffer_size is the desired buffer size in frames, - channels the number of channels, and pad indicates whether the last block should - be padded with zeros.""" + """Construct a new adapter: + buffer_size is the desired buffer size in frames, + channels the number of channels, and pad indicates whether the last + block should be padded with zeros.""" self.buffer = numpy.empty((buffer_size, channels)) self.buffer_size = buffer_size @@ -243,7 +254,7 @@ def get_processor(processor_id): """Return a processor by its id""" if not processor_id in _processors: raise PIDError("No processor registered with id: '%s'" - % processor_id) + % processor_id) return _processors[processor_id] @@ -272,12 +283,80 @@ class ProcessPipe(object): self._streamer = None self._stream_thread = False self._is_running = False + self._graph = nx.DiGraph(name='ProcessPipe') self |= others from timeside.analyzer.core import AnalyzerResultContainer self.results = AnalyzerResultContainer() + def append_processor(self, proc, source_proc=None): + "Append a new processor to the pipe" + if source_proc is None and len(self.processors): + source_proc = self.processors[0] + + if source_proc and not isinstance(source_proc, Processor): + raise TypeError('source_proc must be a Processor or None') + + if not isinstance(proc, Processor): + raise TypeError('proc must be a Processor or None') + + if proc.type == 'decoder' and len(self.processors): + raise ValueError('Only the first processor in a pipe could be a Decoder') + + # TODO : check if the processor is already in the pipe + if source_proc: + for child in self._graph.neighbors_iter(source_proc.uuid()): + child_proc = self._graph.node[child]['processor'] + if proc == child_proc: + proc.UUID = child_proc.UUID + break + if not self._graph.has_node(proc.uuid()): + self.processors.append(proc) # Add processor to the pipe + self._graph.add_node(proc.uuid(), processor=proc, id=proc.id()) + if source_proc: + self._graph.add_edge(self.processors[0].uuid(), proc.uuid(), + type='audio_source') + proc.process_pipe = self + # Add an edge between each parent and proc + for parent in proc.parents: + self._graph.add_edge(parent.uuid(), proc.uuid(), + type='data_source') + + def append_pipe(self, proc_pipe): + "Append a sub-pipe to the pipe" + + for proc in proc_pipe.processors: + self.append_processor(proc) + + def draw_graph(self): + import matplotlib.pyplot as plt + + elarge = [(u, v) for (u, v, d) in self._graph.edges(data=True) + if d['type'] == 'audio_source'] + esmall = [(u, v) for (u, v, d) in self._graph.edges(data=True) + if d['type'] == 'data_source'] + + pos = nx.shell_layout(self._graph) # positions for all nodes + + # nodes + nx.draw_networkx_nodes(self._graph, pos, node_size=700) + + # edges + nx.draw_networkx_edges(self._graph, pos, edgelist=elarge, + arrows=True) + nx.draw_networkx_edges(self._graph, pos, edgelist=esmall, + alpha=0.5, edge_color='b', + style='dashed', arrows=True) + + # labels + labels = {node: repr(self._graph.node[node]['processor']) for node in self._graph.nodes()} + nx.draw_networkx_labels(self._graph, pos, labels, font_size=20, + font_family='sans-serif') + + plt.axis('off') + plt.show() # display + def __or__(self, other): self |= other return self @@ -286,16 +365,16 @@ class ProcessPipe(object): if isinstance(other, Processor): for parent in other.parents: self |= parent - self.processors.append(other) - other.process_pipe = self + self.append_processor(other) + elif isinstance(other, ProcessPipe): - self.processors.extend(other.processors) + self.append_pipe(other) else: try: iter(other) except TypeError: - raise Error( - "Can not add this type of object to a pipe: %s", str(other)) + raise Error("Can not add this type of object to a pipe: %s", + str(other)) for item in other: self |= item @@ -316,7 +395,7 @@ class ProcessPipe(object): source = self.processors[0] items = self.processors[1:] - # Check if any processor in items need to force the asmplerate + # Check if any processor in items need to force the samplerate force_samplerate = set([item.force_samplerate for item in items if item.force_samplerate]) if force_samplerate: