]> git.parisson.com Git - deefuzzer.git/commitdiff
fix infinite queue and then reapply mt behavior (32 stations max)
authorGuillaume Pellerin <yomguy@parisson.com>
Sat, 11 Apr 2009 13:48:34 +0000 (13:48 +0000)
committerGuillaume Pellerin <yomguy@parisson.com>
Sat, 11 Apr 2009 13:48:34 +0000 (13:48 +0000)
deefuzz-mt.py
deefuzz.py

index c197fc01a5ca820e969d8f659f0dfdbe3e66aaba..f77b4ef52c43f943ca072187892df77bd5c3c9a8 100755 (executable)
@@ -99,7 +99,6 @@ class DeeFuzz:
     def __init__(self, conf_file):
         self.conf_file = conf_file
         self.conf = self.get_conf_dict()
-        self.buffer_size = 1024
 
     def get_conf_dict(self):
         confile = open(self.conf_file,'r')
@@ -120,12 +119,13 @@ class DeeFuzz:
         print 'Number of stations : ' + str(nb_stations)
 
         # Create a Queue
-        q = Queue.Queue(0)
+        q = Queue.Queue(1)
 
         # Create a Producer 
         p = Producer(q)
         p.start()
-        
+
+        self.buffer_size = 65536/nb_stations
         print 'Buffer size per station = ' + str(self.buffer_size)
         
         s = []
@@ -338,22 +338,20 @@ class Station(Thread):
         """Read media and stream data through a generator.
         Taken from Telemeta (see http://telemeta.org)"""
 
-        __chunk = 0
         m = open(media, 'r')
         # Core processing
         while True:
-            __chunk = m.read(self.buffer_size)
-            if len(__chunk) == 0:
+            _chunk = m.read(self.buffer_size)
+            if len(_chunk) == 0:
                 break
-            yield __chunk
+            yield _chunk
         m.close()
 
 
     def run(self):
-        __chunk = 0
-
+        q = self.q
         while True:
-            it = self.q.get(1)
+            it = q.get(1)
             if self.lp == 0:
                 break
             if self.mode_shuffle == 1:
@@ -368,24 +366,24 @@ class Station(Thread):
             elif file_ext.lower() == 'ogg':
                 media_obj = Ogg(media)
 
-            self.q.task_done()
+            q.task_done()
             #self.log_queue(it)
             
             if os.path.exists(media) and not os.sep+'.' in media:
-                it = self.q.get(1)
+                it = q.get(1)
                 title = media_obj.metadata['title']
                 self.channel.set_metadata({'song': str(title)})
                 self.update_rss([media_obj], self.rss_current_file)
                 print 'DeeFuzzing this file on %s :  id = %s, name = %s' % (self.short_name, self.id, file_name)
                 stream = self.core_process_read(media)
-                self.q.task_done()
+                q.task_done()
                 #self.log_queue(it)
                 
-                for __chunk in stream:
-                    it = self.q.get(1)
-                    self.channel.send(__chunk)
+                for _chunk in stream:
+                    it = q.get(1)
+                    self.channel.send(_chunk)
                     self.channel.sync()
-                    self.q.task_done()
+                    q.task_done()
                     #self.log_queue(it)
 
         self.channel.close()
index 002820fd70c38c0b8cef892232a5e768f50725c9..6c82a91a6523e678ba4a6b62e3ab1d0311bd37e0 100755 (executable)
@@ -45,6 +45,7 @@ import random
 import Queue
 import shout
 import subprocess
+from threading import Thread
 from tools import *
 
 version = '0.3'
@@ -117,25 +118,56 @@ class DeeFuzz:
             nb_stations = len(self.conf['deefuzz']['station'])
         print 'Number of stations : ' + str(nb_stations)
 
-        if nb_stations > 1:
-            print "You are trying to start multiple stations at the same time..."
-            print "Please deefuzz-mt.py for that !"
+        # Create a Queue
+        q = Queue.Queue(1)
+
+        # Create a Producer 
+        p = Producer(q)
+        p.start()
 
         # Define the buffer_size
-        buffer_size = 65536
-        print 'Buffer size per station = ' + str(buffer_size)
+        self.buffer_size = 65536/nb_stations
+        print 'Buffer size per station = ' + str(self.buffer_size)
+
+        # Start the stations
+        s = []
+        for i in range(0,nb_stations):
+            if isinstance(self.conf['deefuzz']['station'], dict):
+                station = self.conf['deefuzz']['station']
+            else:
+                station = self.conf['deefuzz']['station'][i]
+            name = station['infos']['name']
+            # Create a Station
+            s.append(Station(station, q, self.buffer_size))
+
+        for i in range(0,nb_stations):
+            # Start the Stations
+            s[i].start()   
 
-        # Start the station
-        station = self.conf['deefuzz']['station']
-        s = Station(station, buffer_size)
-        s.run()
 
+class Producer(Thread):
+    """a DeeFuzz Producer master thread"""
 
-class Station:
-    """a DeeFuzz shouting station"""
+    def __init__(self, q):
+        Thread.__init__(self)
+        self.q = q
+
+    def run(self):
+        i=0
+        q = self.q
+        while 1: 
+            #print "Producer produced one queue step: "+str(i)
+            q.put(i,1)
+            i+=1
 
-    def __init__(self, station, buffer_size):
+
+class Station(Thread):
+    """a DeeFuzz shouting station thread"""
+
+    def __init__(self, station, q, buffer_size):
+        Thread.__init__(self)
         self.station = station
+        self.q = q
         self.buffer_size = buffer_size
         self.channel = shout.Shout()
         self.id = 999999
@@ -179,7 +211,6 @@ class Station:
         self.channel.open()
         print 'Opening ' + self.short_name + ' - ' + self.channel.name + \
                 ' (' + str(self.lp) + ' tracks)...'
-        time.sleep(0.5)
 
     def update_rss(self, media_list, rss_file):
         rss_item_list = []
@@ -317,8 +348,9 @@ class Station:
         m.close()
 
     def run(self):
-
+        q = self.q
         while True:
+            it = q.get(1)
             if self.lp == 0:
                 break
             if self.mode_shuffle == 1:
@@ -326,8 +358,10 @@ class Station:
             else:
                 media = self.get_next_media_lin()
             self.counter += 1
+            q.task_done()
             
             if os.path.exists(media) and not os.sep+'.' in media:
+                it = q.get(1)
                 self.current_media_obj = self.media_to_objs([media])
                 title = self.current_media_obj[0].metadata['title']
                 self.channel.set_metadata({'song': str(title)})
@@ -335,10 +369,13 @@ class Station:
                 file_name, file_title, file_ext = self.get_file_info(media)
                 print 'DeeFuzzing this file on %s :  id = %s, name = %s' % (self.short_name, self.id, file_name)
                 stream = self.core_process_read(media)
-
+                q.task_done()
+                
                 for __chunk in stream:
+                    it = q.get(1)
                     self.channel.send(__chunk)
                     self.channel.sync()
+                    q.task_done()
                 stream.close()
         
         self.channel.close()
@@ -351,6 +388,8 @@ def main():
         d = DeeFuzz(sys.argv[1])
         d.run()
     else:
+        text = prog_info()
+        sys.exit(text)
 
 if __name__ == '__main__':
     main()