From: Guillaume Pellerin Date: Sat, 11 Apr 2009 13:48:34 +0000 (+0000) Subject: fix infinite queue and then reapply mt behavior (32 stations max) X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=e997960e9ea20e8821e7944ca02fe00461050c6f;p=deefuzzer.git fix infinite queue and then reapply mt behavior (32 stations max) --- diff --git a/deefuzz-mt.py b/deefuzz-mt.py index c197fc0..f77b4ef 100755 --- a/deefuzz-mt.py +++ b/deefuzz-mt.py @@ -99,7 +99,6 @@ class DeeFuzz: def __init__(self, conf_file): self.conf_file = conf_file self.conf = self.get_conf_dict() - self.buffer_size = 1024 def get_conf_dict(self): confile = open(self.conf_file,'r') @@ -120,12 +119,13 @@ class DeeFuzz: print 'Number of stations : ' + str(nb_stations) # Create a Queue - q = Queue.Queue(0) + q = Queue.Queue(1) # Create a Producer p = Producer(q) p.start() - + + self.buffer_size = 65536/nb_stations print 'Buffer size per station = ' + str(self.buffer_size) s = [] @@ -338,22 +338,20 @@ class Station(Thread): """Read media and stream data through a generator. Taken from Telemeta (see http://telemeta.org)""" - __chunk = 0 m = open(media, 'r') # Core processing while True: - __chunk = m.read(self.buffer_size) - if len(__chunk) == 0: + _chunk = m.read(self.buffer_size) + if len(_chunk) == 0: break - yield __chunk + yield _chunk m.close() def run(self): - __chunk = 0 - + q = self.q while True: - it = self.q.get(1) + it = q.get(1) if self.lp == 0: break if self.mode_shuffle == 1: @@ -368,24 +366,24 @@ class Station(Thread): elif file_ext.lower() == 'ogg': media_obj = Ogg(media) - self.q.task_done() + q.task_done() #self.log_queue(it) if os.path.exists(media) and not os.sep+'.' in media: - it = self.q.get(1) + it = q.get(1) title = media_obj.metadata['title'] self.channel.set_metadata({'song': str(title)}) self.update_rss([media_obj], self.rss_current_file) print 'DeeFuzzing this file on %s : id = %s, name = %s' % (self.short_name, self.id, file_name) stream = self.core_process_read(media) - self.q.task_done() + q.task_done() #self.log_queue(it) - for __chunk in stream: - it = self.q.get(1) - self.channel.send(__chunk) + for _chunk in stream: + it = q.get(1) + self.channel.send(_chunk) self.channel.sync() - self.q.task_done() + q.task_done() #self.log_queue(it) self.channel.close() diff --git a/deefuzz.py b/deefuzz.py index 002820f..6c82a91 100755 --- a/deefuzz.py +++ b/deefuzz.py @@ -45,6 +45,7 @@ import random import Queue import shout import subprocess +from threading import Thread from tools import * version = '0.3' @@ -117,25 +118,56 @@ class DeeFuzz: nb_stations = len(self.conf['deefuzz']['station']) print 'Number of stations : ' + str(nb_stations) - if nb_stations > 1: - print "You are trying to start multiple stations at the same time..." - print "Please deefuzz-mt.py for that !" + # Create a Queue + q = Queue.Queue(1) + + # Create a Producer + p = Producer(q) + p.start() # Define the buffer_size - buffer_size = 65536 - print 'Buffer size per station = ' + str(buffer_size) + self.buffer_size = 65536/nb_stations + print 'Buffer size per station = ' + str(self.buffer_size) + + # Start the stations + s = [] + for i in range(0,nb_stations): + if isinstance(self.conf['deefuzz']['station'], dict): + station = self.conf['deefuzz']['station'] + else: + station = self.conf['deefuzz']['station'][i] + name = station['infos']['name'] + # Create a Station + s.append(Station(station, q, self.buffer_size)) + + for i in range(0,nb_stations): + # Start the Stations + s[i].start() - # Start the station - station = self.conf['deefuzz']['station'] - s = Station(station, buffer_size) - s.run() +class Producer(Thread): + """a DeeFuzz Producer master thread""" -class Station: - """a DeeFuzz shouting station""" + def __init__(self, q): + Thread.__init__(self) + self.q = q + + def run(self): + i=0 + q = self.q + while 1: + #print "Producer produced one queue step: "+str(i) + q.put(i,1) + i+=1 - def __init__(self, station, buffer_size): + +class Station(Thread): + """a DeeFuzz shouting station thread""" + + def __init__(self, station, q, buffer_size): + Thread.__init__(self) self.station = station + self.q = q self.buffer_size = buffer_size self.channel = shout.Shout() self.id = 999999 @@ -179,7 +211,6 @@ class Station: self.channel.open() print 'Opening ' + self.short_name + ' - ' + self.channel.name + \ ' (' + str(self.lp) + ' tracks)...' - time.sleep(0.5) def update_rss(self, media_list, rss_file): rss_item_list = [] @@ -317,8 +348,9 @@ class Station: m.close() def run(self): - + q = self.q while True: + it = q.get(1) if self.lp == 0: break if self.mode_shuffle == 1: @@ -326,8 +358,10 @@ class Station: else: media = self.get_next_media_lin() self.counter += 1 + q.task_done() if os.path.exists(media) and not os.sep+'.' in media: + it = q.get(1) self.current_media_obj = self.media_to_objs([media]) title = self.current_media_obj[0].metadata['title'] self.channel.set_metadata({'song': str(title)}) @@ -335,10 +369,13 @@ class Station: file_name, file_title, file_ext = self.get_file_info(media) print 'DeeFuzzing this file on %s : id = %s, name = %s' % (self.short_name, self.id, file_name) stream = self.core_process_read(media) - + q.task_done() + for __chunk in stream: + it = q.get(1) self.channel.send(__chunk) self.channel.sync() + q.task_done() stream.close() self.channel.close() @@ -351,6 +388,8 @@ def main(): d = DeeFuzz(sys.argv[1]) d.run() else: + text = prog_info() + sys.exit(text) if __name__ == '__main__': main()