]> git.parisson.com Git - deefuzzer.git/commitdiff
Added new QueueLogger class (logging from a queue so we can do thread-safe logging)
authorachbed <github@achbed.org>
Mon, 1 Dec 2014 00:26:06 +0000 (18:26 -0600)
committerachbed <github@achbed.org>
Mon, 1 Dec 2014 00:26:06 +0000 (18:26 -0600)
Moved core and station to new common log functions (_info and _err) using new QueueLogger
Core now attempts to restart stations that stop
Core now detects new folders on the fly instead of only at start time (stationfolder option)
Station.py has a lot of changes to utilize the common blocking queue only when needed rather than at every step (may increase performance with many stations)

TODO:
Detect proper vs unexpected shutdown of stations and restart only unexpected ones
Make new folder detection for stationfolder an option

deefuzzer/core.py
deefuzzer/station.py
deefuzzer/tools/logger.py

index 9e099876bc066f6a39ed6d0a94d5976334af2bb6..1f194379cd9edb06e30f352eeed43246ec997089 100644 (file)
@@ -54,8 +54,10 @@ class DeeFuzzer(Thread):
     logger = None
     m3u = None
     rss = None
-    station = []
-    stations = []
+    station_settings = []
+    station_instances = {}
+    watchfolder = {}
+    logqueue = Queue.Queue()
 
     def __init__(self, conf_file):
         Thread.__init__(self)
@@ -70,7 +72,8 @@ class DeeFuzzer(Thread):
         log_dir = os.sep.join(log_file.split(os.sep)[:-1])
         if not os.path.exists(log_dir) and log_dir:
             os.makedirs(log_dir)
-        self.logger = Logger(log_file)
+        self.logger = QueueLogger(log_file, self.logqueue)
+        self.logger.start()
 
         for key in self.conf['deefuzzer'].keys():
             if key == 'm3u':
@@ -91,31 +94,48 @@ class DeeFuzzer(Thread):
             elif key == 'stationfolder':
                 # Create stations automagically from a folder structure
                 if isinstance(self.conf['deefuzzer'][key], dict):
-                    self.create_stations_fromfolder(self.conf['deefuzzer'][key])
+                    self.watchfolder = self.conf['deefuzzer'][key]
             else:
                 setattr(self, key, self.conf['deefuzzer'][key])
 
         # Set the deefuzzer logger
-        self.logger.write_info('Starting DeeFuzzer')
-        self.logger.write_info('Using libshout version %s' % shout.version())
-        self.logger.write_info('Number of stations : ' + str(len(self.station)))
+        self._info('Starting DeeFuzzer')
+        self._info('Using libshout version %s' % shout.version())
+        self._info('Number of stations : ' + str(len(self.station_settings)))
 
 
+    def _log(self, level, msg):
+        try:
+            obj = {}
+            obj['msg'] = 'Core: ' + str(msg)
+            obj['level'] = level
+            self.logqueue.put(obj)
+        except:
+            pass
+    
+    def _info(self, msg):
+        self._log('info', msg)
+    
+    def _err(self, msg):
+        self._log('err', msg)
+    
     def set_m3u_playlist(self):
         m3u_dir = os.sep.join(self.m3u.split(os.sep)[:-1])
         if not os.path.exists(m3u_dir) and m3u_dir:
             os.makedirs(m3u_dir)
         m3u = open(self.m3u, 'w')
         m3u.write('#EXTM3U\n')
-        for s in self.stations:
+        for k in self.station_instances.keys():
+            s = self.station_instances[k]
             m3u.write('#EXTINF:%s,%s - %s\n' % ('-1',s.short_name, s.channel.name))
             m3u.write('http://' + s.channel.host + ':' + str(s.channel.port) + s.channel.mount + '\n')
         m3u.close()
-        self.logger.write_info('Writing M3U file to : ' + self.m3u)
+        self._info('Writing M3U file to : ' + self.m3u)
 
-    def create_stations_fromfolder(self, options):
+    def create_stations_fromfolder(self):
         """Scan a folder for subfolders containing media, and make stations from them all."""
 
