From: achbed Date: Mon, 1 Dec 2014 06:37:45 +0000 (-0600) Subject: Revised crashed station detection X-Git-Url: https://git.parisson.com/?a=commitdiff_plain;h=5fd4bd9fbb76abf3958e16e91a4d50a976012177;p=deefuzzer.git Revised crashed station detection Added main loop in station to allow remote commands to restart playback after stopping Added livecreation option to enable.disable watchfolder mode Added fix for newly created stations not starting Cleaned up a missed queue unlock instance Signed-off-by: achbed --- diff --git a/deefuzzer/core.py b/deefuzzer/core.py index 1f19437..f34cecc 100644 --- a/deefuzzer/core.py +++ b/deefuzzer/core.py @@ -58,6 +58,7 @@ class DeeFuzzer(Thread): station_instances = {} watchfolder = {} logqueue = Queue.Queue() + mainLoop = False def __init__(self, conf_file): Thread.__init__(self) @@ -140,6 +141,15 @@ class DeeFuzzer(Thread): # We have no folder specified. Bail. return + if self.mainLoop: + if not 'livecreation' in options.keys(): + # We have no folder specified. Bail. + return + + if int(options['livecreation']) == 0: + # Livecreation not specified. Bail. + return + folder = str(options['folder']) if not os.path.isdir(folder): # The specified path is not a folder. Bail. @@ -235,18 +245,20 @@ class DeeFuzzer(Thread): def run(self): q = Queue.Queue(1) - ns = -1 + ns = 0 p = Producer(q) p.start() - started = False # Keep the Stations running while True: self.create_stations_fromfolder() ns_new = len(self.station_settings) if(ns_new > ns): - for i in range(ns+1, ns_new): + self._info('Loading new stations') + for i in range(0, ns_new): try: station = self.station_settings[i] + if 'station_name' in station.keys(): + continue # Apply station defaults if they exist if 'stationdefaults' in self.conf['deefuzzer']: @@ -262,7 +274,14 @@ class DeeFuzzer(Thread): y = y + 1 name = station['infos']['short_name'] + " " + str(y) - self.station_instances[name] = Station(station, q, self.logqueue, self.m3u) + self.station_settings[i]['station_name'] = name + new_station = Station(station, q, self.logqueue, self.m3u) + if new_station.valid: + self.station_instances[name] = new_station + self.station_instances[name].start() + self._info('Started station ' + name) + else: + self._err('Error validating station ' + name) except Exception: self._err('Error starting station ' + name) continue @@ -275,14 +294,11 @@ class DeeFuzzer(Thread): try: if not self.station_instances[i].isAlive(): self.station_instances[i].start() - msg = 'Started ' - if started: - msg = 'Restarted ' - self._info(msg + 'station ' + i) + self._info('Restarted crashed station ' + i) except: pass - started = False + self.mainLoop = True time.sleep(5) # end main loop diff --git a/deefuzzer/station.py b/deefuzzer/station.py index a433d68..691c4f6 100644 --- a/deefuzzer/station.py +++ b/deefuzzer/station.py @@ -59,6 +59,7 @@ class Station(Thread): """a DeeFuzzer shouting station thread""" id = 999999 + valid = False counter = 0 delay = 0 start_time = time.time() @@ -81,6 +82,7 @@ class Station(Thread): feeds_showfilepath = 0 feeds_showfilename = 0 short_name = '' + channelIsOpen = False def __init__(self, station, q, logqueue, m3u): Thread.__init__(self) @@ -131,7 +133,8 @@ class Station(Thread): if self.appendtype: self.channel.mount = self.channel.mount + '.' + self.media_format else: - sys.exit('Not a compatible server type. Choose "stream-m" or "icecast".') + self._err('Not a compatible server type. Choose "stream-m" or "icecast".') + return self.channel.url = self.station['infos']['url'] self.channel.name = self.station['infos']['name'] @@ -253,6 +256,8 @@ class Station(Thread): if self.record_mode: self.record_callback('/record', [1]) + self.valid = True + def _log(self, level, msg): try: obj = {} @@ -668,8 +673,24 @@ class Station(Thread): self.update_twitter(message) def channel_open(self): - self.channel.open() - self.channel_delay = self.channel.delay() + if not self.channelIsOpen: + try: + self.channel.open() + self.channel_delay = self.channel.delay() + self._info('channel connected') + self.channelIsOpen = True + return True + except: + self.err('channel could not be opened') + return False + + def channel_close(self): + self.channelIsOpen = False + try: + self.channel.close() + self._info('channel closed') + except: + self._err('channel could not be closed') def ping_server(self): log = True @@ -723,86 +744,72 @@ class Station(Thread): else: self.media = self.get_next_media() self.set_webm_read_mode() - self.channel_open() + if not self.channel_open(): + return self.channel.start() if self.type == 'icecast': - self.channel_open() - self._info('channel connected') - - while self.run_mode: - if not self.icecastloop_nextmedia(): - self._info('Something wrong happened in icecastloop_nextmedia. Ending.') - break - - self.icecastloop_metadata() - - # TEST MODE: Jump thru only the first chunk of each file - # first = True - for self.chunk in self.stream: - # if first: - # first = False - # else: - # break - - if self.next_media or not self.run_mode: - break - - if self.record_mode: - try: - # Record the chunk - self.recorder.write(self.chunk) - except: - self._err('could not write the buffer to the file') - continue - - try: - # Send the chunk to the stream - self.channel.send(self.chunk) - self.channel.sync() - except: - self._err('could not send the buffer') + while True: # Do this so that the handlers will still restart the stream + while self.run_mode: + if not self.channel_open(): + return + if not self.icecastloop_nextmedia(): + self._info('Something wrong happened in icecastloop_nextmedia. Ending.') + return + + self.icecastloop_metadata() + + # TEST MODE: Jump thru only the first chunk of each file + # first = True + for self.chunk in self.stream: + # if first: + # first = False + # else: + # break + + if self.next_media or not self.run_mode: + break + + if self.record_mode: + try: + # Record the chunk + self.recorder.write(self.chunk) + except: + self._err('could not write the buffer to the file') + try: - self.channel.close() - self._info('channel closed') - except: - self._err('could not close the channel') - self.q.task_done() - continue - - try: - self.ping_server() - self.channel_open() - self.channel.set_metadata({'song': self.song, 'charset': 'utf8',}) - self._info('channel restarted') - except: - self._err('could not restart the channel') - if self.record_mode: - self.recorder.close() - return - - try: + # Send the chunk to the stream self.channel.send(self.chunk) self.channel.sync() except: - self._err('could not send data after restarting the channel') + self._err('could not send the buffer') + self.channel_close() + if not self.channel_open(): + self._err('could not restart the channel') + if self.record_mode: + self.recorder.close() + return try: - self.channel.close() + self.channel.set_metadata({'song': self.song, 'charset': 'utf8',}) + self._info('channel restarted') + self.channel.send(self.chunk) + self.channel.sync() except: - self._err('could not close the channel') + self._err('could not send data after restarting the channel') + self.channel_close() + if self.record_mode: + self.recorder.close() + return - if self.record_mode: - self.recorder.close() - return - - # send chunk loop end - # while run_mode loop end - - self._info("Play mode ended. Stopping stream.") - - if self.record_mode: - self.recorder.close() + # send chunk loop end + # while run_mode loop end + + self._info("Play mode ended. Stopping stream.") + + if self.record_mode: + self.recorder.close() - self.channel.close() + self.channel_close() + time.sleep(1) diff --git a/example/deefuzzer_doc.xml b/example/deefuzzer_doc.xml index 138a6a9..6a3dba3 100644 --- a/example/deefuzzer_doc.xml +++ b/example/deefuzzer_doc.xml @@ -151,6 +151,9 @@ /path/to/media + + 1