From bf4418bced5ba159b98f4bf15cd2dca2ad8a7690 Mon Sep 17 00:00:00 2001 From: achbed Date: Sun, 30 Nov 2014 18:26:06 -0600 Subject: [PATCH] Added new QueueLogger class (logging from a queue so we can do thread-safe logging) Moved core and station to new common log functions (_info and _err) using new QueueLogger Core now attempts to restart stations that stop Core now detects new folders on the fly instead of only at start time (stationfolder option) Station.py has a lot of changes to utilize the common blocking queue only when needed rather than at every step (may increase performance with many stations) TODO: Detect proper vs unexpected shutdown of stations and restart only unexpected ones Make new folder detection for stationfolder an option --- deefuzzer/core.py | 147 ++++++++++++------ deefuzzer/station.py | 306 +++++++++++++++++++++----------------- deefuzzer/tools/logger.py | 29 ++++ 3 files changed, 299 insertions(+), 183 deletions(-) diff --git a/deefuzzer/core.py b/deefuzzer/core.py index 9e09987..1f19437 100644 --- a/deefuzzer/core.py +++ b/deefuzzer/core.py @@ -54,8 +54,10 @@ class DeeFuzzer(Thread): logger = None m3u = None rss = None - station = [] - stations = [] + station_settings = [] + station_instances = {} + watchfolder = {} + logqueue = Queue.Queue() def __init__(self, conf_file): Thread.__init__(self) @@ -70,7 +72,8 @@ class DeeFuzzer(Thread): log_dir = os.sep.join(log_file.split(os.sep)[:-1]) if not os.path.exists(log_dir) and log_dir: os.makedirs(log_dir) - self.logger = Logger(log_file) + self.logger = QueueLogger(log_file, self.logqueue) + self.logger.start() for key in self.conf['deefuzzer'].keys(): if key == 'm3u': @@ -91,31 +94,48 @@ class DeeFuzzer(Thread): elif key == 'stationfolder': # Create stations automagically from a folder structure if isinstance(self.conf['deefuzzer'][key], dict): - self.create_stations_fromfolder(self.conf['deefuzzer'][key]) + self.watchfolder = self.conf['deefuzzer'][key] else: setattr(self, key, self.conf['deefuzzer'][key]) # Set the deefuzzer logger - self.logger.write_info('Starting DeeFuzzer') - self.logger.write_info('Using libshout version %s' % shout.version()) - self.logger.write_info('Number of stations : ' + str(len(self.station))) + self._info('Starting DeeFuzzer') + self._info('Using libshout version %s' % shout.version()) + self._info('Number of stations : ' + str(len(self.station_settings))) + def _log(self, level, msg): + try: + obj = {} + obj['msg'] = 'Core: ' + str(msg) + obj['level'] = level + self.logqueue.put(obj) + except: + pass + + def _info(self, msg): + self._log('info', msg) + + def _err(self, msg): + self._log('err', msg) + def set_m3u_playlist(self): m3u_dir = os.sep.join(self.m3u.split(os.sep)[:-1]) if not os.path.exists(m3u_dir) and m3u_dir: os.makedirs(m3u_dir) m3u = open(self.m3u, 'w') m3u.write('#EXTM3U\n') - for s in self.stations: + for k in self.station_instances.keys(): + s = self.station_instances[k] m3u.write('#EXTINF:%s,%s - %s\n' % ('-1',s.short_name, s.channel.name)) m3u.write('http://' + s.channel.host + ':' + str(s.channel.port) + s.channel.mount + '\n') m3u.close() - self.logger.write_info('Writing M3U file to : ' + self.m3u) + self._info('Writing M3U file to : ' + self.m3u) - def create_stations_fromfolder(self, options): + def create_stations_fromfolder(self): """Scan a folder for subfolders containing media, and make stations from them all.""" + options = self.watchfolder if not 'folder' in options.keys(): # We have no folder specified. Bail. return @@ -125,7 +145,8 @@ class DeeFuzzer(Thread): # The specified path is not a folder. Bail. return - self.logger.write_info('Scanning folder ' + folder + ' for stations') + # This makes the log file a lot more verbose. Commented out since we report on new stations anyway. + # self._info('Scanning folder ' + folder + ' for stations') files = os.listdir(folder) for file in files: @@ -134,12 +155,28 @@ class DeeFuzzer(Thread): if folder_contains_music(filepath): self.create_station(filepath, options) + def station_exists(self, name): + try: + for s in self.station_settings: + if not 'infos' in s.keys(): + continue + if not 'short_name' in s['infos'].keys(): + continue + if s['infos']['short_name'] == name: + return True + return False + except: + pass + return True + def create_station(self, folder, options): """Create a station definition for a folder given the specified options.""" - self.logger.write_info('Creating station for folder ' + folder) s = {} path, name = os.path.split(folder) + if self.station_exists(name): + return + self._info('Creating station for folder ' + folder) d = dict(path=folder,name=name) for i in options.keys(): if not 'folder' in i: @@ -167,7 +204,7 @@ class DeeFuzzer(Thread): # Whatever we have, it's not either a file or folder. Bail. return - self.logger.write_info('Loading station config files in ' + folder) + self._info('Loading station config files in ' + folder) files = os.listdir(folder) for file in files: filepath = os.path.join(folder, file) @@ -177,7 +214,7 @@ class DeeFuzzer(Thread): def load_station_config(self, file): """Load station configuration(s) from a config file.""" - self.logger.write_info('Loading station config file ' + file) + self._info('Loading station config file ' + file) stationdef = get_conf_dict(file) if isinstance(stationdef, dict): if 'station' in stationdef.keys(): @@ -192,48 +229,66 @@ class DeeFuzzer(Thread): try: # We should probably test to see if we're putting the same station in multiple times # Same in this case probably means the same media folder, server, and mountpoint - self.station.append(this_station) + self.station_settings.append(this_station) except Exception: return def run(self): q = Queue.Queue(1) - - ns = len(self.station) - for i in range(0, ns): - try: - station = self.station[i] - - # Apply station defaults if they exist - if 'stationdefaults' in self.conf['deefuzzer']: - if isinstance(self.conf['deefuzzer']['stationdefaults'], dict): - station = merge_defaults(station, self.conf['deefuzzer']['stationdefaults']) - self.stations.append(Station(station, q, self.logger, self.m3u)) - except Exception: - name = str(i) - if 'info' in station.keys(): - if 'short_name' in station['infos']: - name = station['infos']['short_name'] - self.logger.write_error('Error starting station ' + name) - continue - - if self.m3u: - self.set_m3u_playlist() - + ns = -1 p = Producer(q) p.start() - - ns = len(self.stations) - # Start the Stations - for i in range(0, ns): - try: - self.stations[i].start() - except Exception: - continue + started = False + # Keep the Stations running + while True: + self.create_stations_fromfolder() + ns_new = len(self.station_settings) + if(ns_new > ns): + for i in range(ns+1, ns_new): + try: + station = self.station_settings[i] + + # Apply station defaults if they exist + if 'stationdefaults' in self.conf['deefuzzer']: + if isinstance(self.conf['deefuzzer']['stationdefaults'], dict): + station = merge_defaults(station, self.conf['deefuzzer']['stationdefaults']) + + name = 'Station ' + str(i) + if 'info' in station.keys(): + if 'short_name' in station['infos']: + name = station['infos']['short_name'] + y = 1 + while name in self.station_instances.keys(): + y = y + 1 + name = station['infos']['short_name'] + " " + str(y) + + self.station_instances[name] = Station(station, q, self.logqueue, self.m3u) + except Exception: + self._err('Error starting station ' + name) + continue + ns = ns_new + + if self.m3u: + self.set_m3u_playlist() + + for i in self.station_instances.keys(): + try: + if not self.station_instances[i].isAlive(): + self.station_instances[i].start() + msg = 'Started ' + if started: + msg = 'Restarted ' + self._info(msg + 'station ' + i) + except: + pass + + started = False + time.sleep(5) + # end main loop class Producer(Thread): - """a DeeFuzzer Producer master thread""" + """a DeeFuzzer Producer master thread. Used for locking/blocking""" def __init__(self, q): Thread.__init__(self) diff --git a/deefuzzer/station.py b/deefuzzer/station.py index 0397383..a433d68 100644 --- a/deefuzzer/station.py +++ b/deefuzzer/station.py @@ -80,12 +80,13 @@ class Station(Thread): feeds_playlist = 1 feeds_showfilepath = 0 feeds_showfilename = 0 + short_name = '' - def __init__(self, station, q, logger, m3u): + def __init__(self, station, q, logqueue, m3u): Thread.__init__(self) self.station = station self.q = q - self.logger = logger + self.logqueue = logqueue self.m3u = m3u if 'station_dir' in self.station: @@ -185,8 +186,7 @@ class Station(Thread): self.feeds_playlist_file = self.base_name + '_playlist' # Logging - self._info('Opening ' + self.short_name + ' - ' + self.channel.name + \ - ' (' + str(self.lp) + ' tracks)...') + self._info('Opening ' + self.short_name + ' - ' + self.channel.name) self.metadata_relative_dir = 'metadata' self.metadata_url = self.channel.url + '/rss/' + self.metadata_relative_dir @@ -253,12 +253,21 @@ class Station(Thread): if self.record_mode: self.record_callback('/record', [1]) + def _log(self, level, msg): + try: + obj = {} + obj['msg'] = 'Station ' + str(self.channel_url) + ': ' + str(msg) + obj['level'] = str(level) + self.logqueue.put(obj) + except: + pass + def _info(self, msg): - self.logger.write_info('Station ' + self.channel_url + ': ' + str(msg)) + self._log('info', msg) def _err(self, msg): - self.logger.write_error('Station ' + self.channel_url + ': ' + str(msg)) - + self._log('err', msg) + def run_callback(self, path, value): value = value[0] self.run_mode = value @@ -359,10 +368,18 @@ class Station(Thread): file_list.append(root + os.sep + file) file_list.sort() else: - f = open(self.m3u_playlist_file, 'r') - for path in f.readlines(): - if '#' != path[0]: - file_list.append(path[:-1]) + self.q.get(1) + try: + f = open(self.m3u_playlist_file, 'r') + try: + for path in f.readlines(): + if '#' != path[0]: + file_list.append(path[:-1]) + except: + f.close() + except: + pass + self.q.task_done() return file_list def get_jingles(self): @@ -416,7 +433,7 @@ class Station(Thread): if self.shuffle_mode: random.shuffle(self.playlist) - self._info('generating new playlist (' + str(self.lp) + ' tracks)') + self._info('Generating new playlist (' + str(self.lp) + ' tracks)') if self.feeds_playlist: self.update_feeds(self.media_to_objs(self.playlist), self.feeds_playlist_file, '(playlist)') @@ -459,31 +476,36 @@ class Station(Thread): else: mess = 'No media in media_dir !' self._err(mess) - sys.exit(mess) + self.run_mode = 0 def media_to_objs(self, media_list): media_objs = [] for media in media_list: file_name, file_title, file_ext = get_file_info(media) - if file_ext.lower() == 'mp3' or mimetypes.guess_type(media)[0] == 'audio/mpeg': - try: - file_meta = Mp3(media) - except: - continue - elif file_ext.lower() == 'ogg' or mimetypes.guess_type(media)[0] == 'audio/ogg': - try: - file_meta = Ogg(media) - except: - continue - elif file_ext.lower() == 'webm' or mimetypes.guess_type(media)[0] == 'video/webm': - try: - file_meta = WebM(media) - except: - continue - if self.feeds_showfilename: - file_meta.metadata['filename'] = file_name.decode( "utf-8" ) #decode needed for some weird filenames - if self.feeds_showfilepath: - file_meta.metadata['filepath'] = media.decode( "utf-8" ) #decode needed for some weird filenames + self.q.get(1) + try: + if file_ext.lower() == 'mp3' or mimetypes.guess_type(media)[0] == 'audio/mpeg': + try: + file_meta = Mp3(media) + except: + continue + elif file_ext.lower() == 'ogg' or mimetypes.guess_type(media)[0] == 'audio/ogg': + try: + file_meta = Ogg(media) + except: + continue + elif file_ext.lower() == 'webm' or mimetypes.guess_type(media)[0] == 'video/webm': + try: + file_meta = WebM(media) + except: + continue + if self.feeds_showfilename: + file_meta.metadata['filename'] = file_name.decode( "utf-8" ) #decode needed for some weird filenames + if self.feeds_showfilepath: + file_meta.metadata['filepath'] = media.decode( "utf-8" ) #decode needed for some weird filenames + except: + pass + self.q.task_done() media_objs.append(file_meta) return media_objs @@ -554,21 +576,25 @@ class Station(Thread): ) json_data.append(json_item) - rss = RSS2(title = channel_subtitle, - link = self.channel.url, - description = self.channel.description.decode('utf-8'), - lastBuildDate = date_now, + rss = RSS2(title = channel_subtitle, \ + link = self.channel.url, \ + description = self.channel.description.decode('utf-8'), \ + lastBuildDate = date_now, \ items = rss_item_list,) - - if self.feeds_rss: - f = open(rss_file + '.xml', 'w') - rss.write_xml(f, 'utf-8') - f.close() - - if self.feeds_json: - f = open(rss_file + '.json', 'w') - f.write(json.dumps(json_data, separators=(',',':'))) - f.close() + self.q.get(1) + try: + if self.feeds_rss: + f = open(rss_file + '.xml', 'w') + rss.write_xml(f, 'utf-8') + f.close() + + if self.feeds_json: + f = open(rss_file + '.json', 'w') + f.write(json.dumps(json_data, separators=(',',':'))) + f.close() + except: + pass + self.q.task_done() def update_twitter(self, message): try: @@ -619,10 +645,15 @@ class Station(Thread): % (self.id, self.current_media_obj[0].file_name)) self.player.set_media(self.media) - if self.player_mode: - self.stream = self.player.file_read_slow() - else: - self.stream = self.player.file_read_fast() + self.q.get(1) + try: + if self.player_mode: + self.stream = self.player.file_read_slow() + else: + self.stream = self.player.file_read_fast() + except: + pass + self.q.task_done() def set_webm_read_mode(self): self.channel.set_callback(FileReader(self.media).read_callback) @@ -645,53 +676,48 @@ class Station(Thread): while not self.server_ping: try: - self.q.get(1) server = urllib.urlopen(self.server_url) self.server_ping = True - text = 'channel available' - self._info(text) - self.q.task_done() + self._info('Channel available.') except: time.sleep(1) if log: - text = 'could not connect the channel' - self._err(text) + self._err('Could not connect the channel. Waiting for channel to become available.') log = False - self.q.task_done() def icecastloop_nextmedia(self): - self.q.get(1) - self.next_media = 0 - self.media = self.get_next_media() - self.counter += 1 - if self.relay_mode: - self.set_relay_mode() - elif os.path.exists(self.media) and not os.sep+'.' in self.media: - if self.lp == 0: - self._err('has no media to stream !') - return False - self.set_read_mode() - self.q.task_done() - - return True + try: + self.next_media = 0 + self.media = self.get_next_media() + self.counter += 1 + if self.relay_mode: + self.set_relay_mode() + elif os.path.exists(self.media) and not os.sep+'.' in self.media: + if self.lp == 0: + self._err('has no media to stream !') + return False + self.set_read_mode() + + return True + except Exception, e: + self_err('icecastloop_nextmedia: Error: ' + str(e)) + return False def icecastloop_metadata(self): - self.q.get(1) - if (not (self.jingles_mode and (self.counter % 2)) or \ - self.relay_mode) and self.twitter_mode: - self.update_twitter_current() - self.channel.set_metadata({'song': self.song, 'charset': 'utf-8'}) - self.q.task_done() - return True + try: + if (not (self.jingles_mode and (self.counter % 2)) or \ + self.relay_mode) and self.twitter_mode: + self.update_twitter_current() + self.channel.set_metadata({'song': self.song, 'charset': 'utf-8'}) + return True + except Exception, e: + self_err('icecastloop_metadata: Error: ' + str(e)) + return False - def run(self): - self.q.get(1) self.ping_server() - self.q.task_done() if self.type == 'stream-m': - self.q.get(1) if self.relay_mode: self.set_relay_mode() else: @@ -699,76 +725,82 @@ class Station(Thread): self.set_webm_read_mode() self.channel_open() self.channel.start() - self.q.task_done() if self.type == 'icecast': - self.q.get(1) self.channel_open() self._info('channel connected') - self.q.task_done() while self.run_mode: - try: - if not self.icecastloop_nextmedia(): - break - if not self.icecastloop_metadata(): + if not self.icecastloop_nextmedia(): + self._info('Something wrong happened in icecastloop_nextmedia. Ending.') + break + + self.icecastloop_metadata() + + # TEST MODE: Jump thru only the first chunk of each file + # first = True + for self.chunk in self.stream: + # if first: + # first = False + # else: + # break + + if self.next_media or not self.run_mode: break - - for self.chunk in self.stream: + + if self.record_mode: try: - if self.next_media or not self.run_mode: - break + # Record the chunk + self.recorder.write(self.chunk) + except: + self._err('could not write the buffer to the file') + continue + try: + # Send the chunk to the stream + self.channel.send(self.chunk) + self.channel.sync() + except: + self._err('could not send the buffer') + + try: + self.channel.close() + self._info('channel closed') + except: + self._err('could not close the channel') + self.q.task_done() + continue + + try: + self.ping_server() + self.channel_open() + self.channel.set_metadata({'song': self.song, 'charset': 'utf8',}) + self._info('channel restarted') + except: + self._err('could not restart the channel') if self.record_mode: - try: - self.q.get(1) - self.recorder.write(self.chunk) - self.q.task_done() - except: - self._err('could not write the buffer to the file') - self.q.task_done() - continue - + self.recorder.close() + return + + try: + self.channel.send(self.chunk) + self.channel.sync() + except: + self._err('could not send data after restarting the channel') try: - self.q.get(1) - self.channel.send(self.chunk) - self.channel.sync() - self.q.task_done() + self.channel.close() except: - self._err('could not send the buffer') - self.q.task_done() + self._err('could not close the channel') - try: - self.q.get(1) - self.channel.close() - self._info('channel closed') - self.q.task_done() - except: - self._err('could not close the channel') - self.q.task_done() - continue - - try: - self.ping_server() - self.q.get(1) - self.channel_open() - self.channel.set_metadata({'song': self.song, 'charset': 'utf8',}) - self._info('channel restarted') - self.q.task_done() - except: - self._err('could not restart the channel') - self.q.task_done() - continue - continue - - except: # send chunk loop exception - continue - # send chunk loop end - - except: # while run_mode exception - continue + if self.record_mode: + self.recorder.close() + return + + # send chunk loop end # while run_mode loop end + self._info("Play mode ended. Stopping stream.") + if self.record_mode: self.recorder.close() diff --git a/deefuzzer/tools/logger.py b/deefuzzer/tools/logger.py index 041f04c..5bb6570 100644 --- a/deefuzzer/tools/logger.py +++ b/deefuzzer/tools/logger.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import logging +from threading import Thread class Logger: @@ -21,3 +22,31 @@ class Logger: def write_error(self, message): self.logger.error(message) +class QueueLogger(Thread): + """A queue-based logging object""" + + def __init__(self, file, q): + Thread.__init__(self) + self.logger = Logger(file) + self.q = q + + def run(self): + while True: + try: + msg = self.q.get(1) + if not isinstance(msg, dict): + self.logger.write_error(str(msg)) + else: + if not 'msg' in msg.keys(): + continue + + if 'level' in msg.keys(): + if msg['level'] == 'info': + self.logger.write_info(msg['msg']) + else: + self.logger.write_error(msg['msg']) + else: + self.logger.write_error(msg['msg']) + except: + pass + \ No newline at end of file -- 2.39.5