+        options = self.watchfolder
         if not 'folder' in options.keys():
             # We have no folder specified.  Bail.
             return
@@ -125,7 +145,8 @@ class DeeFuzzer(Thread):
             # The specified path is not a folder.  Bail.
             return
 
-        self.logger.write_info('Scanning folder ' + folder + ' for stations')
+        # This makes the log file a lot more verbose.  Commented out since we report on new stations anyway.
+        # self._info('Scanning folder ' + folder + ' for stations')
 
         files = os.listdir(folder)
         for file in files:
@@ -134,12 +155,28 @@ class DeeFuzzer(Thread):
                 if folder_contains_music(filepath):
                     self.create_station(filepath, options)
 
+    def station_exists(self, name):
+        try:
+            for s in self.station_settings:
+                if not 'infos' in s.keys():
+                    continue
+                if not 'short_name' in s['infos'].keys():
+                    continue
+                if s['infos']['short_name'] == name:
+                    return True
+            return False
+        except:
+            pass
+        return True
+        
     def create_station(self, folder, options):
         """Create a station definition for a folder given the specified options."""
 
-        self.logger.write_info('Creating station for folder ' + folder)
         s = {}
         path, name = os.path.split(folder)
+        if self.station_exists(name):
+            return
+        self._info('Creating station for folder ' + folder)
         d = dict(path=folder,name=name)
         for i in options.keys():
             if not 'folder' in i:
@@ -167,7 +204,7 @@ class DeeFuzzer(Thread):
             # Whatever we have, it's not either a file or folder.  Bail.
             return
 
-        self.logger.write_info('Loading station config files in ' + folder)
+        self._info('Loading station config files in ' + folder)
         files = os.listdir(folder)
         for file in files:
             filepath = os.path.join(folder, file)
@@ -177,7 +214,7 @@ class DeeFuzzer(Thread):
     def load_station_config(self, file):
         """Load station configuration(s) from a config file."""
 
-        self.logger.write_info('Loading station config file ' + file)
+        self._info('Loading station config file ' + file)
         stationdef = get_conf_dict(file)
         if isinstance(stationdef, dict):
             if 'station' in stationdef.keys():
@@ -192,48 +229,66 @@ class DeeFuzzer(Thread):
         try:
             # We should probably test to see if we're putting the same station in multiple times
             # Same in this case probably means the same media folder, server, and mountpoint
-            self.station.append(this_station)
+            self.station_settings.append(this_station)
         except Exception:
             return
 
     def run(self):
         q = Queue.Queue(1)
-
-        ns = len(self.station)
-        for i in range(0, ns):
-            try:
-                station = self.station[i]
-
-                # Apply station defaults if they exist
-                if 'stationdefaults' in self.conf['deefuzzer']:
-                    if isinstance(self.conf['deefuzzer']['stationdefaults'], dict):
-                        station = merge_defaults(station, self.conf['deefuzzer']['stationdefaults'])
-                self.stations.append(Station(station, q, self.logger, self.m3u))
-            except Exception:
-                name = str(i)
-                if 'info' in station.keys():
-                    if 'short_name' in station['infos']:
-                        name = station['infos']['short_name']
-                self.logger.write_error('Error starting station ' + name)
-                continue
-
-        if self.m3u:
-            self.set_m3u_playlist()
-
+        ns = -1
         p = Producer(q)
         p.start()
-
-        ns = len(self.stations)
-        # Start the Stations
-        for i in range(0, ns):
-            try:
-                self.stations[i].start()
-            except Exception:
-                continue
+        started = False
+        # Keep the Stations running
+        while True:
+            self.create_stations_fromfolder()
+            ns_new = len(self.station_settings)
+            if(ns_new > ns):
+                for i in range(ns+1, ns_new):
+                    try:
+                        station = self.station_settings[i]
+
+                        # Apply station defaults if they exist
+                        if 'stationdefaults' in self.conf['deefuzzer']:
+                            if isinstance(self.conf['deefuzzer']['stationdefaults'], dict):
+                                station = merge_defaults(station, self.conf['deefuzzer']['stationdefaults'])
+
+                        name = 'Station ' + str(i)
+                        if 'info' in station.keys():
+                            if 'short_name' in station['infos']:
+                                name = station['infos']['short_name']
+                                y = 1
+                                while name in self.station_instances.keys():
+                                    y = y + 1
+                                    name = station['infos']['short_name'] + " " + str(y)
+
+                        self.station_instances[name] = Station(station, q, self.logqueue, self.m3u)
+                    except Exception:
+                        self._err('Error starting station ' + name)
+                        continue
+                ns = ns_new
+
+                if self.m3u:
+                    self.set_m3u_playlist()
+            
+            for i in self.station_instances.keys():
+                try:
+                    if not self.station_instances[i].isAlive():
+                        self.station_instances[i].start()
+                        msg = 'Started '
+                        if started:
+                            msg = 'Restarted '
+                        self._info(msg + 'station ' + i)
+                except:
+                    pass
+            
+            started = False
+            time.sleep(5)
+            # end main loop
 
 
 class Producer(Thread):
-    """a DeeFuzzer Producer master thread"""
+    """a DeeFuzzer Producer master thread.  Used for locking/blocking"""
 
     def __init__(self, q):
         Thread.__init__(self)
index 0397383b9231a4be2771be3402b4b051f0026e7b..a433d68325497186abc039143644f33d68019255 100644 (file)
@@ -80,12 +80,13 @@ class Station(Thread):
     feeds_playlist = 1
     feeds_showfilepath = 0
     feeds_showfilename = 0
+    short_name = ''
 
-    def __init__(self, station, q, logger, m3u):
+    def __init__(self, station, q, logqueue, m3u):
         Thread.__init__(self)
         self.station = station
         self.q = q
-        self.logger = logger
+        self.logqueue = logqueue
         self.m3u = m3u
 
         if 'station_dir' in self.station:
@@ -185,8 +186,7 @@ class Station(Thread):
         self.feeds_playlist_file = self.base_name + '_playlist'
 
         # Logging
-        self._info('Opening ' + self.short_name + ' - ' + self.channel.name + \
-                ' (' + str(self.lp) + ' tracks)...')
+        self._info('Opening ' + self.short_name + ' - ' + self.channel.name)
 
         self.metadata_relative_dir = 'metadata'
         self.metadata_url = self.channel.url + '/rss/' + self.metadata_relative_dir
@@ -253,12 +253,21 @@ class Station(Thread):
             if self.record_mode:
                 self.record_callback('/record', [1])
 
+    def _log(self, level, msg):
+        try:
+            obj = {}
+            obj['msg'] = 'Station ' + str(self.channel_url) + ': ' + str(msg)
+            obj['level'] = str(level)
+            self.logqueue.put(obj)
+        except:
+            pass
+    
     def _info(self, msg):
-        self.logger.write_info('Station ' + self.channel_url + ': ' + str(msg))
+        self._log('info', msg)
     
     def _err(self, msg):
-        self.logger.write_error('Station ' + self.channel_url + ': ' + str(msg))
-                
+        self._log('err', msg)
+    
     def run_callback(self, path, value):
         value = value[0]
         self.run_mode = value
@@ -359,10 +368,18 @@ class Station(Thread):
                         file_list.append(root + os.sep + file)
             file_list.sort()
         else:
-            f = open(self.m3u_playlist_file, 'r')
-            for path in f.readlines():
-                if '#' != path[0]:
-                    file_list.append(path[:-1])
+            self.q.get(1)
+            try:
+                f = open(self.m3u_playlist_file, 'r')
+                try:
+                    for path in f.readlines():
+                        if '#' != path[0]:
+                            file_list.append(path[:-1])
+                except:
+                    f.close()
+            except:
+                pass
+            self.q.task_done()
         return file_list
 
     def get_jingles(self):
@@ -416,7 +433,7 @@ class Station(Thread):
                 if self.shuffle_mode:
                     random.shuffle(self.playlist)
 
-                self._info('generating new playlist (' + str(self.lp) + ' tracks)')
+                self._info('Generating new playlist (' + str(self.lp) + ' tracks)')
 
                 if self.feeds_playlist:
                     self.update_feeds(self.media_to_objs(self.playlist), self.feeds_playlist_file, '(playlist)')
@@ -459,31 +476,36 @@ class Station(Thread):
         else:
             mess = 'No media in media_dir !'
             self._err(mess)
-            sys.exit(mess)
+            self.run_mode = 0
 
     def media_to_objs(self, media_list):
         media_objs = []
         for media in media_list:
             file_name, file_title, file_ext = get_file_info(media)
-            if file_ext.lower() == 'mp3' or mimetypes.guess_type(media)[0] == 'audio/mpeg':
-                try:
-                    file_meta = Mp3(media)
-                except:
-                    continue
-            elif file_ext.lower() == 'ogg' or mimetypes.guess_type(media)[0] == 'audio/ogg':
-                try:
-                    file_meta = Ogg(media)
-                except:
-                    continue
-            elif file_ext.lower() == 'webm' or mimetypes.guess_type(media)[0] == 'video/webm':
-                try:
-                    file_meta = WebM(media)
-                except:
-                    continue
-            if self.feeds_showfilename:
-                file_meta.metadata['filename'] = file_name.decode( "utf-8" )   #decode needed for some weird filenames
-            if self.feeds_showfilepath:
-                file_meta.metadata['filepath'] = media.decode( "utf-8" )   #decode needed for some weird filenames
+            self.q.get(1)
+            try:
+                if file_ext.lower() == 'mp3' or mimetypes.guess_type(media)[0] == 'audio/mpeg':
+                    try:
+                        file_meta = Mp3(media)
+                    except:
+                        continue
+                elif file_ext.lower() == 'ogg' or mimetypes.guess_type(media)[0] == 'audio/ogg':
+                    try:
+                        file_meta = Ogg(media)
+                    except:
+                        continue
+                elif file_ext.lower() == 'webm' or mimetypes.guess_type(media)[0] == 'video/webm':
+                    try:
+                        file_meta = WebM(media)
+                    except:
+                        continue
+                if self.feeds_showfilename:
+                    file_meta.metadata['filename'] = file_name.decode( "utf-8" )   #decode needed for some weird filenames
+                if self.feeds_showfilepath:
+                    file_meta.metadata['filepath'] = media.decode( "utf-8" )   #decode needed for some weird filenames
+            except:
+                pass
+            self.q.task_done()
             media_objs.append(file_meta)
         return media_objs
 
@@ -554,21 +576,25 @@ class Station(Thread):
                     )
             json_data.append(json_item)
 
-        rss = RSS2(title = channel_subtitle,
-                            link = self.channel.url,
-                            description = self.channel.description.decode('utf-8'),
-                            lastBuildDate = date_now,
+        rss = RSS2(title = channel_subtitle, \
+                            link = self.channel.url, \
+                            description = self.channel.description.decode('utf-8'), \
+                            lastBuildDate = date_now, \
                             items = rss_item_list,)
-
-        if self.feeds_rss:
-            f = open(rss_file + '.xml', 'w')
-            rss.write_xml(f, 'utf-8')
-            f.close()
-
-        if self.feeds_json:
-            f = open(rss_file + '.json', 'w')
-            f.write(json.dumps(json_data, separators=(',',':')))
-            f.close()
+        self.q.get(1)
+        try:
+            if self.feeds_rss:
+                f = open(rss_file + '.xml', 'w')
+                rss.write_xml(f, 'utf-8')
+                f.close()
+
+            if self.feeds_json:
+                f = open(rss_file + '.json', 'w')
+                f.write(json.dumps(json_data, separators=(',',':')))
+                f.close()
+        except:
+            pass
+        self.q.task_done()
 
     def update_twitter(self, message):
         try:
@@ -619,10 +645,15 @@ class Station(Thread):
             % (self.id, self.current_media_obj[0].file_name))
         self.player.set_media(self.media)
 
-        if self.player_mode:
-            self.stream = self.player.file_read_slow()
-        else:
-            self.stream = self.player.file_read_fast()
+        self.q.get(1)
+        try:
+            if self.player_mode:
+                self.stream = self.player.file_read_slow()
+            else:
+                self.stream = self.player.file_read_fast()
+        except:
+            pass
+        self.q.task_done()
 
     def set_webm_read_mode(self):
         self.channel.set_callback(FileReader(self.media).read_callback)
@@ -645,53 +676,48 @@ class Station(Thread):
 
         while not self.server_ping:
             try:
-                self.q.get(1)
                 server = urllib.urlopen(self.server_url)
                 self.server_ping = True
-                text = 'channel available'
-                self._info(text)
-                self.q.task_done()
+                self._info('Channel available.')
             except:
                 time.sleep(1)
                 if log:
-                    text = 'could not connect the channel'
-                    self._err(text)
+                    self._err('Could not connect the channel.  Waiting for channel to become available.')
                     log = False
-                self.q.task_done()
 
     def icecastloop_nextmedia(self):
-        self.q.get(1)
-        self.next_media = 0
-        self.media = self.get_next_media()
-        self.counter += 1
-        if self.relay_mode:
-            self.set_relay_mode()
-        elif os.path.exists(self.media) and not os.sep+'.' in self.media:
-            if self.lp == 0:
-                self._err('has no media to stream !')
-                return False
-            self.set_read_mode()
-        self.q.task_done()
-        
-        return True
+        try:
+            self.next_media = 0
+            self.media = self.get_next_media()
+            self.counter += 1
+            if self.relay_mode:
+                self.set_relay_mode()
+            elif os.path.exists(self.media) and not os.sep+'.' in self.media:
+                if self.lp == 0:
+                    self._err('has no media to stream !')
+                    return False
+                self.set_read_mode()
+            
+            return True
+        except Exception, e:
+            self_err('icecastloop_nextmedia: Error: ' + str(e))
+        return False
     
     def icecastloop_metadata(self):
-        self.q.get(1)
-        if (not (self.jingles_mode and (self.counter % 2)) or \
-                            self.relay_mode) and self.twitter_mode:
-            self.update_twitter_current()
-        self.channel.set_metadata({'song': self.song, 'charset': 'utf-8'})
-        self.q.task_done()
-        return True
+        try:
+            if (not (self.jingles_mode and (self.counter % 2)) or \
+                                self.relay_mode) and self.twitter_mode:
+                self.update_twitter_current()
+            self.channel.set_metadata({'song': self.song, 'charset': 'utf-8'})
+            return True
+        except Exception, e:
+            self_err('icecastloop_metadata: Error: ' + str(e))
+        return False
 
-                
     def run(self):
-        self.q.get(1)
         self.ping_server()
-        self.q.task_done()
 
         if self.type == 'stream-m':
-            self.q.get(1)
             if self.relay_mode:
                 self.set_relay_mode()
             else:
@@ -699,76 +725,82 @@ class Station(Thread):
                 self.set_webm_read_mode()
             self.channel_open()
             self.channel.start()
-            self.q.task_done()
 
         if self.type == 'icecast':
-            self.q.get(1)
             self.channel_open()
             self._info('channel connected')
-            self.q.task_done()
 
             while self.run_mode:
-                try:
-                    if not self.icecastloop_nextmedia():
-                        break
-                    if not self.icecastloop_metadata():
+                if not self.icecastloop_nextmedia():
+                    self._info('Something wrong happened in icecastloop_nextmedia.  Ending.')
+                    break
+                    
+                self.icecastloop_metadata()
+
+                # TEST MODE: Jump thru only the first chunk of each file
+                # first = True
+                for self.chunk in self.stream:
+                    # if first:
+                    #     first = False
+                    # else:
+                    #     break
+                    
+                    if self.next_media or not self.run_mode:
                         break
-
-                    for self.chunk in self.stream:
+                    
+                    if self.record_mode:
                         try:
-                            if self.next_media or not self.run_mode:
-                                break
+                            # Record the chunk
+                            self.recorder.write(self.chunk)
+                        except:
+                            self._err('could not write the buffer to the file')
+                            continue
                                 
+                    try:
+                        # Send the chunk to the stream
+                        self.channel.send(self.chunk)
+                        self.channel.sync()
+                    except:
+                        self._err('could not send the buffer')
+                        
+                        try:
+                            self.channel.close()
+                            self._info('channel closed')
+                        except:
+                            self._err('could not close the channel')
+                            self.q.task_done()
+                            continue
+                            
+                        try:
+                            self.ping_server()
+                            self.channel_open()
+                            self.channel.set_metadata({'song': self.song, 'charset': 'utf8',})
+                            self._info('channel restarted')
+                        except:
+                            self._err('could not restart the channel')
                             if self.record_mode:
-                                try:
-                                    self.q.get(1)
-                                    self.recorder.write(self.chunk)
-                                    self.q.task_done()
-                                except:
-                                    self._err('could not write the buffer to the file')
-                                    self.q.task_done()
-                                    continue
-                                    
+                                self.recorder.close()
+                            return
+                            
+                        try:
+                            self.channel.send(self.chunk)
+                            self.channel.sync()
+                        except:
+                            self._err('could not send data after restarting the channel')
                             try:
-                                self.q.get(1)
-                                self.channel.send(self.chunk)
-                                self.channel.sync()
-                                self.q.task_done()
+                                self.channel.close()
                             except:
-                                self._err('could not send the buffer')
-                                self.q.task_done()
+                                self._err('could not close the channel')
                                 
-                                try:
-                                    self.q.get(1)
-                                    self.channel.close()
-                                    self._info('channel closed')
-                                    self.q.task_done()
-                                except:
-                                    self._err('could not close the channel')
-                                    self.q.task_done()
-                                    continue
-                                    
-                                try:
-                                    self.ping_server()
-                                    self.q.get(1)
-                                    self.channel_open()
-                                    self.channel.set_metadata({'song': self.song, 'charset': 'utf8',})
-                                    self._info('channel restarted')
-                                    self.q.task_done()
-                                except:
-                                    self._err('could not restart the channel')
-                                    self.q.task_done()
-                                    continue
-                                continue
-
-                        except: # send chunk loop exception
-                            continue
-                    # send chunk loop end
-                    
-                except: # while run_mode exception
-                    continue
+                            if self.record_mode:
+                                self.recorder.close()
+                            return
+                            
+                # send chunk loop end
             # while run_mode loop end
             
+            self._info("Play mode ended. Stopping stream.")
+            
             if self.record_mode:
                 self.recorder.close()
 
index 041f04c16c7ab35939c8f459be1fcba954f5517c..5bb6570c3bd15f117456f06f0d7ba12c34708814 100644 (file)
@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 
 import logging
+from threading import Thread
 
 
 class Logger:
@@ -21,3 +22,31 @@ class Logger:
     def write_error(self, message):
         self.logger.error(message)
 
+class QueueLogger(Thread):
+    """A queue-based logging object"""
+    
+    def __init__(self, file, q):
+        Thread.__init__(self)
+        self.logger = Logger(file)
+        self.q = q
+        
+    def run(self):
+        while True:
+            try:
+                msg = self.q.get(1)
+                if not isinstance(msg, dict):
+                    self.logger.write_error(str(msg))
+                else:
+                    if not 'msg' in msg.keys():
+                        continue
+                    
+                    if 'level' in msg.keys():
+                        if msg['level'] == 'info':
+                            self.logger.write_info(msg['msg'])
+                        else:
+                            self.logger.write_error(msg['msg'])
+                    else:
+                         self.logger.write_error(msg['msg'])
+            except:
+                pass
+                
\ No newline at end of file