From ab8799f1c339289c045d33156b803d545ac43f7b Mon Sep 17 00:00:00 2001 From: Guillaume Pellerin Date: Tue, 10 Mar 2009 16:21:35 +0000 Subject: [PATCH] fixed Producer / Station model to allow multi threading shouting ! --- README | 4 +-- defuzz.py | 98 +++++++++++++++++++++++++------------------------------ 2 files changed, 46 insertions(+), 56 deletions(-) diff --git a/README b/README index 4af55d5..22ba2a3 100644 --- a/README +++ b/README @@ -34,11 +34,11 @@ see INSTALL Usage : defuzz CONFIGFILE where CONFIGFILE is the path for a XML config file - ex: defuzz ./myfuzz.xml + ex: defuzz example/myfuzz.xml Note that you must edit the config file with right parameters before executing... You can find an example for the XML file in the directory "example/" of this -application. +application (maybe in /usr/share/defuzz if installed with the help of install.py Be carefull : at the moment the Thread implementation raises exceptions when shutting down with CTRL + C... diff --git a/defuzz.py b/defuzz.py index b57af89..226d14f 100755 --- a/defuzz.py +++ b/defuzz.py @@ -17,6 +17,7 @@ import datetime import string import random import Queue +import shout import subprocess from shout import Shout from tools import * @@ -26,6 +27,7 @@ from mutagen.oggvorbis import OggVorbis version = '0.2.2' year = datetime.datetime.now().strftime("%Y") + def prog_info(): desc = '\n defuzz : easy and light streaming tool\n' ver = ' version : %s \n\n' % (version) @@ -49,6 +51,7 @@ def prog_info(): text = desc + ver + info return text + class DeFuzzError: """The DeFuzz main error class""" def __init__(self, message, command, subprocess): @@ -67,7 +70,7 @@ class DeFuzzError: class DeFuzz: """A DeFuzz station""" - + def __init__(self, conf_file): self.conf_file = conf_file self.conf = self.get_conf_dict() @@ -83,7 +86,7 @@ class DeFuzz: def get_station_names(self): return self.conf['station']['name'] - def run(self): + def start(self): # Fix wrong type data from xmltodict when one station (*) if isinstance(self.conf['defuzz']['station'], dict): nb_stations = 1 @@ -91,57 +94,54 @@ class DeFuzz: nb_stations = len(self.conf['defuzz']['station']) print 'Number of stations : ' + str(nb_stations) + # Create a Queue + q = Queue.Queue(nb_stations) + p = Producer(q) + p.start() + s = [] + for i in range(0,nb_stations): - # Create a Queue - #q = Queue.Queue(1) - - # (*) idem if isinstance(self.conf['defuzz']['station'], dict): station = self.conf['defuzz']['station'] else: station = self.conf['defuzz']['station'][i] #print station name = station['infos']['name'] - nb_channels = int(station['infos']['channels']) - print 'Station %s: %s has %s channels' % (str(i+1), name, str(nb_channels)) - #s = Station(station, nb_channels, q) - #s.start() - #time.sleep(0.1) - for channel_id in range(0, nb_channels): - #print channel_id - c = Channel(station, channel_id + 1) - c.start() - #time.sleep(0.5) - + s.append(Station(station, q)) + + for i in range(0,nb_stations): + s[i].start() + time.sleep(0.1) + #s[i].join() + pass + -class Station(Thread): - """A DeFuzz Station thread""" +class Producer(Thread): + """A DeFuzz Producer master thread""" - def __init__(self, station, nb_channels): + def __init__(self, q): Thread.__init__(self) - #self.station_q = station_q - self.station = station - self.nb_channels = nb_channels + self.q = q def run(self): - #station_q = self.station_q + q = self.q i=0 while 1 : - #print currentThread(),"Produced One Item:",i - self.station_q.put(i,1) + #print "Producer produced one queue step: "+str(i) + self.q.put(i,1) i+=1 - #time.sleep(1) + #time.sleep(0.1) -class Channel(Thread): - """A channel shouting thread""" +class Station(Thread): + """A DeFuzz Station shouting slave thread""" - def __init__(self, station, channel_id): + def __init__(self, station, q): Thread.__init__(self) - #self.channel_q = channel_q + self.q = q self.station = station self.buffer_size = 16384 - self.channel_id = channel_id + #self.channel_id = channel_id self.channel = Shout() self.id = 999999 self.counter = 0 @@ -152,8 +152,8 @@ class Channel(Thread): self.channel.format = self.station['media']['format'] self.mode_shuffle = int(self.station['media']['shuffle']) # Infos - self.short_name = self.station['infos']['short_name'] + '_' + str(self.channel_id) - self.channel.name = self.station['infos']['name'] + '_' + str(self.channel_id) + self.short_name = self.station['infos']['short_name'] + self.channel.name = self.station['infos']['name'] self.channel.genre = self.station['infos']['genre'] self.channel.description = self.station['infos']['description'] self.channel.url = self.station['infos']['url'] @@ -229,28 +229,19 @@ class Channel(Thread): if len(__chunk) == 0: break yield __chunk - - #__chunk = 0 - #file = open(media, 'r') - ## Core processing - #while True: - #__chunk = file.read(buffer_size) - #if len(__chunk) == 0: - #break - #yield __chunk - #file.close() def run(self): - #print "Using libshout version %s" % shout.version() - #__chunk = 0 + print "Using libshout version %s" % shout.version() + q = self.q + __chunk = 0 self.channel.open() - print 'Opening ' + self.channel.name + '...' + print 'Opening ' + self.short_name + ' - ' + self.channel.name + '...' time.sleep(0.1) # Playlist playlist = self.get_playlist() lp = len(playlist) - print playlist + #print playlist self.rand_list = range(0,lp-1) while True: @@ -262,20 +253,19 @@ class Channel(Thread): else: playlist, media = self.get_next_media_lin(playlist) self.counter += 1 + if os.path.exists(media) and not '/.' in media: file_name = string.replace(media, self.media_dir + os.sep, '') - #print 'Playlist (%s ch%s) : %s' % (self.short_name, self.channel_id, file_name) - #print playlist - #print media self.channel.set_metadata({'song': file_name}) stream = self.core_process(media, self.buffer_size) - print 'Defuzzing this file on %s (channel: %s, track: %s): %s' % (self.short_name, self.channel_id, self.id, file_name) + print 'Defuzzing this file on %s : id = %s, name = %s' % (self.short_name, self.id, file_name) for __chunk in stream: # Get the queue - #self.channel_q.get(1) self.channel.send(__chunk) self.channel.sync() + it = q.get(1) + #print "Station eated one queue step: "+str(it) self.channel.close() @@ -284,7 +274,7 @@ def main(): if len(sys.argv) == 2: print "Defuzz v"+version defuzz_main = DeFuzz(sys.argv[1]) - defuzz_main.run() + defuzz_main.start() else: text = prog_info() sys.exit(text) -- 2.39.5