From: Guillaume Pellerin Date: Mon, 24 Sep 2007 03:06:10 +0000 (+0000) Subject: * Create Station class (producer) X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=0fa70d99fc44dd20b66347158c7d926daeff7397;p=deefuzzer.git * Create Station class (producer) * Modify Channel class (worker) * Get the multi-channels work (Thread + Queue) !! * Fix random playlist bug * Cleanup --- diff --git a/d-fuzz.py b/d-fuzz.py index 47b55ec..37f6b8d 100755 --- a/d-fuzz.py +++ b/d-fuzz.py @@ -52,9 +52,6 @@ class DFuzz: """A D-Fuzz station""" def __init__(self, conf_file): - #Thread.__init__(self) - self.status = -1 - self.version = '0.1' self.conf_file = conf_file def get_conf_dict(self): @@ -68,9 +65,8 @@ class DFuzz: return self.conf['station']['name'] def run(self): - print "D-fuzz v"+self.version self.conf = self.get_conf_dict() - print self.conf + #print self.conf # Fix wrong type data from xmltodict when one station (*) if isinstance(self.conf['d-fuzz']['station'], dict): @@ -79,57 +75,72 @@ class DFuzz: nb_stations = len(self.conf['d-fuzz']['station']) print 'Number of stations : ' + str(nb_stations) - # Create a Queue: - #stream_pool = Queue.Queue ( 0 ) - for i in range(0,nb_stations): + # Create a Queue + q = Queue.Queue(1) + # (*) idem if isinstance(self.conf['d-fuzz']['station'], dict): station = self.conf['d-fuzz']['station'] else: station = self.conf['d-fuzz']['station'][i] - print station + #print station name = station['infos']['name'] - channels = int(station['infos']['channels']) - print 'Station %s has %s channels' % (name, str(channels)) - for channel_id in range(0,channels): + nb_channels = int(station['infos']['channels']) + print 'Station %s: %s has %s channels' % (str(i+1), name, str(nb_channels)) + s = Station(station, nb_channels, q) + s.start() + time.sleep(0.1) + for channel_id in range(0, nb_channels): #print channel_id - Channel(station, channel_id + 1).start() - #channel.start() - time.sleep(0.5) + c = Channel(station, channel_id + 1, q) + c.start() + #time.sleep(0.5) + + +class Station(Thread): + """A D-Fuzz Station thread""" + + def __init__(self, station, nb_channels, station_q): + Thread.__init__(self) + self.station_q = station_q + self.station = station + self.nb_channels = nb_channels + + def run(self): + #station_q = self.station_q + i=0 + while 1 : + #print currentThread(),"Produced One Item:",i + self.station_q.put(i,1) + i+=1 + #time.sleep(1) class Channel(Thread): """A channel shouting thread""" - def __init__(self, station, channel_id): + def __init__(self, station, channel_id, channel_q): Thread.__init__(self) + self.channel_q = channel_q self.station = station self.main_command = 'cat' - self.buffer_size = 0xFFFF + self.buffer_size = 16384 self.channel_id = channel_id - - # Pool Queue - #self.stream_pool = stream_pool - self.channel = Shout() - #self.station = station self.id = 999999 + self.counter = 0 self.rand_list = [] - # Media self.media_dir = self.station['media']['dir'] - self.channel.format = self.station['media']['format'] self.mode_shuffle = int(self.station['media']['shuffle']) - # Infos self.short_name = self.station['infos']['short_name'] + '_' + str(self.channel_id) self.channel.name = self.station['infos']['name'] + '_' + str(self.channel_id) self.channel.genre = self.station['infos']['genre'] self.channel.description = self.station['infos']['description'] self.channel.url = self.station['infos']['url'] - # Server self.channel.protocol = 'http' # | 'xaudiocast' | 'icy' self.channel.host = self.station['server']['host'] @@ -139,7 +150,6 @@ class Channel(Thread): self.channel.mount = '/' + self.short_name + '.' + self.channel.format #print self.channel.mount self.channel.public = int(self.station['server']['public']) - # s.audio_info = { 'key': 'val', ... } # (keys are shout.SHOUT_AI_BITRATE, shout.SHOUT_AI_SAMPLERATE, # shout.SHOUT_AI_CHANNELS, shout.SHOUT_AI_QUALITY) @@ -163,12 +173,13 @@ class Channel(Thread): def get_next_media_rand(self, playlist): lp = len(playlist) if self.id >= (lp - 1): + #print 'Get random list...' playlist = self.get_playlist() lp_new = len(playlist) - if lp_new != lp: + if lp_new != lp or self.counter == 0: self.rand_list = range(0,lp_new) random.shuffle(self.rand_list) - #print self.rand_list + #print self.rand_list self.id = 0 else: self.id = self.id + 1 @@ -179,7 +190,6 @@ class Channel(Thread): def core_process(self, command, buffer_size): """Apply command and stream data through a generator. Taken from Telemeta...""" - __chunk = 0 try: proc = subprocess.Popen(command, @@ -190,7 +200,6 @@ class Channel(Thread): close_fds = True) except: raise DFuzzError('Command failure:', command, proc) - # Core processing while True: __chunk = proc.stdout.read(buffer_size) @@ -203,7 +212,6 @@ class Channel(Thread): def run(self): #print "Using libshout version %s" % shout.version() - #__chunk = 0 self.channel.open() print 'Opening ' + self.channel.name + '...' @@ -213,23 +221,16 @@ class Channel(Thread): playlist = self.get_playlist() lp = len(playlist) self.rand_list = range(0,lp) - while True: if lp == 0: break - - # Get a client out of the queue - #client = self.stream_pool.get() - - #if client != None: - - time.sleep(0.1) if self.mode_shuffle == 1: + #print 'Shuffle mode' playlist, media = self.get_next_media_rand(playlist) else: playlist, media = self.get_next_media_lin(playlist) - + self.counter += 1 file_name = string.replace(media, self.media_dir + os.sep, '') #print 'Playlist (%s ch%s) : %s' % (self.short_name, self.channel_id, file_name) #print playlist @@ -241,13 +242,14 @@ class Channel(Thread): print 'D-fuzz this file on %s (channel: %s, track: %s): %s' % (self.short_name, self.channel_id, self.id, file_name) for __chunk in stream: + # Get the queue + self.channel_q.get(1) self.channel.send(__chunk) self.channel.sync() self.channel.close() - class DFuzzError: """The D-Fuzz main error class""" def __init__(self, message, command, subprocess): @@ -264,9 +266,9 @@ class DFuzzError: self.command, error) - def main(): if len(sys.argv) == 2: + print "D-fuzz v"+version dfuzz_main = DFuzz(sys.argv[1]) dfuzz_main.run() else: